Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit c4e9656

Browse files
charlesccychenaaltay
authored andcommitted
Add support for deduplicating id_label in PubSubSource
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=117988854
1 parent 08c56ac commit c4e9656

3 files changed

Lines changed: 21 additions & 2 deletions

File tree

google/cloud/dataflow/io/pubsub.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,26 @@
2424

2525

2626
class PubSubSource(iobase.Source):
27-
"""Source for reading from a given Cloud Pub/Sub topic."""
27+
"""Source for reading from a given Cloud Pub/Sub topic.
2828
29-
def __init__(self, topic, subscription=None, coder=coders.StrUtf8Coder()):
29+
Attributes:
30+
topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
31+
subscription: Optional existing Cloud Pub/Sub subscription to use in the
32+
form "projects/<project>/subscriptions/<subscription>".
33+
id_label: The attribute on incoming Pub/Sub messages to use as a unique
34+
record identifier. When specified, the value of this attribute (which can
35+
be any string that uniquely identifies the record) will be used for
36+
deduplication of messages. If not provided, Dataflow cannot guarantee
37+
that no duplicate data will be delivered on the Pub/Sub stream. In this
38+
case, deduplication of the stream will be strictly best effort.
39+
coder: The Coder to use for decoding incoming Pub/Sub messages.
40+
"""
41+
42+
def __init__(self, topic, subscription=None, id_label=None,
43+
coder=coders.StrUtf8Coder()):
3044
self.topic = topic
3145
self.subscription = subscription
46+
self.id_label = id_label
3247
self.coder = coder
3348

3449
@property

google/cloud/dataflow/runners/dataflow_runner.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,9 @@ def run_Read(self, transform_node):
488488
if transform.source.subscription:
489489
step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
490490
transform.source.topic)
491+
if transform.source.id_label:
492+
step.add_property(PropertyNames.PUBSUB_ID_LABEL,
493+
transform.source.id_label)
491494
elif transform.source.format == 'custom':
492495
# TODO(silviuc): Implement custom sources.
493496
raise NotImplementedError

google/cloud/dataflow/utils/names.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class PropertyNames(object):
5858
PARALLEL_INPUT = 'parallel_input'
5959
PUBSUB_TOPIC = 'pubsub_topic'
6060
PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
61+
PUBSUB_ID_LABEL = 'pubsub_id_label'
6162
SERIALIZED_FN = 'serialized_fn'
6263
SHARD_NAME_TEMPLATE = 'shard_template'
6364
STEP_NAME = 'step_name'

0 commit comments

Comments
 (0)