From 0adeae8b771041b17556669b921c722f8a09aee1 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 12 Feb 2026 12:16:29 -0800 Subject: [PATCH 1/2] Optimize data size estimation for proto coder. --- sdks/python/apache_beam/coders/coder_impl.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 3e0b5218b166..bfebba6ec17a 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -131,6 +131,13 @@ Observables = List[Tuple[observable.ObservableMixin, 'CoderImpl']] +def as_nested_size(unnested_size, nested): + if nested: + return unnested_size + get_varint_size(unnested_size) + else: + return unnested_size + + class CoderImpl(object): """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, stream, nested): @@ -316,6 +323,9 @@ def decode(self, encoded): proto_message.ParseFromString(encoded) # This is in effect "ParsePartial". return proto_message + def estimate_size(self, value, nested=False): + return as_nested_size(value.ByteSize(), nested) + class DeterministicProtoCoderImpl(ProtoCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" @@ -335,6 +345,9 @@ def encode(self, value): def decode(self, value): return self.proto_plus_type.deserialize(value) + def estimate_size(self, value, nested=False): + return as_nested_size(type(value).pb(value).ByteSize(), nested) + UNKNOWN_TYPE = 0xFF NONE_TYPE = 0 From 9a1dedfea250434695c1ef8046accc263cc68ca4 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 13 Feb 2026 12:22:58 -0800 Subject: [PATCH 2/2] Use existing _get_nested_size utility. --- sdks/python/apache_beam/coders/coder_impl.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index bfebba6ec17a..2a3ce2cc8828 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -131,13 +131,6 @@ Observables = List[Tuple[observable.ObservableMixin, 'CoderImpl']] -def as_nested_size(unnested_size, nested): - if nested: - return unnested_size + get_varint_size(unnested_size) - else: - return unnested_size - - class CoderImpl(object): """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, stream, nested): @@ -324,7 +317,7 @@ def decode(self, encoded): return proto_message def estimate_size(self, value, nested=False): - return as_nested_size(value.ByteSize(), nested) + return self._get_nested_size(value.ByteSize(), nested) class DeterministicProtoCoderImpl(ProtoCoderImpl): @@ -346,7 +339,7 @@ def decode(self, value): return self.proto_plus_type.deserialize(value) def estimate_size(self, value, nested=False): - return as_nested_size(type(value).pb(value).ByteSize(), nested) + return self._get_nested_size(type(value).pb(value).ByteSize(), nested) UNKNOWN_TYPE = 0xFF