Skip to content
33 changes: 12 additions & 21 deletions app/api/main/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
DomainErrorTranslator,
)
from enums import DomainCodes
from ldap_protocol.custom_requests.rename import RenameRequest
from ldap_protocol.identity.exceptions import UnauthorizedError
from ldap_protocol.ldap_requests import (
AddRequest,
Expand All @@ -37,7 +38,6 @@

translator = DomainErrorTranslator(DomainCodes.LDAP)


error_map: ERROR_MAP_TYPE = {
UnauthorizedError: rule(
status=status.HTTP_401_UNAUTHORIZED,
Expand All @@ -54,10 +54,7 @@


@entry_router.post("/search", error_map=error_map)
async def search(
request: SearchRequest,
req: Request,
) -> SearchResponse:
async def search(request: SearchRequest, req: Request) -> SearchResponse:
"""LDAP SEARCH entry request."""
responses = await request.handle_api(req.state.dishka_container)
metadata: SearchResultDone = responses.pop(-1) # type: ignore
Expand All @@ -73,19 +70,13 @@ async def search(


@entry_router.post("/add", error_map=error_map)
async def add(
request: AddRequest,
req: Request,
) -> LDAPResult:
async def add(request: AddRequest, req: Request) -> LDAPResult:
"""LDAP ADD entry request."""
return await request.handle_api(req.state.dishka_container)


@entry_router.patch("/update", error_map=error_map)
async def modify(
request: ModifyRequest,
req: Request,
) -> LDAPResult:
async def modify(request: ModifyRequest, req: Request) -> LDAPResult:
"""LDAP MODIFY entry request."""
return await request.handle_api(req.state.dishka_container)

Expand All @@ -103,10 +94,7 @@ async def modify_many(


@entry_router.put("/update/dn", error_map=error_map)
async def modify_dn(
request: ModifyDNRequest,
req: Request,
) -> LDAPResult:
async def modify_dn(request: ModifyDNRequest, req: Request) -> LDAPResult:
"""LDAP MODIFY entry DN request."""
return await request.handle_api(req.state.dishka_container)

Expand All @@ -123,11 +111,14 @@ async def modify_dn_many(
return results


@entry_router.put("/rename", error_map=error_map)
async def rename(request: RenameRequest, req: Request) -> LDAPResult:
"""LDAP rename entry request."""
return await request.handle_api(req.state.dishka_container)


@entry_router.delete("/delete", error_map=error_map)
async def delete(
request: DeleteRequest,
req: Request,
) -> LDAPResult:
async def delete(request: DeleteRequest, req: Request) -> LDAPResult:
"""LDAP DELETE entry request."""
return await request.handle_api(req.state.dishka_container)

Expand Down
9 changes: 9 additions & 0 deletions app/ldap_protocol/custom_requests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Custom Requests.

Copyright (c) 2026 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""

from .rename import RenameRequest

__all__ = ["RenameRequest"]
85 changes: 85 additions & 0 deletions app/ldap_protocol/custom_requests/rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""RenameRequest for main router.

Copyright (c) 2026 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""

from dishka import AsyncContainer
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from ldap_protocol.ldap_requests import (
ModifyDNRequest as LDAPModifyDNRequest,
ModifyRequest as LDAPModifyRequest,
)
from ldap_protocol.ldap_responses import LDAPResult
from ldap_protocol.objects import Changes


class RenameRequest(BaseModel):
"""Rename Request. It's not RFC 4511.

Combines ModifyDN and Modify operations.
"""

object: str
newrdn: str
changes: list[Changes]

@property
def _new_object(self) -> str:
return f"{self.newrdn},{','.join(self.object.split(',')[1:])}"

@property
def _oldrdn(self) -> str:
return self.object.split(",")[0]

async def _modify_dn_request(
self,
container: AsyncContainer,
entry: str,
newrdn: str,
) -> LDAPResult:
modify_dn_request = LDAPModifyDNRequest(
entry=entry,
newrdn=newrdn,
deleteoldrdn=True,
new_superior=None,
)
return await modify_dn_request.handle_api(container)

async def _expire_session_objects(self, container: AsyncContainer) -> None:
session = await container.get(AsyncSession)
session.expire_all()

async def _modify_request(self, container: AsyncContainer) -> LDAPResult:
modify_request = LDAPModifyRequest(
object=self._new_object,
changes=self.changes,
)
return await modify_request.handle_api(container)

async def handle_api(self, container: AsyncContainer) -> LDAPResult:
"""Handle RenameRequest by executing ModifyDN then Modify.

If ModifyRequest fails, rollback the ModifyDnRequest and return error.
"""
modify_dn_response = await self._modify_dn_request(
container,
self.object,
self.newrdn,
)
if not modify_dn_response or modify_dn_response.result_code != 0:
return modify_dn_response

await self._expire_session_objects(container)

modify_response = await self._modify_request(container)
if not modify_response or modify_response.result_code != 0:
await self._modify_dn_request(
container,
self._new_object,
self._oldrdn,
)

return modify_response
138 changes: 138 additions & 0 deletions tests/test_api/test_main/test_router/test_rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Test API Rename.

Copyright (c) 2026 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""

import pytest
from httpx import AsyncClient

from ldap_protocol.ldap_codes import LDAPCodes
from ldap_protocol.ldap_requests.modify import Operation


@pytest.mark.asyncio
@pytest.mark.usefixtures("adding_test_user")
@pytest.mark.usefixtures("setup_session")
@pytest.mark.usefixtures("session")
async def test_api_correct_rename_user(http_client: AsyncClient) -> None:
response = await http_client.put(
"/entry/rename",
json={
"object": "cn=test,dc=md,dc=test",
"newrdn": "cn=admin2",
"changes": [
{
"operation": Operation.REPLACE,
"modification": {
"type": "sAMAccountName",
"vals": ["admin2"],
},
},
{
"operation": Operation.REPLACE,
"modification": {
"type": "displayName",
"vals": ["Administrator"],
},
},
],
},
)

data = response.json()
assert isinstance(data, dict)
assert data.get("resultCode") == LDAPCodes.SUCCESS

response = await http_client.post(
"entry/search",
json={
"base_object": "cn=admin2,dc=md,dc=test",
"scope": 0,
"deref_aliases": 0,
"size_limit": 1000,
"time_limit": 10,
"types_only": True,
"filter": "(objectClass=*)",
"attributes": ["*"],
"page_number": 1,
},
)

data = response.json()
assert data["resultCode"] == LDAPCodes.SUCCESS
assert data["search_result"][0]["object_name"] == "cn=admin2,dc=md,dc=test"

for attr in data["search_result"][0]["partial_attributes"]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут тоже можно сжать. либо собрать в 1 цикл, либо сделать функцию проверки и снизу тоже заюзать

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

выписал это и отдельным пр сделаю по всем тестам

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

из вариантов такое могу предложить, вролде должно работать

sAMAccountName_found = False
displayName_found = False

for attr in data["search_result"][0]["partial_attributes"]:
    if attr["type"] == "sAMAccountName":
        assert attr["vals"][0] == "admin2"
        sAMAccountName_found = True
    elif attr["type"] == "displayName":
        assert attr["vals"][0] == "Administrator"
        displayName_found = True

    if sAMAccountName_found and displayName_found:
        break

if not sAMAccountName_found:
    raise Exception("User without sAMAccountName")
if not displayName_found:
    raise Exception("User without displayName")
#----------------------------
attrs_dict = {attr["type"]: attr["vals"][0] 
              for attr in data["search_result"][0]["partial_attributes"]}

assert attrs_dict.get("sAMAccountName") == "admin2"
assert attrs_dict.get("displayName") == "Administrator"

if attr["type"] == "sAMAccountName":
assert attr["vals"][0] == "admin2"
break
else:
raise Exception("User without sAMAccountName")

for attr in data["search_result"][0]["partial_attributes"]:
if attr["type"] == "displayName":
assert attr["vals"][0] == "Administrator"
break
else:
raise Exception("User without displayName")


@pytest.mark.asyncio
@pytest.mark.usefixtures("adding_test_computer")
@pytest.mark.usefixtures("setup_session")
@pytest.mark.usefixtures("session")
async def test_api_correct_rename_computer(http_client: AsyncClient) -> None:
response = await http_client.put(
"/entry/rename",
json={
"object": "cn=mycomputer,dc=md,dc=test",
"newrdn": "cn=maincomputer",
"changes": [
{
"operation": Operation.REPLACE,
"modification": {
"type": "sAMAccountName",
"vals": ["__invalid name for error__"],
},
},
{
"operation": Operation.REPLACE,
"modification": {
"type": "displayName",
"vals": ["Main Computer"],
},
},
],
},
)

data = response.json()
assert isinstance(data, dict)
assert data.get("resultCode") == LDAPCodes.UNDEFINED_ATTRIBUTE_TYPE

response = await http_client.post(
"entry/search",
json={
"base_object": "cn=mycomputer,dc=md,dc=test",
"scope": 0,
"deref_aliases": 0,
"size_limit": 1000,
"time_limit": 10,
"types_only": True,
"filter": "(objectClass=*)",
"attributes": ["*"],
"page_number": 1,
},
)

data = response.json()
assert data["resultCode"] == LDAPCodes.SUCCESS
assert data["search_result"][0]["object_name"] == "cn=mycomputer,dc=md,dc=test" # noqa: E501 # fmt: skip

for attr in data["search_result"][0]["partial_attributes"]:
if attr["type"] == "name":
assert attr["vals"][0] == "mycomputer name"
break
else:
raise Exception("Computer without name")