From 3252eb205a5ebcd5712b5d7184556e47c68e6da0 Mon Sep 17 00:00:00 2001 From: Maxime Toussaint Date: Tue, 16 Dec 2025 11:10:24 -0500 Subject: [PATCH 1/5] Support parameter reference resolution in service generator Enhances the Python service generator to resolve parameter references from OpenAPI components, ensuring referenced parameters are correctly included in generated service methods. Adds internal helper for reference resolution, updates function signatures to accept components, and introduces tests to verify correct parameter resolution for both OpenAPI 3.0 and 3.1. --- .../language_converters/python/generator.py | 2 +- .../python/service_generator.py | 188 ++++++++---------- tests/test_parameter_reference_resolution.py | 188 ++++++++++++++++++ 3 files changed, 274 insertions(+), 104 deletions(-) create mode 100644 tests/test_parameter_reference_resolution.py diff --git a/src/openapi_python_generator/language_converters/python/generator.py b/src/openapi_python_generator/language_converters/python/generator.py index 0760ff3..c73a357 100644 --- a/src/openapi_python_generator/language_converters/python/generator.py +++ b/src/openapi_python_generator/language_converters/python/generator.py @@ -41,7 +41,7 @@ def generator( models = [] if data.paths is not None: - services = generate_services(data.paths, library_config) + services = generate_services(data.paths, library_config, data.components) else: services = [] diff --git a/src/openapi_python_generator/language_converters/python/service_generator.py b/src/openapi_python_generator/language_converters/python/service_generator.py index 613039e..b0e8720 100644 --- a/src/openapi_python_generator/language_converters/python/service_generator.py +++ b/src/openapi_python_generator/language_converters/python/service_generator.py @@ -37,6 +37,8 @@ Schema as Schema31, ) from openapi_pydantic.v3.v3_1.parameter import Parameter as Parameter31 +from openapi_pydantic.v3.v3_0 import Components as Components30 +from openapi_pydantic.v3.v3_1 import Components as Components31 from openapi_python_generator.language_converters.python import common from openapi_python_generator.language_converters.python.common import normalize_symbol @@ -54,6 +56,32 @@ TypeConversion, ) +# Type alias for Components +Components = Union[Components30, Components31] + +# Module-level storage for component parameters (set by generate_services) +_component_params: Optional[Dict[str, Union[Parameter30, Parameter31]]] = None + + +def _resolve_parameter_ref( + param: Union[Parameter30, Parameter31, Reference30, Reference31], +) -> Optional[Union[Parameter30, Parameter31]]: + if isinstance(param, (Parameter30, Parameter31)): + return param + + if isinstance(param, (Reference, Reference30, Reference31)): + if _component_params is None: + return None + # Extract parameter name from $ref like "#/components/parameters/LangParameter" + ref_str = getattr(param, "ref", None) + if ref_str and ref_str.startswith("#/components/parameters/"): + param_name = ref_str.split("/")[-1] + resolved = _component_params.get(param_name) + if resolved is not None: + return resolved + + return None + # Helper functions for isinstance checks across OpenAPI versions def is_response_type(obj) -> bool: @@ -97,9 +125,7 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if operation.requestBody is None: return None else: - if isinstance(operation.requestBody, Reference30) or isinstance( - operation.requestBody, Reference31 - ): + if isinstance(operation.requestBody, Reference30) or isinstance(operation.requestBody, Reference31): return "data.dict()" if operation.requestBody.content is None: @@ -113,9 +139,7 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if media_type is None: return None # pragma: no cover - if isinstance( - media_type.media_type_schema, (Reference, Reference30, Reference31) - ): + if isinstance(media_type.media_type_schema, (Reference, Reference30, Reference31)): return "data.dict()" elif hasattr(media_type.media_type_schema, "ref"): # Handle Reference objects from different OpenAPI versions @@ -127,9 +151,7 @@ def generate_body_param(operation: Operation) -> Union[str, None]: elif schema.type == "object": return "data" else: - raise Exception( - f"Unsupported schema type for request body: {schema.type}" - ) # pragma: no cover + raise Exception(f"Unsupported schema type for request body: {schema.type}") # pragma: no cover else: raise Exception( f"Unsupported schema type for request body: {type(media_type.media_type_schema)}" @@ -153,32 +175,26 @@ def _generate_params_from_content(content: Any): default_params = "" if operation.parameters is not None: for param in operation.parameters: - if not isinstance(param, (Parameter30, Parameter31)): - continue # pragma: no cover + # Resolve parameter references to their actual definitions + resolved_param = _resolve_parameter_ref(param) + if resolved_param is None: + continue # Skip if we can't resolve the reference + param = resolved_param converted_result = "" required = False param_name_cleaned = common.normalize_symbol(param.name) - if isinstance(param.param_schema, Schema30) or isinstance( - param.param_schema, Schema31 - ): + if isinstance(param.param_schema, Schema30) or isinstance(param.param_schema, Schema31): converted_result = ( f"{param_name_cleaned} : {type_converter(param.param_schema, param.required).converted_type}" + ("" if param.required else " = None") ) required = param.required - elif isinstance(param.param_schema, Reference30) or isinstance( - param.param_schema, Reference31 - ): - converted_result = ( - f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1] }" - + ( - "" - if isinstance(param, Reference30) - or isinstance(param, Reference31) - or param.required - else " = None" - ) + elif isinstance(param.param_schema, Reference30) or isinstance(param.param_schema, Reference31): + converted_result = f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}" + ( + "" + if isinstance(param, Reference30) or isinstance(param, Reference31) or param.required + else " = None" ) required = isinstance(param, Reference) or param.required @@ -194,17 +210,11 @@ def _generate_params_from_content(content: Any): "application/octet-stream", ] - if operation.requestBody is not None and not is_reference_type( - operation.requestBody - ): + if operation.requestBody is not None and not is_reference_type(operation.requestBody): # Safe access only if it's a concrete RequestBody object rb_content = getattr(operation.requestBody, "content", None) - if isinstance(rb_content, dict) and any( - rb_content.get(i) is not None for i in operation_request_body_types - ): - get_keyword = [ - i for i in operation_request_body_types if rb_content.get(i) - ][0] + if isinstance(rb_content, dict) and any(rb_content.get(i) is not None for i in operation_request_body_types): + get_keyword = [i for i in operation_request_body_types if rb_content.get(i)][0] content = rb_content.get(get_keyword) if content is not None and hasattr(content, "media_type_schema"): mts = getattr(content, "media_type_schema", None) @@ -214,9 +224,7 @@ def _generate_params_from_content(content: Any): ): params += f"{_generate_params_from_content(mts)}, " else: # pragma: no cover - raise Exception( - f"Unsupported media type schema for {str(operation)}: {type(mts)}" - ) + raise Exception(f"Unsupported media type schema for {str(operation)}: {type(mts)}") # else: silently ignore unsupported body shapes (could extend later) # Replace - with _ in params params = params.replace("-", "_") @@ -225,9 +233,7 @@ def _generate_params_from_content(content: Any): return params + default_params -def generate_operation_id( - operation: Operation, http_op: str, path_name: Optional[str] = None -) -> str: +def generate_operation_id(operation: Operation, http_op: str, path_name: Optional[str] = None) -> str: if operation.operationId is not None: return common.normalize_symbol(operation.operationId) elif path_name is not None: @@ -238,17 +244,18 @@ def generate_operation_id( ) # pragma: no cover -def _generate_params( - operation: Operation, param_in: Literal["query", "header"] = "query" -): +def _generate_params(operation: Operation, param_in: Literal["query", "header"] = "query"): if operation.parameters is None: return [] params = [] for param in operation.parameters: - if isinstance(param, (Parameter30, Parameter31)) and param.param_in == param_in: - param_name_cleaned = common.normalize_symbol(param.name) - params.append(f"{param.name!r} : {param_name_cleaned}") + # Resolve parameter references to their actual definitions + resolved_param = _resolve_parameter_ref(param) + if resolved_param is None or resolved_param.param_in != param_in: + continue # Skip if we can't resolve the reference + param_name_cleaned = common.normalize_symbol(resolved_param.name) + params.append(f"{resolved_param.name!r} : {param_name_cleaned}") return params @@ -284,9 +291,7 @@ def generate_return_type(operation: Operation) -> OpReturnType: media_type_schema = create_media_type_for_reference(chosen_response) if media_type_schema is None: - return OpReturnType( - type=None, status_code=good_responses[0][0], complex_type=False - ) + return OpReturnType(type=None, status_code=good_responses[0][0], complex_type=False) if is_media_type(media_type_schema): inner_schema = getattr(media_type_schema, "media_type_schema", None) @@ -303,25 +308,18 @@ def generate_return_type(operation: Operation) -> OpReturnType: ) elif is_schema_type(inner_schema): converted_result = type_converter(inner_schema, True) # type: ignore - if "array" in converted_result.original_type and isinstance( - converted_result.import_types, list - ): + if "array" in converted_result.original_type and isinstance(converted_result.import_types, list): matched = re.findall(r"List\[(.+)\]", converted_result.converted_type) if len(matched) > 0: list_type = matched[0] else: # pragma: no cover - raise Exception( - f"Unable to parse list type from {converted_result.converted_type}" - ) + raise Exception(f"Unable to parse list type from {converted_result.converted_type}") else: list_type = None return OpReturnType( type=converted_result, status_code=good_responses[0][0], - complex_type=bool( - converted_result.import_types - and len(converted_result.import_types) > 0 - ), + complex_type=bool(converted_result.import_types and len(converted_result.import_types) > 0), list_type=list_type, ) else: # pragma: no cover @@ -337,18 +335,31 @@ def generate_return_type(operation: Operation) -> OpReturnType: def generate_services( - paths: Dict[str, PathItem], library_config: LibraryConfig + paths: Dict[str, PathItem], + library_config: LibraryConfig, + components: Optional[Components] = None, ) -> List[Service]: """ Generates services from a paths object. :param paths: paths object to be converted + :param library_config: Library configuration + :param components: Optional OpenAPI components for resolving parameter references :return: List of services """ + global _component_params + + # Build a lookup dict for component parameters if available + if components is not None and hasattr(components, "parameters") and components.parameters is not None: + _component_params = {} + for param_name, param_or_ref in components.parameters.items(): + if isinstance(param_or_ref, (Parameter30, Parameter31)): + _component_params[param_name] = param_or_ref + else: + _component_params = None + jinja_env = create_jinja_env() - def generate_service_operation( - op: Operation, path_name: str, async_type: bool - ) -> ServiceOperation: + def generate_service_operation(op: Operation, path_name: str, async_type: bool) -> ServiceOperation: # Merge path-level parameters (always required by spec) into the # operation-level parameters so they get turned into function args. try: @@ -362,36 +373,25 @@ def generate_service_operation( if isinstance(p, (Parameter30, Parameter31)): existing_names.add(p.name) for p in path_level_params: - if ( - isinstance(p, (Parameter30, Parameter31)) - and p.name not in existing_names - ): + if isinstance(p, (Parameter30, Parameter31)) and p.name not in existing_names: if op.parameters is None: op.parameters = [] # type: ignore op.parameters.append(p) # type: ignore except Exception: # pragma: no cover - print( - f"Error merging path-level parameters for {path_name}" - ) # pragma: no cover + print(f"Error merging path-level parameters for {path_name}") # pragma: no cover pass params = generate_params(op) # Fallback: ensure all {placeholders} in path are present as function params try: - placeholder_names = [ - m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name) - ] - existing_param_names = { - p.split(":")[0].strip() for p in params.split(",") if ":" in p - } + placeholder_names = [m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name)] + existing_param_names = {p.split(":")[0].strip() for p in params.split(",") if ":" in p} for ph in placeholder_names: norm_ph = common.normalize_symbol(ph) if norm_ph not in existing_param_names and norm_ph: params = f"{norm_ph}: Any, " + params except Exception: # pragma: no cover - print( - f"Error ensuring path placeholders in params for {path_name}" - ) # pragma: no cover + print(f"Error ensuring path placeholders in params for {path_name}") # pragma: no cover pass operation_id = generate_operation_id(op, http_operation, path_name) query_params = generate_query_params(op) @@ -415,9 +415,7 @@ def generate_service_operation( use_orjson=common.get_use_orjson(), ) - so.content = jinja_env.get_template(library_config.template_name).render( - **so.model_dump() - ) + so.content = jinja_env.get_template(library_config.template_name).render(**so.model_dump()) if op.tags is not None and len(op.tags) > 0: so.tag = normalize_symbol(op.tags[0]) @@ -457,16 +455,8 @@ def generate_service_operation( services.append( Service( file_name=f"{tag}_service", - operations=[ - so for so in service_ops if so.tag == tag and not so.async_client - ], - content="\n".join( - [ - so.content - for so in service_ops - if so.tag == tag and not so.async_client - ] - ), + operations=[so for so in service_ops if so.tag == tag and not so.async_client], + content="\n".join([so.content for so in service_ops if so.tag == tag and not so.async_client]), async_client=False, library_import=library_config.library_name, use_orjson=common.get_use_orjson(), @@ -477,16 +467,8 @@ def generate_service_operation( services.append( Service( file_name=f"async_{tag}_service", - operations=[ - so for so in service_ops if so.tag == tag and so.async_client - ], - content="\n".join( - [ - so.content - for so in service_ops - if so.tag == tag and so.async_client - ] - ), + operations=[so for so in service_ops if so.tag == tag and so.async_client], + content="\n".join([so.content for so in service_ops if so.tag == tag and so.async_client]), async_client=True, library_import=library_config.library_name, use_orjson=common.get_use_orjson(), diff --git a/tests/test_parameter_reference_resolution.py b/tests/test_parameter_reference_resolution.py new file mode 100644 index 0000000..b250ef8 --- /dev/null +++ b/tests/test_parameter_reference_resolution.py @@ -0,0 +1,188 @@ +from openapi_pydantic.v3.v3_0 import ( + Components as Components30, + Reference as Reference30, + PathItem as PathItem30, + Operation as Operation30, + Response as Response30, + MediaType as MediaType30, + Schema as Schema30, +) +from openapi_pydantic.v3.v3_0.parameter import Parameter as Parameter30 +from openapi_pydantic.v3.v3_1 import ( + Components as Components31, + Reference as Reference31, + PathItem as PathItem31, + Operation as Operation31, + Response as Response31, + MediaType as MediaType31, + Schema as Schema31, +) +from openapi_pydantic.v3.v3_1.parameter import Parameter as Parameter31 + +from openapi_python_generator.common import HTTPLibrary, library_config_dict +from openapi_python_generator.language_converters.python.service_generator import ( + generate_services, + _resolve_parameter_ref, +) +import openapi_python_generator.language_converters.python.service_generator as sg + + +def test_resolve_parameter_ref_returns_direct_parameter(): + param = Parameter30( + name="lang", + param_in="query", + required=True, + param_schema=Schema30(type="string"), + ) + assert _resolve_parameter_ref(param) is param + + +def test_resolve_parameter_ref_resolves_reference(): + lang_param = Parameter30( + name="lang", + param_in="query", + required=True, + param_schema=Schema30(type="string"), + ) + sg._component_params = {"LangParameter": lang_param} + + try: + ref = Reference30(ref="#/components/parameters/LangParameter") + assert _resolve_parameter_ref(ref) is lang_param + finally: + sg._component_params = None + + +def test_resolve_parameter_ref_returns_none_for_missing(): + sg._component_params = {} + try: + ref = Reference30(ref="#/components/parameters/NonExistent") + assert _resolve_parameter_ref(ref) is None + finally: + sg._component_params = None + + +def test_generate_services_resolves_parameter_references_30(): + components = Components30( + parameters={ + "LangParameter": Parameter30( + name="lang", + param_in="query", + required=True, + param_schema=Schema30(type="string"), + ) + } + ) + paths = { + "/test": PathItem30( + get=Operation30( + operationId="getTest", + parameters=[Reference30(ref="#/components/parameters/LangParameter")], + responses={ + "200": Response30( + description="Success", + content={ + "application/json": MediaType30( + media_type_schema=Schema30(type="object") + ) + }, + ) + }, + ) + ) + } + + services = generate_services( + paths, library_config_dict[HTTPLibrary.httpx], components + ) + + sync_services = [s for s in services if not s.async_client] + assert len(sync_services) > 0 + assert "lang" in sync_services[0].operations[0].params + + +def test_generate_services_resolves_parameter_references_31(): + components = Components31( + parameters={ + "TypeParameter": Parameter31( + name="type", + param_in="query", + required=False, + param_schema=Schema31(type="string"), + ) + } + ) + paths = { + "/items": PathItem31( + get=Operation31( + operationId="listItems", + parameters=[Reference31(ref="#/components/parameters/TypeParameter")], + responses={ + "200": Response31( + description="Success", + content={ + "application/json": MediaType31( + media_type_schema=Schema31(type="array") + ) + }, + ) + }, + ) + ) + } + + services = generate_services( + paths, library_config_dict[HTTPLibrary.httpx], components + ) + + sync_services = [s for s in services if not s.async_client] + assert len(sync_services) > 0 + assert "type" in sync_services[0].operations[0].params + + +def test_generate_services_handles_mixed_parameters(): + components = Components30( + parameters={ + "LangParameter": Parameter30( + name="lang", + param_in="query", + required=True, + param_schema=Schema30(type="string"), + ) + } + ) + paths = { + "/test/{id}": PathItem30( + get=Operation30( + operationId="getTestById", + parameters=[ + Parameter30( + name="id", + param_in="path", + required=True, + param_schema=Schema30(type="integer"), + ), + Reference30(ref="#/components/parameters/LangParameter"), + ], + responses={ + "200": Response30( + description="Success", + content={ + "application/json": MediaType30( + media_type_schema=Schema30(type="object") + ) + }, + ) + }, + ) + ) + } + + services = generate_services( + paths, library_config_dict[HTTPLibrary.httpx], components + ) + + sync_services = [s for s in services if not s.async_client] + params = sync_services[0].operations[0].params + assert "id" in params + assert "lang" in params From 661b0cef9252708679f3bf6d81d9a50103ed507e Mon Sep 17 00:00:00 2001 From: Maxime Toussaint Date: Tue, 16 Dec 2025 11:11:39 -0500 Subject: [PATCH 2/5] Update service_generator.py --- .../language_converters/python/service_generator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/openapi_python_generator/language_converters/python/service_generator.py b/src/openapi_python_generator/language_converters/python/service_generator.py index b0e8720..34a93ae 100644 --- a/src/openapi_python_generator/language_converters/python/service_generator.py +++ b/src/openapi_python_generator/language_converters/python/service_generator.py @@ -56,7 +56,6 @@ TypeConversion, ) -# Type alias for Components Components = Union[Components30, Components31] # Module-level storage for component parameters (set by generate_services) From c68ef34a8592a7fd607cc15fec86820bba1593f3 Mon Sep 17 00:00:00 2001 From: Maxime Toussaint Date: Tue, 16 Dec 2025 11:14:14 -0500 Subject: [PATCH 3/5] Update service_generator.py --- .../python/service_generator.py | 133 ++++++++++++++---- 1 file changed, 102 insertions(+), 31 deletions(-) diff --git a/src/openapi_python_generator/language_converters/python/service_generator.py b/src/openapi_python_generator/language_converters/python/service_generator.py index 34a93ae..9820f52 100644 --- a/src/openapi_python_generator/language_converters/python/service_generator.py +++ b/src/openapi_python_generator/language_converters/python/service_generator.py @@ -124,7 +124,9 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if operation.requestBody is None: return None else: - if isinstance(operation.requestBody, Reference30) or isinstance(operation.requestBody, Reference31): + if isinstance(operation.requestBody, Reference30) or isinstance( + operation.requestBody, Reference31 + ): return "data.dict()" if operation.requestBody.content is None: @@ -138,7 +140,9 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if media_type is None: return None # pragma: no cover - if isinstance(media_type.media_type_schema, (Reference, Reference30, Reference31)): + if isinstance( + media_type.media_type_schema, (Reference, Reference30, Reference31) + ): return "data.dict()" elif hasattr(media_type.media_type_schema, "ref"): # Handle Reference objects from different OpenAPI versions @@ -150,7 +154,9 @@ def generate_body_param(operation: Operation) -> Union[str, None]: elif schema.type == "object": return "data" else: - raise Exception(f"Unsupported schema type for request body: {schema.type}") # pragma: no cover + raise Exception( + f"Unsupported schema type for request body: {schema.type}" + ) # pragma: no cover else: raise Exception( f"Unsupported schema type for request body: {type(media_type.media_type_schema)}" @@ -183,17 +189,26 @@ def _generate_params_from_content(content: Any): required = False param_name_cleaned = common.normalize_symbol(param.name) - if isinstance(param.param_schema, Schema30) or isinstance(param.param_schema, Schema31): + if isinstance(param.param_schema, Schema30) or isinstance( + param.param_schema, Schema31 + ): converted_result = ( f"{param_name_cleaned} : {type_converter(param.param_schema, param.required).converted_type}" + ("" if param.required else " = None") ) required = param.required - elif isinstance(param.param_schema, Reference30) or isinstance(param.param_schema, Reference31): - converted_result = f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}" + ( - "" - if isinstance(param, Reference30) or isinstance(param, Reference31) or param.required - else " = None" + elif isinstance(param.param_schema, Reference30) or isinstance( + param.param_schema, Reference31 + ): + converted_result = ( + f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}" + + ( + "" + if isinstance(param, Reference30) + or isinstance(param, Reference31) + or param.required + else " = None" + ) ) required = isinstance(param, Reference) or param.required @@ -209,11 +224,17 @@ def _generate_params_from_content(content: Any): "application/octet-stream", ] - if operation.requestBody is not None and not is_reference_type(operation.requestBody): + if operation.requestBody is not None and not is_reference_type( + operation.requestBody + ): # Safe access only if it's a concrete RequestBody object rb_content = getattr(operation.requestBody, "content", None) - if isinstance(rb_content, dict) and any(rb_content.get(i) is not None for i in operation_request_body_types): - get_keyword = [i for i in operation_request_body_types if rb_content.get(i)][0] + if isinstance(rb_content, dict) and any( + rb_content.get(i) is not None for i in operation_request_body_types + ): + get_keyword = [ + i for i in operation_request_body_types if rb_content.get(i) + ][0] content = rb_content.get(get_keyword) if content is not None and hasattr(content, "media_type_schema"): mts = getattr(content, "media_type_schema", None) @@ -223,7 +244,9 @@ def _generate_params_from_content(content: Any): ): params += f"{_generate_params_from_content(mts)}, " else: # pragma: no cover - raise Exception(f"Unsupported media type schema for {str(operation)}: {type(mts)}") + raise Exception( + f"Unsupported media type schema for {str(operation)}: {type(mts)}" + ) # else: silently ignore unsupported body shapes (could extend later) # Replace - with _ in params params = params.replace("-", "_") @@ -232,7 +255,9 @@ def _generate_params_from_content(content: Any): return params + default_params -def generate_operation_id(operation: Operation, http_op: str, path_name: Optional[str] = None) -> str: +def generate_operation_id( + operation: Operation, http_op: str, path_name: Optional[str] = None +) -> str: if operation.operationId is not None: return common.normalize_symbol(operation.operationId) elif path_name is not None: @@ -243,7 +268,9 @@ def generate_operation_id(operation: Operation, http_op: str, path_name: Optiona ) # pragma: no cover -def _generate_params(operation: Operation, param_in: Literal["query", "header"] = "query"): +def _generate_params( + operation: Operation, param_in: Literal["query", "header"] = "query" +): if operation.parameters is None: return [] @@ -290,7 +317,9 @@ def generate_return_type(operation: Operation) -> OpReturnType: media_type_schema = create_media_type_for_reference(chosen_response) if media_type_schema is None: - return OpReturnType(type=None, status_code=good_responses[0][0], complex_type=False) + return OpReturnType( + type=None, status_code=good_responses[0][0], complex_type=False + ) if is_media_type(media_type_schema): inner_schema = getattr(media_type_schema, "media_type_schema", None) @@ -307,18 +336,25 @@ def generate_return_type(operation: Operation) -> OpReturnType: ) elif is_schema_type(inner_schema): converted_result = type_converter(inner_schema, True) # type: ignore - if "array" in converted_result.original_type and isinstance(converted_result.import_types, list): + if "array" in converted_result.original_type and isinstance( + converted_result.import_types, list + ): matched = re.findall(r"List\[(.+)\]", converted_result.converted_type) if len(matched) > 0: list_type = matched[0] else: # pragma: no cover - raise Exception(f"Unable to parse list type from {converted_result.converted_type}") + raise Exception( + f"Unable to parse list type from {converted_result.converted_type}" + ) else: list_type = None return OpReturnType( type=converted_result, status_code=good_responses[0][0], - complex_type=bool(converted_result.import_types and len(converted_result.import_types) > 0), + complex_type=bool( + converted_result.import_types + and len(converted_result.import_types) > 0 + ), list_type=list_type, ) else: # pragma: no cover @@ -348,7 +384,11 @@ def generate_services( global _component_params # Build a lookup dict for component parameters if available - if components is not None and hasattr(components, "parameters") and components.parameters is not None: + if ( + components is not None + and hasattr(components, "parameters") + and components.parameters is not None + ): _component_params = {} for param_name, param_or_ref in components.parameters.items(): if isinstance(param_or_ref, (Parameter30, Parameter31)): @@ -358,7 +398,9 @@ def generate_services( jinja_env = create_jinja_env() - def generate_service_operation(op: Operation, path_name: str, async_type: bool) -> ServiceOperation: + def generate_service_operation( + op: Operation, path_name: str, async_type: bool + ) -> ServiceOperation: # Merge path-level parameters (always required by spec) into the # operation-level parameters so they get turned into function args. try: @@ -372,25 +414,36 @@ def generate_service_operation(op: Operation, path_name: str, async_type: bool) if isinstance(p, (Parameter30, Parameter31)): existing_names.add(p.name) for p in path_level_params: - if isinstance(p, (Parameter30, Parameter31)) and p.name not in existing_names: + if ( + isinstance(p, (Parameter30, Parameter31)) + and p.name not in existing_names + ): if op.parameters is None: op.parameters = [] # type: ignore op.parameters.append(p) # type: ignore except Exception: # pragma: no cover - print(f"Error merging path-level parameters for {path_name}") # pragma: no cover + print( + f"Error merging path-level parameters for {path_name}" + ) # pragma: no cover pass params = generate_params(op) # Fallback: ensure all {placeholders} in path are present as function params try: - placeholder_names = [m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name)] - existing_param_names = {p.split(":")[0].strip() for p in params.split(",") if ":" in p} + placeholder_names = [ + m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name) + ] + existing_param_names = { + p.split(":")[0].strip() for p in params.split(",") if ":" in p + } for ph in placeholder_names: norm_ph = common.normalize_symbol(ph) if norm_ph not in existing_param_names and norm_ph: params = f"{norm_ph}: Any, " + params except Exception: # pragma: no cover - print(f"Error ensuring path placeholders in params for {path_name}") # pragma: no cover + print( + f"Error ensuring path placeholders in params for {path_name}" + ) # pragma: no cover pass operation_id = generate_operation_id(op, http_operation, path_name) query_params = generate_query_params(op) @@ -414,7 +467,9 @@ def generate_service_operation(op: Operation, path_name: str, async_type: bool) use_orjson=common.get_use_orjson(), ) - so.content = jinja_env.get_template(library_config.template_name).render(**so.model_dump()) + so.content = jinja_env.get_template(library_config.template_name).render( + **so.model_dump() + ) if op.tags is not None and len(op.tags) > 0: so.tag = normalize_symbol(op.tags[0]) @@ -454,8 +509,16 @@ def generate_service_operation(op: Operation, path_name: str, async_type: bool) services.append( Service( file_name=f"{tag}_service", - operations=[so for so in service_ops if so.tag == tag and not so.async_client], - content="\n".join([so.content for so in service_ops if so.tag == tag and not so.async_client]), + operations=[ + so for so in service_ops if so.tag == tag and not so.async_client + ], + content="\n".join( + [ + so.content + for so in service_ops + if so.tag == tag and not so.async_client + ] + ), async_client=False, library_import=library_config.library_name, use_orjson=common.get_use_orjson(), @@ -466,8 +529,16 @@ def generate_service_operation(op: Operation, path_name: str, async_type: bool) services.append( Service( file_name=f"async_{tag}_service", - operations=[so for so in service_ops if so.tag == tag and so.async_client], - content="\n".join([so.content for so in service_ops if so.tag == tag and so.async_client]), + operations=[ + so for so in service_ops if so.tag == tag and so.async_client + ], + content="\n".join( + [ + so.content + for so in service_ops + if so.tag == tag and so.async_client + ] + ), async_client=True, library_import=library_config.library_name, use_orjson=common.get_use_orjson(), From bb7e19c148c5b17e658909c741fe53efe0edef10 Mon Sep 17 00:00:00 2001 From: Maxime Toussaint Date: Tue, 16 Dec 2025 11:24:29 -0500 Subject: [PATCH 4/5] Update service_generator.py --- .../python/service_generator.py | 299 ++++++++---------- 1 file changed, 125 insertions(+), 174 deletions(-) diff --git a/src/openapi_python_generator/language_converters/python/service_generator.py b/src/openapi_python_generator/language_converters/python/service_generator.py index 9820f52..fd17127 100644 --- a/src/openapi_python_generator/language_converters/python/service_generator.py +++ b/src/openapi_python_generator/language_converters/python/service_generator.py @@ -9,6 +9,9 @@ Response, Schema, ) +from openapi_pydantic.v3.v3_0 import ( + Components as Components30, +) from openapi_pydantic.v3.v3_0 import ( MediaType as MediaType30, ) @@ -24,6 +27,9 @@ Schema as Schema30, ) from openapi_pydantic.v3.v3_0.parameter import Parameter as Parameter30 +from openapi_pydantic.v3.v3_1 import ( + Components as Components31, +) from openapi_pydantic.v3.v3_1 import ( MediaType as MediaType31, ) @@ -37,8 +43,6 @@ Schema as Schema31, ) from openapi_pydantic.v3.v3_1.parameter import Parameter as Parameter31 -from openapi_pydantic.v3.v3_0 import Components as Components30 -from openapi_pydantic.v3.v3_1 import Components as Components31 from openapi_python_generator.language_converters.python import common from openapi_python_generator.language_converters.python.common import normalize_symbol @@ -58,7 +62,6 @@ Components = Union[Components30, Components31] -# Module-level storage for component parameters (set by generate_services) _component_params: Optional[Dict[str, Union[Parameter30, Parameter31]]] = None @@ -71,7 +74,6 @@ def _resolve_parameter_ref( if isinstance(param, (Reference, Reference30, Reference31)): if _component_params is None: return None - # Extract parameter name from $ref like "#/components/parameters/LangParameter" ref_str = getattr(param, "ref", None) if ref_str and ref_str.startswith("#/components/parameters/"): param_name = ref_str.split("/")[-1] @@ -124,9 +126,7 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if operation.requestBody is None: return None else: - if isinstance(operation.requestBody, Reference30) or isinstance( - operation.requestBody, Reference31 - ): + if isinstance(operation.requestBody, Reference30) or isinstance(operation.requestBody, Reference31): return "data.dict()" if operation.requestBody.content is None: @@ -140,9 +140,7 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if media_type is None: return None # pragma: no cover - if isinstance( - media_type.media_type_schema, (Reference, Reference30, Reference31) - ): + if isinstance(media_type.media_type_schema, (Reference, Reference30, Reference31)): return "data.dict()" elif hasattr(media_type.media_type_schema, "ref"): # Handle Reference objects from different OpenAPI versions @@ -154,9 +152,7 @@ def generate_body_param(operation: Operation) -> Union[str, None]: elif schema.type == "object": return "data" else: - raise Exception( - f"Unsupported schema type for request body: {schema.type}" - ) # pragma: no cover + raise Exception(f"Unsupported schema type for request body: {schema.type}") # pragma: no cover else: raise Exception( f"Unsupported schema type for request body: {type(media_type.media_type_schema)}" @@ -180,35 +176,25 @@ def _generate_params_from_content(content: Any): default_params = "" if operation.parameters is not None: for param in operation.parameters: - # Resolve parameter references to their actual definitions resolved_param = _resolve_parameter_ref(param) if resolved_param is None: - continue # Skip if we can't resolve the reference + continue param = resolved_param converted_result = "" required = False param_name_cleaned = common.normalize_symbol(param.name) - if isinstance(param.param_schema, Schema30) or isinstance( - param.param_schema, Schema31 - ): + if isinstance(param.param_schema, Schema30) or isinstance(param.param_schema, Schema31): converted_result = ( f"{param_name_cleaned} : {type_converter(param.param_schema, param.required).converted_type}" + ("" if param.required else " = None") ) required = param.required - elif isinstance(param.param_schema, Reference30) or isinstance( - param.param_schema, Reference31 - ): - converted_result = ( - f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}" - + ( - "" - if isinstance(param, Reference30) - or isinstance(param, Reference31) - or param.required - else " = None" - ) + elif isinstance(param.param_schema, Reference30) or isinstance(param.param_schema, Reference31): + converted_result = f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}" + ( + "" + if isinstance(param, Reference30) or isinstance(param, Reference31) or param.required + else " = None" ) required = isinstance(param, Reference) or param.required @@ -224,17 +210,11 @@ def _generate_params_from_content(content: Any): "application/octet-stream", ] - if operation.requestBody is not None and not is_reference_type( - operation.requestBody - ): + if operation.requestBody is not None and not is_reference_type(operation.requestBody): # Safe access only if it's a concrete RequestBody object rb_content = getattr(operation.requestBody, "content", None) - if isinstance(rb_content, dict) and any( - rb_content.get(i) is not None for i in operation_request_body_types - ): - get_keyword = [ - i for i in operation_request_body_types if rb_content.get(i) - ][0] + if isinstance(rb_content, dict) and any(rb_content.get(i) is not None for i in operation_request_body_types): + get_keyword = [i for i in operation_request_body_types if rb_content.get(i)][0] content = rb_content.get(get_keyword) if content is not None and hasattr(content, "media_type_schema"): mts = getattr(content, "media_type_schema", None) @@ -244,9 +224,7 @@ def _generate_params_from_content(content: Any): ): params += f"{_generate_params_from_content(mts)}, " else: # pragma: no cover - raise Exception( - f"Unsupported media type schema for {str(operation)}: {type(mts)}" - ) + raise Exception(f"Unsupported media type schema for {str(operation)}: {type(mts)}") # else: silently ignore unsupported body shapes (could extend later) # Replace - with _ in params params = params.replace("-", "_") @@ -255,9 +233,7 @@ def _generate_params_from_content(content: Any): return params + default_params -def generate_operation_id( - operation: Operation, http_op: str, path_name: Optional[str] = None -) -> str: +def generate_operation_id(operation: Operation, http_op: str, path_name: Optional[str] = None) -> str: if operation.operationId is not None: return common.normalize_symbol(operation.operationId) elif path_name is not None: @@ -268,18 +244,15 @@ def generate_operation_id( ) # pragma: no cover -def _generate_params( - operation: Operation, param_in: Literal["query", "header"] = "query" -): +def _generate_params(operation: Operation, param_in: Literal["query", "header"] = "query"): if operation.parameters is None: return [] params = [] for param in operation.parameters: - # Resolve parameter references to their actual definitions resolved_param = _resolve_parameter_ref(param) if resolved_param is None or resolved_param.param_in != param_in: - continue # Skip if we can't resolve the reference + continue param_name_cleaned = common.normalize_symbol(resolved_param.name) params.append(f"{resolved_param.name!r} : {param_name_cleaned}") @@ -317,9 +290,7 @@ def generate_return_type(operation: Operation) -> OpReturnType: media_type_schema = create_media_type_for_reference(chosen_response) if media_type_schema is None: - return OpReturnType( - type=None, status_code=good_responses[0][0], complex_type=False - ) + return OpReturnType(type=None, status_code=good_responses[0][0], complex_type=False) if is_media_type(media_type_schema): inner_schema = getattr(media_type_schema, "media_type_schema", None) @@ -336,25 +307,18 @@ def generate_return_type(operation: Operation) -> OpReturnType: ) elif is_schema_type(inner_schema): converted_result = type_converter(inner_schema, True) # type: ignore - if "array" in converted_result.original_type and isinstance( - converted_result.import_types, list - ): + if "array" in converted_result.original_type and isinstance(converted_result.import_types, list): matched = re.findall(r"List\[(.+)\]", converted_result.converted_type) if len(matched) > 0: list_type = matched[0] else: # pragma: no cover - raise Exception( - f"Unable to parse list type from {converted_result.converted_type}" - ) + raise Exception(f"Unable to parse list type from {converted_result.converted_type}") else: list_type = None return OpReturnType( type=converted_result, status_code=good_responses[0][0], - complex_type=bool( - converted_result.import_types - and len(converted_result.import_types) > 0 - ), + complex_type=bool(converted_result.import_types and len(converted_result.import_types) > 0), list_type=list_type, ) else: # pragma: no cover @@ -369,6 +333,81 @@ def generate_return_type(operation: Operation) -> OpReturnType: raise Exception("Unknown media type schema type") # pragma: no cover +def _generate_service_operation( + op: Operation, + path_name: str, + async_type: bool, + path: PathItem, + http_operation: str, + jinja_env: Any, + library_config: LibraryConfig, +) -> ServiceOperation: + try: + path_level_params = [] + if hasattr(path, "parameters") and path.parameters is not None: + path_level_params = [p for p in path.parameters if p is not None] + if path_level_params: + existing_names = set() + if op.parameters is not None: + for p in op.parameters: # type: ignore + if isinstance(p, (Parameter30, Parameter31)): + existing_names.add(p.name) + for p in path_level_params: + if isinstance(p, (Parameter30, Parameter31)) and p.name not in existing_names: + if op.parameters is None: + op.parameters = [] # type: ignore + op.parameters.append(p) # type: ignore + except Exception: # pragma: no cover + print(f"Error merging path-level parameters for {path_name}") # pragma: no cover + pass + + params = generate_params(op) + # Fallback: ensure all {placeholders} in path are present as function params + try: + placeholder_names = [m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name)] + existing_param_names = {p.split(":")[0].strip() for p in params.split(",") if ":" in p} + for ph in placeholder_names: + norm_ph = common.normalize_symbol(ph) + if norm_ph not in existing_param_names and norm_ph: + params = f"{norm_ph}: Any, " + params + except Exception: # pragma: no cover + print(f"Error ensuring path placeholders in params for {path_name}") # pragma: no cover + pass + operation_id = generate_operation_id(op, http_operation, path_name) + query_params = generate_query_params(op) + header_params = generate_header_params(op) + return_type = generate_return_type(op) + body_param = generate_body_param(op) + + so = ServiceOperation( + params=params, + operation_id=operation_id, + query_params=query_params, + header_params=header_params, + return_type=return_type, + operation=op, + pathItem=path, + content="", + async_client=async_type, + body_param=body_param, + path_name=path_name, + method=http_operation, + use_orjson=common.get_use_orjson(), + ) + + so.content = jinja_env.get_template(library_config.template_name).render(**so.model_dump()) + + if op.tags is not None and len(op.tags) > 0: + so.tag = normalize_symbol(op.tags[0]) + + try: + compile(so.content, "", "exec") + except SyntaxError as e: # pragma: no cover + click.echo(f"Error in service {so.operation_id}: {e}") # pragma: no cover + + return so + + def generate_services( paths: Dict[str, PathItem], library_config: LibraryConfig, @@ -383,12 +422,7 @@ def generate_services( """ global _component_params - # Build a lookup dict for component parameters if available - if ( - components is not None - and hasattr(components, "parameters") - and components.parameters is not None - ): + if components is not None and hasattr(components, "parameters") and components.parameters is not None: _component_params = {} for param_name, param_or_ref in components.parameters.items(): if isinstance(param_or_ref, (Parameter30, Parameter31)): @@ -398,89 +432,6 @@ def generate_services( jinja_env = create_jinja_env() - def generate_service_operation( - op: Operation, path_name: str, async_type: bool - ) -> ServiceOperation: - # Merge path-level parameters (always required by spec) into the - # operation-level parameters so they get turned into function args. - try: - path_level_params = [] - if hasattr(path, "parameters") and path.parameters is not None: # type: ignore - path_level_params = [p for p in path.parameters if p is not None] # type: ignore - if path_level_params: - existing_names = set() - if op.parameters is not None: - for p in op.parameters: # type: ignore - if isinstance(p, (Parameter30, Parameter31)): - existing_names.add(p.name) - for p in path_level_params: - if ( - isinstance(p, (Parameter30, Parameter31)) - and p.name not in existing_names - ): - if op.parameters is None: - op.parameters = [] # type: ignore - op.parameters.append(p) # type: ignore - except Exception: # pragma: no cover - print( - f"Error merging path-level parameters for {path_name}" - ) # pragma: no cover - pass - - params = generate_params(op) - # Fallback: ensure all {placeholders} in path are present as function params - try: - placeholder_names = [ - m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name) - ] - existing_param_names = { - p.split(":")[0].strip() for p in params.split(",") if ":" in p - } - for ph in placeholder_names: - norm_ph = common.normalize_symbol(ph) - if norm_ph not in existing_param_names and norm_ph: - params = f"{norm_ph}: Any, " + params - except Exception: # pragma: no cover - print( - f"Error ensuring path placeholders in params for {path_name}" - ) # pragma: no cover - pass - operation_id = generate_operation_id(op, http_operation, path_name) - query_params = generate_query_params(op) - header_params = generate_header_params(op) - return_type = generate_return_type(op) - body_param = generate_body_param(op) - - so = ServiceOperation( - params=params, - operation_id=operation_id, - query_params=query_params, - header_params=header_params, - return_type=return_type, - operation=op, - pathItem=path, - content="", - async_client=async_type, - body_param=body_param, - path_name=path_name, - method=http_operation, - use_orjson=common.get_use_orjson(), - ) - - so.content = jinja_env.get_template(library_config.template_name).render( - **so.model_dump() - ) - - if op.tags is not None and len(op.tags) > 0: - so.tag = normalize_symbol(op.tags[0]) - - try: - compile(so.content, "", "exec") - except SyntaxError as e: # pragma: no cover - click.echo(f"Error in service {so.operation_id}: {e}") # pragma: no cover - - return so - services = [] service_ops = [] for path_name, path in paths.items(): @@ -491,11 +442,27 @@ def generate_service_operation( continue if library_config.include_sync: - sync_so = generate_service_operation(op, clean_path_name, False) + sync_so = _generate_service_operation( + op, + clean_path_name, + False, + path, + http_operation, + jinja_env, + library_config, + ) service_ops.append(sync_so) if library_config.include_async: - async_so = generate_service_operation(op, clean_path_name, True) + async_so = _generate_service_operation( + op, + clean_path_name, + True, + path, + http_operation, + jinja_env, + library_config, + ) service_ops.append(async_so) # Ensure every operation has a tag; fallback to "default" for untagged operations @@ -509,16 +476,8 @@ def generate_service_operation( services.append( Service( file_name=f"{tag}_service", - operations=[ - so for so in service_ops if so.tag == tag and not so.async_client - ], - content="\n".join( - [ - so.content - for so in service_ops - if so.tag == tag and not so.async_client - ] - ), + operations=[so for so in service_ops if so.tag == tag and not so.async_client], + content="\n".join([so.content for so in service_ops if so.tag == tag and not so.async_client]), async_client=False, library_import=library_config.library_name, use_orjson=common.get_use_orjson(), @@ -529,16 +488,8 @@ def generate_service_operation( services.append( Service( file_name=f"async_{tag}_service", - operations=[ - so for so in service_ops if so.tag == tag and so.async_client - ], - content="\n".join( - [ - so.content - for so in service_ops - if so.tag == tag and so.async_client - ] - ), + operations=[so for so in service_ops if so.tag == tag and so.async_client], + content="\n".join([so.content for so in service_ops if so.tag == tag and so.async_client]), async_client=True, library_import=library_config.library_name, use_orjson=common.get_use_orjson(), From 282d47d5a4ffaa37ed4a6fe8fcab8514e048c0ef Mon Sep 17 00:00:00 2001 From: Maxime Toussaint Date: Tue, 16 Dec 2025 11:28:48 -0500 Subject: [PATCH 5/5] Update service_generator.py --- .../python/service_generator.py | 129 ++++++++++++++---- 1 file changed, 99 insertions(+), 30 deletions(-) diff --git a/src/openapi_python_generator/language_converters/python/service_generator.py b/src/openapi_python_generator/language_converters/python/service_generator.py index fd17127..d0f67d6 100644 --- a/src/openapi_python_generator/language_converters/python/service_generator.py +++ b/src/openapi_python_generator/language_converters/python/service_generator.py @@ -126,7 +126,9 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if operation.requestBody is None: return None else: - if isinstance(operation.requestBody, Reference30) or isinstance(operation.requestBody, Reference31): + if isinstance(operation.requestBody, Reference30) or isinstance( + operation.requestBody, Reference31 + ): return "data.dict()" if operation.requestBody.content is None: @@ -140,7 +142,9 @@ def generate_body_param(operation: Operation) -> Union[str, None]: if media_type is None: return None # pragma: no cover - if isinstance(media_type.media_type_schema, (Reference, Reference30, Reference31)): + if isinstance( + media_type.media_type_schema, (Reference, Reference30, Reference31) + ): return "data.dict()" elif hasattr(media_type.media_type_schema, "ref"): # Handle Reference objects from different OpenAPI versions @@ -152,7 +156,9 @@ def generate_body_param(operation: Operation) -> Union[str, None]: elif schema.type == "object": return "data" else: - raise Exception(f"Unsupported schema type for request body: {schema.type}") # pragma: no cover + raise Exception( + f"Unsupported schema type for request body: {schema.type}" + ) # pragma: no cover else: raise Exception( f"Unsupported schema type for request body: {type(media_type.media_type_schema)}" @@ -184,17 +190,26 @@ def _generate_params_from_content(content: Any): required = False param_name_cleaned = common.normalize_symbol(param.name) - if isinstance(param.param_schema, Schema30) or isinstance(param.param_schema, Schema31): + if isinstance(param.param_schema, Schema30) or isinstance( + param.param_schema, Schema31 + ): converted_result = ( f"{param_name_cleaned} : {type_converter(param.param_schema, param.required).converted_type}" + ("" if param.required else " = None") ) required = param.required - elif isinstance(param.param_schema, Reference30) or isinstance(param.param_schema, Reference31): - converted_result = f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}" + ( - "" - if isinstance(param, Reference30) or isinstance(param, Reference31) or param.required - else " = None" + elif isinstance(param.param_schema, Reference30) or isinstance( + param.param_schema, Reference31 + ): + converted_result = ( + f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}" + + ( + "" + if isinstance(param, Reference30) + or isinstance(param, Reference31) + or param.required + else " = None" + ) ) required = isinstance(param, Reference) or param.required @@ -210,11 +225,17 @@ def _generate_params_from_content(content: Any): "application/octet-stream", ] - if operation.requestBody is not None and not is_reference_type(operation.requestBody): + if operation.requestBody is not None and not is_reference_type( + operation.requestBody + ): # Safe access only if it's a concrete RequestBody object rb_content = getattr(operation.requestBody, "content", None) - if isinstance(rb_content, dict) and any(rb_content.get(i) is not None for i in operation_request_body_types): - get_keyword = [i for i in operation_request_body_types if rb_content.get(i)][0] + if isinstance(rb_content, dict) and any( + rb_content.get(i) is not None for i in operation_request_body_types + ): + get_keyword = [ + i for i in operation_request_body_types if rb_content.get(i) + ][0] content = rb_content.get(get_keyword) if content is not None and hasattr(content, "media_type_schema"): mts = getattr(content, "media_type_schema", None) @@ -224,7 +245,9 @@ def _generate_params_from_content(content: Any): ): params += f"{_generate_params_from_content(mts)}, " else: # pragma: no cover - raise Exception(f"Unsupported media type schema for {str(operation)}: {type(mts)}") + raise Exception( + f"Unsupported media type schema for {str(operation)}: {type(mts)}" + ) # else: silently ignore unsupported body shapes (could extend later) # Replace - with _ in params params = params.replace("-", "_") @@ -233,7 +256,9 @@ def _generate_params_from_content(content: Any): return params + default_params -def generate_operation_id(operation: Operation, http_op: str, path_name: Optional[str] = None) -> str: +def generate_operation_id( + operation: Operation, http_op: str, path_name: Optional[str] = None +) -> str: if operation.operationId is not None: return common.normalize_symbol(operation.operationId) elif path_name is not None: @@ -244,7 +269,9 @@ def generate_operation_id(operation: Operation, http_op: str, path_name: Optiona ) # pragma: no cover -def _generate_params(operation: Operation, param_in: Literal["query", "header"] = "query"): +def _generate_params( + operation: Operation, param_in: Literal["query", "header"] = "query" +): if operation.parameters is None: return [] @@ -290,7 +317,9 @@ def generate_return_type(operation: Operation) -> OpReturnType: media_type_schema = create_media_type_for_reference(chosen_response) if media_type_schema is None: - return OpReturnType(type=None, status_code=good_responses[0][0], complex_type=False) + return OpReturnType( + type=None, status_code=good_responses[0][0], complex_type=False + ) if is_media_type(media_type_schema): inner_schema = getattr(media_type_schema, "media_type_schema", None) @@ -307,18 +336,25 @@ def generate_return_type(operation: Operation) -> OpReturnType: ) elif is_schema_type(inner_schema): converted_result = type_converter(inner_schema, True) # type: ignore - if "array" in converted_result.original_type and isinstance(converted_result.import_types, list): + if "array" in converted_result.original_type and isinstance( + converted_result.import_types, list + ): matched = re.findall(r"List\[(.+)\]", converted_result.converted_type) if len(matched) > 0: list_type = matched[0] else: # pragma: no cover - raise Exception(f"Unable to parse list type from {converted_result.converted_type}") + raise Exception( + f"Unable to parse list type from {converted_result.converted_type}" + ) else: list_type = None return OpReturnType( type=converted_result, status_code=good_responses[0][0], - complex_type=bool(converted_result.import_types and len(converted_result.import_types) > 0), + complex_type=bool( + converted_result.import_types + and len(converted_result.import_types) > 0 + ), list_type=list_type, ) else: # pragma: no cover @@ -353,25 +389,36 @@ def _generate_service_operation( if isinstance(p, (Parameter30, Parameter31)): existing_names.add(p.name) for p in path_level_params: - if isinstance(p, (Parameter30, Parameter31)) and p.name not in existing_names: + if ( + isinstance(p, (Parameter30, Parameter31)) + and p.name not in existing_names + ): if op.parameters is None: op.parameters = [] # type: ignore op.parameters.append(p) # type: ignore except Exception: # pragma: no cover - print(f"Error merging path-level parameters for {path_name}") # pragma: no cover + print( + f"Error merging path-level parameters for {path_name}" + ) # pragma: no cover pass params = generate_params(op) # Fallback: ensure all {placeholders} in path are present as function params try: - placeholder_names = [m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name)] - existing_param_names = {p.split(":")[0].strip() for p in params.split(",") if ":" in p} + placeholder_names = [ + m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name) + ] + existing_param_names = { + p.split(":")[0].strip() for p in params.split(",") if ":" in p + } for ph in placeholder_names: norm_ph = common.normalize_symbol(ph) if norm_ph not in existing_param_names and norm_ph: params = f"{norm_ph}: Any, " + params except Exception: # pragma: no cover - print(f"Error ensuring path placeholders in params for {path_name}") # pragma: no cover + print( + f"Error ensuring path placeholders in params for {path_name}" + ) # pragma: no cover pass operation_id = generate_operation_id(op, http_operation, path_name) query_params = generate_query_params(op) @@ -395,7 +442,9 @@ def _generate_service_operation( use_orjson=common.get_use_orjson(), ) - so.content = jinja_env.get_template(library_config.template_name).render(**so.model_dump()) + so.content = jinja_env.get_template(library_config.template_name).render( + **so.model_dump() + ) if op.tags is not None and len(op.tags) > 0: so.tag = normalize_symbol(op.tags[0]) @@ -422,7 +471,11 @@ def generate_services( """ global _component_params - if components is not None and hasattr(components, "parameters") and components.parameters is not None: + if ( + components is not None + and hasattr(components, "parameters") + and components.parameters is not None + ): _component_params = {} for param_name, param_or_ref in components.parameters.items(): if isinstance(param_or_ref, (Parameter30, Parameter31)): @@ -476,8 +529,16 @@ def generate_services( services.append( Service( file_name=f"{tag}_service", - operations=[so for so in service_ops if so.tag == tag and not so.async_client], - content="\n".join([so.content for so in service_ops if so.tag == tag and not so.async_client]), + operations=[ + so for so in service_ops if so.tag == tag and not so.async_client + ], + content="\n".join( + [ + so.content + for so in service_ops + if so.tag == tag and not so.async_client + ] + ), async_client=False, library_import=library_config.library_name, use_orjson=common.get_use_orjson(), @@ -488,8 +549,16 @@ def generate_services( services.append( Service( file_name=f"async_{tag}_service", - operations=[so for so in service_ops if so.tag == tag and so.async_client], - content="\n".join([so.content for so in service_ops if so.tag == tag and so.async_client]), + operations=[ + so for so in service_ops if so.tag == tag and so.async_client + ], + content="\n".join( + [ + so.content + for so in service_ops + if so.tag == tag and so.async_client + ] + ), async_client=True, library_import=library_config.library_name, use_orjson=common.get_use_orjson(),