1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ # cython: profile=True
16+
1517"""Worker operations executor."""
1618
1719import collections
2426from google .cloud .dataflow .pvalue import EmptySideInput
2527from google .cloud .dataflow .runners import common
2628import google .cloud .dataflow .transforms as ptransform
29+ from google .cloud .dataflow .transforms import combiners
2730from google .cloud .dataflow .transforms import trigger
2831from google .cloud .dataflow .transforms import window
32+ from google .cloud .dataflow .transforms .combiners import curry_combine_fn
2933from google .cloud .dataflow .transforms .combiners import PhasedCombineFnExecutor
3034from google .cloud .dataflow .transforms .trigger import InMemoryUnmergedState
3135from google .cloud .dataflow .transforms .window import GlobalWindows
@@ -147,8 +151,8 @@ def start(self):
147151 windowed_value = value
148152 else :
149153 windowed_value = GlobalWindows .WindowedValue (value )
154+ self .counters [0 ].update (windowed_value )
150155 for receiver in self .receivers [0 ]:
151- self .counters [0 ].update (windowed_value )
152156 receiver .process (windowed_value )
153157
154158 def side_read_all (self , singleton = False ):
@@ -239,9 +243,9 @@ def start(self):
239243 with self .shuffle_source .reader () as reader :
240244 for key , key_values in reader :
241245 self ._reader = reader
246+ windowed_value = GlobalWindows .WindowedValue ((key , key_values ))
247+ self .counters [0 ].update (windowed_value )
242248 for receiver in self .receivers [0 ]:
243- windowed_value = GlobalWindows .WindowedValue ((key , key_values ))
244- self .counters [0 ].update (windowed_value )
245249 receiver .process (windowed_value )
246250
247251 def get_progress (self ):
@@ -271,9 +275,9 @@ def start(self):
271275 with self .shuffle_source .reader () as reader :
272276 for value in reader :
273277 self ._reader = reader
278+ windowed_value = GlobalWindows .WindowedValue (value )
279+ self .counters [0 ].update (windowed_value )
274280 for receiver in self .receivers [0 ]:
275- windowed_value = GlobalWindows .WindowedValue (value )
276- self .counters [0 ].update (windowed_value )
277281 receiver .process (windowed_value )
278282
279283 def get_progress (self ):
@@ -463,11 +467,18 @@ def process(self, o):
463467 key , values = o .value
464468 windowed_result = WindowedValue (
465469 (key , self .phased_combine_fn .apply (values )), o .timestamp , o .windows )
470+ self .counters [0 ].update (windowed_result )
466471 for receiver in self .receivers [0 ]:
467- self .counters [0 ].update (windowed_result )
468472 receiver .process (windowed_result )
469473
470474
def create_pgbk_op(spec):
  """Construct the partial group-by-key operation appropriate for spec.

  A spec that carries a (pickled) combine_fn gets the combiner-lifting
  variant (PGBKCVOperation), which folds values into accumulators as they
  arrive; a spec without one gets the plain buffering PGBKOperation.

  Args:
    spec: a WorkerPartialGroupByKey operation specification.

  Returns:
    A PGBKCVOperation or PGBKOperation instance wrapping spec.
  """
  return PGBKCVOperation(spec) if spec.combine_fn else PGBKOperation(spec)
481+
471482class PGBKOperation (Operation ):
472483 """Partial group-by-key operation.
473484
@@ -478,16 +489,7 @@ class PGBKOperation(Operation):
478489
479490 def __init__ (self , spec ):
480491 super (PGBKOperation , self ).__init__ (spec )
481- self .phased_combine_fn = None
482- if self .spec .combine_fn :
483- # Combiners do not accept deferred side-inputs (the ignored fourth
484- # argument) and therefore the code to handle the extra args/kwargs is
485- # simpler than for the DoFn's of ParDo.
486- #
487- # TODO(ccy): Combine as we go for each key instead of storing up state
488- # for combination when flushing.
489- fn , args , kwargs = pickler .loads (self .spec .combine_fn )[:3 ]
490- self .phased_combine_fn = PhasedCombineFnExecutor ('add' , fn , args , kwargs )
492+ assert not self .spec .combine_fn
491493 self .table = collections .defaultdict (list )
492494 self .size = 0
493495 # TODO(robertwb) Make this configurable.
@@ -512,16 +514,58 @@ def flush(self, target):
512514 del self .table [kw ]
513515 key , windows = kw
514516 output_value = [v .value [1 ] for v in vs ]
515- if self .phased_combine_fn :
516- output_value = self .phased_combine_fn .apply (output_value )
517517 windowed_value = WindowedValue (
518518 (key , output_value ),
519519 vs [0 ].timestamp , windows )
520+ self .counters [0 ].update (windowed_value )
520521 for receiver in self .receivers [0 ]:
521- self .counters [0 ].update (windowed_value )
522522 receiver .process (windowed_value )
523523
524524
class PGBKCVOperation(Operation):
  """Partial group-by-key operation with combiner lifting.

  Instead of buffering raw values per key (as PGBKOperation does), this
  operation eagerly folds each incoming value into a per-(windows, key)
  accumulator using the spec's CombineFn, and emits accumulators
  downstream.  This bounds memory by key count rather than value count
  and shrinks the data sent to shuffle.
  """

  def __init__(self, spec):
    super(PGBKCVOperation, self).__init__(spec)
    # Combiners do not accept deferred side-inputs (the ignored fourth
    # argument) and therefore the code to handle the extra args/kwargs is
    # simpler than for the DoFn's of ParDo.
    fn, args, kwargs = pickler.loads(self.spec.combine_fn)[:3]
    self.combine_fn = curry_combine_fn(fn, args, kwargs)
    # Optimization for the (known tiny accumulator, often wide keyspace)
    # count function.
    # TODO(robertwb): Bound by in-memory size rather than key count.
    self.max_keys = (
        1000000 if isinstance(fn, combiners.CountCombineFn) else 10000)
    self.key_count = 0
    # Maps (tuple(windows), key) -> [accumulator]; the single-element list
    # lets the accumulator be rebound in place without a dict write.
    self.table = {}

  def process(self, wkv):
    """Folds one windowed (key, value) pair into its accumulator."""
    key, value = wkv.value
    wkey = tuple(wkv.windows), key
    entry = self.table.get(wkey, None)
    if entry is None:
      if self.key_count >= self.max_keys:
        # Evict an arbitrary resident key to respect the bound; an LRU
        # policy could be used here instead.
        old_wkey = next(iter(self.table))
        self.output(old_wkey, self.table.pop(old_wkey)[0])
      else:
        self.key_count += 1
      entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
    entry[0] = self.combine_fn.add_inputs(entry[0], [value])

  def finish(self):
    """Flushes all buffered accumulators downstream and resets state."""
    for wkey, value in self.table.iteritems():
      self.output(wkey, value[0])
    # Bug fix: this previously assigned to self.entries, a dead attribute,
    # leaving self.table populated after finish() and its contents eligible
    # for duplicate emission.  Reset the actual buffer.
    self.table = {}
    self.key_count = 0

  def output(self, wkey, value):
    """Emits one (key, accumulator) pair to all receivers."""
    windows, key = wkey
    # Timestamp is taken from the end of the first window; assumes every
    # processed element carried at least one window.
    windowed_value = WindowedValue((key, value), windows[0].end, windows)
    self.counters[0].update(windowed_value)
    for receiver in self.receivers[0]:
      receiver.process(windowed_value)
567+
568+
525569class FlattenOperation (Operation ):
526570 """Flatten operation.
527571
@@ -533,8 +577,8 @@ def process(self, o):
533577 logging .debug ('Processing [%s] in %s' , o , self )
534578 assert isinstance (o , WindowedValue )
535579 windowed_result = WindowedValue (o .value , o .timestamp , o .windows )
580+ self .counters [0 ].update (windowed_result )
536581 for receiver in self .receivers [0 ]:
537- self .counters [0 ].update (windowed_result )
538582 receiver .process (windowed_result )
539583
540584
@@ -559,8 +603,8 @@ def process(self, o):
559603 o .timestamp , o .windows ))
560604
561605 def output (self , windowed_result ):
606+ self .counters [0 ].update (windowed_result )
562607 for receiver in self .receivers [0 ]:
563- self .counters [0 ].update (windowed_result )
564608 receiver .process (windowed_result )
565609
566610
@@ -608,8 +652,8 @@ def process(self, o):
608652 window .WindowedValue ((k , values ), timestamp , [out_window ]))
609653
610654 def output (self , windowed_result ):
655+ self .counters [0 ].update (windowed_result )
611656 for receiver in self .receivers [0 ]:
612- self .counters [0 ].update (windowed_result )
613657 receiver .process (windowed_result )
614658
615659
@@ -646,8 +690,8 @@ def process(self, o):
646690 [out_window ]))
647691
648692 def output (self , windowed_result ):
693+ self .counters [0 ].update (windowed_result )
649694 for receiver in self .receivers [0 ]:
650- self .counters [0 ].update (windowed_result )
651695 receiver .process (windowed_result )
652696
653697
@@ -706,7 +750,7 @@ def execute(self, map_task, test_shuffle_source=None, test_shuffle_sink=None):
706750 elif isinstance (spec , maptask .WorkerCombineFn ):
707751 op = CombineOperation (spec )
708752 elif isinstance (spec , maptask .WorkerPartialGroupByKey ):
709- op = PGBKOperation (spec )
753+ op = create_pgbk_op (spec )
710754 elif isinstance (spec , maptask .WorkerDoFn ):
711755 op = DoOperation (spec )
712756 elif isinstance (spec , maptask .WorkerGroupingShuffleRead ):
0 commit comments