Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def generator(
models = []

if data.paths is not None:
services = generate_services(data.paths, library_config)
services = generate_services(data.paths, library_config, data.components)
else:
services = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
Response,
Schema,
)
from openapi_pydantic.v3.v3_0 import (
Components as Components30,
)
from openapi_pydantic.v3.v3_0 import (
MediaType as MediaType30,
)
Expand All @@ -24,6 +27,9 @@
Schema as Schema30,
)
from openapi_pydantic.v3.v3_0.parameter import Parameter as Parameter30
from openapi_pydantic.v3.v3_1 import (
Components as Components31,
)
from openapi_pydantic.v3.v3_1 import (
MediaType as MediaType31,
)
Expand Down Expand Up @@ -54,6 +60,29 @@
TypeConversion,
)

Components = Union[Components30, Components31]

_component_params: Optional[Dict[str, Union[Parameter30, Parameter31]]] = None


def _resolve_parameter_ref(
param: Union[Parameter30, Parameter31, Reference30, Reference31],
) -> Optional[Union[Parameter30, Parameter31]]:
if isinstance(param, (Parameter30, Parameter31)):
return param

if isinstance(param, (Reference, Reference30, Reference31)):
if _component_params is None:
return None
ref_str = getattr(param, "ref", None)
if ref_str and ref_str.startswith("#/components/parameters/"):
param_name = ref_str.split("/")[-1]
resolved = _component_params.get(param_name)
if resolved is not None:
return resolved

return None


# Helper functions for isinstance checks across OpenAPI versions
def is_response_type(obj) -> bool:
Expand Down Expand Up @@ -153,8 +182,10 @@ def _generate_params_from_content(content: Any):
default_params = ""
if operation.parameters is not None:
for param in operation.parameters:
if not isinstance(param, (Parameter30, Parameter31)):
continue # pragma: no cover
resolved_param = _resolve_parameter_ref(param)
if resolved_param is None:
continue
param = resolved_param
converted_result = ""
required = False
param_name_cleaned = common.normalize_symbol(param.name)
Expand All @@ -171,7 +202,7 @@ def _generate_params_from_content(content: Any):
param.param_schema, Reference31
):
converted_result = (
f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1] }"
f"{param_name_cleaned} : {param.param_schema.ref.split('/')[-1]}"
+ (
""
if isinstance(param, Reference30)
Expand Down Expand Up @@ -246,9 +277,11 @@ def _generate_params(

params = []
for param in operation.parameters:
if isinstance(param, (Parameter30, Parameter31)) and param.param_in == param_in:
param_name_cleaned = common.normalize_symbol(param.name)
params.append(f"{param.name!r} : {param_name_cleaned}")
resolved_param = _resolve_parameter_ref(param)
if resolved_param is None or resolved_param.param_in != param_in:
continue
param_name_cleaned = common.normalize_symbol(resolved_param.name)
params.append(f"{resolved_param.name!r} : {param_name_cleaned}")

return params

Expand Down Expand Up @@ -336,98 +369,121 @@ def generate_return_type(operation: Operation) -> OpReturnType:
raise Exception("Unknown media type schema type") # pragma: no cover


def _generate_service_operation(
op: Operation,
path_name: str,
async_type: bool,
path: PathItem,
http_operation: str,
jinja_env: Any,
library_config: LibraryConfig,
) -> ServiceOperation:
try:
path_level_params = []
if hasattr(path, "parameters") and path.parameters is not None:
path_level_params = [p for p in path.parameters if p is not None]
if path_level_params:
existing_names = set()
if op.parameters is not None:
for p in op.parameters: # type: ignore
if isinstance(p, (Parameter30, Parameter31)):
existing_names.add(p.name)
for p in path_level_params:
if (
isinstance(p, (Parameter30, Parameter31))
and p.name not in existing_names
):
if op.parameters is None:
op.parameters = [] # type: ignore
op.parameters.append(p) # type: ignore
except Exception: # pragma: no cover
print(
f"Error merging path-level parameters for {path_name}"
) # pragma: no cover
pass

params = generate_params(op)
# Fallback: ensure all {placeholders} in path are present as function params
try:
placeholder_names = [
m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name)
]
existing_param_names = {
p.split(":")[0].strip() for p in params.split(",") if ":" in p
}
for ph in placeholder_names:
norm_ph = common.normalize_symbol(ph)
if norm_ph not in existing_param_names and norm_ph:
params = f"{norm_ph}: Any, " + params
except Exception: # pragma: no cover
print(
f"Error ensuring path placeholders in params for {path_name}"
) # pragma: no cover
pass
operation_id = generate_operation_id(op, http_operation, path_name)
query_params = generate_query_params(op)
header_params = generate_header_params(op)
return_type = generate_return_type(op)
body_param = generate_body_param(op)

so = ServiceOperation(
params=params,
operation_id=operation_id,
query_params=query_params,
header_params=header_params,
return_type=return_type,
operation=op,
pathItem=path,
content="",
async_client=async_type,
body_param=body_param,
path_name=path_name,
method=http_operation,
use_orjson=common.get_use_orjson(),
)

so.content = jinja_env.get_template(library_config.template_name).render(
**so.model_dump()
)

if op.tags is not None and len(op.tags) > 0:
so.tag = normalize_symbol(op.tags[0])

try:
compile(so.content, "<string>", "exec")
except SyntaxError as e: # pragma: no cover
click.echo(f"Error in service {so.operation_id}: {e}") # pragma: no cover

return so


def generate_services(
paths: Dict[str, PathItem], library_config: LibraryConfig
paths: Dict[str, PathItem],
library_config: LibraryConfig,
components: Optional[Components] = None,
) -> List[Service]:
"""
Generates services from a paths object.
:param paths: paths object to be converted
:param library_config: Library configuration
:param components: Optional OpenAPI components for resolving parameter references
:return: List of services
"""
jinja_env = create_jinja_env()

def generate_service_operation(
op: Operation, path_name: str, async_type: bool
) -> ServiceOperation:
# Merge path-level parameters (always required by spec) into the
# operation-level parameters so they get turned into function args.
try:
path_level_params = []
if hasattr(path, "parameters") and path.parameters is not None: # type: ignore
path_level_params = [p for p in path.parameters if p is not None] # type: ignore
if path_level_params:
existing_names = set()
if op.parameters is not None:
for p in op.parameters: # type: ignore
if isinstance(p, (Parameter30, Parameter31)):
existing_names.add(p.name)
for p in path_level_params:
if (
isinstance(p, (Parameter30, Parameter31))
and p.name not in existing_names
):
if op.parameters is None:
op.parameters = [] # type: ignore
op.parameters.append(p) # type: ignore
except Exception: # pragma: no cover
print(
f"Error merging path-level parameters for {path_name}"
) # pragma: no cover
pass

params = generate_params(op)
# Fallback: ensure all {placeholders} in path are present as function params
try:
placeholder_names = [
m.group(1) for m in re.finditer(r"\{([^}/]+)\}", path_name)
]
existing_param_names = {
p.split(":")[0].strip() for p in params.split(",") if ":" in p
}
for ph in placeholder_names:
norm_ph = common.normalize_symbol(ph)
if norm_ph not in existing_param_names and norm_ph:
params = f"{norm_ph}: Any, " + params
except Exception: # pragma: no cover
print(
f"Error ensuring path placeholders in params for {path_name}"
) # pragma: no cover
pass
operation_id = generate_operation_id(op, http_operation, path_name)
query_params = generate_query_params(op)
header_params = generate_header_params(op)
return_type = generate_return_type(op)
body_param = generate_body_param(op)

so = ServiceOperation(
params=params,
operation_id=operation_id,
query_params=query_params,
header_params=header_params,
return_type=return_type,
operation=op,
pathItem=path,
content="",
async_client=async_type,
body_param=body_param,
path_name=path_name,
method=http_operation,
use_orjson=common.get_use_orjson(),
)
global _component_params

so.content = jinja_env.get_template(library_config.template_name).render(
**so.model_dump()
)

if op.tags is not None and len(op.tags) > 0:
so.tag = normalize_symbol(op.tags[0])

try:
compile(so.content, "<string>", "exec")
except SyntaxError as e: # pragma: no cover
click.echo(f"Error in service {so.operation_id}: {e}") # pragma: no cover
if (
components is not None
and hasattr(components, "parameters")
and components.parameters is not None
):
_component_params = {}
for param_name, param_or_ref in components.parameters.items():
if isinstance(param_or_ref, (Parameter30, Parameter31)):
_component_params[param_name] = param_or_ref
else:
_component_params = None

return so
jinja_env = create_jinja_env()

services = []
service_ops = []
Expand All @@ -439,11 +495,27 @@ def generate_service_operation(
continue

if library_config.include_sync:
sync_so = generate_service_operation(op, clean_path_name, False)
sync_so = _generate_service_operation(
op,
clean_path_name,
False,
path,
http_operation,
jinja_env,
library_config,
)
service_ops.append(sync_so)

if library_config.include_async:
async_so = generate_service_operation(op, clean_path_name, True)
async_so = _generate_service_operation(
op,
clean_path_name,
True,
path,
http_operation,
jinja_env,
library_config,
)
service_ops.append(async_so)

# Ensure every operation has a tag; fallback to "default" for untagged operations
Expand Down
Loading