From 29b50cd94b5087ae9887cb04c40984e9e37cb5a8 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 23 Dec 2025 14:15:44 +0100 Subject: [PATCH 1/3] Initial implementation of CWL support in run_udf. https://github.com/Open-EO/openeo-geopyspark-driver/issues/1476 --- openeo_driver/ProcessGraphDeserializer.py | 8 +++++++- openeo_driver/backend.py | 10 ++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py index c4cebc10..e8839f90 100644 --- a/openeo_driver/ProcessGraphDeserializer.py +++ b/openeo_driver/ProcessGraphDeserializer.py @@ -1769,7 +1769,7 @@ def merge_cubes(args: ProcessArgs, env: EvalEnv) -> DriverDataCube: def run_udf(args: ProcessArgs, env: EvalEnv): # TODO: note: this implements a non-standard usage of `run_udf`: processing "vector" cube (direct JSON or from aggregate_spatial, ...) dry_run_tracer: DryRunDataTracer = env.get(ENV_DRY_RUN_TRACER) - data = args.get_required(name="data") + data = args.get_optional(name="data") udf, runtime = _get_udf(args, env=env) context = args.get_optional(name="context", default={}) @@ -1812,6 +1812,12 @@ def run_udf(args: ProcessArgs, env: EvalEnv): structured_data_list=[openeo.udf.StructuredData(description="Data list", data=data, type="list")], user_context=context ) + elif runtime.lower() == "CWL-Calrissian".lower(): + return env.backend_implementation.run_cwl( + env, + udf, + context, + ) else: raise ProcessParameterInvalidException( parameter="data", diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py index f60be53b..fc49192b 100644 --- a/openeo_driver/backend.py +++ b/openeo_driver/backend.py @@ -949,6 +949,16 @@ def load_ml_model(self, job_id: str) -> DriverMlModel: def vector_to_raster(self, input_vector_cube: DriverVectorCube, target: DriverDataCube) -> DriverDataCube: raise NotImplementedError + def run_cwl( + self, + env: EvalEnv, + cwl_url: str, + context: dict, + stac_root: Optional[str] 
= None, + direct_s3_mode: Optional[bool] = False, + ) -> DriverDataCube: + raise NotImplementedError + def visit_process_graph(self, process_graph: dict) -> ProcessGraphVisitor: """Create a process graph visitor and accept given process graph""" return ProcessGraphVisitor().accept_process_graph(process_graph) From 5418e257408c6c58ba59f27168a718c5207c39f4 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Mon, 5 Jan 2026 19:48:13 +0100 Subject: [PATCH 2/3] Better arguments for run_cwl. Use "EOAP-CWL". https://github.com/Open-EO/openeo-geopyspark-driver/issues/1476 --- openeo_driver/ProcessGraphDeserializer.py | 14 ++++++++------ openeo_driver/backend.py | 6 +++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py index e8839f90..8232723e 100644 --- a/openeo_driver/ProcessGraphDeserializer.py +++ b/openeo_driver/ProcessGraphDeserializer.py @@ -1781,6 +1781,14 @@ def run_udf(args: ProcessArgs, env: EvalEnv): # This way a weak_spatial_extent can be calculated from the UDF's output. 
return data.run_udf() + if runtime.lower() == "EOAP-CWL".lower(): + return env.backend_implementation.run_cwl( + data=data, + env=env, + udf=udf, + context=context, + ) + if env.get("validation", False): raise FeatureUnsupportedException("run_udf is not supported in validation mode.") @@ -1812,12 +1820,6 @@ def run_udf(args: ProcessArgs, env: EvalEnv): structured_data_list=[openeo.udf.StructuredData(description="Data list", data=data, type="list")], user_context=context ) - elif runtime.lower() == "CWL-Calrissian".lower(): - return env.backend_implementation.run_cwl( - env, - udf, - context, - ) else: raise ProcessParameterInvalidException( parameter="data", diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py index fc49192b..727f9081 100644 --- a/openeo_driver/backend.py +++ b/openeo_driver/backend.py @@ -951,11 +951,11 @@ def vector_to_raster(self, input_vector_cube: DriverVectorCube, target: DriverDa def run_cwl( self, + *, + data, # Will be None at the moment env: EvalEnv, - cwl_url: str, + cwl: str, context: dict, - stac_root: Optional[str] = None, - direct_s3_mode: Optional[bool] = False, ) -> DriverDataCube: raise NotImplementedError From 707048cf8ba31264d6b7ccd1a3d6ca8d578e85fc Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Mon, 5 Jan 2026 20:38:44 +0100 Subject: [PATCH 3/3] udf->cwl --- openeo_driver/ProcessGraphDeserializer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py index 8232723e..132b7a34 100644 --- a/openeo_driver/ProcessGraphDeserializer.py +++ b/openeo_driver/ProcessGraphDeserializer.py @@ -1785,7 +1785,7 @@ def run_udf(args: ProcessArgs, env: EvalEnv): return env.backend_implementation.run_cwl( data=data, env=env, - udf=udf, + cwl=udf, context=context, )