Creating a Custom TFX Executor
септември 19, 2019
Posted by Kevin HaasZhitao Li, and Robert Crowe on behalf of the TFX team

TensorFlow Extended (TFX) is a platform for creating production-ready ML pipelines. TFX was created by Google and provides the backbone of Google’s ML services and applications, and we’ve been open sourcing TFX for everyone who needs to create production ML pipelines.

A diagram of a custom executor
TFX can be extended and customized in several ways, including developing new components and including them in your pipeline. A TFX pipeline is a series of TFX components, each of which performs a separate task, which are sequenced as a directed acyclic graph (DAG). In this post we’ll present an example to illustrate the process of developing a new TFX component. Watch for more posts that will discuss additional ways to extend and customize TFX.

Scenario

We want to replace the behavior of the existing TFX Trainer component with a new component that includes an executor which submits a job to run the same training on the Google Cloud Platform (GCP). Since the upstream and downstream semantics will remain unchanged, we will reuse the existing Trainer component and replace the behavior of the executor.

Anatomy of a Component

A diagram of a custom executor
TFX components consist of three main pieces:
  • Driver
  • Executor
  • Publisher

Driver and Publisher

The driver supplies metadata to the executor by querying the ML Metadata (MLMD) store and the publisher takes the results of the executor and updates the metadata store. As a developer, you will typically not need to interact with the driver and publisher directly, but messages logged by the driver and publisher may be useful during debugging.

Executor

The executor is where a component performs its processing. As a developer you write code which runs in the executor, based on the requirements of the classes which implement the type of component that you’re working with. For example, when you’re working on a Transform component you will need to develop a preprocessing_fn. Executors consume and create artifacts, which are kept in the metadata store.

Adding a custom executor

Creating a custom executor

To create the custom executor, we start with a copy of the current Trainer executor and then make the modifications to our custom executor to initiate a training job on Google Cloud AI Platform. Much of the basic executor structure will remain the same as the inputs, outputs, and execution parameters will be the same. The changes will be in how the inputs are processed and the outputs are generated. This is achieved by creating a new Executor class that extends tfx.components.base.base_executor.BaseExecutor and implements Do().
class Executor(base_executor.BaseExecutor):
  """Start a trainer job on Google Cloud AI Platform."""

  def Do(self, input_dict,
         output_dict,
         exec_properties):
    """Starts a trainer job on Google Cloud AI Platform.

Don’t forget to test it before moving on to the next step! We have created a convenience script for you to try out your executor before putting it into production. You should write similar code to exercise unit tests for your code. As with any production software deployment, when developing for TFX you should make sure to have good test coverage and a strong CI/CD framework.

Override the executor used by the Trainer component

In order to do this, we will replace the default trainer executor used by TFX with the new custom executor which will create the training job on Google Cloud AI Platform. This is achieved with the optional executor_class component parameter.
from tfx.extensions.google_cloud_ai_platform.trainer
import executor as ai_platform_trainer_executor
...
trainer = Trainer(
    ...,
   custom_executor_spec = executor_spec.ExecutorClassSpec(
         ai_platform_trainer_executor.Executor)
)
That’s it! Now when the Trainer component is called by the workflow engine, it will run the custom executor instead of the default executor, while creating and consuming the same ML Metadata artifacts as the default executor.

Pass the component arguments to your trainer

TFX executors are self-contained binaries focused on running a single step of the ML pipeline. Custom executors require the same three parameters as all other TFX executors: input_dict, output_dict, exec_properties. More details on the semantics of these parameters can be found in the BaseExecutor class.
When processing data flowing through your TFX pipeline you will often typically want to read input data from the artifact URIs in your input_dict, and often you might want to write your output to artifact URIs from your output_dict. This may include reading and writing to more than one split, as in the case of processing with train and eval splits.
from tfx.types import artifact_utils

train_input_examples_uri = artifact_utils.get_split_uri(
  input_dict['my_input_data'], 'train')
eval_input_examples_uri = artifact_utils.get_split_uri(
  input_dict['my_input_data'], 'eval')

train_output_examples_uri = artifact_utils.get_split_uri(
  output_dict['my_output_data'], 'train')
eval_output_examples_uri = artifact_utils.get_split_uri(
  output_dict[‘my_output_data'], 'eval')
In the example above the dictionary keys my_input_data and my_output_data are defined in the ComponentSpec for the component that you’re overriding the executor for.
class MyComponentSpec(tfx.types.ComponentSpec):
  PARAMETERS = {
      <...>
  }
  INPUTS = {
      'my_input_data':   
      ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      'my_output_data':
      ChannelParameter(type=standard_artifacts.Examples),                                                            
  }
The splits are defined in the output Channel for the component that you’re overriding the executor for, typically in the constructor:
output_data = tfx.types.Channel(
  type=standard_artifacts.Examples,
    artifacts=[
      standard_artifacts.Examples(split=split)
      for split in artifact.DEFAULT_EXAMPLE_SPLITS
    ])
spec = LabelingComponentSpec(
  input_data=input_data,
  my_output_data=output_data)
Additional parameters are passed to your custom trainer executor using a custom_config dict. These can be retrieved by the custom executor using exec_properties.get(‘custom_config’).get(‘your_config_key’). In the example below, all of the additional arguments needed to submit a Google Cloud AI Platform training job can be found in _ai_platform_training_args.
_ai_platform_training_args = {
   'pythonModule': None,  # Will be populated by TFX
   'args': None,  # Will be populated by TFX
   'region': _gcp_region,
   'jobDir': os.path.join(_output_bucket, 'tmp'),
   'project': ‘your GCP project id’
}

...
trainer = Trainer(
    ...,
    custom_config={'ai_platform_training_args':
    _ai_platform_training_args})   
}

Linking the custom trainer’s output to the expected output artifact

Hooking up the custom trainer to emit the expected outputs is essential to the success of downstream components. For the Google Cloud AI Platform custom trainer, we serialize the executor input parameters so they can be transmitted as part of the GCP training job. Because the Google Cloud AI Platform (CAIP) executor is redirecting the default TFX executor to run on Google Cloud AI Platform, both take the same {transformed examples, a transform_fn, and a schema} input parameters to create a TF model. The custom executor used in this example submits a CAIP training job that will invoke (via run_executor.py) the default TFX trainer as the CAIP python module, effectively opening a conduit from the local workstation to run the TFX trainer on CAIP.
# Configure Google Cloud AI Platform job
training_inputs = exec_properties.get('custom_config',
    {}).pop('ai_platform_training_args')
executor_class_path = '%s.%s' % 
    (tfx_trainer_executor.Executor.__module__,
    tfx_trainer_executor.Executor.__name__)

# Start Google Cloud AI Platform job
return runner.start_cmle_training(input_dict, output_dict,
    exec_properties, executor_class_path, training_inputs)

Running the pipeline remotely with your custom executor

So far we’ve been assuming that your pipeline is running locally, using code available in your $PYTHONPATH. An upcoming blog post will explain how to execute custom executors packaged in containers, or as PyPI packages.

Related topics

In addition to Trainer, the TFX ExampleGen component also supports executor-level customization. ExampleGen provides a generic component and a base executor which apply ML best practices, e.g. data shuffling and consistent/configurable partitions.
If existing ExampleGen components don’t meet your needs, create a new Apache Beam PTransform for handling the conversion from an input split to TF examples and TFX will do the rest. The ExampleGen doc has more details.

For more information

To learn more about TFX check out the TFX website, join the TFX discussion group, and watch our TFX playlist on YouTube, and subscribe to the TensorFlow channel.
Next post
Creating a Custom TFX Executor

Posted by Kevin HaasZhitao Li, and Robert Crowe on behalf of the TFX team

TensorFlow Extended (TFX) is a platform for creating production-ready ML pipelines. TFX was created by Google and provides the backbone of Google’s ML services and applications, and we’ve been open sourcing TFX for everyone who needs to create production ML pipelines.
TFX can be extended and customized in several ways, incl…