diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 94a467d5a249..e081185b5507 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -218,14 +218,12 @@ def run( """Run the job""" try: state_stream = self.job_service.GetStateStream( - beam_job_api_pb2.GetJobStateRequest(job_id=preparation_id), - timeout=self.timeout) + beam_job_api_pb2.GetJobStateRequest(job_id=preparation_id)) # If there's an error, we don't always get it until we try to read. # Fortunately, there's always an immediate current state published. state_stream = itertools.chain([next(state_stream)], state_stream) message_stream = self.job_service.GetMessageStream( - beam_job_api_pb2.JobMessagesRequest(job_id=preparation_id), - timeout=self.timeout) + beam_job_api_pb2.JobMessagesRequest(job_id=preparation_id)) except Exception: # TODO(https://github.com/apache/beam/issues/19284): Unify preparation_id # and job_id for all runners. diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py index f83697732598..843db0170b23 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -55,6 +55,13 @@ def new_pipeline(): pickle_library='cloudpickle')) +def new_pipeline_expand_test(): + return beam.Pipeline( + runner='FnApiRunner', + options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) + + @unittest.skipIf(jsonschema is None, "Yaml dependencies not installed") class MainTest(unittest.TestCase): def assertYaml(self, expected, result): @@ -1048,7 +1055,7 @@ def test_expand_pipeline_with_pipeline_key_only(self): elements: [1,2,3] - type: LogForTesting ''' - with new_pipeline() as p: + with new_pipeline_expand_test() as p: expand_pipeline(p, spec, validate_schema=None) def test_expand_pipeline_with_pipeline_and_option_keys(self): @@ -1063,7 +1070,7 @@ def test_expand_pipeline_with_pipeline_and_option_keys(self): options: streaming: false ''' - with new_pipeline() as p: + with new_pipeline_expand_test() as p: expand_pipeline(p, spec, validate_schema=None) def test_expand_pipeline_with_extra_top_level_keys(self): @@ -1082,7 +1089,7 @@ def test_expand_pipeline_with_extra_top_level_keys(self): other_metadata: "This is an ignored comment." ''' - with new_pipeline() as p: + with new_pipeline_expand_test() as p: expand_pipeline(p, spec, validate_schema=None) def test_expand_pipeline_with_incorrect_pipelines_key_fails(self): @@ -1095,7 +1102,7 @@ def test_expand_pipeline_with_incorrect_pipelines_key_fails(self): elements: [1,2,3] - type: LogForTesting ''' - with new_pipeline() as p: + with new_pipeline_expand_test() as p: with self.assertRaises(KeyError): expand_pipeline(p, spec, validate_schema=None) @@ -1110,7 +1117,7 @@ def test_expand_pipeline_with_valid_schema(self): elements: [1,2,3] - type: LogForTesting ''' - with new_pipeline() as p: + with new_pipeline_expand_test() as p: expand_pipeline(p, spec, validate_schema='generic') @unittest.skipIf(jsonschema is None, "Yaml dependencies not installed") @@ -1124,7 +1131,7 @@ def test_expand_pipeline_with_invalid_schema(self): elements: [1,2,3] - type: LogForTesting ''' - with new_pipeline() as p: + with new_pipeline_expand_test() as p: with self.assertRaises(jsonschema.ValidationError): expand_pipeline(p, spec, validate_schema='generic') diff --git a/sdks/python/scripts/run_tox_cleanup.sh b/sdks/python/scripts/run_tox_cleanup.sh index be4409525b53..89f5a6c61810 100755 --- a/sdks/python/scripts/run_tox_cleanup.sh +++ b/sdks/python/scripts/run_tox_cleanup.sh @@ -35,7 +35,7 @@ set -e for dir in apache_beam target/build; do if [ -d "${dir}" ]; then for ext in pyc c so; do - find ${dir} -type f -name "*.${ext}" -delete + find ${dir} -type f -name "*.${ext}" -delete || true done fi done