Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit a724abb

Browse files
robertwbaaltay
authored andcommitted
Implement EagerPipelineRunner, useful for running in a repl.
Using EagerPipelineRunner causes PTransforms to be evaluated eagerly rather than being deferred. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=122183026
1 parent 14719fd commit a724abb

4 files changed

Lines changed: 23 additions & 2 deletions

File tree

google/cloud/dataflow/pipeline_test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,10 @@ def create_dupes(o, _):
244244
('oom:combine/GroupByKey/group_by_window', None): 1,
245245
('oom:combine/Combine/ParDo(CombineValuesDoFn)', None): 1})
246246

247+
def test_eager_pipeline(self):
248+
p = Pipeline('EagerPipelineRunner')
249+
self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
250+
247251

248252
class Bacon(PipelineOptions):
249253

google/cloud/dataflow/runners/direct_runner.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,17 @@ def __init__(self, state, counter_factory=None):
259259

260260
def aggregated_values(self, aggregator_or_name):
261261
return self._counter_factory.get_aggregator_values(aggregator_or_name)
262+
263+
264+
class EagerPipelineRunner(DirectPipelineRunner):
265+
266+
is_eager = True
267+
268+
def __init__(self):
269+
super(EagerPipelineRunner, self).__init__()
270+
self._seen_transforms = set()
271+
272+
def run_transform(self, transform):
273+
if transform not in self._seen_transforms:
274+
self._seen_transforms.add(transform)
275+
super(EagerPipelineRunner, self).run_transform(transform)

google/cloud/dataflow/runners/runner.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ def create_runner(runner_name):
3737
if runner_name == 'DirectPipelineRunner':
3838
import google.cloud.dataflow.runners.direct_runner
3939
return google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner()
40+
if runner_name == 'EagerPipelineRunner':
41+
import google.cloud.dataflow.runners.direct_runner
42+
return google.cloud.dataflow.runners.direct_runner.EagerPipelineRunner()
4043
elif runner_name in ('DataflowPipelineRunner',
4144
'BlockingDataflowPipelineRunner'):
4245
import google.cloud.dataflow.runners.dataflow_runner
@@ -45,7 +48,7 @@ def create_runner(runner_name):
4548
else:
4649
raise RuntimeError(
4750
'Unexpected pipeline runner: %s. Valid values are '
48-
'DirectPipelineRunner, DataflowPipelineRunner, or '
51+
'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, or '
4952
'BlockingDataflowPipelineRunner.' % runner_name)
5053

5154

google/cloud/dataflow/transforms/ptransform.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,6 @@ def __ror__(self, left):
387387
p = pipeline.Pipeline(
388388
'DirectPipelineRunner', PipelineOptions(sys.argv))
389389
else:
390-
deferred = True
391390
if not pipelines:
392391
if self.pipeline is not None:
393392
p = self.pipeline
@@ -400,6 +399,7 @@ def __ror__(self, left):
400399
if p != pp:
401400
raise ValueError(
402401
'Mixing value from different pipelines not allowed.')
402+
deferred = not getattr(p.runner, 'is_eager', False)
403403
# pylint: disable=g-import-not-at-top
404404
from google.cloud.dataflow.transforms.core import Create
405405
# pylint: enable=g-import-not-at-top

0 commit comments

Comments
 (0)