Creating a Custom TFX Component
január 22, 2020
Posted by Ruoyu Liu and Robert Crowe on behalf of the TFX team

TensorFlow Extended (TFX) is a platform for creating production-ready machine learning (ML) pipelines. TFX was created by Google and provides the backbone of Google’s ML services and applications, and now Google has open sourced TFX for anyone who wants to create production ML pipelines.

TFX can be extended and customized in several ways. We previously covered how to change the behavior of a TFX component by using a custom executor. In this post, we will demonstrate how to customize TFX by creating a completely new TFX component and using it a TFX pipeline.

Introduction

TFX provides a collection of standard components that can be linked together to form a standard ML workflow. While this meets the needs of many use cases, there are still some scenarios that standard components do not support. To support those scenarios, TFX can be extended with custom components.

In cases such as in the previous blog post where the upstream and downstream semantics - the inputs and outputs of the component - are the same as an existing component, you can create a new “semi-custom” component by reusing the existing component and replacing the behavior of the executor. The existing component may be one of the standard components, or may be a custom component that you or someone else has created.

If however the upstream and downstream semantics of your new component are not the same as an existing component, then you need to create a new “fully-custom” custom component, which is the topic of this blog post.

The remainder of this post will illustrate how to create a custom component from scratch with a simple HelloWorld component. For simplicity, the HelloWorld component will only replicate all its inputs as its own outputs and make them available to downstream components, to demonstrate consuming and emitting data artifacts.

Updated Pipeline Workflow

Before we start coding, let’s take a look at the updated workflow with the new custom component. As illustrated in Figures 1 and 2 below, we’ll insert the new HelloWorld component between ExampleGen and all downstream components which depends on example data. This implies two facts about the new component:
  • It needs to take the output of ExampleGen as one of its inputs
  • It needs to produce the output of the same type as ExampleGen so that the components that originally depend on ExampleGen will have the same type of input
Figure 1. Before inserting the new custom component
Figure 2. After inserting the new custom component

Building your own custom component

Next we will build the new component step by step.

Channels

TFX Channel is an abstract concept that connects data producers and data consumers. Conceptually a component reads input artifacts from channels and writes output artifacts to channels which will be used by downstream components as inputs. Channels are typed with artifact type (as discussed in the next section), which means all artifacts written to or read from a channel share the same artifact type.

ComponentSpec

The first step is to define the inputs and outputs of the new component as well as other parameters that will be used in component execution. ComponentSpec is the class where we will define this contract with detailed type info. There are three parameters expected:
  • INPUTS: A dictionary of typed parameters for the input artifacts that will be passed into the component executor. Normally input artifacts are the outputs from upstream components and thus share the same type.
  • OUTPUTS: A dictionary of typed parameters for the output artifacts which the component will produce.
  • PARAMETERS: A dictionary of additional ExecutionParameter items that will be passed into the component executor. These are non-artifact parameters that we want to define flexibly in the pipeline DSL and pass into execution.
As discussed in previous section, we need to guarantee that:
  • One of the inputs of the HelloWorld component is the same type as the ExampleGen output, since it is directly passed down by it. As shown in Figure 3, 'input_data' is the spec for it.
  • One of the outputs of the HelloWorld component is the same type as the ExampleGen output, since it will be passed down to downstream components, which originally expected ExampleGen output. As shown in Figure 3, 'output_data' is the spec for it.
In the parameters spec section, only 'name'is declared for demonstration purpose.
class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""
  # The following declares inputs to the component.
  INPUTS = {
    'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  # The following declares outputs from the component.
  OUTPUTS = {
    'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  # The following declares extra parameters used to create an instance of
  # this component
  PARAMETERS = {
    'name': ExecutionParameter(type=Text),
  }
Figure 3. ComponentSpec for HelloWorld component.

Executor

Next, let’s write the code for the executor of our new component. As we discussed in the previous post, we will need to create a new subclass of base_executor.BaseExecutor and override its Do function.
class Executor(base_executor.BaseExecutor):
  """Executor for HelloWorld component."""
  ...  
  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...
    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = os.path.join(artifact.uri, split)
        split_to_instance[split] = uri
    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
Figure 4. Executor for HelloWorld component.
As shown in Figure 4, we can get input and output artifacts and execution properties using the same keys we defined previously in the ComponentSpec. After we have all the needed values, we can go ahead and add more logic using them and write the output into the URI pointed to by the output artifact ('output_data').

Don’t forget to test it before moving on to the next step! We have created a convenience script for you to try out your executor before putting it into production. You should write similar code to exercise unit tests for your code. As with any production software deployment, when developing for TFX you should make sure to have good test coverage and a strong CI/CD framework.

Component interface

Now that we have finished the most complex part, we need to assemble these pieces into a component interface, to enable the component to be used in a pipeline. The process (shown in Figure 5) requires the following steps:
  1. Make the component interface a subclass of base_component.BaseComponent
  2. Assign a class variable SPEC_CLASS with the HelloComponentSpec class we defined earlier
  3. Assign a class variable EXECUTOR_SPEC with the Executor class we defined earlier
  4. Define the __init__() function by using the args to the function to construct an instance of HelloComponentSpec and invoke the super function with the value, along with an optional name
When an instance of the component is created, type checking logic in the base_component.BaseComponent class will be invoked to ensure that the arguments passed in are compatible with the parameter types defined in the HelloComponentSpec class.
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
  """Custom TFX HelloWorld Component."""
  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: channel.Channel,
               output_data: channel.Channel,
               name: Text):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])
    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)
Figure 5. Component interface.

Plugging into the TFX pipeline

Good news! Our brand new component is ready to use after our hard work in the previous sections. Let’s plug it into our Chicago taxi example pipeline. Beside adding an instance of the new component, we also need to:
  • Adjust the parameters when we instantiate components that originally expected the output of ExampleGen to now take the output of our new component
  • Add the new component instance to the components list when constructing the pipeline
Figure 6 highlights these changes. Full example can be found in our GitHub repo.
def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name=u'HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen],
      ...
  )
Figure 6. Using the new component.

For more information

To learn more about TFX check out the TFX website, join the TFX discussion group, read the TFX blog, and watch our TFX playlist on YouTube, and subscribe to the TensorFlow channel.
Next post
Creating a Custom TFX Component

Posted by Ruoyu Liu and Robert Crowe on behalf of the TFX team

TensorFlow Extended (TFX) is a platform for creating production-ready machine learning (ML) pipelines. TFX was created by Google and provides the backbone of Google’s ML services and applications, and now Google has open sourced TFX for anyone who wants to create production ML pipelines.

TFX can be extended and customized in several w…