Continuous Adaptation for Machine Learning System to Data Changes
December 01, 2021

A guest post by Chansung Park, Sayak Paul (ML-GDEs)

Continuous integration and delivery (CI/CD) is a much sought-after topic in the DevOps domain. In the MLOps (Machine Learning + Operations) domain, we have another form of continuity -- continuous evaluation and retraining. MLOps systems evolve according to the changes of the world, and that is usually caused by data/concept drift. So, to cater to the data changes we need to continuously evaluate our deployed ML models and retrain and re-deploy them as necessary.

In this blog post, we present a project that implements a workflow combining batch prediction and model evaluation for continuous evaluation and retraining in order to capture changes in the data. We will first discuss the general setup of the project. Then we will move on to key components (batch prediction, new data spans, retraining, etc.) that are important for continuously evaluating an ML model and then retraining it if needed. Rather than discussing the technical implementation details of the project, we will keep it high-level so that we can focus on understanding the underlying concepts.

The project is implemented with TensorFlow Extended (TFX), Keras, and various services offered by Google Cloud Platform. You can find the project on GitHub.

Overview

This project shows how to build two separate pipelines working together to create a CI/CD workflow which responds to changes in the data. The first pipeline is for model training, and the second pipeline is for model evaluation based on the result of a batch prediction as shown in Figure 1.

Figure 1. Overview of the project structure (original)

The model training pipeline is built by combining standard TFX components such as ImportExampleGen and Trainer with custom TFX components such as VertexUploader and VertexDeployer. Since the Pusher standard component had an issue when we were doing this project, we have brought custom components from our previous project, Dual Deployments.

There is one significant implementation detail on how ImportExampleGen handles the dataset to be fed into the model. We have designed our project to hold datasets from different distributions in separate folders with filesystem paths which indicate the span number. For instance, the initial training and test datasets can be stored in SPAN-1/train and SPAN-1/test, while the drifted datasets can be stored in SPAN-2/train and SPAN-2/test respectively, as shown in Figure 2.

Given the versioning feature in Google Cloud Storage (GCS), you might think we don’t need to manage datasets in this manner. However, we thought our way makes datasets much more manageable. For example, you might want to pick data from SPAN-1 and SPAN-2 or from SPAN-1 and SPAN-3 to train the model, depending on the situation. Also, datasets belonging to the same distribution can still benefit from the versioning feature in GCS.

Figure 2. How datasets are managed (original)

The batch evaluation pipeline does not leverage any standard TFX components. Rather it consists of five custom TFX components which are FileListGen, BatchPredictionGen, PerformanceEvaluator, SpanPreparator, and PipelineTrigger. These components are available as standalone modules here.

Figure 3. Custom TFX components in batch evaluation pipeline (original)

FileListGen generates a text file, in the format required by Vertex Prediction, that the currently deployed model on Vertex AI looks up to perform batch prediction. Then BatchPredictionGen simply performs Vertex Prediction based on the text file prepared by FileListGen and outputs a set of files containing the batch prediction results. PerformanceEvaluator calculates the average accuracy based on the batch prediction results and outputs False if it is less than the threshold, and True otherwise. If the output is True, the pipeline terminates. If the output is False, SpanPreparator prepares TFRecord files by compressing the list of raw data, and then puts those TFRecords into a new folder whose name contains the successive span number, such as span-2. Finally, PipelineTrigger triggers the model training pipeline by passing, through a RuntimeParameter, the span numbers for the data which should be included for training the model.
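
The decision made by PerformanceEvaluator can be sketched in a few lines. This is a minimal illustration and not the component's actual code: the function name performance_ok and the plain lists of integer labels are assumptions made for the example.

```python
from typing import Sequence


def performance_ok(
    predicted: Sequence[int], expected: Sequence[int], threshold: float
) -> bool:
    """Return False when average accuracy falls below the threshold.

    Hypothetical helper: it mirrors the True/False output described above,
    where False means the model should be retrained.
    """
    if not expected or len(predicted) != len(expected):
        raise ValueError("predicted and expected must be non-empty and equal in length")

    correct = sum(1 for p, e in zip(predicted, expected) if p == e)
    accuracy = correct / len(expected)
    return accuracy >= threshold


# 3 of 4 predictions are right (accuracy 0.75), which is below 0.9.
retrain_needed = not performance_ok([0, 1, 2, 3], [0, 1, 2, 9], threshold=0.9)
```

With these inputs retrain_needed is True, so the rest of the pipeline (span preparation and the training trigger) would run.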

General setup

In this section, we walk through the key components of the project and also leave some notes on the tools we used to implement them.

Getting the initial model ready

We focus on the concepts and consider implementing them in a minimal manner so that our implementations are as reproducible and as accessible as possible. Keeping that in mind, we use the CIFAR-10 training set as our training data and we fine-tune a ResNet50 model to fit the data. Our training pipeline is demonstrated in this notebook.

Simulating data drift and labeling new data

To simulate a data drift scenario, we then collect a bunch of images from the internet matching CIFAR-10 classes. To make it easy to follow we implement this workflow inside a Colab Notebook which is available here. This workflow also includes uploading and deploying the trained model as a service on the Vertex AI platform.

Continuous evaluation with batch inference

We then perform inference on these images with the trained model from the above step. We perform batch inference rather than online inference to get the results. We use Vertex AI’s batch prediction service to realize this. In practice, usually after this step, the test images and model predictions are sent to domain experts for audit purposes. They also provide the expected ground-truth labels for the test images. Only after that can we validate the prediction results. But for the purpose of this project, we skip this step and pretend that the ground-truth labels are already available. So, as soon as the batch prediction results are available, we evaluate them. This entire workflow is covered in this notebook.

We deploy a Cloud Function to monitor a specific location inside a Google Cloud Storage (GCS) bucket. If there is a sufficient number of new test images available inside that location, we trigger the batch prediction pipeline. We cover this workflow in this notebook. This is how we achieve the “continuous evaluation” aspect of our project.
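
The "sufficient number of new test images" check could look roughly like the sketch below. It works on a plain list of object names so that it runs without any cloud credentials; in a real Cloud Function the names would come from listing the bucket. The function name should_trigger, the extension list, and the min_images parameter are all hypothetical.

```python
from typing import Iterable

# Assumed set of image file extensions; adjust to the data actually collected.
IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")


def should_trigger(blob_names: Iterable[str], prefix: str, min_images: int) -> bool:
    """Return True when at least min_images image files exist under prefix."""
    count = sum(
        1
        for name in blob_names
        if name.startswith(prefix) and name.lower().endswith(IMAGE_EXTENSIONS)
    )
    return count >= min_images


names = ["2021-10/cat_1.jpg", "2021-10/dog_1.png", "2021-10/notes.txt"]
trigger = should_trigger(names, prefix="2021-10/", min_images=2)
```

Here two of the three objects are images, so trigger is True for a minimum of two.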

There are other ways to capture drift in data, though. For example, using JS-Divergence, we can compare the distributions between the newly available data and training data. You can follow this Coursera lecture from Robert Crowe which dives deep into these techniques.
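
As a rough illustration of the JS-Divergence idea, the sketch below compares two discrete distributions, for example the class frequencies of the training data and of newly collected data. It is a standard-library-only example and not part of the project; the two example distributions are made up.

```python
import math
from typing import Sequence


def kl_divergence(p: Sequence[float], q: Sequence[float]) -> float:
    """KL(p || q) in bits; terms where p_i is 0 contribute nothing."""
    return sum(pi * math.log2(pi / qi) for pi, qi in zip(p, q) if pi > 0)


def js_divergence(p: Sequence[float], q: Sequence[float]) -> float:
    """Jensen-Shannon divergence between two discrete distributions."""
    # m is the pointwise average; it is positive wherever p or q is positive.
    m = [(pi + qi) / 2 for pi, qi in zip(p, q)]
    return 0.5 * kl_divergence(p, m) + 0.5 * kl_divergence(q, m)


# Made-up class frequencies over the ten CIFAR-10 classes.
train_dist = [0.1] * 10
new_dist = [0.3, 0.3] + [0.05] * 8

drift_score = js_divergence(train_dist, new_dist)
```

With base-2 logarithms the score lies between 0 (identical distributions) and 1 (no overlap), so a drift alert can be expressed as a threshold on drift_score.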

Model retraining

After the batch predictions are evaluated, the next step is to determine if we need to re-train the model based on a predefined performance threshold that generally depends on the business context and a lot of other factors. We set this threshold to 0.9 in the project. If we need to re-train then we trigger the same model training pipeline (as shown in this notebook) but with the newly available data added to the CIFAR-10 training set. We can either warm-start our model from a previous checkpoint or we can train the model from scratch using all the available training data. For this project, we do the latter.

In the following section, we will go over a few non-trivial components from our implementation and discuss their motivation and technicalities. As a reminder, our implementation is fully open-sourced here.

Implementation details on managing datasets with span numbers

In this section, we walk through the implementation details on some key aspects of the project. Please go through the project repository and review all notebooks for further information.

The initial CIFAR-10 datasets are stored in {bucket-name}/span-1/train and {bucket-name}/span-1/test GCS locations respectively. This step is done through the first notebook. Then, we download more images of the same categories as in CIFAR-10 by using Bing Image Downloader. Those images are resized to 32x32 to make them compatible with the CIFAR-10 datasets, and they are stored in a separate GCS bucket such as {bucket-batch-prediction}/2021-10/.

Note that we used the YYYY-MM format for the name of the location where the images are stored. This is because the Cloud Function, which is fired by Cloud Scheduler, looks for the latest GCS location to launch the batch evaluation pipeline, as shown below.

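import os
import re
from datetime import datetime


def get_latest_directory(storage_client, bucket):
    blobs = storage_client.list_blobs(bucket)

    # Keep only folders named exactly YYYY-MM. fullmatch (unlike match)
    # also rejects suffixed names such as 2021-10_old.
    folders = list(
        {
            os.path.dirname(blob.name)
            for blob in blobs
            if re.fullmatch(
                "[1-9][0-9][0-9][0-9]-(0[1-9]|1[0-2])", os.path.dirname(blob.name)
            )
        }
    )

    # Sort from oldest to newest and return the newest folder.
    folders.sort(key=lambda date: datetime.strptime(date, "%Y-%m"))
    return folders[-1]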

As you see, it only looks for the GCS location that exactly matches the YYYY-MM format. The Cloud Function launches the batch evaluation pipeline by passing which GCS location to look up for batch prediction via RuntimeParameter. The code snippet below shows how it is passed to the pipeline with the name data_gcs_prefix on the Cloud Function side.

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=project, region=region)

response = api_client.create_run_from_job_spec(
     ...
     parameter_values={"data_gcs_prefix": latest_directory},
)

The pipeline receives data_gcs_prefix as a RuntimeParameter and passes it to the FileListGen component, which prepares a text file in the format required to perform Vertex AI Batch Prediction.

def _create_pipeline(
    data_gcs_prefix: data_types.RuntimeParameter,
    ...
) -> Pipeline:

    filelist_gen = FileListGen(
        ...
        gcs_source_bucket=data_gcs_bucket,
        gcs_source_prefix=data_gcs_prefix,
    ).with_id("filelist_gen")

    ...
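
To make the role of the text file more concrete, here is a minimal sketch of how such a file list might be assembled. It assumes the file-list style of batch prediction input, where each line is the Cloud Storage URI of one input file; the project's FileListGen may differ in detail. The function name build_file_list and the example object names are hypothetical.

```python
from typing import Iterable, List


def build_file_list(bucket: str, blob_names: Iterable[str], prefix: str) -> List[str]:
    """Return gs:// URIs, one per input file, for objects under prefix."""
    return [
        f"gs://{bucket}/{name}"
        for name in sorted(blob_names)
        # Skip objects outside the prefix and folder placeholder objects.
        if name.startswith(prefix) and not name.endswith("/")
    ]


lines = build_file_list(
    "bucket-batch-prediction",
    ["2021-10/dog_1.jpg", "2021-10/cat_1.jpg", "2021-09_old/cat_0.jpg"],
    prefix="2021-10/",
)

# One URI per line, ready to be written to a text file.
file_list_text = "\n".join(lines)
```

Only the two objects under 2021-10/ end up in the list; the object in the _old location is left out.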

Let’s skip the batch prediction performed by the BatchPredictionGen component.

When the PerformanceEvaluator component determines, based on the result from the BatchPredictionGen component, that retraining should be performed, the SpanPreparator prepares TFRecord files from the newly collected images, moves them to {bucket-name}/span-2/train and {bucket-name}/span-2/test, from which the training pipeline ingests data for model training, and renames the GCS location of the newly collected images to {bucket-batch-prediction}/YYYY-MM_old/.
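
Choosing the folder for the new span amounts to finding the highest existing span number and adding one. The sketch below shows that step on a plain list of object names; the helper next_span_number and the example names are hypothetical and not taken from the SpanPreparator source.

```python
import re
from typing import Iterable

# Matches object names that start with span-<number>/.
SPAN_PATTERN = re.compile(r"span-(\d+)/")


def next_span_number(blob_names: Iterable[str]) -> int:
    """Return the span number that follows the highest one found (1 if none)."""
    spans = [
        int(match.group(1))
        for name in blob_names
        if (match := SPAN_PATTERN.match(name))
    ]
    return max(spans, default=0) + 1


existing = ["span-1/train/cifar10.tfrecord", "span-1/test/cifar10.tfrecord"]
new_span = next_span_number(existing)
new_train_dir = f"span-{new_span}/train"
new_test_dir = f"span-{new_span}/test"
```

With only span-1 present, new_span is 2, which gives span-2/train and span-2/test as the destinations.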

We add the _old suffix so that the Cloud Function will ignore the renamed GCS location. If the retrained model doesn’t show a good enough performance metric, you can collect more data and merge it with the images in the _old GCS location.
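
The effect of the suffix is easy to check in isolation. The sketch below assumes that folder names are compared against the whole YYYY-MM pattern (a full-string match), so a name such as 2021-10_old does not qualify. The helper latest_month_folder is hypothetical and operates on plain strings.

```python
import re
from datetime import datetime
from typing import Iterable, Optional

# Four-digit year, a dash, and a month from 01 to 12.
MONTH_FOLDER = re.compile(r"[1-9][0-9]{3}-(0[1-9]|1[0-2])")


def latest_month_folder(folders: Iterable[str]) -> Optional[str]:
    """Return the newest folder named exactly YYYY-MM, or None if there is none."""
    candidates = [name for name in folders if MONTH_FOLDER.fullmatch(name)]
    if not candidates:
        return None
    return max(candidates, key=lambda name: datetime.strptime(name, "%Y-%m"))


latest = latest_month_folder(["2021-09_old", "2021-10", "2021-11_old"])
```

Only 2021-10 survives the filter, so it is returned even though 2021-11_old refers to a later month.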

The PipelineTrigger component at the end of the batch evaluation pipeline will trigger the training pipeline by passing which span numbers to look for in order to do model training. The data will be consumed by ImportExampleGen, based on the glob pattern matching feature. For instance, if data from span-1 and span-2 should be used for model training, then the glob pattern for the training dataset might be span-[12]/train/*.tfrecord. The code snippet below clearly shows the generalized version of the idea.

response = api_client.create_run_from_job_spec(
    ...
    parameter_values={
        "input-config": json.dumps(
            {
                "splits": [
                    {
                        "name": "train",
                        "pattern": f"span-[{int(latest_span)-1}{latest_span}]/train/*.tfrecord",
                    },
                    {
                        "name": "val",
                        "pattern": f"span-[{int(latest_span)-1}{latest_span}]/test/*.tfrecord",
                    },
                ]
            }
        ),
        "output-config": json.dumps({}),
    },
)

The reason we formed the RuntimeParameter in the parameter_values in this way is that the pattern matching feature of the ImportExampleGen component should be specified in the input-config and output-config parameters. We do not need the output-config parameter for our purpose, but it is required when passing the input-config parameter as a RuntimeParameter. That’s why the output-config parameter is left empty. Note that you have to form the parameter in protocol buffer format when using RuntimeParameter for standard TFX components. The code below shows how the passed input-config and output-config can be consumed by the ImportExampleGen component.

example_gen = tfx.components.ImportExampleGen(
     input_base=data_root, input_config=input_config, output_config=output_config
)
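
To see which files a span pattern such as span-[12]/train/*.tfrecord selects, the sketch below builds a pattern for an arbitrary set of spans and tests it against a few made-up paths. It uses Python's fnmatch module as a stand-in; the matching done inside ImportExampleGen may differ in detail, and the helper span_pattern is hypothetical. Because the pattern relies on a single-character class, it only works for span numbers 1 through 9.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def span_pattern(spans: Iterable[int], split: str) -> str:
    """Build a pattern like span-[12]/train/*.tfrecord for single-digit spans."""
    selected = sorted(set(spans))
    if not selected or any(span < 1 or span > 9 for span in selected):
        raise ValueError("a character class can only select spans 1 through 9")
    digits = "".join(str(span) for span in selected)
    return f"span-[{digits}]/{split}/*.tfrecord"


files = [
    "span-1/train/a.tfrecord",
    "span-2/train/b.tfrecord",
    "span-3/train/c.tfrecord",
    "span-2/test/d.tfrecord",
]

pattern = span_pattern([1, 3], "train")
matched = [name for name in files if fnmatchcase(name, pattern)]
```

Selecting spans 1 and 3 yields the pattern span-[13]/train/*.tfrecord, which matches the first and third paths only.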

It is worth noting that you can leverage the rolling window feature supported by TFX with the standard components if the backend environment is Kubeflow Pipeline v1. The code snippet below shows how to achieve this with the CsvExampleGen component and a Resolver node.

examplegen_range_config = proto.RangeConfig(
     static_range=proto.StaticRange(
         start_span_number=2, end_span_number=2))

example_gen = tfx.components.CsvExampleGen(
     input_base=data_root,
     input_config=examplegen_input_config,
     range_config=examplegen_range_config)

resolver_range_config = proto.RangeConfig(
     rolling_range=proto.RollingRange(num_spans=2))

examples_resolver = tfx.dsl.Resolver(
     strategy_class=tfx.dsl.experimental.SpanRangeStrategy,
     config={
         'range_config': resolver_range_config
     },
     examples=tfx.dsl.Channel(
         type=tfx.types.standard_artifacts.Examples,
         producer_component_id=example_gen.id)).with_id('span_resolver')

This is a much better way since it reuses the artifacts generated by the previous ExampleGens, and the current pipeline run only takes care of the data in the new span. Unfortunately, however, this feature is not supported by Vertex AI Pipeline, which is based on Kubeflow Pipeline v2. We had an extensive discussion with the TFX team about this, which is why we came up with a different approach from the standard way.

Cost

Vertex AI Training is a separate service from Vertex AI Pipeline. We need to pay for Vertex AI Pipeline separately, and at the time of writing this article, it costs about $0.03 per pipeline run. The type of compute instance for each TFX component was e2-standard-4, which costs about $0.134 per hour. Since the whole pipeline took less than an hour to finish, we can estimate that the total cost was about $0.164 for a Vertex AI Pipeline run.

The cost of custom model training depends on the type of machine and the number of hours. Also, you have to consider that you pay for the server and the accelerator separately. For this project, we chose the n1-standard-4 machine type, whose price is $0.19 per hour, and the NVIDIA_TESLA_K80 accelerator type, whose price is $0.45 per hour. The training for each model was done in less than an hour, so it cost about $1.28 in total. So, as per our estimates, the upper bound of the costs incurred should not be more than $5.
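
The arithmetic behind these estimates can be reproduced as follows. The prices are the ones quoted above and may have changed since; treating every job as one full billed hour and counting two training runs (the initial model and the retrained one) are assumptions made here to arrive at the quoted totals.

```python
# Prices quoted in the text, in USD.
PIPELINE_RUN_FEE = 0.03
E2_STANDARD_4_PER_HOUR = 0.134
N1_STANDARD_4_PER_HOUR = 0.19
TESLA_K80_PER_HOUR = 0.45

# Assumptions: each job is counted as one full hour (an upper bound),
# and two models are trained in total.
HOURS_PER_JOB = 1.0
TRAINING_RUNS = 2

pipeline_run_cost = PIPELINE_RUN_FEE + E2_STANDARD_4_PER_HOUR * HOURS_PER_JOB
training_cost = (
    (N1_STANDARD_4_PER_HOUR + TESLA_K80_PER_HOUR) * HOURS_PER_JOB * TRAINING_RUNS
)

print(f"pipeline run: ${pipeline_run_cost:.3f}")  # pipeline run: $0.164
print(f"training:     ${training_cost:.2f}")      # training:     $1.28
```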

The cost only stems from Vertex AI because the rest of the components like Pub/Sub, Cloud Functions, etc., have very minimal usage. So even if we add a small estimate for those costs, the upper bound of the total cost for this project should not be more than $5. Please refer to the official documents on the price: Vertex AI price reference, Cloud Build price reference.

In any case, you should use this GCP Price Calculator to get a better understanding of how your cost for the GCP services might differ.

Summary

In this blog post, we touched upon the idea of continuous evaluation and re-training for machine learning systems as well as the tooling needed to implement them. There is also a more traditional form of CI/CD for ML systems in response to code changes including changes in hyperparameters, model architecture, etc. We have a separate project demonstrating that use case. You are encouraged to check them here: Part I and Part II.

Acknowledgements

We are grateful to the ML-GDE program that provided GCP credits for supporting our experiments. We sincerely thank Robert Crowe and Jiayi Zhao of Google for their help with the review.
