
pytorch-operator's Introduction

Kubernetes Custom Resource and Operator for PyTorch jobs

⚠️ kubeflow/pytorch-operator is not maintained

This operator has been merged into Kubeflow Training Operator. This repository is not maintained and has been archived.

Overview

This repository contains the specification and implementation of the PyTorchJob custom resource definition. Using this custom resource, users can create and manage PyTorch jobs like other built-in resources in Kubernetes. See the CRD definition.

Prerequisites

Installing PyTorch Operator

Please refer to the installation instructions in the Kubeflow user guide. This installs the PyTorchJob CRD and the pytorch-operator controller, which manages the lifecycle of PyTorch jobs.
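
To confirm the installation, you can check that the CRD has been registered (assuming the default CRD name):

kubectl get crd pytorchjobs.kubeflow.org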

Creating a PyTorch Job

You can create a PyTorch job by defining a PyTorchJob config file. See the manifests for the distributed MNIST example. You may change the config file based on your requirements.

cat examples/mnist/v1/pytorch_job_mnist_gloo.yaml

Deploy the PyTorchJob resource to start training:

kubectl create -f examples/mnist/v1/pytorch_job_mnist_gloo.yaml

You should now be able to see the created pods matching the specified number of replicas.

kubectl get pods -l pytorch-job-name=pytorch-dist-mnist-gloo

Training runs for about 10 epochs and takes 5-10 minutes on a CPU cluster. Inspect the logs to see the training progress.

PODNAME=$(kubectl get pods -l pytorch-job-name=pytorch-dist-mnist-gloo,pytorch-replica-type=master -o name)
kubectl logs -f ${PODNAME}

Monitoring a PyTorch Job

kubectl get -o yaml pytorchjobs pytorch-dist-mnist-gloo

See the status section to monitor the job status. Here is a sample output when the job has completed successfully.

apiVersion: v1
items:
- apiVersion: kubeflow.org/v1
  kind: PyTorchJob
  metadata:
    creationTimestamp: 2019-01-11T00:51:48Z
    generation: 1
    name: pytorch-dist-mnist-gloo
    namespace: default
    resourceVersion: "2146573"
    selfLink: /apis/kubeflow.org/v1/namespaces/kubeflow/pytorchjobs/pytorch-dist-mnist-gloo
    uid: 13ad0e7f-153b-11e9-b5c1-42010a80001e
  spec:
    pytorchReplicaSpecs:
      Master:
        replicas: 1
        restartPolicy: OnFailure
        template:
          spec:
            containers:
            - args:
              - --backend
              - gloo
              image: gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0
              name: pytorch
              resources:
                limits:
                  nvidia.com/gpu: "1"
      Worker:
        replicas: 1
        restartPolicy: OnFailure
        template:
          spec:
            containers:
            - args:
              - --backend
              - gloo
              image: gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0
              name: pytorch
              resources:
                limits:
                  nvidia.com/gpu: "1"
  status:
    completionTime: 2019-01-11T01:03:15Z
    conditions:
    - lastTransitionTime: 2019-01-11T00:51:48Z
      lastUpdateTime: 2019-01-11T00:51:48Z
      message: PyTorchJob pytorch-dist-mnist-gloo is created.
      reason: PyTorchJobCreated
      status: "True"
      type: Created
    - lastTransitionTime: 2019-01-11T00:57:22Z
      lastUpdateTime: 2019-01-11T00:57:22Z
      message: PyTorchJob pytorch-dist-mnist-gloo is running.
      reason: PyTorchJobRunning
      status: "False"
      type: Running
    - lastTransitionTime: 2019-01-11T01:03:15Z
      lastUpdateTime: 2019-01-11T01:03:15Z
      message: PyTorchJob pytorch-dist-mnist-gloo is successfully completed.
      reason: PyTorchJobSucceeded
      status: "True"
      type: Succeeded
    replicaStatuses:
      Master:
        succeeded: 1
      Worker:
        succeeded: 1
    startTime: 2019-01-11T00:57:22Z
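
To pull just the condition history out of the status, a jsonpath query along these lines can be used (the expression is an illustration, not from the original guide):

kubectl get pytorchjobs pytorch-dist-mnist-gloo -o jsonpath='{.status.conditions[*].type}'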

Contributing

Please refer to the developer_guide.

pytorch-operator's People

Contributors

akado2009, alembiewski, andreyvelich, elsonrodriguez, gaocegege, hmtai, hougangliu, jagadeeshi2i, jeffwan, jiaqianjing, jinchihe, jlewi, johnugeorge, jose5918, krishnadurai, leileiwan, mhbuehler, myonlyzzy, ohmystack, patrickxys, pingsutw, richardsliu, terrytangyuan, timzaman, vpavlin, wackxu, xrmzju, yanniszark, yeya24, zlcnju


pytorch-operator's Issues

How could I delete this pod?

When I execute "kubectl delete pod pytorch-dist-mnist-gloo-master-0" and "kubectl delete pod pytorch-dist-mnist-gloo-worker-0", the two pods are recreated after a very short time. How can I delete these pods completely? Thank you very much.
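
A likely explanation (not from the original thread): the pods are owned by the PyTorchJob controller, which recreates any replica it still considers desired. Deleting the owning PyTorchJob resource removes the pods for good:

kubectl delete pytorchjob pytorch-dist-mnist-gloo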

The Docker image used in 0.4.1 has multiple security issues

There are several high-severity security issues with gcr.io/kubeflow-images-public/pytorch-operator:v0.4.0. I see there is a v0.5.0. I don't know whether these issues have been addressed there, and/or what the backward compatibility between that image and 0.4.1 might be.

Friction log

$ minikube version
minikube version: v0.27.0
$ kubectl version
Client Version: version.Info{Major:"1", Minor:"10", GitVersion:"v1.10.4", GitCommit:"5ca598b4ba5abb89bb773071ce452e33fb66339d", GitTreeState:"clean", BuildDate:"2018-06-06T15:22:13Z", GoVersion:"go1.9.6", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"10", GitVersion:"v1.10.0", GitCommit:"fc32d2f3698e36b93322a3465f63a14e9f0eaead", GitTreeState:"clean", BuildDate:"2018-03-26T16:44:10Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}

These paths in the README don't exist:

kubectl create -f examples/mnist/configmap.yaml
kubectl create -f examples/mnist/pytorchjob.yaml

Minimal smoke test for distributed training

We need a minimal E2E test for distributed training just like we do for the TF operator.

The E2E test should do something like the following:

  • Create N replicas
  • Ensure work can be assigned and performed by all replicas

In the TFJob test case we just do a simple matrix multiply (see the sketch below).

P1 because this is a blocker for including the PyTorch operator in our 0.2 release.
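
A minimal sketch of what such a replica-side check could look like (hypothetical code, assuming the operator injects MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE into each replica):

import torch
import torch.distributed as dist

def smoke_test():
    """Each replica does a tiny matrix multiply, then all-reduces the result."""
    dist.init_process_group(backend="gloo")  # env:// rendezvous via the injected variables
    x = torch.ones(4, 4)
    y = x @ x                                 # trivial local work on every replica
    dist.all_reduce(y)                        # succeeds only if all replicas can communicate
    assert torch.allclose(y, (x @ x) * dist.get_world_size())
    print("rank %d/%d OK" % (dist.get_rank(), dist.get_world_size()))

if __name__ == "__main__":
    smoke_test()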

[discussion] Need for 'Master' replica type?

Currently, there are two replica types in PyTorch jobs: 'Master' and 'Worker'. Do we really need the 'Master' replica type? From the PyTorch distributed documentation, the master is simply the worker with rank 0 (the first worker). From @jose5918, I learned that it was kept to distinguish the 'master' from the 'workers' in the case of a distributed job.
Even if we keep 'Master', the number of master replicas always has to be set to 1. Currently, a local job is configured with 1 master and 0 workers, while a distributed job has 1 master and n workers.
If we had only 'Worker' as a type (without 'Master'), a local job would be configured with 1 worker and a distributed job with n workers.

We can do it either way. I am looking for suggestions on which approach would bring more clarity and ease of use.

@gaocegege @jlewi @jose5918

Pytorch operator v1beta2 API

There are a couple of minor API changes that have been suggested. We can incorporate all of these changes in the next API version.

Related: kubeflow/training-operator#935

  • Require support for the Status subresource in the CRD
  • Add ActiveDeadlineSeconds and BackoffLimit
  • Use a pod group instead of a PDB for gang scheduling
  • Support multiple versions of the CRD

[examples] Add an example for PyTorch 0.4

We have an example using PyTorch v0.2, which was released a year ago. I think we could add a v0.4 example.

When I tried to run the distributed MNIST code from the examples with a v0.4 image, the pods failed.

Ensuring CRD requires cluster-level authority

PR #99 ensures the CRD exists when starting pytorch-operator.

However, the CRD check requires cluster-level authority, which is hard to obtain, mainly for safety reasons. I wonder if we can use an alternative method to avoid the cluster-level authority requirement? Thanks!

Distribution across multi-gpu nodes

Thanks for the work on this! This is somewhat tied to #30, but I'm used to using DistributedDataParallel with a script similar to this one to get multi-GPU speed/performance over the DataParallel wrapper.

I've started using Kubeflow on single-GPU nodes, but I'm curious whether there is any way to train across two separate 8-GPU nodes while using DistributedDataParallel locally on each 8-GPU node. Is there anything I can do to help add this?
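
For reference, PyTorch releases of that era allowed a single process to drive several local GPUs by passing multiple device_ids to DistributedDataParallel, with one such process per node. A rough sketch under that assumption (hypothetical code, not from the repository):

import torch
from torch.nn.parallel import DistributedDataParallel as DDP

def wrap_model_for_node(base_model):
    """Assumes dist.init_process_group(...) has already run, one process per node/pod."""
    local_gpus = list(range(torch.cuda.device_count()))
    base_model = base_model.cuda(local_gpus[0])
    # Each input batch is split across the node's local GPUs, while gradients
    # are all-reduced across the participating pods.
    return DDP(base_model, device_ids=local_gpus, output_device=local_gpus[0])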

ksonnet package

Right now the operator is using helm; can we use ksonnet instead to be consistent with the rest of Kubeflow?

Improve e2e test coverage

Right now the e2e test coverage only checks that the PyTorchJob from the examples reaches a running state. We would also like to test deletion, multiple jobs, etc.

PyTorch workers keep crashing if the master is not up yet

I've been observing this for a while, and I'm now confident it happens consistently (for me):

  • Create a pytorch job:
    A) If the master is up first, things go well and no pods crash.
    B) If the master is not up first, the worker pods keep crashing with the error below until the master pod is up, and then things run fine. See below:
 kubectl logs optimizer-worker-0
2019-01-11 05:43:30,310 INFO     main(rmq_host=rmq.default.svc.cluster.local, rmq_port=5672, batch_size=12)
2019-01-11 05:43:30,310 INFO     init_distribution
Traceback (most recent call last):
  File "optimizer.py", line 459, in <module>
    pretrained_model=args.pretrained_model,
  File "optimizer.py", line 422, in main
    init_distribution()
  File "optimizer.py", line 413, in init_distribution
    torch.distributed.init_process_group(backend=backend)
  File "/root/.local/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 354, in init_process_group
    store, rank, world_size = next(rendezvous(url))
  File "/root/.local/lib/python3.7/site-packages/torch/distributed/rendezvous.py", line 143, in _env_rendezvous_handler
    store = TCPStore(master_addr, master_port, start_daemon)
ValueError: host not found: Name or service not known

The reason this occurs is simply that PyTorch workers require the master to be up so they can connect to it. If they cannot connect to the master, they die; this is intended behaviour (and has nothing to do with the pytorch-operator or Kubernetes). However, I would expect the pytorch-operator to handle this correctly and bring the master up before the others.
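
A common workaround on the training-script side (a sketch, assuming the operator sets MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE; not code from this repository) is to retry init_process_group until the master service resolves:

import time
import torch.distributed as dist

def init_with_retry(backend="gloo", retries=30, delay=10):
    """Keep retrying until the master address resolves and the rendezvous succeeds."""
    for attempt in range(1, retries + 1):
        try:
            dist.init_process_group(backend=backend)  # env:// rendezvous via MASTER_ADDR etc.
            return
        except (ValueError, RuntimeError) as err:     # "host not found" surfaces as a ValueError above
            print("init attempt %d failed (%s); retrying in %ds" % (attempt, err, delay))
            time.sleep(delay)
    raise RuntimeError("could not reach the master after %d attempts" % retries)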

Double gradient reduction in examples?

First, I'm confused why the example models implement their own DistributedDataParallel module. Why not use the torch one?

DistributedDataParallel is used in the examples:
https://github.com/kubeflow/pytorch-operator/blob/master/examples/ddp/mnist/cpu/mnist_ddp_cpu.py#L154

The DistributedDataParallel documentation states:

During the backwards pass, gradients from each node are averaged.

However, a DIY average_gradients function is used in the same example as well: https://github.com/kubeflow/pytorch-operator/blob/master/examples/ddp/mnist/cpu/mnist_ddp_cpu.py#L168

Double trouble?
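
For reference, a minimal sketch of a training step that relies solely on torch.nn.parallel.DistributedDataParallel for gradient averaging, i.e. with no manual average_gradients call (illustrative code, not taken from the examples):

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def make_ddp_model(base_model, backend="gloo"):
    dist.init_process_group(backend=backend)  # rank/world size come from the operator's env vars
    return DDP(base_model)                    # DDP averages gradients during backward()

def train_step(ddp_model, batch, target, optimizer, criterion):
    optimizer.zero_grad()
    loss = criterion(ddp_model(batch), target)
    loss.backward()   # gradients are already averaged across ranks here,
    optimizer.step()  # so no extra average_gradients(model) call is needed
    return loss.item()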

Server is unable to handle kubeflow.org/v1alpha1

I have already installed tf-operator in my Kubernetes cluster. Now I want to deploy pytorch-operator. I use ksonnet.
Here are my steps:

$ ks pkg install kubeflow/pytorch-job
INFO Retrieved 6 files                            
$ ks pkg list
REGISTRY  NAME             INSTALLED
========  ====             =========
incubator apache
incubator efk
incubator mariadb
incubator memcached
incubator mongodb
incubator mysql
incubator nginx
incubator node
incubator postgres
incubator redis
incubator tomcat
kubeflow  argo
kubeflow  core             *
kubeflow  new-package-stub
kubeflow  openmpi
kubeflow  pachyderm
kubeflow  pytorch-job      *
kubeflow  tf-job           *
kubeflow  tf-serving       *
$ ks generate pytorch-job pytorch-job
INFO Writing component at '/home/farrell/work/ks_dir/kubeflow/components/pytorch-job.jsonnet' 
$ ks component list
COMPONENT
=========
kubeflow
kubeflow-core
pytorch-job
$ ks apply default -c pytorch-job
INFO Updating pytorchjob kubeflow.pytorch-job     
ERROR handle object: Server is unable to handle kubeflow.org/v1alpha1, Kind=PyTorchJob 

So I got the error:

ERROR handle object: Server is unable to handle kubeflow.org/v1alpha1, Kind=PyTorchJob

Error applying PyTorchJob: no matches for kind "PyTorchJob" in version "kubeflow.org/v1alpha1"

I'm trying to submit a PyTorch job by following the Kubeflow user guide. However, I encountered this error:

ERROR handle object: patching object from cluster: merging object with existing state: unable to recognize "/var/folders/fd/h7tg23rd2p3cx7mnp7j47_gw0000gn/T/ksonnet-mergepatch958071722": no matches for kind "PyTorchJob" in version "kubeflow.org/v1alpha1"

Any thought on how to move forward?

Environment: minikube using Kubernetes 1.10

Here's my sequence of commands to set up Minikube + Kubeflow:

minikube stop
minikube delete
minikube start
export KUBEFLOW_KS_DIR=/Volumes/tvlk-repo/github/nvs-kubeflow-poc/kubeflow-ks
export KUBEFLOW_DEPLOY=false
export KUBEFLOW_VERSION=0.2.2
export NAMESPACE=default
curl https://raw.githubusercontent.com/kubeflow/kubeflow/v${KUBEFLOW_VERSION}/scripts/deploy.sh | bash
cd ${KUBEFLOW_KS_DIR}
ks param set kubeflow-core reportUsage false
ks apply default

Here's my sequence of commands to set up PyTorch:

KS_ENV=default
IMAGE=gcr.io/kubeflow-ci/pytorch-dist-mnist_test:1.0
MNIST_JOB_NAME=mnist
ks generate pytorch-job ${MNIST_JOB_NAME} --name=${MNIST_JOB_NAME}
ks param set ${MNIST_JOB_NAME} image ${IMAGE}
ks apply ${KS_ENV} -c ${MNIST_JOB_NAME}

kubectl version output

ip-10-10-177-11:kubeflow-ks arinto$ kubectl version
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.7", GitCommit:"dd5e1a2978fd0b97d9b78e1564398aeea7e7fe92", GitTreeState:"clean", BuildDate:"2018-04-19T00:05:56Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"10", GitVersion:"v1.10.0", GitCommit:"fc32d2f3698e36b93322a3465f63a14e9f0eaead", GitTreeState:"clean", BuildDate:"2018-03-26T16:44:10Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
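
One way to check which PyTorchJob API version the cluster actually serves (a suggestion, not part of the original report) is to inspect the installed CRD and compare it with the version the ksonnet prototype generates:

kubectl get crd pytorchjobs.kubeflow.org -o yaml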

[discussion] Refactor pytorch operator APIs

Most of the types defined in https://github.com/kubeflow/pytorch-operator/blob/master/pkg/apis/pytorch/v1alpha2/types.go overlap with TFJob. The structures of the APIs in tf-operator and pytorch-operator are similar enough that they should extend from a single shared API.

I propose something like:

Common:

  • JobStatus
  • ReplicaStatus
  • JobCondition
  • JobConditionType
  • CleanPodPolicy
  • RestartPolicy

TFJob:

  • TFJobSpec
  • TFJobReplicaSpec
  • TFJobReplicaType

Pytorch:

  • PyTorchJobSpec
  • PyTorchReplicaSpec
  • PyTorchReplicaType

The common types can reside in the tf-operator repository for now. This will allow us to:

  1. Keep API semantics and vocabulary consistent across all training components;
  2. Reduce code duplication when possible;
  3. Ensure feature parity across components.

Thoughts?

PyTorchJob reaches Unknown state

After a PyTorch job completes and training should have been successful, the state of the PyTorchJob shows as Unknown.

We should also update the e2e tests to check for this once it is fixed.

Push a `latest` image on postsubmit

The Helm chart in the repo should stay up to date with the latest build for now, since there are no releases. This could be accomplished by tagging postsubmit builds as latest and setting the Helm chart image to latest.

MPI backend mnist gpu example error: "No space left on device"

def average_gradients(model):
    """ Gradient averaging. """
    size = float(dist.get_world_size())
    group = dist.new_group([0])
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=group)

When running the MPI GPU example with more epochs, average_gradients calls dist.new_group in every backward iteration, which creates a new ProcessGroupMPI each time and raises a "No space left on device" error after a few epochs, as shown below. Also, the original code creates the group with group = dist.new_group([0]) instead of the whole MPI world for backpropagation.

(screenshot of the "No space left on device" traceback)
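
A possible fix (a sketch, not taken from the examples) is to skip the per-iteration dist.new_group call and all-reduce over the default, whole-world process group, dividing by the world size:

import torch.distributed as dist

def average_gradients(model):
    """Average gradients over the default (whole-world) process group."""
    world_size = float(dist.get_world_size())
    for param in model.parameters():
        # all_reduce on the default group; no dist.new_group() per iteration
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= world_size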

Label naming style inconsistent

Nit: the labels mix dashes and underscores. This hurts my inner zen 🏯.

Labels:
pytorch-replica-index=0
pytorch-replica-type=master
pytorch_job_name=job2-optimizer
pytorch_job_role=master

Distributed training can not increase the speed of loss reduction

If you only look at the time spent on a single epoch, distributed training is indeed faster than single-instance training. But our goal is to reach a smaller loss, so I ran an experiment: given the same amount of time, the loss of single-instance training is smaller than the loss of distributed training.
The dataset is ImageNet, the network is ResNet-50, and the following is my code.

distributed_train.py

# -*- coding: utf-8 -*-

import datetime
import os
import shutil
import sys
import time
import torch
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.nn as nn
import torch.nn.parallel
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms

from torch.autograd import Variable

best_acc1 = 0

def get_dataset(world_size, rank, batch_size, num_workers):
    traindir = os.path.join('/root/data/ILSVRC2012', 'train')
    valdir = os.path.join('/root/data/ILSVRC2012', 'val')
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])

    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]))

    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=rank,
    )

    train_batch_size = int(batch_size / world_size)
    train_set = torch.utils.data.DataLoader(
        train_dataset, batch_size=train_batch_size, shuffle=(train_sampler is None),
        num_workers=num_workers, pin_memory=True, sampler=train_sampler)

    val_set = torch.utils.data.DataLoader(
        datasets.ImageFolder(valdir, transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            normalize,
        ])),
        batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True)

    return train_set, val_set


def init_print(rank, size, debug_print=True):
    if not debug_print:
        """ In case run on hundreds of nodes, you may want to mute all the nodes except master """
        if rank > 0:
            sys.stdout = open(os.devnull, 'w')
            sys.stderr = open(os.devnull, 'w')
    else:
        # labelled print with info of [rank/size]
        old_out = sys.stdout

        class LabeledStdout:
            def __init__(self, rank, size):
                self._r = rank
                self._s = size
                self.flush = sys.stdout.flush
                self._start_time = datetime.datetime.now()

            def write(self, x):
                if x == '\n':
                    old_out.write(x)
                else:
                    now = datetime.datetime.now()
                    seconds = (now - self._start_time).total_seconds()
                    old_out.write('%s [%d/%d] %s' % (seconds, self._r, self._s, x))

        sys.stdout = LabeledStdout(rank, size)


def main():
    multiprocessing_distributed = True
    lr = 0.01
    weight_decay = 1e-4
    momentum = 0.9
    start_epoch = 0
    epochs = 2

    num_workers = 16
    batch_size = 64

    ngpus_per_node = torch.cuda.device_count()
    print('ngpus per node = %s' % ngpus_per_node)
    print('epochs = %s' % epochs)

    dist_backend = dist.Backend.MPI
    rank = int(os.environ.get('RANK', '0'))
    world_size = int(os.environ.get('WORLD_SIZE', '0'))
    print('world size = %s, rank = %s' % (world_size, rank))
    dist.init_process_group(backend=dist_backend, rank=rank, world_size=world_size)

    ngpus_per_node = torch.cuda.device_count()

    global best_acc1

    init_print(rank, world_size)
    device = torch.device('cuda')

    # create model
    model = models.resnet50(pretrained=False)
    model = model.to(device)

    # When using a single GPU per process and per
    # DistributedDataParallel, we need to divide the batch size
    # ourselves based on the total number of GPUs we have
    model = torch.nn.parallel.DistributedDataParallel(
        model,
    )

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss()
    # criterion = criterion.cuda()
    criterion = criterion.to(device)

    optimizer = torch.optim.SGD(model.parameters(), lr,
                                momentum=momentum,
                                weight_decay=weight_decay)

    cudnn.benchmark = True

    # partition dataset
    print('batch size = %s' % batch_size)
    print('num_workers = %s' % num_workers)
    train_set, val_set = get_dataset(world_size, rank, batch_size, num_workers)

    print('start epoch = %s, epochs = %s' % (start_epoch, epochs))
    start_time = datetime.datetime.now()
    for epoch in range(start_epoch, epochs):
        print('epoch = %s' % epoch)
        adjust_learning_rate(optimizer, epoch)

        # train for one epoch
        train(train_set, model, criterion, optimizer, epoch, device)

        # evaluate on validation set
        acc1 = validate(val_set, model, criterion, start_time)

        # remember best acc@1 and save checkpoint
        print('acc1 = %s, best_acc1 = %s' % (acc1, best_acc1))
        is_best = acc1 > best_acc1
        best_acc1 = max(acc1, best_acc1)
        print('acc1 = %s, best_acc1 = %s, is_best = %s' % (acc1, best_acc1, is_best))

        arch = 'resnet50'
        if not multiprocessing_distributed or (multiprocessing_distributed
                                                    and rank % ngpus_per_node == 0):
            save_checkpoint({
                'epoch': epoch + 1,
                'arch': arch,
                'state_dict': model.state_dict(),
                'best_acc1': best_acc1,
                'optimizer': optimizer.state_dict(),
            }, is_best)


def train(train_loader, model, criterion, optimizer, epoch, device):
    batch_time = AverageMeter()
    data_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to train mode
    model.train()

    end = time.time()
    print('end = %s' % end)
    print_freq = 20
    for i, (input, target) in enumerate(train_loader):
        # measure data loading time
        data_time.update(time.time() - end)

        input = input.to(device)
        target = target.to(device)

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # measure accuracy and record loss
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), input.size(0))
        top1.update(acc1[0], input.size(0))
        top5.update(acc5[0], input.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % print_freq == 0:
            print('Epoch: [{0}][{1}/{2}]\t'
                  'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                  'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
                  'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                  'Acc@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                  'Acc@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                epoch, i, len(train_loader), batch_time=batch_time,
                data_time=data_time, loss=losses, top1=top1, top5=top5))


def validate(val_loader, model, criterion, start_time):
    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to evaluate mode
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (input, target) in enumerate(val_loader):
            input = Variable(input).cuda(non_blocking=True)
            target = Variable(target).cuda(non_blocking=True)

            # compute output
            output = model(input)
            loss = criterion(output, target)

            # measure accuracy and record loss
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), input.size(0))
            top1.update(acc1[0], input.size(0))
            top5.update(acc5[0], input.size(0))

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            print_freq = 10
            if i % print_freq == 0:
                print('Test: [{0}/{1}]\t'
                      'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                      'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                      'Acc@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                      'Acc@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                    i, len(val_loader), batch_time=batch_time, loss=losses,
                    top1=top1, top5=top5))

        now = datetime.datetime.now()
        delta_seconds = (now - start_time).total_seconds()
        print('delta seconds {delta_seconds} * Acc@1 {top1.val:.3f} ({top1.avg:.3f})\t'
              'Acc@5 {top5.val:.3f} ({top5.avg:.3f})\t'
              'Loss {loss.val:.4f} ({loss.avg:.4f})\t'.format(delta_seconds=delta_seconds, top1=top1, top5=top5, loss=losses))

    return top1.avg


def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
    torch.save(state, filename)
    if is_best:
        shutil.copyfile(filename, 'model_best.pth.tar')


class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count


def adjust_learning_rate(optimizer, epoch):
    lr = 0.01
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = lr * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr


def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res


if __name__ == "__main__":
    main()

distributed_train.yaml

apiVersion: "kubeflow.org/v1beta1"
kind: "PyTorchJob"
metadata:
  name: "pytorch-mpi-imagenet-gpu"
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          volumes:
            - name: data
              nfs:
                server: 10.255.1.115
                path: "/mnt/data"
            - name: aaaa
              nfs:
                server: 172.12.5.221
                path: "/mnt/aaaa"
            - name: shm
              hostPath:
                path: "/dev/shm"
            - name: nvidia-driver
              hostPath:
                path: "/var/lib/nvidia-docker/volumes/nvidia_driver/375.66"
          containers:
            - name: pytorch
              image: "172.16.3.205:5000/nvidia/cuda:9.0-cudnn7-devel-ubuntu16.04-mpi"
              command:
                - "mpirun"
                - "-n"
                - "1"
                - "--cpu-set"
                - "0,1,2,3,4,5,6,7"
                - "--allow-run-as-root"
                - "python"
                - "-u"
                - "/root/aaaa/distributed_train.py"
              workingDir: "/root/aaaa"
              imagePullPolicy: Always
              resources:
                limits:
                  cpu: 8
                  memory: 30Gi
                  alpha.kubernetes.io/nvidia-gpu: 1
              volumeMounts:
                - name: data
                  mountPath: "/root/data"
                - name: aaaa
                  mountPath: "/root/aaaa"
                - name: shm
                  mountPath: "/dev/shm"
                - name: nvidia-driver
                  mountPath: "/usr/local/nvidia"
          tolerations:
            - key: CriticalAddonsOnly
              operator: Exists
    Worker:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          volumes:
          - name: data
            nfs:
              server: 10.255.1.115
              path: "/mnt/data"
          - name: aaaa
            nfs:
              server: 172.12.5.221
              path: "/mnt/aaaa"
          - name: shm
            hostPath:
              path: "/dev/shm"
          - name: nvidia-driver
            hostPath:
              path: "/var/lib/nvidia-docker/volumes/nvidia_driver/375.66"
          containers:
            - name: pytorch
              image: "172.16.3.205:5000/nvidia/cuda:9.0-cudnn7-devel-ubuntu16.04-mpi"
              command:
                - "mpirun"
                - "-n"
                - "1"
                - "--cpu-set"
                - "0,1,2,3,4,5,6,7"
                - "--allow-run-as-root"
                - "python"
                - "-u"
                - "/root/aaaa/distributed_train.py"
              workingDir: "/root/aaaa"
              imagePullPolicy: Always
              resources:
                limits:
                  cpu: 8
                  memory: 30Gi
                  alpha.kubernetes.io/nvidia-gpu: 1
              volumeMounts:
              - name: data
                mountPath: "/root/data"
              - name: aaaa
                mountPath: "/root/aaaa"
              - name: shm
                mountPath: "/dev/shm"
              - name: nvidia-driver
                mountPath: "/usr/local/nvidia"
          tolerations:
          - key: CriticalAddonsOnly
            operator: Exists

single_train.py

# -*- coding: utf-8 -*-

import datetime
import os
import shutil
import sys
import time
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms

from torch.autograd import Variable

best_acc1 = 0

def main():
    global best_acc1

    lr = 0.01
    momentum = 0.9
    weight_decay = 1e-4
    start_epoch = 0
    epochs = 2

    num_workers = 16
    batch_size = 64
    print('num_workers = %s' % num_workers)
    print('batch_size = %s' % batch_size)
    print('epochs = %s' % epochs)

    init_print(0, 1)
    device = torch.device('cuda')

    # create model
    model = models.resnet50(pretrained=False)
    model = model.to(device)

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)

    optimizer = torch.optim.SGD(model.parameters(), lr,
                                momentum=momentum,
                                weight_decay=weight_decay)

    cudnn.benchmark = True

    # Data loading code
    traindir = os.path.join('/root/data/ILSVRC2012', 'train')
    valdir = os.path.join('/root/data/ILSVRC2012', 'val')
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])

    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose([
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]))

    train_sampler = None

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=(train_sampler is None),
        num_workers=num_workers, pin_memory=True, sampler=train_sampler)

    val_loader = torch.utils.data.DataLoader(
        datasets.ImageFolder(valdir, transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            normalize,
        ])),
        batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True)

    sys.stdout.flush()
    for epoch in range(start_epoch, epochs):
        adjust_learning_rate(optimizer, epoch, lr)

        # train for one epoch
        train(train_loader, model, criterion, optimizer, epoch, device)

        # evaluate on validation set
        acc1 = validate(val_loader, model, criterion)

        # remember best acc@1 and save checkpoint
        is_best = acc1 > best_acc1
        best_acc1 = max(acc1, best_acc1)

    arch = 'resnet50'
    save_checkpoint({
        'epoch': epoch + 1,
        'arch': arch,
        'state_dict': model.state_dict(),
        'best_acc1': best_acc1,
        'optimizer': optimizer.state_dict(),
    }, is_best)


def train(train_loader, model, criterion, optimizer, epoch, device):
    batch_time = AverageMeter()
    data_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to train mode
    model.train()

    end = time.time()
    print_freq = 20
    for i, (input, target) in enumerate(train_loader):
        # measure data loading time
        data_time.update(time.time() - end)

        input = input.to(device)
        target = target.to(device)

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # measure accuracy and record loss
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), input.size(0))
        top1.update(acc1[0], input.size(0))
        top5.update(acc5[0], input.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % print_freq == 0:
            print('Epoch: [{0}][{1}/{2}]\t'
                  'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                  'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
                  'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                  'Acc@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                  'Acc@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                   epoch, i, len(train_loader), batch_time=batch_time,
                   data_time=data_time, loss=losses, top1=top1, top5=top5))
            sys.stdout.flush()


def validate(val_loader, model, criterion):
    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to evaluate mode
    model.eval()

    with torch.no_grad():
        end = time.time()
        print_freq = 20
        for i, (input, target) in enumerate(val_loader):
            input = Variable(input).cuda(non_blocking=True)
            target = Variable(target).cuda(non_blocking=True)

            # compute output
            output = model(input)
            loss = criterion(output, target)

            # measure accuracy and record loss
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), input.size(0))
            top1.update(acc1[0], input.size(0))
            top5.update(acc5[0], input.size(0))

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            if i % print_freq == 0:
                print('Test: [{0}/{1}]\t'
                      'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                      'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                      'Acc@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                      'Acc@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                       i, len(val_loader), batch_time=batch_time, loss=losses,
                       top1=top1, top5=top5))

        print(' * Acc@1 {top1.val:.3f} ({top1.avg:.3f})\t'
              'Acc@5 {top5.val:.3f} ({top5.avg:.3f})\t'
              'Loss {loss.val:.4f} ({loss.avg:.4f})\t'.format(top1=top1, top5=top5, loss=losses))

    return top1.avg


def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
    torch.save(state, filename)
    if is_best:
        shutil.copyfile(filename, 'model_best.pth.tar')


class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count


def adjust_learning_rate(optimizer, epoch, lr):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = lr * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr


def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res


def init_print(rank, size, debug_print=True):
    if not debug_print:
        """ In case run on hundreds of nodes, you may want to mute all the nodes except master """
        if rank > 0:
            sys.stdout = open(os.devnull, 'w')
            sys.stderr = open(os.devnull, 'w')
    else:
        # labelled print with info of [rank/size]
        old_out = sys.stdout

        class LabeledStdout:
            def __init__(self, rank, size):
                self._r = rank
                self._s = size
                self.flush = sys.stdout.flush
                self._start_time = datetime.datetime.now()

            def write(self, x):
                if x == '\n':
                    old_out.write(x)
                else:
                    now = datetime.datetime.now()
                    seconds = (now - self._start_time).total_seconds()
                    old_out.write('%s [%d/%d] %s' % (seconds, self._r, self._s, x))

        sys.stdout = LabeledStdout(rank, size)


if __name__ == '__main__':
    main()

single_train.json

{
  "kind": "Job",
  "apiVersion": "batch/v1",
  "metadata": {
    "name": "hqx-pytorch-1wxud3d",
    "labels": {
      "name": "hqx-pytorch-1wxud3d"
    }
  },
  "spec": {
    "parallelism": 1,
    "completions": 1,
    "backoffLimit": 4,
    "template": {
      "spec": {
        "volumes": [
          {
            "name": "aaaa",
            "nfs": {
              "server": "172.12.5.221",
              "path": "/mnt/aaaa"
            }
          },
          {
            "name": "data",
            "nfs": {
              "server": "11.255.3.115",
              "path": "/mnt/data"
            }
          },
          {
            "name": "shm",
            "hostPath": {
              "path": "/dev/shm"
            }
          },
          {
            "name": "nvidia-driver",
            "hostPath": {
              "path": "/var/lib/nvidia-docker/volumes/nvidia_driver/375.66"
            }
          }
        ],
        "containers": [
          {
            "name": "hqx-pytorch-1wxud3d",
            "image": "172.16.3.205:5000/pytorch/pytorch:latest",
            "command": [
              "python3",
              "/root/aaaa/single_train.py"
            ],
            "workingDir": "/root/aaaa/",
            "env": [
              {
                "name": "POD_HOST_IP",
                "valueFrom": {
                  "fieldRef": {
                    "fieldPath": "status.hostIP"
                  }
                }
              }
            ],
            "resources": {
              "limits": {
                "cpu": "8",
                "memory": "30Gi",
                "alpha.kubernetes.io/nvidia-gpu": 1
              }
            },
            "volumeMounts": [
              {
                "name": "aaaa",
                "mountPath": "/root/aaaa"
              },
              {
                "name": "data",
                "mountPath": "/root/data"
              },
              {
                "name": "shm",
                "mountPath": "/dev/shm"
              },
              {
                "name": "nvidia-driver",
                "mountPath": "/usr/local/nvidia"
              }
            ],
            "imagePullPolicy": "IfNotPresent"
          }
        ],
        "tolerations": [
          {
            "key": "CriticalAddonsOnly",
            "operator": "Exists"
          }
        ],
        "restartPolicy": "Never"
      }
    }
  }
}

MPI backend examples launch processes independently in each pod

https://github.com/kubeflow/pytorch-operator/blob/master/examples/ddp/mnist/gpu/v1alpha2/job_mnist_DDP_GPU.yaml

When launching the MPI-backend example jobs above with ENTRYPOINT ["mpirun", "-n", "4", "--allow-run-as-root", "python", "-u", "/opt/pytorch_dist_mnist/mnist_ddp_gpu.py"] in the Dockerfile, I expected distributed training that launches 1 process on each pod (4 in total, with 1 master and 3 workers).

However, it seems it launched 4 processes on each pod, and these trained independently.
Is there anything I misunderstood about this example?
