From e5b3971391b80260291a9968152e7ab8c6a991f9 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Mon, 9 Feb 2026 17:29:25 -0500 Subject: [PATCH 1/2] Fix DoOutputsTuple composite output registration for lazily-populated _pcolls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous code iterated over _pcolls.items() to register composite outputs, but _pcolls is lazily populated — entries are only added when accessed via __getitem__. This caused unaccessed outputs (e.g. the main output) to be missing from the composite's registered outputs in the pipeline proto. Fix by iterating over all declared tags (_main_tag + _tags) and accessing each via __getitem__ to ensure lazy creation. Co-Authored-By: Claude Opus 4.6 --- sdks/python/apache_beam/pipeline.py | 5 +-- sdks/python/apache_beam/pipeline_test.py | 44 ++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 87d63d423156..8ad16b375906 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -843,9 +843,10 @@ def apply( assert isinstance(result.producer.inputs, tuple) if isinstance(result, pvalue.DoOutputsTuple): - for tag, pc in list(result._pcolls.items()): + all_tags = [result._main_tag] + list(result._tags) + for tag in all_tags: if tag not in current.outputs: - current.add_output(pc, tag) + current.add_output(result[tag], tag) continue # If there is already a tag with the same name, increase a counter for diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 3e7d083cb2fb..46e6c3de6d6f 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -1646,9 +1646,47 @@ def expand(self, pcoll): all_applied_transforms[xform.full_label] = xform current_transforms.extend(xform.parts) xform = all_applied_transforms['Split Sales'] - # Confirm that Split Sales correctly has two outputs as specified by - # ParDo.with_outputs in ParentSalesSplitter. - assert len(xform.outputs) == 2 + # Confirm that Split Sales correctly has three outputs: the main + # (untagged) output plus the two tagged outputs specified by + # ParDo.with_outputs in ParentSalesSplitter. + assert len(xform.outputs) == 3 + + def test_do_outputs_tuple_subclass_registers_all_outputs(self): + """Test that a composite returning a DoOutputsTuple subclass registers + all declared outputs, not just those lazily accessed via _pcolls.""" + class PipeToMain(beam.pvalue.DoOutputsTuple): + """Wrapper enabling: composite | Next (pipes to main output).""" + def __init__(self, wrapped): + self.__dict__.update(wrapped.__dict__) + + def __or__(self, other): + return self[self._main_tag].__or__(other) + + class MyComposite(beam.PTransform): + def expand(self, pcoll): + return PipeToMain( + pcoll | beam.ParDo(beam.DoFn()).with_outputs('dropped')) + + p = beam.Pipeline() + result = p | beam.Create([1]) | 'Composite' >> MyComposite() + _ = result | 'UseMain' >> beam.Map(lambda x: x) + + proto = p.to_runner_api() + composite_outputs = None + consumed_pcoll = None + for t in proto.components.transforms.values(): + if t.unique_name == 'Composite': + composite_outputs = dict(t.outputs) + if t.unique_name == 'UseMain': + consumed_pcoll = list(t.inputs.values())[0] + + self.assertIsNotNone(composite_outputs) + self.assertIsNotNone(consumed_pcoll) + self.assertIn( + consumed_pcoll, + composite_outputs.values(), + "PCollection consumed by downstream transform must be in " + "composite's registered outputs") if __name__ == '__main__': From c7880b97b83b5b9c99736de030828a2f8acfd295 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 10 Feb 2026 16:46:03 -0500 Subject: [PATCH 2/2] Remove separate regression test; existing test covers the fix Co-Authored-By: Claude Opus 4.6 --- sdks/python/apache_beam/pipeline_test.py | 37 ------------------------ 1 file changed, 37 deletions(-) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 46e6c3de6d6f..4b61902c2870 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -1651,43 +1651,6 @@ def expand(self, pcoll): # ParDo.with_outputs in ParentSalesSplitter. assert len(xform.outputs) == 3 - def test_do_outputs_tuple_subclass_registers_all_outputs(self): - """Test that a composite returning a DoOutputsTuple subclass registers - all declared outputs, not just those lazily accessed via _pcolls.""" - class PipeToMain(beam.pvalue.DoOutputsTuple): - """Wrapper enabling: composite | Next (pipes to main output).""" - def __init__(self, wrapped): - self.__dict__.update(wrapped.__dict__) - - def __or__(self, other): - return self[self._main_tag].__or__(other) - - class MyComposite(beam.PTransform): - def expand(self, pcoll): - return PipeToMain( - pcoll | beam.ParDo(beam.DoFn()).with_outputs('dropped')) - - p = beam.Pipeline() - result = p | beam.Create([1]) | 'Composite' >> MyComposite() - _ = result | 'UseMain' >> beam.Map(lambda x: x) - - proto = p.to_runner_api() - composite_outputs = None - consumed_pcoll = None - for t in proto.components.transforms.values(): - if t.unique_name == 'Composite': - composite_outputs = dict(t.outputs) - if t.unique_name == 'UseMain': - consumed_pcoll = list(t.inputs.values())[0] - - self.assertIsNotNone(composite_outputs) - self.assertIsNotNone(consumed_pcoll) - self.assertIn( - consumed_pcoll, - composite_outputs.values(), - "PCollection consumed by downstream transform must be in " - "composite's registered outputs") - if __name__ == '__main__': unittest.main()