Prerequisites
Internet Connection
ADS CLI is installed
Docker is installed: https://docs.docker.com/get-docker
Write your training code:
Your model training script needs to use one of the distributed strategies in TensorFlow.
For example, the following TensorFlow training script for MultiWorkerMirroredStrategy can be saved as mnist.py:
# Script adapted from tensorflow tutorial: https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras

import tensorflow as tf
import tensorflow_datasets as tfds
import os
import sys
import time
import ads
from ocifs import OCIFileSystem
from tensorflow.data.experimental import AutoShardPolicy

BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64

if '.' not in sys.path:
    sys.path.insert(0, '.')


def create_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)


def create_dirs(task_type="worker", task_id=0):
    artifacts_dir = os.environ.get("OCI__SYNC_DIR", "/opt/ml")
    model_dir = artifacts_dir + "/model"
    print("creating dirs for Model: ", model_dir)
    create_dir(model_dir)
    checkpoint_dir = write_filepath(artifacts_dir, task_type, task_id)
    return artifacts_dir, checkpoint_dir, model_dir


def write_filepath(artifacts_dir, task_type, task_id):
    if task_type is None:
        task_type = "worker"
    checkpoint_dir = artifacts_dir + "/checkpoints/" + task_type + "/" + str(task_id)
    print("creating dirs for Checkpoints: ", checkpoint_dir)
    create_dir(checkpoint_dir)
    return checkpoint_dir


def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label


def get_data(data_bckt=None, data_dir="/code/data", num_replicas=1, num_workers=1):
    if data_bckt is not None and not os.path.exists(data_dir + '/mnist'):
        print(f"downloading data from {data_bckt}")
        ads.set_auth(os.environ.get("OCI_IAM_TYPE", "resource_principal"))
        authinfo = ads.common.auth.default_signer()
        oci_filesystem = OCIFileSystem(**authinfo)
        lck_file = os.path.join(data_dir, '.lck')
        if not os.path.exists(lck_file):
            os.makedirs(os.path.dirname(lck_file), exist_ok=True)
            open(lck_file, 'w').close()
            oci_filesystem.download(data_bckt, data_dir, recursive=True)
        else:
            print("data downloaded by a different process. waiting")
            time.sleep(30)

    BATCH_SIZE = BATCH_SIZE_PER_REPLICA * num_replicas * num_workers
    print("Now printing data_dir:", data_dir)
    datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True, data_dir=data_dir)
    mnist_train, mnist_test = datasets['train'], datasets['test']
    print("num_train_examples :", info.splits['train'].num_examples,
          " num_test_examples: ", info.splits['test'].num_examples)

    train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
    test_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
    train = shard(train_dataset)
    test = shard(test_dataset)
    return train, test, info


def shard(dataset):
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA
    return dataset.with_options(options)


def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch >= 3 and epoch < 7:
        return 1e-4
    else:
        return 1e-5


def get_callbacks(model, checkpoint_dir="/opt/ml/checkpoints"):
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

    class PrintLR(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            print('\nLearning rate for epoch {} is {}'.format(epoch + 1, model.optimizer.lr.numpy()), flush=True)

    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir='./logs'),
        tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                           # save_weights_only=True
                                           ),
        tf.keras.callbacks.LearningRateScheduler(decay),
        PrintLR()
    ]
    return callbacks


def build_and_compile_cnn_model():
    print("TF_CONFIG in model:", os.environ.get("TF_CONFIG"))
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'])
    return model
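The lock-file check in get_data tests for the file and then creates it in two separate steps, so two processes that start at the same moment could both decide to download. One possible alternative, sketched here assuming the processes share a local filesystem, is to create the lock file atomically so that only one process wins. The helper name try_acquire_lock is hypothetical and is not part of ADS or of the script above.

```python
import os
import tempfile


def try_acquire_lock(lock_path):
    """Return True if this process created the lock file, False if it already existed.

    Hypothetical helper (not part of ADS): O_CREAT | O_EXCL makes the
    existence check and the file creation a single atomic operation.
    """
    parent = os.path.dirname(lock_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as data_dir:
        lock_file = os.path.join(data_dir, ".lck")
        print(try_acquire_lock(lock_file))  # first caller creates the lock
        print(try_acquire_lock(lock_file))  # later callers find it already present
```

The process that gets True would perform the download; the others would wait, as the original else branch does.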
Save the following script as train.py:
import tensorflow as tf
import argparse
import mnist

print(tf.__version__)

parser = argparse.ArgumentParser(description='Tensorflow Native MNIST Example')
parser.add_argument('--data-dir',
                    help='location of the training dataset in the local filesystem (will be downloaded if needed)',
                    default='/code/data')
parser.add_argument('--data-bckt',
                    help='location of the training dataset in an object storage bucket',
                    default=None)

args = parser.parse_args()

artifacts_dir, checkpoint_dir, model_dir = mnist.create_dirs()

# Note: MirroredStrategy synchronizes replicas on the devices of a single machine;
# TensorFlow's multi-node counterpart is tf.distribute.MultiWorkerMirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

train_dataset, test_dataset, info = mnist.get_data(data_bckt=args.data_bckt, data_dir=args.data_dir,
                                                   num_replicas=strategy.num_replicas_in_sync)
with strategy.scope():
    model = mnist.build_and_compile_cnn_model()

model.fit(train_dataset, epochs=2, callbacks=mnist.get_callbacks(model, checkpoint_dir))

model.save(model_dir, save_format='tf')
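get_data multiplies BATCH_SIZE_PER_REPLICA by the replica and worker counts to obtain the batch size it passes to batch(). The arithmetic can be sketched without TensorFlow; the replica counts used below are illustrative assumptions, not values from a real run.

```python
BATCH_SIZE_PER_REPLICA = 64  # same constant as in mnist.py


def global_batch_size(num_replicas=1, num_workers=1):
    """Mirror the BATCH_SIZE computation in mnist.get_data."""
    return BATCH_SIZE_PER_REPLICA * num_replicas * num_workers


if __name__ == "__main__":
    # train.py passes strategy.num_replicas_in_sync as num_replicas
    # and leaves num_workers at its default of 1.
    for replicas in (1, 2, 4):  # illustrative replica counts
        print(replicas, global_batch_size(num_replicas=replicas))
```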
Initialize a distributed-training folder:
At this point you have created a training file (or files), train.py in the example above. Now run the command below.
ads opctl distributed-training init --framework tensorflow --version v1
This will download the tensorflow framework and place it inside the 'oci_dist_training_artifacts' folder.
Note: Whenever you change the code, you have to build, tag, and push the image to the repo. This is done automatically by the `ads opctl run` CLI command.
Containerize your code and build container:
The required Python dependencies are provided inside the conda environment file oci_dist_training_artifacts/tensorflow/v1/environments.yaml. If your code requires additional dependencies, update this file.
When updating environments.yaml, do not remove the existing libraries; append to the list instead.
Update the TAG and the IMAGE_NAME as per your needs:
export IMAGE_NAME=<region.ocir.io/my-tenancy/image-name>
export TAG=latest
export MOUNT_FOLDER_PATH=.
Build the container image.
ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
    -df oci_dist_training_artifacts/tensorflow/v1/Dockerfile
The code is assumed to be in the current working directory. To override the source code directory, use the -s flag and specify the code directory. This folder should be within the current working directory.
ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
    -df oci_dist_training_artifacts/tensorflow/v1/Dockerfile \
    -s $MOUNT_FOLDER_PATH
If you are behind a proxy, ads opctl will automatically use your proxy settings (defined via no_proxy, http_proxy and https_proxy).
Define your workload yaml:
The yaml file is a declarative way to express the workload. In this example, we bring up 1 worker node and 1 chief-worker node. The training code to run is train.py. All your training code is assumed to be present inside the /code directory within the container. Additionally, you can put any data files inside the same directory (and pass the location, e.g. /code/data/**, as an argument to your training script using runtime->spec->args).
kind: distributed
apiVersion: v1.0
spec:
  infrastructure:
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: Tensorflow
      logGroupId: oci.xxxx.<log_group_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.GPU2.1
      blockStorageSize: 50
  cluster:
    kind: TENSORFLOW
    apiVersion: v1.0
    spec:
      image: "@image"
      workDir: "oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>"
      name: "tf_multiworker"
      config:
        env:
          - name: WORKER_PORT #Optional. Defaults to 12345
            value: 12345
          - name: SYNC_ARTIFACTS #Mandatory: Switched on by Default.
            value: 1
          - name: WORKSPACE #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket to sync generated artifacts to.
            value: "<bucket_name>"
          - name: WORKSPACE_PREFIX #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket folder to sync generated artifacts to.
            value: "<bucket_prefix>"
      main:
        name: "chief"
        replicas: 1 #this will be always 1.
      worker:
        name: "worker"
        replicas: 1 #number of workers. This is in addition to the 'chief' worker. Could be more than 1
  runtime:
    kind: python
    apiVersion: v1.0
    spec:
      entryPoint: "/code/train.py" #location of user's training script in the container image.
      args: #any arguments that the training script requires.
        - --data-dir # assuming data folder has been bundled in the container image.
        - /code/data/
      env:
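The items listed under args in the runtime spec are passed to the training script as arguments. As a rough sketch of how train.py would see them, the snippet below rebuilds the same two argparse options and parses the values from the workload yaml; handing the list to parse_args directly is only for illustration, since in a real run the values arrive on the command line.

```python
import argparse


def build_parser():
    """Same two options that train.py defines."""
    parser = argparse.ArgumentParser(description="Tensorflow Native MNIST Example")
    parser.add_argument("--data-dir", default="/code/data",
                        help="location of the training dataset in the local filesystem")
    parser.add_argument("--data-bckt", default=None,
                        help="location of the training dataset in an object storage bucket")
    return parser


if __name__ == "__main__":
    # The two list items under `args:` in the workload yaml.
    yaml_args = ["--data-dir", "/code/data/"]
    args = build_parser().parse_args(yaml_args)
    print(args.data_dir, args.data_bckt)
```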
Use ads opctl to create the cluster infrastructure and run the workload:
Do a dry run to inspect how the yaml translates to a Job and Job Runs:
ads opctl run -f train.yaml --dry-run
The output will look similar to the following:
-----------------------------Entering dryrun mode----------------------------------
Creating Job with payload:
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: Tensorflow
      logGroupId: oci.xxxx.<log_group_ocid>
      logId: oci.xxx.<log_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.GPU2.1
      blockStorageSize: 50
    type: dataScienceJob
  name: tf_multiworker
  runtime:
    kind: runtime
    spec:
      entrypoint: null
      env:
      - name: WORKER_PORT
        value: 12345
      - name: SYNC_ARTIFACTS
        value: 1
      - name: WORKSPACE
        value: "<bucket_name>"
      - name: WORKSPACE_PREFIX
        value: "<bucket_prefix>"
      - name: OCI__WORK_DIR
        value: oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>
      - name: OCI__EPHEMERAL
        value: None
      - name: OCI__CLUSTER_TYPE
        value: TENSORFLOW
      - name: OCI__WORKER_COUNT
        value: '1'
      - name: OCI__START_ARGS
        value: ''
      - name: OCI__ENTRY_SCRIPT
        value: /code/train.py
      image: "<region>.ocir.io/<tenancy_id>/<repo_name>/<image_name>:<image_tag>"
    type: container

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Creating Main Job Run with following details:
Name: chief
Environment Variables:
    OCI__MODE:MAIN
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Creating Job Runs with following details:
Name: worker_0
Environment Variables:
    OCI__MODE:WORKER
-----------------------------Ending dryrun mode----------------------------------
Test Locally:
Before submitting the workload to jobs, you can run it locally to test your code, dependencies, configurations, etc. With the -b local flag, it uses a local backend. When you need to run this workload on OCI Data Science jobs, simply use the -b job flag instead.
ads opctl run -f train.yaml -b local
If your code needs to use any OCI services (such as an object bucket), you need to mount the OCI keys from your local host machine onto the container. This is already done for you, assuming the typical location of OCI keys, ~/.oci. If your keys are at a different location, modify the following setting in the config.ini file.
oci_key_mnt = ~/.oci:/home/oci_dist_training/.oci
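The oci_key_mnt value reads as a host path and a container path separated by a colon. Below is a small sketch of splitting such a value and expanding ~ on the host side, which can help when checking that the host directory exists before a local run. The function name split_mount is hypothetical, and treating the value as host:container is an assumption based on the example above rather than a documented ADS parsing rule.

```python
import os


def split_mount(spec):
    """Split 'host_path:container_path' and expand ~ in the host path.

    Hypothetical helper; assumes the host path itself contains no colon.
    """
    host, container = spec.split(":", 1)
    return os.path.expanduser(host), container


if __name__ == "__main__":
    host, container = split_mount("~/.oci:/home/oci_dist_training/.oci")
    print(host, "->", container)
    print("host directory exists:", os.path.isdir(host))
```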
Note that the local backend requires the source code for your workload to be available locally in the source folder specified in the config.ini file. If you specified a Git repository or an OCI object storage location as the source code location in your workflow YAML, please make sure you have a local copy available for local testing.
Submit the workload:
ads opctl run -f train.yaml -b job
Note: This will automatically push the docker image to the OCI container registry repo.
Once running, you will see terminal output similar to the below:
info.yaml
jobId: oci.xxxx.<job_ocid>
mainJobRunId:
  mainJobRunIdName: oci.xxxx.<job_run_ocid>
workDir: oci://my-bucket@my-namespace/cluster-testing/005
otherJobRunIds:
  - workerJobRunIdName_1: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_2: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_3: oci.xxxx.<job_run_ocid>
This information can be saved as a YAML file and used as input to ads opctl distributed-training show-config -f <info.yaml>. You can use --job-info to save the job run info into YAML, for example:
ads opctl run -f train.yaml --job-info info.yaml
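Once the job run info has been saved, the main and worker run OCIDs can be gathered in one place, for example to watch each run's logs. This is a sketch under two assumptions: that the file has the nested shape of the info.yaml example (a mapping under mainJobRunId and a list of single-key mappings under otherJobRunIds), and that it has already been parsed into a dict by a YAML library. The helper name and the placeholder OCIDs are illustrative.

```python
def collect_job_run_ids(info):
    """Return all job run OCIDs found in the parsed job info, main run first."""
    run_ids = list(info.get("mainJobRunId", {}).values())
    for entry in info.get("otherJobRunIds", []):
        run_ids.extend(entry.values())
    return run_ids


if __name__ == "__main__":
    # Dict equivalent of the info.yaml example, with placeholder OCIDs.
    info = {
        "jobId": "oci.xxxx.<job_ocid>",
        "mainJobRunId": {"mainJobRunIdName": "oci.xxxx.<main_run_ocid>"},
        "otherJobRunIds": [
            {"workerJobRunIdName_1": "oci.xxxx.<worker_run_ocid_1>"},
            {"workerJobRunIdName_2": "oci.xxxx.<worker_run_ocid_2>"},
        ],
    }
    for run_id in collect_job_run_ids(info):
        print(run_id)
```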
Saving Artifacts to Object Storage Buckets
In case you want to save the artifacts generated by the training process (model checkpoints, TensorBoard logs, etc.) to an object bucket, you can use the 'sync' feature. The environment variable OCI__SYNC_DIR exposes the directory location that will be automatically synchronized to the configured object storage bucket location. Use this directory in your training script to save the artifacts.
To configure the destination object storage bucket location, use the following settings in the workload yaml file (train.yaml).
- name: SYNC_ARTIFACTS
  value: 1
- name: WORKSPACE
  value: "<bucket_name>"
- name: WORKSPACE_PREFIX
  value: "<bucket_prefix>"
Note: Change SYNC_ARTIFACTS to 0 to disable this feature. Use the OCI__SYNC_DIR environment variable in your code to save the artifacts. For example:
tf.keras.callbacks.ModelCheckpoint(os.path.join(os.environ.get("OCI__SYNC_DIR"),"ckpts",'checkpoint-{epoch}.h5'))
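Note that os.environ.get("OCI__SYNC_DIR") returns None when the variable is not set (for instance, when the script is run outside the training container), and os.path.join then fails. A minimal sketch of a helper that falls back to /opt/ml, the same default that mnist.py uses; the name sync_path is hypothetical.

```python
import os


def sync_path(*parts, default="/opt/ml"):
    """Build a path under OCI__SYNC_DIR, falling back to `default` if it is unset."""
    base = os.environ.get("OCI__SYNC_DIR", default)
    return os.path.join(base, *parts)


if __name__ == "__main__":
    # For example, a checkpoint path template to pass to ModelCheckpoint.
    print(sync_path("ckpts", "checkpoint-{epoch}.h5"))
```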
Monitoring the workload logs
To view the logs from a job run, run:
ads jobs watch oci.xxxx.<job_run_ocid>
Profiling
You may want to profile your training setup for optimization or performance tuning. Profiling typically provides a detailed analysis of CPU utilization, GPU utilization, top CUDA kernels, top operators, etc. You can choose to profile your training setup using the native Tensorflow Profiler or using a third-party profiler such as Nvidia Nsights.
Profiling using Tensorflow Profiler
Tensorflow Profiler is a native offering from Tensorflow for Tensorflow performance profiling.
Profiling is invoked through code instrumentation using one of the following APIs: tf.keras.callbacks.TensorBoard or tf.profiler.experimental.Profile.
Refer to the Tensorflow documentation for these APIs for the changes you need to make in your training script for instrumentation.
You should choose the OCI__SYNC_DIR directory to save the profiling logs. For example:
options = tf.profiler.experimental.ProfilerOptions(
    host_tracer_level=2,
    python_tracer_level=1,
    device_tracer_level=1,
    delay_ms=None)
with tf.profiler.experimental.Profile(os.environ.get("OCI__SYNC_DIR") + "/logs", options=options):
    # training code
When using the Keras callback:
tboard_callback = tf.keras.callbacks.TensorBoard(log_dir=os.environ.get("OCI__SYNC_DIR") + "/logs",
                                                 histogram_freq=1,
                                                 profile_batch='500,520')
model.fit(..., callbacks=[tboard_callback])
Also, the sync feature SYNC_ARTIFACTS should be enabled (set to '1') to sync the profiling logs to the configured object storage.
Thereafter, use TensorBoard to view the logs. Refer to the TensorBoard setup instructions to set it up on your computer.
Profiling using Nvidia Nsights
Nvidia Nsights is a system-wide profiling tool from Nvidia that can be used to profile deep learning workloads.
Nsights requires no change in your training code, because it works at the process level. You can enable this experimental feature in your training setup via the following configuration in the runtime yaml file (see the PROFILE and PROFILE_CMD entries).
spec:
  image: "@image"
  workDir: "oci://@/"
  name: "tf_multiworker"
  config:
    env:
      - name: WORKER_PORT
        value: 12345
      - name: SYNC_ARTIFACTS
        value: 1
      - name: WORKSPACE
        value: "<bucket_name>"
      - name: WORKSPACE_PREFIX
        value: "<bucket_prefix>"
      - name: PROFILE
        value: 1
      - name: PROFILE_CMD
        value: "nsys profile -w true -t cuda,nvtx,osrt,cudnn,cublas -s none -o /opt/ml/nsight_report -x true"
  main:
    name: "main"
    replicas: 1
  worker:
    name: "worker"
    replicas: 1
Refer to the Nsight Systems documentation for the nsys profile command options. You can modify the command within PROFILE_CMD, but remember that this is all experimental. The profiling reports are generated per node. You need to download the reports to your computer manually or via the oci command.
oci os object bulk-download \
    -ns <namespace> \
    -bn <bucket_name> \
    --download-dir /path/on/your/computer \
    --prefix path/on/bucket/<job_id>
Note: -bn == WORKSPACE and --prefix path == WORKSPACE_PREFIX/<job_id>, as configured in the runtime yaml file. To view the reports, you need to install the Nsight Systems app. Thereafter, open the downloaded reports in the Nsight Systems app.
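The mapping between the workload settings and the download command can be sketched as a small helper that assembles the bulk-download arguments. It only builds the argument list, using the same flags as the command above, and does not invoke the oci CLI. The function name and the sample values are hypothetical.

```python
import shlex


def bulk_download_args(namespace, workspace, workspace_prefix, job_id, download_dir):
    """Build the argument list for `oci os object bulk-download`.

    -bn is the WORKSPACE bucket and --prefix is WORKSPACE_PREFIX/<job_id>.
    """
    prefix = f"{workspace_prefix.rstrip('/')}/{job_id}"
    return [
        "oci", "os", "object", "bulk-download",
        "-ns", namespace,
        "-bn", workspace,
        "--download-dir", download_dir,
        "--prefix", prefix,
    ]


if __name__ == "__main__":
    args = bulk_download_args(
        namespace="my-namespace",            # placeholder values
        workspace="my-bucket",               # WORKSPACE
        workspace_prefix="cluster-testing",  # WORKSPACE_PREFIX
        job_id="<job_id>",
        download_dir="/path/on/your/computer",
    )
    print(shlex.join(args))
```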
Other Tensorflow Strategies supported
Tensorflow has two multi-worker strategies: MultiWorkerMirroredStrategy and ParameterServerStrategy. Let's look at the changes you would need to make to run a ParameterServerStrategy workload.
You can have the following training Tensorflow script for ParameterServerStrategy saved as train.py (just like mnist.py and train.py in case of MultiWorkerMirroredStrategy):
# Script adapted from tensorflow tutorial: https://www.tensorflow.org/tutorials/distribute/parameter_server_training

import os
import tensorflow as tf
import json
import multiprocessing

NUM_PS = len(json.loads(os.environ['TF_CONFIG'])['cluster']['ps'])
global_batch_size = 64


def worker(num_workers, cluster_resolver):
    # Workers need some inter_ops threads to work properly.
    worker_config = tf.compat.v1.ConfigProto()
    if multiprocessing.cpu_count() < num_workers + 1:
        worker_config.inter_op_parallelism_threads = num_workers + 1

    for i in range(num_workers):
        print("cluster_resolver.task_id: ", cluster_resolver.task_id, flush=True)

        s = tf.distribute.Server(
            cluster_resolver.cluster_spec(),
            job_name=cluster_resolver.task_type,
            task_index=cluster_resolver.task_id,
            config=worker_config,
            protocol="grpc")
        s.join()


def ps(num_ps, cluster_resolver):
    print("cluster_resolver.task_id: ", cluster_resolver.task_id, flush=True)
    for i in range(num_ps):
        s = tf.distribute.Server(
            cluster_resolver.cluster_spec(),
            job_name=cluster_resolver.task_type,
            task_index=cluster_resolver.task_id,
            protocol="grpc")
        s.join()


def create_cluster(cluster_resolver, num_workers=1, num_ps=1, mode="worker"):
    os.environ["GRPC_FAIL_FAST"] = "use_caller"

    if mode.lower() == 'worker':
        print("Starting worker server...", flush=True)
        worker(num_workers, cluster_resolver)
    else:
        print("Starting ps server...", flush=True)
        ps(num_ps, cluster_resolver)

    return cluster_resolver, cluster_resolver.cluster_spec()


def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch >= 3 and epoch < 7:
        return 1e-4
    else:
        return 1e-5


def get_callbacks(model):
    class PrintLR(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            print('\nLearning rate for epoch {} is {}'.format(epoch + 1, model.optimizer.lr.numpy()), flush=True)

    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir='./logs'),
        tf.keras.callbacks.LearningRateScheduler(decay),
        PrintLR()
    ]
    return callbacks


def create_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)


def get_artificial_data():
    x = tf.random.uniform((10, 10))
    y = tf.random.uniform((10,))

    dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
    dataset = dataset.batch(global_batch_size)
    dataset = dataset.prefetch(2)
    return dataset


cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if not os.environ["OCI__MODE"] == "MAIN":
    create_cluster(cluster_resolver, num_workers=1, num_ps=1, mode=os.environ["OCI__MODE"])
    pass

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)

dataset = get_artificial_data()

with strategy.scope():
    model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
    model.compile(tf.keras.optimizers.SGD(), loss="mse", steps_per_execution=10)

callbacks = get_callbacks(model)
model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)
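The script reads the number of parameter servers from the TF_CONFIG environment variable. The following sketch performs that lookup on a hand-written TF_CONFIG value so the structure is visible. The host names and ports are made up for illustration, and how the framework populates TF_CONFIG on each node is not shown here.

```python
import json
import os


def count_tasks(tf_config_json, task_type):
    """Count the addresses listed for `task_type` in a TF_CONFIG cluster spec."""
    cluster = json.loads(tf_config_json)["cluster"]
    return len(cluster.get(task_type, []))


if __name__ == "__main__":
    # Hypothetical cluster: one coordinator (chief), one worker, one ps.
    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": {
            "chief": ["host-0:12345"],
            "worker": ["host-1:12345"],
            "ps": ["host-2:12345"],
        },
        "task": {"type": "ps", "index": 0},
    })
    # Same quantity as NUM_PS in the script above.
    print(count_tasks(os.environ["TF_CONFIG"], "ps"))
```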
Train.yaml: The only difference here is that the parameter server train.yaml also needs to have a ps worker-pool. This will create dedicated instance(s) for Tensorflow Parameter Servers.
Use the following train.yaml:
kind: distributed
apiVersion: v1.0
spec:
  infrastructure:
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: Distributed-TF
      logGroupId: oci.xxxx.<log_group_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.Standard2.4
      blockStorageSize: 50
  cluster:
    kind: TENSORFLOW
    apiVersion: v1.0
    spec:
      image: "@image"
      workDir: "oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>"
      name: "tf_ps"
      config:
        env:
          - name: WORKER_PORT #Optional. Defaults to 12345
            value: 12345
          - name: SYNC_ARTIFACTS #Mandatory: Switched on by Default.
            value: 1
          - name: WORKSPACE #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket to sync generated artifacts to.
            value: "<bucket_name>"
          - name: WORKSPACE_PREFIX #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket folder to sync generated artifacts to.
            value: "<bucket_prefix>"
      main:
        name: "coordinator"
        replicas: 1 #this will be always 1.
      worker:
        name: "worker"
        replicas: 1 #number of workers; any number > 0
      ps:
        name: "ps" # number of parameter servers; any number > 0
        replicas: 1
  runtime:
    kind: python
    apiVersion: v1.0
    spec:
      spec:
        entryPoint: "/code/train.py" #location of user's training script in the container image.
        args: #any arguments that the training script requires.
        env:
The rest of the steps remain the same and should be followed as is.