Posted by Ruoyu Liu and Robert Crowe on behalf of the TFX team
TensorFlow Extended (TFX) is a platform for creating production-ready machine learning (ML) pipelines. TFX was created by Google and provides the backbone of Google’s ML services and applications. Google has now open sourced TFX for anyone who wants to create production ML pipelines.
TFX can be extended and customized in several ways. We previously covered how to change the behavior of a TFX component by using a custom executor. In this post, we will demonstrate how to customize TFX by creating a completely new TFX component and using it in a TFX pipeline.
Introduction
TFX provides a collection of standard components that can be linked together to form a standard ML workflow. While this meets the needs of many use cases, there are still some scenarios that standard components do not support. To support those scenarios, TFX can be extended with custom components.
In cases such as the one in the previous blog post, where the upstream and downstream semantics (the inputs and outputs of the component) are the same as those of an existing component, you can create a new “semi-custom” component by reusing the existing component and replacing the behavior of its executor. The existing component may be one of the standard components, or may be a custom component that you or someone else has created.
If, however, the upstream and downstream semantics of your new component are not the same as those of an existing component, then you need to create a new “fully custom” component, which is the topic of this blog post.
The remainder of this post will illustrate how to create a custom component from scratch with a simple HelloWorld component. For simplicity, the HelloWorld component only copies its inputs to its outputs and makes them available to downstream components, which is enough to demonstrate consuming and emitting data artifacts.
Updated Pipeline Workflow
Before we start coding, let’s take a look at the updated workflow with the new custom component. As illustrated in Figures 1 and 2 below, we’ll insert the new HelloWorld component between ExampleGen and all downstream components that depend on example data. This implies two facts about the new component:
- It needs to take the output of ExampleGen as one of its inputs
- It needs to produce output of the same type as ExampleGen, so that the components that originally depended on ExampleGen still receive the same type of input
Figure 1. Before inserting the new custom component
Figure 2. After inserting the new custom component
Building your own custom component
Next we will build the new component step by step.
Channels
A TFX Channel is an abstract concept that connects data producers and data consumers. Conceptually, a component reads input artifacts from channels and writes output artifacts to channels, which downstream components then use as inputs. Each channel is typed with an artifact type (as discussed in the next section), which means all artifacts written to or read from a channel share the same artifact type.
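To make the idea of a typed channel concrete, here is a minimal sketch in plain Python. It is not TFX's implementation: the `Artifact`, `Examples`, `Model`, and `TypedChannel` classes below are hypothetical stand-ins, written only to show what "all artifacts in a channel share the same artifact type" means in practice.

```python
from dataclasses import dataclass, field
from typing import List, Type


class Artifact:
    """Hypothetical base class for a piece of pipeline data."""

    def __init__(self, uri: str = "") -> None:
        self.uri = uri


class Examples(Artifact):
    """Hypothetical artifact type for example data."""


class Model(Artifact):
    """Hypothetical artifact type for a trained model."""


@dataclass
class TypedChannel:
    """Toy channel: holds artifacts of one declared artifact type."""

    artifact_type: Type[Artifact]
    artifacts: List[Artifact] = field(default_factory=list)

    def write(self, artifact: Artifact) -> None:
        # Reject anything that is not of the channel's declared type.
        if not isinstance(artifact, self.artifact_type):
            raise TypeError(
                f"expected {self.artifact_type.__name__}, "
                f"got {type(artifact).__name__}")
        self.artifacts.append(artifact)

    def read(self) -> List[Artifact]:
        # Return a copy so consumers cannot mutate the channel's contents.
        return list(self.artifacts)


examples_channel = TypedChannel(artifact_type=Examples)
examples_channel.write(Examples(uri="/tmp/examples"))  # accepted

try:
    examples_channel.write(Model(uri="/tmp/model"))  # wrong artifact type
    rejected = False
except TypeError:
    rejected = True
```

In this toy model a producer calls `write` and a consumer calls `read`; the type check at write time is what lets a downstream component rely on the kind of data it receives.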
ComponentSpec
The first step is to define the inputs and outputs of the new component as well as other parameters that will be used in component execution. ComponentSpec is the class where we will define this contract with detailed type info. There are three parameters expected:
- INPUTS: A dictionary of typed parameters for the input artifacts that will be passed into the component executor. Normally input artifacts are the outputs from upstream components and thus share the same type.
- OUTPUTS: A dictionary of typed parameters for the output artifacts which the component will produce.
- PARAMETERS: A dictionary of additional ExecutionParameter items that will be passed into the component executor. These are non-artifact parameters that we want to define flexibly in the pipeline DSL and pass into execution.
As discussed in the previous section, we need to guarantee that:
- One of the inputs of the HelloWorld component is the same type as the ExampleGen output, since ExampleGen passes its output directly to it. As shown in Figure 3, 'input_data' is the spec for it.
- One of the outputs of the HelloWorld component is the same type as the ExampleGen output, since it will be passed to downstream components that originally expected the ExampleGen output. As shown in Figure 3, 'output_data' is the spec for it.
In the parameters spec section, only 'name' is declared, for demonstration purposes.
# Import paths as of the TFX release this post was written for.
from typing import Text

from tfx import types
from tfx.types import standard_artifacts
from tfx.types.component_spec import ChannelParameter
from tfx.types.component_spec import ExecutionParameter


class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  # The following declares inputs to the component.
  INPUTS = {
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  # The following declares outputs from the component.
  OUTPUTS = {
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  # The following declares extra parameters used to create an instance of
  # this component.
  PARAMETERS = {
      'name': ExecutionParameter(type=Text),
  }

Figure 3. ComponentSpec for HelloWorld component.
Executor
Next, let’s write the code for the executor of our new component. As we discussed in the previous post, we will need to create a new subclass of base_executor.BaseExecutor and override its Do function.
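# Import paths as of the TFX release this post was written for.
import json
import os
from typing import Any, Dict, List, Text

import tensorflow as tf

from tfx import types
from tfx.components.base import base_executor
from tfx.types import artifact_utils
from tfx.utils import io_utils


class Executor(base_executor.BaseExecutor):
  """Executor for HelloWorld component."""
  ...

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...
    # Map each split name (for example 'train' or 'eval') to its input directory.
    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = os.path.join(artifact.uri, split)
        split_to_instance[split] = uri

    # Copy every file of every split to the matching output split directory.
    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Figure 4. Executor for HelloWorld component.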
As shown in Figure 4, we can get input and output artifacts and execution properties using the same keys we defined previously in the ComponentSpec. After we have all the needed values, we can go ahead and add more logic using them and write the output into the URI pointed to by the output artifact ('output_data').
Don’t forget to test it before moving on to the next step! We have created a convenience script for you to try out your executor before putting it into production. You should write similar code as unit tests for your own executor. As with any production software deployment, when developing for TFX you should make sure to have good test coverage and a strong CI/CD framework.
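As a rough illustration of what such a test might check, the sketch below reimplements the copy loop from Figure 4 using only the Python standard library, so it runs without a TFX installation. The `copy_splits` and `run_copy_check` helpers are hypothetical and are not part of TFX or of the convenience script; a real test would call the executor's Do function with real artifacts instead.

```python
import json
import os
import shutil
import tempfile


def copy_splits(input_uri: str, output_uri: str, split_names_json: str) -> None:
    """Copy every file of every split from input_uri to output_uri.

    Hypothetical stand-in for the copy loop in Figure 4, written with the
    standard library only so it can be tested without TFX.
    """
    for split in json.loads(split_names_json):
        input_dir = os.path.join(input_uri, split)
        output_dir = os.path.join(output_uri, split)
        os.makedirs(output_dir, exist_ok=True)
        for filename in os.listdir(input_dir):
            shutil.copyfile(os.path.join(input_dir, filename),
                            os.path.join(output_dir, filename))


def run_copy_check() -> dict:
    """Build a fake two-split input, run the copy, and return what was copied."""
    with tempfile.TemporaryDirectory() as root:
        input_uri = os.path.join(root, "input")
        output_uri = os.path.join(root, "output")
        for split in ("train", "eval"):
            os.makedirs(os.path.join(input_uri, split))
            with open(os.path.join(input_uri, split, "data.txt"), "w") as f:
                f.write(f"{split} rows")

        copy_splits(input_uri, output_uri, json.dumps(["train", "eval"]))

        # Read the copied files back so the caller can assert on their contents.
        copied = {}
        for split in ("train", "eval"):
            with open(os.path.join(output_uri, split, "data.txt")) as f:
                copied[split] = f.read()
        return copied


copied = run_copy_check()
```

The pattern is the useful part: build a small temporary input, run the logic, and assert on what landed in the output location.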
Component interface
Now that we have finished the most complex part, we need to assemble these pieces into a component interface, to enable the component to be used in a pipeline. The process (shown in Figure 5) requires the following steps:
- Make the component interface a subclass of base_component.BaseComponent
- Assign the class variable SPEC_CLASS the HelloComponentSpec class we defined earlier
- Assign the class variable EXECUTOR_SPEC an ExecutorClassSpec that wraps the Executor class we defined earlier
- Define the __init__() function so that it uses its arguments to construct an instance of HelloComponentSpec, then invokes the super function with that instance, along with an optional name
When an instance of the component is created, type checking logic in the base_component.BaseComponent class will be invoked to ensure that the arguments passed in are compatible with the parameter types defined in the HelloComponentSpec class.
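# Import paths as of the TFX release this post was written for.
from typing import Optional, Text

from tfx.components.base import base_component
from tfx.components.base import executor_spec
from tfx.types import channel
from tfx.types import channel_utils
from tfx.types import standard_artifacts

from hello_component import executor


class HelloComponent(base_component.BaseComponent):
  """Custom TFX HelloWorld Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: channel.Channel,
               output_data: Optional[channel.Channel] = None,
               name: Optional[Text] = None):
    # output_data is optional: when the caller omits it, create an Examples
    # channel that carries the same splits as the input.
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])
    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Figure 5. Component interface.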
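The type check performed when a component is instantiated can be pictured with a small plain-Python sketch. This is an illustration only and does not reproduce the logic inside base_component.BaseComponent: `SpecError`, `check_against_spec`, and `declared_parameters` are hypothetical names, and plain Python types stand in for TFX's ChannelParameter and ExecutionParameter.

```python
from typing import Any, Dict


class SpecError(TypeError):
    """Raised when an argument does not match the declared spec."""


def check_against_spec(declared: Dict[str, type], given: Dict[str, Any]) -> None:
    """Verify that `given` supplies every declared key with a matching type."""
    for key, expected_type in declared.items():
        if key not in given:
            raise SpecError(f"missing argument: {key!r}")
        if not isinstance(given[key], expected_type):
            raise SpecError(
                f"{key!r} should be {expected_type.__name__}, "
                f"got {type(given[key]).__name__}")
    # Arguments that the spec never declared are also rejected.
    unknown = set(given) - set(declared)
    if unknown:
        raise SpecError(f"unexpected arguments: {sorted(unknown)}")


# Hypothetical declaration mirroring the 'name' entry of PARAMETERS in Figure 3.
declared_parameters = {"name": str}

check_against_spec(declared_parameters, {"name": "HelloWorld"})  # passes

try:
    check_against_spec(declared_parameters, {"name": 42})  # wrong type
    error_message = ""
except SpecError as err:
    error_message = str(err)
```

Failing at construction time, rather than when the pipeline runs, means a wiring mistake surfaces while the pipeline is being defined.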
Plugging into the TFX pipeline
Good news! Our brand new component is ready to use after our hard work in the previous sections. Let’s plug it into our Chicago taxi example pipeline. Besides adding an instance of the new component, we also need to:
- Adjust the parameters when we instantiate components that originally expected the output of ExampleGen to now take the output of our new component
- Add the new component instance to the components list when constructing the pipeline
Figure 6 highlights these changes. The full example can be found in our GitHub repo.
def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name=u'HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen],
      ...
  )

Figure 6. Using the new component.
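The rewiring in Figure 6 can also be viewed as a change to the pipeline's dependency graph. The sketch below models that graph with plain Python sets and the standard library's `graphlib` module (Python 3.9+). It is a simplified picture only: `insert_between` is a hypothetical helper, and actual scheduling in TFX is handled by the orchestrator rather than by code like this.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each key maps a component to the set of components whose outputs it consumes.
before = {
    "ExampleGen": set(),
    "StatisticsGen": {"ExampleGen"},
}


def insert_between(graph, new_node, upstream):
    """Return a copy of graph with new_node placed directly after upstream.

    Every component that consumed `upstream` is rewired to consume new_node.
    """
    rewired = {
        node: {new_node if dep == upstream else dep for dep in deps}
        for node, deps in graph.items()
    }
    rewired[new_node] = {upstream}
    return rewired


after = insert_between(before, "HelloWorld", "ExampleGen")

# A valid execution order: every component runs after the ones it depends on.
order = list(TopologicalSorter(after).static_order())
```

With this toy graph, StatisticsGen ends up depending on HelloWorld instead of ExampleGen, which corresponds to passing hello.outputs['output_data'] to StatisticsGen in Figure 6.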
For more information
To learn more about TFX, check out the TFX website, join the TFX discussion group, read the TFX blog, watch our TFX playlist on YouTube, and subscribe to the TensorFlow channel.