Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 668b7bb

Browse files
chamikaramj authored and gildea committed
Updates bigquery source/sink to use executing project by default.
This will be used if a project is not specified with the input/output table schema. Updates direct pipeline runner so that sinks have access to PipelineOptions object similar to sources. This fixes #1 ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=116987206
1 parent 43e4942 commit 668b7bb

4 files changed

Lines changed: 36 additions & 7 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ you can write to.
355355
import google.cloud.dataflow as df
356356
input_table = 'clouddataflow-readonly:samples.weather_stations'
357357
project = 'YOUR-PROJECT'
358-
output_table = '%s:DATASET.TABLENAME' % project
358+
output_table = 'DATASET.TABLENAME'
359359
p = df.Pipeline(argv=['--project', project])
360360
(p
361361
| df.Read('read', df.io.BigQuerySource(input_table))
@@ -379,7 +379,7 @@ of using the whole table.
379379
```python
380380
import google.cloud.dataflow as df
381381
project = 'YOUR-PROJECT'
382-
output_table = '%s:DATASET.TABLENAME' % project
382+
output_table = 'DATASET.TABLENAME'
383383
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
384384
'FROM [clouddataflow-readonly:samples.weather_stations] ' \
385385
'WHERE tornado=true GROUP BY month'

google/cloud/dataflow/io/bigquery.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,8 +466,13 @@ def __init__(self, source, test_bigquery_client=None):
466466
# getting additional details.
467467
self.schema = None
468468
if self.source.query is None:
469+
# If table schema did not define a project we default to executing
470+
# project.
471+
project_id = self.source.table_reference.projectId
472+
if not project_id:
473+
project_id = self.executing_project
469474
self.query = 'SELECT * FROM [%s:%s.%s];' % (
470-
self.source.table_reference.projectId,
475+
project_id,
471476
self.source.table_reference.datasetId,
472477
self.source.table_reference.tableId)
473478
else:
@@ -505,6 +510,12 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None):
505510
self.rows_buffer_flush_threshold = buffer_size or 1000
506511
# Figure out the project, dataset, and table used for the sink.
507512
self.project_id = self.sink.table_reference.projectId
513+
514+
# If table schema did not define a project we default to executing project.
515+
if self.project_id is None and hasattr(sink, 'pipeline_options'):
516+
self.project_id = (
517+
sink.pipeline_options.view_as(GoogleCloudOptions).project)
518+
508519
assert self.project_id is not None
509520

510521
self.dataset_id = self.sink.table_reference.datasetId

google/cloud/dataflow/io/bigquery_test.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from google.cloud.dataflow.internal.json_value import to_json_value
2525
from google.cloud.dataflow.io.bigquery import RowAsDictJsonCoder
2626
from google.cloud.dataflow.io.bigquery import TableRowJsonCoder
27+
from google.cloud.dataflow.utils.options import PipelineOptions
2728

2829
from apitools.base.py.exceptions import HttpError
2930
from apitools.clients import bigquery
@@ -270,6 +271,15 @@ def test_read_from_table_and_multiple_pages(self):
270271
# adjust our expectation below accordingly.
271272
self.assertEqual(actual_rows, expected_rows * 2)
272273

274+
def test_table_schema_without_project(self):
275+
# Reader should pick executing project by default.
276+
source = df.io.BigQuerySource(table='mydataset.mytable')
277+
options = PipelineOptions(flags=['--project', 'myproject'])
278+
source.pipeline_options = options
279+
reader = source.reader()
280+
self.assertEquals('SELECT * FROM [myproject:mydataset.mytable];',
281+
reader.query)
282+
273283

274284
class TestBigQueryWriter(unittest.TestCase):
275285

@@ -427,6 +437,13 @@ def test_rows_are_written(self):
427437
tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
428438
rows=expected_rows)))
429439

440+
def test_table_schema_without_project(self):
441+
# Writer should pick executing project by default.
442+
sink = df.io.BigQuerySink(table='mydataset.mytable')
443+
options = PipelineOptions(flags=['--project', 'myproject'])
444+
sink.pipeline_options = options
445+
writer = sink.writer()
446+
self.assertEquals('myproject', writer.project_id)
430447

431448
if __name__ == '__main__':
432449
logging.getLogger().setLevel(logging.INFO)

google/cloud/dataflow/runners/direct_runner.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,8 @@ def run_Flatten(self, transform_node):
189189

190190
@skip_if_cached
191191
def run_Read(self, transform_node):
192-
# TODO(chamikara) Implement a more generic way for passing PipelineOption
193-
# to sources when using DirectRunner.
192+
# TODO(chamikara) Implement a more generic way for passing PipelineOptions
193+
# to sources and sinks when using DirectRunner.
194194
source = transform_node.transform.source
195195
source.pipeline_options = transform_node.inputs[0].pipeline.options
196196
with source.reader() as reader:
@@ -199,7 +199,8 @@ def run_Read(self, transform_node):
199199

200200
@skip_if_cached
201201
def run__NativeWrite(self, transform_node):
202-
transform = transform_node.transform
203-
with transform.sink.writer() as writer:
202+
sink = transform_node.transform.sink
203+
sink.pipeline_options = transform_node.inputs[0].pipeline.options
204+
with sink.writer() as writer:
204205
for v in self._cache.get_pvalue(transform_node.inputs[0]):
205206
writer.Write(v.value)

0 commit comments

Comments (0)