From e0f37aebbba899e8643706b84a9441bc82c100bc Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Fri, 6 Feb 2026 16:08:39 +0500 Subject: [PATCH 01/12] Fix IndexError in processes.py when pip subprocess fails with short command - Add _pip_package_from_args() to safely get package string (avoids args[0][6] on short lists) - Guard pip branch with len(args[0]) > 2 before indexing - Use helper in call, check_call, and check_output - Add tests for short pip command path (no IndexError) --- sdks/python/apache_beam/utils/processes.py | 28 ++++++++++--- .../apache_beam/utils/processes_test.py | 39 +++++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index f6daecea2125..d7af08eeeb0b 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -44,6 +44,19 @@ else: + def _pip_package_from_args(args): + """Return a safe string for the package field in pip error messages. + + Avoids IndexError when the command list is shorter than 7 elements + (e.g. ['python', '-m', 'pip', 'install', 'pkg']). + """ + if not isinstance(args, tuple) or not args: + return "see output below" + cmd = args[0] + if not isinstance(cmd, (list, tuple)) or len(cmd) <= 6: + return "see output below" + return cmd[6] + def call(*args, **kwargs): if force_shell: kwargs['shell'] = True @@ -52,11 +65,12 @@ def call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): + if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": raise RuntimeError( \ "Full traceback: {}\n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error. output)) from error + .format(traceback.format_exc(), + _pip_package_from_args(args), error.output)) from error else: raise RuntimeError("Full trace: {}\ \n Output of the failed child process: {} " \ @@ -71,11 +85,12 @@ def check_call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): + if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": raise RuntimeError( \ "Full traceback: {} \n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) from error + .format(traceback.format_exc(), + _pip_package_from_args(args), error.output)) from error else: raise RuntimeError("Full trace: {} \ \n Output of the failed child process: {}" \ @@ -90,11 +105,12 @@ def check_output(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and (args[0][2] == "pip"): + if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": raise RuntimeError( \ "Full traceback: {} \n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) from error + .format(traceback.format_exc(), + _pip_package_from_args(args), error.output)) from error else: raise RuntimeError("Full trace: {}, \ output of the failed child process {} "\ diff --git a/sdks/python/apache_beam/utils/processes_test.py b/sdks/python/apache_beam/utils/processes_test.py index 13425550dbbe..777d2948f2c8 100644 --- a/sdks/python/apache_beam/utils/processes_test.py +++ b/sdks/python/apache_beam/utils/processes_test.py @@ -131,6 +131,19 @@ def test_check_call_pip_install_non_existing_package(self): self.assertIn("Pip install failed for package: {}".format(package),\ error.args[0]) + def test_check_call_pip_short_command_no_index_error(self): + """Short pip command (e.g. pip install pkg) must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version that satisfies" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.check_call(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) + self.assertIn("see output below", ctx.exception.args[0]) + class TestErrorHandlingCheckOutput(unittest.TestCase): @classmethod @@ -172,6 +185,19 @@ def test_check_output_pip_install_non_existing_package(self): self.assertIn("Pip install failed for package: {}".format(package),\ error.args[0]) + def test_check_output_pip_short_command_no_index_error(self): + """Short pip command must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.check_output(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) + self.assertIn("see output below", ctx.exception.args[0]) + class TestErrorHandlingCall(unittest.TestCase): @classmethod @@ -213,6 +239,19 @@ def test_check_output_pip_install_non_existing_package(self): self.assertIn("Pip install failed for package: {}".format(package),\ error.args[0]) + def test_call_pip_short_command_no_index_error(self): + """Short pip command must not raise IndexError.""" + returncode = 1 + cmd = ['python', '-m', 'pip', 'install', 'nonexistent-package-xyz'] + output = "ERROR: Could not find a version" + self.mock_get.side_effect = subprocess.CalledProcessError( + returncode, cmd, output=output) + with self.assertRaises(RuntimeError) as ctx: + processes.call(cmd) + self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) + self.assertIn(output, ctx.exception.args[0]) + self.assertIn("see output below", ctx.exception.args[0]) + if __name__ == '__main__': unittest.main() From 3a655ed4523f6092ce0094af1ec1143411249663 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Fri, 6 Feb 2026 16:17:38 +0500 Subject: [PATCH 02/12] Update CHANGES.md for processes pip IndexError fix --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index f4a04320d66c..7c971698913d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -83,6 +83,7 @@ ## Bugfixes +* Fixed IndexError in `apache_beam.utils.processes` when pip subprocess fails with short command (e.g. `pip install pkg`) (Python) ([#37515](https://github.com/apache/beam/issues/37515)). * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes From 1645f97a12b94758a86990c935ad0cd34100a3f5 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Fri, 6 Feb 2026 17:25:35 +0500 Subject: [PATCH 03/12] Mark MultiProcessSharedTest with no_xdist to fix CI setup failures --- sdks/python/apache_beam/utils/multi_process_shared_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index 7b2b11857bfd..3ae0e7b2a92d 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -24,6 +24,8 @@ import unittest from typing import Any +import pytest + from apache_beam.utils import multi_process_shared @@ -85,6 +87,7 @@ def __getattribute__(self, __name: str) -> Any: return object.__getattribute__(self, __name) +@pytest.mark.no_xdist class MultiProcessSharedTest(unittest.TestCase): @classmethod def setUpClass(cls): From 337a137ab5502ae5a43cb75b80771e63bdc0ff64 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Thu, 19 Feb 2026 11:16:27 +0500 Subject: [PATCH 04/12] Simplify subprocess error handling and remove no_xdist from MultiProcessSharedTest * processes.py: Remove pip-specific parsing (_pip_package_from_args and if blocks). Raise a single RuntimeError with full command, traceback, and child process output for all CalledProcessError cases. * multi_process_shared_test.py: Remove @pytest.mark.no_xdist and unused pytest import so the test suite can run in parallel as intended. --- .../utils/multi_process_shared_test.py | 3 - sdks/python/apache_beam/utils/processes.py | 61 ++++++------------- 2 files changed, 18 insertions(+), 46 deletions(-) diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index 3ae0e7b2a92d..7b2b11857bfd 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -24,8 +24,6 @@ import unittest from typing import Any -import pytest - from apache_beam.utils import multi_process_shared @@ -87,7 +85,6 @@ def __getattribute__(self, __name: str) -> Any: return object.__getattribute__(self, __name) -@pytest.mark.no_xdist class MultiProcessSharedTest(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index d7af08eeeb0b..bae55f8d742a 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -44,19 +44,6 @@ else: - def _pip_package_from_args(args): - """Return a safe string for the package field in pip error messages. - - Avoids IndexError when the command list is shorter than 7 elements - (e.g. ['python', '-m', 'pip', 'install', 'pkg']). - """ - if not isinstance(args, tuple) or not args: - return "see output below" - cmd = args[0] - if not isinstance(cmd, (list, tuple)) or len(cmd) <= 6: - return "see output below" - return cmd[6] - def call(*args, **kwargs): if force_shell: kwargs['shell'] = True @@ -65,16 +52,12 @@ def call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": - raise RuntimeError( \ - "Full traceback: {}\n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), - _pip_package_from_args(args), error.output)) from error - else: - raise RuntimeError("Full trace: {}\ - \n Output of the failed child process: {} " \ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Command that failed: {}\nFull trace: {}\nOutput of the failed " + "child process: {}".format( + args, traceback.format_exc(), + error.output if error.output is not None else "(not captured)") + ) from error return out def check_call(*args, **kwargs): @@ -85,16 +68,12 @@ def check_call(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": - raise RuntimeError( \ - "Full traceback: {} \n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), - _pip_package_from_args(args), error.output)) from error - else: - raise RuntimeError("Full trace: {} \ - \n Output of the failed child process: {}" \ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Command that failed: {}\nFull trace: {}\nOutput of the failed " + "child process: {}".format( + args, traceback.format_exc(), + error.output if error.output is not None else "(not captured)") + ) from error return out def check_output(*args, **kwargs): @@ -105,16 +84,12 @@ def check_output(*args, **kwargs): except OSError as e: raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: - if isinstance(args, tuple) and len(args[0]) > 2 and args[0][2] == "pip": - raise RuntimeError( \ - "Full traceback: {} \n Pip install failed for package: {} \ - \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), - _pip_package_from_args(args), error.output)) from error - else: - raise RuntimeError("Full trace: {}, \ - output of the failed child process {} "\ - .format(traceback.format_exc(), error.output)) from error + raise RuntimeError( + "Command that failed: {}\nFull trace: {}\nOutput of the failed " + "child process: {}".format( + args, traceback.format_exc(), + error.output if error.output is not None else "(not captured)") + ) from error return out def Popen(*args, **kwargs): From fce90a959b32d9a0e4f9ecb2b9eb2b57bc6f5f06 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Sat, 21 Feb 2026 12:23:32 +0500 Subject: [PATCH 05/12] Align subprocess error message with tests and drop pip parsing - processes.py: Use 'Output from execution of subprocess:' in RuntimeError and include command and full trace. Keeps test convention without parsing package from args. - processes_test.py: Assert on new message format; remove assertions for 'Pip install failed for package' and 'see output below'. --- sdks/python/apache_beam/utils/processes.py | 24 +++++------ .../apache_beam/utils/processes_test.py | 41 ++++++++----------- 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index bae55f8d742a..a5dd5d90aef7 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -53,10 +53,10 @@ def call(*args, **kwargs): raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: raise RuntimeError( - "Command that failed: {}\nFull trace: {}\nOutput of the failed " - "child process: {}".format( - args, traceback.format_exc(), - error.output if error.output is not None else "(not captured)") + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, traceback.format_exc()) ) from error return out @@ -69,10 +69,10 @@ def check_call(*args, **kwargs): raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: raise RuntimeError( - "Command that failed: {}\nFull trace: {}\nOutput of the failed " - "child process: {}".format( - args, traceback.format_exc(), - error.output if error.output is not None else "(not captured)") + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, traceback.format_exc()) ) from error return out @@ -85,10 +85,10 @@ def check_output(*args, **kwargs): raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: raise RuntimeError( - "Command that failed: {}\nFull trace: {}\nOutput of the failed " - "child process: {}".format( - args, traceback.format_exc(), - error.output if error.output is not None else "(not captured)") + "Output from execution of subprocess: {}\n" + "Command that failed: {}\nFull trace: {}".format( + error.output if error.output is not None else "(not captured)", + args, traceback.format_exc()) ) from error return out diff --git a/sdks/python/apache_beam/utils/processes_test.py b/sdks/python/apache_beam/utils/processes_test.py index 777d2948f2c8..6330631afdeb 100644 --- a/sdks/python/apache_beam/utils/processes_test.py +++ b/sdks/python/apache_beam/utils/processes_test.py @@ -121,15 +121,13 @@ def test_check_call_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.check_call(cmd) + processes.check_call(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.check_call") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) def test_check_call_pip_short_command_no_index_error(self): """Short pip command (e.g. pip install pkg) must not raise IndexError.""" @@ -142,7 +140,6 @@ def test_check_call_pip_short_command_no_index_error(self): processes.check_call(cmd) self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) self.assertIn(output, ctx.exception.args[0]) - self.assertIn("see output below", ctx.exception.args[0]) class TestErrorHandlingCheckOutput(unittest.TestCase): @@ -175,15 +172,13 @@ def test_check_output_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.check_output(cmd) + processes.check_output(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.check_output") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) def test_check_output_pip_short_command_no_index_error(self): """Short pip command must not raise IndexError.""" @@ -196,7 +191,6 @@ def test_check_output_pip_short_command_no_index_error(self): processes.check_output(cmd) self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) self.assertIn(output, ctx.exception.args[0]) - self.assertIn("see output below", ctx.exception.args[0]) class TestErrorHandlingCall(unittest.TestCase): @@ -219,7 +213,7 @@ def test_oserror_check_output_message(self): self.assertIn('Executable {} not found'.format(str(cmd)),\ error.args[0]) - def test_check_output_pip_install_non_existing_package(self): + def test_call_pip_install_non_existing_package(self): returncode = 1 package = "non-exsisting-package" cmd = ['python', '-m', 'pip', 'download', '--dest', '/var',\ @@ -229,15 +223,13 @@ def test_check_output_pip_install_non_existing_package(self): self.mock_get.side_effect = subprocess.CalledProcessError(returncode,\ cmd, output=output) try: - output = processes.call(cmd) + processes.call(cmd) self.fail( - "The test failed due to that\ - no error was raised when calling process.check_call") + "The test failed due to that " + "no error was raised when calling process.call") except RuntimeError as error: - self.assertIn("Output from execution of subprocess: {}".format(output),\ - error.args[0]) - self.assertIn("Pip install failed for package: {}".format(package),\ - error.args[0]) + self.assertIn("Output from execution of subprocess:", error.args[0]) + self.assertIn(output, error.args[0]) def test_call_pip_short_command_no_index_error(self): """Short pip command must not raise IndexError.""" @@ -250,7 +242,6 @@ def test_call_pip_short_command_no_index_error(self): processes.call(cmd) self.assertIn("Output from execution of subprocess:", ctx.exception.args[0]) self.assertIn(output, ctx.exception.args[0]) - self.assertIn("see output below", ctx.exception.args[0]) if __name__ == '__main__': From d0461e628167f85bb8d650ed31c8f1435d9d29da Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 24 Feb 2026 10:26:21 +0500 Subject: [PATCH 06/12] Fix yapf formatting in processes.py --- sdks/python/apache_beam/utils/processes.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index a5dd5d90aef7..f34b4a48085f 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -56,8 +56,8 @@ def call(*args, **kwargs): "Output from execution of subprocess: {}\n" "Command that failed: {}\nFull trace: {}".format( error.output if error.output is not None else "(not captured)", - args, traceback.format_exc()) - ) from error + args, + traceback.format_exc())) from error return out def check_call(*args, **kwargs): @@ -72,8 +72,8 @@ def check_call(*args, **kwargs): "Output from execution of subprocess: {}\n" "Command that failed: {}\nFull trace: {}".format( error.output if error.output is not None else "(not captured)", - args, traceback.format_exc()) - ) from error + args, + traceback.format_exc())) from error return out def check_output(*args, **kwargs): @@ -88,8 +88,8 @@ def check_output(*args, **kwargs): "Output from execution of subprocess: {}\n" "Command that failed: {}\nFull trace: {}".format( error.output if error.output is not None else "(not captured)", - args, traceback.format_exc()) - ) from error + args, + traceback.format_exc())) from error return out def Popen(*args, **kwargs): From e291c2b90283981d058458159a1e9d959ec3d162 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 3 Mar 2026 11:09:54 +0500 Subject: [PATCH 07/12] Add UnboundedSource API for Python SDK This commit introduces the foundational Python API for unbounded sources in Apache Beam, enabling support for continuous data streams. **Added Components:** - CheckpointMark: Abstract checkpoint interface for stream position tracking - UnboundedReader: Abstract reader protocol for continuous data sources - UnboundedSource: Main abstraction for unbounded data sources - Initial test suite verifying API contracts **Key Features:** - Checkpoint-based recovery for fault tolerance - Watermark tracking for event-time progress - Support for source splitting for parallel reading - Clean separation between unlimited/bounded sources via is_bounded() **Note:** This is a foundational 30-40% implementation of the full PR. Future work includes SDF (Splittable DoFn) wrappers and Read transform integration for complete pipeline support. **Testing:** Added UnboundedSourceTest with 3 test cases covering: - CheckpointMark lifecycle - UnboundedReader interface - UnboundedSource split behavior --- sdks/python/apache_beam/io/iobase.py | 210 ++++++++++++++++++++++ sdks/python/apache_beam/io/iobase_test.py | 87 +++++++++ 2 files changed, 297 insertions(+) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 67d6cd358a07..a30d7adb11e9 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -66,10 +66,13 @@ __all__ = [ 'BoundedSource', + 'CheckpointMark', 'RangeTracker', 'Read', 'RestrictionProgress', 'RestrictionTracker', + 'UnboundedReader', + 'UnboundedSource', 'WatermarkEstimator', 'Sink', 'Write', @@ -241,6 +244,213 @@ def is_bounded(self): return True +class CheckpointMark(object): + """Represents a checkpoint for an UnboundedReader. + + A CheckpointMark is a position in an unbounded source that allows the reader + to resume reading from where it left off. When a checkpoint is finalized, + the reader acknowledges that all records up to this point have been + successfully processed. + + Implementations must be serializable so they can be persisted and restored + across pipeline restarts. + + Example: + class MyCheckpointMark(CheckpointMark): + def __init__(self, offset): + self.offset = offset + + def finalize(self): + # Acknowledge records up to offset + pass + """ + def finalize(self) -> None: + """Called when all records up to this checkpoint have been processed. + + This method should acknowledge or commit the position, ensuring that + records up to this point won't be re-read. This may involve: + - Acknowledging messages in a message queue + - Committing offsets in a log-based system + - Updating a cursor in a database + + Raises: + Exception: if finalization fails + """ + pass + + +class UnboundedReader(object): + """A reader that reads an unbounded amount of input. + + An UnboundedReader is similar to an iterator but designed for continuous + data streams. The reader repeatedly calls start() and advance() to fetch + new records, and returns False when no data is currently available + (but may become available later). + + Lifecycle: + 1. start() - begins reading, returns True if a record is available + 2. advance() - moves to next record, returns True if available + 3. get_current_timestamp() - gets timestamp of current record + 4. get_checkpoint_mark() - creates a checkpoint for resuming + + Example: + reader = source.reader() + if reader.start(): + process(reader.get_current()) + while reader.advance(): + process(reader.get_current()) + """ + def start(self) -> bool: + """Initializes the reader and reads the first record. + + Returns: + True if a record was read, False if no data is currently available. + + Raises: + Exception: if reading fails + """ + raise NotImplementedError + + def advance(self) -> bool: + """Advances to the next record. + + Returns: + True if a record was read, False if no data is currently available. + + Raises: + Exception: if reading fails + """ + raise NotImplementedError + + def get_current(self) -> Any: + """Returns the current record. + + Should only be called after start() or advance() returns True. + + Returns: + The current record. + """ + raise NotImplementedError + + def get_current_timestamp(self) -> timestamp.Timestamp: + """Returns the timestamp of the current record. + + Should only be called after start() or advance() returns True. + The timestamp represents the event time of the record. + + Returns: + A Timestamp object representing when the record was created. + """ + raise NotImplementedError + + def get_watermark(self) -> timestamp.Timestamp: + """Returns the current watermark. + + The watermark is a lower bound on timestamps of future records. + It signals that no records with earlier timestamps will be produced. + + Returns: + A Timestamp representing the current watermark. + """ + raise NotImplementedError + + def get_checkpoint_mark(self) -> CheckpointMark: + """Returns a checkpoint mark for the current position. + + This checkpoint can be used to resume reading from this position in case + of failures or restarts. The checkpoint represents all records that have + been read up to this point. + + Returns: + A CheckpointMark for the current read position. + """ + raise NotImplementedError + + def close(self) -> None: + """Closes the reader and releases resources. + + Should be called when reading is complete to clean up connections, + file handles, or other resources. + """ + pass + + +class UnboundedSource(SourceBase): + """A source that reads an unbounded amount of input. + + An UnboundedSource represents a continuous stream of data, such as: + - Message queues (Kafka, Pub/Sub, Kinesis) + - Database change streams + - Event logs + - Real-time sensors or feeds + + Unlike BoundedSource which reads a finite dataset, UnboundedSource continues + producing data indefinitely. The source is responsible for: + - Creating readers that fetch data continuously + - Tracking watermarks for event time progress + - Supporting checkpointing for fault tolerance + - Splitting into multiple parallel readers if possible + + Example: + class MyUnboundedSource(UnboundedSource): + def reader(self, checkpoint=None): + return MyUnboundedReader(checkpoint) + + def split(self, desired_num_splits): + # Return sub-sources for parallel reading + return [MyUnboundedSource(shard) for shard in shards] + + Note: This is a foundational API. Integration with Read transform and + Splittable DoFn wrappers will be implemented in future work. + """ + def reader( + self, + checkpoint: Optional[CheckpointMark] = None, + ) -> UnboundedReader: + """Returns a reader for this source. + + Args: + checkpoint: Optional checkpoint to resume reading from. If None, + reading starts from the beginning or the default position. + + Returns: + An UnboundedReader that reads from this source. + + Raises: + Exception: if reader creation fails + """ + raise NotImplementedError + + def split( + self, + desired_num_splits: int, + ) -> list['UnboundedSource']: + """Splits this source into approximately desired_num_splits sub-sources. + + Splitting allows parallel reading of the source by creating multiple + independent sub-sources that each read a portion of the data. + For example, a Kafka source might split by partition, or a Pub/Sub + source might split by subscription. + + Args: + desired_num_splits: The desired number of sub-sources. The actual number + returned may be more or less based on the source's characteristics. + + Returns: + A list of UnboundedSource objects that together cover the same data + as this source. Returning [self] is valid for sources that cannot split. + + Raises: + Exception: if splitting fails + """ + # Default implementation: source cannot be split + return [self] + + def is_bounded(self) -> bool: + """Returns False to indicate this is an unbounded source.""" + return False + + class RangeTracker(object): """A thread safe object used by Dataflow source framework. diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py index eb9617cfae34..895a3cf5eb55 100644 --- a/sdks/python/apache_beam/io/iobase_test.py +++ b/sdks/python/apache_beam/io/iobase_test.py @@ -220,5 +220,92 @@ def test_sdf_wrap_range_source(self): self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3]) +class UnboundedSourceTest(unittest.TestCase): + """Basic tests for UnboundedSource, UnboundedReader, and CheckpointMark. + + These tests verify the foundational API structure. Full integration with + Read transform and SDF wrappers will be tested in future implementation. + """ + def test_checkpoint_mark_finalize(self): + """Test that CheckpointMark can be subclassed and finalized.""" + class TestCheckpointMark(iobase.CheckpointMark): + def __init__(self): + self.finalized = False + + def finalize(self): + self.finalized = True + + checkpoint = TestCheckpointMark() + self.assertFalse(checkpoint.finalized) + checkpoint.finalize() + self.assertTrue(checkpoint.finalized) + + def test_unbounded_source_basic_interface(self): + """Test that UnboundedSource can be subclassed with basic methods.""" + from apache_beam.utils import timestamp + + class TestUnboundedSource(iobase.UnboundedSource): + def reader(self, checkpoint=None): + return TestUnboundedReader() + + def default_output_coder(self): + return beam.coders.VarIntCoder() + + class TestUnboundedReader(iobase.UnboundedReader): + def __init__(self): + self.index = -1 + self.data = [1, 2, 3] + + def start(self): + self.index = 0 + return True + + def advance(self): + self.index += 1 + return self.index < len(self.data) + + def get_current(self): + return self.data[self.index] + + def get_current_timestamp(self): + return timestamp.Timestamp.of(self.index) + + def get_watermark(self): + return timestamp.Timestamp.of(self.index) + + def get_checkpoint_mark(self): + return iobase.CheckpointMark() + + source = TestUnboundedSource() + self.assertFalse(source.is_bounded()) + + # Test reader basic operations + reader = source.reader() + self.assertTrue(reader.start()) + self.assertEqual(1, reader.get_current()) + self.assertEqual(timestamp.Timestamp.of(0), reader.get_current_timestamp()) + + self.assertTrue(reader.advance()) + self.assertEqual(2, reader.get_current()) + + self.assertTrue(reader.advance()) + self.assertEqual(3, reader.get_current()) + + self.assertFalse(reader.advance()) + + def test_unbounded_source_split_default(self): + """Test that UnboundedSource.split() returns [self] by default.""" + class SimpleUnboundedSource(iobase.UnboundedSource): + def reader(self, checkpoint=None): + pass + + def default_output_coder(self): + return beam.coders.VarIntCoder() + + source = SimpleUnboundedSource() + splits = source.split(desired_num_splits=10) + self.assertEqual([source], splits) + + if __name__ == '__main__': unittest.main() From a5ba100260b96dd638d4787bceedbf0697fd8bf9 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 3 Mar 2026 11:18:11 +0500 Subject: [PATCH 08/12] Update CHANGES.md for UnboundedSource API PR --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 7c971698913d..53cdc1a6c566 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -69,6 +69,7 @@ ## New Features / Improvements +* (Python) Added foundation for UnboundedSource API: CheckpointMark, UnboundedReader, and UnboundedSource abstract base classes enabling support for continuous data streams in Apache Beam ([#37442](https://github.com/apache/beam/issues/37442)). * (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)). * (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#X](https://github.com/apache/beam/issues/37429)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). From 58b84c406052db5b60c0db2e870bbb054f09479f Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 3 Mar 2026 17:23:47 +0500 Subject: [PATCH 09/12] Update CHANGES.md entry for UnboundedSource API PR --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 9e55dba0ec06..2f4d926158f1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,7 +68,7 @@ ## New Features / Improvements -* (Python) Added foundation for UnboundedSource API: CheckpointMark, UnboundedReader, and UnboundedSource abstract base classes enabling support for continuous data streams in Apache Beam ([#37442](https://github.com/apache/beam/issues/37442)). +* (Python) Added foundation for UnboundedSource API: CheckpointMark, UnboundedReader, and UnboundedSource abstract base classes enabling support for continuous data streams in Apache Beam ([#19137](https://github.com/apache/beam/issues/19137)). * (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)). * (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#X](https://github.com/apache/beam/issues/37429)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). From 2c4bf25afce9cc104575acb393a34ca6134dfa2a Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 3 Mar 2026 17:55:23 +0500 Subject: [PATCH 10/12] Fix import ordering in iobase_test.py for lint compliance - Move timestamp import to top-level imports - Reorder imports alphabetically per isort rules - Remove inline import from test method --- sdks/python/apache_beam/io/iobase_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py index 895a3cf5eb55..86b4f7413565 100644 --- a/sdks/python/apache_beam/io/iobase_test.py +++ b/sdks/python/apache_beam/io/iobase_test.py @@ -24,14 +24,15 @@ import mock import apache_beam as beam -from apache_beam.io.concat_source import ConcatSource -from apache_beam.io.concat_source_test import RangeSource from apache_beam.io import iobase from apache_beam.io import range_trackers +from apache_beam.io.concat_source import ConcatSource +from apache_beam.io.concat_source_test import RangeSource from apache_beam.io.iobase import SourceBundle from apache_beam.options.pipeline_options import DebugOptions from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.utils import timestamp class SDFBoundedSourceRestrictionProviderTest(unittest.TestCase): @@ -242,8 +243,6 @@ def finalize(self): def test_unbounded_source_basic_interface(self): """Test that UnboundedSource can be subclassed with basic methods.""" - from apache_beam.utils import timestamp - class TestUnboundedSource(iobase.UnboundedSource): def reader(self, checkpoint=None): return TestUnboundedReader() From e687918438b46fb95449675f06384a1468021e62 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 3 Mar 2026 19:46:48 +0500 Subject: [PATCH 11/12] Fix YAPF formatting in UnboundedSourceTest - Remove trailing whitespace from blank lines - Add blank line after class docstring - Clean up spacing around test code - Ensure yapf --diff produces no output --- sdks/python/apache_beam/io/iobase_test.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py index 86b4f7413565..2357ee4aba0b 100644 --- a/sdks/python/apache_beam/io/iobase_test.py +++ b/sdks/python/apache_beam/io/iobase_test.py @@ -223,10 +223,11 @@ def test_sdf_wrap_range_source(self): class UnboundedSourceTest(unittest.TestCase): """Basic tests for UnboundedSource, UnboundedReader, and CheckpointMark. - + These tests verify the foundational API structure. Full integration with Read transform and SDF wrappers will be tested in future implementation. """ + def test_checkpoint_mark_finalize(self): """Test that CheckpointMark can be subclassed and finalized.""" class TestCheckpointMark(iobase.CheckpointMark): @@ -277,19 +278,19 @@ def get_checkpoint_mark(self): source = TestUnboundedSource() self.assertFalse(source.is_bounded()) - + # Test reader basic operations reader = source.reader() self.assertTrue(reader.start()) self.assertEqual(1, reader.get_current()) self.assertEqual(timestamp.Timestamp.of(0), reader.get_current_timestamp()) - + self.assertTrue(reader.advance()) self.assertEqual(2, reader.get_current()) - + self.assertTrue(reader.advance()) self.assertEqual(3, reader.get_current()) - + self.assertFalse(reader.advance()) def test_unbounded_source_split_default(self): From baf8460a5d1259925ec59fc0865cb5ee21d81a9e Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 31 Mar 2026 00:21:34 +0500 Subject: [PATCH 12/12] Fix --- sdks/python/apache_beam/io/iobase.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index a30d7adb11e9..80b4f3b1587f 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -255,7 +255,8 @@ class CheckpointMark(object): Implementations must be serializable so they can be persisted and restored across pipeline restarts. - Example: + For example:: + class MyCheckpointMark(CheckpointMark): def __init__(self, offset): self.offset = offset @@ -293,7 +294,8 @@ class UnboundedReader(object): 3. get_current_timestamp() - gets timestamp of current record 4. get_checkpoint_mark() - creates a checkpoint for resuming - Example: + For example:: + reader = source.reader() if reader.start(): process(reader.get_current()) @@ -391,7 +393,8 @@ class UnboundedSource(SourceBase): - Supporting checkpointing for fault tolerance - Splitting into multiple parallel readers if possible - Example: + For example:: + class MyUnboundedSource(UnboundedSource): def reader(self, checkpoint=None): return MyUnboundedReader(checkpoint)