diff --git a/integration/test_client.py b/integration/test_client.py index 8b6df4935..23ece9885 100644 --- a/integration/test_client.py +++ b/integration/test_client.py @@ -419,6 +419,21 @@ def test_client_cluster_minimal(client: weaviate.WeaviateClient, request: SubReq client.collections.delete(request.node.name) +def test_client_cluster_statistics(client: weaviate.WeaviateClient) -> None: + """GET /v1/cluster/statistics – RAFT cluster statistics.""" + stats = client.cluster.statistics() + assert hasattr(stats, "statistics") + assert hasattr(stats, "synchronized") + assert isinstance(stats.statistics, list) + assert isinstance(stats.synchronized, bool) + for node in stats.statistics: + assert hasattr(node, "name") + assert hasattr(node, "status") + assert hasattr(node, "raft") + assert hasattr(node.raft, "state") + assert node.raft.state in ("Leader", "Follower", "Candidate", "") + + def test_client_connect_and_close() -> None: client = weaviate.WeaviateClient( connection_params=weaviate.connect.ConnectionParams.from_url( diff --git a/mock_tests/test_collection.py b/mock_tests/test_collection.py index 8fd489b60..2350ea6b6 100644 --- a/mock_tests/test_collection.py +++ b/mock_tests/test_collection.py @@ -313,6 +313,59 @@ def test_node_with_timeout( assert nodes[0].status == "TIMEOUT" +def test_cluster_statistics(httpserver: HTTPServer, start_grpc_server: grpc.Server) -> None: + httpserver.expect_request("/v1/.well-known/ready").respond_with_json({}) + httpserver.expect_request("/v1/meta").respond_with_json({"version": "1.34"}) + httpserver.expect_request("/v1/cluster/statistics").respond_with_json( + { + "statistics": [ + { + "candidates": {}, + "dbLoaded": True, + "initialLastAppliedIndex": 119, + "isVoter": True, + "leaderAddress": "172.16.11.11:8300", + "leaderId": "weaviate-0", + "name": "weaviate-0", + "open": True, + "raft": { + "appliedIndex": "144", + "commitIndex": "144", + "fsmPending": "0", + "lastContact": "0", + "lastLogIndex": "144", + "lastLogTerm": "31", + "latestConfiguration": [ + {"address": "172.16.11.11:8300", "id": "weaviate-0", "suffrage": 0} + ], + "latestConfigurationIndex": "0", + "numPeers": "2", + "state": "Leader", + "term": "31", + }, + "ready": True, + "status": "HEALTHY", + } + ], + "synchronized": True, + } + ) + + client = weaviate.connect_to_local( + port=MOCK_PORT, + host=MOCK_IP, + grpc_port=MOCK_PORT_GRPC, + ) + client.connect() + + stats = client.cluster.statistics() + assert stats.synchronized is True + assert len(stats.statistics) == 1 + assert stats.statistics[0].name == "weaviate-0" + assert stats.statistics[0].status == "HEALTHY" + assert stats.statistics[0].raft.state == "Leader" + + def test_backup_cancel_while_create_and_restore( weaviate_no_auth_mock: HTTPServer, start_grpc_server: grpc.Server ) -> None: diff --git a/weaviate/cluster/async_.pyi b/weaviate/cluster/async_.pyi index 900fc270c..959063207 100644 --- a/weaviate/cluster/async_.pyi +++ b/weaviate/cluster/async_.pyi @@ -1,7 +1,7 @@ import uuid from typing import List, Literal, Optional, Union, overload -from weaviate.cluster.models import ReplicationType, ShardingState +from weaviate.cluster.models import ClusterStatistics, ReplicationType, ShardingState from weaviate.cluster.replicate import _ReplicateAsync from weaviate.cluster.types import Verbosity from weaviate.collections.classes.cluster import NodeMinimal, NodeVerbose @@ -57,6 +57,7 @@ class _ClusterAsync(_ClusterExecutor[ConnectionAsync]): collection: str, shard: Optional[str] = None, ) -> Optional[ShardingState]: ... + async def statistics(self) -> ClusterStatistics: ... @property def replications(self) -> _ReplicateAsync: """replication (_Replication): Replication object instance connected to the same Weaviate instance as the Client. diff --git a/weaviate/cluster/base.py b/weaviate/cluster/base.py index ce1859d37..1fcdcf370 100644 --- a/weaviate/cluster/base.py +++ b/weaviate/cluster/base.py @@ -4,6 +4,7 @@ from httpx import Response from weaviate.cluster.models import ( + ClusterStatistics, ReplicationType, ShardingState, ) @@ -149,3 +150,30 @@ def resp( params=params, error_msg="Get nodes status failed", ) + + def statistics(self) -> executor.Result[ClusterStatistics]: + """Get RAFT cluster statistics. + + Returns cluster statistics data including RAFT consensus state (leader/follower), + commit/applied indices, and cluster synchronization status. + + Returns: + ClusterStatistics with a list of node statistics and synchronized flag. + + Raises: + weaviate.exceptions.WeaviateConnectionError: If the network connection to weaviate fails. + weaviate.exceptions.UnexpectedStatusCodeError: If weaviate reports a non-OK status. + """ + + def resp(response: Response) -> ClusterStatistics: + response_typed = _decode_json_response_dict(response, "Cluster statistics") + assert response_typed is not None + return ClusterStatistics._from_weaviate(response_typed) + + return executor.execute( + response_callback=resp, + method=self._connection.get, + path="/cluster/statistics", + status_codes=_ExpectedStatusCodes(200, "cluster statistics"), + error_msg="Get cluster statistics failed", + ) diff --git a/weaviate/cluster/models.py b/weaviate/cluster/models.py index c55e5463f..39dc4371f 100644 --- a/weaviate/cluster/models.py +++ b/weaviate/cluster/models.py @@ -1,7 +1,7 @@ import uuid from dataclasses import dataclass from enum import Enum -from typing import Generic, List, TypedDict, TypeVar, Union +from typing import Any, Dict, Generic, List, TypedDict, TypeVar, Union class ReplicationType(str, Enum): @@ -133,3 +133,121 @@ def _from_weaviate(data: _ReplicationShardingStateResponse): collection=ss["collection"], shards=[ShardReplicas._from_weaviate(shard) for shard in ss["shards"]], ) + + +# --- RAFT cluster statistics --- + + +@dataclass +class RaftConfigurationMember: + """A member in the RAFT cluster's latest configuration.""" + + address: str + node_id: str + suffrage: int + + @staticmethod + def _from_weaviate(data: dict) -> "RaftConfigurationMember": + return RaftConfigurationMember( + address=data["address"], + node_id=data["id"], + suffrage=data["suffrage"], + ) + + +@dataclass +class RaftStats: + """RAFT consensus statistics for a node.""" + + applied_index: str + commit_index: str + fsm_pending: str + last_contact: str + last_log_index: str + last_log_term: str + last_snapshot_index: str + last_snapshot_term: str + latest_configuration: List[RaftConfigurationMember] + latest_configuration_index: str + num_peers: str + protocol_version: str + protocol_version_max: str + protocol_version_min: str + snapshot_version_max: str + snapshot_version_min: str + state: str + term: str + + @staticmethod + def _from_weaviate(data: dict) -> "RaftStats": + return RaftStats( + applied_index=data.get("appliedIndex", ""), + commit_index=data.get("commitIndex", ""), + fsm_pending=data.get("fsmPending", ""), + last_contact=data.get("lastContact", ""), + last_log_index=data.get("lastLogIndex", ""), + last_log_term=data.get("lastLogTerm", ""), + last_snapshot_index=data.get("lastSnapshotIndex", ""), + last_snapshot_term=data.get("lastSnapshotTerm", ""), + latest_configuration=[ + RaftConfigurationMember._from_weaviate(m) + for m in data.get("latestConfiguration", []) + ], + latest_configuration_index=data.get("latestConfigurationIndex", ""), + num_peers=data.get("numPeers", ""), + protocol_version=data.get("protocolVersion", ""), + protocol_version_max=data.get("protocolVersionMax", ""), + protocol_version_min=data.get("protocolVersionMin", ""), + snapshot_version_max=data.get("snapshotVersionMax", ""), + snapshot_version_min=data.get("snapshotVersionMin", ""), + state=data.get("state", ""), + term=data.get("term", ""), + ) + + +@dataclass +class NodeStatistics: + """RAFT cluster statistics for a single node.""" + + candidates: Dict[str, Any] + db_loaded: bool + initial_last_applied_index: int + is_voter: bool + leader_address: str + leader_id: str + name: str + is_open: bool + raft: RaftStats + ready: bool + status: str + + @staticmethod + def _from_weaviate(data: dict) -> "NodeStatistics": + return NodeStatistics( + candidates=data.get("candidates", {}), + db_loaded=data.get("dbLoaded", False), + initial_last_applied_index=data.get("initialLastAppliedIndex", 0), + is_voter=data.get("isVoter", False), + leader_address=data.get("leaderAddress", ""), + leader_id=data.get("leaderId", ""), + name=data.get("name", ""), + is_open=data.get("open", False), + raft=RaftStats._from_weaviate(data.get("raft", {})), + ready=data.get("ready", False), + status=data.get("status", ""), + ) + + +@dataclass +class ClusterStatistics: + """Response from GET /v1/cluster/statistics (RAFT cluster statistics).""" + + statistics: List[NodeStatistics] + synchronized: bool + + @staticmethod + def _from_weaviate(data: dict) -> "ClusterStatistics": + return ClusterStatistics( + statistics=[NodeStatistics._from_weaviate(s) for s in data.get("statistics", [])], + synchronized=data.get("synchronized", False), + ) diff --git a/weaviate/cluster/sync.pyi b/weaviate/cluster/sync.pyi index 7bf659dc1..b35166163 100644 --- a/weaviate/cluster/sync.pyi +++ b/weaviate/cluster/sync.pyi @@ -1,7 +1,7 @@ import uuid from typing import List, Literal, Optional, Union, overload -from weaviate.cluster.models import ReplicationType, ShardingState +from weaviate.cluster.models import ClusterStatistics, ReplicationType, ShardingState from weaviate.cluster.replicate import _Replicate from weaviate.cluster.types import Verbosity from weaviate.collections.classes.cluster import NodeMinimal, NodeVerbose @@ -57,6 +57,7 @@ class _Cluster(_ClusterExecutor[ConnectionSync]): collection: str, shard: Optional[str] = None, ) -> Optional[ShardingState]: ... + def statistics(self) -> ClusterStatistics: ... @property def replications(self) -> _Replicate: """replication (_Replication): Replication object instance connected to the same Weaviate instance as the Client. diff --git a/weaviate/outputs/cluster.py b/weaviate/outputs/cluster.py index f176044fa..7dfbb6474 100644 --- a/weaviate/outputs/cluster.py +++ b/weaviate/outputs/cluster.py @@ -1,4 +1,11 @@ -from weaviate.cluster.models import ShardingState, ShardReplicas +from weaviate.cluster.models import ( + ClusterStatistics, + NodeStatistics, + RaftConfigurationMember, + RaftStats, + ShardingState, + ShardReplicas, +) from weaviate.collections.classes.cluster import ( Node, NodeMinimal, @@ -9,12 +16,16 @@ ) __all__ = [ + "ClusterStatistics", "Node", "NodeMinimal", + "NodeStatistics", "NodeVerbose", + "RaftConfigurationMember", + "RaftStats", "Shard", - "Shards", - "Stats", "ShardingState", "ShardReplicas", + "Shards", + "Stats", ]