March 10, 2020 —
                                          
Posted by Reza Rokni, Developer Advocate Google Cloud, on behalf of the TFX and Dataflow teams
TFX's core mission is to allow models to be moved from research to production, creating and managing production pipelines. Many models will be built using large volumes of data, requiring multiple hosts working in parallel to serve both the processing and serving needs of your production pipelines.
Us…


AnalyzeAndTransformDataset and finally via two TFX components ExampleGen and StatisticsGen.

preprocessing_fn, please refer back to the tutorial. For now, we just need to know that it is transforming the data points passed into the function.

virtualenv tfx-beam --python=python3
source tfx-beam/bin/activate
pip install tfx

def main():
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))
  transformed_data, transformed_metadata = transformed_dataset
  print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
  print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))
if __name__ == '__main__':
  main()

result = pass_this | 'name this step' >> to_this_call

to_this_call is being invoked and passed the object called pass_this, and this operation will be referred to as name this step in a stack trace.

beam_impl.Context within a beam.Pipeline. This gives the ability to pass in arguments, for example "--runner". For a quick local test you can run the sample below with --runner set to DirectRunner.

import apache_beam as beam
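import tempfile
import tensorflow_transform.beam.impl as beam_impl

argv = ['--runner=DirectRunner']

def main():
  with beam.Pipeline(argv=argv) as p:
    # Ignore the warnings
    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
      # raw_data, raw_data_metadata and preprocessing_fn come from the tutorial.
      raw_examples = p | beam.Create(raw_data)
      transformed_dataset, transform_fn = (
          (raw_examples, raw_data_metadata)
          | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))
      # transformed_dataset is a (data, metadata) pair; print the data.
      transformed_dataset[0] | "Print Transformed Dataset" >> beam.Map(print)

if __name__ == '__main__':
  main()

# Setup our Environment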
## The location of Input / Output between various stages ( TFX Components )
## This will also be the location for the Metadata 
### Can be used when running the pipeline locally
#LOCAL_PIPELINE_ROOT =
### In production you want the input and output to be stored on non-local location
#GOOGLE_CLOUD_STORAGE_PIPELINE_ROOT=
#GOOGLE_CLOUD_PROJECT = 
#GOOGLE_CLOUD_TEMP_LOCATION = 
# Will need setup.py to make this work with Dataflow
#
# import setuptools
#
# setuptools.setup(
#   name='demo',
#   version='0.0',
#   install_requires=['tfx==0.21.1'],
#   packages=setuptools.find_packages(),)
SETUP_FILE = "./setup.py"
argv=['--project={}'.format(GOOGLE_CLOUD_PROJECT),
      '--temp_location={}'.format(GOOGLE_CLOUD_TEMP_LOCATION),
      '--setup_file={}'.format(SETUP_FILE),
      '--runner=DataflowRunner']
def main():
    with beam.Pipeline(argv=argv) as p:
        with beam_impl.Context(temp_dir=GOOGLE_CLOUD_TEMP_LOCATION):
            raw_examples = p | beam.Create(raw_data)
            # AnalyzeAndTransformDataset returns the transformed dataset
            # and the transform function, as in the local example.
            transformed_dataset, transform_fn = (
                (raw_examples, raw_data_metadata)
                | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

if __name__ == '__main__':
    main()
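
The `pass_this | 'name this step' >> to_this_call` syntax used in the Beam samples above can look odd at first. The toy below is only a sketch of how Python operator overloading makes that syntax possible; it is not how Beam implements it. The `Step` class and `double_all` function are hypothetical names made up for this illustration. Note that `>>` binds more tightly than `|`, so the label is attached to the step before the input is piped in.

```python
class Step:
    """Toy stand-in for a transform: wraps a function and carries a label."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label or getattr(fn, '__name__', 'unnamed step')

    def __rrshift__(self, label):
        # Handles `'some label' >> step`: str does not know how to shift by
        # a Step, so Python falls back to this reflected method.
        return Step(self.fn, label)

    def __ror__(self, value):
        # Handles `value | step`: apply the wrapped function to the input,
        # and mention the label if the step fails.
        try:
            return self.fn(value)
        except Exception as exc:
            raise RuntimeError('step {!r} failed'.format(self.label)) from exc


def double_all(values):
    return [v * 2 for v in values]


# Parsed as: [1, 2, 3] | ('double everything' >> Step(double_all))
result = [1, 2, 3] | 'double everything' >> Step(double_all)
print(result)
```

Carrying the label along with the step is what lets an error message point at a human-readable name rather than an anonymous function.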
def createExampleGen(query: Text):
    # Output 2 splits: train:eval=3:1.
    output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(
                                 name='train', hash_buckets=3),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
    return BigQueryExampleGen(query=query, output_config=output)

bigquery-public-data.chicago_taxi_trips.taxi_trips.
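
To get a feel for what the `hash_buckets=3` and `hash_buckets=1` settings in createExampleGen mean, here is a minimal sketch of hash-bucket splitting in plain Python. It assumes an MD5 digest purely for illustration, which is not claimed to be the hash ExampleGen uses, and `assign_split` is a hypothetical helper rather than part of TFX.

```python
import hashlib


def assign_split(key, splits=(('train', 3), ('eval', 1))):
    """Map a record key to a split name using hash buckets.

    With 3 buckets for 'train' and 1 for 'eval', roughly three quarters
    of the keys land in 'train'.
    """
    total_buckets = sum(buckets for _, buckets in splits)
    # A stable hash (unlike the built-in hash()) keeps the split
    # reproducible across runs.
    digest = hashlib.md5(key.encode('utf-8')).hexdigest()
    bucket = int(digest, 16) % total_buckets
    for name, buckets in splits:
        if bucket < buckets:
            return name
        bucket -= buckets


counts = {'train': 0, 'eval': 0}
for i in range(10000):
    counts[assign_split('trip-{}'.format(i))] += 1
print(counts)
```

Because the assignment depends only on the key, the same record always ends up in the same split, however many times the pipeline runs.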
query="""
SELECT
  pickup_community_area,
  fare,
  EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,
  EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,
  EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,
  UNIX_MILLIS(trip_start_timestamp) AS trip_start_ms_timestamp,
  pickup_latitude,
  pickup_longitude,
  dropoff_latitude,
  dropoff_longitude,
  trip_miles,
  pickup_census_tract,
  dropoff_census_tract,
  payment_type,
  company,
  trip_seconds,
  dropoff_community_area,
  tips
FROM
  `bigquery-public-data.chicago_taxi_trips.taxi_trips`
LIMIT 100
"""

def createStatisticsGen(bigQueryExampleGen: BigQueryExampleGen):
    # Computes statistics over data for visualization and example validation.
    return StatisticsGen(examples=bigQueryExampleGen.outputs['examples'])

BeamDagRunner. This means we are using Beam in two different roles: as an execution engine for processing data, and as an orchestrator for sequencing the TFX tasks.
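
To make the orchestrator role concrete, here is a small sketch of what "sequencing the tasks" means, using only the Python standard library (`graphlib`, Python 3.9+). It does not show how BeamDagRunner works internally; `run_in_dependency_order` and the placeholder tasks are hypothetical and exist only for this illustration.

```python
from graphlib import TopologicalSorter


def run_in_dependency_order(tasks, upstream):
    """Run each task only after all of its upstream tasks have run.

    tasks:    dict mapping a component name to a zero-argument callable.
    upstream: dict mapping a component name to the set of names it depends on.
    """
    executed = []
    for name in TopologicalSorter(upstream).static_order():
        tasks[name]()
        executed.append(name)
    return executed


# Placeholder tasks standing in for the two components used in this post.
tasks = {
    'ExampleGen': lambda: print('running ExampleGen'),
    'StatisticsGen': lambda: print('running StatisticsGen'),
}
# StatisticsGen consumes the examples produced by ExampleGen.
upstream = {'StatisticsGen': {'ExampleGen'}}

order = run_in_dependency_order(tasks, upstream)
```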
# Used for setting up the orchestration 
from tfx.orchestration import pipeline
from tfx.orchestration import metadata
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

import os  # used by createTfxPipeline to build the metadata path
from typing import Text
from typing import Type
def createTfxPipeline(pipeline_name: Text, pipeline_root: Text, query: Text,
                      beam_pipeline_args) -> pipeline.Pipeline:
    output = example_gen_pb2.Output(
        # Output 2 splits: train:eval=3:1.
        split_config=example_gen_pb2.SplitConfig(splits=[
            example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
            example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
        ]))
    # Brings data into the pipeline or otherwise joins/converts training data.
    example_gen = BigQueryExampleGen(query=query, output_config=output)
    
    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen, statistics_gen
      ],
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          os.path.join(".", 'metadata', pipeline_name,'metadata.db')),
      enable_cache=False,
      additional_pipeline_args=beam_pipeline_args)

LIMIT 100" via the local DirectRunner.

tfx_pipeline = createTfxPipeline(
    pipeline_name="my_first_directRunner_pipeline",
    pipeline_root=LOCAL_PIPELINE_ROOT,
    query=query,
    beam_pipeline_args={
        'beam_pipeline_args': [
            '--project={}'.format(GOOGLE_CLOUD_PROJECT),
            '--runner=DirectRunner']})
BeamDagRunner().run(tfx_pipeline)

LOCAL_PIPELINE_ROOT;

import os
import tensorflow_data_validation as tfdv
stats = tfdv.load_statistics(os.path.join(LOCAL_PIPELINE_ROOT,"StatisticsGen","statistics","","train","stats_tfrecord"))
tfdv.visualize_statistics(stats)

That works fine for one hundred records, but what if the goal was to process all of the roughly 187 million rows in the dataset? For this, the pipeline is switched from the DirectRunner to the production Dataflow runner. A few extra environment parameters are also set, for example the Google Cloud project to run the pipeline in.
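
Since the only difference between the local and the Dataflow run is the list of pipeline flags, one option is to build that list in a single place. The helper below is a sketch under that assumption. `build_beam_args` is a hypothetical function, not part of TFX or Beam, and it only emits the flags already used in this post.

```python
def build_beam_args(runner, project, temp_location=None, setup_file=None):
    """Build the Beam flag list for either the local or the Dataflow run."""
    args = [
        '--runner={}'.format(runner),
        '--project={}'.format(project),
    ]
    # These two are only passed for the Dataflow run in this post.
    if temp_location:
        args.append('--temp_location={}'.format(temp_location))
    if setup_file:
        args.append('--setup_file={}'.format(setup_file))
    return args


# The project and bucket names below are placeholders.
local_args = build_beam_args('DirectRunner', 'my-project')
dataflow_args = build_beam_args(
    'DataflowRunner', 'my-project',
    temp_location='gs://my-bucket/tmp', setup_file='./setup.py')
print(local_args)
print(dataflow_args)
```

The resulting list can then be passed as the value of 'beam_pipeline_args' when calling createTfxPipeline.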
tfx_pipeline = createTfxPipeline(
    pipeline_name="my_first_dataflowRunner_pipeline",
    pipeline_root=GOOGLE_CLOUD_STORAGE_PIPELINE_ROOT,
    query=query,
    beam_pipeline_args={
        'beam_pipeline_args': [
            '--project={}'.format(GOOGLE_CLOUD_PROJECT),
            '--temp_location={}'.format(GOOGLE_CLOUD_TEMP_LOCATION),
            '--setup_file=./setup.py',
            '--runner=DataflowRunner']})
BeamDagRunner().run(tfx_pipeline)

BeamDagRunner takes care of submitting ExampleGen and StatisticsGen as separate pipelines, ensuring that ExampleGen has completed successfully before StatisticsGen starts.

The Dataflow service automatically takes care of spinning up workers, autoscaling, retries in the event of worker failure, centralized logging, and monitoring. Autoscaling is based on various signals, including throughput rate, illustrated below:

The Dataflow monitoring console shows us various metrics about the pipeline, for example, the CPU utilization of the workers. Below we see the utilization of machines as they come online, consistently high with most workers over 90%:

Apache Beam supports custom counters, which allow developers to create metrics from within their pipelines. The TFX team has used this to create useful information counters for the various components. Below we can see some of the counters recorded during the StatisticsGen run. Filtering for the keyword "num_*_feature", there were roughly a billion integer and float feature values.
 
 