Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 0499bbe

Browse files
charlesccychen authored and silviulica committed
Clean up PValue and PCollection with clearer argument passing
Removes the deprecated "transform" argument to pvalue.PValue.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=119557938
1 parent e958bbb commit 0499bbe

6 files changed

Lines changed: 28 additions & 46 deletions

File tree

google/cloud/dataflow/io/iobase.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -731,7 +731,7 @@ def __init__(self, *args, **kwargs):
731731

732732
def apply(self, pcoll):
733733
self._check_pcollection(pcoll)
734-
return pvalue.PValue(pipeline=pcoll.pipeline, transform=self)
734+
return pvalue.PValue(pcoll.pipeline)
735735

736736

737737
class Read(ptransform.PTransform):
@@ -753,7 +753,7 @@ def __init__(self, *args, **kwargs):
753753
def apply(self, pbegin):
754754
assert isinstance(pbegin, pvalue.PBegin)
755755
self.pipeline = pbegin.pipeline
756-
return pvalue.PCollection(pipeline=self.pipeline, transform=self)
756+
return pvalue.PCollection(self.pipeline)
757757

758758
def get_windowing(self, unused_inputs):
759759
return core.Windowing(window.GlobalWindows())

google/cloud/dataflow/pvalue.py

Lines changed: 18 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -30,38 +30,26 @@
3030
class PValue(object):
3131
"""Base class for PCollection.
3232
33+
Dataflow users should not construct PValue objects directly in their
34+
pipelines.
35+
3336
A PValue has the following main characteristics:
3437
(1) Belongs to a pipeline. Added during object initialization.
3538
(2) Has a transform that can compute the value if executed.
3639
(3) Has a value which is meaningful if the transform was executed.
3740
"""
3841

39-
def __init__(self, **kwargs):
42+
def __init__(self, pipeline, tag=None, element_type=None):
4043
"""Initializes a PValue with all arguments hidden behind keyword arguments.
4144
4245
Args:
43-
**kwargs: keyword arguments.
44-
45-
Raises:
46-
ValueError: if the expected keyword arguments (pipeline, transform,
47-
and optionally tag) are not present.
48-
49-
The method expects a pipeline and a transform keyword argument. However in
50-
order to give a signal to users that they should not create these PValues
51-
directly we obfuscate the arguments.
46+
pipeline: Pipeline object for this PValue.
47+
tag: Tag of this PValue.
48+
element_type: The type of this PValue.
5249
"""
53-
if 'pipeline' not in kwargs or 'transform' not in kwargs:
54-
raise ValueError(
55-
'Missing required arguments (pipeline and transform): %s'
56-
% kwargs.keys)
57-
self.pipeline = kwargs.pop('pipeline')
58-
# TODO(silviuc): Remove usage of the transform argument from all call sites.
59-
# It is not used anymore and has been replaced with the producer attribute.
60-
kwargs.pop('transform')
61-
self.tag = kwargs.pop('tag', None)
62-
self.element_type = kwargs.pop('element_type', None)
63-
if kwargs:
64-
raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
50+
self.pipeline = pipeline
51+
self.tag = tag
52+
self.element_type = element_type
6553
self.pipeline._add_pvalue(self)
6654
# The AppliedPTransform instance for the application of the PTransform
6755
# generating this PValue. The field gets initialized when a transform
@@ -103,11 +91,15 @@ def __or__(self, ptransform):
10391

10492

10593
class PCollection(PValue):
106-
"""A multiple values (potentially huge) container."""
94+
"""A multiple values (potentially huge) container.
95+
96+
Dataflow users should not construct PCollection objects directly in their
97+
pipelines.
98+
"""
10799

108-
def __init__(self, **kwargs):
100+
def __init__(self, pipeline, **kwargs):
109101
"""Initializes a PCollection. Do not call directly."""
110-
super(PCollection, self).__init__(**kwargs)
102+
super(PCollection, self).__init__(pipeline, **kwargs)
111103

112104
@property
113105
def windowing(self):
@@ -216,10 +208,7 @@ def __getitem__(self, tag):
216208
return self._pcolls[tag]
217209
if tag is not None:
218210
self._transform.side_output_tags.add(tag)
219-
pcoll = PCollection(
220-
pipeline=self._pipeline,
221-
transform=self._transform,
222-
tag=tag)
211+
pcoll = PCollection(self._pipeline, tag=tag)
223212
# Transfer the producer from the DoOutputsTuple to the resulting
224213
# PCollection.
225214
pcoll.producer = self.producer

google/cloud/dataflow/pvalue_test.py

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818

1919
from google.cloud.dataflow.pipeline import Pipeline
2020
from google.cloud.dataflow.pvalue import PValue
21-
from google.cloud.dataflow.transforms import PTransform
2221

2322

2423
class FakePipeline(Pipeline):
@@ -33,15 +32,9 @@ class PValueTest(unittest.TestCase):
3332

3433
def test_pvalue_expected_arguments(self):
3534
pipeline = Pipeline('DirectPipelineRunner')
36-
transform = PTransform()
37-
value = PValue(pipeline=pipeline, transform=transform)
35+
value = PValue(pipeline)
3836
self.assertEqual(pipeline, value.pipeline)
3937

40-
def test_pvalue_missing_arguments(self):
41-
self.assertRaises(ValueError, PValue,
42-
pipeline=Pipeline('DirectPipelineRunner'))
43-
self.assertRaises(ValueError, PValue, transform=PTransform())
44-
4538

4639
if __name__ == '__main__':
4740
unittest.main()

google/cloud/dataflow/runners/dataflow_runner.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -322,7 +322,7 @@ def apply_GroupByKey(self, transform, pcoll):
322322
coders.registry.verify_deterministic(
323323
coder.key_coder(), 'GroupByKey operation "%s"' % transform.label)
324324

325-
return pvalue.PCollection(pipeline=pcoll.pipeline, transform=transform)
325+
return pvalue.PCollection(pcoll.pipeline)
326326

327327
def run_GroupByKey(self, transform_node):
328328
input_tag = transform_node.inputs[0].tag
@@ -413,7 +413,7 @@ def run_ParDo(self, transform_node):
413413
step.add_property(PropertyNames.OUTPUT_INFO, outputs)
414414

415415
def apply_CombineValues(self, transform, pcoll):
416-
return pvalue.PCollection(pipeline=pcoll.pipeline, transform=transform)
416+
return pvalue.PCollection(pcoll.pipeline)
417417

418418
def run_CombineValues(self, transform_node):
419419
transform = transform_node.transform

google/cloud/dataflow/transforms/core.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -537,7 +537,7 @@ def apply(self, pcoll):
537537
self.side_output_tags = set()
538538
# TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
539539
self.dofn = self.fn
540-
return pvalue.PCollection(pipeline=pcoll.pipeline, transform=self)
540+
return pvalue.PCollection(pcoll.pipeline)
541541

542542
def with_outputs(self, *tags, **main_kw):
543543
"""Returns a tagged tuple allowing access to the outputs of a ParDo.
@@ -1032,7 +1032,7 @@ def infer_output_type(self, input_type):
10321032

10331033
def apply(self, pcoll):
10341034
self._check_pcollection(pcoll)
1035-
return pvalue.PCollection(pipeline=pcoll.pipeline, transform=self)
1035+
return pvalue.PCollection(pcoll.pipeline)
10361036

10371037

10381038
class Partition(PTransformWithSideInputs):
@@ -1207,7 +1207,7 @@ def _extract_input_pvalues(self, pvalueish):
12071207
def apply(self, pcolls):
12081208
for pcoll in pcolls:
12091209
self._check_pcollection(pcoll)
1210-
return pvalue.PCollection(pipeline=self.pipeline, transform=self)
1210+
return pvalue.PCollection(self.pipeline)
12111211

12121212

12131213
class Create(PTransform):
@@ -1240,7 +1240,7 @@ def infer_output_type(self, unused_input_type):
12401240
def apply(self, pbegin):
12411241
assert isinstance(pbegin, pvalue.PBegin)
12421242
self.pipeline = pbegin.pipeline
1243-
return pvalue.PCollection(pipeline=self.pipeline, transform=self)
1243+
return pvalue.PCollection(self.pipeline)
12441244

12451245
def get_windowing(self, unused_inputs):
12461246
return Windowing(window.GlobalWindows())

google/cloud/dataflow/transforms/ptransform.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -410,7 +410,7 @@ def _extract_input_pvalues(self, pvalueish):
410410
from google.cloud.dataflow import pipeline
411411
# pylint: enable=g-import-not-at-top
412412
if isinstance(pvalueish, pipeline.Pipeline):
413-
pvalueish = pvalue.PBegin(pipeline=pvalueish, transform=None)
413+
pvalueish = pvalue.PBegin(pvalueish)
414414

415415
return pvalueish, (pvalueish,)
416416

0 commit comments

Comments
 (0)