Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 2f9e11c

Browse files
robertwbaaltay
authored and committed
Add autoscaling pipeline options
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=124417190
1 parent 9782343 commit 2f9e11c

2 files changed

Lines changed: 23 additions & 3 deletions

File tree

google/cloud/dataflow/internal/apiclient.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,19 @@ def __init__(self, packages, options, environment_version):
224224
parallelWorkerSettings=dataflow.WorkerSettings(
225225
baseUrl='https://dataflow.googleapis.com',
226226
servicePath=self.google_cloud_options.dataflow_endpoint)))
227+
pool.autoscalingSettings = dataflow.AutoscalingSettings()
227228
# Set worker pool options received through command line.
228229
if self.worker_options.num_workers:
229230
pool.numWorkers = self.worker_options.num_workers
231+
if self.worker_options.max_num_workers:
232+
pool.autoscalingSettings.maxNumWorkers = (
233+
self.worker_options.max_num_workers)
234+
if self.worker_options.autoscaling_algorithm:
235+
values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
236+
pool.autoscalingSettings.algorithm = {
237+
'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
238+
'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
239+
}.get(self.worker_options.autoscaling_algorithm)
230240
if self.worker_options.machine_type:
231241
pool.machineType = self.worker_options.machine_type
232242
if self.worker_options.disk_size_gb:

google/cloud/dataflow/utils/options.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,19 @@ def _add_argparse_args(cls, parser):
269269
help=
270270
('Number of workers to use when executing the Dataflow job. If not '
271271
'set, the Dataflow service will use a reasonable default.'))
272+
parser.add_argument(
273+
'--max_num_workers',
274+
type=int,
275+
default=None,
276+
help=
277+
('Maximum number of workers to use when executing the Dataflow job.'))
278+
parser.add_argument(
279+
'--autoscaling_algorithm',
280+
type=str,
281+
choices=['NONE', 'THROUGHPUT_BASED'],
282+
default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
283+
help=
284+
('If and how to autoscale the worker pool.'))
272285
# TODO(silviuc): Remove --machine_type variant of the flag.
273286
parser.add_argument(
274287
'--worker_machine_type', '--machine_type',
@@ -428,9 +441,6 @@ def _add_argparse_args(cls, parser):
428441
'workers will install them in same order they were specified on the '
429442
'command line.'))
430443

431-
# TODO(silviuc): Add autoscaling related options:
432-
# --autoscaling_algorithm, --max_num_workers.
433-
434444
# TODO(silviuc): Add --files_to_stage option.
435445
# This could potentially replace the --requirements_file and --setup_file.
436446

0 commit comments

Comments
 (0)