Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit c644665

Browse files
robertwbaaltay
authored andcommitted
Allow Pipeline objects to be used in Python with statements
----Release Notes---- One can now write with df.Pipeline(...) as p: p | df.io.Read(...) | Map(...) | ... and p.run() will be called automatically at the end of the block. [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=124308722
1 parent 30af51a commit c644665

2 files changed

Lines changed: 15 additions & 0 deletions

File tree

google/cloud/dataflow/pipeline.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,13 @@ def run(self):
157157
shutil.rmtree(tmpdir)
158158
return self.runner.run(self)
159159

160+
def __enter__(self):
161+
return self
162+
163+
def __exit__(self, exc_type, exc_val, exc_tb):
164+
if not exc_type:
165+
self.run()
166+
160167
def visit(self, visitor):
161168
"""Visits depth-first every node of a pipeline's DAG.
162169

google/cloud/dataflow/pipeline_test.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,14 @@ def create_dupes(o, _):
245245
('oom:combine/GroupByKey/group_by_window', None): 1,
246246
('oom:combine/Combine/ParDo(CombineValuesDoFn)', None): 1})
247247

248+
def test_pipeline_as_context(self):
249+
def raise_exception(exn):
250+
raise exn
251+
with self.assertRaises(ValueError):
252+
with Pipeline(self.runner_name) as p:
253+
# pylint: disable=expression-not-assigned
254+
p | Create([ValueError]) | Map(raise_exception)
255+
248256
def test_eager_pipeline(self):
249257
p = Pipeline('EagerPipelineRunner')
250258
self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))

0 commit comments

Comments
 (0)