Deep Learning at Alibaba Cloud with Alluxio – Running PyTorch on HDFS

Google’s TensorFlow and Facebook’s PyTorch are two Deep Learning frameworks that have been popular with the open source community. Although PyTorch is still a relatively new framework, many developers have successfully adopted it due to its ease of use. 

By default, PyTorch does not support Deep Learning model training directly in HDFS, which brings challenges to users who store data sets in HDFS. These users need to either export HDFS data at the start of each training job or modify the source code of PyTorch to support reading from HDFS. Both approaches are not ideal because they require additional manual work that may introduce additional uncertainties to the training job.

To avoid this problem, we choose to use Alluxio as an interface to access  HDFS via a POSIX FileSystem interface. This approach greatly improved development efficiency at Alibaba Cloud. 

This article demonstrates how this work was achieved within a Kubernetes environment.

Prepare HDFS 2.7.2 environment

For this tutorial, we used a Helm Chart to install HDFS to mock an existing HDFS cluster.

1. Install the helm chart of Hadoop 2.7.2

git clone https://github.com/cheyang/kubernetes-HDFS.git

kubectl label nodes cn-huhehaote.192.168.0.117 hdfs-namenode-selector=hdfs-namenode-0
#helm install -f values.yaml hdfs charts/hdfs-k8s
helm dependency build charts/hdfs-k8s
helm install hdfs charts/hdfs-k8s \
      --set tags.ha=false  \
      --set tags.simple=true  \
      --set global.namenodeHAEnabled=false  \
      --set hdfs-simple-namenode-k8s.nodeSelector.hdfs-namenode-selector=hdfs-namenode-0

2. Check the status of the helm chart

kubectl get all -l release=hdfs

3. Client access hdfs

kubectl exec -it hdfs-client-f5bc448dd-rc28d bash
root@hdfs-client-f5bc448dd-rc28d:/# hdfs dfsadmin -report
Configured Capacity: 422481862656 (393.47 GB)
Present Capacity: 355748564992 (331.32 GB)
DFS Remaining: 355748515840 (331.32 GB)
DFS Used: 49152 (48 KB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

-------------------------------------------------
Live datanodes (2):

Name: 172.31.136.180:50010 (172-31-136-180.node-exporter.arms-prom.svc.cluster.local)
Hostname: iZj6c7rzs9xaeczn47omzcZ
Decommission Status : Normal
Configured Capacity: 211240931328 (196.73 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 32051716096 (29.85 GB)
DFS Remaining: 179189190656 (166.88 GB)
DFS Used%: 0.00%
DFS Remaining%: 84.83%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Tue Mar 31 16:48:52 UTC 2020

4. HDFS client configuration

[root@iZj6c61fdnjcrcrc2sevsfZ kubernetes-HDFS]# kubectl exec -it hdfs-client-f5bc448dd-rc28d bash
root@hdfs-client-f5bc448dd-rc28d:/# cat /etc/hadoop-custom-conf
cat: /etc/hadoop-custom-conf: Is a directory
root@hdfs-client-f5bc448dd-rc28d:/# cd /etc/hadoop-custom-conf
root@hdfs-client-f5bc448dd-rc28d:/etc/hadoop-custom-conf# ls
core-site.xml  hdfs-site.xml
root@hdfs-client-f5bc448dd-rc28d:/etc/hadoop-custom-conf# cat core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hdfs-namenode-0.hdfs-namenode.default.svc.cluster.local:8020</value>
  </property>
</configuration>
root@hdfs-client-f5bc448dd-rc28d:/etc/hadoop-custom-conf# cat hdfs-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///hadoop/dfs/name</value>
  </property>
  <property>
    <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/hadoop/dfs/data/0</value>
  </property>
</configuration>
root@hdfs-client-f5bc448dd-rc28d:/etc/hadoop-custom-conf# hadoop --version
Error: No command named `--version' was found. Perhaps you meant `hadoop -version'
root@hdfs-client-f5bc448dd-rc28d:/etc/hadoop-custom-conf# hadoop -version
Error: No command named `-version' was found. Perhaps you meant `hadoop version'
root@hdfs-client-f5bc448dd-rc28d:/etc/hadoop-custom-conf# hadoop version
Hadoop 2.7.2
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r b165c4fe8a74265c792ce23f546c64604acf0e41
Compiled by jenkins on 2016-01-26T00:08Z
Compiled with protoc 2.5.0
From source with checksum d0fda26633fa762bff87ec759ebe689c
This command was run using /opt/hadoop-2.7.2/share/hadoop/common/hadoop-common-2.7.2.jar


5. Experimental HDFS basic file operations

# hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2020-03-31 16:51 /test
# hdfs dfs -mkdir /mytest
# hdfs dfs -copyFromLocal /etc/hadoop/hadoop-env.cmd /test/
# hdfs dfs -ls /test
Found 2 items
-rw-r--r--   3 root supergroup       3670 2020-04-20 08:51 /test/hadoop-env.cmd

6. Download data

mkdir -p /data/MNIST/raw/
cd /data/MNIST/raw/
wget http://kubeflow.oss-cn-beijing.aliyuncs.com/mnist/train-images-idx3-ubyte.gz
wget http://kubeflow.oss-cn-beijing.aliyuncs.com/mnist/train-labels-idx1-ubyte.gz
wget http://kubeflow.oss-cn-beijing.aliyuncs.com/mnist/t10k-images-idx3-ubyte.gz
wget http://kubeflow.oss-cn-beijing.aliyuncs.com/mnist/t10k-labels-idx1-ubyte.gz
hdfs dfs -mkdir -p /data/MNIST/raw
hdfs dfs -copyFromLocal *.gz /data/MNIST/raw

Deploy Alluxio

1. First select the designated node, which can be one or more

kubectl label nodes cn-huhehaote.192.168.0.117 dataset=mnist

2. Create config.yaml, in which you need to configure the node selector to specify the node

cat << EOF > config.yaml
image: registry.cn-huhehaote.aliyuncs.com/alluxio/alluxio
imageTag: "2.2.0-SNAPSHOT-b2c7e50"
nodeSelector:
    dataset: mnist
properties:
    alluxio.fuse.debug.enabled: "false"
    alluxio.user.file.writetype.default: MUST_CACHE
    alluxio.master.journal.folder: /journal
    alluxio.master.journal.type: UFS
    alluxio.master.mount.table.root.ufs: "hdfs://hdfs-namenode-0.hdfs-namenode.default.svc.cluster.local:8020"
worker:
    jvmOptions: " -Xmx4G "
master:
    jvmOptions: " -Xmx4G "
tieredstore:
  levels:
  - alias: MEM
    level: 0
    quota: 20GB
    type: hostPath
    path: /dev/shm
    high: 0.99
    low: 0.8
fuse:
  image: registry.cn-huhehaote.aliyuncs.com/alluxio/alluxio-fuse
  imageTag: "2.2.0-SNAPSHOT-b2c7e50"
  jvmOptions: " -Xmx4G -Xms4G "
  args:
    - fuse
    - --fuse-opts=direct_io
EOF

It should be noted that theHDFS version needs to be specified at compilation time. 

3. Deploy Alluxio

wget http://kubeflow.oss-cn-beijing.aliyuncs.com/alluxio-0.12.0.tgz
tar -xvf alluxio-0.12.0.tgz
helm install alluxio -f config.yaml alluxio

4. Check the status of Alluxio, wait until all components are ready

helm get manifest alluxio | kubectl get -f -
NAME                     TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                                   AGE
service/alluxio-master   ClusterIP   None         <none>        19998/TCP,19999/TCP,20001/TCP,20002/TCP   14h

NAME                            DESIRED   CURRENT   READY   UP-TO-DATE   AVAILABLE   NODE SELECTOR   AGE
daemonset.apps/alluxio-fuse     4         4         4       4            4           <none>          14h
NAME                            DESIRED   CURRENT   READY   UP-TO-DATE   AVAILABLE   NODE SELECTOR   AGE
daemonset.apps/alluxio-worker   4         4         4       4            4           <none>          14h

NAME                              READY   AGE
statefulset.apps/alluxio-master   1/1     14h

Prepare PyTorch container image

1. Create a Dockerfile

mkdir pytorch-mnist
cd pytorch-mnist
vim Dockerfile

Populate the Dockerfile with the following content:

FROM pytorch/pytorch:1.4-cuda10.1-cudnn7-devel

# pytorch/pytorch:1.4-cuda10.1-cudnn7-devel

ADD mnist.py /

CMD ["python", "/mnist.py"]

2. Create a PyTorch python file called mnist.py

cd pytorch-mnist
vim mnist.py

Populate the python file with the following content:

# -*- coding: utf-8 -*-
# @Author: cheyang
# @Date:   2020-04-18 22:41:12
# @Last Modified by:   cheyang
# @Last Modified time: 2020-04-18 22:44:06
from __future__ import print_function
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output,
                                    target,
                                    reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


def main():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int,
                        default=1000,
                        metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=14, metavar='N',
                        help='number of epochs to train (default: 14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='Learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')

    parser.add_argument('--save-model', action='store_true', default=False,
                        help='For Saving the current Model')
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()

    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args.batch_size, shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=False, transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args.test_batch_size, shuffle=True, **kwargs)

    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    if args.save_model:
        torch.save(model.state_dict(), "mnist_cnn.pt")


if __name__ == '__main__':
    main()

3. Build the image

Build a custom image under the same level of the directory, the target container image in this example is registry.cn-shanghai.aliyuncs.com/tensorflow-samples/mnist:pytorch-1.4-cuda10.1-cudnn7-devel

docker build -t \
 registry.cn-shanghai.aliyuncs.com/tensorflow-samples/mnist:pytorch-1.4-cuda10.1-cudnn7-devel .

4. Push the built mirror

registry.cn-shanghai.aliyuncs.com/tensorflow-samples/mnist:pytorch-1.4-cuda10.1-cudnn7-devel to the mirror warehouse created in the East China 1 area for users who are in the Greater China area (Alibaba Cloud). You can refer to the basic operation of mirroring.

Submit PyTorch training tasks

1. Install arena

$ wget http://kubeflow.oss-cn-beijing.aliyuncs.com/arena-installer-0.3.3-332fcde-linux-amd64.tar.gz
$ tar -xvf arena-installer-0.3.3-332fcde-linux-amd64.tar.gz
$ cd arena-installer/
$ ./install.
$ yum install bash-completion -y
$ echo "source <(arena completion bash)" >> ~/.bashrc
$ chmod u+x ~/.bashrc

2. Use arena to submit training tasks, remember to choose selector as dataset=mnist

arena submit tf \
             --name=alluxio-pytorch \
             --selector=dataset=mnist \
             --data-dir=/alluxio-fuse/data:/data \
             --gpus=1 \
             --image=registry.cn-shanghai.aliyuncs.com/tensorflow-samples/mnist:pytorch-1.4-cuda10.1-cudnn7-devel \

3. And view the training log through arena

# arena logs --tail=20 alluxio-pytorch
Train Epoch: 12 [49280/60000 (82%)] Loss: 0.021669
Train Epoch: 12 [49920/60000 (83%)] Loss: 0.008180
Train Epoch: 12 [50560/60000 (84%)] Loss: 0.009288
Train Epoch: 12 [51200/60000 (85%)] Loss: 0.035657
Train Epoch: 12 [51840/60000 (86%)] Loss: 0.006190
Train Epoch: 12 [52480/60000 (87%)] Loss: 0.007776
Train Epoch: 12 [53120/60000 (88%)] Loss: 0.001990
Train Epoch: 12 [53760/60000 (90%)] Loss: 0.003609
Train Epoch: 12 [54400/60000 (91%)] Loss: 0.001943
Train Epoch: 12 [55040/60000 (92%)] Loss: 0.078825
Train Epoch: 12 [55680/60000 (93%)] Loss: 0.000925
Train Epoch: 12 [56320/60000 (94%)] Loss: 0.018071
Train Epoch: 12 [56960/60000 (95%)] Loss: 0.031451
Train Epoch: 12 [57600/60000 (96%)] Loss: 0.031353
Train Epoch: 12 [58240/60000 (97%)] Loss: 0.075761
Train Epoch: 12 [58880/60000 (98%)] Loss: 0.003975
Train Epoch: 12 [59520/60000 (99%)] Loss: 0.085389

Test set: Average loss: 0.0256, Accuracy: 9921/10000 (99%)

Summary

Previously, running the PyTorch program required users to modify the PyTorch adapter code to be able to access data in HDFS. Using  Alluxio, we were able to quickly develop and train models without any additional work to modify PyTorch code or manually copy HDFS data. This approach is further simplified by setting up the entire environment within Kubernetes.