Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit 77d500f

Browse files
chamikaramj authored and silviulica committed
Renames Source/Reader classes for native sources/readers.
This makes it clear that these classes should not be sub-classed by users.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=120172996
1 parent be1d547 commit 77d500f

10 files changed

Lines changed: 37 additions & 33 deletions

File tree

google/cloud/dataflow/io/bigquery.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -264,7 +264,7 @@ def _parse_table_reference(table, dataset=None, project=None):
264264
# BigQuerySource, BigQuerySink.
265265

266266

267-
class BigQuerySource(iobase.Source):
267+
class BigQuerySource(iobase.NativeSource):
268268
"""A source based on a BigQuery table."""
269269

270270
def __init__(self, table=None, dataset=None, project=None, query=None,
@@ -444,7 +444,7 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
444444
# BigQueryReader, BigQueryWriter.
445445

446446

447-
class BigQueryReader(iobase.SourceReader):
447+
class BigQueryReader(iobase.NativeSourceReader):
448448
"""A reader for a BigQuery source."""
449449

450450
def __init__(self, source, test_bigquery_client=None):

google/cloud/dataflow/io/fileio.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ def _gcs_file_copy(from_path, to_path, encoding=''):
5656
# TextFileSource, TextFileSink.
5757

5858

59-
class TextFileSource(iobase.Source):
59+
class TextFileSource(iobase.NativeSource):
6060
"""A source for a GCS or local text file.
6161
6262
Parses a text file as newline-delimited elements, by default assuming
@@ -250,7 +250,7 @@ def __eq__(self, other):
250250
# TextFileReader, TextMultiFileReader.
251251

252252

253-
class TextFileReader(iobase.SourceReader):
253+
class TextFileReader(iobase.NativeSourceReader):
254254
"""A reader for a text file source."""
255255

256256
def __init__(self, source):
@@ -347,7 +347,7 @@ def request_dynamic_split(self, dynamic_split_request):
347347
return
348348

349349

350-
class TextMultiFileReader(iobase.SourceReader):
350+
class TextMultiFileReader(iobase.NativeSourceReader):
351351
"""A reader for a multi-file text source."""
352352

353353
def __init__(self, source):

google/cloud/dataflow/io/iobase.py

Lines changed: 18 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -51,11 +51,17 @@ def _dict_printable_fields(dict_object, skip_fields):
5151
'compression_type']
5252

5353

54-
class Source(object):
55-
"""Generic source."""
54+
class NativeSource(object):
55+
"""A source implemented by Dataflow service.
56+
57+
This class is to be only inherited by sources natively implemented by Cloud
58+
Dataflow service, hence should not be sub-classed by users.
59+
60+
This class is deprecated and should not be used to define new sources.
61+
"""
5662

5763
def reader(self):
58-
"""Returns a SourceReader instance associated with this source."""
64+
"""Returns a NativeSourceReader instance associated with this source."""
5965
raise NotImplementedError
6066

6167
def __repr__(self):
@@ -65,8 +71,8 @@ def __repr__(self):
6571
_minor_fields)))
6672

6773

68-
class SourceReader(object):
69-
"""A generic reader for a source."""
74+
class NativeSourceReader(object):
75+
"""A reader for a source implemented by Dataflow service."""
7076

7177
def __enter__(self):
7278
"""Opens everything necessary for a reader to function properly."""
@@ -98,12 +104,13 @@ def request_dynamic_split(self, dynamic_split_request):
98104
"""Attempts to split the input in two parts.
99105
100106
The two parts are named the "primary" part and the "residual" part. The
101-
current 'SourceReader' keeps processing the primary part, while the
107+
current 'NativeSourceReader' keeps processing the primary part, while the
102108
residual part will be processed elsewhere (e.g. perhaps on a different
103109
worker).
104110
105111
The primary and residual parts, if concatenated, must represent the
106-
same input as the current input of this 'SourceReader' before this call.
112+
same input as the current input of this 'NativeSourceReader' before this
113+
call.
107114
108115
The boundary between the primary part and the residual part is
109116
specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
@@ -127,7 +134,7 @@ def request_dynamic_split(self, dynamic_split_request):
127134
128135
Returns:
129136
'None' if the 'DynamicSplitRequest' cannot be honored (in that
130-
case the input represented by this 'SourceReader' stays the same),
137+
case the input represented by this 'NativeSourceReader' stays the same),
131138
or a 'DynamicSplitResult' describing how the input was split into a
132139
primary and residual part.
133140
"""
@@ -139,7 +146,7 @@ def request_dynamic_split(self, dynamic_split_request):
139146

140147

141148
class ReaderProgress(object):
142-
"""A representation of how far a SourceReader has read through the source."""
149+
"""A representation of how far a NativeSourceReader has read."""
143150

144151
def __init__(self, position=None, percent_complete=None, remaining_time=None):
145152

@@ -179,7 +186,7 @@ def remaining_time(self):
179186

180187

181188
class ReaderPosition(object):
182-
"""A representation of position in an iteration through a 'SourceReader'."""
189+
"""A representation of position in an iteration of a 'NativeSourceReader'."""
183190

184191
def __init__(self, end=None, key=None, byte_offset=None, record_index=None,
185192
shuffle_position=None, concat_position=None):
@@ -231,7 +238,7 @@ def __init__(self, index, position):
231238

232239

233240
class DynamicSplitRequest(object):
234-
"""Specifies how 'SourceReader.request_dynamic_split' should split the input.
241+
"""Specifies how 'NativeSourceReader.request_dynamic_split' should split.
235242
"""
236243

237244
def __init__(self, progress):
@@ -291,9 +298,6 @@ def Write(self, o): # pylint: disable=invalid-name
291298
class RangeTracker(object):
292299
"""A thread-safe helper object for implementing dynamic work rebalancing.
293300
294-
``RangeTracker`` can be used for implementing dynamic work rebalancing in
295-
position-based subclasses of ``iobase.SourceReader``.
296-
297301
**Usage of the RangeTracker class hierarchy**
298302
299303
The ``RangeTracker`` class should not be used per se---all users should use

google/cloud/dataflow/io/pubsub.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@
2323
from google.cloud.dataflow.io import iobase
2424

2525

26-
class PubSubSource(iobase.Source):
26+
class PubSubSource(iobase.NativeSource):
2727
"""Source for reading from a given Cloud Pub/Sub topic.
2828
2929
Attributes:

google/cloud/dataflow/pipeline_test.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
import logging
1919
import unittest
2020

21-
from google.cloud.dataflow.io.iobase import Source
21+
from google.cloud.dataflow.io.iobase import NativeSource
2222
from google.cloud.dataflow.pipeline import Pipeline
2323
from google.cloud.dataflow.pipeline import PipelineOptions
2424
from google.cloud.dataflow.pipeline import PipelineVisitor
@@ -35,7 +35,7 @@
3535
from google.cloud.dataflow.transforms.util import assert_that, equal_to
3636

3737

38-
class FakeSource(Source):
38+
class FakeSource(NativeSource):
3939
"""Fake source returning a fixed list of values."""
4040

4141
class _Reader(object):

google/cloud/dataflow/worker/concat_reader.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@
2323
from google.cloud.dataflow.io import iobase
2424

2525

26-
class ConcatSource(iobase.Source):
26+
class ConcatSource(iobase.NativeSource):
2727
"""A wrapper source class for ConcatReader."""
2828

2929
def __init__(self, sub_sources):
@@ -36,7 +36,7 @@ def __eq__(self, other):
3636
return self.sub_sources == other.sub_sources
3737

3838

39-
class ConcatReader(iobase.SourceReader):
39+
class ConcatReader(iobase.NativeSourceReader):
4040
"""A reader that reads elements from a given set of encoded sources.
4141
4242
Creates readers for sources lazily, i.e. only when elements

google/cloud/dataflow/worker/concat_reader_test.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@
2222
from google.cloud.dataflow.worker import concat_reader
2323

2424

25-
class TestSource(iobase.Source):
25+
class TestSource(iobase.NativeSource):
2626

2727
def __init__(
2828
self, elements, index_to_fail_reading=-1, fail_reader_at_close=False):
@@ -35,7 +35,7 @@ def reader(self):
3535
self.fail_reader_at_close)
3636

3737

38-
class TestReader(iobase.SourceReader):
38+
class TestReader(iobase.NativeSourceReader):
3939

4040
def __init__(self, elements, index_to_fail_reading, fail_reader_at_close):
4141
self.elements = elements

google/cloud/dataflow/worker/inmemory.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@
2020
from google.cloud.dataflow.io import iobase
2121

2222

23-
class InMemorySource(iobase.Source):
23+
class InMemorySource(iobase.NativeSource):
2424
"""In-memory input source."""
2525

2626
def __init__(
@@ -49,7 +49,7 @@ def reader(self):
4949
return InMemoryReader(self)
5050

5151

52-
class InMemoryReader(iobase.SourceReader):
52+
class InMemoryReader(iobase.NativeSourceReader):
5353
"""A reader for in-memory source."""
5454

5555
def __init__(self, source):

google/cloud/dataflow/worker/shuffle.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -306,7 +306,7 @@ def _str_internal(self):
306306
return '%s on %s' % (self.__class__.__name__, self.key)
307307

308308

309-
class ShuffleReaderBase(iobase.SourceReader):
309+
class ShuffleReaderBase(iobase.NativeSourceReader):
310310
"""A base class for grouped and ungrouped shuffle readers."""
311311

312312
def __init__(self, shuffle_source, reader=None):
@@ -415,7 +415,7 @@ def __iter__(self):
415415
yield self.value_coder.decode(entry.value)
416416

417417

418-
class ShuffleSourceBase(iobase.Source):
418+
class ShuffleSourceBase(iobase.NativeSource):
419419
"""A base class for grouped and ungrouped shuffle sources."""
420420

421421
def __init__(self, config_bytes, coder, start_position='', end_position=''):

google/cloud/dataflow/worker/windmillio.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ def reader(self):
4949
return PubSubWindmillReader(self)
5050

5151

52-
class PubSubWindmillReader(iobase.SourceReader):
52+
class PubSubWindmillReader(iobase.NativeSourceReader):
5353
"""Internal worker Windmill PubSub reader."""
5454

5555
def __init__(self, source):
@@ -259,7 +259,7 @@ def __repr__(self):
259259
return '<%s %s>' % (self.__class__.__name__, self.key)
260260

261261

262-
class WindowingWindmillSource(iobase.Source):
262+
class WindowingWindmillSource(iobase.NativeSource):
263263
"""Internal worker PubSubSource which reads from Windmill."""
264264

265265
def __init__(self, context, stream_id, coder):
@@ -270,7 +270,7 @@ def reader(self):
270270
return WindowingWindmillReader(self)
271271

272272

273-
class WindowingWindmillReader(iobase.SourceReader):
273+
class WindowingWindmillReader(iobase.NativeSourceReader):
274274
"""Internal worker Windmill PubSub reader."""
275275

276276
def __init__(self, source):

0 commit comments

Comments
 (0)