Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"permissions": {
"allow": [
"Bash(make lint)"
],
"deny": []
}
}
245 changes: 243 additions & 2 deletions wish-models/src/wish_models/command_result/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import json
from enum import Enum
from typing import Any, Dict
from typing import Any, Dict, List

from pydantic import BaseModel

Expand All @@ -12,7 +12,8 @@ class CommandType(Enum):

BASH = "bash"
MSFCONSOLE = "msfconsole"
# Future extensions
MSFCONSOLE_RESOURCE = "msfconsole_resource"
METERPRETER = "meterpreter"
PYTHON = "python"
POWERSHELL = "powershell"

Expand Down Expand Up @@ -119,6 +120,246 @@ def create_msfconsole_command(
tool_parameters=tool_parameters
)

@classmethod
def create_msfconsole_resource_command(
cls,
commands: List[str],
resource_file: str = "/tmp/msf_exploit.rc",
**kwargs
) -> "Command":
"""Create an msfconsole resource file command.

Args:
commands: List of MSF commands to execute
resource_file: Path to resource file
**kwargs: Additional tool parameters

Returns:
Command configured for msfconsole resource execution.
"""
# Create resource file content
resource_content = '\n'.join(commands)
bash_cmd = f"cat << 'EOF' > {resource_file}\n{resource_content}\nEOF\nmsfconsole -q -r {resource_file}"

tool_parameters = {
"resource_file": resource_file,
"commands": commands,
"timeout": 600,
**kwargs
}

return cls(
command=bash_cmd,
tool_type=CommandType.MSFCONSOLE_RESOURCE,
tool_parameters=tool_parameters
)

@classmethod
def create_meterpreter_command(
cls,
command: str,
session_id: str = "",
**kwargs
) -> "Command":
"""Create a meterpreter session command.

Args:
command: The meterpreter command to execute
session_id: Meterpreter session identifier
**kwargs: Additional tool parameters

Returns:
Command configured for meterpreter execution.
"""
tool_parameters = {
"timeout": 300,
**kwargs
}
if session_id:
tool_parameters["session_id"] = session_id

return cls(
command=command,
tool_type=CommandType.METERPRETER,
tool_parameters=tool_parameters
)

@classmethod
def create_python_command(
cls,
script_path: str,
arguments: List[str] = None,
**kwargs
) -> "Command":
"""Create a python command.

Args:
script_path: Path to Python script
arguments: Script arguments
**kwargs: Additional tool parameters

Returns:
Command configured for python execution.
"""
if arguments is None:
arguments = []

args_str = ' '.join(f'"{arg}"' for arg in arguments)
command = f"python3 {script_path} {args_str}".strip()

tool_parameters = {
"script_path": script_path,
"arguments": arguments,
"timeout": 300,
**kwargs
}

return cls(
command=command,
tool_type=CommandType.PYTHON,
tool_parameters=tool_parameters
)

@classmethod
def create_powershell_command(
cls,
command: str,
execution_policy: str = "Bypass",
**kwargs
) -> "Command":
"""Create a PowerShell command.

Args:
command: The PowerShell command/script to execute
execution_policy: PowerShell execution policy
**kwargs: Additional tool parameters

Returns:
Command configured for PowerShell execution.
"""
tool_parameters = {
"execution_policy": execution_policy,
"timeout": 300,
**kwargs
}

return cls(
command=command,
tool_type=CommandType.POWERSHELL,
tool_parameters=tool_parameters
)

def validate_tool_parameters(self) -> List[str]:
"""Validate tool parameters for the specified tool type.

Returns:
List of validation error messages.
"""
errors = []

if self.tool_type == CommandType.MSFCONSOLE:
# MSFコマンドでexploit/を使う場合はRHOSTSが必要
if 'use exploit/' in self.command:
if not self.tool_parameters.get('rhosts'):
errors.append("RHOSTS parameter required for exploit modules")

# Reverse payloadにはLHOSTが必要
if any(payload in self.command.lower() for payload in ['reverse', 'bind']):
if not self.tool_parameters.get('lhost'):
errors.append("LHOST parameter required for reverse/bind payloads")

elif self.tool_type == CommandType.MSFCONSOLE_RESOURCE:
if not self.tool_parameters.get('commands'):
errors.append("Commands list required for resource file execution")

elif self.tool_type == CommandType.PYTHON:
if not self.tool_parameters.get('script_path'):
errors.append("Script path required for Python execution")

return errors

@property
def is_valid(self) -> bool:
"""Check if command is valid for its tool type."""
return len(self.validate_tool_parameters()) == 0

def get_execution_context(self) -> Dict[str, Any]:
"""Get execution context information for the command.

Returns:
Dictionary containing execution context details.
"""
context = {
"tool_type": self.tool_type.value,
"requires_privileges": self._requires_privileges(),
"estimated_duration": self._estimate_duration(),
"risk_level": self._assess_risk_level(),
"dependencies": self._get_dependencies()
}
return context

def _requires_privileges(self) -> bool:
"""Check if command requires elevated privileges."""
privileged_commands = ['sudo', 'su', 'chmod', 'chown', 'mount', 'systemctl']
if self.tool_type == CommandType.BASH:
return any(cmd in self.command.lower() for cmd in privileged_commands)
elif self.tool_type == CommandType.MSFCONSOLE:
return 'exploit' in self.command.lower()
return False

def _estimate_duration(self) -> int:
"""Estimate command execution duration in seconds."""
if self.tool_type == CommandType.BASH:
if 'nmap' in self.command:
return 180 # nmap scans take time
elif any(tool in self.command for tool in ['hydra', 'john', 'hashcat']):
return 1800 # Password cracking tools
elif self.tool_type == CommandType.MSFCONSOLE:
return 60 # MSF commands typically quick
elif self.tool_type == CommandType.MSFCONSOLE_RESOURCE:
return 120 # Resource files may take longer

return self.tool_parameters.get('timeout', 300)

def _assess_risk_level(self) -> str:
"""Assess risk level of command execution."""
high_risk_patterns = ['rm -rf', 'dd if=', 'mkfs', 'fdisk', 'exploit', 'payload']
medium_risk_patterns = ['chmod', 'chown', 'sudo', 'su', 'mount']

command_lower = self.command.lower()

if any(pattern in command_lower for pattern in high_risk_patterns):
return "HIGH"
elif any(pattern in command_lower for pattern in medium_risk_patterns):
return "MEDIUM"
else:
return "LOW"

def _get_dependencies(self) -> List[str]:
"""Get list of command dependencies."""
deps = []

if self.tool_type in [CommandType.MSFCONSOLE, CommandType.MSFCONSOLE_RESOURCE]:
deps.append("metasploit-framework")
elif self.tool_type == CommandType.METERPRETER:
deps.append("metasploit-framework")
elif self.tool_type == CommandType.PYTHON:
deps.append("python3")
elif self.tool_type == CommandType.POWERSHELL:
deps.append("powershell")

# Tool-specific dependencies from command content
if 'nmap' in self.command:
deps.append("nmap")
if 'hydra' in self.command:
deps.append("hydra")
if 'john' in self.command:
deps.append("john")
if 'hashcat' in self.command:
deps.append("hashcat")

return list(set(deps)) # Remove duplicates


def parse_command_from_string(input_str: str) -> Command:
"""Parse command input from string (JSON or plain command).
Expand Down
Loading