[Bug]: Python pipeline with side inputs fails on Flink runner when useDataStreamForBatch and pre_optimization are enabled #37583

@Abacn

Description

What happened?

Python PVR tests were found to be failing on Flink 2.0:

#37313 (comment)

The issue also occurs on the Flink 1.x runner when the same tests are run with the --use_data_stream_for_batch flag set. Example PR: #37582

To reproduce, first build the Flink 1.20 job server from the Beam root:

./gradlew :runners:flink:1.20:job-server:shadowJar

Then, from the sdks/python directory, run:

pip install -e .

pytest -v apache_beam/runners/portability/flink_runner_test.py::FlinkRunnerTestOptimized::test_batch_pardo_fusion_break --test-pipeline-options='--flink_job_server_jar=<Path-to-Beam>/runners/flink/1.20/job-server/build/libs/beam-runners-flink-1.20-job-server-<ver>-SNAPSHOT.jar --environment_type=LOOPBACK --use_data_stream_for_batch'
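The reproduction steps above can be scripted roughly as follows. This is a minimal sketch, not part of the Beam repository: the helper names (`job_server_jar`, `repro_steps`, `run`) are hypothetical, the Beam checkout path and SNAPSHOT version are supplied by the caller (they stand in for the `<Path-to-Beam>` and `<ver>` placeholders), and `run` only prints the commands unless `dry_run=False` is passed. The test class `FlinkRunnerTestOptimized` is presumably the one exercising the pre-optimization path named in the title.

```python
import shlex
import subprocess
from pathlib import PurePosixPath

FLINK_VERSION = "1.20"  # Flink runner module used in the reproduction


def job_server_jar(beam_root: str, beam_version: str) -> str:
    """Path of the job-server shadow jar built by Gradle (POSIX layout)."""
    libs = PurePosixPath(beam_root, "runners", "flink", FLINK_VERSION,
                         "job-server", "build", "libs")
    name = f"beam-runners-flink-{FLINK_VERSION}-job-server-{beam_version}-SNAPSHOT.jar"
    return str(libs / name)


def repro_steps(beam_root: str, beam_version: str) -> list[tuple[str, list[str]]]:
    """Return (working directory, argv) pairs for each reproduction step."""
    python_dir = str(PurePosixPath(beam_root, "sdks", "python"))
    options = " ".join([
        f"--flink_job_server_jar={job_server_jar(beam_root, beam_version)}",
        "--environment_type=LOOPBACK",
        # One of the two conditions from the issue title (useDataStreamForBatch).
        "--use_data_stream_for_batch",
    ])
    test_id = ("apache_beam/runners/portability/flink_runner_test.py"
               "::FlinkRunnerTestOptimized::test_batch_pardo_fusion_break")
    return [
        (beam_root, ["./gradlew", f":runners:flink:{FLINK_VERSION}:job-server:shadowJar"]),
        (python_dir, ["pip", "install", "-e", "."]),
        # Passed as one argv element, so no shell quoting is needed here.
        (python_dir, ["pytest", "-v", test_id, f"--test-pipeline-options={options}"]),
    ]


def run(beam_root: str, beam_version: str, dry_run: bool = True) -> None:
    """Print each step as a shell command; execute it only if dry_run is False."""
    for cwd, argv in repro_steps(beam_root, beam_version):
        print(f"(cd {shlex.quote(cwd)} && {shlex.join(argv)})")
        if not dry_run:
            subprocess.run(argv, cwd=cwd, check=True)
```

Building each step as an argv list rather than a shell string avoids the nested quoting around `--test-pipeline-options` in the command shown above.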

Error:

SEVERE: Error during job invocation test_batch_pardo_fusion_break_1770919885.566644_96979229-2600-4531-b129-1298267665f0.
java.lang.IllegalArgumentException
	at org.apache.beam.model.pipeline.v1.RunnerApi$Components.getTransformsOrThrow(RunnerApi.java:1038)
	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.getSideInputIdToPCollectionViewMap(FlinkStreamingPortablePipelineTranslator.java:990)
	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.transformSideInputs(FlinkStreamingPortablePipelineTranslator.java:1032)
	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateExecutableStage(FlinkStreamingPortablePipelineTranslator.java:754)
	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:280)
	at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:127)
	at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:105)
	at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:95)
	at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Assignees

No one assigned

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests