diff --git a/src/rapidata/rapidata_client/config/upload_config.py b/src/rapidata/rapidata_client/config/upload_config.py
index f79736924..b2981c390 100644
--- a/src/rapidata/rapidata_client/config/upload_config.py
+++ b/src/rapidata/rapidata_client/config/upload_config.py
@@ -1,41 +1,37 @@
 from pathlib import Path
-from typing import Callable
+import shutil
 from pydantic import BaseModel, Field, field_validator
 from rapidata.rapidata_client.config import logger
 
-# Type alias for config update handlers
-UploadConfigUpdateHandler = Callable[["UploadConfig"], None]
-
-# Global list to store registered handlers
-_upload_config_handlers: list[UploadConfigUpdateHandler] = []
-
-
-def register_upload_config_handler(handler: UploadConfigUpdateHandler) -> None:
-    """Register a handler to be called when the upload configuration updates."""
-    _upload_config_handlers.append(handler)
-
-
-def unregister_upload_config_handler(handler: UploadConfigUpdateHandler) -> None:
-    """Unregister a previously registered handler."""
-    if handler in _upload_config_handlers:
-        _upload_config_handlers.remove(handler)
-
 
 class UploadConfig(BaseModel):
     """
     Holds the configuration for the upload process.
 
     Attributes:
-        maxWorkers (int): The maximum number of worker threads for processing media paths. Defaults to 25.
+        maxWorkers (int): The maximum number of worker threads for concurrent uploads. Defaults to 25.
         maxRetries (int): The maximum number of retries for failed uploads. Defaults to 3.
+        cacheUploads (bool): Enable/disable upload caching. Defaults to True.
+        cacheTimeout (float): Cache operation timeout in seconds. Defaults to 0.1.
+        cacheLocation (Path): Directory for cache storage. Defaults to ~/.cache/rapidata/upload_cache.
+            This is immutable
+        cacheShards (int): Number of cache shards for parallel access. Defaults to 128.
+            Higher values improve concurrency but increase file handles. Must be positive.
+            This is immutable
     """
 
     maxWorkers: int = Field(default=25)
     maxRetries: int = Field(default=3)
     cacheUploads: bool = Field(default=True)
    cacheTimeout: float = Field(default=0.1)
-    cacheLocation: Path = Field(default=Path.home() / ".rapidata" / "upload_cache")
-    cacheSizeLimit: int = Field(default=100_000_000)  # 100MB
+    cacheLocation: Path = Field(
+        default=Path.home() / ".cache" / "rapidata" / "upload_cache",
+        frozen=True,
+    )
+    cacheShards: int = Field(
+        default=128,
+        frozen=True,
+    )
 
     @field_validator("maxWorkers")
     @classmethod
@@ -47,18 +43,37 @@ def validate_max_workers(cls, v: int) -> int:
             )
         return v
 
+    @field_validator("cacheShards")
+    @classmethod
+    def validate_cache_shards(cls, v: int) -> int:
+        if v < 1:
+            raise ValueError("cacheShards must be at least 1")
+        if v & (v - 1) != 0:
+            logger.warning(
+                f"cacheShards={v} is not a power of 2. Power-of-2 values provide better hash distribution."
+            )
+        return v
+
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
-        self._notify_handlers()
-
-    def __setattr__(self, name: str, value) -> None:
-        super().__setattr__(name, value)
-        self._notify_handlers()
+        self._migrate_cache()
 
-    def _notify_handlers(self) -> None:
-        """Notify all registered handlers that the configuration has updated."""
-        for handler in _upload_config_handlers:
+    def _migrate_cache(self) -> None:
+        """Migrate the cache from the old location to the new location."""
+        old_cache = Path.home() / ".rapidata" / "upload_cache"
+        new_cache = self.cacheLocation
+        if old_cache.exists() and not new_cache.exists():
             try:
-                handler(self)
+                logger.info(f"Migrating cache from {old_cache} to {self.cacheLocation}")
+                self.cacheLocation.parent.mkdir(parents=True, exist_ok=True)
+                shutil.move(str(old_cache), str(self.cacheLocation))
+
+                # Clean up old directory if empty
+                if old_cache.parent.exists() and not any(old_cache.parent.iterdir()):
+                    old_cache.parent.rmdir()
+                logger.info("Cache migration completed successfully")
             except Exception as e:
-                logger.warning(f"Warning: UploadConfig handler failed: {e}")
+                logger.warning(
+                    f"Failed to migrate cache from {old_cache} to {new_cache}: {e}. "
+                    "Starting with empty cache. You may want to manually move the old cache."
+                )
diff --git a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py
index 594d1ff0d..5be916f79 100644
--- a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py
+++ b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py
@@ -4,10 +4,6 @@
 import os
 from typing import TYPE_CHECKING
 
-from rapidata.rapidata_client.config.upload_config import (
-    register_upload_config_handler,
-    UploadConfig,
-)
 from rapidata.service.openapi_service import OpenAPIService
 from rapidata.rapidata_client.config import logger, rapidata_config, tracer
 from rapidata.rapidata_client.datapoints._single_flight_cache import SingleFlightCache
@@ -22,39 +18,14 @@ class AssetUploader:
         "File cache",
         storage=FanoutCache(
             rapidata_config.upload.cacheLocation,
-            shards=rapidata_config.upload.maxWorkers,
+            shards=rapidata_config.upload.cacheShards,
             timeout=rapidata_config.upload.cacheTimeout,
-            size_limit=rapidata_config.upload.cacheSizeLimit,
         ),
     )
     _url_cache: SingleFlightCache = SingleFlightCache("URL cache")
 
     def __init__(self, openapi_service: OpenAPIService):
         self.openapi_service = openapi_service
-        register_upload_config_handler(self._handle_config_update)
-
-    @classmethod
-    def _handle_config_update(cls, config: UploadConfig):
-        """Handle updates to the upload config by re-creating the file cache storage."""
-        logger.debug("Updating AssetUploader file cache with new config")
-        try:
-            cls._file_cache.set_storage(
-                FanoutCache(
-                    config.cacheLocation,
-                    shards=config.maxWorkers,
-                    timeout=config.cacheTimeout,
-                    size_limit=config.cacheSizeLimit,
-                )
-            )
-            logger.info(
-                "AssetUploader file cache updated: location=%s, shards=%s, timeout=%s, size_limit=%s",
-                config.cacheLocation,
-                config.maxWorkers,
-                config.cacheTimeout,
-                config.cacheSizeLimit,
-            )
-        except Exception as e:
-            logger.warning(f"Failed to update AssetUploader file cache: {e}")
 
     def _get_file_cache_key(self, asset: str) -> str:
         """Generate cache key for a file, including environment."""
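
Reviewer note: below is a minimal standalone sketch of the two config behaviours this patch introduces, namely the `v & (v - 1)` power-of-two check in `validate_cache_shards` and the effect of `Field(frozen=True)` on the now-immutable cache fields. `DemoUploadConfig` is a hypothetical stand-in written only for illustration (it is not the library's `UploadConfig` and prints instead of using its logger); it assumes pydantic v2 semantics, where assigning to a frozen field raises a `ValidationError`.

```python
from pathlib import Path

from pydantic import BaseModel, Field, ValidationError, field_validator


class DemoUploadConfig(BaseModel):
    """Illustrative stand-in for UploadConfig; not the shipped class."""

    cacheShards: int = Field(default=128, frozen=True)
    cacheLocation: Path = Field(
        default=Path.home() / ".cache" / "rapidata" / "upload_cache",
        frozen=True,
    )

    @field_validator("cacheShards")
    @classmethod
    def validate_cache_shards(cls, v: int) -> int:
        if v < 1:
            raise ValueError("cacheShards must be at least 1")
        # n & (n - 1) clears the lowest set bit, so the result is 0 only for powers of 2.
        if v & (v - 1) != 0:
            print(f"warning: cacheShards={v} is not a power of 2")
        return v


config = DemoUploadConfig(cacheShards=64)   # power of 2: accepted silently
DemoUploadConfig(cacheShards=100)           # accepted, but warns: not a power of 2

try:
    DemoUploadConfig(cacheShards=0)         # rejected outright by the validator
except ValidationError as err:
    print(err.errors()[0]["msg"])           # "Value error, cacheShards must be at least 1"

try:
    config.cacheShards = 256                # frozen field: reassignment raises
except ValidationError as err:
    print(err.errors()[0]["type"])          # "frozen_field" in pydantic v2
```

Because the cache fields are fixed at construction time, the `FanoutCache` built as a class attribute in `_asset_uploader.py` never needs to be rebuilt, which is presumably why the `register_upload_config_handler` / `_handle_config_update` machinery could be removed.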