Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit cfc6c76

Browse files
robertwbgildea
authored and committed
Cythonize runners.common, worker.executor, and utils.counters
Also optimize DoFnRunner for the (common) case of the main output. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117261672
1 parent 9e84c97 commit cfc6c76

7 files changed

Lines changed: 129 additions & 10 deletions

File tree

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the 'License');
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an 'AS IS' BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
cdef type SideOutputValue, TimestampedValue, WindowedValue
16+
17+
cdef class DoFnRunner(object):
18+
19+
cdef object dofn
20+
cdef object window_fn
21+
cdef object context
22+
cdef object tagged_receivers
23+
cdef object tagged_counters
24+
cdef object logger
25+
cdef object step_name
26+
27+
cdef list main_receivers
28+
cdef object main_counters
29+
30+
cpdef _process_outputs(self, element, results)

google/cloud/dataflow/runners/common.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
# cython: profile=True
16+
1517
"""Worker operations executor."""
1618

1719
import sys
@@ -72,6 +74,10 @@ def finish_bundle(self, context):
7274
self.logger = logger or FakeLogger()
7375
self.step_name = step_name
7476

77+
# Optimize for the common case.
78+
self.main_receivers = tagged_receivers[None]
79+
self.main_counters = tagged_counters[None]
80+
7581
def start(self):
7682
self.context.set_element(None)
7783
try:
@@ -145,10 +151,15 @@ def _process_outputs(self, element, results):
145151
else:
146152
windowed_value = WindowedValue(
147153
result, element.timestamp, element.windows)
148-
for receiver in self.tagged_receivers[tag]:
149-
# TODO(robertwb): Should the counters be on the context?
154+
# TODO(robertwb): Should the counters be on the context?
155+
if tag is None:
156+
self.main_counters.update(windowed_value)
157+
for receiver in self.main_receivers:
158+
receiver.process(windowed_value)
159+
else:
150160
self.tagged_counters[tag].update(windowed_value)
151-
receiver.process(windowed_value)
161+
for receiver in self.tagged_receivers[tag]:
162+
receiver.process(windowed_value)
152163

153164
class NoContext(WindowFn.AssignContext):
154165
"""An uninspectable WindowFn.AssignContext."""
@@ -158,7 +169,7 @@ def __init__(self, value, timestamp=NO_VALUE):
158169
self._timestamp = timestamp
159170
@property
160171
def timestamp(self):
161-
if self._timestamp is NO_VALUE:
172+
if self._timestamp is self.NO_VALUE:
162173
raise ValueError('No timestamp in this context.')
163174
else:
164175
return self._timestamp
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the 'License');
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an 'AS IS' BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# cython: profile=True
16+
# cython: overflowcheck=True
17+
18+
cdef class Counter(object):
19+
cdef readonly object name
20+
cdef readonly object aggregation_kind
21+
cdef long c_total
22+
cdef object py_total
23+
cdef readonly long elements
24+
25+
cdef _update_small(self, long delta)

google/cloud/dataflow/utils/counters.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
# cython: profile=False
16+
# cython: overflowcheck=True
17+
1518
"""Counters collect the progress of the Worker for reporting to the service."""
1619

1720

@@ -61,13 +64,26 @@ def __init__(self, name, aggregation_kind):
6164
"""
6265
self.name = name
6366
self.aggregation_kind = aggregation_kind
64-
self.total = 0
67+
assert aggregation_kind == self.SUM # update only handles sum
68+
self.c_total = 0
69+
self.py_total = 0
6570
self.elements = 0
6671

6772
def update(self, count):
68-
self.total += count
73+
try:
74+
self._update_small(count)
75+
except OverflowError:
76+
self.py_total += count
6977
self.elements += 1
7078

79+
def _update_small(self, delta):
80+
new_total = self.c_total + delta # overflow is checked
81+
self.c_total = new_total
82+
83+
@property
84+
def total(self):
85+
return self.c_total + self.py_total
86+
7187
def __str__(self):
7288
return '<%s>' % self._str_internal()
7389

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the 'License');
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an 'AS IS' BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
cdef class Operation(object):
16+
cdef public spec
17+
cdef public receivers
18+
cdef public counters
19+
20+
cdef public step_name # initialized lazily
21+
22+
cpdef start(self)
23+
cpdef process(self, windowed_value)
24+
cpdef finish(self)
25+
26+
cdef class PGBKCVOperation(Operation):
27+
cdef public object combine_fn
28+
cdef dict table
29+
cdef long max_keys
30+
cdef long key_count
31+
32+
cpdef output(self, tuple wkey, value)

google/cloud/dataflow/worker/executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ def process(self, wkv):
554554
def finish(self):
555555
for wkey, value in self.table.iteritems():
556556
self.output(wkey, value[0])
557-
self.entries = {}
557+
self.table = {}
558558
self.key_count = 0
559559

560560
def output(self, wkey, value):
@@ -815,4 +815,4 @@ def execute(self, map_task, test_shuffle_source=None, test_shuffle_sink=None):
815815
logging.debug('Starting op %d %s', ix, op)
816816
op.start()
817817
for op in self._ops:
818-
op.finish()
818+
op.finish(*())

setup.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,13 @@ def get_download_url():
105105
entry_points={
106106
'console_scripts': CONSOLE_SCRIPTS,
107107
},
108-
ext_modules=cythonize(
109-
['**/*.pyx', 'google/cloud/dataflow/coders/coder_impl.py']),
108+
ext_modules=cythonize([
109+
'**/*.pyx',
110+
'google/cloud/dataflow/coders/coder_impl.py',
111+
'google/cloud/dataflow/runners/common.py',
112+
'google/cloud/dataflow/worker/executor.py',
113+
'google/cloud/dataflow/utils/counters.py',
114+
]),
110115
setup_requires=['nose>=1.0'],
111116
install_requires=REQUIRED_PACKAGES,
112117
test_suite='nose.collector',

0 commit comments

Comments
 (0)