diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6060ff8d54a8..82e23c9343aa 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -1454,7 +1454,14 @@ def wait(self, timeout=None): def get(self, timeout=None): # type: (Optional[float]) -> T - return self._func(*(arg.get(timeout) for arg in self._args)) + # List comprehension, not generator: *(gen) causes CPython to build the + # argument tuple incrementally via _PyTuple_Resize, which asserts + # Py_REFCNT(v)==1. A GC cycle between yields can increment that refcount, + # raising SystemError (Objects/tupleobject.c:927). See + # https://github.com/python/cpython/issues/127058 (fixed in 3.14.0a3+: + # https://github.com/python/cpython/commit/5a23994). *[list] allocates the + # tuple once at its final size, avoiding the resize entirely. + return self._func(*[arg.get(timeout) for arg in self._args]) def set(self, value): # type: (T) -> _Future[T] diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index 7b53f274cac2..76e428f06464 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -704,6 +704,38 @@ def testShortIdAssignment(self): % case.info) +class DeferredCallTest(unittest.TestCase): + """Tests for _DeferredCall.get().""" + def test_get_single_arg(self): + f = sdk_worker._Future().set(42) + call = sdk_worker._DeferredCall(lambda x: x, f) + self.assertEqual(call.get(), 42) + + def test_get_multiple_args(self): + futures = [sdk_worker._Future().set(i) for i in range(5)] + call = sdk_worker._DeferredCall(lambda *args: sum(args), *futures) + self.assertEqual(call.get(), sum(range(5))) + + def test_get_non_future_args_are_wrapped(self): + # __init__ wraps non-Future values in _Future().set(v); get() must work. + call = sdk_worker._DeferredCall(lambda x, y: x * y, 3, 7) + self.assertEqual(call.get(), 21) + + def test_get_mixed_future_and_value_args(self): + a = sdk_worker._Future().set(10) + call = sdk_worker._DeferredCall(lambda x, y: x + y, a, 5) + self.assertEqual(call.get(), 15) + + def test_get_zero_args(self): + call = sdk_worker._DeferredCall(lambda: 99) + self.assertEqual(call.get(), 99) + + def test_get_preserves_return_value_type(self): + f = sdk_worker._Future().set({'key': 'val'}) + call = sdk_worker._DeferredCall(lambda d: d, f) + self.assertEqual(call.get(), {'key': 'val'}) + + def monitoringInfoMetadata(info): return { descriptor.name: value