Creating Tensorflow Workloads - ADS v2.11.10


Write your training code:

Your model training script must use one of the TensorFlow distribution strategies (tf.distribute).

For example, you can save the following TensorFlow training script for MultiWorkerMirroredStrategy as mnist.py (the training script below imports it as `mnist`):

# Script adapted from a TensorFlow tutorial
import tensorflow as tf
import tensorflow_datasets as tfds
import os
import sys
import time
import ads
from ocifs import OCIFileSystem
from tensorflow.data.experimental import AutoShardPolicy

BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64

if '.' not in sys.path:
    sys.path.insert(0, '.')


def create_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)


def create_dirs(task_type="worker", task_id=0):
    # OCI__SYNC_DIR is synced to Object Storage when SYNC_ARTIFACTS=1
    artifacts_dir = os.environ.get("OCI__SYNC_DIR", "/opt/ml")
    model_dir = artifacts_dir + "/model"
    print("creating dirs for Model: ", model_dir)
    create_dir(model_dir)
    checkpoint_dir = write_filepath(artifacts_dir, task_type, task_id)
    return artifacts_dir, checkpoint_dir, model_dir


def write_filepath(artifacts_dir, task_type, task_id):
    if task_type is None:
        task_type = "worker"
    checkpoint_dir = artifacts_dir + "/checkpoints/" + task_type + "/" + str(task_id)
    print("creating dirs for Checkpoints: ", checkpoint_dir)
    create_dir(checkpoint_dir)
    return checkpoint_dir


def scale(image, label):
    # Convert pixel values from [0, 255] to [0, 1]
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label


def get_data(data_bckt=None, data_dir="/code/data", num_replicas=1, num_workers=1):
    if data_bckt is not None and not os.path.exists(data_dir + '/mnist'):
        print(f"downloading data from {data_bckt}")
        ads.set_auth(os.environ.get("OCI_IAM_TYPE", "resource_principal"))
        authinfo = ads.common.auth.default_signer()
        oci_filesystem = OCIFileSystem(**authinfo)
        # The first process to create the lock file downloads; the others wait
        lck_file = os.path.join(data_dir, '.lck')
        if not os.path.exists(lck_file):
            os.makedirs(os.path.dirname(lck_file), exist_ok=True)
            open(lck_file, 'w').close()
            oci_filesystem.download(data_bckt, data_dir, recursive=True)
        else:
            print("data downloaded by a different process. waiting")
            time.sleep(30)

    BATCH_SIZE = BATCH_SIZE_PER_REPLICA * num_replicas * num_workers
    print("Now printing data_dir:", data_dir)
    datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True, data_dir=data_dir)
    mnist_train, mnist_test = datasets['train'], datasets['test']
    print("num_train_examples :", info.splits['train'].num_examples,
          " num_test_examples: ", info.splits['test'].num_examples)

    # Scale, cache, shuffle and batch the training data; scale and batch the test data
    train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
    test_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
    train = shard(train_dataset)
    test = shard(test_dataset)
    return train, test, info


def shard(dataset):
    # Shard by element (DATA) rather than by file across workers
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA
    return dataset.with_options(options)


def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch >= 3 and epoch < 7:
        return 1e-4
    else:
        return 1e-5


def get_callbacks(model, checkpoint_dir="/opt/ml/checkpoints"):
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

    class PrintLR(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            print('\nLearning rate for epoch {} is {}'.format(
                epoch + 1, model.optimizer.learning_rate.numpy()), flush=True)

    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir='./logs'),
        tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                           # save_weights_only=True
                                           ),
        tf.keras.callbacks.LearningRateScheduler(decay),
        PrintLR()
    ]
    return callbacks


def build_and_compile_cnn_model():
    print("TF_CONFIG in model:", os.environ.get("TF_CONFIG"))
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=tf.keras.optimizers.Adam(),
                  metrics=['accuracy'])
    return model
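The download guard in get_data first checks whether the .lck file exists and only then creates it, so two processes that start at the same moment can both pass the check and both download. If that matters in your setup, the sketch below shows an atomic variant: `os.open` with `O_CREAT | O_EXCL` fails when the file already exists, so exactly one caller wins. The helper name `try_acquire_lock` is hypothetical and not part of ADS.

```python
import os
import tempfile


def try_acquire_lock(lock_path):
    """Atomically create lock_path; return True only for the first caller.

    Hypothetical helper: O_CREAT | O_EXCL makes os.open raise
    FileExistsError if another process already created the file.
    """
    parent = os.path.dirname(lock_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


if __name__ == "__main__":
    lck_file = os.path.join(tempfile.mkdtemp(), ".lck")
    print(try_acquire_lock(lck_file))  # first caller: would download
    print(try_acquire_lock(lck_file))  # second caller: would wait
```

In get_data you would call it in place of the `os.path.exists(lck_file)` check and download only when it returns True. The waiting branch still sleeps a fixed 30 seconds, which may be shorter than the download itself.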

Then save the following training script. Its path inside the container is what you set as entryPoint in the workload YAML.

import tensorflow as tf
import argparse
import mnist

print(tf.__version__)

parser = argparse.ArgumentParser(description='Tensorflow Native MNIST Example')
parser.add_argument('--data-dir',
                    help='location of the training dataset in the local filesystem (will be downloaded if needed)',
                    default='/code/data')
parser.add_argument('--data-bckt',
                    help='location of the training dataset in an object storage bucket',
                    default=None)
args = parser.parse_args()

artifacts_dir, checkpoint_dir, model_dir = mnist.create_dirs()

# MultiWorkerMirroredStrategy reads the cluster layout (chief, workers) from TF_CONFIG
strategy = tf.distribute.MultiWorkerMirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

train_dataset, test_dataset, info = mnist.get_data(data_bckt=args.data_bckt,
                                                   data_dir=args.data_dir,
                                                   num_replicas=strategy.num_replicas_in_sync)
with strategy.scope():
    model = mnist.build_and_compile_cnn_model()

model.fit(train_dataset, epochs=2, callbacks=mnist.get_callbacks(model, checkpoint_dir))
model.save(model_dir, save_format='tf')
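get_data computes the global batch size as BATCH_SIZE_PER_REPLICA * num_replicas * num_workers. Under MultiWorkerMirroredStrategy, strategy.num_replicas_in_sync already counts the replicas on every worker, which is why the training script leaves num_workers at its default of 1. The worked arithmetic below assumes the cluster described later in this guide (one chief plus one worker) with one GPU per node; the helper `global_batch_size` is only an illustration.

```python
BATCH_SIZE_PER_REPLICA = 64  # same constant as in mnist.py


def global_batch_size(num_replicas_in_sync, per_replica=BATCH_SIZE_PER_REPLICA):
    """Examples consumed per training step across all replicas."""
    return per_replica * num_replicas_in_sync


# Assumption: chief + 1 worker, one GPU each -> 2 replicas in sync.
nodes, gpus_per_node = 2, 1
replicas = nodes * gpus_per_node
print(global_batch_size(replicas))  # 128

# MNIST has 60,000 training examples; ceiling division gives steps per epoch.
print(-(-60000 // global_batch_size(replicas)))  # 469
```

If num_workers=2 were also passed, the replicas would be counted twice and each step would draw 256 examples instead of 128.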

Initialize a distributed-training folder:

At this point you have created the training file (or files) from the example above. Now run the command below:

ads opctl distributed-training init --framework tensorflow --version v1

This downloads the TensorFlow framework files and places them inside the 'oci_dist_training_artifacts' folder.

Note: Whenever you change the code, you have to build, tag, and push the image to the repository. The `ads opctl run` CLI command does this automatically.

Containerize your code and build container:

The required Python dependencies are provided in the conda environment file oci_dist_training_artifacts/tensorflow/v1/environments.yaml. If your code requires additional dependencies, update this file.

While updating environments.yaml, do not remove the existing libraries; append to the list instead.

Update TAG and IMAGE_NAME as per your needs:

export IMAGE_NAME=<>
export TAG=latest
export MOUNT_FOLDER_PATH=.

Build the container image.

ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
    -df oci_dist_training_artifacts/tensorflow/v1/Dockerfile

The code is assumed to be in the current working directory. To override the source code directory, use the -s flag and specify the code directory. This folder must be within the current working directory.

ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
    -df oci_dist_training_artifacts/tensorflow/v1/Dockerfile \
    -s $MOUNT_FOLDER_PATH

If you are behind a proxy, ads opctl automatically uses your proxy settings (defined via no_proxy, http_proxy and https_proxy).

Define your workload yaml:

The YAML file is a declarative way to express the workload. In this example, we bring up 1 worker node and 1 chief-worker node. Your training code is assumed to be in the /code directory within the container. You can also put any data files in the same directory and pass their location (for example, /code/data/**) as an argument to your training script using runtime->spec->args.
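Entries under runtime->spec->args reach the training script as command-line arguments, so the `--data-dir /code/data/` pair in the workload YAML is parsed by the script's argparse options. The sketch below reproduces that parsing locally with the same two options the training script defines; passing the list explicitly instead of reading sys.argv is only for illustration, and `build_parser` is a hypothetical helper name.

```python
import argparse


def build_parser():
    # Same options as the training script in this guide.
    parser = argparse.ArgumentParser(description='Tensorflow Native MNIST Example')
    parser.add_argument('--data-dir', default='/code/data',
                        help='location of the training dataset in the local filesystem')
    parser.add_argument('--data-bckt', default=None,
                        help='location of the training dataset in an object storage bucket')
    return parser


# The YAML args list, as the script would receive it on its command line.
yaml_args = ["--data-dir", "/code/data/"]
args = build_parser().parse_args(yaml_args)
print(args.data_dir)   # /code/data/
print(args.data_bckt)  # None
```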

kind: distributed
apiVersion: v1.0
spec:
  infrastructure:
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: Tensorflow
      logGroupId: oci.xxxx.<log_group_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.GPU2.1
      blockStorageSize: 50
  cluster:
    kind: TENSORFLOW
    apiVersion: v1.0
    spec:
      image: "@image"
      workDir: "oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>"
      name: "tf_multiworker"
      config:
        env:
          - name: WORKER_PORT #Optional. Defaults to 12345
            value: 12345
          - name: SYNC_ARTIFACTS #Mandatory: Switched on by Default.
            value: 1
          - name: WORKSPACE #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket to sync generated artifacts to.
            value: "<bucket_name>"
          - name: WORKSPACE_PREFIX #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket folder to sync generated artifacts to.
            value: "<bucket_prefix>"
      main:
        name: "chief"
        replicas: 1 #this will be always 1.
      worker:
        name: "worker"
        replicas: 1 #number of workers. This is in addition to the 'chief' worker. Could be more than 1
  runtime:
    kind: python
    apiVersion: v1.0
    spec:
      entryPoint: "/code/" #location of user's training script in the container image.
      args: #any arguments that the training script requires.
        - --data-dir # assuming data folder has been bundled in the container image.
        - /code/data/
      env:

Use ads opctl to create the cluster infrastructure and run the workload:

Do a dry run to inspect how the YAML translates to a Job and Job Runs:

ads opctl run -f train.yaml --dry-run

This produces output similar to the following:

-----------------------------Entering dryrun mode----------------------------------
Creating Job with payload:
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: Tensorflow
      logGroupId: oci.xxxx.<log_group_ocid>
      logId: <log_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.GPU2.1
      blockStorageSize: 50
    type: dataScienceJob
  name: tf_multiworker
  runtime:
    kind: runtime
    spec:
      entrypoint: null
      env:
      - name: WORKER_PORT
        value: 12345
      - name: SYNC_ARTIFACTS
        value: 1
      - name: WORKSPACE
        value: "<bucket_name>"
      - name: WORKSPACE_PREFIX
        value: "<bucket_prefix>"
      - name: OCI__WORK_DIR
        value: oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>
      - name: OCI__EPHEMERAL
        value: None
      - name: OCI__CLUSTER_TYPE
        value: TENSORFLOW
      - name: OCI__WORKER_COUNT
        value: '1'
      - name: OCI__START_ARGS
        value: ''
      - name: OCI__ENTRY_SCRIPT
        value: /code/
      image: "<region><tenancy_id>/<repo_name>/<image_name>:<image_tag>"
    type: container

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Creating Main Job Run with following details:
Name: chief
Environment Variables:
    OCI__MODE:MAIN

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Creating Job Runs with following details:
Name: worker_0
Environment Variables:
    OCI__MODE:WORKER
-----------------------------Ending dryrun mode----------------------------------
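The dry run shows that the main job run is started with OCI__MODE set to MAIN and each worker job run with OCI__MODE set to WORKER. If your script needs node-specific behavior, such as doing a one-off step only on the chief, it can branch on that variable. A minimal sketch; the helper name `is_main_node` is hypothetical, and treating an unset OCI__MODE as the main node (for example, when running the script by hand) is an assumption.

```python
import os


def is_main_node(environ=os.environ):
    """Return True on the job run started with OCI__MODE=MAIN.

    Assumption: an unset OCI__MODE (plain local run) counts as the main node.
    """
    return environ.get("OCI__MODE", "MAIN").upper() == "MAIN"


if is_main_node():
    print("chief: run one-off steps here")
else:
    print("worker: skip one-off steps")
```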

Test Locally:

Before submitting the workload to Jobs, you can run it locally to test your code, dependencies, configuration, etc. The -b local flag selects the local backend. When you are ready to run the workload on OCI Data Science Jobs, use the -b job flag instead.

ads opctl run -f train.yaml -b local

If your code uses OCI services (such as Object Storage buckets), you need to mount your OCI keys from the local host machine into the container. This is already configured for the typical key location, ~/.oci. If your keys are in a different location, change the mount in the config.ini file:

oci_key_mnt = ~/.oci:/home/oci_dist_training/.oci
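The oci_key_mnt value appears to follow the bind-mount form host_path:container_path. To double-check that the host side points at an existing key directory before a local run, here is a small sketch; the helper `split_mount` is hypothetical and only parses the string and expands `~`, it does not read config.ini.

```python
import os


def split_mount(spec):
    """Split 'host:container' into (expanded host path, container path)."""
    host, sep, container = spec.partition(":")
    if not sep or not host or not container:
        raise ValueError(f"expected host:container, got {spec!r}")
    return os.path.expanduser(host), container


host, container = split_mount("~/.oci:/home/oci_dist_training/.oci")
print(container)  # /home/oci_dist_training/.oci
print("host key dir exists:", os.path.isdir(host))
```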

Note that the local backend requires the source code for your workload to be available locally in the source folder specified in the config.ini file. If you specified a Git repository or an OCI Object Storage location as the source code location in your workload YAML, make sure you have a local copy available for local testing.

Submit the workload:

ads opctl run -f train.yaml -b job

Note: This automatically pushes the Docker image to the OCI Container Registry repository.

Once the workload is running, the terminal shows output similar to the following:


jobId: oci.xxxx.<job_ocid>
mainJobRunId:
  mainJobRunIdName: oci.xxxx.<job_run_ocid>
workDir: oci://my-bucket@my-namespace/cluster-testing/005
otherJobRunIds:
  - workerJobRunIdName_1: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_2: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_3: oci.xxxx.<job_run_ocid>

You can save this information as a YAML file and use it as input to ads opctl distributed-training show-config -f <info.yaml>. Use --job-info to save the job run info to a YAML file, for example:

ads opctl run -f train.yaml --job-info info.yaml

Saving Artifacts to Object Storage Buckets

If you want to save the artifacts generated by the training process (model checkpoints, TensorBoard logs, etc.) to an Object Storage bucket, you can use the 'sync' feature. The environment variable OCI__SYNC_DIR exposes a directory that is automatically synchronized to the configured Object Storage bucket location. Use this directory in your training script to save the artifacts.

To configure the destination Object Storage bucket location, use the following settings in the workload YAML file (train.yaml):

- name: SYNC_ARTIFACTS
  value: 1
- name: WORKSPACE
  value: "<bucket_name>"
- name: WORKSPACE_PREFIX
  value: "<bucket_prefix>"

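Note: Set SYNC_ARTIFACTS to 0 to disable this feature. Use the OCI__SYNC_DIR environment variable in your code to decide where artifacts are saved; the create_dirs function in the MultiWorkerMirroredStrategy example, for instance, builds its model and checkpoint directories under it.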
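A minimal sketch of writing an artifact under OCI__SYNC_DIR. The /opt/ml fallback is the same default the MultiWorkerMirroredStrategy example uses; the helper `sync_path` and the file name metrics.json are illustrations only. For a local try-out, the demo points OCI__SYNC_DIR at a temporary directory when it is not already set.

```python
import json
import os
import tempfile


def sync_path(*parts):
    """Build a path under OCI__SYNC_DIR and create its parent directories."""
    base = os.environ.get("OCI__SYNC_DIR", "/opt/ml")  # same default as the example script
    path = os.path.join(base, *parts)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    return path


if __name__ == "__main__":
    # Local try-out only: on a job run, OCI__SYNC_DIR is already set.
    os.environ.setdefault("OCI__SYNC_DIR", tempfile.mkdtemp())
    # Files written here are synced to WORKSPACE/WORKSPACE_PREFIX when SYNC_ARTIFACTS is 1.
    with open(sync_path("metrics", "metrics.json"), "w") as f:
        json.dump({"epochs": 2}, f)
    print(sync_path("metrics", "metrics.json"))
```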


Monitoring the workload logs

To view the logs from a job run, run:

ads jobs watch oci.xxxx.<job_run_ocid>


You may want to profile your training setup for optimization or performance tuning. Profiling typically provides a detailed analysis of CPU utilization, GPU utilization, top CUDA kernels, top operators, etc. You can profile your training setup using the native TensorFlow Profiler or a third-party profiler such as Nvidia Nsight Systems.

Profiling using Tensorflow Profiler

TensorFlow Profiler is TensorFlow's native tool for TensorFlow performance profiling.

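Profiling is invoked through code instrumentation, using one of the following APIs: the tf.profiler.experimental.Profile context manager, the programmatic tf.profiler.experimental.start() and stop() calls, or the Keras TensorBoard callback with profile_batch set. Refer to the TensorFlow Profiler documentation for the changes you need to make in your training script.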

You should choose the OCI__SYNC_DIR directory to save the profiling logs. For example:

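import os
import tensorflow as tf

options = tf.profiler.experimental.ProfilerOptions(
    host_tracer_level=2, python_tracer_level=1, device_tracer_level=1, delay_ms=None)
# Fall back to /opt/ml if OCI__SYNC_DIR is unset (None + "/logs" would raise)
log_dir = os.path.join(os.environ.get("OCI__SYNC_DIR", "/opt/ml"), "logs")
with tf.profiler.experimental.Profile(log_dir, options=options):
    # training code
    ...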

With the Keras TensorBoard callback:

tboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=os.path.join(os.environ.get("OCI__SYNC_DIR", "/opt/ml"), "logs"),
    histogram_freq=1,
    profile_batch='500,520')
model.fit(train_dataset, epochs=2, callbacks=[tboard_callback])

Also, SYNC_ARTIFACTS must be set to 1 so that the profiling logs are synced to the configured Object Storage location.

Then use TensorBoard to view the logs. Refer to the TensorBoard setup instructions to set it up on your computer.

Profiling using Nvidia Nsight Systems

Nvidia Nsight Systems is a system-wide profiling tool from Nvidia that can be used to profile deep learning workloads.

Nsight requires no changes to your training code, because it works at the process level. You can enable this experimental feature in your training setup with the PROFILE and PROFILE_CMD entries in the runtime YAML file:

    spec:
      image: "@image"
      workDir: "oci://@/"
      name: "tf_multiworker"
      config:
        env:
          - name: WORKER_PORT
            value: 12345
          - name: SYNC_ARTIFACTS
            value: 1
          - name: WORKSPACE
            value: "<bucket_name>"
          - name: WORKSPACE_PREFIX
            value: "<bucket_prefix>"
          - name: PROFILE
            value: 1
          - name: PROFILE_CMD
            value: "nsys profile -w true -t cuda,nvtx,osrt,cudnn,cublas -s none -o /opt/ml/nsight_report -x true"
      main:
        name: "main"
        replicas: 1
      worker:
        name: "worker"
        replicas: 1

Refer to the Nsight Systems documentation for the nsys profile command options. You can modify the command in PROFILE_CMD, but keep in mind that this feature is experimental. The profiling reports are generated per node. Download the reports to your computer manually or with the OCI CLI:

oci os object bulk-download \
    -ns <namespace> \
    -bn <bucket_name> \
    --download-dir /path/on/your/computer \
    --prefix path/on/bucket/<job_id>

Note: -bn is the WORKSPACE value and --prefix is WORKSPACE_PREFIX/<job_id>, as configured in the runtime YAML file. To view the reports, install the Nsight Systems app, then open the downloaded reports in it.
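Following that mapping, the reports live in the WORKSPACE bucket under WORKSPACE_PREFIX/<job_id>. The sketch below assembles the bulk-download arguments from those values so they can be passed to `subprocess.run(cmd, check=True)`; the helper name and the example values are placeholders, and `shlex.join` needs Python 3.8 or later.

```python
import shlex


def nsight_download_cmd(namespace, workspace, workspace_prefix, job_id, download_dir):
    """Assemble the OCI CLI bulk-download command for the per-node reports."""
    prefix = f"{workspace_prefix.rstrip('/')}/{job_id}"
    return [
        "oci", "os", "object", "bulk-download",
        "-ns", namespace,
        "-bn", workspace,                 # -bn is the WORKSPACE bucket
        "--download-dir", download_dir,
        "--prefix", prefix,               # --prefix is WORKSPACE_PREFIX/<job_id>
    ]


cmd = nsight_download_cmd("my-namespace", "my-bucket", "cluster-testing/",
                          "<job_id>", "./nsight_reports")
print(shlex.join(cmd))
```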

Other Tensorflow Strategies supported

TensorFlow has two multi-worker strategies: MultiWorkerMirroredStrategy and ParameterServerStrategy. The following shows the changes you need to make to run a ParameterServerStrategy workload.

For example, save the following TensorFlow training script for ParameterServerStrategy, in the same way as the MultiWorkerMirroredStrategy scripts:

# Script adapted from a TensorFlow tutorial
import os
import tensorflow as tf
import json
import multiprocessing

NUM_PS = len(json.loads(os.environ['TF_CONFIG'])['cluster']['ps'])
global_batch_size = 64


def worker(num_workers, cluster_resolver):
    # Workers need some inter_ops threads to work properly.
    worker_config = tf.compat.v1.ConfigProto()
    if multiprocessing.cpu_count() < num_workers + 1:
        worker_config.inter_op_parallelism_threads = num_workers + 1

    for i in range(num_workers):
        print("cluster_resolver.task_id: ", cluster_resolver.task_id, flush=True)
        s = tf.distribute.Server(
            cluster_resolver.cluster_spec(),
            job_name=cluster_resolver.task_type,
            task_index=cluster_resolver.task_id,
            config=worker_config,
            protocol="grpc")
        s.join()  # blocks: this process now only serves the coordinator


def ps(num_ps, cluster_resolver):
    print("cluster_resolver.task_id: ", cluster_resolver.task_id, flush=True)
    for i in range(num_ps):
        s = tf.distribute.Server(
            cluster_resolver.cluster_spec(),
            job_name=cluster_resolver.task_type,
            task_index=cluster_resolver.task_id,
            protocol="grpc")
        s.join()


def create_cluster(cluster_resolver, num_workers=1, num_ps=1, mode="worker"):
    os.environ["GRPC_FAIL_FAST"] = "use_caller"
    if mode.lower() == 'worker':
        print("Starting worker server...", flush=True)
        worker(num_workers, cluster_resolver)
    else:
        print("Starting ps server...", flush=True)
        ps(num_ps, cluster_resolver)
    return cluster_resolver, cluster_resolver.cluster_spec()


def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch >= 3 and epoch < 7:
        return 1e-4
    else:
        return 1e-5


def get_callbacks(model):
    class PrintLR(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            print('\nLearning rate for epoch {} is {}'.format(
                epoch + 1, model.optimizer.learning_rate.numpy()), flush=True)

    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir='./logs'),
        tf.keras.callbacks.LearningRateScheduler(decay),
        PrintLR()
    ]
    return callbacks


def create_dir(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)


def get_artificial_data():
    x = tf.random.uniform((10, 10))
    y = tf.random.uniform((10,))
    dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
    dataset = dataset.batch(global_batch_size)
    dataset = dataset.prefetch(2)
    return dataset


cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
# Worker and ps nodes start a server and block; only the MAIN (coordinator) node continues
if os.environ["OCI__MODE"] != "MAIN":
    create_cluster(cluster_resolver, num_workers=1, num_ps=1, mode=os.environ["OCI__MODE"])

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)

dataset = get_artificial_data()

with strategy.scope():
    model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
    model.compile(tf.keras.optimizers.SGD(), loss="mse", steps_per_execution=10)

callbacks = get_callbacks(model)
model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)
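The script takes NUM_PS from the ps entry of TF_CONFIG, and TFConfigClusterResolver reads each node's task type and index from the same variable. To see what those lookups return, the sketch below parses a hypothetical TF_CONFIG for one coordinator (TensorFlow's chief task), one worker and one parameter server. The host names, and the assumption that ADS generates exactly this layout, are illustrative only; `describe` is a hypothetical helper.

```python
import json

# Hypothetical TF_CONFIG; host names and the exact layout are placeholders.
tf_config = json.dumps({
    "cluster": {
        "chief": ["coordinator-0:12345"],
        "worker": ["worker-0:12345"],
        "ps": ["ps-0:12345"],
    },
    "task": {"type": "ps", "index": 0},
})


def describe(tf_config_json):
    """Return (num_ps, num_workers, task_type, task_index) from a TF_CONFIG string."""
    cfg = json.loads(tf_config_json)
    cluster, task = cfg["cluster"], cfg.get("task", {})
    return (len(cluster.get("ps", [])), len(cluster.get("worker", [])),
            task.get("type"), task.get("index"))


print(describe(tf_config))  # (1, 1, 'ps', 0)
```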

train.yaml: The only difference is that the parameter server train.yaml also needs a ps worker pool, which creates dedicated instance(s) for the TensorFlow parameter servers.

Use the following train.yaml:

kind: distributed
apiVersion: v1.0
spec:
  infrastructure:
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: Distributed-TF
      logGroupId: oci.xxxx.<log_group_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.Standard2.4
      blockStorageSize: 50
  cluster:
    kind: TENSORFLOW
    apiVersion: v1.0
    spec:
      image: "@image"
      workDir: "oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>"
      name: "tf_ps"
      config:
        env:
          - name: WORKER_PORT #Optional. Defaults to 12345
            value: 12345
          - name: SYNC_ARTIFACTS #Mandatory: Switched on by Default.
            value: 1
          - name: WORKSPACE #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket to sync generated artifacts to.
            value: "<bucket_name>"
          - name: WORKSPACE_PREFIX #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket folder to sync generated artifacts to.
            value: "<bucket_prefix>"
      main:
        name: "coordinator"
        replicas: 1 #this will be always 1.
      worker:
        name: "worker"
        replicas: 1 #number of workers; any number > 0
      ps:
        name: "ps"
        replicas: 1 #number of parameter servers; any number > 0
  runtime:
    kind: python
    apiVersion: v1.0
    spec:
      entryPoint: "/code/" #location of user's training script in the container image.
      args: #any arguments that the training script requires.
      env:

The remaining steps are the same as for MultiWorkerMirroredStrategy.

