Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 96583d0

Browse files
committed
Ignore undeclared side outputs of DoFns in cloud executor
DoFns may emit undeclared side outputs, executor is crashing on those instead they should be ignored. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=123371414
1 parent 574a29e commit 96583d0

3 files changed

Lines changed: 50 additions & 2 deletions

File tree

google/cloud/dataflow/transforms/core.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,8 @@ def with_outputs(self, *tags, **main_kw):
575575
the available tags (e.g., for tag in o: ...).
576576
577577
Args:
578-
*tags: if non-empty, list of valid tags
578+
*tags: if non-empty, list of valid tags. If a list of valid tags is given,
579+
it will be an error to use an undeclared tag later in the pipeline.
579580
**main_kw: dictionary empty or with one key 'main' defining the tag to be
580581
used for the main output (which will not have a tag associated with it).
581582

google/cloud/dataflow/worker/executor.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,19 @@ def process(self, o):
388388
self.receivers[0].update_counters_finish()
389389

390390

391+
class _TaggedReceivers(dict):
392+
393+
class NullReceiver(object):
394+
395+
def output(self, element):
396+
pass
397+
398+
def __missing__(self, unused_key):
399+
if not getattr(self, '_null_receiver', None):
400+
self._null_receiver = _TaggedReceivers.NullReceiver()
401+
return self._null_receiver
402+
403+
391404
class DoOperation(Operation):
392405
"""A Do operation that will execute a custom DoFn for each input element."""
393406

@@ -473,7 +486,8 @@ def start(self):
473486
# Tag to output index map used to dispatch the side output values emitted
474487
# by the DoFn function to the appropriate receivers. The main output is
475488
# tagged with None and is associated with its corresponding index.
476-
tagged_receivers = {}
489+
tagged_receivers = _TaggedReceivers()
490+
477491
output_tag_prefix = PropertyNames.OUT + '_'
478492
for index, tag in enumerate(self.spec.output_tags):
479493
if tag == PropertyNames.OUT:

google/cloud/dataflow/worker/executor_test.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ def finish_bundle(self, context, *args, **kwargs):
8787
f.write('finish called.')
8888

8989

90+
class DoFnUsingWithUndeclaredSideOutput(ptransform.DoFn):
91+
"""A DoFn class that emits an undeclared side output."""
92+
93+
def process(self, context, *args, **kwargs):
94+
yield pvalue.SideOutputValue('undeclared', context.element)
95+
yield context.element
96+
97+
9098
class ProgressRequestRecordingInMemoryReader(inmemory.InMemoryReader):
9199

92100
def __init__(self, source):
@@ -175,6 +183,31 @@ def test_read_do_write_with_start_bundle(self):
175183
with open(finish_path) as f:
176184
self.assertEqual('finish called.', f.read())
177185

186+
def test_read_do_write_with_undeclared_output(self):
187+
input_path = self.create_temp_file('01234567890123456789\n0123456789')
188+
output_path = '%s.out' % input_path
189+
work_item = workitem.BatchWorkItem(None)
190+
work_item.map_task = make_map_task([
191+
maptask.WorkerRead(
192+
fileio.TextFileSource(file_path=input_path,
193+
start_offset=0,
194+
end_offset=15,
195+
strip_trailing_newlines=True,
196+
coder=coders.StrUtf8Coder()),
197+
output_coders=[self.OUTPUT_CODER]),
198+
maptask.WorkerDoFn(serialized_fn=pickle_with_side_inputs(
199+
DoFnUsingWithUndeclaredSideOutput()),
200+
output_tags=['out'],
201+
output_coders=[self.OUTPUT_CODER],
202+
input=(0, 0),
203+
side_inputs=None),
204+
make_text_sink(output_path, input=(1, 0))
205+
])
206+
207+
executor.MapTaskExecutor(work_item.map_task).execute()
208+
with open(output_path) as f:
209+
self.assertEqual('01234567890123456789\n', f.read())
210+
178211
def test_read_do_shuffle_write(self):
179212
input_path = self.create_temp_file('a\nb\nc\nd\n')
180213
work_spec = [

0 commit comments

Comments
 (0)