Skip to content

Commit 3bd0f51

Browse files
committed
Use registered type for Row
1 parent 532dd85 commit 3bd0f51

2 files changed

Lines changed: 20 additions & 15 deletions

File tree

sdks/python/apache_beam/typehints/row_type_test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ class RowTypeTest(unittest.TestCase):
3333
@staticmethod
3434
def _check_key_type_and_count(x) -> int:
3535
key_type = type(x[0])
36-
if not row_type._user_type_is_generated(key_type):
37-
raise RuntimeError("Expect type after GBK to be generated user type")
36+
if row_type._user_type_is_generated(key_type):
37+
raise RuntimeError("Type after GBK not preserved, get generated type")
38+
if not hasattr(key_type, row_type._BEAM_SCHEMA_ID):
39+
raise RuntimeError("Type after GBK missing Beam schema ID")
3840

3941
return len(x[1])
4042

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -587,19 +587,22 @@ def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type:
587587
descriptions[field.name] = field.description
588588
subfields.append((field.name, field_py_type))
589589

590-
user_type = NamedTuple(type_name, subfields)
591-
592-
# Define a reduce function, otherwise these types can't be pickled
593-
# (See BEAM-9574)
594-
setattr(
595-
user_type,
596-
'__reduce__',
597-
_named_tuple_reduce_method(schema.SerializeToString()))
598-
setattr(user_type, "_field_descriptions", descriptions)
599-
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)
600-
601-
self.schema_registry.add(user_type, schema)
602-
coders.registry.register_coder(user_type, coders.RowCoder)
590+
if schema.id in self.schema_registry.by_id:
591+
user_type = self.schema_registry.by_id[schema.id][0]
592+
else:
593+
user_type = NamedTuple(type_name, subfields)
594+
595+
# Define a reduce function, otherwise these types can't be pickled
596+
# (See BEAM-9574)
597+
setattr(
598+
user_type,
599+
'__reduce__',
600+
_named_tuple_reduce_method(schema.SerializeToString()))
601+
setattr(user_type, "_field_descriptions", descriptions)
602+
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)
603+
604+
self.schema_registry.add(user_type, schema)
605+
coders.registry.register_coder(user_type, coders.RowCoder)
603606

604607
return user_type
605608

0 commit comments

Comments
 (0)