From 812788316a4642f2ec2c9b77bd8f6f19b235fb16 Mon Sep 17 00:00:00 2001 From: Helmi Date: Sun, 10 Aug 2025 12:15:07 +0200 Subject: [PATCH] feat: refactor replacements.json structure to eliminate duplication - Consolidate project storage to single 'projects' section with rich metadata - Add automatic migration from old format to new format - Update all replacement functions to use new unified structure - Add comprehensive default commands and patterns - Update README documentation with new configuration format Closes #6 --- README.md | 32 +- src/ccnotify/__init__.py | 7 +- src/ccnotify/cli.py | 131 +++---- src/ccnotify/config.py | 103 +++-- src/ccnotify/installer/__init__.py | 6 +- src/ccnotify/installer/detector.py | 111 +++--- src/ccnotify/installer/flows.py | 260 +++++++------ src/ccnotify/installer/updater.py | 100 ++--- src/ccnotify/installer/welcome.py | 45 ++- src/ccnotify/notify.py | 592 ++++++++++++++++++----------- src/ccnotify/setup.py | 176 +++++---- src/ccnotify/tts/__init__.py | 2 +- src/ccnotify/tts/base.py | 37 +- src/ccnotify/tts/elevenlabs.py | 139 ++++--- src/ccnotify/tts/factory.py | 122 +++--- src/ccnotify/tts/kokoro.py | 162 ++++---- src/ccnotify/version.py | 43 ++- 17 files changed, 1149 insertions(+), 919 deletions(-) diff --git a/README.md b/README.md index e2ea179..43dcbde 100644 --- a/README.md +++ b/README.md @@ -113,25 +113,45 @@ Customize how project names and commands are pronounced by editing `~/.claude/cc ```json { - "project_names": { - "agent-zero": "agent zero", - "ccnotify": "CC notify", - "roo-code": "roo code" + "projects": { + "ccnotify": { + "folder": "-Users-helmi-code-ccnotify", + "display_name": "CCNotify", + "pronunciation": "CC notify" + }, + "agent-zero": { + "folder": "-Users-helmi-code-agent-zero", + "display_name": "Agent Zero", + "pronunciation": "agent zero" + } }, "commands": { "ls": "list", "cd": "change directory", - "rm": "remove" + "rm": "remove", + "mkdir": "make directory", + "npm": "N P M", + "uvx": "U V X" }, "patterns": [ { "pattern": "npm run (\\w+)", - "replacement": "npm run {1}" + "replacement": "N P M run {1}" + }, + { + "pattern": "git (push|pull|commit)", + "replacement": "git {1}" + }, + { + "pattern": "(.+)\\.py", + "replacement": "{1} python file" } ] } ``` +**Note:** Existing configurations will be automatically migrated to the new format on first load. + ## Requirements - macOS or Linux diff --git a/src/ccnotify/__init__.py b/src/ccnotify/__init__.py index 6fb82f9..55f8634 100644 --- a/src/ccnotify/__init__.py +++ b/src/ccnotify/__init__.py @@ -6,15 +6,20 @@ __author__ = "Helmi" __license__ = "MIT" + # Lazy imports to avoid heavy dependencies during CLI usage def get_tts_provider(*args, **kwargs): """Lazy import wrapper for TTS provider""" from .tts import get_tts_provider as _get_tts_provider + return _get_tts_provider(*args, **kwargs) + def get_tts_provider_class(): """Lazy import wrapper for TTS provider class""" from .tts import TTSProvider + return TTSProvider -__all__ = ["get_tts_provider", "get_tts_provider_class"] \ No newline at end of file + +__all__ = ["get_tts_provider", "get_tts_provider_class"] diff --git a/src/ccnotify/cli.py b/src/ccnotify/cli.py index 34755e2..e43c938 100644 --- a/src/ccnotify/cli.py +++ b/src/ccnotify/cli.py @@ -29,68 +29,54 @@ def main(): uvx ccnotify install --force # Force complete reinstallation uvx ccnotify install --config-only # Only update configuration uvx ccnotify install --quiet # Minimal output mode - """ + """, ) - + # Simplified argument structure parser.add_argument( - "command", - nargs='?', + "command", + nargs="?", default="install", choices=["install"], - help="Command to execute (default: install)" - ) - - parser.add_argument( - "--force", - action="store_true", - help="Force complete reinstallation" - ) - - parser.add_argument( - "--config-only", - action="store_true", - help="Only update configuration, skip script updates" + help="Command to execute (default: install)", ) - - parser.add_argument( - "--quiet", - action="store_true", - help="Minimal output mode" - ) - + + parser.add_argument("--force", action="store_true", help="Force complete reinstallation") + parser.add_argument( - "--logging", - action="store_true", - help="Enable logging to file (off by default)" + "--config-only", action="store_true", help="Only update configuration, skip script updates" ) - + + parser.add_argument("--quiet", action="store_true", help="Minimal output mode") + parser.add_argument( - "--version", - action="version", - version=f"CCNotify {get_package_version()}" + "--logging", action="store_true", help="Enable logging to file (off by default)" ) - + + parser.add_argument("--version", action="version", version=f"CCNotify {get_package_version()}") + args = parser.parse_args() - + # Always execute install command with intelligent detection success = execute_install_command(args.force, args.config_only, args.quiet, args.logging) - + if not success: sys.exit(1) -def execute_install_command(force: bool = False, config_only: bool = False, quiet: bool = False, logging: bool = False) -> bool: +def execute_install_command( + force: bool = False, config_only: bool = False, quiet: bool = False, logging: bool = False +) -> bool: """Execute the intelligent install command with detection logic.""" # Validate parameters if not isinstance(logging, bool): raise TypeError("logging parameter must be a boolean") - + try: # Detect existing installation detector = InstallationDetector() status = detector.check_existing_installation() - + if status.exists and not force: # Existing installation - run update flow update_flow = UpdateFlow() @@ -99,7 +85,7 @@ def execute_install_command(force: bool = False, config_only: bool = False, quie # No installation or force requested - run first-time flow first_time_flow = FirstTimeFlow() return first_time_flow.run(force=force, quiet=quiet, logging=logging) - + except KeyboardInterrupt: if not quiet: display_error_message("Installation cancelled by user") @@ -116,17 +102,19 @@ def get_notify_template() -> str: from .notify import main as notify_main from .version import get_package_version import inspect - + # Get the complete notify.py file content from pathlib import Path + notify_file = Path(__file__).parent / "notify.py" - + if notify_file.exists(): content = notify_file.read_text() # Embed version in the generated script from .version import embed_version_in_script + return embed_version_in_script(content, get_package_version()) - + # Fallback minimal template version = get_package_version() return f'''#!/usr/bin/env python3 @@ -188,60 +176,57 @@ def update_claude_settings(script_path: str, logging: bool = False) -> bool: import json import shutil from pathlib import Path - + claude_dir = Path.home() / ".claude" settings_file = claude_dir / "settings.json" - + try: if settings_file.exists(): # Create backup first - backup_file = settings_file.with_suffix('.json.ccnotify.bak') + backup_file = settings_file.with_suffix(".json.ccnotify.bak") shutil.copy2(settings_file, backup_file) - - with open(settings_file, 'r') as f: + + with open(settings_file, "r") as f: settings = json.load(f) else: settings = {} - + # Add our hook configuration if "hooks" not in settings: settings["hooks"] = {} - + # Configure ccnotify hook for relevant events # Add --logging flag to command if logging is enabled command = f"uv run {script_path}" if logging: command += " --logging" - - hook_config = { - "type": "command", - "command": command - } - + + hook_config = {"type": "command", "command": command} + events_to_hook = ["PreToolUse", "PostToolUse", "Stop", "SubagentStop", "Notification"] hooks_added = False - + for event in events_to_hook: if event not in settings["hooks"]: settings["hooks"][event] = [] - + # Check if our hook is already configured and update if needed # Hook structure: {"matcher": ".*", "hooks": [{"type": "command", "command": "..."}]} hook_updated = False hook_exists = False - + for i, entry in enumerate(settings["hooks"][event]): if not isinstance(entry, dict): continue - + hooks_list = entry.get("hooks", []) if not isinstance(hooks_list, list): continue - + for j, hook in enumerate(hooks_list): if not isinstance(hook, dict): continue - + existing_command = hook.get("command", "") # Check if this is our ccnotify hook if "ccnotify.py" in existing_command or str(script_path) in existing_command: @@ -254,33 +239,33 @@ def update_claude_settings(script_path: str, logging: bool = False) -> bool: hooks_added = True except (KeyError, IndexError) as e: # Log error but continue processing - print(f"Warning: Could not update hook for {event}: {e}", file=sys.stderr) + print( + f"Warning: Could not update hook for {event}: {e}", + file=sys.stderr, + ) break - + if hook_exists: break - + if not hook_exists: - settings["hooks"][event].append({ - "matcher": ".*", - "hooks": [hook_config] - }) + settings["hooks"][event].append({"matcher": ".*", "hooks": [hook_config]}) hooks_added = True - + # Enable hooks if not already enabled if not settings.get("hooksEnabled", False): settings["hooksEnabled"] = True hooks_added = True - + if hooks_added: - with open(settings_file, 'w') as f: + with open(settings_file, "w") as f: json.dump(settings, f, indent=2) - + return True - + except Exception: return False if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/ccnotify/config.py b/src/ccnotify/config.py index 5758cc7..bd7c00a 100644 --- a/src/ccnotify/config.py +++ b/src/ccnotify/config.py @@ -22,7 +22,7 @@ def get_config_dir() -> Path: else: # Fallback for other platforms config_dir = Path.home() / ".ccnotify" - + return config_dir @@ -37,7 +37,7 @@ def get_cache_dir() -> Path: else: # Fallback for other platforms cache_dir = Path.home() / ".ccnotify" / "cache" - + return cache_dir @@ -52,11 +52,11 @@ def ensure_config_dirs() -> bool: config_dir = get_config_dir() cache_dir = get_cache_dir() models_dir = get_models_dir() - + config_dir.mkdir(parents=True, exist_ok=True) cache_dir.mkdir(parents=True, exist_ok=True) models_dir.mkdir(parents=True, exist_ok=True) - + return True except Exception as e: print(f"❌ Failed to create config directories: {e}") @@ -71,49 +71,38 @@ def get_default_config() -> Dict[str, Any]: "cache_enabled": True, "cache_dir": str(get_cache_dir()), }, - "kokoro": { - "models_dir": str(get_models_dir()), - "voice": "af_heart", - "speed": "1.0" - }, + "kokoro": {"models_dir": str(get_models_dir()), "voice": "af_heart", "speed": "1.0"}, "elevenlabs": { "voice_id": "21m00Tcm4TlvDq8ikWAM", "model_id": "eleven_flash_v2_5", "stability": 0.5, - "similarity_boost": 0.5 - }, - "notifications": { - "enabled": True, - "sound_enabled": True, - "logging_enabled": False + "similarity_boost": 0.5, }, - "replacements": { - "enabled": True, - "auto_add_projects": True - } + "notifications": {"enabled": True, "sound_enabled": True, "logging_enabled": False}, + "replacements": {"enabled": True, "auto_add_projects": True}, } def load_config() -> Dict[str, Any]: """Load configuration from file""" config_file = get_config_dir() / "config.json" - + if not config_file.exists(): # Create default config config = get_default_config() save_config(config) return config - + try: - with open(config_file, 'r') as f: + with open(config_file, "r") as f: config = json.load(f) - + # Merge with defaults to ensure all keys exist default_config = get_default_config() merged_config = merge_configs(default_config, config) - + return merged_config - + except Exception as e: print(f"⚠️ Failed to load config: {e}") print("Using default configuration") @@ -125,12 +114,12 @@ def save_config(config: Dict[str, Any]) -> bool: try: ensure_config_dirs() config_file = get_config_dir() / "config.json" - - with open(config_file, 'w') as f: + + with open(config_file, "w") as f: json.dump(config, f, indent=2) - + return True - + except Exception as e: print(f"❌ Failed to save config: {e}") return False @@ -139,13 +128,13 @@ def save_config(config: Dict[str, Any]) -> bool: def merge_configs(default: Dict[str, Any], user: Dict[str, Any]) -> Dict[str, Any]: """Merge user config with default config""" merged = default.copy() - + for key, value in user.items(): if key in merged and isinstance(merged[key], dict) and isinstance(value, dict): merged[key].update(value) else: merged[key] = value - + return merged @@ -160,23 +149,27 @@ def get_claude_profile_dir() -> Optional[Path]: Path.home() / ".claude", Path.home() / ".config" / "claude", ] - + # Add platform-specific locations if sys.platform == "darwin": # macOS - possible_locations.extend([ - Path.home() / "Library" / "Application Support" / "Claude", - Path.home() / "Library" / "Preferences" / "Claude", - ]) + possible_locations.extend( + [ + Path.home() / "Library" / "Application Support" / "Claude", + Path.home() / "Library" / "Preferences" / "Claude", + ] + ) elif sys.platform == "win32": # Windows - possible_locations.extend([ - Path.home() / "AppData" / "Roaming" / "Claude", - Path.home() / "AppData" / "Local" / "Claude", - ]) - + possible_locations.extend( + [ + Path.home() / "AppData" / "Roaming" / "Claude", + Path.home() / "AppData" / "Local" / "Claude", + ] + ) + for location in possible_locations: if location.exists() and location.is_dir(): return location - + return None @@ -185,51 +178,51 @@ def list_claude_projects() -> Dict[str, Path]: profile_dir = get_claude_profile_dir() if not profile_dir: return {} - + projects_dir = profile_dir / "projects" if not projects_dir.exists(): return {} - + projects = {} for project_file in projects_dir.glob("*.json"): try: - with open(project_file, 'r') as f: + with open(project_file, "r") as f: project_data = json.load(f) - + project_name = project_data.get("name", project_file.stem) project_path = Path(project_data.get("path", "")) - + if project_path.exists(): projects[project_name] = project_path - + except Exception as e: print(f"⚠️ Could not read project file {project_file}: {e}") - + return projects def init_config() -> bool: """Initialize configuration system""" print("🔧 Initializing ccnotify configuration...") - + if not ensure_config_dirs(): return False - + config = load_config() - + print(f"✅ Configuration directory: {get_config_dir()}") print(f"✅ Cache directory: {get_cache_dir()}") print(f"✅ Models directory: {get_models_dir()}") - + # Check for Claude profile claude_profile = get_claude_profile_dir() if claude_profile: print(f"✅ Found Claude profile: {claude_profile}") else: print("⚠️ Could not find Claude Code profile directory") - + return True if __name__ == "__main__": - init_config() \ No newline at end of file + init_config() diff --git a/src/ccnotify/installer/__init__.py b/src/ccnotify/installer/__init__.py index 473dfc5..c0080c3 100644 --- a/src/ccnotify/installer/__init__.py +++ b/src/ccnotify/installer/__init__.py @@ -6,10 +6,10 @@ __all__ = [ "InstallationDetector", - "InstallationStatus", + "InstallationStatus", "display_welcome_screen", "display_success_message", "display_error_message", "FirstTimeFlow", - "UpdateFlow" -] \ No newline at end of file + "UpdateFlow", +] diff --git a/src/ccnotify/installer/detector.py b/src/ccnotify/installer/detector.py index a2fe661..7b53061 100644 --- a/src/ccnotify/installer/detector.py +++ b/src/ccnotify/installer/detector.py @@ -13,6 +13,7 @@ @dataclass class InstallationStatus: """Status of an existing CCNotify installation.""" + exists: bool = False script_version: Optional[str] = None config_version: Optional[str] = None @@ -21,21 +22,22 @@ class InstallationStatus: hooks_configured: bool = False legacy_hooks_dir: bool = False issues: List[str] = None - + def __post_init__(self): if self.issues is None: self.issues = [] -@dataclass +@dataclass class ConfigStatus: """Status of CCNotify configuration.""" + exists: bool = False valid: bool = False provider: Optional[str] = None version: Optional[str] = None issues: List[str] = None - + def __post_init__(self): if self.issues is None: self.issues = [] @@ -44,12 +46,13 @@ def __post_init__(self): @dataclass class ModelStatus: """Status of TTS models.""" + provider: Optional[str] = None kokoro_downloaded: bool = False kokoro_models_count: int = 0 elevenlabs_configured: bool = False issues: List[str] = None - + def __post_init__(self): if self.issues is None: self.issues = [] @@ -57,17 +60,17 @@ def __post_init__(self): class InstallationDetector: """Detects and analyzes existing CCNotify installations.""" - + def __init__(self): self.claude_dir = get_claude_config_dir() self.ccnotify_dir = self.claude_dir / "ccnotify" self.legacy_hooks_dir = self.claude_dir / "hooks" self.settings_file = self.claude_dir / "settings.json" - + def check_existing_installation(self) -> InstallationStatus: """Check for existing CCNotify installation and return comprehensive status.""" status = InstallationStatus() - + # Check if ccnotify directory exists if self.ccnotify_dir.exists(): status.exists = True @@ -75,40 +78,44 @@ def check_existing_installation(self) -> InstallationStatus: config_status = self.get_config_status() status.config_version = config_status.version status.tts_provider = config_status.provider - + # Check hooks configuration status.hooks_configured = self._check_hooks_configured() - + # Check models model_status = self.get_model_status() - status.models_downloaded = model_status.kokoro_downloaded or model_status.elevenlabs_configured - + status.models_downloaded = ( + model_status.kokoro_downloaded or model_status.elevenlabs_configured + ) + # Collect issues status.issues.extend(config_status.issues) status.issues.extend(model_status.issues) - + # Check for legacy hooks directory if self.legacy_hooks_dir.exists(): status.legacy_hooks_dir = True if not status.exists: - status.issues.append("Found legacy installation in ~/.claude/hooks/ - needs migration") - + status.issues.append( + "Found legacy installation in ~/.claude/hooks/ - needs migration" + ) + # Additional validation if status.exists and not status.hooks_configured: status.issues.append("CCNotify installed but not configured in Claude settings") - + return status - + def get_installed_version(self) -> Optional[str]: """Extract version from installed ccnotify.py script.""" return self._get_script_version() - + def _get_script_version(self) -> Optional[str]: """Get version from ccnotify.py script file.""" script_file = self.ccnotify_dir / "ccnotify.py" if not script_file.exists(): return None - + try: content = script_file.read_text() # Look for version pattern in the script @@ -117,56 +124,56 @@ def _get_script_version(self) -> Optional[str]: return version_match.group(1) except Exception: pass - + return None - + def get_config_status(self) -> ConfigStatus: """Analyze configuration file status.""" status = ConfigStatus() config_file = self.ccnotify_dir / "config.json" - + if not config_file.exists(): return status - + status.exists = True - + try: with open(config_file) as f: config = json.load(f) - + status.valid = True status.provider = config.get("tts_provider") status.version = config.get("config_version", "1.0") - + # Validate required fields required_fields = ["tts_provider"] for field in required_fields: if field not in config: status.issues.append(f"Missing required config field: {field}") status.valid = False - + # Provider-specific validation if status.provider == "elevenlabs": if not config.get("elevenlabs_api_key"): status.issues.append("ElevenLabs provider selected but no API key configured") - + except json.JSONDecodeError: status.issues.append("Config file exists but contains invalid JSON") except Exception as e: status.issues.append(f"Error reading config file: {str(e)}") - + return status - + def get_model_status(self) -> ModelStatus: """Check TTS model download status.""" status = ModelStatus() config_status = self.get_config_status() - + if not config_status.exists: return status - + status.provider = config_status.provider - + if status.provider == "kokoro": # Check for Kokoro models models_dir = self.ccnotify_dir / "models" @@ -174,39 +181,39 @@ def get_model_status(self) -> ModelStatus: onnx_files = list(models_dir.glob("*.onnx")) status.kokoro_models_count = len(onnx_files) status.kokoro_downloaded = len(onnx_files) > 0 - + if status.kokoro_models_count == 0: status.issues.append("Kokoro provider selected but no models downloaded") else: status.issues.append("Kokoro provider selected but models directory missing") - + elif status.provider == "elevenlabs": # Check ElevenLabs configuration try: with open(self.ccnotify_dir / "config.json") as f: config = json.load(f) - + if config.get("elevenlabs_api_key"): status.elevenlabs_configured = True else: status.issues.append("ElevenLabs provider selected but no API key configured") - + except Exception: status.issues.append("Could not verify ElevenLabs configuration") - + return status - + def _check_hooks_configured(self) -> bool: """Check if CCNotify hooks are configured in Claude settings.""" if not self.settings_file.exists(): return False - + try: with open(self.settings_file) as f: settings = json.load(f) - + hooks = settings.get("hooks", {}) - + # Look for ccnotify.py in any hook configuration # Structure: {"hooks": {"PreToolUse": [{"matcher": ".*", "hooks": [{"command": "..."}]}]}} for hook_name, hook_list in hooks.items(): @@ -219,40 +226,40 @@ def _check_hooks_configured(self) -> bool: command = hook.get("command", "") if "ccnotify.py" in command: return True - + except Exception: pass - + return False - + def get_platform_info(self) -> Dict[str, str]: """Get platform information.""" return { "system": platform.system(), "platform": platform.platform(), "architecture": platform.machine(), - "python_version": platform.python_version() + "python_version": platform.python_version(), } - + def needs_migration(self) -> bool: """Check if installation needs migration from legacy structure.""" return self.legacy_hooks_dir.exists() and not self.ccnotify_dir.exists() - + def get_migration_info(self) -> Dict[str, Any]: """Get information about what needs to be migrated.""" if not self.needs_migration(): return {} - + info = { "legacy_dir": str(self.legacy_hooks_dir), "target_dir": str(self.ccnotify_dir), - "files_to_migrate": [] + "files_to_migrate": [], } - + # Check for files that need migration if self.legacy_hooks_dir.exists(): for item in self.legacy_hooks_dir.iterdir(): if item.is_file(): info["files_to_migrate"].append(item.name) - - return info \ No newline at end of file + + return info diff --git a/src/ccnotify/installer/flows.py b/src/ccnotify/installer/flows.py index fca67f3..47f18de 100644 --- a/src/ccnotify/installer/flows.py +++ b/src/ccnotify/installer/flows.py @@ -14,8 +14,12 @@ from .detector import InstallationDetector, InstallationStatus from .updater import UpdateManager, UpdateInfo from .welcome import ( - display_welcome_screen, display_success_message, display_error_message, - display_warning_message, display_progress_header, animate_thinking + display_welcome_screen, + display_success_message, + display_error_message, + display_warning_message, + display_progress_header, + animate_thinking, ) from ..config import get_claude_config_dir from ..setup import setup_kokoro @@ -25,37 +29,38 @@ class BaseFlow: """Base class for installation flows.""" - + def __init__(self): self.detector = InstallationDetector() self.updater = UpdateManager() self.claude_dir = get_claude_config_dir() self.ccnotify_dir = self.claude_dir / "ccnotify" - + def _setup_kokoro(self) -> Optional[Dict[str, Any]]: """Setup Kokoro TTS provider.""" console.print("\n[bold cyan]Setting up Kokoro TTS...[/bold cyan]") - + try: # Change to ccnotify directory so models are created in the right place import os + original_cwd = os.getcwd() os.chdir(str(self.ccnotify_dir)) - + try: # Call the existing setup_kokoro function setup_result = setup_kokoro(force_download=False) - + if setup_result: # Include enhanced Kokoro configuration return { - "tts_provider": "kokoro", + "tts_provider": "kokoro", "models_downloaded": True, "models_dir": str(self.ccnotify_dir / "models"), "voice": "af_heart", # Popular voices: af_heart, af_sarah, am_adam, af_sky, am_michael "speed": 1.0, # 0.5 = slower, 2.0 = faster "format": "mp3", # mp3 for smaller files, wav for quality, aiff for Mac compatibility - "mp3_bitrate": "128k" # For MP3 encoding quality + "mp3_bitrate": "128k", # For MP3 encoding quality } else: console.print(f"[red]Failed to download Kokoro models[/red]") @@ -63,22 +68,22 @@ def _setup_kokoro(self) -> Optional[Dict[str, Any]]: finally: # Always restore original directory os.chdir(original_cwd) - + except Exception as e: console.print(f"[red]Error setting up Kokoro: {e}[/red]") return None - + def _configure_claude_hooks(self, logging: bool = False) -> bool: """Configure Claude hooks to use ccnotify.""" from ..cli import update_claude_settings - + script_path = self.ccnotify_dir / "ccnotify.py" return update_claude_settings(str(script_path), logging=logging) class FirstTimeFlow(BaseFlow): """Handles first-time installation flow.""" - + def run(self, force: bool = False, quiet: bool = False, logging: bool = False) -> bool: """Execute first-time installation flow.""" try: @@ -87,7 +92,7 @@ def run(self, force: bool = False, quiet: bool = False, logging: bool = False) - version = self.updater.get_current_package_version() platform_info = self.detector.get_platform_info() display_welcome_screen(version, platform_info["system"], is_update=False) - + # Check for existing installation status = self.detector.check_existing_installation() if status.exists and not force: @@ -96,59 +101,59 @@ def run(self, force: bool = False, quiet: bool = False, logging: bool = False) - "CCNotify is already installed. Use 'uvx ccnotify install --force' to reinstall." ) return False - + # Step 1: Platform check if not quiet: display_progress_header("Platform Compatibility Check", 1, 5) animate_thinking("Checking platform compatibility") - + if not self._check_platform_compatibility(quiet): if not quiet: display_error_message("Platform not supported") return False - + # Step 2: Migration check if self.detector.needs_migration(): if not quiet: display_progress_header("Legacy Installation Migration", 2, 5) - + if not self._handle_migration(): display_error_message("Failed to migrate legacy installation") return False - + # Step 3: TTS Provider setup if not quiet: display_progress_header("TTS Provider Configuration", 3, 5) - + provider_config = self._setup_tts_provider(quiet) if not provider_config: display_error_message("TTS provider setup failed") return False - + # Step 4: Install script and configuration if not quiet: display_progress_header("Installing CCNotify Script", 4, 5) animate_thinking("Installing script and configuration") - + if not self._install_script_and_config(provider_config): display_error_message("Failed to install script and configuration") return False - + # Step 5: Configure Claude hooks if not quiet: display_progress_header("Configuring Claude Integration", 5, 5) animate_thinking("Updating Claude settings") - + if not self._configure_claude_hooks(logging=logging): display_error_message("Failed to configure Claude hooks") return False - + # Success message if not quiet: self._display_installation_success() - + return True - + except KeyboardInterrupt: if not quiet: display_warning_message("Installation cancelled by user") @@ -157,11 +162,11 @@ def run(self, force: bool = False, quiet: bool = False, logging: bool = False) - if not quiet: display_error_message("Installation failed", str(e)) return False - + def _check_platform_compatibility(self, quiet: bool = False) -> bool: """Check if current platform is supported.""" platform_info = self.detector.get_platform_info() - + # Currently only macOS is fully supported if platform_info["system"] != "Darwin": if quiet: @@ -171,24 +176,26 @@ def _check_platform_compatibility(self, quiet: bool = False) -> bool: console.print("[yellow]Warning: Full functionality only tested on macOS[/yellow]") if not Confirm.ask("Continue anyway?"): return False - + return True - + def _handle_migration(self) -> bool: """Handle migration from legacy installation.""" migration_info = self.detector.get_migration_info() - - console.print(f"[yellow]Found legacy installation in {migration_info['legacy_dir']}[/yellow]") + + console.print( + f"[yellow]Found legacy installation in {migration_info['legacy_dir']}[/yellow]" + ) console.print("Files to migrate:") - - for file_name in migration_info['files_to_migrate']: + + for file_name in migration_info["files_to_migrate"]: console.print(f" • {file_name}") - + if not Confirm.ask("Migrate these files to the new location?"): return False - + return self.updater.migrate_legacy_installation() - + def _setup_tts_provider(self, quiet: bool = False) -> Optional[Dict[str, Any]]: """Setup TTS provider with user interaction.""" if quiet: @@ -198,146 +205,151 @@ def _setup_tts_provider(self, quiet: bool = False) -> Optional[Dict[str, Any]]: "voice": "af_heart", "speed": 1.0, "format": "mp3", - "mp3_bitrate": "128k" + "mp3_bitrate": "128k", } - + # Show provider options table = Table(title="TTS Provider Options") table.add_column("Option", style="cyan") table.add_column("Provider", style="bold") table.add_column("Description", style="dim") - + table.add_row("1", "Kokoro", "Local AI models (recommended, privacy-focused)") table.add_row("2", "ElevenLabs", "Cloud-based premium quality (requires API key)") table.add_row("3", "None", "Silent mode (no voice notifications)") - + console.print(table) console.print() - + while True: choice = Prompt.ask("Select TTS provider", choices=["1", "2", "3"], default="1") - + if choice == "1": # Kokoro setup if Confirm.ask("Download Kokoro TTS models? (~500MB)", default=True): kokoro_config = self._setup_kokoro() if not kokoro_config: - console.print("[yellow]Kokoro setup failed. Please choose another provider.[/yellow]") + console.print( + "[yellow]Kokoro setup failed. Please choose another provider.[/yellow]" + ) continue return kokoro_config else: - console.print("[yellow]Kokoro requires model files to function. Please choose another provider.[/yellow]") + console.print( + "[yellow]Kokoro requires model files to function. Please choose another provider.[/yellow]" + ) continue - + elif choice == "2": # ElevenLabs setup elevenlabs_config = self._setup_elevenlabs() if not elevenlabs_config: - console.print("[yellow]ElevenLabs setup failed. Please choose another provider.[/yellow]") + console.print( + "[yellow]ElevenLabs setup failed. Please choose another provider.[/yellow]" + ) continue return elevenlabs_config - + elif choice == "3": # Silent mode return {"tts_provider": "none"} - + def _setup_elevenlabs(self) -> Optional[Dict[str, Any]]: """Setup ElevenLabs TTS provider.""" console.print("\n[bold cyan]Setting up ElevenLabs TTS...[/bold cyan]") - + api_key = Prompt.ask("Enter your ElevenLabs API key", password=True) if not api_key: console.print("[red]API key required for ElevenLabs[/red]") return None - + # TODO: Validate API key - return { - "tts_provider": "elevenlabs", - "elevenlabs_api_key": api_key - } - + return {"tts_provider": "elevenlabs", "elevenlabs_api_key": api_key} + def _install_script_and_config(self, provider_config: Dict[str, Any]) -> bool: """Install ccnotify.py script and configuration.""" try: # Create ccnotify directory self.ccnotify_dir.mkdir(exist_ok=True) - + # Generate ccnotify.py script using existing template system from ..cli import get_notify_template + script_content = get_notify_template() - + script_file = self.ccnotify_dir / "ccnotify.py" script_file.write_text(script_content) script_file.chmod(0o755) - + # Create configuration file - config = { - "config_version": "1.0", - **provider_config - } - + config = {"config_version": "1.0", **provider_config} + config_file = self.ccnotify_dir / "config.json" - with open(config_file, 'w') as f: + with open(config_file, "w") as f: json.dump(config, f, indent=2) - + return True except Exception: return False - + def _display_installation_success(self) -> None: """Display installation success message.""" success_text = Text() success_text.append("CCNotify has been successfully installed!\n\n", style="bold green") - success_text.append("🔊 Voice notifications are now active for Claude Code\n", style="green") - success_text.append("🎯 Try running commands in Claude Code to test notifications\n", style="green") + success_text.append( + "🔊 Voice notifications are now active for Claude Code\n", style="green" + ) + success_text.append( + "🎯 Try running commands in Claude Code to test notifications\n", style="green" + ) success_text.append("⚙️ Configuration stored in: ~/.claude/ccnotify/", style="dim") - + panel = Panel( success_text, title="[bold green]Installation Complete[/bold green]", - border_style="green" + border_style="green", ) console.print(panel) class UpdateFlow(BaseFlow): """Handles update flow for existing installations.""" - + def run(self, config_only: bool = False, quiet: bool = False, logging: bool = False) -> bool: """Execute update flow.""" try: # Check existing installation status = self.detector.check_existing_installation() - + if not status.exists: if not quiet: display_error_message("No existing CCNotify installation found") return False - + if not quiet: # Display welcome screen version = self.updater.get_current_package_version() platform_info = self.detector.get_platform_info() display_welcome_screen(version, platform_info["system"], is_update=True) - + # Check for updates update_info = self.updater.check_for_updates(status) - + if not quiet: self._display_installation_status(status, update_info) - + # Check if there are no updates AND no issues if not self._has_updates(update_info) and not status.issues and not config_only: if not quiet: display_success_message("CCNotify is already up to date!") return True - + # Show update options if not quiet and not config_only: if not self._confirm_updates(update_info, status): display_warning_message("Update cancelled by user") return False - + # Perform updates if not quiet: if update_info.script_update_available or update_info.config_migration_needed: @@ -346,13 +358,13 @@ def run(self, config_only: bool = False, quiet: bool = False, logging: bool = Fa elif status.issues: display_progress_header("Fixing Installation Issues", 1, 1) animate_thinking("Resolving issues") - + # Create backup backup_paths = self.updater.create_backup() - + try: success = True - + if config_only: success = self._update_config_only() else: @@ -360,19 +372,24 @@ def run(self, config_only: bool = False, quiet: bool = False, logging: bool = Fa if not quiet: display_progress_header("Updating CCNotify Script", 1, 1) success = self.updater.update_script_only(preserve_config=True) - + if success and update_info.config_migration_needed: if not quiet: display_progress_header("Migrating Configuration", 1, 1) success = self._migrate_config() - + # Fix missing models if needed if success and status.issues: for issue in status.issues: - if "models directory missing" in issue and status.tts_provider == "kokoro": + if ( + "models directory missing" in issue + and status.tts_provider == "kokoro" + ): if not quiet: - display_progress_header("Downloading Missing Kokoro Models", 1, 1) - + display_progress_header( + "Downloading Missing Kokoro Models", 1, 1 + ) + # Download models kokoro_config = self._setup_kokoro() if kokoro_config: @@ -383,22 +400,25 @@ def run(self, config_only: bool = False, quiet: bool = False, logging: bool = Fa with open(config_file) as f: config = json.load(f) config.update(kokoro_config) - with open(config_file, 'w') as f: + with open(config_file, "w") as f: json.dump(config, f, indent=2) except Exception: pass - + elif "not configured in Claude settings" in issue: if not quiet: display_progress_header("Configuring Claude Integration", 1, 1) self._configure_claude_hooks(logging=logging) - + if success: # Clean up backups self.updater.cleanup_backups(backup_paths) - + if not quiet: - if update_info.script_update_available or update_info.config_migration_needed: + if ( + update_info.script_update_available + or update_info.config_migration_needed + ): display_success_message("CCNotify updated successfully!") elif status.issues: display_success_message("Installation issues resolved successfully!") @@ -409,16 +429,16 @@ def run(self, config_only: bool = False, quiet: bool = False, logging: bool = Fa self.updater.restore_from_backup(backup_paths) if not quiet: display_error_message("Update failed, restored from backup") - + return success - + except Exception as e: # Restore from backup on error self.updater.restore_from_backup(backup_paths) if not quiet: display_error_message("Update failed", str(e)) return False - + except KeyboardInterrupt: if not quiet: display_warning_message("Update cancelled by user") @@ -427,61 +447,65 @@ def run(self, config_only: bool = False, quiet: bool = False, logging: bool = Fa if not quiet: display_error_message("Update failed", str(e)) return False - - def _display_installation_status(self, status: InstallationStatus, update_info: UpdateInfo) -> None: + + def _display_installation_status( + self, status: InstallationStatus, update_info: UpdateInfo + ) -> None: """Display current installation status.""" table = Table(title="Current Installation Status") table.add_column("Component", style="cyan") table.add_column("Status", style="bold") table.add_column("Version/Details", style="dim") - + # Script status script_status = "✓ Installed" if status.script_version else "✗ Missing" table.add_row("Script", script_status, status.script_version or "N/A") - + # TTS Provider provider_status = "✓ Configured" if status.tts_provider else "✗ Not configured" table.add_row("TTS Provider", provider_status, status.tts_provider or "N/A") - + # Models models_status = "✓ Downloaded" if status.models_downloaded else "✗ Missing" table.add_row("TTS Models", models_status, "") - + # Hooks hooks_status = "✓ Configured" if status.hooks_configured else "✗ Not configured" table.add_row("Claude Hooks", hooks_status, "") - + console.print(table) console.print() - + # Show issues if any if status.issues: console.print("[bold red]Issues found:[/bold red]") for issue in status.issues: console.print(f" • [red]{issue}[/red]") console.print() - + # Show available updates if update_info.recommended_actions: console.print("[bold cyan]Available updates:[/bold cyan]") for action in update_info.recommended_actions: console.print(f" • [cyan]{action}[/cyan]") console.print() - + def _has_updates(self, update_info: UpdateInfo) -> bool: """Check if any updates are available.""" - return (update_info.script_update_available or - update_info.config_migration_needed or - update_info.model_update_available) - + return ( + update_info.script_update_available + or update_info.config_migration_needed + or update_info.model_update_available + ) + def _confirm_updates(self, update_info: UpdateInfo, status: InstallationStatus = None) -> bool: """Confirm with user which updates to apply.""" actions_to_apply = [] - + # Add version updates if update_info.recommended_actions: actions_to_apply.extend(update_info.recommended_actions) - + # Add issue fixes if status and status.issues: for issue in status.issues: @@ -489,22 +513,22 @@ def _confirm_updates(self, update_info: UpdateInfo, status: InstallationStatus = actions_to_apply.append("Download missing Kokoro models") elif "not configured in Claude settings" in issue: actions_to_apply.append("Configure Claude hooks") - + if not actions_to_apply: return True - + console.print("[bold]The following updates will be applied:[/bold]") for action in actions_to_apply: console.print(f" • {action}") - + return Confirm.ask("\nProceed with these updates?") - + def _update_config_only(self) -> bool: """Update only configuration without touching script.""" # This would re-run the configuration setup return True - + def _migrate_config(self) -> bool: """Migrate configuration to newer format.""" # This would handle config format migrations - return True \ No newline at end of file + return True diff --git a/src/ccnotify/installer/updater.py b/src/ccnotify/installer/updater.py index 75fa64b..1211259 100644 --- a/src/ccnotify/installer/updater.py +++ b/src/ccnotify/installer/updater.py @@ -14,13 +14,14 @@ @dataclass class UpdateInfo: """Information about available updates.""" + script_update_available: bool = False current_script_version: Optional[str] = None available_script_version: Optional[str] = None config_migration_needed: bool = False model_update_available: bool = False recommended_actions: List[str] = None - + def __post_init__(self): if self.recommended_actions is None: self.recommended_actions = [] @@ -28,100 +29,103 @@ def __post_init__(self): class UpdateManager: """Manages updates for CCNotify installation.""" - + def __init__(self): self.detector = InstallationDetector() self.claude_dir = get_claude_config_dir() self.ccnotify_dir = self.claude_dir / "ccnotify" - + def check_for_updates(self, installation_status: InstallationStatus) -> UpdateInfo: """Check what updates are available for existing installation.""" update_info = UpdateInfo() - + # Get current package version try: from .. import __version__ as package_version + update_info.available_script_version = package_version except ImportError: # Fallback if we can't import version update_info.available_script_version = "unknown" - + # Compare script versions if installation_status.script_version: update_info.current_script_version = installation_status.script_version - - if self._is_newer_version(update_info.available_script_version, installation_status.script_version): + + if self._is_newer_version( + update_info.available_script_version, installation_status.script_version + ): update_info.script_update_available = True update_info.recommended_actions.append( f"Update script from v{installation_status.script_version} to v{update_info.available_script_version}" ) - + # Check if config needs migration if installation_status.config_version: if self._needs_config_migration(installation_status.config_version): update_info.config_migration_needed = True update_info.recommended_actions.append("Migrate configuration to newer format") - + # Check model updates (for Kokoro provider) if installation_status.tts_provider == "kokoro": if self._check_model_updates(): update_info.model_update_available = True update_info.recommended_actions.append("Update Kokoro TTS models") - + return update_info - + def _is_newer_version(self, new_version: str, current_version: str) -> bool: """Compare versions using semantic versioning.""" if new_version == "unknown" or current_version == "unknown": return False - + try: return version.parse(new_version) > version.parse(current_version) except Exception: # Fallback to string comparison if version parsing fails return new_version != current_version - + def _needs_config_migration(self, current_config_version: str) -> bool: """Check if config format needs migration.""" # Define config version migrations here CURRENT_CONFIG_VERSION = "1.0" - + try: return version.parse(current_config_version) < version.parse(CURRENT_CONFIG_VERSION) except Exception: # If we can't parse versions, assume migration is needed return True - + def _check_model_updates(self) -> bool: """Check if TTS model updates are available.""" # For now, return False - this would check remote model versions # in a real implementation return False - + def create_backup(self, backup_suffix: str = ".backup") -> Dict[str, Path]: """Create backup of current installation before update.""" backups = {} - + if self.ccnotify_dir.exists(): backup_dir = self.ccnotify_dir.with_suffix(backup_suffix) - + # Remove existing backup if it exists if backup_dir.exists(): shutil.rmtree(backup_dir) - + # Create new backup shutil.copytree(self.ccnotify_dir, backup_dir) backups["ccnotify_dir"] = backup_dir - + # Backup Claude settings settings_file = self.claude_dir / "settings.json" if settings_file.exists(): backup_settings = settings_file.with_suffix(f".json{backup_suffix}") shutil.copy2(settings_file, backup_settings) backups["settings"] = backup_settings - + return backups - + def restore_from_backup(self, backup_paths: Dict[str, Path]) -> bool: """Restore from backup in case of failed update.""" try: @@ -130,16 +134,16 @@ def restore_from_backup(self, backup_paths: Dict[str, Path]) -> bool: if self.ccnotify_dir.exists(): shutil.rmtree(self.ccnotify_dir) shutil.copytree(backup_paths["ccnotify_dir"], self.ccnotify_dir) - + # Restore settings if "settings" in backup_paths and backup_paths["settings"].exists(): settings_file = self.claude_dir / "settings.json" shutil.copy2(backup_paths["settings"], settings_file) - + return True except Exception: return False - + def cleanup_backups(self, backup_paths: Dict[str, Path]) -> None: """Clean up backup files after successful update.""" for backup_path in backup_paths.values(): @@ -150,49 +154,49 @@ def cleanup_backups(self, backup_paths: Dict[str, Path]) -> None: backup_path.unlink() except Exception: pass # Ignore cleanup errors - + def migrate_legacy_installation(self) -> bool: """Migrate from legacy ~/.claude/hooks/ structure to ~/.claude/ccnotify/.""" legacy_hooks_dir = self.claude_dir / "hooks" - + if not legacy_hooks_dir.exists(): return True # Nothing to migrate - + if self.ccnotify_dir.exists(): return True # Already migrated - + try: # Create ccnotify directory self.ccnotify_dir.mkdir(exist_ok=True) - + # Migrate files for item in legacy_hooks_dir.iterdir(): if item.is_file(): target = self.ccnotify_dir / item.name shutil.copy2(item, target) - + # Update Claude settings to point to new location self._update_hooks_path_in_settings() - + return True except Exception: return False - + def _update_hooks_path_in_settings(self) -> bool: """Update Claude settings.json to use new ccnotify path.""" settings_file = self.claude_dir / "settings.json" - + if not settings_file.exists(): return True - + try: with open(settings_file) as f: settings = json.load(f) - + # Update hook commands to use new path hooks = settings.get("hooks", {}) updated = False - + for hook_name, hook_config in hooks.items(): if isinstance(hook_config, dict): command = hook_config.get("command", "") @@ -200,15 +204,15 @@ def _update_hooks_path_in_settings(self) -> bool: new_command = command.replace("/.claude/hooks/", "/.claude/ccnotify/") hook_config["command"] = new_command updated = True - + if updated: - with open(settings_file, 'w') as f: + with open(settings_file, "w") as f: json.dump(settings, f, indent=2) - + return True except Exception: return False - + def update_script_only(self, preserve_config: bool = True) -> bool: """Update only the ccnotify.py script, preserving configuration.""" try: @@ -218,29 +222,31 @@ def update_script_only(self, preserve_config: bool = True) -> bool: config_file = self.ccnotify_dir / "config.json" if config_file.exists(): config_backup = config_file.read_text() - + # Generate new script with updated template from ..cli import get_notify_template + script_content = get_notify_template() - + # Update the script file script_file = self.ccnotify_dir / "ccnotify.py" script_file.write_text(script_content) script_file.chmod(0o755) - + # Restore configuration if config_backup and preserve_config: config_file = self.ccnotify_dir / "config.json" config_file.write_text(config_backup) - + return True except Exception: return False - + def get_current_package_version(self) -> str: """Get the current package version.""" try: from .. import __version__ + return __version__ except ImportError: - return "unknown" \ No newline at end of file + return "unknown" diff --git a/src/ccnotify/installer/welcome.py b/src/ccnotify/installer/welcome.py index 6a8a14f..b6c00ed 100644 --- a/src/ccnotify/installer/welcome.py +++ b/src/ccnotify/installer/welcome.py @@ -23,41 +23,47 @@ def display_welcome_screen(version: str, platform: str, is_update: bool = False) -> None: """Display the animated welcome screen with ANSI art.""" console.clear() - + # Create gradient ASCII art ascii_text = Text(CCNOTIFY_ASCII_ART) ascii_text.stylize("bold blue", 0, len(CCNOTIFY_ASCII_ART) // 3) - ascii_text.stylize("bold cyan", len(CCNOTIFY_ASCII_ART) // 3, (len(CCNOTIFY_ASCII_ART) * 2) // 3) + ascii_text.stylize( + "bold cyan", len(CCNOTIFY_ASCII_ART) // 3, (len(CCNOTIFY_ASCII_ART) * 2) // 3 + ) ascii_text.stylize("bold magenta", (len(CCNOTIFY_ASCII_ART) * 2) // 3, len(CCNOTIFY_ASCII_ART)) - + # Create subtitle subtitle = "Voice Notification System for Claude Code" subtitle_text = Text(subtitle, style="italic dim") - + # Create version and platform info action = "UPDATE" if is_update else "INSTALLATION" info_text = Text(f"v{version} • {platform} • {action}", style="dim") - + # Create a Group to combine elements properly from rich.console import Group - + content = Group( Align.center(ascii_text), "", # Empty line Align.center(subtitle_text), - "", # Empty line - Align.center(info_text) + "", # Empty line + Align.center(info_text), ) - + # Create panel with border panel = Panel( content, border_style="blue", padding=(1, 2), - title="[bold blue]CCNotify Installer[/bold blue]" if not is_update else "[bold blue]CCNotify Updater[/bold blue]", - title_align="center" + title=( + "[bold blue]CCNotify Installer[/bold blue]" + if not is_update + else "[bold blue]CCNotify Updater[/bold blue]" + ), + title_align="center", ) - + console.print(panel) console.print() @@ -75,7 +81,7 @@ def display_success_message(message: str) -> None: Text(message, style="bold green", justify="center"), border_style="green", title="[bold green]✓ Success[/bold green]", - title_align="center" + title_align="center", ) console.print(success_panel) @@ -85,12 +91,9 @@ def display_error_message(message: str, details: str = None) -> None: error_text = Text(message, style="bold red") if details: error_text.append(f"\n\n{details}", style="dim red") - + error_panel = Panel( - error_text, - border_style="red", - title="[bold red]✗ Error[/bold red]", - title_align="center" + error_text, border_style="red", title="[bold red]✗ Error[/bold red]", title_align="center" ) console.print(error_panel) @@ -101,7 +104,7 @@ def display_warning_message(message: str) -> None: Text(message, style="bold yellow", justify="center"), border_style="yellow", title="[bold yellow]⚠ Warning[/bold yellow]", - title_align="center" + title_align="center", ) console.print(warning_panel) @@ -110,7 +113,7 @@ def animate_thinking(message: str = "Processing", duration: float = 2.0) -> None """Display an animated thinking indicator.""" chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" end_time = time.time() + duration - + with console.status(f"[bold blue]{message}...", spinner="dots"): while time.time() < end_time: time.sleep(0.1) @@ -120,4 +123,4 @@ def animate_thinking(message: str = "Processing", duration: float = 2.0) -> None # Demo the welcome screen display_welcome_screen("1.0.0", "macOS", False) time.sleep(2) - display_welcome_screen("1.0.0", "macOS", True) \ No newline at end of file + display_welcome_screen("1.0.0", "macOS", True) diff --git a/src/ccnotify/notify.py b/src/ccnotify/notify.py index 2a966b0..ce22886 100644 --- a/src/ccnotify/notify.py +++ b/src/ccnotify/notify.py @@ -5,7 +5,7 @@ # requires-python = ">=3.9" # dependencies = [ # "pync", -# "requests", +# "requests", # "python-dotenv", # "kokoro-onnx", # "pydub", @@ -50,7 +50,7 @@ class KokoroTTSProvider: """Minimal embedded Kokoro TTS provider""" - + def __init__(self, models_dir: Path): self.models_dir = Path(models_dir) self.model_path = self.models_dir / "kokoro-v1.0.onnx" @@ -60,7 +60,7 @@ def __init__(self, models_dir: Path): self.speed = 1.0 self.format = "mp3" # Default to MP3 self.mp3_bitrate = "128k" - + def is_available(self) -> bool: """Check if Kokoro is available""" try: @@ -71,9 +71,10 @@ def is_available(self) -> bool: if not self.voices_path.exists(): logger.warning(f"Kokoro voices not found: {self.voices_path}") return False - + # Try to import kokoro from kokoro_onnx import Kokoro + self._kokoro = Kokoro(str(self.model_path), str(self.voices_path)) logger.debug(f"Kokoro TTS initialized successfully with models from {self.models_dir}") return True @@ -83,11 +84,11 @@ def is_available(self) -> bool: except Exception as e: logger.error(f"Kokoro initialization failed: {e}") return False - + def get_cache_key(self, text: str) -> str: """Generate cache key for text""" return hashlib.md5(text.encode()).hexdigest()[:16] - + def get_file_extension(self) -> str: """Get file extension based on format""" if self.format == "mp3": @@ -96,50 +97,52 @@ def get_file_extension(self) -> str: return ".aiff" else: return ".wav" - + def generate(self, text: str, output_path: Path) -> bool: """Generate TTS audio""" try: if not self._kokoro: from kokoro_onnx import Kokoro + self._kokoro = Kokoro(str(self.model_path), str(self.voices_path)) - + # Generate audio with Kokoro (returns numpy array and sample rate) audio_array, sample_rate = self._kokoro.create( - text=text, - voice=self.voice, - speed=self.speed + text=text, voice=self.voice, speed=self.speed ) - + # Convert numpy array to WAV bytes import soundfile + wav_buffer = io.BytesIO() - soundfile.write(wav_buffer, audio_array, sample_rate, format='WAV') + soundfile.write(wav_buffer, audio_array, sample_rate, format="WAV") audio_data = wav_buffer.getvalue() - + # Save based on format if self.format == "mp3": # Convert to MP3 using pydub try: from pydub import AudioSegment + audio = AudioSegment.from_wav(io.BytesIO(audio_data)) audio.export(str(output_path), format="mp3", bitrate=self.mp3_bitrate) except ImportError: # Fallback to WAV if pydub not available output_path = output_path.with_suffix(".wav") - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(audio_data) else: # Save as WAV - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(audio_data) - + return True - + except Exception as e: logger.error(f"TTS generation failed: {e}") return False + # Configuration BASE_DIR = Path.home() / ".claude" / "ccnotify" SOUNDS_DIR = BASE_DIR / "sounds" @@ -162,13 +165,24 @@ def generate(self, text: str, output_path: Path) -> bool: # KOKORO_PATH is now determined from config file KOKORO_SPEED = os.getenv("KOKORO_SPEED", "1.0") # Speed multiplier (0.5-2.0) + # Create a no-op logger class for when logging is disabled class NoOpLogger: - def debug(self, msg, *args, **kwargs): pass - def info(self, msg, *args, **kwargs): pass - def warning(self, msg, *args, **kwargs): pass - def error(self, msg, *args, **kwargs): pass - def critical(self, msg, *args, **kwargs): pass + def debug(self, msg, *args, **kwargs): + pass + + def info(self, msg, *args, **kwargs): + pass + + def warning(self, msg, *args, **kwargs): + pass + + def error(self, msg, *args, **kwargs): + pass + + def critical(self, msg, *args, **kwargs): + pass + # Setup logging (will be configured properly in setup_logging()) logger = NoOpLogger() @@ -180,39 +194,40 @@ def critical(self, msg, *args, **kwargs): pass def setup_logging(enable_logging=None): """Setup logging based on --logging flag or USE_LOGGING environment variable""" global logger - + # Command-line flag takes precedence over environment variable if enable_logging is None: enable_logging = USE_LOGGING - + if enable_logging: try: # Create logs directory only if logging is enabled LOGS_DIR.mkdir(parents=True, exist_ok=True) - + log_file = LOGS_DIR / f"notifications_{datetime.datetime.now().strftime('%Y%m%d')}.log" - + # Check log file size and rotate if needed (max 10MB) max_size = 10 * 1024 * 1024 # 10MB if log_file.exists() and log_file.stat().st_size > max_size: # Rotate the log file - backup_file = log_file.with_suffix(f'.{datetime.datetime.now().strftime("%H%M%S")}.log') + backup_file = log_file.with_suffix( + f'.{datetime.datetime.now().strftime("%H%M%S")}.log' + ) log_file.rename(backup_file) - + # Clean up old log files (keep last 5) - log_files = sorted(LOGS_DIR.glob("notifications_*.log"), key=lambda f: f.stat().st_mtime) + log_files = sorted( + LOGS_DIR.glob("notifications_*.log"), key=lambda f: f.stat().st_mtime + ) if len(log_files) > 5: for old_file in log_files[:-5]: old_file.unlink() - + logging.basicConfig( level=logging.DEBUG, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(log_file), - logging.StreamHandler(sys.stderr) - ], - force=True # Force reconfiguration + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[logging.FileHandler(log_file), logging.StreamHandler(sys.stderr)], + force=True, # Force reconfiguration ) logger = logging.getLogger(__name__) logger.info("Logging enabled") @@ -229,7 +244,7 @@ def load_project_cache() -> Dict[str, Dict[str, Any]]: """Load the project cache from disk""" if CACHE_FILE.exists(): try: - with open(CACHE_FILE, 'r') as f: + with open(CACHE_FILE, "r") as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load cache: {e}") @@ -239,7 +254,7 @@ def load_project_cache() -> Dict[str, Dict[str, Any]]: def save_project_cache(cache: Dict[str, Dict[str, Any]]): """Save the project cache to disk""" try: - with open(CACHE_FILE, 'w') as f: + with open(CACHE_FILE, "w") as f: json.dump(cache, f, indent=2) except Exception as e: logger.warning(f"Failed to save cache: {e}") @@ -249,12 +264,12 @@ def clean_old_cache_entries(cache: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[ """Remove cache entries older than CACHE_EXPIRY_DAYS""" current_time = time.time() expiry_seconds = CACHE_EXPIRY_DAYS * 24 * 60 * 60 - + cleaned_cache = {} for session_id, data in cache.items(): - if current_time - data.get('timestamp', 0) < expiry_seconds: + if current_time - data.get("timestamp", 0) < expiry_seconds: cleaned_cache[session_id] = data - + return cleaned_cache @@ -263,14 +278,14 @@ def decode_project_folder_name(folder_name: str) -> Optional[str]: Decode project folder name to actual path -Users-helmi-code-paperless-ai → /Users/helmi/code/paperless-ai """ - if not folder_name.startswith('-'): + if not folder_name.startswith("-"): return None - + # Remove leading dash and split - parts = folder_name[1:].split('-') - + parts = folder_name[1:].split("-") + # Reconstruct path with proper separators - path = '/' + '/'.join(parts) + path = "/" + "/".join(parts) return path @@ -288,49 +303,35 @@ def auto_add_project_to_replacements(project_name: str, folder_name: str = None) """Automatically add project to replacements.json if not present""" try: replacements = load_replacements() - project_replacements = replacements.get("project_names", {}).get("replacements", {}) - + projects = replacements.get("projects", {}) + # Check if project already exists (case-insensitive) - exists = any(project_name.lower() == key.lower() for key in project_replacements.keys()) - + exists = any(project_name.lower() == key.lower() for key in projects.keys()) + if not exists: # Create pronunciation-friendly version (replace hyphens with spaces) - friendly_name = project_name.replace('-', ' ') - - # Add to replacements - project_replacements[project_name] = friendly_name - - # Ensure structure exists - if "project_names" not in replacements: - replacements["project_names"] = {} - replacements["project_names"]["replacements"] = project_replacements - - # Add metadata comment if it doesn't exist - if "comment" not in replacements["project_names"]: - replacements["project_names"]["comment"] = "Maps project names to human-friendly names. Keys are matched case-insensitively." - - # Add info about auto-discovery - if "_auto_discovered" not in replacements: - replacements["_auto_discovered"] = { - "comment": "Projects below were auto-discovered. You can customize their pronunciation by editing the values." - } - - if "projects" not in replacements["_auto_discovered"]: - replacements["_auto_discovered"]["projects"] = {} - - # Track what was auto-discovered with folder reference - replacements["_auto_discovered"]["projects"][project_name] = { + friendly_name = project_name.replace("-", " ") + + # Add to projects section with rich metadata + projects[project_name] = { "folder": folder_name, "display_name": project_name, - "pronunciation": friendly_name + "pronunciation": friendly_name, } - + + # Ensure structure exists + if "projects" not in replacements: + replacements["projects"] = {} + replacements["projects"] = projects + # Save back to file - with open(REPLACEMENTS_FILE, 'w') as f: + with open(REPLACEMENTS_FILE, "w") as f: json.dump(replacements, f, indent=2) - - logger.info(f"Auto-added project '{project_name}' to replacements (pronounce as '{friendly_name}')") - + + logger.info( + f"Auto-added project '{project_name}' to replacements (pronounce as '{friendly_name}')" + ) + except Exception as e: logger.warning(f"Failed to auto-add project to replacements: {e}") @@ -340,16 +341,18 @@ def resolve_project_name(session_id: str, cwd: str = None) -> str: # Load and clean cache cache = load_project_cache() cache = clean_old_cache_entries(cache) - + # Check if we have a cached result if session_id in cache: - cached_name = cache[session_id]['project_name'] - + cached_name = cache[session_id]["project_name"] + # If we have cwd, validate the cache is still correct - if cwd and 'project_path' in cache[session_id]: - cached_path = cache[session_id]['project_path'] + if cwd and "project_path" in cache[session_id]: + cached_path = cache[session_id]["project_path"] if not is_cwd_under_project(cwd, cached_path): - logger.warning(f"Cached project {cached_name} doesn't match cwd {cwd}, re-resolving") + logger.warning( + f"Cached project {cached_name} doesn't match cwd {cwd}, re-resolving" + ) # Continue to re-resolve else: logger.debug(f"Found cached project name for session {session_id}: {cached_name}") @@ -357,35 +360,44 @@ def resolve_project_name(session_id: str, cwd: str = None) -> str: else: logger.debug(f"Found cached project name for session {session_id}: {cached_name}") return cached_name - + # Search for the session file in project folders try: session_file = f"{session_id}.jsonl" pattern = str(PROJECTS_DIR / "*" / session_file) matches = glob.glob(pattern) - + if matches: # Get the project folder name project_path = Path(matches[0]).parent folder_name = project_path.name - + # Decode the folder name to get actual project path actual_project_path = decode_project_folder_name(folder_name) - + if actual_project_path: # Extract meaningful project name from the actual path # For paths like /Users/helmi/code/agent/zero, we want "zero" or "agent-zero" path_parts = Path(actual_project_path).parts - + # Look for common project parent directories - common_parents = ["code", "projects", "dev", "work", "repos", "src", "Documents", "Desktop"] + common_parents = [ + "code", + "projects", + "dev", + "work", + "repos", + "src", + "Documents", + "Desktop", + ] project_name = Path(actual_project_path).name # Default to last part - + # Try to find a more meaningful project name for i, part in enumerate(path_parts): if part in common_parents and i + 1 < len(path_parts): # Use everything after the common parent as the project name - remaining_parts = path_parts[i + 1:] + remaining_parts = path_parts[i + 1 :] if len(remaining_parts) == 1: project_name = remaining_parts[0] else: @@ -398,84 +410,171 @@ def resolve_project_name(session_id: str, cwd: str = None) -> str: # For deeper nesting, just use the last part project_name = remaining_parts[-1] break - + # Validate against cwd if provided if cwd and not is_cwd_under_project(cwd, actual_project_path): - logger.warning(f"CWD {cwd} doesn't appear to be under project {actual_project_path}") + logger.warning( + f"CWD {cwd} doesn't appear to be under project {actual_project_path}" + ) # Still proceed but log the warning - + # Auto-add to replacements if not present auto_add_project_to_replacements(project_name, folder_name) - + # Cache the result cache[session_id] = { - 'project_name': project_name, - 'timestamp': time.time(), - 'project_path': actual_project_path, - 'claude_folder': folder_name + "project_name": project_name, + "timestamp": time.time(), + "project_path": actual_project_path, + "claude_folder": folder_name, } save_project_cache(cache) - - logger.debug(f"Resolved project name for session {session_id}: {project_name} (from {actual_project_path})") + + logger.debug( + f"Resolved project name for session {session_id}: {project_name} (from {actual_project_path})" + ) return project_name else: logger.warning(f"Could not decode project folder name: {folder_name}") - + except Exception as e: logger.warning(f"Error resolving project name: {e}") - + return "unknown" +def migrate_replacements_format(old_data: Dict[str, Any]) -> Dict[str, Any]: + """Migrate from old replacements format to new unified structure""" + new_data = {"projects": {}, "commands": {}, "patterns": []} + + # Migrate auto-discovered projects + if "_auto_discovered" in old_data: + auto_projects = old_data["_auto_discovered"].get("projects", {}) + for proj_name, proj_info in auto_projects.items(): + new_data["projects"][proj_name] = { + "folder": proj_info.get("folder"), + "display_name": proj_info.get("display_name", proj_name), + "pronunciation": proj_info.get("pronunciation", proj_name), + } + + # Migrate project_names replacements (if not already in projects) + if "project_names" in old_data: + project_replacements = old_data["project_names"].get("replacements", {}) + for proj_name, pronunciation in project_replacements.items(): + if proj_name not in new_data["projects"]: + new_data["projects"][proj_name] = { + "folder": None, # Will be filled on next auto-discovery + "display_name": proj_name, + "pronunciation": pronunciation, + } + + # Migrate commands + if "commands" in old_data: + cmd_replacements = old_data["commands"].get("replacements", {}) + new_data["commands"] = cmd_replacements + + # Migrate patterns + if "patterns" in old_data: + pattern_replacements = old_data["patterns"].get("replacements", []) + new_data["patterns"] = pattern_replacements + + # Add default commands if empty + if not new_data["commands"]: + new_data["commands"] = { + "ls": "list", + "cd": "change directory", + "rm": "remove", + "mkdir": "make directory", + "npm": "N P M", + "uvx": "U V X", + } + + # Add default patterns if empty + if not new_data["patterns"]: + new_data["patterns"] = [ + {"pattern": "npm run (\\w+)", "replacement": "N P M run {1}"}, + {"pattern": "git (push|pull|commit)", "replacement": "git {1}"}, + {"pattern": "(.+)\\.py", "replacement": "{1} python file"}, + ] + + return new_data + + def load_replacements() -> Dict[str, Any]: - """Load replacements configuration""" + """Load replacements configuration with automatic migration from old format""" if REPLACEMENTS_FILE.exists(): try: - with open(REPLACEMENTS_FILE, 'r') as f: - return json.load(f) + with open(REPLACEMENTS_FILE, "r") as f: + data = json.load(f) + + # Check if migration is needed (old format detection) + needs_migration = False + if "project_names" in data and "replacements" in data.get("project_names", {}): + needs_migration = True + elif "_auto_discovered" in data: + needs_migration = True + + if needs_migration: + # Migrate from old format to new format + migrated = migrate_replacements_format(data) + # Save migrated format + with open(REPLACEMENTS_FILE, "w") as f: + json.dump(migrated, f, indent=2) + logger.info("Migrated replacements.json to new format") + return migrated + + return data except Exception as e: logger.warning(f"Failed to load replacements: {e}") + + # Return new default structure return { - "project_names": {"replacements": {}}, - "commands": {"replacements": {}}, - "patterns": {"replacements": []} + "projects": {}, + "commands": { + "ls": "list", + "cd": "change directory", + "rm": "remove", + "mkdir": "make directory", + "npm": "N P M", + "uvx": "U V X", + }, + "patterns": [ + {"pattern": "npm run (\\w+)", "replacement": "N P M run {1}"}, + {"pattern": "git (push|pull|commit)", "replacement": "git {1}"}, + {"pattern": "(.+)\\.py", "replacement": "{1} python file"}, + ], } def apply_project_name_replacement(project_name: str, replacements: Dict[str, Any]) -> str: """Apply project name replacement for display""" - project_replacements = replacements.get("project_names", {}).get("replacements", {}) - + # Look up in projects section + projects = replacements.get("projects", {}) + # Case-insensitive matching - for original, replacement in project_replacements.items(): - if project_name.lower() == original.lower(): - return replacement - + for proj_key, proj_data in projects.items(): + if project_name.lower() == proj_key.lower(): + return proj_data.get("display_name", project_name) + return project_name def get_project_pronunciation(project_name: str, replacements: Dict[str, Any]) -> str: """Get the pronunciation for a project name for TTS""" - # First check auto-discovered pronunciations - auto_discovered = replacements.get("_auto_discovered", {}).get("projects", {}) - for proj_key, proj_data in auto_discovered.items(): + # Look up in projects section + projects = replacements.get("projects", {}) + for proj_key, proj_data in projects.items(): if proj_key.lower() == project_name.lower(): return proj_data.get("pronunciation", project_name) - - # Fall back to project_names replacements - project_replacements = replacements.get("project_names", {}).get("replacements", {}) - for original, replacement in project_replacements.items(): - if project_name.lower() == original.lower(): - return replacement - + return project_name def apply_command_replacement(command: str, replacements: Dict[str, Any]) -> str: """Apply command replacement for audio""" - cmd_replacements = replacements.get("commands", {}).get("replacements", {}) - pattern_replacements = replacements.get("patterns", {}).get("replacements", []) - + cmd_replacements = replacements.get("commands", {}) + pattern_replacements = replacements.get("patterns", []) + # First check pattern replacements for pattern_config in pattern_replacements: pattern = pattern_config.get("pattern", "") @@ -488,14 +587,14 @@ def apply_command_replacement(command: str, replacements: Dict[str, Any]) -> str for i, group in enumerate(match.groups(), 1): result = result.replace(f"${i}", group) return result - + # Then check direct command replacements cmd_parts = command.split() if cmd_parts: base_cmd = cmd_parts[0] if base_cmd in cmd_replacements: return cmd_replacements[base_cmd] - + # Return the original command if no replacement found return f"running {cmd_parts[0]}" if cmd_parts else "running command" @@ -505,13 +604,13 @@ def __init__(self): self.sounds_cache = {} self.tts_provider = None self._init_tts_provider() - + def _init_tts_provider(self): """Initialize TTS provider""" if not USE_TTS: logger.debug("TTS disabled") return - + try: # Load configuration file config_file = BASE_DIR / "config.json" @@ -522,18 +621,18 @@ def _init_tts_provider(self): config = json.load(f) except Exception: pass - + # Use embedded Kokoro provider if TTS_PROVIDER == "kokoro": models_dir = config.get("models_dir", str(BASE_DIR / "models")) self.tts_provider = KokoroTTSProvider(models_dir) - + # Configure from environment/config self.tts_provider.voice = config.get("voice", KOKORO_VOICE) self.tts_provider.speed = float(config.get("speed", KOKORO_SPEED)) self.tts_provider.format = config.get("format", "mp3").lower() self.tts_provider.mp3_bitrate = config.get("mp3_bitrate", "128k") - + if self.tts_provider.is_available(): logger.info(f"Initialized embedded Kokoro TTS provider") else: @@ -542,22 +641,18 @@ def _init_tts_provider(self): else: logger.warning(f"Unsupported TTS provider: {TTS_PROVIDER}") self.tts_provider = None - + except Exception as e: logger.error(f"Failed to initialize TTS provider: {e}") self.tts_provider = None - + def notify(self, title: str, message: str, sound_name: str = "Glass"): """Display macOS notification - simple and clean""" try: - pync.notify( - message=message, - title=title, - sound=sound_name - ) + pync.notify(message=message, title=title, sound=sound_name) except Exception as e: logger.error(f"Failed to send notification: {e}") - + def play_sound_file(self, sound_path: Path): """Play a sound file using macOS afplay""" if sound_path.exists(): @@ -565,38 +660,40 @@ def play_sound_file(self, sound_path: Path): subprocess.Popen(["afplay", str(sound_path)]) except Exception as e: logger.error(f"Failed to play sound: {e}") - + def get_notification_sound(self, event_type: str, custom_text: str = "") -> Optional[Path]: """Get or generate sound for notification""" if not USE_TTS or not self.tts_provider: logger.warning("TTS not available - no sound will be played") return None - + # Default texts for each event type default_texts = { "tool_activity": "Tool activity", - "execution_complete": "Task complete", + "execution_complete": "Task complete", "subagent_done": "Sub agent done", "error": "Error occurred", "tool_blocked": "Tool blocked", "compaction": "Compacting context", - "input_needed": "Input needed" + "input_needed": "Input needed", } - + # Determine text to speak - text_to_speak = custom_text if custom_text else default_texts.get(event_type, "Claude notification") + text_to_speak = ( + custom_text if custom_text else default_texts.get(event_type, "Claude notification") + ) logger.debug(f"TTS text: '{text_to_speak}'") - + # Generate cache key and file path cache_key = self.tts_provider.get_cache_key(text_to_speak) file_extension = self.tts_provider.get_file_extension() sound_file = SOUNDS_DIR / f"{event_type}_{cache_key}{file_extension}" - + # Use cached file if exists if sound_file.exists(): logger.debug(f"Using cached sound: {sound_file}") return sound_file - + # Generate new sound file try: success = self.tts_provider.generate(text_to_speak, sound_file) @@ -606,31 +703,31 @@ def get_notification_sound(self, event_type: str, custom_text: str = "") -> Opti else: logger.warning(f"TTS generation failed") return None - + except Exception as e: logger.error(f"TTS generation error: {e}") return None - + def handle_hook(self, hook_data: Dict[str, Any]): """Process hook data and generate appropriate notification""" # Claude Code uses "event" field, not "hook_event_name" hook_type = hook_data.get("event", hook_data.get("hook_event_name", "unknown")) logger.info(f"Processing hook type: {hook_type}") - + # Log all available fields for this hook type logger.debug(f"Available fields: {list(hook_data.keys())}") - + event_type = None message = None custom_tts = None - + # Load replacements configuration replacements = load_replacements() - + # Extract session context - Claude Code uses camelCase "sessionId" session_id = hook_data.get("sessionId", hook_data.get("session_id", "unknown")) cwd = hook_data.get("cwd", os.getcwd()) # Fall back to current working directory - + # If no session ID but we have cwd, try to extract project name from path if session_id == "unknown" and cwd: # Try to extract project name from cwd @@ -644,50 +741,72 @@ def handle_hook(self, hook_data: Dict[str, Any]): project_name = Path(cwd).name else: project_name = Path(cwd).name - + logger.info(f"No session ID, extracted project from cwd: {project_name}") else: project_name = resolve_project_name(session_id, cwd) logger.info(f"Resolved project name from session: {project_name}") - + # Apply project name replacement for display and TTS display_project_name = apply_project_name_replacement(project_name, replacements) tts_project_name = get_project_pronunciation(project_name, replacements) - + cwd_name = Path(cwd).name if cwd else "unknown" - + if hook_type == "PreToolUse": # Claude Code sends "tool" not "tool_name" tool_name = hook_data.get("tool", hook_data.get("tool_name", "unknown")) - + # Only notify for truly dangerous operations # Skip notifications for common safe operations if tool_name == "Bash": # Claude Code sends "parameters" not "tool_input" - command = hook_data.get("parameters", hook_data.get("tool_input", {})).get("command", "") + command = hook_data.get("parameters", hook_data.get("tool_input", {})).get( + "command", "" + ) # Skip common safe commands - safe_prefixes = ["echo", "pwd", "ls", "cat", "head", "tail", "grep", "find", "which"] + safe_prefixes = [ + "echo", + "pwd", + "ls", + "cat", + "head", + "tail", + "grep", + "find", + "which", + ] if any(command.strip().startswith(prefix) for prefix in safe_prefixes): return # Skip notification - + # Only notify for potentially dangerous commands - dangerous_prefixes = ["rm", "mv", "cp", "sudo", "chmod", "chown", ">", "curl", "wget"] + dangerous_prefixes = [ + "rm", + "mv", + "cp", + "sudo", + "chmod", + "chown", + ">", + "curl", + "wget", + ] if any(prefix in command for prefix in dangerous_prefixes): event_type = "tool_activity" # Extract the command and first argument for cleaner message # Split only up to pipe to avoid getting arguments from piped commands - base_command = command.split('|')[0] if '|' in command else command + base_command = command.split("|")[0] if "|" in command else command cmd_parts = base_command.split() cmd_summary = cmd_parts[0] if cmd_parts else "command" - + # Create natural TTS descriptions target = None - + # Special handling for curl/wget - don't extract file names from URLs - if cmd_summary in ['curl', 'wget']: + if cmd_summary in ["curl", "wget"]: # For curl/wget, only show the domain if it's a URL for part in cmd_parts[1:]: - if part.startswith('http://') or part.startswith('https://'): + if part.startswith("http://") or part.startswith("https://"): # Extract just the domain try: parsed = urlparse(part) @@ -695,27 +814,37 @@ def handle_hook(self, hook_data: Dict[str, Any]): except: pass break - elif not part.startswith('-'): + elif not part.startswith("-"): # If it's a file path (not a URL), show the filename - if '/' not in part and '.' in part: + if "/" not in part and "." in part: target = part break elif len(cmd_parts) > 1 and cmd_parts[1]: # Get just the filename/target, not full path potential_target = cmd_parts[1] - + # Skip if it looks like code or complex syntax - if any(char in potential_target for char in ['(', ')', '[', ']', '{', '}', '"', "'", '|', ';']): + if any( + char in potential_target + for char in ["(", ")", "[", "]", "{", "}", '"', "'", "|", ";"] + ): target = None - elif potential_target.startswith('-'): + elif potential_target.startswith("-"): # It's a flag, try to find a real target for part in cmd_parts[2:]: - if not part.startswith('-') and not any(char in part for char in ['(', ')', '[', ']', '{', '}', '"', "'", '|', ';']): - target = Path(part).name if '/' in part else part + if not part.startswith("-") and not any( + char in part + for char in ["(", ")", "[", "]", "{", "}", '"', "'", "|", ";"] + ): + target = Path(part).name if "/" in part else part break else: - target = Path(potential_target).name if '/' in potential_target else potential_target - + target = ( + Path(potential_target).name + if "/" in potential_target + else potential_target + ) + # Generate human-friendly descriptions for TTS tts_descriptions = { "rm": f"removing {target}" if target else "removing files", @@ -724,12 +853,18 @@ def handle_hook(self, hook_data: Dict[str, Any]): "mv": f"moving {target}" if target else "moving files", "cp": f"copying {target}" if target else "copying files", "sudo": "running with admin privileges", - "chmod": f"changing permissions on {target}" if target else "changing permissions", - "chown": f"changing ownership of {target}" if target else "changing ownership", + "chmod": ( + f"changing permissions on {target}" + if target + else "changing permissions" + ), + "chown": ( + f"changing ownership of {target}" if target else "changing ownership" + ), "curl": "fetching data", - "wget": "downloading file" + "wget": "downloading file", } - + # Check for specific command patterns if cmd_summary == "rm" and "-rf" in command: audio_desc = tts_descriptions["rm -rf"] @@ -739,25 +874,27 @@ def handle_hook(self, hook_data: Dict[str, Any]): audio_desc = tts_descriptions[cmd_summary] else: audio_desc = f"running {cmd_summary}" - + # Build messages if target and len(target) < 50: # Sanity check on target length message = f"[{display_project_name}] Running {cmd_summary} on {target}" else: message = f"[{display_project_name}] Running {cmd_summary}" - + # Natural TTS message custom_tts = f"{tts_project_name}, {audio_desc}" - + # Skip most file edits unless they're system files elif tool_name in ["Write", "MultiEdit", "Edit"]: # Claude Code sends "parameters" not "tool_input" - file_path = hook_data.get("parameters", hook_data.get("tool_input", {})).get("file_path", "") + file_path = hook_data.get("parameters", hook_data.get("tool_input", {})).get( + "file_path", "" + ) # Only notify for system/config files if any(x in file_path for x in ["/etc/", "/usr/", ".env", "config", "secret"]): event_type = "tool_activity" file_name = Path(file_path).name - + # More natural descriptions based on tool if tool_name == "Write": action_desc = "writing" @@ -765,54 +902,59 @@ def handle_hook(self, hook_data: Dict[str, Any]): action_desc = "multi-editing" else: action_desc = "editing" - + message = f"[{display_project_name}] {action_desc.capitalize()} {file_name}" custom_tts = f"{tts_project_name}, {action_desc} {file_name}" - + elif hook_type == "PostToolUse": # Check for errors in tool response # Claude Code might send "response" or "tool_response" tool_response = hook_data.get("response", hook_data.get("tool_response", {})) # Claude Code sends "tool" not "tool_name" tool_name = hook_data.get("tool", hook_data.get("tool_name", "unknown")) - + # Debug log the tool response structure - logger.debug(f"PostToolUse response for {tool_name}: {json.dumps(tool_response, indent=2) if isinstance(tool_response, dict) else tool_response}") - + logger.debug( + f"PostToolUse response for {tool_name}: {json.dumps(tool_response, indent=2) if isinstance(tool_response, dict) else tool_response}" + ) + # Check if response indicates an error (type: "error" or has error field) - if (isinstance(tool_response, dict) and - (tool_response.get("type") == "error" or "error" in tool_response)): + if isinstance(tool_response, dict) and ( + tool_response.get("type") == "error" or "error" in tool_response + ): event_type = "error" - error_msg = tool_response.get("error", tool_response.get("message", "Unknown error")) + error_msg = tool_response.get( + "error", tool_response.get("message", "Unknown error") + ) message = f"[{display_project_name}] Error in {tool_name}: {str(error_msg)[:100]}" custom_tts = f"{tts_project_name}, {tool_name} failed" - + elif hook_type == "Stop": event_type = "execution_complete" message = f"[{display_project_name}] Task complete" custom_tts = f"{tts_project_name}, task completed successfully" - + elif hook_type == "SubagentStop": event_type = "subagent_done" message = f"[{display_project_name}] Subagent finished" custom_tts = f"{tts_project_name}, sub agent finished" - + elif hook_type == "PreCompact": event_type = "compaction" message = "Context compaction starting" - + elif hook_type == "Notification": # Handle user input/confirmation needed scenarios notif_type = hook_data.get("notification_type", "") raw_message = hook_data.get("message", "") - + if "error" in notif_type.lower(): event_type = "error" message = hook_data.get("message", "An error occurred") else: # This is likely a permission/confirmation request event_type = "input_needed" - + # Extract what Claude needs permission for if "permission to use" in raw_message: # Extract tool name from "Claude needs your permission to use [Tool]" @@ -827,20 +969,17 @@ def handle_hook(self, hook_data: Dict[str, Any]): else: message = f"[{display_project_name}] Input needed" custom_tts = f"{tts_project_name}, input needed" - + # Send notification if we have an event if event_type and message: logger.info(f"Sending notification: event_type={event_type}, message={message}") - + # Extract the actual message without the project prefix - clean_message = message.split('] ', 1)[-1] if '] ' in message else message - + clean_message = message.split("] ", 1)[-1] if "] " in message else message + # Simple and clean: project as title, message as content - self.notify( - title=display_project_name, - message=clean_message - ) - + self.notify(title=display_project_name, message=clean_message) + # Play sound if available sound_file = self.get_notification_sound(event_type, custom_tts or "") if sound_file: @@ -854,10 +993,11 @@ def main(): """Main notification handler entry point""" # Parse command-line arguments import argparse + parser = argparse.ArgumentParser(description="CCNotify notification handler") parser.add_argument("--logging", action="store_true", help="Enable logging to file") args, unknown = parser.parse_known_args() - + # Load .env file if available try: env_file = BASE_DIR / ".env" @@ -876,12 +1016,12 @@ def main(): KOKORO_SPEED = os.getenv("KOKORO_SPEED", "1.0") except ImportError: pass - + # Setup logging based on command-line flag or environment variable setup_logging(enable_logging=args.logging) - + handler = NotificationHandler() - + # Check if running interactively (for testing) if sys.stdin.isatty(): # Test mode @@ -900,10 +1040,10 @@ def main(): try: raw_input = sys.stdin.read() logger.info(f"Raw input received: {raw_input}") - + hook_data = json.loads(raw_input) logger.info(f"Parsed hook data: {json.dumps(hook_data, indent=2)}") - + handler.handle_hook(hook_data) except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON input: {e}") @@ -915,4 +1055,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/ccnotify/setup.py b/src/ccnotify/setup.py index 6f5a63b..1dcd66d 100644 --- a/src/ccnotify/setup.py +++ b/src/ccnotify/setup.py @@ -27,22 +27,22 @@ def download_with_progress(url: str, output_path: Path, expected_size: int = Non """Download a file with progress bar""" try: print(f"Downloading {output_path.name}...") - + response = requests.get(url, stream=True) response.raise_for_status() - - total_size = expected_size or int(response.headers.get('content-length', 0)) - - with open(output_path, 'wb') as f: - with tqdm(total=total_size, unit='B', unit_scale=True, desc=output_path.name) as pbar: + + total_size = expected_size or int(response.headers.get("content-length", 0)) + + with open(output_path, "wb") as f: + with tqdm(total=total_size, unit="B", unit_scale=True, desc=output_path.name) as pbar: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) pbar.update(len(chunk)) - + print(f"✓ Downloaded {output_path.name}") return True - + except Exception as e: print(f"✗ Failed to download {output_path.name}: {e}") if output_path.exists(): @@ -54,12 +54,12 @@ def verify_file_hash(file_path: Path, expected_hash: str) -> bool: """Verify file integrity using SHA256 hash""" if not file_path.exists(): return False - + sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): sha256_hash.update(chunk) - + return sha256_hash.hexdigest() == expected_hash @@ -80,11 +80,11 @@ def save_model_info(models_dir: Path, release_info: dict): info = { "version": release_info.get("tag_name", "unknown"), "updated_at": datetime.now().isoformat(), - "release_date": release_info.get("published_at", "unknown") + "release_date": release_info.get("published_at", "unknown"), } - + info_file = models_dir / "model_info.json" - with open(info_file, 'w') as f: + with open(info_file, "w") as f: json.dump(info, f, indent=2) @@ -93,7 +93,7 @@ def get_current_model_info(models_dir: Path) -> dict: info_file = models_dir / "model_info.json" if info_file.exists(): try: - with open(info_file, 'r') as f: + with open(info_file, "r") as f: return json.load(f) except Exception: pass @@ -103,11 +103,11 @@ def get_current_model_info(models_dir: Path) -> dict: def setup_kokoro(force_download: bool = False) -> bool: """Download and setup Kokoro TTS models""" print("🔧 Setting up Kokoro TTS...") - + # Create models directory models_dir = Path("models") models_dir.mkdir(exist_ok=True) - + # Model files with expected sizes models = { "kokoro-v1.0.onnx": { @@ -116,60 +116,57 @@ def setup_kokoro(force_download: bool = False) -> bool: }, "voices-v1.0.bin": { "url": "https://github.com/thewh1teagle/kokoro-onnx/releases/download/model-files-v1.0/voices-v1.0.bin", - "size": 28214398, # ~27MB - } + "size": 28214398, # ~27MB + }, } - + # Check if models already exist models_exist = all((models_dir / filename).exists() for filename in models.keys()) - + if models_exist and not force_download: print("✅ Kokoro models already installed (use --force to reinstall)") return True - + success = True - + for filename, info in models.items(): file_path = models_dir / filename - + # Skip if file exists and not forcing download if file_path.exists() and not force_download: print(f"✓ {filename} already exists") continue - + # Download the file if not download_with_progress(info["url"], file_path, info["size"]): success = False continue - + # Verify file size actual_size = file_path.stat().st_size if actual_size != info["size"]: print(f"✗ {filename} size mismatch: expected {info['size']}, got {actual_size}") success = False - + if success: # Save version info latest_release = get_latest_model_info() if latest_release: save_model_info(models_dir, latest_release) - + print("✅ Kokoro TTS setup completed successfully!") print("\nTo use Kokoro TTS:") print("1. Set TTS_PROVIDER=kokoro in your .env file") print("2. Configure KOKORO_VOICE (e.g., af_heart, am_adam)") print("3. Optionally set KOKORO_SPEED (0.5-2.0)") - + # Test installation print("\n🧪 Testing installation...") try: from .tts.kokoro import KokoroProvider + # Create proper config for KokoroProvider - test_config = { - "models_dir": str(models_dir), - "voice": "af_heart", - "speed": 1.0 - } + test_config = {"models_dir": str(models_dir), "voice": "af_heart", "speed": 1.0} provider = KokoroProvider(test_config) if provider.is_available(): print("✅ Kokoro TTS installation verified!") @@ -182,35 +179,63 @@ def setup_kokoro(force_download: bool = False) -> bool: print(f"⚠️ Installation test failed: {e}") else: print("❌ Kokoro TTS setup failed") - + return success def list_voices() -> None: """List available Kokoro voices""" voices = { - "English (Female)": ["af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", - "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky"], - "English (Male)": ["am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", - "am_michael", "am_onyx", "am_puck", "am_santa"], + "English (Female)": [ + "af_alloy", + "af_aoede", + "af_bella", + "af_heart", + "af_jessica", + "af_kore", + "af_nicole", + "af_nova", + "af_river", + "af_sarah", + "af_sky", + ], + "English (Male)": [ + "am_adam", + "am_echo", + "am_eric", + "am_fenrir", + "am_liam", + "am_michael", + "am_onyx", + "am_puck", + "am_santa", + ], "British English (Female)": ["bf_alice", "bf_emma", "bf_isabella", "bf_lily"], "British English (Male)": ["bm_daniel", "bm_fable", "bm_george", "bm_lewis"], "French": ["ff_siwis"], "Italian": ["if_sara", "im_nicola"], "Japanese": ["jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo"], - "Chinese": ["zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi", - "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang"] + "Chinese": [ + "zf_xiaobei", + "zf_xiaoni", + "zf_xiaoxiao", + "zf_xiaoyi", + "zm_yunjian", + "zm_yunxi", + "zm_yunxia", + "zm_yunyang", + ], } - + print("🎤 Available Kokoro TTS Voices:") print() - + for category, voice_list in voices.items(): print(f"{category}:") for voice in voice_list: print(f" • {voice}") print() - + print("💡 Voice Blending Examples:") print(" • af_heart:60,am_adam:40 (60% Heart + 40% Adam)") print(" • af_bella:80,af_nova:20 (80% Bella + 20% Nova)") @@ -221,39 +246,39 @@ def check_and_update() -> bool: Check for updates to both package and models, guide user through updates """ print("🔍 Checking for updates...") - + # Check package version try: import subprocess + result = subprocess.run( - ["pip", "show", "ccnotify"], - capture_output=True, text=True, timeout=10 + ["pip", "show", "ccnotify"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: - for line in result.stdout.split('\n'): - if line.startswith('Version:'): - current_pkg_version = line.split(':')[1].strip() + for line in result.stdout.split("\n"): + if line.startswith("Version:"): + current_pkg_version = line.split(":")[1].strip() print(f"📦 Current package version: {current_pkg_version}") break else: print("📦 Package not installed via pip (development mode?)") except Exception as e: print(f"⚠️ Could not check package version: {e}") - + # Check model version models_dir = Path("models") current_model_info = get_current_model_info(models_dir) latest_release = get_latest_model_info() - + updates_available = False - + if latest_release: latest_version = latest_release.get("tag_name", "unknown") current_version = current_model_info.get("version", "none") - + print(f"🎤 Current model version: {current_version}") print(f"🎤 Latest model version: {latest_version}") - + if current_version != latest_version: updates_available = True print(f"\n📦 Model update available: {current_version} → {latest_version}") @@ -261,16 +286,16 @@ def check_and_update() -> bool: print("✅ Models are up to date!") else: print("⚠️ Could not check for model updates") - + if updates_available: print("\n🚀 Updates available!") response = input("Update models now? [Y/n]: ").strip().lower() - - if response in ['', 'y', 'yes']: + + if response in ["", "y", "yes"]: return setup_kokoro(force_download=True) else: print("⏭️ Skipping model update") - + print("\n💡 To update the package, run: pip install --upgrade ccnotify") return True @@ -278,31 +303,31 @@ def check_and_update() -> bool: def cleanup_models() -> None: """Clean up downloaded model files""" models_dir = Path("models") - + if not models_dir.exists(): print("No models directory found") return - + model_files = list(models_dir.glob("*.onnx")) + list(models_dir.glob("*.bin")) - + if not model_files: print("No model files found to clean up") return - + total_size = sum(f.stat().st_size for f in model_files) print(f"Found {len(model_files)} model files ({total_size / 1024 / 1024:.1f} MB)") - + response = input("Delete all model files? [y/N]: ") - if response.lower() == 'y': + if response.lower() == "y": for file_path in model_files: file_path.unlink() print(f"Deleted {file_path.name}") - + # Remove directory if empty if not any(models_dir.iterdir()): models_dir.rmdir() print("Removed empty models directory") - + print("✅ Cleanup completed") else: print("Cleanup cancelled") @@ -320,36 +345,37 @@ def main(): python setup.py --update Check for and install updates python setup.py --voices List available voices python setup.py --cleanup Remove downloaded models - """) - + """, + ) + parser.add_argument("--kokoro", action="store_true", help="Setup Kokoro TTS") parser.add_argument("--force", action="store_true", help="Force reinstall models") parser.add_argument("--update", action="store_true", help="Check for and install updates") parser.add_argument("--voices", action="store_true", help="List available voices") parser.add_argument("--cleanup", action="store_true", help="Clean up downloaded models") - + args = parser.parse_args() - + if not any([args.kokoro, args.update, args.voices, args.cleanup]): parser.print_help() return - + if args.voices: list_voices() - + if args.update: success = check_and_update() if not success: sys.exit(1) - + elif args.kokoro: success = setup_kokoro(force_download=args.force) if not success: sys.exit(1) - + elif args.cleanup: cleanup_models() if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/ccnotify/tts/__init__.py b/src/ccnotify/tts/__init__.py index e200932..9fd3522 100644 --- a/src/ccnotify/tts/__init__.py +++ b/src/ccnotify/tts/__init__.py @@ -5,4 +5,4 @@ from .base import TTSProvider from .factory import get_tts_provider -__all__ = ["TTSProvider", "get_tts_provider"] \ No newline at end of file +__all__ = ["TTSProvider", "get_tts_provider"] diff --git a/src/ccnotify/tts/base.py b/src/ccnotify/tts/base.py index bc667b8..df7ba74 100644 --- a/src/ccnotify/tts/base.py +++ b/src/ccnotify/tts/base.py @@ -12,63 +12,63 @@ class TTSProvider(ABC): """Abstract base class for TTS providers""" - + def __init__(self, config: Dict[str, Any]): """Initialize TTS provider with configuration""" self.config = config self.logger = logger.getChild(self.__class__.__name__) - + @abstractmethod def is_available(self) -> bool: """Check if TTS provider is available and properly configured""" pass - + @abstractmethod def generate(self, text: str, output_path: Path, **kwargs) -> bool: """ Generate TTS audio from text - + Args: text: Text to convert to speech output_path: Path where audio file should be saved **kwargs: Provider-specific options (voice, speed, etc.) - + Returns: True if generation was successful, False otherwise """ pass - + @abstractmethod def get_file_extension(self) -> str: """Get the file extension for audio files produced by this provider""" pass - + def get_cache_key(self, text: str, **kwargs) -> str: """ Generate a cache key for the given text and options - + Args: text: Text to convert to speech **kwargs: Provider-specific options - + Returns: Cache key string """ import hashlib - + # Create a string representation of all parameters cache_data = f"{text}|{sorted(kwargs.items())}" - + # Generate a hash for the cache key return hashlib.md5(cache_data.encode()).hexdigest()[:8] - + def validate_config(self, required_keys: list) -> bool: """ Validate that required configuration keys are present - + Args: required_keys: List of required configuration keys - + Returns: True if all required keys are present, False otherwise """ @@ -77,7 +77,7 @@ def validate_config(self, required_keys: list) -> bool: self.logger.error(f"Missing required configuration key: {key}") return False return True - + def log_generation(self, text: str, output_path: Path, success: bool, **kwargs): """Log TTS generation attempt""" status = "SUCCESS" if success else "FAILED" @@ -85,21 +85,24 @@ def log_generation(self, text: str, output_path: Path, success: bool, **kwargs): f"TTS generation {status}: '{text[:50]}...' -> {output_path.name} " f"({self.__class__.__name__})" ) - + if kwargs: self.logger.debug(f"Generation options: {kwargs}") class TTSError(Exception): """Base exception for TTS-related errors""" + pass class TTSProviderNotAvailable(TTSError): """Raised when a TTS provider is not available""" + pass class TTSGenerationError(TTSError): """Raised when TTS generation fails""" - pass \ No newline at end of file + + pass diff --git a/src/ccnotify/tts/elevenlabs.py b/src/ccnotify/tts/elevenlabs.py index 1aad53d..fd4614a 100644 --- a/src/ccnotify/tts/elevenlabs.py +++ b/src/ccnotify/tts/elevenlabs.py @@ -9,6 +9,7 @@ # Import requests only when needed try: import requests + REQUESTS_AVAILABLE = True except ImportError: REQUESTS_AVAILABLE = False @@ -20,41 +21,39 @@ class ElevenLabsProvider(TTSProvider): """ElevenLabs TTS provider using cloud API""" - + def __init__(self, config: Dict[str, Any]): super().__init__(config) - + # Configuration self.api_key = config.get("api_key", "") self.voice_id = config.get("voice_id", "21m00Tcm4TlvDq8ikWAM") # Rachel self.model_id = config.get("model_id", "eleven_flash_v2_5") - + # API settings self.base_url = "https://api.elevenlabs.io/v1" self.timeout = config.get("timeout", 30) - + # Voice settings self.stability = float(config.get("stability", 0.5)) self.similarity_boost = float(config.get("similarity_boost", 0.5)) - + def is_available(self) -> bool: """Check if ElevenLabs TTS is available""" if not REQUESTS_AVAILABLE: self.logger.warning("requests library not installed - needed for ElevenLabs") return False - + if not self.api_key: self.logger.warning("ElevenLabs API key not configured") return False - + try: # Test API connectivity response = requests.get( - f"{self.base_url}/voices", - headers={"xi-api-key": self.api_key}, - timeout=5 + f"{self.base_url}/voices", headers={"xi-api-key": self.api_key}, timeout=5 ) - + if response.status_code == 200: return True elif response.status_code == 401: @@ -63,59 +62,50 @@ def is_available(self) -> bool: else: self.logger.warning(f"ElevenLabs API returned status {response.status_code}") return False - + except requests.RequestException as e: self.logger.warning(f"ElevenLabs API connectivity test failed: {e}") return False - + def generate(self, text: str, output_path: Path, **kwargs) -> bool: """ Generate TTS audio using ElevenLabs API - + Args: text: Text to convert to speech output_path: Path where audio file should be saved **kwargs: Additional options (voice_id, model_id, stability, similarity_boost) - + Returns: True if generation was successful, False otherwise """ try: if not self.is_available(): raise TTSProviderNotAvailable("ElevenLabs TTS is not available") - + # Get generation parameters voice_id = kwargs.get("voice_id", self.voice_id) model_id = kwargs.get("model_id", self.model_id) stability = float(kwargs.get("stability", self.stability)) similarity_boost = float(kwargs.get("similarity_boost", self.similarity_boost)) - + # Prepare request url = f"{self.base_url}/text-to-speech/{voice_id}" - headers = { - "xi-api-key": self.api_key, - "Content-Type": "application/json" - } - + headers = {"xi-api-key": self.api_key, "Content-Type": "application/json"} + payload = { "text": text, "model_id": model_id, - "voice_settings": { - "stability": stability, - "similarity_boost": similarity_boost - } + "voice_settings": {"stability": stability, "similarity_boost": similarity_boost}, } - - self.logger.debug(f"Generating TTS: '{text[:50]}...' with voice={voice_id}, model={model_id}") - - # Make API request - response = requests.post( - url, - headers=headers, - json=payload, - timeout=self.timeout + + self.logger.debug( + f"Generating TTS: '{text[:50]}...' with voice={voice_id}, model={model_id}" ) - + + # Make API request + response = requests.post(url, headers=headers, json=payload, timeout=self.timeout) + # Check response if response.status_code != 200: error_msg = f"ElevenLabs API error {response.status_code}" @@ -125,57 +115,54 @@ def generate(self, text: str, output_path: Path, **kwargs) -> bool: error_msg += f": {error_detail}" except: pass - + raise TTSGenerationError(error_msg) - + # Save audio data output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(response.content) - - self.log_generation( - text, output_path, True, - voice_id=voice_id, model_id=model_id - ) + + self.log_generation(text, output_path, True, voice_id=voice_id, model_id=model_id) return True - + except Exception as e: self.logger.error(f"ElevenLabs TTS generation failed: {e}") self.log_generation(text, output_path, False) - + # Clean up partial file if output_path.exists(): try: output_path.unlink() except: pass - + return False - + def get_file_extension(self) -> str: """Get the file extension for ElevenLabs audio files""" return ".mp3" - + def get_available_voices(self) -> Dict[str, Any]: """ Get list of available voices from ElevenLabs API - + Returns: Dictionary containing voice information """ try: if not self.is_available(): return {} - + response = requests.get( f"{self.base_url}/voices", headers={"xi-api-key": self.api_key}, - timeout=self.timeout + timeout=self.timeout, ) - + if response.status_code == 200: voices_data = response.json() - + # Organize voices by category voices = {} for voice in voices_data.get("voices", []): @@ -185,78 +172,80 @@ def get_available_voices(self) -> Dict[str, Any]: "category": voice.get("category", "Unknown"), "description": voice.get("description", ""), "labels": voice.get("labels", {}), - "preview_url": voice.get("preview_url", "") + "preview_url": voice.get("preview_url", ""), } - + category = voice_info["category"] if category not in voices: voices[category] = [] voices[category].append(voice_info) - + return voices else: self.logger.error(f"Failed to fetch voices: {response.status_code}") return {} - + except Exception as e: self.logger.error(f"Error fetching available voices: {e}") return {} - + def get_voice_info(self, voice_id: str) -> Dict[str, Any]: """ Get detailed information about a specific voice - + Args: voice_id: ElevenLabs voice ID - + Returns: Dictionary containing voice information """ try: if not self.is_available(): return {} - + response = requests.get( f"{self.base_url}/voices/{voice_id}", headers={"xi-api-key": self.api_key}, - timeout=self.timeout + timeout=self.timeout, ) - + if response.status_code == 200: return response.json() else: self.logger.error(f"Failed to fetch voice info: {response.status_code}") return {} - + except Exception as e: self.logger.error(f"Error fetching voice info for {voice_id}: {e}") return {} - - def test_voice(self, voice_id: str, test_text: str = "Hello, this is a test of ElevenLabs TTS") -> bool: + + def test_voice( + self, voice_id: str, test_text: str = "Hello, this is a test of ElevenLabs TTS" + ) -> bool: """ Test a specific voice by generating a sample - + Args: voice_id: ElevenLabs voice ID to test test_text: Text to use for testing - + Returns: True if voice test successful, False otherwise """ try: from tempfile import NamedTemporaryFile - + with NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file: tmp_path = Path(tmp_file.name) - + success = self.generate(test_text, tmp_path, voice_id=voice_id) - + # Clean up test file if tmp_path.exists(): tmp_path.unlink() - + return success - + except Exception as e: self.logger.error(f"Voice test failed for {voice_id}: {e}") - return False \ No newline at end of file + return False diff --git a/src/ccnotify/tts/factory.py b/src/ccnotify/tts/factory.py index 5e9ce65..3ed8018 100644 --- a/src/ccnotify/tts/factory.py +++ b/src/ccnotify/tts/factory.py @@ -22,52 +22,50 @@ def get_tts_provider( - provider_name: str = None, - config: Dict[str, Any] = None, - fallback: bool = True + provider_name: str = None, config: Dict[str, Any] = None, fallback: bool = True ) -> Optional[TTSProvider]: """ Get a TTS provider instance - + Args: provider_name: Name of the TTS provider to use config: Configuration dictionary for the provider fallback: Whether to try fallback providers if primary fails - + Returns: TTS provider instance, or None if no provider available """ if config is None: config = {} - + # Get provider name from config or environment if provider_name is None: provider_name = config.get("provider", os.getenv("TTS_PROVIDER", "kokoro")) - + # Normalize provider name provider_name = provider_name.lower().strip() - + # Try primary provider provider = _create_provider(provider_name, config) if provider and provider.is_available(): logger.info(f"Using TTS provider: {provider_name}") return provider - + if not fallback: return None - + # Try fallback providers fallback_order = _get_fallback_order(provider_name) - + for fallback_name in fallback_order: if fallback_name == provider_name: continue # Skip the primary provider we already tried - + provider = _create_provider(fallback_name, config) if provider and provider.is_available(): logger.info(f"Using fallback TTS provider: {fallback_name}") return provider - + logger.warning("No TTS providers available") return None @@ -78,12 +76,12 @@ def _create_provider(provider_name: str, config: Dict[str, Any]) -> Optional[TTS if provider_name not in TTS_PROVIDERS: logger.error(f"Unknown TTS provider: {provider_name}") return None - + provider_class = TTS_PROVIDERS[provider_name] provider_config = _build_provider_config(provider_name, config) - + return provider_class(provider_config) - + except Exception as e: logger.error(f"Failed to create {provider_name} provider: {e}") return None @@ -92,50 +90,59 @@ def _create_provider(provider_name: str, config: Dict[str, Any]) -> Optional[TTS def _build_provider_config(provider_name: str, base_config: Dict[str, Any]) -> Dict[str, Any]: """Build configuration for a specific provider""" config = base_config.copy() - + if provider_name == "kokoro": - config.update({ - "models_dir": base_config.get("models_dir", "models"), - "voice": os.getenv("KOKORO_VOICE", base_config.get("voice", "af_heart")), - "speed": os.getenv("KOKORO_SPEED", base_config.get("speed", "1.0")), - "format": os.getenv("KOKORO_FORMAT", base_config.get("format", "mp3")), - "mp3_bitrate": os.getenv("KOKORO_MP3_BITRATE", base_config.get("mp3_bitrate", "128k")), - }) - + config.update( + { + "models_dir": base_config.get("models_dir", "models"), + "voice": os.getenv("KOKORO_VOICE", base_config.get("voice", "af_heart")), + "speed": os.getenv("KOKORO_SPEED", base_config.get("speed", "1.0")), + "format": os.getenv("KOKORO_FORMAT", base_config.get("format", "mp3")), + "mp3_bitrate": os.getenv( + "KOKORO_MP3_BITRATE", base_config.get("mp3_bitrate", "128k") + ), + } + ) + elif provider_name == "elevenlabs": - config.update({ - "api_key": os.getenv("ELEVENLABS_API_KEY", base_config.get("api_key", "")), - "voice_id": os.getenv("ELEVENLABS_VOICE_ID", base_config.get("voice_id", "21m00Tcm4TlvDq8ikWAM")), - "model_id": os.getenv("ELEVENLABS_MODEL_ID", base_config.get("model_id", "eleven_flash_v2_5")), - "stability": base_config.get("stability", 0.5), - "similarity_boost": base_config.get("similarity_boost", 0.5), - }) - - + config.update( + { + "api_key": os.getenv("ELEVENLABS_API_KEY", base_config.get("api_key", "")), + "voice_id": os.getenv( + "ELEVENLABS_VOICE_ID", base_config.get("voice_id", "21m00Tcm4TlvDq8ikWAM") + ), + "model_id": os.getenv( + "ELEVENLABS_MODEL_ID", base_config.get("model_id", "eleven_flash_v2_5") + ), + "stability": base_config.get("stability", 0.5), + "similarity_boost": base_config.get("similarity_boost", 0.5), + } + ) + return config def _get_fallback_order(primary_provider: str) -> list: """Get fallback order based on primary provider preference""" - + # Define fallback preferences fallback_map = { "kokoro": ["elevenlabs"], "elevenlabs": ["kokoro"], } - + return fallback_map.get(primary_provider, ["kokoro", "elevenlabs"]) def list_available_providers() -> Dict[str, Dict[str, Any]]: """ List all available TTS providers and their status - + Returns: Dictionary mapping provider names to their availability info """ providers_info = {} - + for provider_name in TTS_PROVIDERS.keys(): try: provider = _create_provider(provider_name, {}) @@ -144,20 +151,17 @@ def list_available_providers() -> Dict[str, Dict[str, Any]]: "available": provider.is_available(), "class": provider.__class__.__name__, "extension": provider.get_file_extension(), - "description": _get_provider_description(provider_name) + "description": _get_provider_description(provider_name), } else: providers_info[provider_name] = { "available": False, - "error": "Failed to create provider" + "error": "Failed to create provider", } - + except Exception as e: - providers_info[provider_name] = { - "available": False, - "error": str(e) - } - + providers_info[provider_name] = {"available": False, "error": str(e)} + return providers_info @@ -165,7 +169,7 @@ def _get_provider_description(provider_name: str) -> str: """Get human-readable description of a provider""" descriptions = { "kokoro": "Local high-quality TTS using ONNX models", - "elevenlabs": "Premium cloud-based TTS with natural voices" + "elevenlabs": "Premium cloud-based TTS with natural voices", } return descriptions.get(provider_name, "Unknown provider") @@ -173,37 +177,39 @@ def _get_provider_description(provider_name: str) -> str: def test_all_providers(test_text: str = "Hello, this is a TTS test") -> Dict[str, bool]: """ Test all available providers - + Args: test_text: Text to use for testing - + Returns: Dictionary mapping provider names to test results """ results = {} - + for provider_name in TTS_PROVIDERS.keys(): try: provider = get_tts_provider(provider_name, fallback=False) if provider: # Test with a temporary file from tempfile import NamedTemporaryFile - - with NamedTemporaryFile(suffix=provider.get_file_extension(), delete=False) as tmp_file: + + with NamedTemporaryFile( + suffix=provider.get_file_extension(), delete=False + ) as tmp_file: tmp_path = Path(tmp_file.name) - + success = provider.generate(test_text, tmp_path) - + # Clean up if tmp_path.exists(): tmp_path.unlink() - + results[provider_name] = success else: results[provider_name] = False - + except Exception as e: logger.error(f"Test failed for {provider_name}: {e}") results[provider_name] = False - - return results \ No newline at end of file + + return results diff --git a/src/ccnotify/tts/kokoro.py b/src/ccnotify/tts/kokoro.py index 24b7402..39a9c4a 100644 --- a/src/ccnotify/tts/kokoro.py +++ b/src/ccnotify/tts/kokoro.py @@ -13,22 +13,24 @@ class KokoroProvider(TTSProvider): """Kokoro TTS provider using local ONNX models""" - + def __init__(self, config: Dict[str, Any]): super().__init__(config) self._kokoro = None self._models_dir = Path(config.get("models_dir", "models")) - + # Default configuration with enhanced options - self.voice = config.get("voice", "af_heart") # Popular voices: af_heart, af_sarah, am_adam, af_sky, am_michael + self.voice = config.get( + "voice", "af_heart" + ) # Popular voices: af_heart, af_sarah, am_adam, af_sky, am_michael self.speed = float(config.get("speed", 1.0)) # 0.5 = slower, 2.0 = faster self.format = config.get("format", "mp3").lower() # mp3, wav, or aiff self.mp3_bitrate = config.get("mp3_bitrate", "128k") # For MP3 encoding - + # Model file paths self.model_path = self._models_dir / "kokoro-v1.0.onnx" self.voices_path = self._models_dir / "voices-v1.0.bin" - + def is_available(self) -> bool: """Check if Kokoro TTS is available""" try: @@ -36,30 +38,30 @@ def is_available(self) -> bool: if not self.model_path.exists(): self.logger.warning(f"Kokoro model file not found: {self.model_path}") return False - + if not self.voices_path.exists(): self.logger.warning(f"Kokoro voices file not found: {self.voices_path}") return False - + # Try to import and initialize Kokoro self._ensure_kokoro_loaded() return True - + except ImportError as e: self.logger.warning(f"Kokoro TTS not available: {e}") return False except Exception as e: self.logger.error(f"Kokoro TTS availability check failed: {e}") return False - + def _ensure_kokoro_loaded(self): """Ensure Kokoro TTS is loaded and ready""" if self._kokoro is None: try: from kokoro_onnx import KokoroTTS + self._kokoro = KokoroTTS( - model_path=str(self.model_path), - voices_path=str(self.voices_path) + model_path=str(self.model_path), voices_path=str(self.voices_path) ) self.logger.info("Kokoro TTS initialized successfully") except ImportError: @@ -68,89 +70,85 @@ def _ensure_kokoro_loaded(self): ) except Exception as e: raise TTSProviderNotAvailable(f"Failed to initialize Kokoro TTS: {e}") - + def generate(self, text: str, output_path: Path, **kwargs) -> bool: """ Generate TTS audio using Kokoro - + Args: text: Text to convert to speech output_path: Path where audio file should be saved **kwargs: Additional options (voice, speed) - + Returns: True if generation was successful, False otherwise """ try: if not self.is_available(): raise TTSProviderNotAvailable("Kokoro TTS is not available") - + # Get generation parameters voice = kwargs.get("voice", self.voice) speed = float(kwargs.get("speed", self.speed)) - + # Validate voice format voice = self._validate_voice(voice) - + # Ensure Kokoro is loaded self._ensure_kokoro_loaded() - + # Generate audio self.logger.debug(f"Generating TTS: '{text[:50]}...' with voice={voice}, speed={speed}") - + # Generate the audio data (Kokoro generates WAV format) - audio_data = self._kokoro.generate( - text=text, - voice=voice, - speed=speed - ) - + audio_data = self._kokoro.generate(text=text, voice=voice, speed=speed) + # Save to file with format conversion if needed output_path.parent.mkdir(parents=True, exist_ok=True) - + # Get desired format from config or output path extension target_format = self.format if output_path.suffix: # Use output path extension if specified target_format = output_path.suffix[1:].lower() - + if target_format in ["mp3", "aiff"] and target_format != "wav": # Convert WAV to target format using pydub self._save_with_conversion(audio_data, output_path, target_format) else: # Save as WAV directly - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(audio_data) - + self.log_generation(text, output_path, True, voice=voice, speed=speed) return True - + except Exception as e: self.logger.error(f"Kokoro TTS generation failed: {e}") self.log_generation(text, output_path, False, voice=voice, speed=speed) - + # Clean up partial file if output_path.exists(): try: output_path.unlink() except: pass - + return False - + def get_file_extension(self) -> str: """Get the file extension for Kokoro audio files""" return f".{self.format}" - + def _save_with_conversion(self, audio_data: bytes, output_path: Path, format: str): """Convert WAV audio data to another format and save""" try: from pydub import AudioSegment import io - + # Load WAV data into AudioSegment audio = AudioSegment.from_wav(io.BytesIO(audio_data)) - + # Export to desired format if format == "mp3": audio.export(output_path, format="mp3", bitrate=self.mp3_bitrate) @@ -160,40 +158,40 @@ def _save_with_conversion(self, audio_data: bytes, output_path: Path, format: st self.logger.debug(f"Converted to AIFF") else: # Fallback to WAV - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(audio_data) except ImportError: # If pydub is not available, save as WAV self.logger.warning("pydub not available for format conversion, saving as WAV") - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(audio_data) except Exception as e: self.logger.error(f"Format conversion failed: {e}, saving as WAV") - with open(output_path, 'wb') as f: + with open(output_path, "wb") as f: f.write(audio_data) - + def _validate_voice(self, voice: str) -> str: """ Validate and normalize voice parameter - + Args: voice: Voice specification (single voice or blended) - + Returns: Validated voice string """ if not voice: return self.voice - + # Handle voice blending (e.g., "af_sarah:60,am_adam:40") - if ',' in voice: + if "," in voice: # Validate blended voice format - voices = voice.split(',') + voices = voice.split(",") normalized_voices = [] - + for v in voices: - if ':' in v: - voice_name, weight = v.strip().split(':') + if ":" in v: + voice_name, weight = v.strip().split(":") try: weight_val = float(weight) if not 0 <= weight_val <= 100: @@ -203,27 +201,43 @@ def _validate_voice(self, voice: str) -> str: normalized_voices.append(f"{voice_name.strip()}:{weight}") else: normalized_voices.append(v.strip()) - - return ','.join(normalized_voices) - + + return ",".join(normalized_voices) + # Single voice return voice.strip() - + def get_available_voices(self) -> Dict[str, list]: """ Get list of available voices organized by category - + Returns: Dictionary mapping voice categories to voice lists """ return { "English (Female)": [ - "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", - "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky" + "af_alloy", + "af_aoede", + "af_bella", + "af_heart", + "af_jessica", + "af_kore", + "af_nicole", + "af_nova", + "af_river", + "af_sarah", + "af_sky", ], "English (Male)": [ - "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", - "am_michael", "am_onyx", "am_puck", "am_santa" + "am_adam", + "am_echo", + "am_eric", + "am_fenrir", + "am_liam", + "am_michael", + "am_onyx", + "am_puck", + "am_santa", ], "British English (Female)": ["bf_alice", "bf_emma", "bf_isabella", "bf_lily"], "British English (Male)": ["bm_daniel", "bm_fable", "bm_george", "bm_lewis"], @@ -231,36 +245,44 @@ def get_available_voices(self) -> Dict[str, list]: "Italian": ["if_sara", "im_nicola"], "Japanese": ["jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo"], "Chinese": [ - "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi", - "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang" - ] + "zf_xiaobei", + "zf_xiaoni", + "zf_xiaoxiao", + "zf_xiaoyi", + "zm_yunjian", + "zm_yunxi", + "zm_yunxia", + "zm_yunyang", + ], } - - def test_voice(self, voice: str, test_text: str = "Hello, this is a test of Kokoro TTS") -> bool: + + def test_voice( + self, voice: str, test_text: str = "Hello, this is a test of Kokoro TTS" + ) -> bool: """ Test a specific voice by generating a sample - + Args: voice: Voice to test test_text: Text to use for testing - + Returns: True if voice test successful, False otherwise """ try: from tempfile import NamedTemporaryFile - + with NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: tmp_path = Path(tmp_file.name) - + success = self.generate(test_text, tmp_path, voice=voice) - + # Clean up test file if tmp_path.exists(): tmp_path.unlink() - + return success - + except Exception as e: self.logger.error(f"Voice test failed for {voice}: {e}") - return False \ No newline at end of file + return False diff --git a/src/ccnotify/version.py b/src/ccnotify/version.py index 6d16651..03b6000 100644 --- a/src/ccnotify/version.py +++ b/src/ccnotify/version.py @@ -15,6 +15,7 @@ def get_package_version() -> str: """Get the current package version.""" try: from . import __version__ + return __version__ except ImportError: return "unknown" @@ -24,7 +25,7 @@ def extract_script_version(script_path: Path) -> Optional[str]: """Extract version from ccnotify.py script file.""" if not script_path.exists(): return None - + try: content = script_path.read_text() # Look for version pattern in the script @@ -33,14 +34,14 @@ def extract_script_version(script_path: Path) -> Optional[str]: return version_match.group(1) except Exception: pass - + return None def compare_versions(new_version: str, current_version: str) -> int: """ Compare two version strings. - + Returns: 1 if new_version > current_version 0 if new_version == current_version @@ -48,12 +49,12 @@ def compare_versions(new_version: str, current_version: str) -> int: """ if new_version == "unknown" or current_version == "unknown": return 0 - + if version is not None: try: new_v = version.parse(new_version) current_v = version.parse(current_version) - + if new_v > current_v: return 1 elif new_v == current_v: @@ -62,7 +63,7 @@ def compare_versions(new_version: str, current_version: str) -> int: return -1 except Exception: pass - + # Fallback to string comparison if new_version == current_version: return 0 @@ -83,10 +84,10 @@ def format_version_info(current: Optional[str], available: Optional[str]) -> str """Format version information for display.""" if not current: return f"Not installed → {available or 'unknown'}" - + if not available: return f"Installed: {current}" - + if is_newer_version(available, current): return f"{current} → {available} (update available)" elif is_same_version(current, available): @@ -107,7 +108,7 @@ def needs_config_migration(current_config_version: str, target_version: str = "1 return version.parse(current_config_version) < version.parse(target_version) except Exception: pass - + # Fallback comparison if packaging is not available return current_config_version != target_version @@ -116,38 +117,38 @@ def embed_version_in_script(script_content: str, version_string: str) -> str: """Embed version information in generated script.""" # Add version constant at the top of the script version_line = f'__version__ = "{version_string}"\n' - + # Find the appropriate place to insert the version # Look for imports section and add after it - lines = script_content.split('\n') + lines = script_content.split("\n") insert_index = 0 - + # Find last import or the shebang line for i, line in enumerate(lines): - if line.startswith('#!') or line.startswith('import ') or line.startswith('from '): + if line.startswith("#!") or line.startswith("import ") or line.startswith("from "): insert_index = i + 1 - elif line.strip() == '' and insert_index > 0: + elif line.strip() == "" and insert_index > 0: # Found empty line after imports break - + # Insert version line lines.insert(insert_index, version_line) - - return '\n'.join(lines) + + return "\n".join(lines) def get_version_summary() -> dict: """Get a summary of all version information.""" from .installer.detector import InstallationDetector - + detector = InstallationDetector() status = detector.check_existing_installation() package_version = get_package_version() - + return { "package_version": package_version, "installed_script_version": status.script_version, "config_version": status.config_version, "update_available": is_newer_version(package_version, status.script_version or "0.0.0"), - "migration_needed": needs_config_migration(status.config_version or "0.0.0") - } \ No newline at end of file + "migration_needed": needs_config_migration(status.config_version or "0.0.0"), + }