@@ -58,23 +58,20 @@ def __init__(self, spec):
5858 spec: A maptask.Worker* instance.
5959 """
6060 self .spec = spec
61- self .receivers = collections . defaultdict ( list )
61+ self .receivers = [[]]
6262 # Initially we have no counters. Initializing this here makes it
6363 # safe to call itercounters() at any time, even if start() has
6464 # not been called yet.
65- self .counters = collections .defaultdict (self .new_operation_counters )
66-
67- def new_operation_counters (self , output_index = 0 ):
68- return opcounters .OperationCounters (self .step_name , output_index )
65+ self .counters = []
6966
7067 def start (self ):
7168 """Start operation."""
7269 # If the operation has receivers, create one counter set per receiver.
73- for output_index in self .receivers :
74- self . counters [ output_index ] = self .new_operation_counters ( output_index )
70+ self . counters = [ opcounters . OperationCounters ( self .step_name , output_index )
71+ for output_index in range ( len ( self .receivers ))]
7572
7673 def itercounters (self ):
77- for opcounter in self .counters . values () :
74+ for opcounter in self .counters :
7875 for counter in opcounter :
7976 yield counter
8077
@@ -86,8 +83,15 @@ def process(self, o):
8683 """Process element in operation."""
8784 pass
8885
86+ def output (self , windowed_value , output_index = 0 ):
87+ self .counters [output_index ].update (windowed_value )
88+ for receiver in self .receivers [output_index ]:
89+ receiver .process (windowed_value )
90+
8991 def add_receiver (self , operation , output_index = 0 ):
9092 """Adds a receiver operation for the specified output."""
93+ while len (self .receivers ) <= output_index :
94+ self .receivers .append ([])
9195 self .receivers [output_index ].append (operation )
9296
9397 def __str__ (self ):
@@ -151,9 +155,7 @@ def start(self):
151155 windowed_value = value
152156 else :
153157 windowed_value = GlobalWindows .WindowedValue (value )
154- self .counters [0 ].update (windowed_value )
155- for receiver in self .receivers [0 ]:
156- receiver .process (windowed_value )
158+ self .output (windowed_value )
157159
158160 def side_read_all (self , singleton = False ):
159161 # TODO(mairbek): Should we return WindowedValue here?
@@ -244,9 +246,7 @@ def start(self):
244246 for key , key_values in reader :
245247 self ._reader = reader
246248 windowed_value = GlobalWindows .WindowedValue ((key , key_values ))
247- self .counters [0 ].update (windowed_value )
248- for receiver in self .receivers [0 ]:
249- receiver .process (windowed_value )
249+ self .output (windowed_value )
250250
251251 def get_progress (self ):
252252 if self ._reader is not None :
@@ -276,9 +276,7 @@ def start(self):
276276 for value in reader :
277277 self ._reader = reader
278278 windowed_value = GlobalWindows .WindowedValue (value )
279- self .counters [0 ].update (windowed_value )
280- for receiver in self .receivers [0 ]:
281- receiver .process (windowed_value )
279+ self .output (windowed_value )
282280
283281 def get_progress (self ):
284282 # 'UngroupedShuffleReader' does not support progress reporting.
@@ -305,6 +303,7 @@ def start(self):
305303 self .spec .shuffle_writer_config , coder = self .spec .coders )
306304 self .writer = self .shuffle_sink .writer ()
307305 self .writer .__enter__ ()
306+ self .is_ungrouped = self .spec .shuffle_kind == 'ungrouped'
308307
309308 def finish (self ):
310309 logging .debug ('Finishing %s' , self )
@@ -323,7 +322,7 @@ def process(self, o):
323322 # used to reshard workflow outputs into a fixed set of files. This is
324323 # achieved by using an UngroupedShuffleSource to read back the values
325324 # written in 'ungrouped' mode.
326- if self .spec . shuffle_kind == 'ungrouped' :
325+ if self .is_ungrouped :
327326 # We want to spread the values uniformly to all shufflers.
328327 k , v = str (random .getrandbits (64 )), o .value
329328 else :
@@ -411,7 +410,6 @@ def start(self):
411410 # tagged with None and is associated with its corresponding index.
412411 tagged_receivers = {}
413412 tagged_counters = {}
414- self ._tag_map = {}
415413 output_tag_prefix = PropertyNames .OUT + '_'
416414 for index , tag in enumerate (self .spec .output_tags ):
417415 if tag == PropertyNames .OUT :
@@ -441,8 +439,7 @@ def finish(self):
441439 self .dofn_runner .finish ()
442440
443441 def process (self , o ):
444- with logger .PerThreadLoggingContext (step_name = self .step_name ):
445- self .dofn_runner .process (o )
442+ self .dofn_runner .process (o )
446443
447444
448445class CombineOperation (Operation ):
@@ -464,11 +461,9 @@ def process(self, o):
464461 logging .debug ('Processing [%s] in %s' , o , self )
465462 assert isinstance (o , WindowedValue )
466463 key , values = o .value
467- windowed_result = WindowedValue (
464+ windowed_value = WindowedValue (
468465 (key , self .phased_combine_fn .apply (values )), o .timestamp , o .windows )
469- self .counters [0 ].update (windowed_result )
470- for receiver in self .receivers [0 ]:
471- receiver .process (windowed_result )
466+ self .output (windowed_value )
472467
473468
474469def create_pgbk_op (spec ):
@@ -516,9 +511,7 @@ def flush(self, target):
516511 windowed_value = WindowedValue (
517512 (key , output_value ),
518513 vs [0 ].timestamp , windows )
519- self .counters [0 ].update (windowed_value )
520- for receiver in self .receivers [0 ]:
521- receiver .process (windowed_value )
514+ self .output (windowed_value )
522515
523516
524517class PGBKCVOperation (Operation ):
@@ -544,25 +537,30 @@ def process(self, wkv):
544537 entry = self .table .get (wkey , None )
545538 if entry is None :
546539 if self .key_count >= self .max_keys :
547- old_wkey = self .table .iterkeys ().next () # Any key, could use LRU
548- self .output (old_wkey , self .table .pop (old_wkey )[0 ])
549- else :
550- self .key_count += 1
540+ target = self .key_count * 9 // 10
541+ old_wkeys = []
542+ # TODO(robertwb): Use an LRU cache?
543+ for old_wkey , old_wvalue in enumerate (self .table .iterkeys ()):
544+ old_wkeys .append (old_wkey ) # Can't mutate while iterating.
545+ self .output_key (old_wkey , old_wvalue )
546+ self .key_count -= 1
547+ if self .key_count <= target :
548+ break
549+ for old_wkey in reversed (old_wkeys ):
550+ del self .table [old_wkey ]
551+ self .key_count += 1
551552 entry = self .table [wkey ] = [self .combine_fn .create_accumulator ()]
552553 entry [0 ] = self .combine_fn .add_inputs (entry [0 ], [value ])
553554
554555 def finish (self ):
555556 for wkey , value in self .table .iteritems ():
556- self .output (wkey , value [0 ])
557+ self .output_key (wkey , value [0 ])
557558 self .table = {}
558559 self .key_count = 0
559560
560- def output (self , wkey , value ):
561+ def output_key (self , wkey , value ):
561562 windows , key = wkey
562- windowed_value = WindowedValue ((key , value ), windows [0 ].end , windows )
563- self .counters [0 ].update (windowed_value )
564- for receiver in self .receivers [0 ]:
565- receiver .process (windowed_value )
563+ self .output (WindowedValue ((key , value ), windows [0 ].end , windows ))
566564
567565
568566class FlattenOperation (Operation ):
@@ -575,10 +573,7 @@ class FlattenOperation(Operation):
575573 def process (self , o ):
576574 logging .debug ('Processing [%s] in %s' , o , self )
577575 assert isinstance (o , WindowedValue )
578- windowed_result = WindowedValue (o .value , o .timestamp , o .windows )
579- self .counters [0 ].update (windowed_result )
580- for receiver in self .receivers [0 ]:
581- receiver .process (windowed_result )
576+ self .output (o )
582577
583578
584579class ReifyTimestampAndWindowsOperation (Operation ):
@@ -601,11 +596,6 @@ def process(self, o):
601596 (k , window .WindowedValue (v , o .timestamp , o .windows )),
602597 o .timestamp , o .windows ))
603598
604- def output (self , windowed_result ):
605- self .counters [0 ].update (windowed_result )
606- for receiver in self .receivers [0 ]:
607- receiver .process (windowed_result )
608-
609599
610600class BatchGroupAlsoByWindowsOperation (Operation ):
611601 """BatchGroupAlsoByWindowsOperation operation.
@@ -650,11 +640,6 @@ def process(self, o):
650640 self .output (
651641 window .WindowedValue ((k , values ), timestamp , [out_window ]))
652642
653- def output (self , windowed_result ):
654- self .counters [0 ].update (windowed_result )
655- for receiver in self .receivers [0 ]:
656- receiver .process (windowed_result )
657-
658643
659644class StreamingGroupAlsoByWindowsOperation (Operation ):
660645 """StreamingGroupAlsoByWindowsOperation operation.
@@ -688,11 +673,6 @@ def process(self, o):
688673 self .output (window .WindowedValue ((keyed_work .key , values ), timestamp ,
689674 [out_window ]))
690675
691- def output (self , windowed_result ):
692- self .counters [0 ].update (windowed_result )
693- for receiver in self .receivers [0 ]:
694- receiver .process (windowed_result )
695-
696676
697677class MapTaskExecutor (object ):
698678 """A class for executing map tasks.
0 commit comments