@@ -52,17 +52,19 @@ class ReceiverSet(object):
5252 ReceiverSets are attached to the outputting Operation.
5353 """
5454
55- def __init__ (self , output_index = 0 ):
55+ def __init__ (self , coder , output_index = 0 ):
5656 self .receivers = []
5757 self .opcounter = None
5858 self .output_index = output_index
59-
60- def start (self , step_name ):
61- self .opcounter = opcounters .OperationCounters (step_name , self .output_index )
59+ self .coder = coder
6260
6361 def add_receiver (self , receiving_operation ):
6462 self .receivers .append (receiving_operation )
6563
64+ def start (self , step_name ):
65+ self .opcounter = opcounters .OperationCounters (
66+ step_name , self .coder , self .output_index )
67+
6668 def output (self , windowed_value ):
6769 self .update_counters (windowed_value )
6870 for receiver in self .receivers :
@@ -98,20 +100,20 @@ def __init__(self, spec):
98100 spec: A maptask.Worker* instance.
99101 """
100102 self .spec = spec
101- # Create the ReceiverSet for the default output.
102- # We need this in several cases:
103- # A. There may be no receiver explicitly created for an output:
104- # 1. ParDo without anything following it, executed for side effect.
105- # 2. Partition, which generates a default output that isn't used.
106- # B. Write operations want opcounters, even though they have no outputs.
107- self .receivers = [ReceiverSet ()]
103+ self .receivers = []
104+ # Everything except WorkerSideInputSource, which is not a
105+ # top-level operation, should have output_coders
106+ if getattr (self .spec , 'output_coders' , None ):
107+ for i , coder in enumerate (self .spec .output_coders ):
108+ self .receivers .append (ReceiverSet (coder , i ))
108109
109110 def start (self ):
110111 """Start operation."""
111- for receiver in self .receivers :
112- receiver .start (self .step_name )
113112 self .debug_logging_enabled = logging .getLogger ().isEnabledFor (
114113 logging .DEBUG )
114+ # Start our receivers, now that we know our step name.
115+ for receiver in self .receivers :
116+ receiver .start (self .step_name )
115117
116118 def itercounters (self ):
117119 for receiver in self .receivers :
@@ -131,8 +133,6 @@ def output(self, windowed_value, output_index=0):
131133
132134 def add_receiver (self , operation , output_index = 0 ):
133135 """Adds a receiver operation for the specified output."""
134- while len (self .receivers ) <= output_index :
135- self .receivers .append (ReceiverSet (len (self .receivers )))
136136 self .receivers [output_index ].add_receiver (operation )
137137
138138 def __str__ (self ):
@@ -341,7 +341,7 @@ def __init__(self, spec, shuffle_sink=None):
341341 def start (self ):
342342 super (ShuffleWriteOperation , self ).start ()
343343 self .is_ungrouped = self .spec .shuffle_kind == 'ungrouped'
344- coder = self .spec .coder
344+ coder = self .spec .output_coders [ 0 ]
345345 if self .is_ungrouped :
346346 coders = (BytesCoder (), coder )
347347 else :
@@ -418,7 +418,7 @@ def _read_side_inputs(self, tags_and_types):
418418 results = []
419419 for si in itertools .ifilter (
420420 lambda o : o .tag == side_tag , self .spec .side_inputs ):
421- if isinstance (si , maptask .WorkerRead ):
421+ if isinstance (si , maptask .WorkerSideInputSource ):
422422 op = ReadOperation (si )
423423 else :
424424 raise NotImplementedError ('Unknown side input type: %r' % si )
0 commit comments