From 93fe8f4d5621cfd095fd049afc24fca4fca114e2 Mon Sep 17 00:00:00 2001 From: Mridang Agarwalla Date: Wed, 23 Apr 2025 13:19:57 +0300 Subject: [PATCH 1/7] Improved the spec tests by adding some tests for the session service --- ...o_sanity_check_the_session_service_spec.py | 137 ++++++++++++++++++ zitadel_client/api/session_service_api.py | 10 +- 2 files changed, 142 insertions(+), 5 deletions(-) create mode 100644 spec/sdk_test_to_sanity_check_the_session_service_spec.py diff --git a/spec/sdk_test_to_sanity_check_the_session_service_spec.py b/spec/sdk_test_to_sanity_check_the_session_service_spec.py new file mode 100644 index 00000000..c4756aa9 --- /dev/null +++ b/spec/sdk_test_to_sanity_check_the_session_service_spec.py @@ -0,0 +1,137 @@ +""" +SessionService Integration Tests + +This suite verifies the Zitadel SessionService API's basic operations using a +personal access token: + + 1. Create a session with specified checks and lifetime + 2. Retrieve the session by ID + 3. List sessions and ensure the created session appears + 4. Update the session's lifetime and confirm a new token is returned + 5. Error when retrieving a non-existent session + +Each test runs in isolation: a new session is created in the `session` fixture and +deleted after the test to ensure a clean state. +""" + +import os +import uuid +from typing import Generator + +import pytest + +import zitadel_client as zitadel +from zitadel_client.exceptions import ApiException +from zitadel_client.models import ( + SessionServiceChecks, + SessionServiceCheckUser, + SessionServiceCreateSessionRequest, + SessionServiceCreateSessionResponse, + SessionServiceDeleteSessionBody, + SessionServiceGetSessionResponse, + SessionServiceListSessionsRequest, + SessionServiceListSessionsResponse, + SessionServiceSetSessionRequest, + SessionServiceSetSessionResponse, +) + + +def pytest_configure() -> None: + """Skip tests if required environment variables are missing.""" + if not os.getenv("AUTH_TOKEN") or not os.getenv("BASE_URL"): + pytest.skip( + "Environment variables AUTH_TOKEN and BASE_URL must be set", + allow_module_level=True, + ) + + +@pytest.fixture(scope="module") +def client() -> zitadel.Zitadel: + """Provides a Zitadel client configured with a personal access token.""" + token: str = os.getenv("AUTH_TOKEN") # type: ignore + base_url: str = os.getenv("BASE_URL") # type: ignore + return zitadel.Zitadel.with_access_token(base_url, token) + + +@pytest.fixture +def session(client: zitadel.Zitadel) -> Generator[SessionServiceCreateSessionResponse, None, None]: + """Creates a fresh session for each test and cleans up afterward.""" + request = SessionServiceCreateSessionRequest( + checks=SessionServiceChecks( + user=SessionServiceCheckUser(loginName="johndoe") + ), + lifetime="18000s", + ) + response = client.sessions.session_service_create_session(request) + yield response + # Teardown + delete_body = SessionServiceDeleteSessionBody() + try: + client.sessions.session_service_delete_session( + response.session_id if response.session_id is not None else "", + delete_body, + ) + except ApiException: + pass + + +class TestSessionServiceSanityCheckSpec: + """ + Integration tests for the SessionService endpoints. + + Verifies create, get, list, set, and error behaviors for sessions. + """ + + def test_retrieves_session_details_by_id( + self, + client: zitadel.Zitadel, + session: SessionServiceCreateSessionResponse, + ) -> None: + """Retrieves the session details by ID.""" + response: SessionServiceGetSessionResponse = ( + client.sessions.session_service_get_session( + session.session_id if session.session_id is not None else "" + ) + ) + assert response.session is not None + assert response.session.id == session.session_id + + def test_includes_created_session_when_listing( + self, + client: zitadel.Zitadel, + session: SessionServiceCreateSessionResponse, + ) -> None: + """Includes the created session when listing all sessions.""" + request = SessionServiceListSessionsRequest(queries=[]) + response: SessionServiceListSessionsResponse = ( + client.sessions.session_service_list_sessions(request) + ) + assert response.sessions is not None + assert session.session_id in [session.id for session in response.sessions] + + def test_updates_session_lifetime_and_returns_new_token( + self, + client: zitadel.Zitadel, + session: SessionServiceCreateSessionResponse, + ) -> None: + """Updates the session lifetime and returns a new token.""" + request = SessionServiceSetSessionRequest(lifetime="36000s") + response: SessionServiceSetSessionResponse = ( + client.sessions.session_service_set_session( + session.session_id if session.session_id is not None else "", + request, + ) + ) + assert isinstance(response.session_token, str) + + def test_raises_api_exception_for_nonexistent_session( + self, + client: zitadel.Zitadel, + session: SessionServiceCreateSessionResponse, + ) -> None: + """Raises an ApiException when retrieving a non-existent session.""" + with pytest.raises(ApiException): + client.sessions.session_service_get_session( + str(uuid.uuid4()), + session_token=session.session_token, + ) diff --git a/zitadel_client/api/session_service_api.py b/zitadel_client/api/session_service_api.py index 8efe7aa4..b55a49c3 100644 --- a/zitadel_client/api/session_service_api.py +++ b/zitadel_client/api/session_service_api.py @@ -101,7 +101,7 @@ def session_service_create_session( ) _response_types_map: Dict[str, Optional[str]] = { - '200': "SessionServiceCreateSessionResponse", + '201': "SessionServiceCreateSessionResponse", '403': "SessionServiceRpcStatus", '404': "SessionServiceRpcStatus", } @@ -170,7 +170,7 @@ def session_service_create_session_with_http_info( ) _response_types_map: Dict[str, Optional[str]] = { - '200': "SessionServiceCreateSessionResponse", + '201': "SessionServiceCreateSessionResponse", '403': "SessionServiceRpcStatus", '404': "SessionServiceRpcStatus", } @@ -239,7 +239,7 @@ def session_service_create_session_without_preload_content( ) _response_types_map: Dict[str, Optional[str]] = { - '200': "SessionServiceCreateSessionResponse", + '201': "SessionServiceCreateSessionResponse", '403': "SessionServiceRpcStatus", '404': "SessionServiceRpcStatus", } @@ -866,9 +866,9 @@ def _session_service_get_session_serialize( _path_params['sessionId'] = session_id # process the query parameters if session_token is not None: - + _query_params.append(('sessionToken', session_token)) - + # process the header parameters # process the form parameters # process the body parameter From 3d20b7f9f28a381dacba9b2f467f4a8ea1bdf7de Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 23 Apr 2025 10:20:53 +0000 Subject: [PATCH 2/7] style: Apply automated code formatting [skip ci] --- ...o_sanity_check_the_session_service_spec.py | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/spec/sdk_test_to_sanity_check_the_session_service_spec.py b/spec/sdk_test_to_sanity_check_the_session_service_spec.py index c4756aa9..1efd4220 100644 --- a/spec/sdk_test_to_sanity_check_the_session_service_spec.py +++ b/spec/sdk_test_to_sanity_check_the_session_service_spec.py @@ -57,9 +57,7 @@ def client() -> zitadel.Zitadel: def session(client: zitadel.Zitadel) -> Generator[SessionServiceCreateSessionResponse, None, None]: """Creates a fresh session for each test and cleans up afterward.""" request = SessionServiceCreateSessionRequest( - checks=SessionServiceChecks( - user=SessionServiceCheckUser(loginName="johndoe") - ), + checks=SessionServiceChecks(user=SessionServiceCheckUser(loginName="johndoe")), lifetime="18000s", ) response = client.sessions.session_service_create_session(request) @@ -88,10 +86,8 @@ def test_retrieves_session_details_by_id( session: SessionServiceCreateSessionResponse, ) -> None: """Retrieves the session details by ID.""" - response: SessionServiceGetSessionResponse = ( - client.sessions.session_service_get_session( - session.session_id if session.session_id is not None else "" - ) + response: SessionServiceGetSessionResponse = client.sessions.session_service_get_session( + session.session_id if session.session_id is not None else "" ) assert response.session is not None assert response.session.id == session.session_id @@ -103,9 +99,7 @@ def test_includes_created_session_when_listing( ) -> None: """Includes the created session when listing all sessions.""" request = SessionServiceListSessionsRequest(queries=[]) - response: SessionServiceListSessionsResponse = ( - client.sessions.session_service_list_sessions(request) - ) + response: SessionServiceListSessionsResponse = client.sessions.session_service_list_sessions(request) assert response.sessions is not None assert session.session_id in [session.id for session in response.sessions] @@ -116,11 +110,9 @@ def test_updates_session_lifetime_and_returns_new_token( ) -> None: """Updates the session lifetime and returns a new token.""" request = SessionServiceSetSessionRequest(lifetime="36000s") - response: SessionServiceSetSessionResponse = ( - client.sessions.session_service_set_session( - session.session_id if session.session_id is not None else "", - request, - ) + response: SessionServiceSetSessionResponse = client.sessions.session_service_set_session( + session.session_id if session.session_id is not None else "", + request, ) assert isinstance(response.session_token, str) From 2b37d4562a90d09ddb4dcc2b8cf0939cd3a8645a Mon Sep 17 00:00:00 2001 From: Mridang Agarwalla Date: Wed, 23 Apr 2025 13:35:43 +0300 Subject: [PATCH 3/7] Test commit to trigger the workflow runs From c0d1b308a8548ca0348d7e3738232f1410d54f1f Mon Sep 17 00:00:00 2001 From: Mridang Agarwalla Date: Thu, 24 Apr 2025 10:08:54 +0300 Subject: [PATCH 4/7] Added more tests to better verify the auth --- spec/auth/using_access_token_spec.py | 55 ++++++++ spec/auth/using_client_credentials_spec.py | 66 ++++++++++ spec/auth/using_private_key_spec.py | 56 ++++++++ ..._spec.py => check_session_service_spec.py} | 29 +++-- spec/check_user_service_spec.py | 122 ++++++++++++++++++ ..._client_credentials_authentication_spec.py | 66 ---------- ...rsonal_access_token_authentication_spec.py | 65 ---------- ...est_using_web_token_authentication_spec.py | 53 -------- zitadel_client/auth/oauth_authenticator.py | 3 +- 9 files changed, 319 insertions(+), 196 deletions(-) create mode 100644 spec/auth/using_access_token_spec.py create mode 100644 spec/auth/using_client_credentials_spec.py create mode 100644 spec/auth/using_private_key_spec.py rename spec/{sdk_test_to_sanity_check_the_session_service_spec.py => check_session_service_spec.py} (85%) create mode 100644 spec/check_user_service_spec.py delete mode 100644 spec/sdk_test_using_client_credentials_authentication_spec.py delete mode 100644 spec/sdk_test_using_personal_access_token_authentication_spec.py delete mode 100644 spec/sdk_test_using_web_token_authentication_spec.py diff --git a/spec/auth/using_access_token_spec.py b/spec/auth/using_access_token_spec.py new file mode 100644 index 00000000..83486c63 --- /dev/null +++ b/spec/auth/using_access_token_spec.py @@ -0,0 +1,55 @@ +""" +SettingsService Integration Tests (Personal Access Token) + +This suite verifies the Zitadel SettingsService API's general settings +endpoint works when authenticating via a Personal Access Token: + + 1. Retrieve general settings successfully with a valid token + 2. Expect an ApiException when using an invalid token + +Each test instantiates a new client to ensure a clean, stateless call. +""" + +import os + +import pytest + +import zitadel_client as zitadel +from zitadel_client.exceptions import OpenApiError + + +@pytest.fixture(scope="module") +def base_url() -> str: + """Provides the base URL for tests, skipping if unset.""" + url = os.getenv("BASE_URL") + if not url: + pytest.skip("Environment variable BASE_URL must be set", allow_module_level=True) + return url + + +@pytest.fixture(scope="module") +def auth_token() -> str: + """Provides the auth token for tests, skipping if unset.""" + url = os.getenv("AUTH_TOKEN") + if not url: + pytest.skip("Environment variable AUTH_TOKEN must be set", allow_module_level=True) + return url + + +def test_retrieves_general_settings_with_valid_token(base_url: str, auth_token: str) -> None: + """Retrieves general settings successfully with a valid access token.""" + client = zitadel.Zitadel.with_access_token( + base_url, + auth_token, + ) + client.settings.settings_service_get_general_settings() + + +def test_raises_api_exception_with_invalid_token(base_url: str) -> None: + """Raises ApiException when using an invalid access token.""" + client = zitadel.Zitadel.with_access_token( + base_url, + "invalid", + ) + with pytest.raises(OpenApiError): + client.settings.settings_service_get_general_settings() diff --git a/spec/auth/using_client_credentials_spec.py b/spec/auth/using_client_credentials_spec.py new file mode 100644 index 00000000..ce9bbdb9 --- /dev/null +++ b/spec/auth/using_client_credentials_spec.py @@ -0,0 +1,66 @@ +""" +SettingsService Integration Tests (Client Credentials) + +This suite verifies the Zitadel SettingsService API's general settings +endpoint works when authenticating via Client Credentials: + + 1. Retrieve general settings successfully with valid credentials + 2. Expect an ApiException when using invalid credentials + +Each test instantiates a new client to ensure a clean, stateless call. +""" + +import os + +import pytest + +import zitadel_client as zitadel +from zitadel_client.exceptions import OpenApiError + + +@pytest.fixture(scope="module") +def base_url() -> str: + """Provides the base URL for tests, skipping if unset.""" + url = os.getenv("BASE_URL") + if not url: + pytest.skip("Environment variable BASE_URL must be set", allow_module_level=True) + return url + + +@pytest.fixture(scope="module") +def client_id() -> str: + """Provides the client ID for tests, skipping if unset.""" + cid = os.getenv("CLIENT_ID") + if not cid: + pytest.skip("Environment variable CLIENT_ID must be set", allow_module_level=True) + return cid + + +@pytest.fixture(scope="module") +def client_secret() -> str: + """Provides the client secret for tests, skipping if unset.""" + cs = os.getenv("CLIENT_SECRET") + if not cs: + pytest.skip("Environment variable CLIENT_SECRET must be set", allow_module_level=True) + return cs + + +def test_retrieves_general_settings_with_valid_client_credentials(base_url: str, client_id: str, client_secret: str) -> None: + """Retrieves general settings successfully with valid client credentials.""" + client = zitadel.Zitadel.with_client_credentials( + base_url, + client_id, + client_secret, + ) + client.settings.settings_service_get_general_settings() + + +def test_raises_api_exception_with_invalid_client_credentials(base_url: str) -> None: + """Raises ApiException when using invalid client credentials.""" + client = zitadel.Zitadel.with_client_credentials( + base_url, + "invalid", + "invalid", + ) + with pytest.raises(OpenApiError): + client.settings.settings_service_get_general_settings() diff --git a/spec/auth/using_private_key_spec.py b/spec/auth/using_private_key_spec.py new file mode 100644 index 00000000..eb2480e4 --- /dev/null +++ b/spec/auth/using_private_key_spec.py @@ -0,0 +1,56 @@ +""" +SettingsService Integration Tests (Private Key Assertion) + +This suite verifies the Zitadel SettingsService API's general settings +endpoint works when authenticating via a private key assertion: + + 1. Retrieve general settings successfully with a valid private key + 2. Expect an ApiException when using an invalid private key path + +Each test instantiates a new client to ensure a clean, stateless call. +""" + +import os +import pathlib +from pathlib import Path + +import pytest + +import zitadel_client as zitadel +from zitadel_client import OpenApiError + + +@pytest.fixture(scope="module") +def base_url() -> str: + """Provides the base URL for tests, skipping if unset.""" + url = os.getenv("BASE_URL") + if not url: + pytest.skip("Environment variable BASE_URL must be set", allow_module_level=True) + return url + + +@pytest.fixture +def key_file(tmp_path: pathlib.Path) -> str: + raw: str = os.getenv("JWT_KEY") or "" + file_path: pathlib.Path = tmp_path / "jwt.json" + file_path.write_text(raw) + return str(file_path) + + +def test_retrieves_general_settings_with_valid_private_key(base_url: str, key_file: str) -> None: + """Retrieves general settings successfully with a valid private key.""" + client = zitadel.Zitadel.with_private_key( + base_url, + key_file, + ) + client.settings.settings_service_get_general_settings() + + +def test_raises_api_exception_with_invalid_private_key(key_file: str) -> None: + """Raises ApiException when using an invalid private key path.""" + client = zitadel.Zitadel.with_private_key( + "https://zitadel.cloud", + key_file, + ) + with pytest.raises(OpenApiError): + client.settings.settings_service_get_general_settings() diff --git a/spec/sdk_test_to_sanity_check_the_session_service_spec.py b/spec/check_session_service_spec.py similarity index 85% rename from spec/sdk_test_to_sanity_check_the_session_service_spec.py rename to spec/check_session_service_spec.py index 1efd4220..516a76cb 100644 --- a/spec/sdk_test_to_sanity_check_the_session_service_spec.py +++ b/spec/check_session_service_spec.py @@ -36,21 +36,28 @@ ) -def pytest_configure() -> None: - """Skip tests if required environment variables are missing.""" - if not os.getenv("AUTH_TOKEN") or not os.getenv("BASE_URL"): - pytest.skip( - "Environment variables AUTH_TOKEN and BASE_URL must be set", - allow_module_level=True, - ) +@pytest.fixture(scope="module") +def base_url() -> str: + """Provides the base URL for tests, skipping if unset.""" + url = os.getenv("BASE_URL") + if not url: + pytest.skip("Environment variable BASE_URL must be set", allow_module_level=True) + return url + + +@pytest.fixture(scope="module") +def auth_token() -> str: + """Provides a valid personal access token, skipping if unset.""" + token = os.getenv("AUTH_TOKEN") + if not token: + pytest.skip("Environment variable AUTH_TOKEN must be set", allow_module_level=True) + return token @pytest.fixture(scope="module") -def client() -> zitadel.Zitadel: +def client(base_url: str, auth_token: str) -> zitadel.Zitadel: """Provides a Zitadel client configured with a personal access token.""" - token: str = os.getenv("AUTH_TOKEN") # type: ignore - base_url: str = os.getenv("BASE_URL") # type: ignore - return zitadel.Zitadel.with_access_token(base_url, token) + return zitadel.Zitadel.with_access_token(base_url, auth_token) @pytest.fixture diff --git a/spec/check_user_service_spec.py b/spec/check_user_service_spec.py new file mode 100644 index 00000000..28c239d4 --- /dev/null +++ b/spec/check_user_service_spec.py @@ -0,0 +1,122 @@ +""" +UserService Integration Tests + +This suite verifies the Zitadel UserService API's basic operations using a +personal access token: + + 1. Create a human user + 2. Retrieve the user by ID + 3. List users and ensure the created user appears + 4. Update the user's email and confirm the change + 5. Error when retrieving a non-existent user + +Each test runs in isolation: a new user is created in the `user` fixture and +removed after the test to ensure a clean state. +""" + +import os +import uuid +from typing import Generator + +import pytest + +import zitadel_client as zitadel +from zitadel_client.exceptions import ApiException +from zitadel_client.models import ( + UserServiceAddHumanUserRequest, + UserServiceAddHumanUserResponse, + UserServiceGetUserByIDResponse, + UserServiceListUsersRequest, + UserServiceSetHumanEmail, + UserServiceSetHumanProfile, + UserServiceUpdateHumanUserRequest, +) + + +@pytest.fixture(scope="module") +def base_url() -> str: + """Provides the base URL for tests, skipping if unset.""" + url = os.getenv("BASE_URL") + if not url: + pytest.skip("Environment variable BASE_URL must be set", allow_module_level=True) + return url + + +@pytest.fixture(scope="module") +def auth_token() -> str: + """Provides a valid personal access token, skipping if unset.""" + token = os.getenv("AUTH_TOKEN") + if not token: + pytest.skip("Environment variable AUTH_TOKEN must be set", allow_module_level=True) + return token + + +@pytest.fixture(scope="module") +def client(base_url: str, auth_token: str) -> zitadel.Zitadel: + """Provides a Zitadel client configured with a personal access token.""" + return zitadel.Zitadel.with_access_token(base_url, auth_token) + + +@pytest.fixture +def user(client: zitadel.Zitadel) -> Generator[UserServiceAddHumanUserResponse, None, None]: + """Creates a fresh human user for each test and cleans up afterward.""" + request = UserServiceAddHumanUserRequest( + username=uuid.uuid4().hex, + profile=UserServiceSetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg] + email=UserServiceSetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@example.com"), + ) + response = client.users.user_service_add_human_user(request) + yield response + try: + client.users.user_service_delete_user(response.user_id) # type: ignore[arg-type] + except ApiException: + pass + + +class TestUserServiceSanityCheckSpec: + """ + Integration tests for the UserService endpoints. + + Verifies create, get, list, set, and error behaviors for users. + """ + + def test_retrieves_user_details_by_id( + self, + client: zitadel.Zitadel, + user: UserServiceAddHumanUserResponse, + ) -> None: + """Retrieves the user details by ID.""" + response: UserServiceGetUserByIDResponse = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] + assert response.user.user_id == user.user_id # type: ignore[union-attr] + + def test_includes_created_user_when_listing( + self, + client: zitadel.Zitadel, + user: UserServiceAddHumanUserResponse, + ) -> None: + """Includes the created user when listing all users.""" + request = UserServiceListUsersRequest(queries=[]) + response = client.users.user_service_list_users(request) + ids = [u.user_id for u in response.result] # type: ignore + assert user.user_id in ids + + def test_updates_user_email_and_reflects_in_get( + self, + client: zitadel.Zitadel, + user: UserServiceAddHumanUserResponse, + ) -> None: + """Updates the user's email and verifies the change.""" + client.users.user_service_update_human_user( + user.user_id, # type: ignore[arg-type] + UserServiceUpdateHumanUserRequest(email=UserServiceSetHumanEmail(email=f"updated{uuid.uuid4().hex}@example.com")), + ) + response = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] + assert "updated" in response.user.human.email.email # type: ignore + + def test_raises_api_exception_for_nonexistent_user( + self, + client: zitadel.Zitadel, + ) -> None: + """Raises an ApiException when retrieving a non-existent user.""" + with pytest.raises(ApiException): + client.users.user_service_get_user_by_id(str(uuid.uuid4())) diff --git a/spec/sdk_test_using_client_credentials_authentication_spec.py b/spec/sdk_test_using_client_credentials_authentication_spec.py deleted file mode 100644 index 7e3a16b2..00000000 --- a/spec/sdk_test_using_client_credentials_authentication_spec.py +++ /dev/null @@ -1,66 +0,0 @@ -import os -import uuid - -import pytest - -import zitadel_client as zitadel - - -@pytest.fixture -def client_id() -> str | None: - """Fixture to return a valid personal access token.""" - return os.getenv("CLIENT_ID") - - -@pytest.fixture -def client_secret() -> str | None: - """Fixture to return a valid personal access token.""" - return os.getenv("CLIENT_SECRET") - - -@pytest.fixture -def base_url() -> str | None: - """Fixture to return the base URL.""" - return os.getenv("BASE_URL") - - -@pytest.fixture -def user_id(client_id: str, client_secret: str, base_url: str) -> str | None: - """Fixture to create a user and return their ID.""" - with zitadel.Zitadel.with_client_credentials(base_url, client_id, client_secret) as client: - try: - response = client.users.user_service_add_human_user( - zitadel.models.UserServiceAddHumanUserRequest( - username=uuid.uuid4().hex, - profile=zitadel.models.UserServiceSetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg] - email=zitadel.models.UserServiceSetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag"), - ) - ) - return response.user_id - except Exception as e: - pytest.fail(f"Exception while creating user: {e}") - - -def test_should_deactivate_and_reactivate_user_with_valid_token( - user_id: str, client_id: str, client_secret: str, base_url: str -) -> None: - """Test to (de)activate the user with a valid token.""" - with zitadel.Zitadel.with_client_credentials(base_url, client_id, client_secret) as client: - try: - deactivate_response = client.users.user_service_deactivate_user(user_id=user_id) - assert deactivate_response is not None, "Deactivation response is None" - - reactivate_response = client.users.user_service_reactivate_user(user_id=user_id) - assert reactivate_response is not None, "Reactivation response is None" - except Exception as e: - pytest.fail(f"Exception when calling deactivate_user or reactivate_user with valid token: {e}") - - -def test_should_not_deactivate_or_reactivate_user_with_invalid_token(user_id: str, base_url: str) -> None: - """Test to attempt (de)activating the user with an invalid token.""" - with zitadel.Zitadel.with_client_credentials(base_url, "id", "secret") as client: - with pytest.raises(Exception, match="Failed to refresh token: invalid_client: client not found"): - client.users.user_service_deactivate_user(user_id=user_id) - - with pytest.raises(Exception, match="Failed to refresh token: invalid_client: client not found"): - client.users.user_service_reactivate_user(user_id=user_id) diff --git a/spec/sdk_test_using_personal_access_token_authentication_spec.py b/spec/sdk_test_using_personal_access_token_authentication_spec.py deleted file mode 100644 index e1a300ba..00000000 --- a/spec/sdk_test_using_personal_access_token_authentication_spec.py +++ /dev/null @@ -1,65 +0,0 @@ -import os -import uuid - -import pytest - -import zitadel_client as zitadel -from zitadel_client.exceptions import UnauthorizedException - - -@pytest.fixture -def valid_token() -> str | None: - """Fixture to return a valid personal access token.""" - return os.getenv("AUTH_TOKEN") - - -@pytest.fixture -def invalid_token() -> str | None: - """Fixture to return an invalid token.""" - return "whoops" - - -@pytest.fixture -def base_url() -> str | None: - """Fixture to return the base URL.""" - return os.getenv("BASE_URL") - - -@pytest.fixture -def user_id(valid_token: str, base_url: str) -> str | None: - """Fixture to create a user and return their ID.""" - with zitadel.Zitadel.with_access_token(base_url, valid_token) as client: - try: - response = client.users.user_service_add_human_user( - zitadel.models.UserServiceAddHumanUserRequest( - username=uuid.uuid4().hex, - profile=zitadel.models.UserServiceSetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg] - email=zitadel.models.UserServiceSetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag"), - ) - ) - return response.user_id - except Exception as e: - pytest.fail(f"Exception while creating user: {e}") - - -def test_should_deactivate_and_reactivate_user_with_valid_token(user_id: str, valid_token: str, base_url: str) -> None: - """Test to (de)activate the user with a valid token.""" - with zitadel.Zitadel.with_access_token(base_url, valid_token) as client: - try: - deactivate_response = client.users.user_service_deactivate_user(user_id=user_id) - assert deactivate_response is not None, "Deactivation response is None" - - reactivate_response = client.users.user_service_reactivate_user(user_id=user_id) - assert reactivate_response is not None, "Reactivation response is None" - except Exception as e: - pytest.fail(f"Exception when calling deactivate_user or reactivate_user with valid token: {e}") - - -def test_should_not_deactivate_or_reactivate_user_with_invalid_token(user_id: str, invalid_token: str, base_url: str) -> None: - """Test to attempt (de)activating the user with an invalid token.""" - with zitadel.Zitadel.with_access_token(base_url, invalid_token) as client: - with pytest.raises(UnauthorizedException): - client.users.user_service_deactivate_user(user_id=user_id) - - with pytest.raises(UnauthorizedException): - client.users.user_service_reactivate_user(user_id=user_id) diff --git a/spec/sdk_test_using_web_token_authentication_spec.py b/spec/sdk_test_using_web_token_authentication_spec.py deleted file mode 100644 index 79e273e8..00000000 --- a/spec/sdk_test_using_web_token_authentication_spec.py +++ /dev/null @@ -1,53 +0,0 @@ -import os -import tempfile -import uuid - -import pytest - -import zitadel_client as zitadel - - -@pytest.fixture -def key_file() -> str: - jwt_key = os.getenv("JWT_KEY") - if jwt_key is None: - pytest.fail("JWT_KEY is not set in the environment") - with tempfile.NamedTemporaryFile(delete=False, mode="w") as tf: - tf.write(jwt_key) - return tf.name - - -@pytest.fixture -def base_url() -> str | None: - """Fixture to return the base URL.""" - return os.getenv("BASE_URL") - - -@pytest.fixture -def user_id(key_file: str, base_url: str) -> str | None: - """Fixture to create a user and return their ID.""" - with zitadel.Zitadel.with_private_key(base_url, key_file) as client: - try: - response = client.users.user_service_add_human_user( - zitadel.models.UserServiceAddHumanUserRequest( - username=uuid.uuid4().hex, - profile=zitadel.models.UserServiceSetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg] - email=zitadel.models.UserServiceSetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag"), - ) - ) - return response.user_id - except Exception as e: - pytest.fail(f"Exception while creating user: {e}") - - -def test_should_deactivate_and_reactivate_user_with_valid_token(user_id: str, key_file: str, base_url: str) -> None: - """Test to (de)activate the user with a valid token.""" - with zitadel.Zitadel.with_private_key(base_url, key_file) as client: - try: - deactivate_response = client.users.user_service_deactivate_user(user_id=user_id) - assert deactivate_response is not None, "Deactivation response is None" - - reactivate_response = client.users.user_service_reactivate_user(user_id=user_id) - assert reactivate_response is not None, "Reactivation response is None" - except Exception as e: - pytest.fail(f"Exception when calling deactivate_user or reactivate_user with valid token: {e}") diff --git a/zitadel_client/auth/oauth_authenticator.py b/zitadel_client/auth/oauth_authenticator.py index b3a4780d..80ef1b7f 100644 --- a/zitadel_client/auth/oauth_authenticator.py +++ b/zitadel_client/auth/oauth_authenticator.py @@ -4,6 +4,7 @@ from authlib.integrations.requests_client import OAuth2Session +from zitadel_client import ApiException, OpenApiError from zitadel_client.auth.authenticator import Authenticator, Token from zitadel_client.auth.open_id import OpenId @@ -71,7 +72,7 @@ def refresh_token(self) -> Token: self.token = Token(access_token, expires_at) return self.token except Exception as e: - raise Exception("Failed to refresh token: " + str(e)) from e + raise OpenApiError("Failed to refresh token: " + str(e)) from e T = TypeVar("T", bound="OAuthAuthenticatorBuilder[Any]") From 13cac6a63f296f41a5697950b3a6b3bcb724bb43 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 24 Apr 2025 07:09:31 +0000 Subject: [PATCH 5/7] style: Apply automated code formatting [skip ci] --- spec/check_user_service_spec.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spec/check_user_service_spec.py b/spec/check_user_service_spec.py index 28c239d4..ee47aed1 100644 --- a/spec/check_user_service_spec.py +++ b/spec/check_user_service_spec.py @@ -68,7 +68,7 @@ def user(client: zitadel.Zitadel) -> Generator[UserServiceAddHumanUserResponse, response = client.users.user_service_add_human_user(request) yield response try: - client.users.user_service_delete_user(response.user_id) # type: ignore[arg-type] + client.users.user_service_delete_user(response.user_id) # type: ignore[arg-type] except ApiException: pass @@ -86,8 +86,8 @@ def test_retrieves_user_details_by_id( user: UserServiceAddHumanUserResponse, ) -> None: """Retrieves the user details by ID.""" - response: UserServiceGetUserByIDResponse = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] - assert response.user.user_id == user.user_id # type: ignore[union-attr] + response: UserServiceGetUserByIDResponse = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] + assert response.user.user_id == user.user_id # type: ignore[union-attr] def test_includes_created_user_when_listing( self, @@ -97,7 +97,7 @@ def test_includes_created_user_when_listing( """Includes the created user when listing all users.""" request = UserServiceListUsersRequest(queries=[]) response = client.users.user_service_list_users(request) - ids = [u.user_id for u in response.result] # type: ignore + ids = [u.user_id for u in response.result] # type: ignore assert user.user_id in ids def test_updates_user_email_and_reflects_in_get( @@ -107,11 +107,11 @@ def test_updates_user_email_and_reflects_in_get( ) -> None: """Updates the user's email and verifies the change.""" client.users.user_service_update_human_user( - user.user_id, # type: ignore[arg-type] + user.user_id, # type: ignore[arg-type] UserServiceUpdateHumanUserRequest(email=UserServiceSetHumanEmail(email=f"updated{uuid.uuid4().hex}@example.com")), ) - response = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] - assert "updated" in response.user.human.email.email # type: ignore + response = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] + assert "updated" in response.user.human.email.email # type: ignore def test_raises_api_exception_for_nonexistent_user( self, From 5081582a1eaa0faa2a0d1697ab896f0aa1c07264 Mon Sep 17 00:00:00 2001 From: Mridang Agarwalla Date: Thu, 24 Apr 2025 10:09:37 +0300 Subject: [PATCH 6/7] Added more tests to better verify the auth --- spec/auth/using_private_key_spec.py | 1 - spec/check_user_service_spec.py | 14 +++++++------- zitadel_client/auth/oauth_authenticator.py | 2 +- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/spec/auth/using_private_key_spec.py b/spec/auth/using_private_key_spec.py index eb2480e4..f2354e0f 100644 --- a/spec/auth/using_private_key_spec.py +++ b/spec/auth/using_private_key_spec.py @@ -12,7 +12,6 @@ import os import pathlib -from pathlib import Path import pytest diff --git a/spec/check_user_service_spec.py b/spec/check_user_service_spec.py index 28c239d4..ee47aed1 100644 --- a/spec/check_user_service_spec.py +++ b/spec/check_user_service_spec.py @@ -68,7 +68,7 @@ def user(client: zitadel.Zitadel) -> Generator[UserServiceAddHumanUserResponse, response = client.users.user_service_add_human_user(request) yield response try: - client.users.user_service_delete_user(response.user_id) # type: ignore[arg-type] + client.users.user_service_delete_user(response.user_id) # type: ignore[arg-type] except ApiException: pass @@ -86,8 +86,8 @@ def test_retrieves_user_details_by_id( user: UserServiceAddHumanUserResponse, ) -> None: """Retrieves the user details by ID.""" - response: UserServiceGetUserByIDResponse = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] - assert response.user.user_id == user.user_id # type: ignore[union-attr] + response: UserServiceGetUserByIDResponse = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] + assert response.user.user_id == user.user_id # type: ignore[union-attr] def test_includes_created_user_when_listing( self, @@ -97,7 +97,7 @@ def test_includes_created_user_when_listing( """Includes the created user when listing all users.""" request = UserServiceListUsersRequest(queries=[]) response = client.users.user_service_list_users(request) - ids = [u.user_id for u in response.result] # type: ignore + ids = [u.user_id for u in response.result] # type: ignore assert user.user_id in ids def test_updates_user_email_and_reflects_in_get( @@ -107,11 +107,11 @@ def test_updates_user_email_and_reflects_in_get( ) -> None: """Updates the user's email and verifies the change.""" client.users.user_service_update_human_user( - user.user_id, # type: ignore[arg-type] + user.user_id, # type: ignore[arg-type] UserServiceUpdateHumanUserRequest(email=UserServiceSetHumanEmail(email=f"updated{uuid.uuid4().hex}@example.com")), ) - response = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] - assert "updated" in response.user.human.email.email # type: ignore + response = client.users.user_service_get_user_by_id(user.user_id) # type: ignore[arg-type] + assert "updated" in response.user.human.email.email # type: ignore def test_raises_api_exception_for_nonexistent_user( self, diff --git a/zitadel_client/auth/oauth_authenticator.py b/zitadel_client/auth/oauth_authenticator.py index 80ef1b7f..24d54017 100644 --- a/zitadel_client/auth/oauth_authenticator.py +++ b/zitadel_client/auth/oauth_authenticator.py @@ -4,7 +4,7 @@ from authlib.integrations.requests_client import OAuth2Session -from zitadel_client import ApiException, OpenApiError +from zitadel_client import OpenApiError from zitadel_client.auth.authenticator import Authenticator, Token from zitadel_client.auth.open_id import OpenId From 18f29c53f22ed475fece4e82c68cbaf676ca5828 Mon Sep 17 00:00:00 2001 From: Mridang Agarwalla Date: Thu, 24 Apr 2025 10:50:24 +0300 Subject: [PATCH 7/7] Encapsulated all the specificiation tests into classes --- spec/auth/using_access_token_spec.py | 59 +++++++++++--------- spec/auth/using_client_credentials_spec.py | 64 ++++++++++++---------- spec/auth/using_private_key_spec.py | 59 +++++++++++--------- spec/check_session_service_spec.py | 30 ++++------ spec/check_user_service_spec.py | 30 ++++------ 5 files changed, 126 insertions(+), 116 deletions(-) diff --git a/spec/auth/using_access_token_spec.py b/spec/auth/using_access_token_spec.py index 83486c63..66656912 100644 --- a/spec/auth/using_access_token_spec.py +++ b/spec/auth/using_access_token_spec.py @@ -1,15 +1,3 @@ -""" -SettingsService Integration Tests (Personal Access Token) - -This suite verifies the Zitadel SettingsService API's general settings -endpoint works when authenticating via a Personal Access Token: - - 1. Retrieve general settings successfully with a valid token - 2. Expect an ApiException when using an invalid token - -Each test instantiates a new client to ensure a clean, stateless call. -""" - import os import pytest @@ -36,20 +24,39 @@ def auth_token() -> str: return url -def test_retrieves_general_settings_with_valid_token(base_url: str, auth_token: str) -> None: - """Retrieves general settings successfully with a valid access token.""" - client = zitadel.Zitadel.with_access_token( - base_url, - auth_token, - ) - client.settings.settings_service_get_general_settings() +class TestUseAccessTokenSpec: + """ + SettingsService Integration Tests (Personal Access Token) + This suite verifies the Zitadel SettingsService API's general settings + endpoint works when authenticating via a Personal Access Token: -def test_raises_api_exception_with_invalid_token(base_url: str) -> None: - """Raises ApiException when using an invalid access token.""" - client = zitadel.Zitadel.with_access_token( - base_url, - "invalid", - ) - with pytest.raises(OpenApiError): + 1. Retrieve general settings successfully with a valid token + 2. Expect an ApiException when using an invalid token + + Each test instantiates a new client to ensure a clean, stateless call. + """ + + def test_retrieves_general_settings_with_valid_token( + self, + base_url: str, + auth_token: str, + ) -> None: + """Retrieves general settings successfully with a valid access token.""" + client = zitadel.Zitadel.with_access_token( + base_url, + auth_token, + ) client.settings.settings_service_get_general_settings() + + def test_raises_api_exception_with_invalid_token( + self, + base_url: str, + ) -> None: + """Raises ApiException when using an invalid access token.""" + client = zitadel.Zitadel.with_access_token( + base_url, + "invalid", + ) + with pytest.raises(OpenApiError): + client.settings.settings_service_get_general_settings() diff --git a/spec/auth/using_client_credentials_spec.py b/spec/auth/using_client_credentials_spec.py index ce9bbdb9..e42dee7a 100644 --- a/spec/auth/using_client_credentials_spec.py +++ b/spec/auth/using_client_credentials_spec.py @@ -1,15 +1,3 @@ -""" -SettingsService Integration Tests (Client Credentials) - -This suite verifies the Zitadel SettingsService API's general settings -endpoint works when authenticating via Client Credentials: - - 1. Retrieve general settings successfully with valid credentials - 2. Expect an ApiException when using invalid credentials - -Each test instantiates a new client to ensure a clean, stateless call. -""" - import os import pytest @@ -45,22 +33,42 @@ def client_secret() -> str: return cs -def test_retrieves_general_settings_with_valid_client_credentials(base_url: str, client_id: str, client_secret: str) -> None: - """Retrieves general settings successfully with valid client credentials.""" - client = zitadel.Zitadel.with_client_credentials( - base_url, - client_id, - client_secret, - ) - client.settings.settings_service_get_general_settings() +class TestUseClientCredentialsSpec: + """ + SettingsService Integration Tests (Client Credentials) + This suite verifies the Zitadel SettingsService API's general settings + endpoint works when authenticating via Client Credentials: -def test_raises_api_exception_with_invalid_client_credentials(base_url: str) -> None: - """Raises ApiException when using invalid client credentials.""" - client = zitadel.Zitadel.with_client_credentials( - base_url, - "invalid", - "invalid", - ) - with pytest.raises(OpenApiError): + 1. Retrieve general settings successfully with valid credentials + 2. Expect an ApiException when using invalid credentials + + Each test instantiates a new client to ensure a clean, stateless call. + """ + + def test_retrieves_general_settings_with_valid_client_credentials( + self, + base_url: str, + client_id: str, + client_secret: str, + ) -> None: + """Retrieves general settings successfully with valid client credentials.""" + client = zitadel.Zitadel.with_client_credentials( + base_url, + client_id, + client_secret, + ) client.settings.settings_service_get_general_settings() + + def test_raises_api_exception_with_invalid_client_credentials( + self, + base_url: str, + ) -> None: + """Raises ApiException when using invalid client credentials.""" + client = zitadel.Zitadel.with_client_credentials( + base_url, + "invalid", + "invalid", + ) + with pytest.raises(OpenApiError): + client.settings.settings_service_get_general_settings() diff --git a/spec/auth/using_private_key_spec.py b/spec/auth/using_private_key_spec.py index f2354e0f..639bba49 100644 --- a/spec/auth/using_private_key_spec.py +++ b/spec/auth/using_private_key_spec.py @@ -1,15 +1,3 @@ -""" -SettingsService Integration Tests (Private Key Assertion) - -This suite verifies the Zitadel SettingsService API's general settings -endpoint works when authenticating via a private key assertion: - - 1. Retrieve general settings successfully with a valid private key - 2. Expect an ApiException when using an invalid private key path - -Each test instantiates a new client to ensure a clean, stateless call. -""" - import os import pathlib @@ -36,20 +24,39 @@ def key_file(tmp_path: pathlib.Path) -> str: return str(file_path) -def test_retrieves_general_settings_with_valid_private_key(base_url: str, key_file: str) -> None: - """Retrieves general settings successfully with a valid private key.""" - client = zitadel.Zitadel.with_private_key( - base_url, - key_file, - ) - client.settings.settings_service_get_general_settings() +class TestUsePrivateKeySpec: + """ + SettingsService Integration Tests (Private Key Assertion) + This suite verifies the Zitadel SettingsService API's general settings + endpoint works when authenticating via a private key assertion: -def test_raises_api_exception_with_invalid_private_key(key_file: str) -> None: - """Raises ApiException when using an invalid private key path.""" - client = zitadel.Zitadel.with_private_key( - "https://zitadel.cloud", - key_file, - ) - with pytest.raises(OpenApiError): + 1. Retrieve general settings successfully with a valid private key + 2. Expect an ApiException when using an invalid private key path + + Each test instantiates a new client to ensure a clean, stateless call. + """ + + def test_retrieves_general_settings_with_valid_private_key( + self, + base_url: str, + key_file: str, + ) -> None: + """Retrieves general settings successfully with a valid private key.""" + client = zitadel.Zitadel.with_private_key( + base_url, + key_file, + ) client.settings.settings_service_get_general_settings() + + def test_raises_api_exception_with_invalid_private_key( + self, + key_file: str, + ) -> None: + """Raises ApiException when using an invalid private key path.""" + client = zitadel.Zitadel.with_private_key( + "https://zitadel.cloud", + key_file, + ) + with pytest.raises(OpenApiError): + client.settings.settings_service_get_general_settings() diff --git a/spec/check_session_service_spec.py b/spec/check_session_service_spec.py index 516a76cb..e62977ef 100644 --- a/spec/check_session_service_spec.py +++ b/spec/check_session_service_spec.py @@ -1,19 +1,3 @@ -""" -SessionService Integration Tests - -This suite verifies the Zitadel SessionService API's basic operations using a -personal access token: - - 1. Create a session with specified checks and lifetime - 2. Retrieve the session by ID - 3. List sessions and ensure the created session appears - 4. Update the session's lifetime and confirm a new token is returned - 5. Error when retrieving a non-existent session - -Each test runs in isolation: a new session is created in the `session` fixture and -deleted after the test to ensure a clean state. -""" - import os import uuid from typing import Generator @@ -82,9 +66,19 @@ def session(client: zitadel.Zitadel) -> Generator[SessionServiceCreateSessionRes class TestSessionServiceSanityCheckSpec: """ - Integration tests for the SessionService endpoints. + SessionService Integration Tests + + This suite verifies the Zitadel SessionService API's basic operations using a + personal access token: + + 1. Create a session with specified checks and lifetime + 2. Retrieve the session by ID + 3. List sessions and ensure the created session appears + 4. Update the session's lifetime and confirm a new token is returned + 5. Error when retrieving a non-existent session - Verifies create, get, list, set, and error behaviors for sessions. + Each test runs in isolation: a new session is created in the `session` fixture and + deleted after the test to ensure a clean state. """ def test_retrieves_session_details_by_id( diff --git a/spec/check_user_service_spec.py b/spec/check_user_service_spec.py index ee47aed1..a43d617d 100644 --- a/spec/check_user_service_spec.py +++ b/spec/check_user_service_spec.py @@ -1,19 +1,3 @@ -""" -UserService Integration Tests - -This suite verifies the Zitadel UserService API's basic operations using a -personal access token: - - 1. Create a human user - 2. Retrieve the user by ID - 3. List users and ensure the created user appears - 4. Update the user's email and confirm the change - 5. Error when retrieving a non-existent user - -Each test runs in isolation: a new user is created in the `user` fixture and -removed after the test to ensure a clean state. -""" - import os import uuid from typing import Generator @@ -75,9 +59,19 @@ def user(client: zitadel.Zitadel) -> Generator[UserServiceAddHumanUserResponse, class TestUserServiceSanityCheckSpec: """ - Integration tests for the UserService endpoints. + UserService Integration Tests + + This suite verifies the Zitadel UserService API's basic operations using a + personal access token: + + 1. Create a human user + 2. Retrieve the user by ID + 3. List users and ensure the created user appears + 4. Update the user's email and confirm the change + 5. Error when retrieving a non-existent user - Verifies create, get, list, set, and error behaviors for users. + Each test runs in isolation: a new user is created in the `user` fixture and + removed after the test to ensure a clean state. """ def test_retrieves_user_details_by_id(