This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit fff0718

charlesccychen authored and silviulica committed
Report work item exceptions in the streaming worker
----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=118238822
1 parent a1ca465 commit fff0718

5 files changed: 176 additions & 23 deletions


google/cloud/dataflow/internal/windmill_pb2.py

Lines changed: 1 addition & 0 deletions
@@ -2262,6 +2262,7 @@
 _WATERMARKHOLD.fields_by_name['timestamps'].has_options = True
 _WATERMARKHOLD.fields_by_name['timestamps']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\020\001'))
 import abc
+import six
 from grpc.beta import implementations as beta_implementations
 from grpc.framework.common import cardinality
 from grpc.framework.interfaces.face import utilities as face_utilities

google/cloud/dataflow/internal/windmill_service_pb2.py

Lines changed: 3 additions & 4 deletions
@@ -46,13 +46,13 @@
 DESCRIPTOR.has_options = True
 DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n5com.google.cloud.dataflow.sdk.runners.worker.windmill'))
 import abc
+import six
 from grpc.beta import implementations as beta_implementations
 from grpc.framework.common import cardinality
 from grpc.framework.interfaces.face import utilities as face_utilities
 
-class BetaCloudWindmillServiceV1Alpha1Servicer(object):
+class BetaCloudWindmillServiceV1Alpha1Servicer(six.with_metaclass(abc.ABCMeta, object)):
   """<fill me in later!>"""
-  __metaclass__ = abc.ABCMeta
   @abc.abstractmethod
   def GetWork(self, request, context):
     raise NotImplementedError()
@@ -69,9 +69,8 @@ def GetConfig(self, request, context):
   def ReportStats(self, request, context):
     raise NotImplementedError()
 
-class BetaCloudWindmillServiceV1Alpha1Stub(object):
+class BetaCloudWindmillServiceV1Alpha1Stub(six.with_metaclass(abc.ABCMeta, object)):
   """The interface to which stubs will conform."""
-  __metaclass__ = abc.ABCMeta
   @abc.abstractmethod
   def GetWork(self, request, timeout):
     raise NotImplementedError()
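Editor's note: the diff above replaces the Python-2-only __metaclass__ attribute with six.with_metaclass, so the generated servicer and stub classes keep abstract-method enforcement under both Python 2 and Python 3. A minimal sketch of the two spellings (the class names OldExampleServicer and NewExampleServicer are illustrative, not from the generated code):

import abc

import six


# Python 2 only: the metaclass is attached via a class attribute and is
# silently ignored on Python 3.
class OldExampleServicer(object):
  __metaclass__ = abc.ABCMeta

  @abc.abstractmethod
  def GetWork(self, request, context):
    raise NotImplementedError()


# Portable: six.with_metaclass builds a temporary base class so the same
# declaration applies the ABCMeta metaclass on Python 2 and Python 3.
class NewExampleServicer(six.with_metaclass(abc.ABCMeta, object)):

  @abc.abstractmethod
  def GetWork(self, request, context):
    raise NotImplementedError()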

google/cloud/dataflow/worker/streamingworker.py

Lines changed: 61 additions & 18 deletions
@@ -18,6 +18,7 @@
 
 import logging
 import random
+import sys
 import time
 import traceback
 
@@ -76,9 +77,13 @@ class StreamingWorker(object):
 
   # Maximum size of the result of a GetWork request.
   MAX_GET_WORK_FETCH_BYTES = 64 << 20  # 64m
+
   # Maximum number of items to return in a GetWork request.
   MAX_GET_WORK_ITEMS = 100
 
+  # Delay to use before retrying work items locally, in seconds.
+  RETRY_LOCALLY_DELAY = 10.0
+
   def __init__(self, properties):
     self.project_id = properties['project_id']
     self.job_id = properties['job_id']
@@ -142,24 +147,62 @@ def dispatch_loop(self):
         backoff_seconds = min(1.0, backoff_seconds * 2)
 
       for computation_work in work_response.work:
-        computation_id = computation_work.computation_id
-        input_data_watermark = windmillio.windmill_to_harness_timestamp(
-            computation_work.input_data_watermark)
-        if computation_id not in self.instruction_map:
-          self.get_config(computation_id)
-        map_task_proto = self.instruction_map[computation_id]
-        for work_item in computation_work.work:
-          try:
-            self.process(computation_id, map_task_proto, input_data_watermark,
-                         work_item)
-          except:
-            logging.error(
-                'Exception while processing work item for computation %r: '
-                '%s, %s', computation_id, work_item, traceback.format_exc())
-            raise
-
-  def process(self, computation_id, map_task_proto, input_data_watermark,
-              work_item):
+        self.process_computation(computation_work)
+
+  def process_computation(self, computation_work):
+    computation_id = computation_work.computation_id
+    input_data_watermark = windmillio.windmill_to_harness_timestamp(
+        computation_work.input_data_watermark)
+    if computation_id not in self.instruction_map:
+      self.get_config(computation_id)
+    map_task_proto = self.instruction_map[computation_id]
+    for work_item in computation_work.work:
+      retry_locally = True
+      while retry_locally:
+        try:
+          self.process_work_item(computation_id, map_task_proto,
+                                 input_data_watermark, work_item)
+          break
+        except:  # pylint: disable=bare-except
+          logging.error(
+              'Exception while processing work item for computation %r: '
+              '%s, %s', computation_id, work_item, traceback.format_exc())
+
+          # Send exception details to Windmill, retry locally if possible.
+          retry_locally = self.report_failure(computation_id, work_item,
+                                              sys.exc_info())
+
+          # TODO(ccy): handle token expiration in retry logic.
+          # TODO(ccy): handle out-of-memory error in retry logic.
+          if retry_locally:
+            logging.error('Execution of work in computation %s for key %r '
                          'failed; will retry locally.', computation_id,
                          work_item.key)
+            time.sleep(StreamingWorker.RETRY_LOCALLY_DELAY)
+          else:
+            logging.error('Execution of work in computation %s for key %r '
                          'failed; Windmill indicated to not retry '
                          'locally.', computation_id, work_item.key)
+
+  def report_failure(self, computation_id, work_item, exc_info):
+    """Send exception details to Windmill; returns whether to retry locally."""
+    exc_type, exc_value, exc_traceback = exc_info
+    messages = list(line.strip() for line in
+                    (traceback.format_exception_only(exc_type,
+                                                     exc_value) +
+                     traceback.format_tb(exc_traceback)))
+    wm_exception = windmill_pb2.Exception(stack_frames=messages)
+    report_stats_request = windmill_pb2.ReportStatsRequest(
+        computation_id=computation_id,
+        key=work_item.key,
+        sharding_key=work_item.sharding_key,
+        work_token=work_item.work_token,
+        exceptions=[wm_exception])
+    response = self.windmill.ReportStats(report_stats_request)
+    return not response.failed
+
+  def process_work_item(self, computation_id, map_task_proto,
+                        input_data_watermark, work_item):
     """Process a work item."""
     workitem_commit_request = windmill_pb2.WorkItemCommitRequest(
         key=work_item.key,
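Editor's note: report_failure above flattens the caught exception into a list of strings for the stack_frames field of windmill_pb2.Exception. A standalone sketch of just that formatting step, using only the standard traceback module (the helper name format_stack_frames is hypothetical; the protobuf and RPC calls are omitted):

import sys
import traceback


def format_stack_frames(exc_info):
  # Mirrors the formatting in report_failure: first the one-line exception
  # summary, then one formatted string per traceback frame.
  exc_type, exc_value, exc_traceback = exc_info
  return [line.strip() for line in
          traceback.format_exception_only(exc_type, exc_value) +
          traceback.format_tb(exc_traceback)]


try:
  {}['missing']
except KeyError:
  frames = format_stack_frames(sys.exc_info())
  # frames[0] is the summary, e.g. "KeyError: 'missing'"; the remaining
  # entries describe the frame where the lookup failed.
  print(frames)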
google/cloud/dataflow/worker/streamingworker_test.py

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the streaming worker.
+
+These tests check that the streaming worker harness runs properly, with mocked
+interactions with Windmill.
+"""
+
+import logging
+import unittest
+
+
+import mock
+
+from google.cloud.dataflow.internal import windmill_pb2
+from google.cloud.dataflow.worker.streamingworker import StreamingWorker
+
+
+class StreamingWorkerTest(unittest.TestCase):
+
+  @mock.patch(
+      'google.cloud.dataflow.worker.streamingworker.WindmillClient')
+  def _get_worker(self, *unused_mocks):
+    fake_properties = {
+        'project_id': 'fake_project',
+        'job_id': 'fake_job',
+        'worker_id': 'fake_worker',
+        'windmill.host': 'fake_host',
+        'windmill.grpc_port': '12345',
+    }
+    return StreamingWorker(fake_properties)
+
+  def _get_worker_and_single_computation(self):
+    worker = self._get_worker()
+    computation_work = windmill_pb2.ComputationWorkItems(
+        computation_id='A1',
+        work=[windmill_pb2.WorkItem(
+            key='k',
+            work_token=12345)])
+    worker.instruction_map['A1'] = mock.Mock()
+    return worker, computation_work
+
+  @mock.patch('google.cloud.dataflow.worker.streamingworker.StreamingWorker.'
+              'process_work_item')
+  def test_successful_work_item(self, *unused_mocks):
+    worker, computation_work = self._get_worker_and_single_computation()
+    worker.process_computation(computation_work)
+    self.assertEqual(0, len(worker.windmill.ReportStats.call_args_list))
+    self.assertEqual(1, len(worker.process_work_item.call_args_list))
+
+  @mock.patch('google.cloud.dataflow.worker.streamingworker.StreamingWorker.'
+              'process_work_item')
+  @mock.patch('logging.error')
+  def test_failed_work_item(self, *unused_mocks):
+    worker, computation_work = self._get_worker_and_single_computation()
+    worker.windmill.ReportStats.return_value = (
+        windmill_pb2.ReportStatsResponse(failed=True))
+    worker.process_work_item.side_effect = Exception
+
+    worker.process_computation(computation_work)
+
+    # Verify number of attempts and that failed work was reported.
+    self.assertEqual(1, len(worker.windmill.ReportStats.call_args_list))
+    self.assertEqual(1, len(worker.process_work_item.call_args_list))
+    logging.error.assert_called_with(
+        'Execution of work in computation %s for key %r failed; Windmill '
+        'indicated to not retry locally.', u'A1', 'k')
+
+  @mock.patch('google.cloud.dataflow.worker.streamingworker.StreamingWorker.'
+              'process_work_item')
+  @mock.patch('logging.error')
+  @mock.patch('time.sleep')
+  def test_retrying_failed_work_item(self, *unused_mocks):
+    worker, computation_work = self._get_worker_and_single_computation()
+    retries = 5
+    worker.windmill.ReportStats.side_effect = (
+        [windmill_pb2.ReportStatsResponse(failed=False)] * retries)
+    worker.process_work_item.side_effect = (
+        [Exception] * retries + [None])
+
+    worker.process_computation(computation_work)
+
+    # Verify number of attempts and that failed work was reported the correct
+    # number of times.
+    self.assertEqual(retries, len(worker.windmill.ReportStats.call_args_list))
+    self.assertEqual(retries + 1, len(worker.process_work_item.call_args_list))
+    logging.error.assert_called_with(
+        'Execution of work in computation %s for key %r failed; will retry '
+        'locally.', u'A1', 'k')
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
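Editor's note: the retrying test above drives the failure path with mock's list-valued side_effect, where each call to the mocked method consumes the next entry, raising it if it is an exception and returning it otherwise. A small standalone illustration of that behavior, using the same mock package the test imports (fake_call is an illustrative name, not part of the test):

import mock


# Two calls raise, the third returns None -- the same pattern the test uses
# to make process_work_item fail a fixed number of times and then succeed.
fake_call = mock.Mock(side_effect=[Exception, Exception, None])

for _ in range(2):
  try:
    fake_call()
  except Exception:  # pylint: disable=broad-except
    pass

assert fake_call() is None
assert fake_call.call_count == 3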

setup.cfg

Lines changed: 6 additions & 1 deletion
@@ -5,9 +5,14 @@ verbosity=2
 # Exclude some unit tests because they define command line options. Nose runs
 # tests by loading the corresponding modules in the same process and the
 # side-effect of defining command line options makes other tests fail.
+#
 # TODO(silviuc): Find a way to run the remaining tests excluded here.
+#
 # The following tests are excluded because they try to load the Cython-based
 # fast_coders module which is not available when running unit tests:
 # fast_coders_test, typecoders_test, workitem_test, and executor_test.
-exclude=examples|bigquery_test|ptransform_test|fast_coders_test|typecoders_test|workitem_test|executor_test
+#
+# The streamingworker_test test is excluded because it depends on protobuf and
+# gRPC.
+exclude=examples|bigquery_test|ptransform_test|fast_coders_test|typecoders_test|workitem_test|executor_test|streamingworker_test

0 commit comments
