diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 87d63d423156..8ad16b375906 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -843,9 +843,10 @@ def apply( assert isinstance(result.producer.inputs, tuple) if isinstance(result, pvalue.DoOutputsTuple): - for tag, pc in list(result._pcolls.items()): + all_tags = [result._main_tag] + list(result._tags) + for tag in all_tags: if tag not in current.outputs: - current.add_output(pc, tag) + current.add_output(result[tag], tag) continue # If there is already a tag with the same name, increase a counter for diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 3e7d083cb2fb..4b61902c2870 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -1646,9 +1646,10 @@ def expand(self, pcoll): all_applied_transforms[xform.full_label] = xform current_transforms.extend(xform.parts) xform = all_applied_transforms['Split Sales'] - # Confirm that Split Sales correctly has two outputs as specified by - # ParDo.with_outputs in ParentSalesSplitter. - assert len(xform.outputs) == 2 + # Confirm that Split Sales correctly has three outputs: the main + # (untagged) output plus the two tagged outputs specified by + # ParDo.with_outputs in ParentSalesSplitter. + assert len(xform.outputs) == 3 if __name__ == '__main__':