Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,21 @@ def test_client_cluster_minimal(client: weaviate.WeaviateClient, request: SubReq
client.collections.delete(request.node.name)


def test_client_cluster_statistics(client: weaviate.WeaviateClient) -> None:
"""GET /v1/cluster/statistics – RAFT cluster statistics."""
stats = client.cluster.statistics()
assert hasattr(stats, "statistics")
assert hasattr(stats, "synchronized")
assert isinstance(stats.statistics, list)
assert isinstance(stats.synchronized, bool)
for node in stats.statistics:
assert hasattr(node, "name")
assert hasattr(node, "status")
assert hasattr(node, "raft")
assert hasattr(node.raft, "state")
assert node.raft.state in ("Leader", "Follower", "Candidate", "")


def test_client_connect_and_close() -> None:
client = weaviate.WeaviateClient(
connection_params=weaviate.connect.ConnectionParams.from_url(
Expand Down
53 changes: 53 additions & 0 deletions mock_tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,59 @@ def test_node_with_timeout(
assert nodes[0].status == "TIMEOUT"


def test_cluster_statistics(httpserver: HTTPServer, start_grpc_server: grpc.Server) -> None:
httpserver.expect_request("/v1/.well-known/ready").respond_with_json({})
httpserver.expect_request("/v1/meta").respond_with_json({"version": "1.34"})
httpserver.expect_request("/v1/cluster/statistics").respond_with_json(
{
"statistics": [
{
"candidates": {},
"dbLoaded": True,
"initialLastAppliedIndex": 119,
"isVoter": True,
"leaderAddress": "172.16.11.11:8300",
"leaderId": "weaviate-0",
"name": "weaviate-0",
"open": True,
"raft": {
"appliedIndex": "144",
"commitIndex": "144",
"fsmPending": "0",
"lastContact": "0",
"lastLogIndex": "144",
"lastLogTerm": "31",
"latestConfiguration": [
{"address": "172.16.11.11:8300", "id": "weaviate-0", "suffrage": 0}
],
"latestConfigurationIndex": "0",
"numPeers": "2",
"state": "Leader",
"term": "31",
},
"ready": True,
"status": "HEALTHY",
}
],
"synchronized": True,
}
)

client = weaviate.connect_to_local(
port=MOCK_PORT,
host=MOCK_IP,
grpc_port=MOCK_PORT_GRPC,
)
client.connect()

stats = client.cluster.statistics()
assert stats.synchronized is True
assert len(stats.statistics) == 1
assert stats.statistics[0].name == "weaviate-0"
assert stats.statistics[0].status == "HEALTHY"
assert stats.statistics[0].raft.state == "Leader"


def test_backup_cancel_while_create_and_restore(
weaviate_no_auth_mock: HTTPServer, start_grpc_server: grpc.Server
) -> None:
Expand Down
3 changes: 2 additions & 1 deletion weaviate/cluster/async_.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from typing import List, Literal, Optional, Union, overload

from weaviate.cluster.models import ReplicationType, ShardingState
from weaviate.cluster.models import ClusterStatistics, ReplicationType, ShardingState
from weaviate.cluster.replicate import _ReplicateAsync
from weaviate.cluster.types import Verbosity
from weaviate.collections.classes.cluster import NodeMinimal, NodeVerbose
Expand Down Expand Up @@ -57,6 +57,7 @@ class _ClusterAsync(_ClusterExecutor[ConnectionAsync]):
collection: str,
shard: Optional[str] = None,
) -> Optional[ShardingState]: ...
async def statistics(self) -> ClusterStatistics: ...
@property
def replications(self) -> _ReplicateAsync:
"""replication (_Replication): Replication object instance connected to the same Weaviate instance as the Client.
Expand Down
28 changes: 28 additions & 0 deletions weaviate/cluster/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from httpx import Response

from weaviate.cluster.models import (
ClusterStatistics,
ReplicationType,
ShardingState,
)
Expand Down Expand Up @@ -149,3 +150,30 @@ def resp(
params=params,
error_msg="Get nodes status failed",
)

def statistics(self) -> executor.Result[ClusterStatistics]:
"""Get RAFT cluster statistics.

Returns cluster statistics data including RAFT consensus state (leader/follower),
commit/applied indices, and cluster synchronization status.

Returns:
ClusterStatistics with a list of node statistics and synchronized flag.

Raises:
weaviate.exceptions.WeaviateConnectionError: If the network connection to weaviate fails.
weaviate.exceptions.UnexpectedStatusCodeError: If weaviate reports a non-OK status.
"""

def resp(response: Response) -> ClusterStatistics:
response_typed = _decode_json_response_dict(response, "Cluster statistics")
assert response_typed is not None
return ClusterStatistics._from_weaviate(response_typed)

return executor.execute(
response_callback=resp,
method=self._connection.get,
path="/cluster/statistics",
status_codes=_ExpectedStatusCodes(200, "cluster statistics"),
error_msg="Get cluster statistics failed",
)
120 changes: 119 additions & 1 deletion weaviate/cluster/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Generic, List, TypedDict, TypeVar, Union
from typing import Any, Dict, Generic, List, TypedDict, TypeVar, Union


class ReplicationType(str, Enum):
Expand Down Expand Up @@ -133,3 +133,121 @@ def _from_weaviate(data: _ReplicationShardingStateResponse):
collection=ss["collection"],
shards=[ShardReplicas._from_weaviate(shard) for shard in ss["shards"]],
)


# --- RAFT cluster statistics ---


@dataclass
class RaftConfigurationMember:
"""A member in the RAFT cluster's latest configuration."""

address: str
node_id: str
suffrage: int

@staticmethod
def _from_weaviate(data: dict) -> "RaftConfigurationMember":
return RaftConfigurationMember(
address=data["address"],
node_id=data["id"],
suffrage=data["suffrage"],
)


@dataclass
class RaftStats:
"""RAFT consensus statistics for a node."""

applied_index: str
commit_index: str
fsm_pending: str
last_contact: str
last_log_index: str
last_log_term: str
last_snapshot_index: str
last_snapshot_term: str
latest_configuration: List[RaftConfigurationMember]
latest_configuration_index: str
num_peers: str
protocol_version: str
protocol_version_max: str
protocol_version_min: str
snapshot_version_max: str
snapshot_version_min: str
state: str
term: str

@staticmethod
def _from_weaviate(data: dict) -> "RaftStats":
return RaftStats(
applied_index=data.get("appliedIndex", ""),
commit_index=data.get("commitIndex", ""),
fsm_pending=data.get("fsmPending", ""),
last_contact=data.get("lastContact", ""),
last_log_index=data.get("lastLogIndex", ""),
last_log_term=data.get("lastLogTerm", ""),
last_snapshot_index=data.get("lastSnapshotIndex", ""),
last_snapshot_term=data.get("lastSnapshotTerm", ""),
latest_configuration=[
RaftConfigurationMember._from_weaviate(m)
for m in data.get("latestConfiguration", [])
],
latest_configuration_index=data.get("latestConfigurationIndex", ""),
num_peers=data.get("numPeers", ""),
protocol_version=data.get("protocolVersion", ""),
protocol_version_max=data.get("protocolVersionMax", ""),
protocol_version_min=data.get("protocolVersionMin", ""),
snapshot_version_max=data.get("snapshotVersionMax", ""),
snapshot_version_min=data.get("snapshotVersionMin", ""),
state=data.get("state", ""),
term=data.get("term", ""),
)


@dataclass
class NodeStatistics:
"""RAFT cluster statistics for a single node."""

candidates: Dict[str, Any]
db_loaded: bool
initial_last_applied_index: int
is_voter: bool
leader_address: str
leader_id: str
name: str
is_open: bool
raft: RaftStats
ready: bool
status: str

@staticmethod
def _from_weaviate(data: dict) -> "NodeStatistics":
return NodeStatistics(
candidates=data.get("candidates", {}),
db_loaded=data.get("dbLoaded", False),
initial_last_applied_index=data.get("initialLastAppliedIndex", 0),
is_voter=data.get("isVoter", False),
leader_address=data.get("leaderAddress", ""),
leader_id=data.get("leaderId", ""),
name=data.get("name", ""),
is_open=data.get("open", False),
raft=RaftStats._from_weaviate(data.get("raft", {})),
ready=data.get("ready", False),
status=data.get("status", ""),
)


@dataclass
class ClusterStatistics:
"""Response from GET /v1/cluster/statistics (RAFT cluster statistics)."""

statistics: List[NodeStatistics]
synchronized: bool

@staticmethod
def _from_weaviate(data: dict) -> "ClusterStatistics":
return ClusterStatistics(
statistics=[NodeStatistics._from_weaviate(s) for s in data.get("statistics", [])],
synchronized=data.get("synchronized", False),
)
3 changes: 2 additions & 1 deletion weaviate/cluster/sync.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from typing import List, Literal, Optional, Union, overload

from weaviate.cluster.models import ReplicationType, ShardingState
from weaviate.cluster.models import ClusterStatistics, ReplicationType, ShardingState
from weaviate.cluster.replicate import _Replicate
from weaviate.cluster.types import Verbosity
from weaviate.collections.classes.cluster import NodeMinimal, NodeVerbose
Expand Down Expand Up @@ -57,6 +57,7 @@ class _Cluster(_ClusterExecutor[ConnectionSync]):
collection: str,
shard: Optional[str] = None,
) -> Optional[ShardingState]: ...
def statistics(self) -> ClusterStatistics: ...
@property
def replications(self) -> _Replicate:
"""replication (_Replication): Replication object instance connected to the same Weaviate instance as the Client.
Expand Down
17 changes: 14 additions & 3 deletions weaviate/outputs/cluster.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from weaviate.cluster.models import ShardingState, ShardReplicas
from weaviate.cluster.models import (
ClusterStatistics,
NodeStatistics,
RaftConfigurationMember,
RaftStats,
ShardingState,
ShardReplicas,
)
from weaviate.collections.classes.cluster import (
Node,
NodeMinimal,
Expand All @@ -9,12 +16,16 @@
)

__all__ = [
"ClusterStatistics",
"Node",
"NodeMinimal",
"NodeStatistics",
"NodeVerbose",
"RaftConfigurationMember",
"RaftStats",
"Shard",
"Shards",
"Stats",
"ShardingState",
"ShardReplicas",
"Shards",
"Stats",
]
Loading