Running Distributed TensorFlow Jobs on Kubeflow 3.5

Author: Josh Patterson

Date: March 5th, 2019

This tutorial shows how to run a distributed TensorFlow job on Kubeflow 3.5. In a previous tutorial, we showed how to set up Kubeflow 3.5 on Google Cloud. For a review of why Kubeflow is compelling in the current machine learning infrastructure landscape, check out our report and our 2019 Trends interview notes on artificial intelligence and machine learning.

A trend we see with our customers is that they want to manage workloads that can move easily between on-premise infrastructure and similar managed cloud infrastructure. Kubernetes is quickly becoming the platform for doing just this. While TensorFlow already has a distributed execution mode available, some teams want to manage their systems with Kubernetes for the aforementioned reasons. From that perspective, we thought it would be useful to put together this tutorial.

Concepts in this tutorial:

In this article we'll get a better understanding of:

  • Overall architecture of distributed TensorFlow job execution on Kubeflow 3.5
  • Working with .yaml files for Kubernetes custom resource definitions
  • How to set up a custom TensorFlow job on Kubeflow
  • Managing containers and container registries
  • Working with kubectl to operate the remote Kubernetes cluster
In this tutorial we'll work through new and old concepts to build a working knowledge of how to run a distributed TensorFlow job on Kubeflow 3.5. We'll start out with a high-level review of what we're going to do and then dig into the sub-topics involved in running the job.

Overview of TensorFlow on Kubeflow

The high-level set of tasks needed to run a TensorFlow job on Kubeflow are listed below.

  1. Write/configure (or re-use) python TensorFlow training code
  2. Build a .yaml file based on the custom resource definition "TFJob" describing our job (container image, program or script for training, execution parameters, etc)
  3. Find an existing container or build a docker container image containing our code and dependencies
  4. Send the job yaml file to the cluster for execution with kubectl

Out of the box, Kubernetes does not understand how distributed TensorFlow works. Kubernetes needs help understanding where the daemons are running and how they talk with one another. We can see the general flow of how the different parts of Kubeflow work together to get TensorFlow containers working on Kubernetes and coordinating between worker containers.

In the sections below we'll explain the individual parts from the diagram above, focusing on TFJob, the custom component that manages distributed TensorFlow execution on Kubeflow.

What is TFJob?

Sidebar: Kubernetes and Custom Resources

A Kubernetes resource is an endpoint in the Kubernetes API that stores a collection of API objects of a certain kind. A custom resource is an extension of the Kubernetes API that, once installed, is specific to a given cluster. Custom resources can be created, updated, and deleted on a running cluster through dynamic registration. Once we have installed a custom resource, we can control it with the kubectl tool just as we would built-in resources (e.g., pods).
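As a quick illustration, once a custom resource is installed we can work with its objects using the same kubectl verbs we use for built-in resources. The TFJob resource referenced below is the one Kubeflow installs, which we discuss in the rest of this section:

kubectl get crd       # list the custom resource definitions installed on the cluster
kubectl get tfjobs    # list objects of the TFJob custom resource (available once Kubeflow is installed)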

By themselves, custom resources simply allow us to store and retrieve structured data. When we combine a custom resource with a controller, they become a true declarative API: the controller interprets the structured data as a record of the user's desired state and continually takes action to reach that state. Custom controllers are especially effective when combined with custom resources (though they can work with any kind of resource). One example of this combination (custom controller plus custom resource) is the "operator pattern". The operator pattern allows us to encode domain knowledge for specific applications into an extension of the Kubernetes API, which brings us to TFJob.

TFJob is a custom component for Kubeflow consisting of a Kubernetes custom resource definition (CRD) and an associated controller (tf-operator, which we'll discuss further below). The TFJob CRD (a resource with a simple YAML representation) makes it easy to run distributed or non-distributed TensorFlow jobs on Kubernetes.

The TFJob CRD informs the cluster how to manage Kubernetes resources during job training. When we write a new TensorFlow training job for Kubeflow, we write a custom .yaml file that references the TFJob CRD.

TensorFlow supports distributed training, and a distributed TensorFlow job can contain zero or more of each of the following process types:

  • Chief: responsible for orchestrating training along with other tasks such as checkpointing the model
  • Ps: Parameter server, provides a distributed data store for the global model parameter vector
  • Worker: Workers update a local copy of the model parameter vector by training on the input dataset. Sometimes worker 0 may also serve as the Chief
  • Evaluator: tracks model statistics during training

With our custom job YAML file we'll set up how these processes execute for our specific job. Let's now take a look at how to set up a TensorFlow job YAML file.

An Example Custom TensorFlow Job Configuration in YAML

In this case we'll take a look at how to get the canonical MNIST job running as a Kubeflow job. In the short example below we can see how the kind field is set to "TFJob" and the job roles are configured in the tfReplicaSpecs section:
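(The listing below is a minimal sketch assembled from the fragments we break down in the rest of this article; the original job file may contain additional fields.)

apiVersion: "kubeflow.org/v1alpha2"
kind: "TFJob"
metadata:
  name: "dist-mnist-pct"
spec:
  tfReplicaSpecs:
    PS:
      replicas: 1
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: emsixteeen/tf-dist-mnist-test:1.0
    Worker:
      replicas: 2
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: emsixteeen/tf-dist-mnist-test:1.0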

Understanding the Configuration Options

There are two versions of the TFJob API:

  1. v1alpha2
  2. v1beta1
For the purposes of this example we'll focus on v1alpha2 because we're running on a deployed Kubeflow 3.5 cluster in this article.

Key entries at the top of the YAML file (and their roles) to note:

apiVersion: "kubeflow.org/v1alpha2"
kind: "TFJob"
metadata:
  name: "dist-mnist-pct"
The apiVersion field references the version of the API we're using (v1alpha2). The kind field references the CRD we want to use, "TFJob". The metadata field allows us to set labels, give the job a specific name, and assign it to a namespace. The metadata subfields help uniquely identify the object, and include a name string, a UID, and an optional namespace. In this example we name our job "dist-mnist-pct".
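As a hedged illustration (the namespace and label values here are assumptions, not part of the tutorial's actual job file), the metadata block could also carry an explicit namespace and labels:

metadata:
  name: "dist-mnist-pct"
  namespace: "kubeflow"      # assumed namespace, for illustration only
  labels:
    app: "dist-mnist-pct"    # illustrative label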

Defining Processes with tfReplicaSpecs

Next we have the tfReplicaSpecs section, as seen below:

spec:
  tfReplicaSpecs:
    PS:
      replicas: 1
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: emsixteeen/tf-dist-mnist-test:1.0
The tfReplicaSpecs section defines the processes that distributed TensorFlow uses:
  • Master
  • Workers
  • Ps
In the code snippet above, we see specifically the definition for the PS (parameter server). A TFJob resource is a collection of TfReplicas. Each TfReplica matches up to a set of TensorFlow processes executing a role in the TensorFlow training job (example: a TensorFlow worker process, or a TensorFlow parameter server process).

The TFJob (v1alpha2) spec has a map that matches the type of replica (the list of processes above) with the TFReplicaSpec for that replica. Each TFReplicaSpec entry has the following 3 sub-fields:

  1. replicas: replica count of this process type to create for the TFJob instance
  2. template: PodTemplateSpec describing the pod we want to create for each replica (the pod must have a container named 'tensorflow')
  3. restartPolicy: the policy for whether/how the pods will be restarted when they exit
Each TfReplica contains a standard Kubernetes PodTemplate to specify the processes (including TensorFlow) to run in each replica, allowing us to leverage standard Kubernetes features (a brief sketch follows), as we'll also see in the next section when we highlight the Worker definition.
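For example, here is a sketch of attaching standard Kubernetes resource requests and limits to the parameter server replica (the resource values are assumptions and are not part of this tutorial's actual job file):

    PS:
      replicas: 1
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: emsixteeen/tf-dist-mnist-test:1.0
              resources:         # standard Kubernetes container settings
                requests:
                  cpu: "1"
                  memory: 2Gi
                limits:
                  cpu: "2"
                  memory: 4Gi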

Defining TensorFlow Worker Processes for Kubeflow

We can see the worker processes defined in the YAML configuration snippet below:

    Worker:
      replicas: 2
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: emsixteeen/tf-dist-mnist-test:1.0
We can see that we're requesting to run 2 worker processes with the replicas: 2 configuration line, as described in the previous section. We also set the restartPolicy here to "Never". In the template section, we define a PodTemplateSpec for the pod we want to create for the worker replicas. Inside the template, we define a spec (PodSpec) field, which describes the desired behavior of the pod.

Inside the spec field, we further define a containers field. This field defines a list of containers we want to assign to the pod. There must be at least 1 container assigned to the pod, and we cannot update this field later. In the configuration example above, we can see the following container fields defined:

  • name: container name
  • image: docker image name (for more information on docker images, check out this resource)
Some of the other parameters we can set for the containers field are listed below (a brief sketch using a few of them follows this list):
  • args
  • command
  • env
  • ports
  • volumeMounts
  • workingDir
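Below is a brief sketch of a worker container using a few of these fields. The command, script path, and argument are hypothetical (they are not what the tutorial's pre-built image actually runs), while port 2222 matches the worker/ps ports we'll see in the job logs later:

            - name: tensorflow
              image: emsixteeen/tf-dist-mnist-test:1.0
              command: ["python"]                                   # assumed entrypoint, for illustration
              args: ["/opt/dist_mnist.py", "--train_steps=20000"]   # hypothetical script path and flag
              env:
                - name: TF_CPP_MIN_LOG_LEVEL                        # TensorFlow logging verbosity
                  value: "1"
              ports:
                - containerPort: 2222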

We now have a good idea of what our custom TensorFlow job needs to look like, and we're ready to run it. First we need to make sure we have a Kubeflow 3.5 cluster ready and installed with the right TFJob CRD to support our job.

To deploy the TFJob custom resource to our Kubernetes cluster, we need to deploy Kubeflow to the cluster. This way, when we send our custom TensorFlow job to the cluster, the custom operator tf-operator is already installed there. Something to note as well is that each TensorFlow job running on Kubeflow is treated as a component in our Kubeflow application.
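Before submitting a job, it's worth confirming the TFJob CRD and the tf-operator are actually present. Assuming a Kubeflow deployment like the one used in this tutorial, commands along these lines should show them:

kubectl get crd | grep tfjob
kubectl get pod | grep tf-job-operator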

Running the TensorFlow Job on Kubeflow with Kubectl

A TensorFlow job on Kubeflow is made up of:

  1. TensorFlow python code to execute for model training
  2. Custom job .yaml file
  3. Docker container to execute the code on inside the cluster (example)
For the purposes of this article we're going to re-use some existing training code for distributed training on the canonical MNIST dataset. We've already set up a pre-built container image on Docker Hub (referenced in the job yaml file) containing the distributed training python code for the reader to use. All that remains to set up this job is to:
  1. Git clone a repository containing the job yaml file already set up for the reader
  2. Run the job on the cluster with kubectl
So let's get to it.

First we'll get a copy of the job yaml file from github:

git clone https://github.com/pattersonconsulting/tf_mnist_kubeflow_3_5.git

Change into the project subdirectory (cd command) and then run the following kubectl command:

kubectl apply -f tf_mnist_job.yaml

We should now have the job running on the Kubeflow cluster. We won't see the job running and writing to our console screen because it is running on a remote cluster. We can check the job status with the command:

kubectl get pod

Our console output should look something like:

NAME                                       READY   STATUS    RESTARTS   AGE
ambassador-f4898cd57-ctsx7                 3/3     Running   0          28d
ambassador-f4898cd57-ln8p7                 3/3     Running   0          28d
ambassador-f4898cd57-lxbjg                 3/3     Running   0          28d
centraldashboard-79645788-2bkht            1/1     Running   0          28d
dist-mnist-pct-ps-0                        1/1     Running   0          1m
dist-mnist-pct-worker-0                    1/1     Running   0          1m
dist-mnist-pct-worker-1                    1/1     Running   0          1m
tf-hub-0                                   1/1     Running   0          28d
tf-job-dashboard-7cddcdf9c4-lpktq          1/1     Running   0          28d
tf-job-operator-v1alpha2-6566f45db-nxkxr   1/1     Running   0          28d
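Because TFJob is a custom resource, we can also inspect the job object itself rather than its individual pods. Assuming the TFJob CRD is installed as described earlier, the following commands report the job's status and events:

kubectl get tfjobs
kubectl describe tfjob dist-mnist-pct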


Successful Job Completion

To check the progress of our job, we can look at the logs of one of the TensorFlow worker processes on the cluster. We do that with the following command:

kubectl logs dist-mnist-pct-worker-0

The logs will look something like the output sample below:

1551717385.842652: Worker 0: training step 10020 done (global step: 19974)
1551717385.855453: Worker 0: training step 10021 done (global step: 19976)
1551717385.863761: Worker 0: training step 10022 done (global step: 19978)
1551717385.874690: Worker 0: training step 10023 done (global step: 19979)
1551717385.886346: Worker 0: training step 10024 done (global step: 19981)
1551717385.896427: Worker 0: training step 10025 done (global step: 19983)
1551717385.910320: Worker 0: training step 10026 done (global step: 19984)
1551717385.922846: Worker 0: training step 10027 done (global step: 19986)
1551717385.934626: Worker 0: training step 10028 done (global step: 19988)
1551717385.945602: Worker 0: training step 10029 done (global step: 19990)
1551717385.959762: Worker 0: training step 10030 done (global step: 19992)
1551717385.973269: Worker 0: training step 10031 done (global step: 19994)
1551717385.984852: Worker 0: training step 10032 done (global step: 19995)
1551717385.995786: Worker 0: training step 10033 done (global step: 19997)
1551717386.009679: Worker 0: training step 10034 done (global step: 19999)
1551717386.025252: Worker 0: training step 10035 done (global step: 20001)
Training ends @ 1551717386.025338
Training elapsed time: 148.364505 s
After 20000 training step(s), validation cross entropy = 1998.18


Checking a Process's Logs on Failure

Sometimes we'll notice a job that seems hung or gives an error message in Kubernetes. We can check the logs to see what is going on inside the container with the same command:

kubectl logs dist-mnist-pct-worker-0

The logs will look something like the output sample below:

INFO|2019-03-04T16:26:47|/opt/launcher.py|48| Launcher started.
INFO|2019-03-04T16:26:47|/opt/launcher.py|73| Command to run: --job_name=worker --ps_hosts=dist-mnist-pct-ps-0:2222 --worker_hosts=dist-mnist-pct-worker-0:2222,dist-mnist-pct-worker-1:2222 --task_index=0
INFO|2019-03-04T16:26:47|/opt/launcher.py|15| Running --job_name=worker --ps_hosts=dist-mnist-pct-ps-0:2222 --worker_hosts=dist-mnist-pct-worker-0:2222,dist-mnist-pct-worker-1:2222 --task_index=0
Traceback (most recent call last):
  File "/opt/launcher.py", line 79, in <module>
    run_and_stream(command)
  File "/opt/launcher.py", line 17, in run_and_stream
    stderr=subprocess.STDOUT)
  File "/usr/lib/python2.7/subprocess.py", line 711, in __init__
    errread, errwrite)
  File "/usr/lib/python2.7/subprocess.py", line 1343, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory


Deleting a Job

Occasionally we will misconfigure a job or reference a container image that is missing from its repository. The job will fail, and we need to clear it out of Kubeflow. To do this we delete the job, which we can do with the following command:

kubectl delete -f [file.yaml]

The result of this command will look similar to the console output below:

tfjob.kubeflow.org "dist-mnist-pct" deleted
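If we no longer have the job's YAML file handy, we can also delete the TFJob custom resource by name; for the job in this tutorial that would look like:

kubectl delete tfjob dist-mnist-pct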


Accessing the TFJob User Interface

To get to the TFJob dashboard included with Kubeflow 3.5, we need to set up port forwarding with the command:

kubectl port-forward svc/ambassador 8080:80

Now we go to our web browser and load the URL http://localhost:8080/, and we should see something like the image below:

Optional: Building Custom TensorFlow Docker Containers

This tutorial has dealt with running a pre-made TensorFlow job in pre-built containers. Eventually the reader will want to customize their own job, and that will likely involve updating the container image, so in this section we give the reader some basics on how to do that.

To launch a container we run an image. An image includes everything needed to run an application (code, runtime, libraries, etc.) as an executable package. In our TensorFlow job's case, it includes things like the TensorFlow library dependencies and the Python training code to run in each container.

Docker Hub provides a repository where container images can be stored, searched, and retrieved. Other repositories include Google's Container Registry and on-premise Artifactory installs.

In the snippet below we can see the general shape of the Dockerfile used to build a container image like this tutorial's example.
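The sketch below shows what such a Dockerfile could look like; the base image tag, script name, and paths are illustrative assumptions and may differ from the actual contents of the emsixteeen/tf-dist-mnist-test:1.0 image:

# Assumed TensorFlow base image, for illustration
FROM tensorflow/tensorflow:1.12.0
# Hypothetical path for the distributed MNIST training script
COPY dist_mnist.py /opt/dist_mnist.py
# Run the training script when the container starts
ENTRYPOINT ["python", "/opt/dist_mnist.py"]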

Assuming the reader has a Docker Hub account, we can build the Dockerfile and tag the image for our Docker Hub repository with the command:

docker build -f Dockerfile -t [namespace/tag:version] ./
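After the build completes, we push the image so the cluster can pull it, using the same placeholder as above:

docker push [namespace/tag:version]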


Summary

In this article we walked the reader through an explanation of how TensorFlow jobs work on Kubeflow. We also showed the reader how to run their own custom TensorFlow jobs on Kubeflow and looked at the various ways to customize a job. In future articles we'll look at how to leverage different data storage / loading patterns and how to launch jobs from JupyterHub notebooks on Kubeflow.

For more perspective on the space, check out our report and our 2019 Trends interview notes on artificial intelligence and machine learning. If working with machine learning workflows on Kubeflow is interesting to your organization, reach out and continue the conversation with us.