Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 65a0bfc

Browse files
aaltaygildea
authored andcommitted
Logging pipeline messages snippet
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117286960
1 parent 164caa4 commit 65a0bfc

2 files changed

Lines changed: 48 additions & 1 deletion

File tree

google/cloud/dataflow/examples/snippets/snippets.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ def _add_argparse_args(cls, parser):
285285

286286

287287
def pipeline_options_command_line(argv):
288-
""""Creating a Pipeline by passing a list of arguments.
288+
"""Creating a Pipeline by passing a list of arguments.
289289
290290
URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
291291
"""
@@ -308,6 +308,43 @@ def pipeline_options_command_line(argv):
308308
p.run()
309309

310310

311+
def pipeline_logging(lines, output):
312+
"""Logging Pipeline Messages.
313+
314+
URL: https://cloud.google.com/dataflow/pipelines/logging
315+
"""
316+
317+
import re
318+
import google.cloud.dataflow as df
319+
from google.cloud.dataflow.utils.options import PipelineOptions
320+
321+
# [START pipeline_logging]
322+
# import Python logging module.
323+
import logging
324+
325+
class ExtractWordsFn(df.DoFn):
326+
327+
def process(self, context):
328+
words = re.findall(r'[A-Za-z\']+', context.element)
329+
for word in words:
330+
yield word
331+
332+
if word.lower() == 'love':
333+
# Log using the root logger at info or higher levels
334+
logging.info('Found : %s', word.lower())
335+
336+
# Remaining WordCount example code ...
337+
# [END pipeline_logging]
338+
339+
p = df.Pipeline(options=PipelineOptions())
340+
(p
341+
| df.Create(lines)
342+
| df.ParDo('ExtractWords', ExtractWordsFn())
343+
| df.io.Write('WriteToText', df.io.TextFileSink(output)))
344+
345+
p.run()
346+
347+
311348
def model_textio(renames):
312349
"""Using a Read and Write transform to read/write text files.
313350

google/cloud/dataflow/examples/snippets/snippets_test.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,16 @@ def test_pipeline_options_remote(self):
392392
def test_pipeline_options_command_line(self):
393393
self._run_test_pipeline_for_options(snippets.pipeline_options_command_line)
394394

395+
def test_pipeline_logging(self):
396+
result_path = self.create_temp_file()
397+
lines = ['we found love right where we are',
398+
'we found love right from the start',
399+
'we found love in a hopeless place']
400+
snippets.pipeline_logging(lines, result_path)
401+
self.assertEqual(
402+
sorted(' '.join(lines).split(' ')),
403+
self.get_output(result_path))
404+
395405
def test_model_composite_transform_example(self):
396406
contents = ['aa bb cc', 'bb cc', 'cc']
397407
result_path = self.create_temp_file()

0 commit comments

Comments
 (0)