From 3d90b236c8828121e6c9682bdcc273326fc2ecb6 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 12 Feb 2026 13:13:03 -0500 Subject: [PATCH] Run Python Flink PVR tests on DataStream API --- .../beam_PostCommit_Python_Portable_Flink.yml | 5 +++-- .../sql/expansion-service/build.gradle | 1 - .../runners/portability/flink_runner_test.py | 19 +++++++++++++++---- .../python/test-suites/portable/common.gradle | 7 ++++++- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml index f3c032ebffe2..2fee4fb7dfdc 100644 --- a/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml +++ b/.github/workflows/beam_PostCommit_Python_Portable_Flink.yml @@ -63,8 +63,9 @@ jobs: job_name: ["beam_PostCommit_Python_Portable_Flink"] job_phrase: ["Run Python Portable Flink"] # TODO: Enable PROCESS https://github.com/apache/beam/issues/35702 - # environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS'] - environment_type: ['DOCKER', 'LOOPBACK'] + # environment_type: [..., 'PROCESS'] + # Run environment types not covered by PreCommit (LOOPBACK) + environment_type: ['DOCKER', 'LOOPBACK_DATASTREAM'] steps: - uses: actions/checkout@v4 - name: Setup repository diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle b/sdks/java/extensions/sql/expansion-service/build.gradle index 8b5bd8c69240..562c1ac8dc76 100644 --- a/sdks/java/extensions/sql/expansion-service/build.gradle +++ b/sdks/java/extensions/sql/expansion-service/build.gradle @@ -56,5 +56,4 @@ shadowJar { manifest { attributes(["Multi-Release": true]) } - outputs.upToDateWhen { false } } diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index dbeef557ab5a..3cec2bb6b2fb 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ 
b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -18,6 +18,7 @@ import argparse import logging +import platform import shlex import typing import unittest @@ -106,6 +107,12 @@ def parse_options(self, request): 'For PROCESS: process_command (required), process_variables ' '(optional, comma-separated)\n ' 'For EXTERNAL: external_service_address (required)')) + parser.add_argument( + '--use_data_stream_for_batch', + help=( + 'Use Flink DataStream API for batch pipeline. Only effective for ' + 'Flink 1.x. Flink 2.x always uses DataStream API.'), + action='store_true') known_args, unknown_args = parser.parse_known_args( shlex.split(test_pipeline_options)) if unknown_args: @@ -117,6 +124,7 @@ def parse_options(self, request): FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))) self.environment_type = known_args.environment_type self.environment_options = known_args.environment_options + self.use_data_stream_for_batch = known_args.use_data_stream_for_batch @classmethod def tearDownClass(cls): @@ -158,11 +166,12 @@ def _subprocess_command(cls, job_port, expansion_port): cls._create_conf_dir() cls.expansion_port = expansion_port - + platform_specific_opts = [] + if platform.system() == 'Linux': + # UseContainerSupport is supported in Linux and turned on by default + platform_specific_opts.append('-XX:-UseContainerSupport') try: - return [ - 'java', - '-XX:-UseContainerSupport', + return ['java'] + platform_specific_opts + [ '--add-opens=java.base/java.lang=ALL-UNNAMED', '--add-opens=java.base/java.nio=ALL-UNNAMED', '--add-opens=java.base/java.util=ALL-UNNAMED', @@ -210,6 +219,8 @@ def create_options(self): options._all_options['checkpointing_interval'] = 3000 options._all_options['shutdown_sources_after_idle_ms'] = 60000 options._all_options['number_of_execution_retries'] = 1 + if self.use_data_stream_for_batch: + options._all_options['use_data_stream_for_batch'] = True return options diff --git a/sdks/python/test-suites/portable/common.gradle 
b/sdks/python/test-suites/portable/common.gradle index f7fa3e8e0b26..64e441ba4cc5 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -29,7 +29,7 @@ ext { pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker" } -def createFlinkRunnerTestTask(String workerType) { +def createFlinkRunnerTestTask(String workerType, boolean useDataStreamForBatch = false) { def taskName = "flinkCompatibilityMatrix${workerType}" // project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here. def jobServerJar = "${rootDir}/runners/flink/${latestFlinkVersion}/job-server/build/libs/beam-runners-flink-${latestFlinkVersion}-job-server-${version}.jar" @@ -37,6 +37,10 @@ def createFlinkRunnerTestTask(String workerType) { if (workerType == 'PROCESS') { options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh" } + if (useDataStreamForBatch) { + options += " --use_data_stream_for_batch" + taskName += "_DATASTREAM" + } def task = toxTask(taskName, 'flink-runner-test', options) // Through the Flink job server, we transitively add dependencies on the expansion services needed in tests. task.configure { @@ -56,6 +60,7 @@ def createFlinkRunnerTestTask(String workerType) { createFlinkRunnerTestTask('DOCKER') createFlinkRunnerTestTask('PROCESS') createFlinkRunnerTestTask('LOOPBACK') +createFlinkRunnerTestTask('LOOPBACK', true) task flinkValidatesRunner() { dependsOn 'flinkCompatibilityMatrixLOOPBACK'