Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit ac319b0

Browse files
charlesccychensilviulica
authored andcommitted
Forbid use of PubSub I/O in batch and local jobs
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117392517
1 parent 57415a9 commit ac319b0

2 files changed

Lines changed: 18 additions & 0 deletions

File tree

google/cloud/dataflow/io/pubsub.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ def format(self):
3636
"""Source format name required for remote execution."""
3737
return 'pubsub'
3838

39+
def reader(self):
40+
raise NotImplementedError(
41+
'PubSubSource is not supported in local execution.')
42+
3943

4044
class PubSubSink(iobase.NativeSink):
4145
"""Sink for writing to a given Cloud Pub/Sub topic."""
@@ -48,3 +52,7 @@ def __init__(self, topic, coder=coders.StrUtf8Coder()):
4852
def format(self):
4953
"""Sink format name required for remote execution."""
5054
return 'pubsub'
55+
56+
def writer(self):
57+
raise NotImplementedError(
58+
'PubSubSink is not supported in local execution.')

google/cloud/dataflow/runners/dataflow_runner.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,11 @@ def run_Read(self, transform_node):
460460
' a query',
461461
transform.source)
462462
elif transform.source.format == 'pubsub':
463+
standard_options = (
464+
transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
465+
if not standard_options.streaming:
466+
raise ValueError('PubSubSource is currently only available for use in '
467+
'streaming pipelines.')
463468
step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
464469
if transform.source.subscription:
465470
step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
@@ -526,6 +531,11 @@ def run__NativeWrite(self, transform_node):
526531
step.add_property(
527532
PropertyNames.BIGQUERY_SCHEMA, transform.sink.schema_as_json())
528533
elif transform.sink.format == 'pubsub':
534+
standard_options = (
535+
transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
536+
if not standard_options.streaming:
537+
raise ValueError('PubSubSink is currently only available for use in '
538+
'streaming pipelines.')
529539
step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic)
530540
else:
531541
raise ValueError(

0 commit comments

Comments
 (0)