5 changes: 3 additions & 2 deletions .github/workflows/beam_PostCommit_Python_Portable_Flink.yml
@@ -63,8 +63,9 @@ jobs:
         job_name: ["beam_PostCommit_Python_Portable_Flink"]
         job_phrase: ["Run Python Portable Flink"]
         # TODO: Enable PROCESS https://github.com/apache/beam/issues/35702
-        # environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
-        environment_type: ['DOCKER', 'LOOPBACK']
+        # environment_type: [..., 'PROCESS']
+        # Run environment types not covered by PreCommit (LOOPBACK)
+        environment_type: ['DOCKER', 'LOOPBACK_DATASTREAM']
     steps:
       - uses: actions/checkout@v4
       - name: Setup repository
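Note for reviewers: 'LOOPBACK_DATASTREAM' is not an SDK-harness environment type of its own; it appears to be a matrix key that resolves to the new Gradle task suffix added in common.gradle below (the LOOPBACK worker plus --use_data_stream_for_batch). A minimal Python sketch of that presumed mapping, for illustration only:

    # Presumed mapping (not part of this diff): each matrix value is
    # interpolated into a Gradle task name 'flinkCompatibilityMatrix<value>'.
    for environment_type in ['DOCKER', 'LOOPBACK_DATASTREAM']:
        print(f'flinkCompatibilityMatrix{environment_type}')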
1 change: 0 additions & 1 deletion sdks/java/extensions/sql/expansion-service/build.gradle
@@ -56,5 +56,4 @@ shadowJar {
   manifest {
     attributes(["Multi-Release": true])
   }
-  outputs.upToDateWhen { false }
 }
19 changes: 15 additions & 4 deletions sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -18,6 +18,7 @@
 
 import argparse
 import logging
+import platform
 import shlex
 import typing
 import unittest
@@ -106,6 +107,12 @@ def parse_options(self, request):
             'For PROCESS: process_command (required), process_variables '
             '(optional, comma-separated)\n '
             'For EXTERNAL: external_service_address (required)'))
+    parser.add_argument(
+        '--use_data_stream_for_batch',
+        help=(
+            'Use Flink DataStream API for batch pipelines. Only effective for '
+            'Flink 1.x; Flink 2.x always uses the DataStream API.'),
+        action='store_true')
     known_args, unknown_args = parser.parse_known_args(
         shlex.split(test_pipeline_options))
     if unknown_args:
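A self-contained sketch of how the new flag parses: action='store_true' defaults to False, so existing invocations of the test suite are unaffected, and parse_known_args tolerates the other pipeline flags the harness forwards.

    import argparse
    import shlex

    parser = argparse.ArgumentParser()
    parser.add_argument('--use_data_stream_for_batch', action='store_true')

    # Unrecognized flags land in the second return value instead of erroring.
    known, _unknown = parser.parse_known_args(
        shlex.split('--environment_type=LOOPBACK --use_data_stream_for_batch'))
    print(known.use_data_stream_for_batch)  # True; False when the flag is omitted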
@@ -117,6 +124,7 @@
             FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1])))
     self.environment_type = known_args.environment_type
     self.environment_options = known_args.environment_options
+    self.use_data_stream_for_batch = known_args.use_data_stream_for_batch
 
   @classmethod
   def tearDownClass(cls):
@@ -158,11 +166,12 @@ def _subprocess_command(cls, job_port, expansion_port):
 
     cls._create_conf_dir()
     cls.expansion_port = expansion_port
-
+    platform_specific_opts = []
+    if platform.system() == 'Linux':
+      # UseContainerSupport only exists on Linux JVMs, where it is on by default.
+      platform_specific_opts.append('-XX:-UseContainerSupport')
     try:
-      return [
-          'java',
-          '-XX:-UseContainerSupport',
+      return ['java'] + platform_specific_opts + [
           '--add-opens=java.base/java.lang=ALL-UNNAMED',
           '--add-opens=java.base/java.nio=ALL-UNNAMED',
           '--add-opens=java.base/java.util=ALL-UNNAMED',
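The platform gate matters because UseContainerSupport is a Linux-only HotSpot flag: on macOS or Windows JVMs, passing -XX:-UseContainerSupport aborts startup with "Unrecognized VM option", so the job server would never boot. A standalone sketch of the same pattern (assumes a local java on PATH):

    import platform
    import subprocess

    def jvm_command(*args):
        opts = []
        if platform.system() == 'Linux':
            # Recognized only by Linux JVM builds, where container
            # detection is enabled by default.
            opts.append('-XX:-UseContainerSupport')
        return ['java', *opts, *args]

    subprocess.check_call(jvm_command('-version'))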
@@ -210,6 +219,8 @@ def create_options(self):
     options._all_options['checkpointing_interval'] = 3000
     options._all_options['shutdown_sources_after_idle_ms'] = 60000
     options._all_options['number_of_execution_retries'] = 1
+    if self.use_data_stream_for_batch:
+      options._all_options['use_data_stream_for_batch'] = True
 
     return options
 
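create_options() writes through options._all_options, the private dict behind PipelineOptions, because use_data_stream_for_batch is not a declared Python-side option; it is consumed by the Flink job server when it translates the batch pipeline. A hedged sketch of the same effect outside the test harness:

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(['--runner=PortableRunner'])
    # Not a registered Python option, so it goes into the private option
    # bag directly, mirroring what this test does; the Flink job server
    # reads it on the other side of the portability API.
    options._all_options['use_data_stream_for_batch'] = True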
7 changes: 6 additions & 1 deletion sdks/python/test-suites/portable/common.gradle
@@ -29,14 +29,18 @@ ext {
   pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker"
 }
 
-def createFlinkRunnerTestTask(String workerType) {
+def createFlinkRunnerTestTask(String workerType, boolean useDataStreamForBatch = false) {
   def taskName = "flinkCompatibilityMatrix${workerType}"
   // project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
   def jobServerJar = "${rootDir}/runners/flink/${latestFlinkVersion}/job-server/build/libs/beam-runners-flink-${latestFlinkVersion}-job-server-${version}.jar"
   def options = "--flink_job_server_jar=${jobServerJar} --environment_type=${workerType}"
   if (workerType == 'PROCESS') {
     options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
   }
+  if (useDataStreamForBatch) {
+    options += " --use_data_stream_for_batch"
+    taskName += "_DATASTREAM"
+  }
   def task = toxTask(taskName, 'flink-runner-test', options)
   // Through the Flink job server, we transitively add dependencies on the expansion services needed in tests.
   task.configure {
@@ -56,6 +60,7 @@ def createFlinkRunnerTestTask(String workerType) {
 createFlinkRunnerTestTask('DOCKER')
 createFlinkRunnerTestTask('PROCESS')
 createFlinkRunnerTestTask('LOOPBACK')
+createFlinkRunnerTestTask('LOOPBACK', true)
 
 task flinkValidatesRunner() {
   dependsOn 'flinkCompatibilityMatrixLOOPBACK'
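Taken together, createFlinkRunnerTestTask('LOOPBACK', true) registers flinkCompatibilityMatrixLOOPBACK_DATASTREAM, which is the task name the workflow's new LOOPBACK_DATASTREAM matrix entry lines up with. A Python mirror of the naming and option logic, for quick reference only (the job-server jar argument is elided):

    def flink_test_task(worker_type: str, use_data_stream_for_batch: bool = False):
        task_name = f'flinkCompatibilityMatrix{worker_type}'
        options = f'--environment_type={worker_type}'
        if use_data_stream_for_batch:
            options += ' --use_data_stream_for_batch'
            task_name += '_DATASTREAM'
        return task_name, options

    # ('flinkCompatibilityMatrixLOOPBACK_DATASTREAM',
    #  '--environment_type=LOOPBACK --use_data_stream_for_batch')
    print(flink_test_task('LOOPBACK', True))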