Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 5396bfc

Browse files
committed
Modify --requirements_file behavior to cache packages locally
The packages will be staged and later installed on the workers in a manner that potentially reduces PyPI downloads during worker startup to zero. To avoid any downloads, the requirements file must contain the transitive closure of all dependencies, which is the typical result of running 'pip freeze'. ----Release Notes---- The --requirements_file option now caches all the packages specified in the requirements file locally and then stages them during job submission. Introduce --requirements_cache as an option to override the default location for the cache of downloaded packages. [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=120947879
1 parent 29fac0d commit 5396bfc

3 files changed

Lines changed: 89 additions & 10 deletions

File tree

google/cloud/dataflow/utils/dependency.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,27 @@ def _stage_extra_packages(extra_packages,
198198
return resources
199199

200200

201-
def stage_job_resources(options, file_copy=_dependency_file_copy,
202-
build_setup_args=None, temp_dir=None):
201+
def _populate_requirements_cache(requirements_file, cache_dir):
  """Downloads the packages listed in a requirements file into a local cache.

  Args:
    requirements_file: Path to a pip requirements file (typically the output
      of a 'pip freeze' command) naming the packages to download.
    cache_dir: Directory where the downloaded source distributions are kept.

  Raises:
    RuntimeError: If the pip command exits with a non-zero status.
  """
  # The 'pip download' command will not download again if it finds the
  # tarball with the proper version already present.
  # It will get the packages downloaded in the order they are presented in
  # the requirements file and will not download package dependencies.
  cmd_args = [
      'pip', 'install', '--download', cache_dir,
      '-r', requirements_file,
      # Download from PyPI source distributions.
      '--no-binary', ':all:']
  logging.info('Executing command: %s', cmd_args)
  result = processes.call(cmd_args)
  if result != 0:
    # Format the message eagerly: unlike logging calls, exception
    # constructors do not apply %-style formatting to extra arguments, so
    # passing (format, args...) would surface an unformatted tuple.
    raise RuntimeError(
        'Failed to execute command: %s. Exit code %d' % (cmd_args, result))
217+
218+
219+
def stage_job_resources(
220+
options, file_copy=_dependency_file_copy, build_setup_args=None,
221+
temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
203222
"""Creates (if needed) and stages job resources to options.staging_location.
204223
205224
Args:
@@ -214,6 +233,8 @@ def stage_job_resources(options, file_copy=_dependency_file_copy,
214233
testing.
215234
temp_dir: Temporary folder where the resource building can happen. If None
216235
then a unique temp directory will be created. Used only for testing.
236+
populate_requirements_cache: Callable for populating the requirements cache.
237+
Used only for testing.
217238
218239
Returns:
219240
A list of file names (no paths) for the resources staged. All the files
@@ -247,6 +268,20 @@ def stage_job_resources(options, file_copy=_dependency_file_copy,
247268
REQUIREMENTS_FILE)
248269
file_copy(setup_options.requirements_file, staged_path)
249270
resources.append(REQUIREMENTS_FILE)
271+
requirements_cache_path = (
272+
os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
273+
if setup_options.requirements_cache is None
274+
else setup_options.requirements_cache)
275+
# Populate cache with packages from requirements and stage the files
276+
# in the cache.
277+
if not os.path.exists(requirements_cache_path):
278+
os.makedirs(requirements_cache_path)
279+
populate_requirements_cache(
280+
setup_options.requirements_file, requirements_cache_path)
281+
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
282+
file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
283+
os.path.basename(pkg)))
284+
resources.append(os.path.basename(pkg))
250285

251286
# Handle a setup file if present.
252287
# We will build the setup package locally and then copy it to the staging

google/cloud/dataflow/utils/dependency_test.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ def create_temp_file(self, path, contents):
4343
f.write(contents)
4444
return f.name
4545

46+
def populate_requirements_cache(self, requirements_file, cache_dir):
  """Test stand-in for the real requirements-cache populator.

  Ignores the requirements file and simply drops two placeholder files
  into the cache directory so tests can verify that cached packages get
  staged.
  """
  _ = requirements_file
  for fake_package in ('abc.txt', 'def.txt'):
    self.create_temp_file(os.path.join(cache_dir, fake_package), 'nothing')
50+
4651
def test_no_staging_location(self):
4752
with self.assertRaises(RuntimeError) as cm:
4853
dependency.stage_job_resources(PipelineOptions())
@@ -98,12 +103,16 @@ def test_with_requirements_file(self):
98103
self.create_temp_file(
99104
os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
100105
self.assertEqual(
101-
[dependency.REQUIREMENTS_FILE,
102-
names.PICKLED_MAIN_SESSION_FILE],
103-
dependency.stage_job_resources(options))
106+
sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
107+
'abc.txt', 'def.txt']),
108+
sorted(dependency.stage_job_resources(
109+
options,
110+
populate_requirements_cache=self.populate_requirements_cache)))
104111
self.assertTrue(
105112
os.path.isfile(
106113
os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
114+
self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
115+
self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
107116

108117
def test_requirements_file_not_present(self):
109118
staging_dir = tempfile.mkdtemp()
@@ -112,12 +121,38 @@ def test_requirements_file_not_present(self):
112121
options.view_as(GoogleCloudOptions).staging_location = staging_dir
113122
self.update_options(options)
114123
options.view_as(SetupOptions).requirements_file = 'nosuchfile'
115-
dependency.stage_job_resources(options)
124+
dependency.stage_job_resources(
125+
options, populate_requirements_cache=self.populate_requirements_cache)
116126
self.assertEqual(
117127
cm.exception.message,
118128
'The file %s cannot be found. It was specified in the '
119129
'--requirements_file command line option.' % 'nosuchfile')
120130

131+
def test_with_requirements_file_and_cache(self):
132+
staging_dir = tempfile.mkdtemp()
133+
source_dir = tempfile.mkdtemp()
134+
135+
options = PipelineOptions()
136+
options.view_as(GoogleCloudOptions).staging_location = staging_dir
137+
self.update_options(options)
138+
options.view_as(SetupOptions).requirements_file = os.path.join(
139+
source_dir, dependency.REQUIREMENTS_FILE)
140+
options.view_as(SetupOptions).requirements_cache = os.path.join(
141+
tempfile.gettempdir(), 'alternative-cache-dir')
142+
self.create_temp_file(
143+
os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
144+
self.assertEqual(
145+
sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
146+
'abc.txt', 'def.txt']),
147+
sorted(dependency.stage_job_resources(
148+
options,
149+
populate_requirements_cache=self.populate_requirements_cache)))
150+
self.assertTrue(
151+
os.path.isfile(
152+
os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
153+
self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
154+
self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
155+
121156
def test_with_setup_file(self):
122157
staging_dir = tempfile.mkdtemp()
123158
source_dir = tempfile.mkdtemp()

google/cloud/dataflow/utils/options.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,19 @@ def _add_argparse_args(cls, parser):
364364
help=
365365
('Path to a requirements file containing package dependencies. '
366366
'Typically it is produced by a pip freeze command. More details: '
367-
'https://pip.pypa.io/en/latest/reference/pip_freeze.html. If '
368-
'specified, the worker will install the required dependenciesi before'
369-
' running any custom code. Typically the file is named '
370-
'requirements.txt.'))
367+
'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
368+
'If used, all the packages specified will be downloaded, '
369+
'cached (use --requirements_cache to change default location), '
370+
'and then staged so that they can be automatically installed in '
371+
'workers during startup. The cache is refreshed as needed '
372+
'avoiding extra downloads for existing packages. Typically the '
373+
'file is named requirements.txt.'))
374+
parser.add_argument(
375+
'--requirements_cache',
376+
default=None,
377+
help=
378+
('Path to a folder to cache the packages specified in '
379+
'the requirements file using the --requirements_file option.'))
371380
parser.add_argument(
372381
'--setup_file',
373382
default=None,

0 commit comments

Comments
 (0)