Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 03a6da2

Browse files
chamikaramjsilviulica
authored andcommitted
Updates BatchWorker to report failure to shutdown progress reporter to service properly.
A progress reporter shutdown failure may indicate a failure to send the last dynamic split response to the service. Hence work item must be marked as failed when such failures occur. Added tests to cover this. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=119558030
1 parent 0499bbe commit 03a6da2

2 files changed

Lines changed: 58 additions & 18 deletions

File tree

google/cloud/dataflow/worker/batchworker.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -435,24 +435,35 @@ def do_work(self, work_item, deferred_exception_details=None):
435435
'was found: %s', work_item, deferred_exception_details)
436436
return
437437

438+
exception_details = None
438439
try:
439440
progress_reporter.start_reporting_progress()
440441
work_executor.execute(work_item.map_task)
441442
except Exception: # pylint: disable=broad-except
442-
progress_reporter.stop_reporting_progress()
443443
exception_details = traceback.format_exc()
444-
logging.error('Exception: %s', exception_details, exc_info=True)
445-
# Completed with errors means failed.
444+
logging.error('An exception was raised when trying to execute the '
445+
'work item %s : %s',
446+
work_item,
447+
exception_details, exc_info=True)
448+
finally:
449+
try:
450+
progress_reporter.stop_reporting_progress()
451+
except Exception: # pylint: disable=broad-except
452+
logging.error('An exception was raised when trying to stop the '
453+
'progress reporter : %s',
454+
traceback.format_exc(), exc_info=True)
455+
# If 'exception_details' was already set, we were already going to
456+
# mark this work item as failed. Hence only logging this error and
457+
# reporting the original error.
458+
if exception_details is None:
459+
# This will be reported to the service and work item will be marked as
460+
# failed.
461+
exception_details = traceback.format_exc()
462+
446463
with work_item.lock:
447-
self.report_completion_status(work_item,
448-
progress_reporter,
464+
self.report_completion_status(work_item, progress_reporter,
449465
exception_details=exception_details)
450466
work_item.done = True
451-
else:
452-
progress_reporter.stop_reporting_progress()
453-
with work_item.lock:
454-
self.report_completion_status(work_item, progress_reporter)
455-
work_item.done = True
456467

457468
def status_server(self):
458469
"""Executes the serving loop for the status server."""

google/cloud/dataflow/worker/batchworker_test.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,10 @@ def test_worker_fails_for_deferred_exceptions(
108108
assert not mock_start.called
109109
assert not mock_execute.called
110110

111-
@patch.object(executor.MapTaskExecutor, 'execute')
112-
@patch.object(batchworker.ProgressReporter, 'start_reporting_progress')
113-
@patch.object(batchworker.ProgressReporter, 'stop_reporting_progress')
114-
@patch.object(batchworker.ProgressReporter, 'report_status')
115-
def test_worker_sends_completion_in_case_of_a_failure(
116-
self, mock_report_status, mock_stop, mock_start, mock_execute):
111+
def _run_send_completion_test(self, mock_report_status, mock_stop, mock_start,
112+
mock_execute, expected_exception):
117113
worker = batchworker.BatchWorker(self.dummy_properties(), {})
118114
mock_work_item = mock.MagicMock()
119-
mock_execute.side_effect = Exception('test_exception')
120115
worker.do_work(mock_work_item)
121116

122117
class AnyStringWith(str):
@@ -125,12 +120,46 @@ def __eq__(self, other):
125120
return self in other
126121

127122
mock_report_status.assert_called_with(
128-
completed=True, exception_details=AnyStringWith('test_exception'))
123+
completed=True,
124+
exception_details=AnyStringWith(expected_exception))
129125

130126
mock_start.assert_called_once_with()
131127
mock_execute.assert_called_once_with(mock.ANY)
132128
mock_stop.assert_called_once_with()
133129

130+
@patch.object(executor.MapTaskExecutor, 'execute')
131+
@patch.object(batchworker.ProgressReporter, 'start_reporting_progress')
132+
@patch.object(batchworker.ProgressReporter, 'stop_reporting_progress')
133+
@patch.object(batchworker.ProgressReporter, 'report_status')
134+
def test_send_completion_execute_failure(self, mock_report_status, mock_stop,
135+
mock_start, mock_execute):
136+
mock_execute.side_effect = Exception('test_exception')
137+
self._run_send_completion_test(mock_report_status, mock_stop, mock_start,
138+
mock_execute, 'test_exception')
139+
140+
@patch.object(executor.MapTaskExecutor, 'execute')
141+
@patch.object(batchworker.ProgressReporter, 'start_reporting_progress')
142+
@patch.object(batchworker.ProgressReporter, 'stop_reporting_progress')
143+
@patch.object(batchworker.ProgressReporter, 'report_status')
144+
def test_send_completion_stop_progress_reporter_failure(self,
145+
mock_report_status,
146+
mock_stop, mock_start,
147+
mock_execute):
148+
mock_stop.side_effect = Exception('test_exception')
149+
self._run_send_completion_test(mock_report_status, mock_stop, mock_start,
150+
mock_execute, 'test_exception')
151+
152+
@patch.object(executor.MapTaskExecutor, 'execute')
153+
@patch.object(batchworker.ProgressReporter, 'start_reporting_progress')
154+
@patch.object(batchworker.ProgressReporter, 'stop_reporting_progress')
155+
@patch.object(batchworker.ProgressReporter, 'report_status')
156+
def test_send_completion_execute_and_stop_progress_reporter_failure(
157+
self, mock_report_status, mock_stop, mock_start, mock_execute):
158+
mock_execute.side_effect = Exception('test_exception_1')
159+
mock_stop.side_effect = Exception('test_exception_2')
160+
self._run_send_completion_test(mock_report_status, mock_stop, mock_start,
161+
mock_execute, 'test_exception_1')
162+
134163

135164
class ProgressReporterTest(unittest.TestCase):
136165

0 commit comments

Comments
 (0)