March 10, 2020
Posted by Reza Rokni, Developer Advocate, Google Cloud, on behalf of the TFX and Dataflow teams
TFX's core mission is to allow models to be moved from research to production, creating and managing production pipelines. Many models will be built using large volumes of data, requiring multiple hosts working in parallel to meet both the processing and serving needs of your production pipelines.
Us…
AnalyzeAndTransformDataset and finally via two TFX components, ExampleGen and StatisticsGen.
preprocessing_fn, please refer back to the tutorial. For now, we just need to know that it is transforming the data points passed into the function.
virtualenv tfx-beam --python=python3
source tfx-beam/bin/activate
pip install tfx
import pprint
import tempfile

import tensorflow_transform.beam as tft_beam

# raw_data, raw_data_metadata and preprocessing_fn are defined in the tutorial.
def main():
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))

  transformed_data, transformed_metadata = transformed_dataset

  print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
  print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))

if __name__ == '__main__':
  main()
NOTE: result = pass_this | 'name this step' >> to_this_call
The method to_this_call is being invoked and passed the object called pass_this, and this operation will be referred to as name this step in a stack trace.
beam_impl.Context within a beam.Pipeline. This gives the ability to pass in arguments, for example "--runner". For a quick local test you can run the sample below with --runner set to DirectRunner.
import apache_beam as beam
import tempfile
import tensorflow_transform.beam.impl as beam_impl

argv = ['--runner=DirectRunner']

def main():
  with beam.Pipeline(argv=argv) as p:
    # Ignore the warnings
    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
      input_data = p | beam.Create(raw_data)
      transformed_dataset, transform_fn = (
          (input_data, raw_data_metadata)
          | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))
      # Element 0 of the tuple is the transformed data; print each record.
      transformed_dataset[0] | "Print Transformed Dataset" >> beam.Map(print)

if __name__ == '__main__':
  main()
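The labelled-step syntax described in the NOTE above can be illustrated with ordinary Python operator overloading. The sketch below is not Beam's implementation; the class names `Step` and `Source` are hypothetical and exist only to show how `|` and `>>` can cooperate to apply a transform and attach a label to it.

```python
class Step:
    """A toy transform: wraps a function and an optional label."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label or fn.__name__

    def __rrshift__(self, label):
        # Handles `'some label' >> step`. str does not know how to shift by a
        # Step, so Python falls back to the right operand's __rrshift__.
        return Step(self.fn, label)


class Source:
    """A toy collection that records which labelled steps were applied."""

    def __init__(self, items, history=()):
        self.items = list(items)
        self.history = tuple(history)

    def __or__(self, step):
        # Handles `source | step`: apply the step to every element.
        return Source([step.fn(x) for x in self.items],
                      self.history + (step.label,))


def double(x):
    return 2 * x


# `>>` binds more tightly than `|`, so the label is attached first.
result = Source([1, 2, 3]) | 'Double values' >> Step(double)
print(result.items)    # [2, 4, 6]
print(result.history)  # ('Double values',)
```

Because the label travels with the step, a runner built this way could report 'Double values' when that step fails, which is the role the step name plays in a Beam stack trace.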
Next, we will switch to using the Dataflow Runner. Since Dataflow is a fully managed runner working on Google Cloud, we will need to provide the pipeline with some environmental information. This includes the Google Cloud project and locations for the temporary files used by the pipeline.
# Setup our Environment
## The location of Input / Output between various stages ( TFX Components )
## This will also be the location for the Metadata
### Can be used when running the pipeline locally
#LOCAL_PIPELINE_ROOT =
### In production you want the input and output to be stored on non-local location
#GOOGLE_CLOUD_STORAGE_PIPELINE_ROOT=
#GOOGLE_CLOUD_PROJECT =
#GOOGLE_CLOUD_TEMP_LOCATION =
# Will need setup.py to make this work with Dataflow
#
# import setuptools
#
# setuptools.setup(
# name='demo',
# version='0.0',
# install_requires=['tfx==0.21.1'],
# packages=setuptools.find_packages(),)
SETUP_FILE = "./setup.py"
argv = ['--project={}'.format(GOOGLE_CLOUD_PROJECT),
        '--temp_location={}'.format(GOOGLE_CLOUD_TEMP_LOCATION),
        '--setup_file={}'.format(SETUP_FILE),
        '--runner=DataflowRunner']

def main():
  with beam.Pipeline(argv=argv) as p:
    with beam_impl.Context(temp_dir=GOOGLE_CLOUD_TEMP_LOCATION):
      input_data = p | beam.Create(raw_data)
      # AnalyzeAndTransformDataset returns (transformed_dataset, transform_fn).
      transformed_dataset, transform_fn = (
          (input_data, raw_data_metadata)
          | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

if __name__ == '__main__':
  main()
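The Dataflow options above are plain command-line style strings, so a small helper can assemble them and catch obvious mistakes before a job is submitted. The function name `build_dataflow_args` and its checks are assumptions made for illustration and are not part of Beam or TFX; the flags themselves are the ones used above.

```python
def build_dataflow_args(project, temp_location, setup_file='./setup.py'):
    """Return Beam pipeline flags for the Dataflow runner.

    Hypothetical helper: it only formats the flags used in this post and
    performs basic sanity checks on the values.
    """
    if not project:
        raise ValueError('A Google Cloud project ID is required.')
    # Dataflow workers cannot read the local disk of the machine that submits
    # the job, so temporary files are expected to live in Cloud Storage.
    if not temp_location.startswith('gs://'):
        raise ValueError('temp_location must be a gs:// path.')
    return [
        '--project={}'.format(project),
        '--temp_location={}'.format(temp_location),
        '--setup_file={}'.format(setup_file),
        '--runner=DataflowRunner',
    ]


# Placeholder values; substitute your own project and bucket.
argv = build_dataflow_args('my-project', 'gs://my-bucket/tmp')
print(argv)
```

Failing early on a local temp path is cheaper than discovering the problem after the job has been submitted.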
To get a feel for how much work TFX has abstracted away, below is a visual representation of the graph that the pipeline processed. We had to shrink the image to fit it all in as there are a lot of transforms!
from typing import Text
from tfx.proto import example_gen_pb2
# BigQueryExampleGen must also be imported; its module path depends on the TFX version.

def createExampleGen(query: Text):
  # Output 2 splits: train:eval=3:1.
  output = example_gen_pb2.Output(
      split_config=example_gen_pb2.SplitConfig(splits=[
          example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
          example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
      ]))
  return BigQueryExampleGen(query=query, output_config=output)
In addition to the SQL query to run, BigQueryExampleGen is passed configuration information via the SplitConfig object.
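To make the 3:1 split concrete, the sketch below assigns each record to one of four hash buckets and routes the first three buckets to `train` and the last to `eval`. It illustrates the idea only: the `assign_split` function, the record keys, and the MD5-based hash are assumptions, and TFX's own partitioning logic may differ.

```python
import hashlib


def assign_split(record_key, splits=(('train', 3), ('eval', 1))):
    """Map a record key to a split name using cumulative hash buckets."""
    total_buckets = sum(buckets for _, buckets in splits)
    # A stable hash (unlike the built-in hash()) gives the same bucket on
    # every run, so a record never moves between splits.
    digest = hashlib.md5(record_key.encode('utf-8')).hexdigest()
    bucket = int(digest, 16) % total_buckets
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError('unreachable: bucket is always below total_buckets')


counts = {'train': 0, 'eval': 0}
for i in range(10000):
    counts[assign_split('trip-{}'.format(i))] += 1
print(counts)  # expect a train:eval ratio close to 3:1
```

Hashing rather than sampling at random means the split is reproducible: rerunning the pipeline over the same data places each record in the same split.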
bigquery-public-data.chicago_taxi_trips.taxi_trips.
query = """
SELECT
  pickup_community_area,
  fare,
  EXTRACT(MONTH FROM trip_start_timestamp) trip_start_month,
  EXTRACT(HOUR FROM trip_start_timestamp) trip_start_hour,
  EXTRACT(DAYOFWEEK FROM trip_start_timestamp) trip_start_day,
  UNIX_MILLIS(trip_start_timestamp) trip_start_ms_timestamp,
  pickup_latitude,
  pickup_longitude,
  dropoff_latitude,
  dropoff_longitude,
  trip_miles,
  pickup_census_tract,
  dropoff_census_tract,
  payment_type,
  company,
  trip_seconds,
  dropoff_community_area,
  tips
FROM
  `bigquery-public-data.chicago_taxi_trips.taxi_trips`
LIMIT 100
"""
Note the use of LIMIT 100, which will limit the output to 100 records, allowing us to quickly test out our code for correctness.
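One way to switch between a quick test and a full run is to generate the query with an optional LIMIT clause. The helper `with_limit` below is hypothetical, and the shortened two-column query is used only to keep the sketch small.

```python
def with_limit(base_query, limit=None):
    """Return base_query, appending a LIMIT clause when limit is given."""
    query = base_query.strip()
    if limit is None:
        return query
    if limit <= 0:
        raise ValueError('limit must be a positive integer')
    return '{}\nLIMIT {}'.format(query, int(limit))


base_query = """
SELECT fare, tips
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
"""

test_query = with_limit(base_query, limit=100)  # small sample for local testing
full_query = with_limit(base_query)             # whole table for the full run
print(test_query)
```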
def createStatisticsGen(bigQueryExampleGen: BigQueryExampleGen):
  # Computes statistics over data for visualization and example validation.
  return StatisticsGen(examples=bigQueryExampleGen.outputs['examples'])
As the output of ExampleGen is required by StatisticsGen, we now have a dependency between the two steps. This producer-consumer pattern is seen throughout most production ML pipelines. To automate this pipeline, we will need something that coordinates these dependencies.
BeamDagRunner. This means we are using Beam in two different roles: as an execution engine for processing data, and as an orchestrator for sequencing the TFX tasks.
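Conceptually, an orchestrator must run each component only after the components it depends on have finished. A minimal sketch of that ordering, using the standard library `graphlib` module (Python 3.9+), is shown below. The dependency dictionary is written by hand for illustration, whereas TFX derives dependencies from the components' inputs and outputs.

```python
from graphlib import TopologicalSorter

# Each key depends on the components listed in its value set.
dependencies = {
    'ExampleGen': set(),
    'StatisticsGen': {'ExampleGen'},
}

# static_order() yields every node after all of its predecessors.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['ExampleGen', 'StatisticsGen']
```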
# Used for setting up the orchestration
from tfx.orchestration import pipeline
from tfx.orchestration import metadata
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
The following code creates our pipeline object ready to be executed by the BeamDagRunner.
import os
from typing import Text
from typing import Type

def createTfxPipeline(pipeline_name: Text, pipeline_root: Text, query: Text,
                      beam_pipeline_args) -> pipeline.Pipeline:
  output = example_gen_pb2.Output(
      # Output 2 splits: train:eval=3:1.
      split_config=example_gen_pb2.SplitConfig(splits=[
          example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
          example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
      ]))

  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = BigQueryExampleGen(query=query, output_config=output)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen, statistics_gen
      ],
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          os.path.join(".", 'metadata', pipeline_name, 'metadata.db')),
      enable_cache=False,
      # A dict whose 'beam_pipeline_args' key holds the list of Beam flags.
      additional_pipeline_args=beam_pipeline_args)
To test the code, run the query that uses "LIMIT 100" via the local DirectRunner.
tfx_pipeline = createTfxPipeline(
    pipeline_name="my_first_directRunner_pipeline",
    pipeline_root=LOCAL_PIPELINE_ROOT,
    query=query,
    beam_pipeline_args={
        'beam_pipeline_args': [
            '--project={}'.format(GOOGLE_CLOUD_PROJECT),
            '--runner=DirectRunner']})
BeamDagRunner().run(tfx_pipeline)
You can see the results by using tfdv to load the output written to LOCAL_PIPELINE_ROOT:
import os
import tensorflow_data_validation as tfdv

stats = tfdv.load_statistics(os.path.join(
    LOCAL_PIPELINE_ROOT, "StatisticsGen", "statistics", "", "train", "stats_tfrecord"))
tfdv.visualize_statistics(stats)
That works fine for one hundred records, but what if the goal was to process all of the roughly 187 million rows in the dataset? For this, the pipeline is switched from the DirectRunner to the production Dataflow runner. A few extra environment parameters are also set, for example the Google Cloud project to run the pipeline in.
tfx_pipeline = createTfxPipeline(
    pipeline_name="my_first_dataflowRunner_pipeline",
    pipeline_root=GOOGLE_CLOUD_STORAGE_PIPELINE_ROOT,
    query=query,
    beam_pipeline_args={
        'beam_pipeline_args': [
            '--project={}'.format(GOOGLE_CLOUD_PROJECT),
            '--temp_location={}'.format(GOOGLE_CLOUD_TEMP_LOCATION),
            '--setup_file=./setup.py',
            '--runner=DataflowRunner']})
BeamDagRunner().run(tfx_pipeline)
The BeamDagRunner takes care of submitting ExampleGen and StatisticsGen as separate pipelines, ensuring that ExampleGen completes successfully before StatisticsGen starts.
The Dataflow service automatically takes care of spinning up workers, autoscaling, retries in the event of worker failure, centralized logging, and monitoring. Autoscaling is based on various signals including throughput rate, illustrated below.
The Dataflow monitoring console shows us various metrics about the pipeline, for example, the CPU utilization of the workers. Below we see the utilization of machines as they come online, consistently high with most workers over 90%.
Apache Beam supports custom counters, which allow developers to create metrics from within their pipelines. The TFX team has used this to create useful information counters for the various components. Below we can see some of the counters recorded during the StatisticsGen run. Filtering for the keyword "num_*_feature", there were roughly a billion integer and float feature values.