Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 190 additions & 26 deletions nodescraper/plugins/inband/memory/memory_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@
from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily
from nodescraper.models import TaskResult

from .memorydata import MemoryDataModel
from .memorydata import (
LsmemData,
MemoryBlock,
MemoryDataModel,
MemorySummary,
NumaDistance,
NumaNode,
NumaTopology,
)


class MemoryCollector(InBandDataCollector[MemoryDataModel, None]):
Expand All @@ -42,7 +50,8 @@ class MemoryCollector(InBandDataCollector[MemoryDataModel, None]):
"wmic OS get FreePhysicalMemory /Value; wmic ComputerSystem get TotalPhysicalMemory /Value"
)
CMD = "free -b"
CMD_LSMEM = "/usr/bin/lsmem"
CMD_LSMEM = "lsmem"
CMD_NUMACTL = "numactl -H"

def collect_data(self, args=None) -> tuple[TaskResult, Optional[MemoryDataModel]]:
"""
Expand Down Expand Up @@ -84,12 +93,23 @@ def collect_data(self, args=None) -> tuple[TaskResult, Optional[MemoryDataModel]
lsmem_cmd = self._run_sut_cmd(self.CMD_LSMEM)
if lsmem_cmd.exit_code == 0:
lsmem_data = self._parse_lsmem_output(lsmem_cmd.stdout)
self._log_event(
category=EventCategory.OS,
description="lsmem output collected",
data=lsmem_data,
priority=EventPriority.INFO,
)
if lsmem_data:
self._log_event(
category=EventCategory.OS,
description="lsmem output collected",
data={
"memory_blocks": len(lsmem_data.memory_blocks),
"total_online_memory": lsmem_data.summary.total_online_memory,
},
priority=EventPriority.INFO,
)
else:
self._log_event(
category=EventCategory.OS,
description="Failed to parse lsmem output",
priority=EventPriority.WARNING,
console_log=False,
)
else:
self._log_event(
category=EventCategory.OS,
Expand All @@ -103,9 +123,48 @@ def collect_data(self, args=None) -> tuple[TaskResult, Optional[MemoryDataModel]
console_log=False,
)

# Collect NUMA topology information
numa_topology = None
if self.system_info.os_family != OSFamily.WINDOWS:
numactl_cmd = self._run_sut_cmd(self.CMD_NUMACTL)
if numactl_cmd.exit_code == 0:
numa_topology = self._parse_numactl_hardware(numactl_cmd.stdout)
if numa_topology:
self._log_event(
category=EventCategory.MEMORY,
description="NUMA topology collected",
data={
"available_nodes": numa_topology.available_nodes,
"node_count": len(numa_topology.nodes),
},
priority=EventPriority.INFO,
)
else:
self._log_event(
category=EventCategory.MEMORY,
description="Failed to parse numactl output",
priority=EventPriority.WARNING,
console_log=False,
)
else:
self._log_event(
category=EventCategory.MEMORY,
description="Error running numactl command",
data={
"command": numactl_cmd.command,
"exit_code": numactl_cmd.exit_code,
"stderr": numactl_cmd.stderr,
},
priority=EventPriority.WARNING,
console_log=False,
)

if mem_free and mem_total:
mem_data = MemoryDataModel(
mem_free=mem_free, mem_total=mem_total, lsmem_output=lsmem_data
mem_free=mem_free,
mem_total=mem_total,
lsmem_data=lsmem_data,
numa_topology=numa_topology,
)
self._log_event(
category=EventCategory.OS,
Expand All @@ -122,19 +181,19 @@ def collect_data(self, args=None) -> tuple[TaskResult, Optional[MemoryDataModel]

return self.result, mem_data

def _parse_lsmem_output(self, output: str) -> dict:
def _parse_lsmem_output(self, output: str):
"""
Parse lsmem command output into a structured dictionary.
Parse lsmem command output into a structured LsmemData object.

Args:
output: Raw stdout from lsmem command

Returns:
dict: Parsed lsmem data with memory blocks and summary information
LsmemData: Parsed lsmem data with memory blocks and summary information
"""
lines = output.strip().split("\n")
memory_blocks = []
summary = {}
summary_dict = {}

for line in lines:
line = line.strip()
Expand All @@ -146,21 +205,126 @@ def _parse_lsmem_output(self, output: str) -> dict:
parts = line.split()
if len(parts) >= 4:
memory_blocks.append(
{
"range": parts[0],
"size": parts[1],
"state": parts[2],
"removable": parts[3] if len(parts) > 3 else None,
"block": parts[4] if len(parts) > 4 else None,
}
MemoryBlock(
range=parts[0],
size=parts[1],
state=parts[2],
removable=parts[3] if len(parts) > 3 else None,
block=parts[4] if len(parts) > 4 else None,
)
)
# Parse summary lines
elif ":" in line:
key, value = line.split(":", 1)
summary[key.strip().lower().replace(" ", "_")] = value.strip()
summary_dict[key.strip().lower().replace(" ", "_")] = value.strip()

summary = MemorySummary(
memory_block_size=summary_dict.get("memory_block_size"),
total_online_memory=summary_dict.get("total_online_memory"),
total_offline_memory=summary_dict.get("total_offline_memory"),
)

if not memory_blocks:
return None

return LsmemData(memory_blocks=memory_blocks, summary=summary)

def _parse_numactl_hardware(self, output: str) -> "Optional[NumaTopology]":
    """
    Parse 'numactl -H' output into NumaTopology structure.

    Recognised lines:
        available: <count> nodes (<node list>)
        node <id> cpus: [<cpu> ...]
        node <id> size: <n> MB
        node <id> free: <n> MB
        node distances:   (followed by a header row and one row per node)

    Args:
        output: Raw stdout from numactl -H command

    Returns:
        NumaTopology object or None if no node could be parsed
    """

    def expand_node_list(spec: str) -> list[int]:
        """Expand a node list such as '0-1', '0,8' or '0-1,16-17' into ids."""
        ids: list[int] = []
        # Treat commas like whitespace so both separators are accepted.
        for token in spec.replace(",", " ").split():
            first, dash, last = token.partition("-")
            if dash:
                ids.extend(range(int(first), int(last) + 1))
            else:
                ids.append(int(first))
        return ids

    available_nodes: list[int] = []
    nodes = []
    # First node object seen per id, so size/free lines can find it directly.
    node_by_id = {}
    distances = []
    distance_matrix = {}
    # Node ids taken from the header row of the distance table.
    distance_columns: list[int] = []
    in_distances = False

    for raw_line in output.strip().split("\n"):
        line = raw_line.strip()
        if not line:
            continue

        # Parse available nodes line
        if line.startswith("available:"):
            match = re.search(r"available:\s*(\d+)\s+nodes?\s*\(([^)]+)\)", line)
            if match:
                try:
                    available_nodes = expand_node_list(match.group(2))
                except ValueError:
                    # Unparseable list: fall back to the ids of the parsed
                    # nodes when the topology is built below.
                    available_nodes = []

        # Parse node CPU line
        elif line.startswith("node") and "cpus:" in line:
            # (.*) rather than (.+): a memory-only node prints "node N cpus:"
            # with nothing after the colon and must still be recorded.
            match = re.search(r"node\s+(\d+)\s+cpus:\s*(.*)", line)
            if match:
                node_id = int(match.group(1))
                cpus = [int(x) for x in match.group(2).split()]
                node = NumaNode(node_id=node_id, cpus=cpus)
                nodes.append(node)
                node_by_id.setdefault(node_id, node)

        # Parse node memory size
        elif line.startswith("node") and "size:" in line:
            match = re.search(r"node\s+(\d+)\s+size:\s*(\d+)\s*MB", line)
            if match:
                node = node_by_id.get(int(match.group(1)))
                if node is not None:
                    node.memory_size_mb = int(match.group(2))

        # Parse node free memory
        elif line.startswith("node") and "free:" in line:
            match = re.search(r"node\s+(\d+)\s+free:\s*(\d+)\s*MB", line)
            if match:
                node = node_by_id.get(int(match.group(1)))
                if node is not None:
                    node.memory_free_mb = int(match.group(2))

        # Start of the distance matrix
        elif line.startswith("node distances:"):
            in_distances = True

        elif in_distances:
            if line.startswith("node") and ":" not in line:
                # Header row ("node   0   1"): remember the column node ids
                # so rows map to real ids even when ids are not contiguous.
                try:
                    distance_columns = [int(x) for x in line.split()[1:]]
                except ValueError:
                    distance_columns = []
                continue

            parts = line.split(":")
            if len(parts) != 2:
                continue
            try:
                from_node = int(parts[0].strip())
                dist_values = [int(x) for x in parts[1].split()]
            except ValueError:
                # Not a distance row; skip it instead of aborting the parse.
                continue

            # Use header ids when they line up with the row, otherwise fall
            # back to positional numbering.
            if len(distance_columns) == len(dist_values):
                to_nodes = distance_columns
            else:
                to_nodes = list(range(len(dist_values)))

            row = {}
            for to_node, dist in zip(to_nodes, dist_values):
                row[to_node] = dist
                distances.append(
                    NumaDistance(from_node=from_node, to_node=to_node, distance=dist)
                )
            distance_matrix[from_node] = row

    if not nodes:
        return None

    return NumaTopology(
        available_nodes=available_nodes if available_nodes else [n.node_id for n in nodes],
        nodes=nodes,
        distances=distances,
        distance_matrix=distance_matrix if distance_matrix else None,
    )
58 changes: 57 additions & 1 deletion nodescraper/plugins/inband/memory/memorydata.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,66 @@
###############################################################################
from typing import Optional

from pydantic import BaseModel

from nodescraper.models import DataModel


class MemoryBlock(BaseModel):
    """Memory block information from lsmem.

    One instance is built per memory-block row of the lsmem table; the
    collector fills the fields from the whitespace-separated columns of the
    row, in order. All values are kept as the raw strings printed by lsmem.
    """

    # First column of the row (address range of the block).
    range: str
    # Second column (block size as printed).
    size: str
    # Third column (block state as printed).
    state: str
    # Fourth column, when present.
    removable: Optional[str] = None
    # Fifth column, when present.
    block: Optional[str] = None


class MemorySummary(BaseModel):
    """Summary information from lsmem.

    Values come from the "key: value" lines of the lsmem output; the
    collector lower-cases each key and replaces spaces with underscores
    before looking up the three fields below. A field stays None when the
    corresponding line is absent.
    """

    # Value of the "memory block size" summary line.
    memory_block_size: Optional[str] = None
    # Value of the "total online memory" summary line.
    total_online_memory: Optional[str] = None
    # Value of the "total offline memory" summary line.
    total_offline_memory: Optional[str] = None


class LsmemData(BaseModel):
    """Complete lsmem output data: the per-block rows plus the summary."""

    # One entry per parsed memory-block row.
    memory_blocks: list[MemoryBlock]
    # Values parsed from the "key: value" summary lines.
    summary: MemorySummary


class NumaNode(BaseModel):
    """NUMA node information.

    Built from the "node <id> cpus:" line of the numactl output; the memory
    fields are filled in afterwards from the matching "size:" and "free:"
    lines and remain None if those lines are not found.
    """

    # Node id taken from the "node <id> ..." prefix.
    node_id: int
    # CPU ids listed after "cpus:".
    cpus: list[int]
    # Value from the "node <id> size: <n> MB" line.
    memory_size_mb: Optional[int] = None
    # Value from the "node <id> free: <n> MB" line.
    memory_free_mb: Optional[int] = None


class NumaDistance(BaseModel):
    """Distance between two NUMA nodes.

    One instance is created for every value in a row of the
    "node distances:" table.
    """

    # Node whose row the value was read from.
    from_node: int
    # Destination node of the measurement.
    to_node: int
    # Distance value as printed in the table.
    distance: int


class NumaTopology(BaseModel):
    """Complete NUMA topology from 'numactl --hardware'.

    The collector gathers this by running `numactl -H` and parsing its
    stdout.
    """

    # Node ids from the "available:" line; the parser falls back to the ids
    # of the parsed nodes when that line yields nothing.
    available_nodes: list[int]
    # One entry per parsed "node <id> cpus:" line.
    nodes: list[NumaNode]
    # Flat list of every value in the distance table.
    distances: list[NumaDistance]
    # Same data keyed as matrix[from_node][to_node]; None when no distance
    # rows were parsed.
    distance_matrix: Optional[dict[int, dict[int, int]]] = None


class MemoryDataModel(DataModel):
    """Memory data model produced by the memory collector.

    Holds the free/total memory figures plus the optional structured lsmem
    and NUMA data; the optional fields are None when they were not supplied.
    """

    # Free memory value, stored as a string.
    mem_free: str
    # Total memory value, stored as a string.
    mem_total: str
    # NOTE(review): this listing is a diff view. lsmem_output appears to be
    # the pre-change field that lsmem_data replaces (the collector now passes
    # lsmem_data=...) - confirm it is absent from the merged file.
    lsmem_output: Optional[dict] = None
    # Structured lsmem result.
    lsmem_data: Optional[LsmemData] = None
    # Structured `numactl -H` result.
    numa_topology: Optional[NumaTopology] = None
Loading