Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit d7a2b0a

Browse files
aaltay and silviulica
authored and committed
Remove sdk pipeline options from the DoFn context
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=118403483
1 parent 4f7f210 commit d7a2b0a

5 files changed

Lines changed: 8 additions & 21 deletions

File tree

google/cloud/dataflow/runners/common.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,9 @@ def existing_windows(self):
171171

172172
class DoFnState(object):
173173
"""Keeps track of state that DoFns want, currently, user counters.
174-
175-
Attributes:
176-
pipeline_options: a PipelineOptions object associated with this DoFn.
177-
step_name: name of the step as a string.
178174
"""
179175

180-
def __init__(self, pipeline_options):
181-
self.pipeline_options = pipeline_options
176+
def __init__(self):
182177
self.step_name = ''
183178
self._user_counters = {}
184179

google/cloud/dataflow/runners/direct_runner.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,8 @@ def func_wrapper(self, pvalue, *args, **kwargs):
7777
@skip_if_cached
7878
def run_ParDo(self, transform_node):
7979
transform = transform_node.transform
80-
options = transform_node.inputs[0].pipeline.options
8180
# TODO(gildea): what is the appropriate object to attach the state to?
82-
context = DoFnProcessContext(label=transform.label,
83-
state=DoFnState(options))
81+
context = DoFnProcessContext(label=transform.label, state=DoFnState())
8482

8583
# Construct the list of values from side-input PCollections that we'll
8684
# substitute into the arguments for DoFn methods.
@@ -107,6 +105,7 @@ def get_side_input_value(si):
107105

108106
# TODO(robertwb): Do this type checking inside DoFnRunner to get it on
109107
# remote workers as well?
108+
options = transform_node.inputs[0].pipeline.options
110109
if options is not None and options.view_as(TypeOptions).runtime_type_check:
111110
transform.dofn = TypeCheckWrapperDoFn(
112111
transform.dofn, transform.get_type_hints())

google/cloud/dataflow/transforms/core.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ class DoFnProcessContext(object):
6161
(in process method only; always None in start_bundle and finish_bundle)
6262
windows: windows of the element
6363
(in process method only; always None in start_bundle and finish_bundle)
64-
pipeline_options: PipelineOptions object used for creating the pipeline.
6564
state: a DoFnState object, which holds the runner's internal state
6665
for this element. For example, aggregator state is here.
6766
Not used by the pipeline code.
@@ -107,10 +106,6 @@ def aggregate_to(self, aggregator, input_value):
107106
"""
108107
self.state.counter_for(aggregator).update(input_value)
109108

110-
@property
111-
def pipeline_options(self):
112-
return self.state.pipeline_options
113-
114109

115110
class DoFn(WithTypeHints):
116111
"""A function object used by a transform with custom processing.

google/cloud/dataflow/worker/batchworker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,8 +405,7 @@ def do_work(self, work_item):
405405
self.dynamic_split_result_to_report = None
406406

407407
self.set_current_work_item_and_executor(work_item,
408-
executor.MapTaskExecutor(
409-
self.pipeline_options))
408+
executor.MapTaskExecutor())
410409
self.report_progress = True
411410
self.current_executor.execute(work_item.map_task)
412411
except Exception: # pylint: disable=broad-except

google/cloud/dataflow/worker/executor.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,9 @@ def process(self, o):
383383
class DoOperation(Operation):
384384
"""A Do operation that will execute a custom DoFn for each input element."""
385385

386-
def __init__(self, spec, pipeline_options):
386+
def __init__(self, spec):
387387
super(DoOperation, self).__init__(spec)
388-
self.state = common.DoFnState(pipeline_options)
388+
self.state = common.DoFnState()
389389

390390
def _read_side_inputs(self, tags_and_types):
391391
"""Generator reading side inputs in the order prescribed by tags_and_types.
@@ -728,8 +728,7 @@ class MapTaskExecutor(object):
728728
multiple_read_instruction_error_msg = (
729729
'Found more than one \'read instruction\' in a single \'map task\'')
730730

731-
def __init__(self, pipeline_options=None):
732-
self.pipeline_options = pipeline_options
731+
def __init__(self):
733732
self._ops = []
734733
self._read_operation = None
735734

@@ -776,7 +775,7 @@ def execute(self, map_task, test_shuffle_source=None, test_shuffle_sink=None):
776775
elif isinstance(spec, maptask.WorkerPartialGroupByKey):
777776
op = create_pgbk_op(spec)
778777
elif isinstance(spec, maptask.WorkerDoFn):
779-
op = DoOperation(spec, self.pipeline_options)
778+
op = DoOperation(spec)
780779
elif isinstance(spec, maptask.WorkerGroupingShuffleRead):
781780
op = GroupedShuffleReadOperation(
782781
spec, shuffle_source=test_shuffle_source)

0 commit comments

Comments (0)