2929import google .cloud .dataflow .transforms as ptransform
3030from google .cloud .dataflow .transforms import combiners
3131from google .cloud .dataflow .transforms import trigger
32- from google .cloud .dataflow .transforms import window
3332from google .cloud .dataflow .transforms .combiners import curry_combine_fn
3433from google .cloud .dataflow .transforms .combiners import PhasedCombineFnExecutor
3534from google .cloud .dataflow .transforms .trigger import InMemoryUnmergedState
@@ -501,9 +500,8 @@ def process(self, o):
501500 logging .debug ('Processing [%s] in %s' , o , self )
502501 assert isinstance (o , WindowedValue )
503502 key , values = o .value
504- windowed_value = WindowedValue (
505- (key , self .phased_combine_fn .apply (values )), o .timestamp , o .windows )
506- self .output (windowed_value )
503+ self .output (
504+ o .with_value ((key , self .phased_combine_fn .apply (values ))))
507505
508506
509507def create_pgbk_op (spec ):
@@ -633,10 +631,7 @@ def process(self, o):
633631 logging .debug ('Processing [%s] in %s' , o , self )
634632 assert isinstance (o , WindowedValue )
635633 k , v = o .value
636- self .output (
637- window .WindowedValue (
638- (k , window .WindowedValue (v , o .timestamp , o .windows )),
639- o .timestamp , o .windows ))
634+ self .output (o .with_value ((k , o .with_value (v ))))
640635
641636
642637class BatchGroupAlsoByWindowsOperation (Operation ):
@@ -669,19 +664,15 @@ def process(self, o):
669664 state = InMemoryUnmergedState ()
670665
671666 # TODO(robertwb): Process in smaller chunks.
672- for out_window , values , timestamp in (
673- driver .process_elements (state , vs , MIN_TIMESTAMP )):
674- self .output (
675- window .WindowedValue ((k , values ), timestamp , [out_window ]))
667+ for wvalue in driver .process_elements (state , vs , MIN_TIMESTAMP ):
668+ self .output (wvalue .with_value ((k , wvalue .value )))
676669
677670 while state .timers :
678671 timers = state .get_and_clear_timers ()
679672 for timer_window , (name , time_domain , timestamp ) in timers :
680- for out_window , values , timestamp in (
681- driver .process_timer (timer_window , name , time_domain , timestamp ,
682- state )):
683- self .output (
684- window .WindowedValue ((k , values ), timestamp , [out_window ]))
673+ for wvalue in driver .process_timer (
674+ timer_window , name , time_domain , timestamp , state ):
675+ self .output (wvalue .with_value ((k , wvalue .value )))
685676
686677
687678class StreamingGroupAlsoByWindowsOperation (Operation ):
@@ -703,19 +694,16 @@ def process(self, o):
703694 state = self .spec .context .state
704695 output_watermark = self .spec .context .output_data_watermark
705696
706- for out_window , values , timestamp in (
707- driver .process_elements (state , keyed_work .elements (),
708- output_watermark )):
709- self .output (window .WindowedValue ((keyed_work .key , values ), timestamp ,
710- [out_window ]))
697+ key = keyed_work .key
698+ for wvalue in driver .process_elements (
699+ state , keyed_work .elements (), output_watermark ):
700+ self .output (wvalue .with_value ((key , wvalue .value )))
711701
712702 for timer in keyed_work .timers ():
713703 timer_window = int (timer .namespace )
714- for out_window , values , timestamp in (
715- driver .process_timer (timer_window , timer .name , timer .time_domain ,
716- timer .timestamp , state )):
717- self .output (window .WindowedValue ((keyed_work .key , values ), timestamp ,
718- [out_window ]))
704+ for wvalue in driver .process_timer (
705+ timer_window , timer .name , timer .time_domain , timer .timestamp , state ):
706+ self .output (wvalue .with_value ((key , wvalue .value )))
719707
720708
721709class MapTaskExecutor (object ):
0 commit comments