4242from google .cloud .dataflow .worker import shuffle
4343
4444
class ReceiverSet(object):
  """A ReceiverSet represents a graph edge between two Operation nodes.

  The ReceiverSet object collects information from the output of the
  Operation at one end of its edge and the input of the Operation at
  the other end.
  ReceiverSets are attached to the outputting Operation.

  Attributes:
    receivers: The operations consuming values sent through this edge, in
      the order they were added.
    opcounter: The counter set for this output; None until start() is called.
    output_index: Index of this output on the outputting Operation.
  """

  def __init__(self, output_index=0):
    """Creates an edge with no receivers and no counters yet.

    Args:
      output_index: Index of the output this set is attached to.
    """
    self.receivers = []
    self.opcounter = None
    self.output_index = output_index

  def start(self, step_name):
    """Creates the counters for this output.

    Args:
      step_name: Name of the step owning this output; passed to the
        counters together with output_index.
    """
    self.opcounter = opcounters.OperationCounters(step_name, self.output_index)

  def add_receiver(self, receiving_operation):
    """Appends an operation that will consume values from this output."""
    self.receivers.append(receiving_operation)

  def output(self, windowed_value):
    """Counts windowed_value, then hands it to each receiver in turn."""
    self.update_counters(windowed_value)
    for receiver in self.receivers:
      receiver.process(windowed_value)

  def update_counters(self, windowed_value):
    """Updates the counters with windowed_value; no-op before start()."""
    # Compare against None explicitly: the counter set is an iterable object,
    # and a truthiness test would skip the update if it ever evaluated falsy.
    if self.opcounter is not None:
      self.opcounter.update(windowed_value)

  def itercounters(self):
    """Yields every counter of this output; yields nothing before start()."""
    if self.opcounter is not None:
      for counter in self.opcounter:
        yield counter

  def __str__(self):
    """Returns the receivers' short descriptions, space-separated, in []."""
    return '[%s]' % ' '.join([r.str_internal(is_recursive=True)
                              for r in self.receivers])
82+
83+
4584class Operation (object ):
4685 """An operation representing the live version of a work item specification.
4786
@@ -58,21 +97,22 @@ def __init__(self, spec):
5897 spec: A maptask.Worker* instance.
5998 """
6099 self .spec = spec
61- self .receivers = [[]]
62- # Initially we have no counters. Initializing this here makes it
63- # safe to call itercounters() at any time, even if start() has
64- # not been called yet.
65- self .counters = []
100+ # Create the ReceiverSet for the default output.
101+ # We need this in several cases:
102+ # A. There may be no receiver explicitly created for an output:
103+ # 1. ParDo without anything following it, executed for side effect.
104+ # 2. Partition, which generates a default output that isn't used.
105+ # B. Write operations want opcounters, even though they have no outputs.
106+ self .receivers = [ReceiverSet ()]
66107
67108 def start (self ):
68109 """Start operation."""
69- # If the operation has receivers, create one counter set per receiver.
70- self .counters = [opcounters .OperationCounters (self .step_name , output_index )
71- for output_index in range (len (self .receivers ))]
110+ for receiver in self .receivers :
111+ receiver .start (self .step_name )
72112
73113 def itercounters (self ):
74- for opcounter in self .counters :
75- for counter in opcounter :
114+ for receiver in self .receivers :
115+ for counter in receiver . itercounters () :
76116 yield counter
77117
78118 def finish (self ):
@@ -84,15 +124,13 @@ def process(self, o):
84124 pass
85125
86126 def output (self , windowed_value , output_index = 0 ):
87- self .counters [output_index ].update (windowed_value )
88- for receiver in self .receivers [output_index ]:
89- receiver .process (windowed_value )
127+ self .receivers [output_index ].output (windowed_value )
90128
91129 def add_receiver (self , operation , output_index = 0 ):
92130 """Adds a receiver operation for the specified output."""
93131 while len (self .receivers ) <= output_index :
94- self .receivers .append ([] )
95- self .receivers [output_index ].append (operation )
132+ self .receivers .append (ReceiverSet ( len ( self . receivers )) )
133+ self .receivers [output_index ].add_receiver (operation )
96134
97135 def __str__ (self ):
98136 """Generates a useful string for this object.
@@ -127,9 +165,7 @@ def str_internal(self, is_recursive=False):
127165
128166 if not is_recursive and getattr (self , 'receivers' , []):
129167 printable_fields .append ('receivers=[%s]' % ', ' .join ([
130- rop .str_internal (is_recursive = True )
131- for oplist in self .receivers
132- for rop in oplist ]))
168+ str (receiver ) for receiver in self .receivers ]))
133169
134170 return '<%s %s>' % (printable_name , ', ' .join (printable_fields ))
135171
@@ -206,7 +242,7 @@ def finish(self):
206242 def process (self , o ):
207243 logging .debug ('Processing [%s] in %s' , o , self )
208244 assert isinstance (o , WindowedValue )
209- self .counters [0 ].update (o )
245+ self .receivers [0 ].update_counters (o )
210246 if self .use_windowed_value :
211247 self .writer .Write (o )
212248 else :
@@ -223,7 +259,7 @@ def __init__(self, spec):
223259 def process (self , o ):
224260 logging .debug ('Processing [%s] in %s' , o , self )
225261 assert isinstance (o , WindowedValue )
226- self .counters [0 ].update (o )
262+ self .receivers [0 ].update_counters (o )
227263 self .spec .output_buffer .append (o .value )
228264
229265
@@ -312,7 +348,7 @@ def finish(self):
312348 def process (self , o ):
313349 logging .debug ('Processing [%s] in %s' , o , self )
314350 assert isinstance (o , WindowedValue )
315- self .counters [0 ].update (o )
351+ self .receivers [0 ].update_counters (o )
316352 # We typically write into shuffle key/value pairs. This is the reason why
317353 # the else branch below expects the value attribute of the WindowedValue
318354 # argument to be a KV pair. However the service may write to shuffle in
@@ -409,7 +445,6 @@ def start(self):
409445 # by the DoFn function to the appropriate receivers. The main output is
410446 # tagged with None and is associated with its corresponding index.
411447 tagged_receivers = {}
412- tagged_counters = {}
413448 output_tag_prefix = PropertyNames .OUT + '_'
414449 for index , tag in enumerate (self .spec .output_tags ):
415450 if tag == PropertyNames .OUT :
@@ -418,19 +453,11 @@ def start(self):
418453 original_tag = tag [len (output_tag_prefix ):]
419454 else :
420455 raise ValueError ('Unexpected output name for operation: %s' % tag )
421- # There may be no receiver for this output, in which case the
422- # lookup will create one, and this value will be processed
423- # for any side effect. This is desirable. There are two (known)
424- # cases where there is no receiver for an output:
425- # 1. ParDo without anything following it, executed for side effect.
426- # 2. Partition (shows up here in the worker as Flatten), which
427- # generates a default output that isn't used.
428456 tagged_receivers [original_tag ] = self .receivers [index ]
429- tagged_counters [original_tag ] = self .counters [index ]
430457
431458 self .dofn_runner = common .DoFnRunner (
432459 fn , args , kwargs , self ._read_side_inputs (tags_and_types ),
433- window_fn , self .context , tagged_receivers , tagged_counters ,
460+ window_fn , self .context , tagged_receivers ,
434461 logger , self .step_name )
435462
436463 self .dofn_runner .start ()
@@ -773,12 +800,12 @@ def execute(self, map_task, test_shuffle_source=None, test_shuffle_sink=None):
773800
774801 # Add receiver operations to the appropriate producers.
775802 if hasattr (op .spec , 'input' ):
776- producer , index = op .spec .input
777- self ._ops [producer ].add_receiver (op , index )
803+ producer , output_index = op .spec .input
804+ self ._ops [producer ].add_receiver (op , output_index )
778805 # Flatten has 'inputs', not 'input'
779806 if hasattr (op .spec , 'inputs' ):
780- for producer , index in op .spec .inputs :
781- self ._ops [producer ].add_receiver (op , index )
807+ for producer , output_index in op .spec .inputs :
808+ self ._ops [producer ].add_receiver (op , output_index )
782809
783810 # Inject the step names into the operations.
784811 # This is used for logging and assigning names to counters.
0 commit comments