Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 4bb35c6

Browse files
gildea authored and silviulica committed
Break up OperationCounters.update() into before and after pieces
Split method update() into update_from(), called before iteration, and update_collect(), called after iteration. This will allow us to attach an observer and estimate the size during the iteration.

Correspondingly, ReceiverSet.update_counters() gets split into update_counters_start() and update_counters_finish().

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=119995822
1 parent 2e69a49 commit 4bb35c6

3 files changed

Lines changed: 52 additions & 18 deletions

File tree

google/cloud/dataflow/worker/executor.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,26 +66,36 @@ def start(self, step_name):
6666
self.counter_factory, step_name, self.coder, self.output_index)
6767

6868
def output(self, windowed_value):
69-
self.update_counters(windowed_value)
69+
self.update_counters_start(windowed_value)
7070
for receiver in self.receivers:
7171
receiver.process(windowed_value)
72+
self.update_counters_finish()
7273

73-
def update_counters(self, windowed_value):
74+
def update_counters_start(self, windowed_value):
7475
if self.opcounter:
75-
self.opcounter.update(windowed_value)
76+
self.opcounter.update_from(windowed_value)
77+
78+
def update_counters_finish(self):
79+
if self.opcounter:
80+
self.opcounter.update_collect()
7681

7782
def __str__(self):
7883
return '[%s]' % ' '.join([r.str_internal(is_recursive=True)
7984
for r in self.receivers])
8085

86+
def __repr__(self):
87+
return '<%s %d %s [%s]>' % (
88+
self.__class__.__name__,
89+
self.output_index,
90+
self.coder,
91+
' '.join([r.str_internal(is_recursive=True) for r in self.receivers]))
92+
8193

8294
class Operation(object):
8395
"""An operation representing the live version of a work item specification.
8496
8597
An operation can have one or more outputs and for each output it can have
8698
one or more receiver operations that will take that as input.
87-
TODO(gildea): Refactor "receivers[OUTPUT][RECEIVER]" as
88-
"outputs[INDEX][RECEIVER]"
8999
"""
90100

91101
def __init__(self, spec, counter_factory):
@@ -238,11 +248,12 @@ def process(self, o):
238248
if self.debug_logging_enabled:
239249
logging.debug('Processing [%s] in %s', o, self)
240250
assert isinstance(o, WindowedValue)
241-
self.receivers[0].update_counters(o)
251+
self.receivers[0].update_counters_start(o)
242252
if self.use_windowed_value:
243253
self.writer.Write(o)
244254
else:
245255
self.writer.Write(o.value)
256+
self.receivers[0].update_counters_finish()
246257

247258

248259
class InMemoryWriteOperation(Operation):
@@ -256,8 +267,9 @@ def process(self, o):
256267
if self.debug_logging_enabled:
257268
logging.debug('Processing [%s] in %s', o, self)
258269
assert isinstance(o, WindowedValue)
259-
self.receivers[0].update_counters(o)
270+
self.receivers[0].update_counters_start(o)
260271
self.spec.output_buffer.append(o.value)
272+
self.receivers[0].update_counters_finish()
261273

262274

263275
class GroupedShuffleReadOperation(Operation):
@@ -352,7 +364,7 @@ def process(self, o):
352364
if self.debug_logging_enabled:
353365
logging.debug('Processing [%s] in %s', o, self)
354366
assert isinstance(o, WindowedValue)
355-
self.receivers[0].update_counters(o)
367+
self.receivers[0].update_counters_start(o)
356368
# We typically write into shuffle key/value pairs. This is the reason why
357369
# the else branch below expects the value attribute of the WindowedValue
358370
# argument to be a KV pair. However the service may write to shuffle in
@@ -370,6 +382,7 @@ def process(self, o):
370382
# TODO(silviuc): Use timestamps for the secondary key to get values in
371383
# times-sorted order.
372384
self.writer.Write(k, '', v)
385+
self.receivers[0].update_counters_finish()
373386

374387

375388
class DoOperation(Operation):

google/cloud/dataflow/worker/opcounters.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from __future__ import absolute_import
1818

19+
from google.cloud.dataflow.utils.counters import Accumulator
1920
from google.cloud.dataflow.utils.counters import Counter
2021

2122

@@ -28,16 +29,28 @@ def __init__(self, counter_factory, step_name, coder, output_index):
2829
self.mean_byte_counter = counter_factory.get_counter(
2930
'%s-out%d-MeanByteCount' % (step_name, output_index), Counter.MEAN)
3031
self.coder = coder
32+
self._active_accumulators = []
3133

32-
def update(self, windowed_value):
34+
def update_from(self, windowed_value):
3335
"""Add one value to this counter."""
3436
self.element_counter.update(1)
37+
byte_size_accumulator = Accumulator(self.mean_byte_counter.name)
38+
self._active_accumulators.append(byte_size_accumulator)
3539
# TODO(gildea):
3640
# Actually compute the encoded size of this value.
3741
# In spirit, something like this:
38-
# size = len(self.coder.encode(windowed_value))
39-
# self.mean_byte_counter.update(size)
40-
# but will need to handle streams and do sampling.
42+
# self.coder.store_estimated_size(windowed_value, byte_size_accumulator)
43+
# but will need to do sampling.
44+
45+
def update_collect(self):
46+
"""Collects the accumulated size estimates.
47+
48+
Now that the element has been processed, we ask our accumulator
49+
for the total and store the result in a counter.
50+
"""
51+
for pending in self._active_accumulators:
52+
self.mean_byte_counter.update(pending.total)
53+
self._active_accumulators = []
4154

4255
def __str__(self):
4356
return '<%s [%s]>' % (self.__class__.__name__,

google/cloud/dataflow/worker/opcounters_test.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,25 @@ def test_update_int(self):
4949
opcounts = OperationCounters(CounterFactory(), 'some-name',
5050
coders.PickleCoder(), 0)
5151
self.verify_counters(opcounts, 0)
52-
opcounts.update(GlobalWindows.WindowedValue(1))
52+
opcounts.update_from(GlobalWindows.WindowedValue(1))
53+
opcounts.update_collect()
5354
self.verify_counters(opcounts, 1)
5455

5556
def test_update_str(self):
5657
opcounts = OperationCounters(CounterFactory(), 'some-name',
5758
coders.PickleCoder(), 0)
5859
self.verify_counters(opcounts, 0)
59-
opcounts.update(GlobalWindows.WindowedValue('abcde'))
60+
opcounts.update_from(GlobalWindows.WindowedValue('abcde'))
61+
opcounts.update_collect()
6062
self.verify_counters(opcounts, 1)
6163

6264
def test_update_old_object(self):
6365
opcounts = OperationCounters(CounterFactory(), 'some-name',
6466
coders.PickleCoder(), 0)
6567
self.verify_counters(opcounts, 0)
6668
obj = OldClassThatDoesNotImplementLen()
67-
opcounts.update(GlobalWindows.WindowedValue(obj))
69+
opcounts.update_from(GlobalWindows.WindowedValue(obj))
70+
opcounts.update_collect()
6871
self.verify_counters(opcounts, 1)
6972

7073
def test_update_new_object(self):
@@ -73,16 +76,21 @@ def test_update_new_object(self):
7376
self.verify_counters(opcounts, 0)
7477

7578
obj = ObjectThatDoesNotImplementLen()
76-
opcounts.update(GlobalWindows.WindowedValue(obj))
79+
opcounts.update_from(GlobalWindows.WindowedValue(obj))
80+
opcounts.update_collect()
7781
self.verify_counters(opcounts, 1)
7882

7983
def test_update_multiple(self):
8084
opcounts = OperationCounters(CounterFactory(), 'some-name',
8185
coders.PickleCoder(), 0)
8286
self.verify_counters(opcounts, 0)
83-
opcounts.update(GlobalWindows.WindowedValue('abcde'))
84-
opcounts.update(GlobalWindows.WindowedValue('defghij'))
87+
opcounts.update_from(GlobalWindows.WindowedValue('abcde'))
88+
opcounts.update_from(GlobalWindows.WindowedValue('defghij'))
89+
opcounts.update_collect()
8590
self.verify_counters(opcounts, 2)
91+
opcounts.update_from(GlobalWindows.WindowedValue('klmnop'))
92+
opcounts.update_collect()
93+
self.verify_counters(opcounts, 3)
8694

8795

8896
if __name__ == '__main__':

0 commit comments

Comments
 (0)