From 998e94af0419ec790455152a916e8879744dba05 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 2 Jan 2026 18:44:48 +0200 Subject: [PATCH 1/7] auth: Extend groups to use them for fine-grained access control Happy new year! :) Implementing as mentioned in issue, also, tighten node edit authz and document groups. Centralize node edit checks (owner, group, runtime, superuser) and reuse them for batch nodeset updates. Lock down user self updates to prevent group escalation and document group management limits and workflows. Add unit tests for authz rules and self-update guard. Ref: https://github.com/kernelci/kernelci-api/issues/640 Signed-off-by: Denys Fedoryshchenko --- api/main.py | 75 +++++++++++++---- doc/api-details.md | 36 ++++++++- tests/unit_tests/test_authz_handler.py | 108 +++++++++++++++++++++++++ 3 files changed, 203 insertions(+), 16 deletions(-) create mode 100644 tests/unit_tests/test_authz_handler.py diff --git a/api/main.py b/api/main.py index 34eb5c0c..d6e5ba33 100644 --- a/api/main.py +++ b/api/main.py @@ -88,6 +88,7 @@ async def lifespan(app: FastAPI): # pylint: disable=redefined-outer-name await pubsub_startup() await create_indexes() await initialize_beanie() + await ensure_legacy_node_editors() yield # List of all the supported API versions. This is a placeholder until the API @@ -145,6 +146,23 @@ async def initialize_beanie(): await db.initialize_beanie() +async def ensure_legacy_node_editors(): + """Grant legacy node edit privileges to specific users.""" + legacy_usernames = {'staging.kernelci.org', 'production'} + group_name = 'node:edit:any' + group = await db.find_one(UserGroup, name=group_name) + if not group: + group = await db.create(UserGroup(name=group_name)) + for username in legacy_usernames: + user = await db.find_one(User, username=username) + if not user: + continue + if any(existing.name == group_name for existing in user.groups): + continue + user.groups.append(group) + await db.update(user) + + @app.exception_handler(ValueError) async def value_error_exception_handler(request: Request, exc: ValueError): """Global exception handler for 'ValueError'""" @@ -547,6 +565,11 @@ async def update_me(request: Request, user: UserUpdateRequest, its own profile. """ metrics.add('http_requests_total', 1) + if user.groups: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="User groups can only be updated by an admin user", + ) if user.username and user.username != current_user.username: existing_user = await db.find_one(User, username=user.username) if existing_user: @@ -621,6 +644,35 @@ async def update_user(user_id: str, request: Request, user: UserUpdateRequest, return updated_user +def _get_node_runtime(node: Node) -> Optional[str]: + """Best-effort runtime lookup from node data.""" + data = getattr(node, 'data', None) + if isinstance(data, dict): + return data.get('runtime') + return getattr(data, 'runtime', None) + + +def _user_can_edit_node(user: User, node: Node) -> bool: + """Return True when user can update the given node.""" + if user.is_superuser: + return True + if user.username == node.owner: + return True + user_group_names = {group.name for group in user.groups} + if 'node:edit:any' in user_group_names: + return True + if any(group_name in user_group_names + for group_name in getattr(node, 'user_groups', [])): + return True + runtime = _get_node_runtime(node) + if runtime: + runtime_editor = f'runtime:{runtime}:node-editor' + runtime_admin = f'runtime:{runtime}:node-admin' + if runtime_editor in user_group_names or runtime_admin in user_group_names: + return True + return False + + async def authorize_user(node_id: str, user: User = Depends(get_current_user)): """Return the user if active, authenticated, and authorized""" @@ -633,17 +685,11 @@ async def authorize_user(node_id: str, status_code=status.HTTP_404_NOT_FOUND, detail=f"Node not found with id: {node_id}" ) - # users staging.kernelci.org and production are superusers - # TBD: This is HACK until qualcomm can migrate to direct KCIDB - if user.username in ['staging.kernelci.org', 'production']: - return user - if not user.username == node_from_id.owner: - if not any(group.name in node_from_id.user_groups - for group in user.groups): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Unauthorized to complete the operation" - ) + if not _user_can_edit_node(user, node_from_id): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Unauthorized to complete the operation" + ) return user @@ -1108,10 +1154,8 @@ async def put_batch_nodeset(data: NodeUpdateRequest, status_code=status.HTTP_404_NOT_FOUND, detail=f"Node not found with id: {node_id}" ) - # verify ownership, and ignore if not owner - if not user.username == node_from_id.owner\ - and user.username != 'production' and\ - user.username != 'staging.kernelci.org': + # Verify authorization, and ignore if not permitted. + if not _user_can_edit_node(user, node_from_id): continue # right now we support only field: # processed_by_kcidb_bridge, also value should be boolean @@ -1448,6 +1492,7 @@ async def purge_handler(current_user: User = Depends(get_current_superuser), pubsub_startup, create_indexes, initialize_beanie, + ensure_legacy_node_editors, start_background_tasks, ]) diff --git a/doc/api-details.md b/doc/api-details.md index a3a17b41..b1b84759 100644 --- a/doc/api-details.md +++ b/doc/api-details.md @@ -333,7 +333,7 @@ $ curl -X 'POST' \ ### Update own user account A user can update certain information for its own account, such as -`email`, `username`, `password`, and `groups` with a `PATCH /user/me` request. +`email`, `username`, and `password` with a `PATCH /user/me` request. For example, ``` $ curl -X 'PATCH' \ @@ -348,6 +348,7 @@ $ curl -X 'PATCH' \ ``` Please note that user management fields such as `is_useruser`, `is_verified`, and `is_active` can not be updated by this request for security purposes. +User group membership can only be updated by admin users. ### Update an existing user account (Admin only) @@ -365,6 +366,39 @@ $ curl -X 'PATCH' \ -d '{"email": "test-user@kernelci.org", "groups": ["kernelci"]}' ``` +### User groups and permissions + +User groups are plain name strings stored in the `usergroup` collection. Group +names must already exist before they can be assigned to users; otherwise the +API returns `400`. + +There is currently no REST endpoint for creating or deleting user groups. Use +MongoDB tooling to manage them. Example with `mongosh`: + +``` +$ mongosh "mongodb://db:27017/kernelci" +> db.usergroup.insertOne({name: "runtime:lava-collabora:node-editor"}) +``` + +Admin users can assign or remove groups via: + +- `POST /user/invite` with a `groups` list +- `PATCH /user/` with `groups` +- `scripts/usermanager.py update-user --data '{"groups": [...]}'` + +To remove a group, send a `groups` list that omits it; the list replaces the +existing groups. + +Example using the helper script: + +``` +$ ./scripts/usermanager.py list-users +$ ./scripts/usermanager.py update-user 615f30020eb7c3c6616e5ac3 \ + --data '{"groups": ["runtime:lava-collabora:node-editor"]}' +``` + +Users cannot update their own groups; admin access is required. + ### Delete user matching user ID (Admin only) diff --git a/tests/unit_tests/test_authz_handler.py b/tests/unit_tests/test_authz_handler.py new file mode 100644 index 00000000..bea2cc9f --- /dev/null +++ b/tests/unit_tests/test_authz_handler.py @@ -0,0 +1,108 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# Copyright (C) 2025 Collabora Limited + +"""Unit tests for authorization helpers and user self-update.""" + +import json + +from kernelci.api.models import Node, Revision +from api.main import _user_can_edit_node +from api.models import User, UserGroup +from tests.unit_tests.conftest import BEARER_TOKEN + + +def _make_user(username="alice", is_superuser=False, groups=None): + return User( + id="65265305c74695807499037f", + username=username, + hashed_password="$2b$12$CpJZx5ooxM11bCFXT76/z.o6HWs2sPJy4iP8.xCZGmM8jWXUXJZ4L", + email=f"{username}@kernelci.org", + groups=groups or [], + is_active=True, + is_superuser=is_superuser, + is_verified=True, + ) + + +def _make_node(owner="bob", user_groups=None, runtime=None): + revision_obj = Revision( + tree="mainline", + url="https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git", + branch="master", + commit="2a987e65025e2b79c6d453b78cb5985ac6e5eb26", + describe="v5.16-rc4-31-g2a987e65025e", + ) + data = {"kernel_revision": revision_obj} + if runtime: + data["runtime"] = runtime + return Node( + id="61bda8f2eb1a63d2b7152418", + kind="checkout", + name="checkout", + path=["checkout"], + group="debug", + data=data, + parent=None, + state="closing", + result=None, + treeid="61bda8f2eb1a63d2b7152418", + owner=owner, + user_groups=user_groups or [], + ) + + +def test_user_can_edit_node_as_owner(): + user = _make_user(username="bob") + node = _make_node(owner="bob") + assert _user_can_edit_node(user, node) + + +def test_user_can_edit_node_with_matching_group(): + user = _make_user(groups=[UserGroup(name="team-a")]) + node = _make_node(user_groups=["team-a"]) + assert _user_can_edit_node(user, node) + + +def test_user_can_edit_node_with_runtime_editor_group(): + user = _make_user(groups=[UserGroup(name="runtime:lava-collabora:node-editor")]) + node = _make_node(runtime="lava-collabora") + assert _user_can_edit_node(user, node) + + +def test_user_can_edit_node_with_runtime_admin_group(): + user = _make_user(groups=[UserGroup(name="runtime:lava-collabora:node-admin")]) + node = _make_node(runtime="lava-collabora") + assert _user_can_edit_node(user, node) + + +def test_user_can_edit_node_with_global_group(): + user = _make_user(groups=[UserGroup(name="node:edit:any")]) + node = _make_node() + assert _user_can_edit_node(user, node) + + +def test_user_can_edit_node_as_superuser(): + user = _make_user(is_superuser=True) + node = _make_node() + assert _user_can_edit_node(user, node) + + +def test_user_cannot_edit_node_without_access(): + user = _make_user(username="alice") + node = _make_node(owner="bob", user_groups=["team-a"], runtime="lava-collabora") + assert not _user_can_edit_node(user, node) + + +def test_user_me_rejects_groups_update(test_client): + payload = {"groups": ["node:edit:any"]} + response = test_client.patch( + "user/me", + headers={ + "Accept": "application/json", + "Authorization": BEARER_TOKEN, + }, + data=json.dumps(payload), + ) + assert response.status_code == 400 + assert response.json()["detail"] == "User groups can only be updated by an admin user" From 5e362dabca73728d4da905ad6064c3823352eab1 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 2 Jan 2026 19:00:52 +0200 Subject: [PATCH 2/7] usermanager: make it a bit more friendly Signed-off-by: Denys Fedoryshchenko --- scripts/usermanager.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/scripts/usermanager.py b/scripts/usermanager.py index c2471f73..be1d22df 100755 --- a/scripts/usermanager.py +++ b/scripts/usermanager.py @@ -121,6 +121,7 @@ def _require_token(token, args): def main(): + default_paths = "\n".join(f" - {path}" for path in DEFAULT_CONFIG_PATHS) parser = argparse.ArgumentParser( description="KernelCI API user management helper", epilog=( @@ -132,10 +133,16 @@ def main(): " ./scripts/usermanager.py whoami\n" " ./scripts/usermanager.py list-users --instance staging\n" " ./scripts/usermanager.py print-config\n" + "\n" + "Default config lookup (first match wins):\n" + f"{default_paths}\n" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument("--config", help="Path to usermanager.toml") + parser.add_argument( + "--config", + help="Path to usermanager.toml (defaults to first match in the lookup list below)", + ) parser.add_argument("--api-url", help="API base URL, e.g. " "http://localhost:8001/latest") parser.add_argument("--token", help="Bearer token for admin/user actions") From 70e545f599d52940d049068091a96c8fa8b58db1 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 2 Jan 2026 19:09:04 +0200 Subject: [PATCH 3/7] usermanager: Make friendly update-user, add ability to revoke superuser privileges Signed-off-by: Denys Fedoryshchenko --- api/main.py | 9 ++- scripts/usermanager.py | 138 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 137 insertions(+), 10 deletions(-) diff --git a/api/main.py b/api/main.py index d6e5ba33..53ba5b1b 100644 --- a/api/main.py +++ b/api/main.py @@ -588,7 +588,7 @@ async def update_me(request: Request, user: UserUpdateRequest, {group_name}") groups.append(group) user_update = UserUpdate(**(user.model_dump( - exclude={'groups'}, exclude_none=True))) + exclude={'groups', 'is_superuser'}, exclude_none=True))) if groups: user_update.groups = groups return await users_router.routes[1].endpoint( @@ -635,11 +635,10 @@ async def update_user(user_id: str, request: Request, user: UserUpdateRequest, updated_user = await users_router.routes[3].endpoint( user_update, request, user_from_id, user_manager ) - # Update user to be an admin user explicitly if requested as - # `fastapi-users` user update route does not allow it - if user.is_superuser: + # Update superuser explicitly since fastapi-users update route ignores it. + if user.is_superuser is not None: user_from_id = await db.find_by_id(User, updated_user.id) - user_from_id.is_superuser = True + user_from_id.is_superuser = user.is_superuser updated_user = await db.update(user_from_id) return updated_user diff --git a/scripts/usermanager.py b/scripts/usermanager.py index be1d22df..eedfb6e7 100755 --- a/scripts/usermanager.py +++ b/scripts/usermanager.py @@ -76,6 +76,54 @@ def _prompt_if_missing(value, prompt_text, secret=False, default=None): return response +def _parse_group_list(values): + if not values: + return [] + if isinstance(values, str): + values = [values] + groups = [] + for value in values: + for group in value.split(","): + group = group.strip() + if group: + groups.append(group) + return groups + + +def _dedupe(items): + seen = set() + output = [] + for item in items: + if item in seen: + continue + seen.add(item) + output.append(item) + return output + + +def _extract_group_names(payload): + groups = payload.get("groups") or [] + names = [] + for group in groups: + if isinstance(group, dict): + name = group.get("name") + else: + name = getattr(group, "name", None) + if name: + names.append(name) + return _dedupe(names) + + +def _apply_group_changes(current, add_groups, remove_groups): + current = _dedupe(current) + remove_set = set(remove_groups) + updated = [group for group in current if group not in remove_set] + for group in add_groups: + if group not in updated and group not in remove_set: + updated.append(group) + return updated + + def _request_json(method, url, data=None, token=None, form=False): headers = {"accept": "application/json"} body = None @@ -183,8 +231,47 @@ def main(): update_user = subparsers.add_parser("update-user", help="Patch user by id") update_user.add_argument("user_id") - update_user.add_argument("--data", required=True, + update_user.add_argument("--data", help="JSON object with fields to update") + update_user.add_argument("--username", help="Set username") + update_user.add_argument("--email", help="Set email") + update_user.add_argument("--password", help="Set password") + update_user.add_argument("--superuser", dest="is_superuser", + action="store_true", + help="Grant superuser") + update_user.add_argument("--no-superuser", dest="is_superuser", + action="store_false", + help="Revoke superuser") + update_user.add_argument("--active", dest="is_active", + action="store_true", + help="Set is_active true") + update_user.add_argument("--inactive", dest="is_active", + action="store_false", + help="Set is_active false") + update_user.add_argument("--verified", dest="is_verified", + action="store_true", + help="Set is_verified true") + update_user.add_argument("--unverified", dest="is_verified", + action="store_false", + help="Set is_verified false") + update_user.set_defaults(is_active=None, is_verified=None, + is_superuser=None) + update_user.add_argument( + "--set-groups", + help="Replace all groups with a comma-separated list", + ) + update_user.add_argument( + "--add-group", + action="append", + default=[], + help="Add group(s); can be used multiple times or with commas", + ) + update_user.add_argument( + "--remove-group", + action="append", + default=[], + help="Remove group(s); can be used multiple times or with commas", + ) delete_user = subparsers.add_parser("delete-user", help="Delete user by id") @@ -293,10 +380,51 @@ def main(): "GET", f"{api_url}/user/{args.user_id}", token=token ) elif args.command == "update-user": - try: - data = json.loads(args.data) - except json.JSONDecodeError as exc: - raise SystemExit("Invalid JSON for --data") from exc + data = {} + if args.data: + try: + data = json.loads(args.data) + except json.JSONDecodeError as exc: + raise SystemExit("Invalid JSON for --data") from exc + if not isinstance(data, dict): + raise SystemExit("--data must be a JSON object") + if args.username: + data["username"] = args.username + if args.email: + data["email"] = args.email + if args.password: + data["password"] = args.password + if args.is_superuser is not None: + data["is_superuser"] = args.is_superuser + if args.is_active is not None: + data["is_active"] = args.is_active + if args.is_verified is not None: + data["is_verified"] = args.is_verified + + set_groups = _parse_group_list(args.set_groups) + add_groups = _parse_group_list(args.add_group) + remove_groups = _parse_group_list(args.remove_group) + if set_groups or add_groups or remove_groups: + if set_groups: + current_groups = set_groups + else: + status, body = _request_json( + "GET", f"{api_url}/user/{args.user_id}", token=token + ) + if status >= 400: + _print_response(status, body) + raise SystemExit(1) + try: + payload = json.loads(body) if body else {} + except json.JSONDecodeError as exc: + raise SystemExit("Failed to parse user response") from exc + current_groups = _extract_group_names(payload) + data["groups"] = _apply_group_changes( + current_groups, add_groups, remove_groups + ) + + if not data: + raise SystemExit("No updates specified. Use --data or flags.") status, body = _request_json( "PATCH", f"{api_url}/user/{args.user_id}", data, token=token ) From 253d59c95dd9af32ee7a782fd8fd470b00deaac7 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 2 Jan 2026 19:10:37 +0200 Subject: [PATCH 4/7] usermanager: Allow to do user operations by email Signed-off-by: Denys Fedoryshchenko --- scripts/usermanager.py | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/scripts/usermanager.py b/scripts/usermanager.py index eedfb6e7..b935c15f 100755 --- a/scripts/usermanager.py +++ b/scripts/usermanager.py @@ -124,6 +124,33 @@ def _apply_group_changes(current, add_groups, remove_groups): return updated +def _resolve_user_id(user_id, api_url, token): + if "@" not in user_id: + return user_id + status, body = _request_json("GET", f"{api_url}/users", token=token) + if status >= 400: + _print_response(status, body) + raise SystemExit(1) + try: + payload = json.loads(body) if body else [] + except json.JSONDecodeError as exc: + raise SystemExit("Failed to parse users response") from exc + if not isinstance(payload, list): + raise SystemExit("Unexpected users response") + matches = [ + user for user in payload + if isinstance(user, dict) and user.get("email") == user_id + ] + if not matches: + raise SystemExit(f"No user found with email: {user_id}") + if len(matches) > 1: + raise SystemExit(f"Multiple users found with email: {user_id}") + resolved_id = matches[0].get("id") + if not resolved_id: + raise SystemExit(f"User with email {user_id} has no id") + return resolved_id + + def _request_json(method, url, data=None, token=None, form=False): headers = {"accept": "application/json"} body = None @@ -376,10 +403,12 @@ def main(): elif args.command == "list-users": status, body = _request_json("GET", f"{api_url}/users", token=token) elif args.command == "get-user": + resolved_id = _resolve_user_id(args.user_id, api_url, token) status, body = _request_json( - "GET", f"{api_url}/user/{args.user_id}", token=token + "GET", f"{api_url}/user/{resolved_id}", token=token ) elif args.command == "update-user": + resolved_id = _resolve_user_id(args.user_id, api_url, token) data = {} if args.data: try: @@ -409,7 +438,7 @@ def main(): current_groups = set_groups else: status, body = _request_json( - "GET", f"{api_url}/user/{args.user_id}", token=token + "GET", f"{api_url}/user/{resolved_id}", token=token ) if status >= 400: _print_response(status, body) @@ -426,11 +455,12 @@ def main(): if not data: raise SystemExit("No updates specified. Use --data or flags.") status, body = _request_json( - "PATCH", f"{api_url}/user/{args.user_id}", data, token=token + "PATCH", f"{api_url}/user/{resolved_id}", data, token=token ) elif args.command == "delete-user": + resolved_id = _resolve_user_id(args.user_id, api_url, token) status, body = _request_json( - "DELETE", f"{api_url}/user/{args.user_id}", token=token + "DELETE", f"{api_url}/user/{resolved_id}", token=token ) else: raise SystemExit("Unknown command") From a4e7874215282bc30cbd55dd5b371895b0b92c78 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 2 Jan 2026 19:15:22 +0200 Subject: [PATCH 5/7] usermanager: Config example adjustments for better UX Signed-off-by: Denys Fedoryshchenko --- scripts/usermanager.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/usermanager.py b/scripts/usermanager.py index b935c15f..83c03648 100755 --- a/scripts/usermanager.py +++ b/scripts/usermanager.py @@ -207,7 +207,7 @@ def main(): " ./scripts/usermanager.py login --username alice\n" " ./scripts/usermanager.py whoami\n" " ./scripts/usermanager.py list-users --instance staging\n" - " ./scripts/usermanager.py print-config\n" + " ./scripts/usermanager.py print-config-example\n" "\n" "Default config lookup (first match wins):\n" f"{default_paths}\n" @@ -304,19 +304,19 @@ def main(): help="Delete user by id") delete_user.add_argument("user_id") - subparsers.add_parser("print-config", + subparsers.add_parser("print-config-example", help="Print a sample usermanager.toml") args = parser.parse_args() - if args.command == "print-config": + if args.command == "print-config-example": print( "default_instance = \"local\"\n\n" "[instances.local]\n" "url = \"http://localhost:8001/latest\"\n" "token = \"\"\n\n" "[instances.staging]\n" - "url = \"https://staging.kernelci.org/latest\"\n" + "url = \"https://staging.kernelci.org:9000/latest\"\n" "token = \"\"\n" ) return From d30fef1e660870fa5626294172c3070bf911368a Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 2 Jan 2026 19:15:57 +0200 Subject: [PATCH 6/7] usermanager: format by black Signed-off-by: Denys Fedoryshchenko --- scripts/usermanager.py | 113 ++++++++++++++++++++++------------------- 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/scripts/usermanager.py b/scripts/usermanager.py index 83c03648..2c467a33 100755 --- a/scripts/usermanager.py +++ b/scripts/usermanager.py @@ -16,8 +16,7 @@ DEFAULT_CONFIG_PATHS = [ os.path.join(os.getcwd(), "usermanager.toml"), - os.path.join(os.path.expanduser("~"), ".config", "kernelci", - "usermanager.toml"), + os.path.join(os.path.expanduser("~"), ".config", "kernelci", "usermanager.toml"), ] @@ -138,7 +137,8 @@ def _resolve_user_id(user_id, api_url, token): if not isinstance(payload, list): raise SystemExit("Unexpected users response") matches = [ - user for user in payload + user + for user in payload if isinstance(user, dict) and user.get("email") == user_id ] if not matches: @@ -163,8 +163,7 @@ def _request_json(method, url, data=None, token=None, form=False): headers["Content-Type"] = "application/json" if token: headers["Authorization"] = f"Bearer {token}" - req = urllib.request.Request(url, data=body, headers=headers, - method=method) + req = urllib.request.Request(url, data=body, headers=headers, method=method) try: with urllib.request.urlopen(req) as response: payload = response.read().decode("utf-8") @@ -218,12 +217,14 @@ def main(): "--config", help="Path to usermanager.toml (defaults to first match in the lookup list below)", ) - parser.add_argument("--api-url", help="API base URL, e.g. " - "http://localhost:8001/latest") + parser.add_argument( + "--api-url", help="API base URL, e.g. " "http://localhost:8001/latest" + ) parser.add_argument("--token", help="Bearer token for admin/user actions") parser.add_argument("--instance", help="Instance name from config") - parser.add_argument("--token-label", default="Auth", - help="Label used when prompting for a token") + parser.add_argument( + "--token-label", default="Auth", help="Label used when prompting for a token" + ) subparsers = parser.add_subparsers(dest="command", required=True) @@ -237,8 +238,7 @@ def main(): invite.add_argument("--return-token", action="store_true") invite.add_argument("--resend-if-exists", action="store_true") - invite_url = subparsers.add_parser("invite-url", - help="Preview invite URL base") + invite_url = subparsers.add_parser("invite-url", help="Preview invite URL base") accept = subparsers.add_parser("accept-invite", help="Accept an invite") accept.add_argument("--token") @@ -255,34 +255,40 @@ def main(): get_user = subparsers.add_parser("get-user", help="Get user by id") get_user.add_argument("user_id") - update_user = subparsers.add_parser("update-user", - help="Patch user by id") + update_user = subparsers.add_parser("update-user", help="Patch user by id") update_user.add_argument("user_id") - update_user.add_argument("--data", - help="JSON object with fields to update") + update_user.add_argument("--data", help="JSON object with fields to update") update_user.add_argument("--username", help="Set username") update_user.add_argument("--email", help="Set email") update_user.add_argument("--password", help="Set password") - update_user.add_argument("--superuser", dest="is_superuser", - action="store_true", - help="Grant superuser") - update_user.add_argument("--no-superuser", dest="is_superuser", - action="store_false", - help="Revoke superuser") - update_user.add_argument("--active", dest="is_active", - action="store_true", - help="Set is_active true") - update_user.add_argument("--inactive", dest="is_active", - action="store_false", - help="Set is_active false") - update_user.add_argument("--verified", dest="is_verified", - action="store_true", - help="Set is_verified true") - update_user.add_argument("--unverified", dest="is_verified", - action="store_false", - help="Set is_verified false") - update_user.set_defaults(is_active=None, is_verified=None, - is_superuser=None) + update_user.add_argument( + "--superuser", dest="is_superuser", action="store_true", help="Grant superuser" + ) + update_user.add_argument( + "--no-superuser", + dest="is_superuser", + action="store_false", + help="Revoke superuser", + ) + update_user.add_argument( + "--active", dest="is_active", action="store_true", help="Set is_active true" + ) + update_user.add_argument( + "--inactive", dest="is_active", action="store_false", help="Set is_active false" + ) + update_user.add_argument( + "--verified", + dest="is_verified", + action="store_true", + help="Set is_verified true", + ) + update_user.add_argument( + "--unverified", + dest="is_verified", + action="store_false", + help="Set is_verified false", + ) + update_user.set_defaults(is_active=None, is_verified=None, is_superuser=None) update_user.add_argument( "--set-groups", help="Replace all groups with a comma-separated list", @@ -300,24 +306,24 @@ def main(): help="Remove group(s); can be used multiple times or with commas", ) - delete_user = subparsers.add_parser("delete-user", - help="Delete user by id") + delete_user = subparsers.add_parser("delete-user", help="Delete user by id") delete_user.add_argument("user_id") - subparsers.add_parser("print-config-example", - help="Print a sample usermanager.toml") + subparsers.add_parser( + "print-config-example", help="Print a sample usermanager.toml" + ) args = parser.parse_args() if args.command == "print-config-example": print( - "default_instance = \"local\"\n\n" + 'default_instance = "local"\n\n' "[instances.local]\n" - "url = \"http://localhost:8001/latest\"\n" - "token = \"\"\n\n" + 'url = "http://localhost:8001/latest"\n' + 'token = ""\n\n' "[instances.staging]\n" - "url = \"https://staging.kernelci.org:9000/latest\"\n" - "token = \"\"\n" + 'url = "https://staging.kernelci.org:9000/latest"\n' + 'token = ""\n' ) return @@ -348,8 +354,15 @@ def main(): if not token: token = _get_setting(None, "KCI_API_TOKEN", config, "api.token") - if args.command in {"invite", "invite-url", "whoami", "list-users", - "get-user", "update-user", "delete-user"}: + if args.command in { + "invite", + "invite-url", + "whoami", + "list-users", + "get-user", + "update-user", + "delete-user", + }: token = _require_token(token, args) if args.command == "invite": @@ -367,9 +380,7 @@ def main(): "POST", f"{api_url}/user/invite", payload, token=token ) elif args.command == "invite-url": - status, body = _request_json( - "GET", f"{api_url}/user/invite/url", token=token - ) + status, body = _request_json("GET", f"{api_url}/user/invite/url", token=token) elif args.command == "accept-invite": invite_token = _prompt_if_missing( args.token, @@ -382,9 +393,7 @@ def main(): secret=True, ) payload = {"token": invite_token, "password": password} - status, body = _request_json( - "POST", f"{api_url}/user/accept-invite", payload - ) + status, body = _request_json("POST", f"{api_url}/user/accept-invite", payload) elif args.command == "login": password = _prompt_if_missing( args.password, From 2052a6b18765156dffb8e106aee474bde0c0ae44 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Fri, 2 Jan 2026 19:20:51 +0200 Subject: [PATCH 7/7] linting fixes Signed-off-by: Denys Fedoryshchenko --- api/main.py | 3 ++- tests/unit_tests/test_authz_handler.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/api/main.py b/api/main.py index 53ba5b1b..158e3c58 100644 --- a/api/main.py +++ b/api/main.py @@ -667,7 +667,8 @@ def _user_can_edit_node(user: User, node: Node) -> bool: if runtime: runtime_editor = f'runtime:{runtime}:node-editor' runtime_admin = f'runtime:{runtime}:node-admin' - if runtime_editor in user_group_names or runtime_admin in user_group_names: + if (runtime_editor in user_group_names + or runtime_admin in user_group_names): return True return False diff --git a/tests/unit_tests/test_authz_handler.py b/tests/unit_tests/test_authz_handler.py index bea2cc9f..28632901 100644 --- a/tests/unit_tests/test_authz_handler.py +++ b/tests/unit_tests/test_authz_handler.py @@ -4,6 +4,8 @@ """Unit tests for authorization helpers and user self-update.""" +# pylint: disable=duplicate-code + import json from kernelci.api.models import Node, Revision @@ -13,6 +15,7 @@ def _make_user(username="alice", is_superuser=False, groups=None): + """Return a basic user fixture for authz tests.""" return User( id="65265305c74695807499037f", username=username, @@ -26,6 +29,7 @@ def _make_user(username="alice", is_superuser=False, groups=None): def _make_node(owner="bob", user_groups=None, runtime=None): + """Return a basic node fixture for authz tests.""" revision_obj = Revision( tree="mainline", url="https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git", @@ -53,48 +57,56 @@ def _make_node(owner="bob", user_groups=None, runtime=None): def test_user_can_edit_node_as_owner(): + """Owner can edit their own node.""" user = _make_user(username="bob") node = _make_node(owner="bob") assert _user_can_edit_node(user, node) def test_user_can_edit_node_with_matching_group(): + """Matching user group grants edit access.""" user = _make_user(groups=[UserGroup(name="team-a")]) node = _make_node(user_groups=["team-a"]) assert _user_can_edit_node(user, node) def test_user_can_edit_node_with_runtime_editor_group(): + """Runtime editor group grants edit access.""" user = _make_user(groups=[UserGroup(name="runtime:lava-collabora:node-editor")]) node = _make_node(runtime="lava-collabora") assert _user_can_edit_node(user, node) def test_user_can_edit_node_with_runtime_admin_group(): + """Runtime admin group grants edit access.""" user = _make_user(groups=[UserGroup(name="runtime:lava-collabora:node-admin")]) node = _make_node(runtime="lava-collabora") assert _user_can_edit_node(user, node) def test_user_can_edit_node_with_global_group(): + """Global edit group grants access.""" user = _make_user(groups=[UserGroup(name="node:edit:any")]) node = _make_node() assert _user_can_edit_node(user, node) def test_user_can_edit_node_as_superuser(): + """Superuser can edit any node.""" user = _make_user(is_superuser=True) node = _make_node() assert _user_can_edit_node(user, node) def test_user_cannot_edit_node_without_access(): + """Unrelated user cannot edit when no access applies.""" user = _make_user(username="alice") node = _make_node(owner="bob", user_groups=["team-a"], runtime="lava-collabora") assert not _user_can_edit_node(user, node) def test_user_me_rejects_groups_update(test_client): + """Self-update rejects user group changes.""" payload = {"groups": ["node:edit:any"]} response = test_client.patch( "user/me",