@@ -345,6 +345,72 @@ def process(self, context):
345345 p .run ()
346346
347347
348+ def pipeline_monitoring (renames ):
349+ """Using monitoring interface snippets.
350+
351+ URL: https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf
352+ """
353+
354+ import re
355+ import google .cloud .dataflow as df
356+ from google .cloud .dataflow .utils .options import PipelineOptions
357+
358+ class WordCountOptions (PipelineOptions ):
359+
360+ @classmethod
361+ def _add_argparse_args (cls , parser ):
362+ parser .add_argument ('--input' ,
363+ help = 'Input for the dataflow pipeline' ,
364+ default = 'gs://my-bucket/input' )
365+ parser .add_argument ('--output' ,
366+ help = 'output for the dataflow pipeline' ,
367+ default = 'gs://my-bucket/output' )
368+
369+ class ExtractWordsFn (df .DoFn ):
370+
371+ def process (self , context ):
372+ words = re .findall (r'[A-Za-z\']+' , context .element )
373+ for word in words :
374+ yield word
375+
376+ class FormatCountsFn (df .DoFn ):
377+
378+ def process (self , context ):
379+ word , count = context .element
380+ yield '%s: %s' % (word , count )
381+
382+ # [START pipeline_monitoring_composite]
383+ # The CountWords Composite Transform inside the WordCount pipeline.
384+ class CountWords (df .PTransform ):
385+
386+ def apply (self , pcoll ):
387+ return (pcoll
388+ # Convert lines of text into individual words.
389+ | df .ParDo ('ExtractWords' , ExtractWordsFn ())
390+ # Count the number of times each word occurs.
391+ | df .combiners .Count .PerElement ()
392+ # Format each word and count into a printable string.
393+ | df .ParDo ('FormatCounts' , FormatCountsFn ()))
394+ # [END pipeline_monitoring_composite]
395+
396+ pipeline_options = PipelineOptions ()
397+ options = pipeline_options .view_as (WordCountOptions )
398+ p = df .Pipeline (options = pipeline_options )
399+
400+ # [START pipeline_monitoring_execution]
401+ (p
402+ # Read the lines of the input text.
403+ | df .io .Read ('ReadLines' , df .io .TextFileSource (options .input ))
404+ # Count the words.
405+ | CountWords ()
406+ # Write the formatted word counts to output.
407+ | df .io .Write ('WriteCounts' , df .io .TextFileSink (options .output )))
408+ # [END pipeline_monitoring_execution]
409+
410+ p .visit (SnippetUtils .RenameFiles (renames ))
411+ p .run ()
412+
413+
348414def examples_wordcount_minimal (renames ):
349415 """MinimalWordCount example snippets.
350416
0 commit comments