Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 69889f6

Browse files
committed
Add check for SDK versus container language/version compatibility
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=120976160
1 parent 5396bfc commit 69889f6

3 files changed

Lines changed: 145 additions & 7 deletions

File tree

google/cloud/dataflow/worker/batchworker.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from google.cloud.dataflow.utils import options
5454
from google.cloud.dataflow.utils import profiler
5555
from google.cloud.dataflow.utils import retry
56+
from google.cloud.dataflow.worker import environment
5657
from google.cloud.dataflow.worker import executor
5758
from google.cloud.dataflow.worker import logger
5859
from google.cloud.dataflow.worker import maptask
@@ -301,6 +302,8 @@ def __init__(self, properties, sdk_pipeline_options):
301302
self.job_id = properties['job_id']
302303
self.worker_id = properties['worker_id']
303304
self.service_path = properties['service_path']
305+
# TODO(silviuc): Make sure environment_info_path is always specified.
306+
self.environment_info_path = properties.get('environment_info_path', None)
304307
self.pipeline_options = options.PipelineOptions.from_dictionary(
305308
sdk_pipeline_options)
306309
self.capabilities = [self.worker_id, 'remote_source', 'custom_source']
@@ -500,15 +503,25 @@ def run(self):
500503
# happen before they could be reported to the service. If it is not None,
501504
# worker will use the first work item to report deferred exceptions and
502505
# fail eventually.
506+
# TODO(silviuc): Add the deferred exception mechanism to streaming worker
503507
deferred_exception_details = None
504508

505-
logging.info('Loading main session from the staging area...')
506-
try:
507-
self._load_main_session(self.local_staging_directory)
508-
except Exception: # pylint: disable=broad-except
509-
deferred_exception_details = traceback.format_exc()
510-
logging.error('Could not load main session: %s',
511-
deferred_exception_details, exc_info=True)
509+
if self.environment_info_path is not None:
510+
try:
511+
environment.check_sdk_compatibility(self.environment_info_path)
512+
except Exception: # pylint: disable=broad-except
513+
deferred_exception_details = traceback.format_exc()
514+
logging.error('SDK compatibility check failed: %s',
515+
deferred_exception_details, exc_info=True)
516+
517+
if deferred_exception_details is None:
518+
logging.info('Loading main session from the staging area...')
519+
try:
520+
self._load_main_session(self.local_staging_directory)
521+
except Exception: # pylint: disable=broad-except
522+
deferred_exception_details = traceback.format_exc()
523+
logging.error('Could not load main session: %s',
524+
deferred_exception_details, exc_info=True)
512525

513526
# Start status HTTP server thread.
514527
thread = threading.Thread(target=self.status_server)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Python Dataflow worker environment compatibility checking."""
16+
17+
import json
18+
import logging
19+
20+
from google.cloud.dataflow import version
21+
22+
23+
def check_sdk_compatibility(environment_info_path):
  """Verifies that the running SDK matches the container it runs in.

  The container describes itself in a JSON file. Its 'language' entry must be
  'python' and its 'version' entry must equal the version of this SDK.

  Args:
    environment_info_path: Path to a JSON file holding a dictionary with at
      least the 'language' and 'version' properties of the container.

  Raises:
    RuntimeError: If the container language or version differs from the SDK.

  Opening or parsing the file, or looking up a missing property, can raise
  other exceptions. That is expected only when the base container was not
  built correctly.
  """
  logging.info('Checking if container and SDK language and versions match ...')
  with open(environment_info_path) as info_file:
    container_info = json.load(info_file)

  # The language is validated first; 'version' is only looked up once the
  # language is known to match.
  container_language = container_info['language']
  if container_language != 'python':
    language_error = (
        'SDK language \'python\' does not match container language \'%s\'. '
        'Please rebuild the container using a matching language container.'
        % container_language)
    logging.error(language_error)
    raise RuntimeError(language_error)

  container_version = container_info['version']
  if container_version == version.__version__:
    return

  version_error = (
      'SDK version %s does not match container version %s. '
      'Please rebuild the container or use a matching version '
      'of the SDK.' % (version.__version__, container_version))
  logging.error(version_error)
  raise RuntimeError(version_error)
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for worker environment compatibility checking."""
16+
17+
import logging
18+
import os
19+
import tempfile
20+
import unittest
21+
22+
from google.cloud.dataflow import version
23+
from google.cloud.dataflow.worker import environment
24+
25+
26+
class EnvironmentTest(unittest.TestCase):
  """Tests for environment.check_sdk_compatibility()."""

  def create_temp_file(self, path, contents):
    """Writes contents to the file at path and returns the file's name."""
    with open(path, 'w') as f:
      f.write(contents)
      return f.name

  def _create_config(self, contents):
    """Writes contents to a 'config' file inside a new temporary directory.

    The file and its directory are registered for removal at the end of the
    test so that test runs do not leave temporary files behind.

    Args:
      contents: Text to store in the config file.

    Returns:
      Path to the config file that was written.
    """
    temp_dir = tempfile.mkdtemp()
    # Cleanups run in LIFO order: the file (registered last) is removed before
    # the directory that contains it.
    self.addCleanup(os.rmdir, temp_dir)
    config_path = os.path.join(temp_dir, 'config')
    self.create_temp_file(config_path, contents)
    self.addCleanup(os.remove, config_path)
    return config_path

  def test_basics(self):
    config_path = self._create_config(
        '{"language":"python", "version": "%s"}' % version.__version__)
    environment.check_sdk_compatibility(config_path)
    # If we get here the test passes since no exception was raised.

  def test_language_no_match(self):
    config_path = self._create_config(
        '{"language":"java", "version": "%s"}' % version.__version__)
    with self.assertRaises(RuntimeError) as exn:
      environment.check_sdk_compatibility(config_path)
    # Checked after the 'with' block: statements placed after the raising call
    # inside the block would never execute. str() is used instead of the
    # Python 2-only 'message' attribute.
    self.assertEqual(
        'SDK language \'python\' does not match container language \'java\'. '
        'Please rebuild the container using a matching language container.',
        str(exn.exception))

  def test_version_no_match(self):
    config_path = self._create_config(
        '{"language":"python", "version": "0.0.0"}')
    with self.assertRaises(RuntimeError) as exn:
      environment.check_sdk_compatibility(config_path)
    # Checked after the 'with' block so the assertion is actually evaluated;
    # str() replaces the Python 2-only 'message' attribute.
    self.assertEqual(
        'SDK version %s does not match container version 0.0.0. '
        'Please rebuild the container or use a matching version '
        'of the SDK.' % (
            version.__version__),
        str(exn.exception))
65+
66+
67+
if __name__ == '__main__':
  # Set the root logger to INFO so informational records are not filtered out
  # by level when the tests are run directly as a script.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
70+

0 commit comments

Comments
 (0)