Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 59a4566

Browse files
gildeasilviulica
authored and committed
Use a CounterFactory to create counters
With this change, steps with the same name use the same counter. Previously, we would send two counters with the same name to the Dataflow service, which would reject the update.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=118901263
1 parent 253d7e2 commit 59a4566

10 files changed

Lines changed: 185 additions & 107 deletions

File tree

google/cloud/dataflow/internal/apiclient.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -738,9 +738,8 @@ def report_status(self,
738738
work_item_status.errors.append(status)
739739

740740
# Look through the work item for metrics to send.
741-
for op in work_item.map_task.executed_operations:
742-
for counter in op.itercounters():
743-
append_counter(work_item_status, counter, tentative=not completed)
741+
for counter in work_item.map_task.itercounters():
742+
append_counter(work_item_status, counter, tentative=not completed)
744743

745744
report_request = dataflow.ReportWorkItemStatusRequest()
746745
report_request.currentWorkerTime = worker.current_time

google/cloud/dataflow/runners/common.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from google.cloud.dataflow.transforms.window import TimestampedValue
2525
from google.cloud.dataflow.transforms.window import WindowedValue
2626
from google.cloud.dataflow.transforms.window import WindowFn
27-
from google.cloud.dataflow.utils import counters
2827

2928

3029
class FakeLogger(object):
@@ -172,17 +171,11 @@ class DoFnState(object):
172171
"""Keeps track of state that DoFns want, currently, user counters.
173172
"""
174173

175-
def __init__(self):
174+
def __init__(self, counter_factory):
176175
self.step_name = ''
177-
self._user_counters = {}
176+
self._counter_factory = counter_factory
178177

179178
def counter_for(self, aggregator):
180179
"""Looks up the counter for this aggregator, creating one if necessary."""
181-
if aggregator not in self._user_counters:
182-
self._user_counters[aggregator] = counters.AggregatorCounter(
183-
self.step_name, aggregator)
184-
return self._user_counters[aggregator]
185-
186-
def itercounters(self):
187-
"""Returns an iterable of Counters (to be sent to the service)."""
188-
return self._user_counters.values()
180+
return self._counter_factory.get_aggregator_counter(
181+
self.step_name, aggregator)

google/cloud/dataflow/runners/direct_runner.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from google.cloud.dataflow.typehints.typecheck import OutputCheckWrapperDoFn
4040
from google.cloud.dataflow.typehints.typecheck import TypeCheckError
4141
from google.cloud.dataflow.typehints.typecheck import TypeCheckWrapperDoFn
42+
from google.cloud.dataflow.utils import counters
4243
from google.cloud.dataflow.utils.options import TypeOptions
4344

4445

@@ -52,6 +53,7 @@ class DirectPipelineRunner(PipelineRunner):
5253
def __init__(self, cache=None):
5354
# Cache of values computed while the runner executes a pipeline.
5455
self._cache = cache if cache is not None else PValueCache()
56+
self._counter_factory = counters.CounterFactory()
5557

5658
def get_pvalue(self, pvalue):
5759
"""Gets the PValue's computed value from the runner's cache."""
@@ -78,7 +80,8 @@ def func_wrapper(self, pvalue, *args, **kwargs):
7880
def run_ParDo(self, transform_node):
7981
transform = transform_node.transform
8082
# TODO(gildea): what is the appropriate object to attach the state to?
81-
context = DoFnProcessContext(label=transform.label, state=DoFnState())
83+
context = DoFnProcessContext(label=transform.label,
84+
state=DoFnState(self._counter_factory))
8285

8386
# Construct the list of values from side-input PCollections that we'll
8487
# substitute into the arguments for DoFn methods.

google/cloud/dataflow/utils/counters.py

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,60 @@ def _str_internal(self):
9797

9898

9999
class AggregatorCounter(Counter):
100-
"""A Counter that represents a step-specific instance of an Aggregator."""
100+
"""A Counter that represents a step-specific instance of an Aggregator.
101101
102-
def __init__(self, step_name, aggregator):
103-
super(AggregatorCounter, self).__init__(
104-
'user-%s-%s' % (step_name, aggregator.name),
105-
aggregator.aggregation_kind)
102+
Do not create directly, call CounterFactory.get_aggregator_counter instead.
103+
"""
104+
105+
106+
class CounterFactory(object):
107+
"""Keeps track of unique counters."""
108+
109+
def __init__(self):
110+
self.counters = {}
111+
112+
def get_counter(self, name, aggregation_kind):
113+
"""Returns a counter with the requested name.
114+
115+
Passing in the same name will return the same counter; the
116+
aggregation_kind must agree.
117+
118+
Args:
119+
name: the name of this counter. Typically has three parts:
120+
"step-output-counter".
121+
aggregation_kind: one of the kinds defined by this class.
122+
Returns:
123+
A new or existing counter with the requested name.
124+
"""
125+
counter = self.counters.get(name, None)
126+
if counter:
127+
assert counter.aggregation_kind == aggregation_kind
128+
else:
129+
counter = Counter(name, aggregation_kind)
130+
self.counters[name] = counter
131+
return counter
132+
133+
def get_aggregator_counter(self, step_name, aggregator):
134+
"""Returns an AggregatorCounter for this step's aggregator.
135+
136+
Passing in the same values will return the same counter.
137+
138+
Args:
139+
step_name: the name of this step.
140+
aggregator: an Aggregator object.
141+
Returns:
142+
A new or existing counter.
143+
"""
144+
name = 'user-%s-%s' % (step_name, aggregator.name)
145+
aggregation_kind = aggregator.aggregation_kind
146+
counter = self.counters.get(name, None)
147+
if counter:
148+
assert isinstance(counter, AggregatorCounter)
149+
assert counter.aggregation_kind == aggregation_kind
150+
else:
151+
counter = AggregatorCounter(name, aggregation_kind)
152+
self.counters[name] = counter
153+
return counter
154+
155+
def itercounters(self):
156+
return self.counters.itervalues()

google/cloud/dataflow/worker/executor.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ cimport cython
1616

1717
cdef class Operation(object):
1818
cdef public spec
19+
cdef public counter_factory
1920
cdef public list receivers
2021
cdef public list counters
2122
cdef readonly bint debug_logging_enabled

0 commit comments

Comments
 (0)