Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 13efcdd

Browse files
charlesccychensilviulica
authored andcommitted
Treat creation of side input views as a PTransform
This change introduces PCollectionView as a specialization of PValue. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=120200326
1 parent 77d500f commit 13efcdd

12 files changed

Lines changed: 488 additions & 153 deletions

File tree

google/cloud/dataflow/dataflow_test.py

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,19 +250,91 @@ def match(actual):
250250
assert_that(results, matcher(1, a_list, some_pairs))
251251
pipeline.run()
252252

253-
def test_as_list_without_unique_labels(self):
254-
a_list = [1, 2, 3]
253+
def test_as_singleton_without_unique_labels(self):
254+
# This should succeed as calling AsSingleton on the same PCollection twice
255+
# with the same defaults will return the same PCollectionView.
256+
a_list = [2]
257+
pipeline = Pipeline('DirectPipelineRunner')
258+
main_input = pipeline | Create('main input', [1])
259+
side_list = pipeline | Create('side list', a_list)
260+
results = main_input | FlatMap(
261+
'test',
262+
lambda x, s1, s2: [[x, s1, s2]],
263+
AsSingleton(side_list), AsSingleton(side_list))
264+
265+
def matcher(expected_elem, expected_singleton):
266+
def match(actual):
267+
[[actual_elem, actual_singleton1, actual_singleton2]] = actual
268+
equal_to([expected_elem])([actual_elem])
269+
equal_to([expected_singleton])([actual_singleton1])
270+
equal_to([expected_singleton])([actual_singleton2])
271+
return match
272+
273+
assert_that(results, matcher(1, 2))
274+
pipeline.run()
275+
276+
def test_as_singleton_with_different_defaults_without_unique_labels(self):
277+
# This should fail as AsSingleton with distinct default values should create
278+
# distinct PCollectionViews with the same full_label.
279+
a_list = [2]
255280
pipeline = Pipeline('DirectPipelineRunner')
256281
main_input = pipeline | Create('main input', [1])
257282
side_list = pipeline | Create('side list', a_list)
283+
258284
with self.assertRaises(RuntimeError) as e:
259285
_ = main_input | FlatMap(
260286
'test',
261-
lambda x, ls1, ls2: [[x, ls1, ls2]],
262-
AsList(side_list), AsList(side_list))
287+
lambda x, s1, s2: [[x, s1, s2]],
288+
AsSingleton(side_list), AsSingleton(side_list, default_value=3))
263289
self.assertTrue(
264290
e.exception.message.startswith(
265-
'Transform "AsList" does not have a stable unique label.'))
291+
'Transform "ViewAsSingleton(side list.None)" does not have a '
292+
'stable unique label.'))
293+
294+
def test_as_singleton_with_different_defaults_with_unique_labels(self):
295+
a_list = []
296+
pipeline = Pipeline('DirectPipelineRunner')
297+
main_input = pipeline | Create('main input', [1])
298+
side_list = pipeline | Create('side list', a_list)
299+
results = main_input | FlatMap(
300+
'test',
301+
lambda x, s1, s2: [[x, s1, s2]],
302+
AsSingleton('si1', side_list, default_value=2),
303+
AsSingleton('si2', side_list, default_value=3))
304+
305+
def matcher(expected_elem, expected_singleton1, expected_singleton2):
306+
def match(actual):
307+
[[actual_elem, actual_singleton1, actual_singleton2]] = actual
308+
equal_to([expected_elem])([actual_elem])
309+
equal_to([expected_singleton1])([actual_singleton1])
310+
equal_to([expected_singleton2])([actual_singleton2])
311+
return match
312+
313+
assert_that(results, matcher(1, 2, 3))
314+
pipeline.run()
315+
316+
def test_as_list_without_unique_labels(self):
317+
# This should succeed as calling AsList on the same PCollection twice will
318+
# return the same PCollectionView.
319+
a_list = [1, 2, 3]
320+
pipeline = Pipeline('DirectPipelineRunner')
321+
main_input = pipeline | Create('main input', [1])
322+
side_list = pipeline | Create('side list', a_list)
323+
results = main_input | FlatMap(
324+
'test',
325+
lambda x, ls1, ls2: [[x, ls1, ls2]],
326+
AsList(side_list), AsList(side_list))
327+
328+
def matcher(expected_elem, expected_list):
329+
def match(actual):
330+
[[actual_elem, actual_list1, actual_list2]] = actual
331+
equal_to([expected_elem])([actual_elem])
332+
equal_to(expected_list)(actual_list1)
333+
equal_to(expected_list)(actual_list2)
334+
return match
335+
336+
assert_that(results, matcher(1, [1, 2, 3]))
337+
pipeline.run()
266338

267339
def test_as_list_with_unique_labels(self):
268340
a_list = [1, 2, 3]
@@ -282,6 +354,9 @@ def match(actual):
282354
equal_to(expected_list)(actual_list2)
283355
return match
284356

357+
assert_that(results, matcher(1, [1, 2, 3]))
358+
pipeline.run()
359+
285360
def test_as_dict_with_unique_labels(self):
286361
some_kvs = [('a', 1), ('b', 2)]
287362
pipeline = Pipeline('DirectPipelineRunner')

google/cloud/dataflow/internal/apiclient.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ class Step(object):
124124
"""Wrapper for a dataflow Step protobuf."""
125125

126126
def __init__(self, step_kind, step_name):
127+
self.step_kind = step_kind
128+
self.step_name = step_name
127129
self.proto = dataflow.Step(kind=step_kind, name=step_name)
128130
self.proto.properties = {}
129131

google/cloud/dataflow/pipeline.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ def __init__(self, runner=None, options=None, argv=None):
137137
# If a transform is applied and the full label is already in the set
138138
# then the transform will have to be cloned with a new label.
139139
self.applied_labels = set()
140+
# Store cache of views created from PCollections. For reference, see
141+
# pvalue._cache_view().
142+
self._view_cache = {}
140143

141144
def _add_pvalue(self, pval):
142145
"""Adds a PValue to the pipeline's node list."""
@@ -263,7 +266,8 @@ def apply(self, transform, pvalueish=None):
263266
# TODO(robertwb): Multi-input, multi-output inference.
264267
# TODO(robertwb): Ideally we'd do intersection here.
265268
if (type_options is not None and type_options.pipeline_type_check and
266-
isinstance(result, pvalue.PCollection) and not result.element_type):
269+
isinstance(result, (pvalue.PCollection, pvalue.PCollectionView))
270+
and not result.element_type):
267271
input_element_type = (
268272
inputs[0].element_type
269273
if len(inputs) == 1
@@ -377,7 +381,7 @@ def real_producer(pv):
377381
if not isinstance(main_input, pvalue.PBegin):
378382
real_producer(main_input).refcounts[main_input.tag] += 1
379383
for side_input in self.side_inputs:
380-
real_producer(side_input.pvalue).refcounts[side_input.pvalue.tag] += 1
384+
real_producer(side_input).refcounts[side_input.tag] += 1
381385

382386
def add_output(self, output):
383387
assert (isinstance(output, pvalue.PValue) or
@@ -409,8 +413,7 @@ def visit(self, visitor, pipeline, visited):
409413

410414
# Visit side inputs.
411415
for pval in self.side_inputs:
412-
if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited:
413-
pval = pval.pvalue # Unpack marker-object-wrapped pvalue.
416+
if isinstance(pval, pvalue.PCollectionView) and pval not in visited:
414417
assert pval.producer is not None
415418
pval.producer.visit(visitor, pipeline, visited)
416419
# The value should be visited now since we visit outputs too.

0 commit comments

Comments
 (0)