Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit a94e7e3

Browse files
aaltaysilviulica
authored and committed
WordCount, minimal WordCount, and debugging WordCount snippets
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117396167
1 parent ac319b0 commit a94e7e3

2 files changed

Lines changed: 215 additions & 0 deletions

File tree

google/cloud/dataflow/examples/snippets/snippets.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,198 @@ def process(self, context):
345345
p.run()
346346

347347

348+
def examples_wordcount_minimal(renames):
  """MinimalWordCount example snippets.

  Builds the smallest complete WordCount pipeline (read -> split into words ->
  count -> format -> write) and runs it. The # [START ...] / # [END ...]
  marker comments delimit regions that are extracted verbatim into the
  published documentation — do not reformat or reorder the code between them.

  URL:
  https://cloud.google.com/dataflow/examples/wordcount-example#MinimalWordCount

  Args:
    renames: Mapping used by SnippetUtils.RenameFiles to swap the
      hard-coded gs:// read/write paths for test-local files
      (presumably keyed 'read'/'write' — see the snippets test).
  """
  import re

  import google.cloud.dataflow as df

  from google.cloud.dataflow.utils.options import GoogleCloudOptions
  from google.cloud.dataflow.utils.options import StandardOptions
  from google.cloud.dataflow.utils.options import PipelineOptions

  # Shown in the docs: how a user would configure a cloud run. These options
  # are deliberately overwritten below so the snippet test runs locally.
  # [START examples_wordcount_minimal_options]
  options = PipelineOptions()
  google_cloud_options = options.view_as(GoogleCloudOptions)
  google_cloud_options.project = 'my-project-id'
  google_cloud_options.job_name = 'myjob'
  google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
  google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
  options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
  # [END examples_wordcount_minimal_options]

  # Run it locally for testing.
  options = PipelineOptions()

  # [START examples_wordcount_minimal_create]
  p = df.Pipeline(options=options)
  # [END examples_wordcount_minimal_create]

  (
      # [START examples_wordcount_minimal_read]
      p | df.io.Read(df.io.TextFileSource(
          'gs://dataflow-samples/shakespeare/kinglear.txt'))
      # [END examples_wordcount_minimal_read]

      # [START examples_wordcount_minimal_pardo]
      | df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
      # [END examples_wordcount_minimal_pardo]

      # [START examples_wordcount_minimal_count]
      | df.combiners.Count.PerElement()
      # [END examples_wordcount_minimal_count]

      # NOTE: tuple-parameter lambda is Python 2 only syntax (removed by
      # PEP 3113); this file predates the Python 3 SDK.
      # [START examples_wordcount_minimal_map]
      | df.Map(lambda (word, count): '%s: %s' % (word, count))
      # [END examples_wordcount_minimal_map]

      # [START examples_wordcount_minimal_write]
      | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
      # [END examples_wordcount_minimal_write]
  )

  # Rewrite the gs:// paths above to the caller-supplied temp files so the
  # snippet is runnable in tests.
  p.visit(SnippetUtils.RenameFiles(renames))

  # [START examples_wordcount_minimal_run]
  p.run()
  # [END examples_wordcount_minimal_run]
407+
408+
409+
def examples_wordcount_wordcount(renames):
  """WordCount example snippets.

  The full WordCount example: custom pipeline options, a composite
  PTransform (CountWords), and a formatting DoFn. The # [START ...] /
  # [END ...] markers delimit regions extracted verbatim into the published
  documentation — do not reformat the code between them.

  URL:
  https://cloud.google.com/dataflow/examples/wordcount-example#WordCount

  Args:
    renames: Mapping used by SnippetUtils.RenameFiles to swap the
      hard-coded gs:// read/write paths for test-local files.
  """
  import re

  import google.cloud.dataflow as df
  from google.cloud.dataflow.utils.options import PipelineOptions

  # Empty argv: the --input flag defined below keeps its default when the
  # snippet runs under test.
  argv = []

  # [START examples_wordcount_wordcount_options]
  class WordCountOptions(PipelineOptions):
    # Custom options subclass: contributes a --input flag to the parser.

    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument('--input',
                          help='Input for the dataflow pipeline',
                          default='gs://my-bucket/input')

  options = PipelineOptions(argv)
  p = df.Pipeline(options=options)
  # [END examples_wordcount_wordcount_options]

  lines = p | df.io.Read(df.io.TextFileSource(
      'gs://dataflow-samples/shakespeare/kinglear.txt'))

  # [START examples_wordcount_wordcount_composite]
  class CountWords(df.PTransform):
    # Composite transform: text lines -> (word, count) pairs.

    def apply(self, pcoll):
      return (pcoll
              # Convert lines of text into individual words.
              | df.FlatMap(
                  'ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))

              # Count the number of times each word occurs.
              | df.combiners.Count.PerElement())

  counts = lines | CountWords()
  # [END examples_wordcount_wordcount_composite]

  # [START examples_wordcount_wordcount_dofn]
  class FormatAsTextFn(df.DoFn):
    # Formats each (word, count) element as a 'word: count' text line.

    def process(self, context):
      word, count = context.element
      yield '%s: %s' % (word, count)

  formatted = counts | df.ParDo(FormatAsTextFn())
  # [END examples_wordcount_wordcount_dofn]

  formatted | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
  # Rewrite the gs:// paths to caller-supplied temp files, then execute.
  p.visit(SnippetUtils.RenameFiles(renames))
  p.run()
466+
467+
468+
def examples_wordcount_debugging(renames):
469+
"""DebuggingWordCount example snippets.
470+
471+
URL:
472+
https://cloud.google.com/dataflow/examples/wordcount-example#DebuggingWordCount
473+
"""
474+
import re
475+
476+
import google.cloud.dataflow as df
477+
from google.cloud.dataflow.utils.options import PipelineOptions
478+
479+
# [START example_wordcount_debugging_logging]
480+
# [START example_wordcount_debugging_aggregators]
481+
import logging
482+
483+
class FilterTextFn(df.DoFn):
484+
"""A DoFn that filters for a specific key based on a regular expression."""
485+
486+
# A custom aggregator can track values in your pipeline as it runs. Create
487+
# custom aggregators matched_word and unmatched_words.
488+
matched_words = df.Aggregator('matched_words')
489+
umatched_words = df.Aggregator('umatched_words')
490+
491+
def __init__(self, pattern):
492+
self.pattern = pattern
493+
494+
def process(self, context):
495+
word, _ = context.element
496+
if re.match(self.pattern, word):
497+
# Log at INFO level each element we match. When executing this pipeline
498+
# using the Dataflow service, these log lines will appear in the Cloud
499+
# Logging UI.
500+
logging.info('Matched %s', word)
501+
502+
# Add 1 to the custom aggregator matched_words
503+
context.aggregate_to(self.matched_words, 1)
504+
yield context.element
505+
else:
506+
# Log at the "DEBUG" level each element that is not matched. Different
507+
# log levels can be used to control the verbosity of logging providing
508+
# an effective mechanism to filter less important information. Note
509+
# currently only "INFO" and higher level logs are emitted to the Cloud
510+
# Logger. This log message will not be visible in the Cloud Logger.
511+
logging.debug('Did not match %s', word)
512+
513+
# Add 1 to the custom aggregator umatched_words
514+
context.aggregate_to(self.umatched_words, 1)
515+
# [END example_wordcount_debugging_logging]
516+
# [END example_wordcount_debugging_aggregators]
517+
518+
p = df.Pipeline(options=PipelineOptions())
519+
filtered_words = (
520+
p
521+
| df.io.Read(df.io.TextFileSource(
522+
'gs://dataflow-samples/shakespeare/kinglear.txt'))
523+
| df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
524+
| df.combiners.Count.PerElement()
525+
| df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
526+
527+
# [START example_wordcount_debugging_assert]
528+
df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
529+
# [END example_wordcount_debugging_assert]
530+
531+
output = (filtered_words
532+
| df.Map('format', lambda (word, c): '%s: %s' % (word, c))
533+
| df.io.Write(
534+
'write', df.io.TextFileSink('gs://my-bucket/counts.txt')))
535+
536+
p.visit(SnippetUtils.RenameFiles(renames))
537+
p.run()
538+
539+
348540
def model_textio(renames):
349541
"""Using a Read and Write transform to read/write text files.
350542

google/cloud/dataflow/examples/snippets/snippets_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,29 @@ def test_pipeline_logging(self):
402402
sorted(' '.join(lines).split(' ')),
403403
self.get_output(result_path))
404404

405+
  def test_examples_wordcount(self):
    """Runs the minimal and full WordCount snippets over a tiny corpus.

    Each snippet accepts a renames dict; 'read'/'write' are rewritten by
    SnippetUtils.RenameFiles so the hard-coded gs:// paths point at local
    temp files instead.
    """
    pipelines = [snippets.examples_wordcount_minimal,
                 snippets.examples_wordcount_wordcount]

    for pipeline in pipelines:
      # Two lines, four distinct words; 'abc' appears twice.
      temp_path = self.create_temp_file(
          'abc def ghi\n abc jkl')
      result_path = self.create_temp_file()
      pipeline({'read': temp_path, 'write': result_path})
      self.assertEqual(
          self.get_output(result_path),
          ['abc: 2', 'def: 1', 'ghi: 1', 'jkl: 1'])
417+
418+
  def test_examples_wordcount_debugging(self):
    """Runs the DebuggingWordCount snippet over a tiny corpus.

    The snippet filters on the pattern 'Flourish|stomach', so only those
    two words (with their counts) should reach the output file; 'abc' and
    'def' are dropped by FilterTextFn.
    """
    temp_path = self.create_temp_file(
        'Flourish Flourish Flourish stomach abc def')
    result_path = self.create_temp_file()
    snippets.examples_wordcount_debugging(
        {'read': temp_path, 'write': result_path})
    self.assertEqual(
        self.get_output(result_path),
        ['Flourish: 3', 'stomach: 1'])
427+
405428
def test_model_composite_transform_example(self):
406429
contents = ['aa bb cc', 'bb cc', 'cc']
407430
result_path = self.create_temp_file()

0 commit comments

Comments
 (0)