Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 46 additions & 31 deletions src/rapidata/rapidata_client/config/upload_config.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,37 @@
from pathlib import Path
from typing import Callable
import shutil
from pydantic import BaseModel, Field, field_validator
from rapidata.rapidata_client.config import logger

# Type alias for config update handlers
UploadConfigUpdateHandler = Callable[["UploadConfig"], None]

# Global list to store registered handlers
_upload_config_handlers: list[UploadConfigUpdateHandler] = []


def register_upload_config_handler(handler: UploadConfigUpdateHandler) -> None:
"""Register a handler to be called when the upload configuration updates."""
_upload_config_handlers.append(handler)


def unregister_upload_config_handler(handler: UploadConfigUpdateHandler) -> None:
"""Unregister a previously registered handler."""
if handler in _upload_config_handlers:
_upload_config_handlers.remove(handler)


class UploadConfig(BaseModel):
    """
    Holds the configuration for the upload process.

    Attributes:
        maxWorkers (int): The maximum number of worker threads for concurrent uploads. Defaults to 25.
        maxRetries (int): The maximum number of retries for failed uploads. Defaults to 3.
        cacheUploads (bool): Enable/disable upload caching. Defaults to True.
        cacheTimeout (float): Cache operation timeout in seconds. Defaults to 0.1.
        cacheSizeLimit (int): Maximum cache size in bytes. Defaults to 100_000_000 (100MB).
        cacheLocation (Path): Directory for cache storage. Defaults to ~/.cache/rapidata/upload_cache.
            This is immutable
        cacheShards (int): Number of cache shards for parallel access. Defaults to 128.
            Higher values improve concurrency but increase file handles. Must be positive.
            This is immutable
    """

    maxWorkers: int = Field(default=25)
    maxRetries: int = Field(default=3)
    cacheUploads: bool = Field(default=True)
    cacheTimeout: float = Field(default=0.1)
    cacheSizeLimit: int = Field(default=100_000_000)  # 100MB
    # frozen=True: the cache directory may not change after construction,
    # so the cache-migration logic has a single, stable target.
    cacheLocation: Path = Field(
        default=Path.home() / ".cache" / "rapidata" / "upload_cache",
        frozen=True,
    )
    cacheShards: int = Field(
        default=128,
        frozen=True,
    )

@field_validator("maxWorkers")
@classmethod
Expand All @@ -47,18 +43,37 @@ def validate_max_workers(cls, v: int) -> int:
)
return v

@field_validator("cacheShards")
@classmethod
def validate_cache_shards(cls, v: int) -> int:
    """Reject non-positive shard counts; warn when the count is not a power of 2."""
    if v < 1:
        raise ValueError("cacheShards must be at least 1")
    # A power of two has exactly one bit set, so v & (v - 1) is zero.
    is_power_of_two = (v & (v - 1)) == 0
    if not is_power_of_two:
        logger.warning(
            f"cacheShards={v} is not a power of 2. Power-of-2 values provide better hash distribution."
        )
    return v

def __init__(self, **kwargs):
    """Initialize the config via pydantic, then run the registered update handlers."""
    super().__init__(**kwargs)
    # Fire handlers once at construction so subscribers see the initial values.
    self._notify_handlers()

def __setattr__(self, name: str, value) -> None:
    """Assign the attribute, then trigger the post-update side effects.

    Every field assignment notifies registered handlers and attempts a
    cache migration — NOTE(review): migration on *every* assignment looks
    intentional (it is guarded internally), but confirm against _migrate_cache.
    """
    super().__setattr__(name, value)
    self._notify_handlers()
    self._migrate_cache()

def _notify_handlers(self) -> None:
    """Notify all registered handlers that the configuration has updated."""
    for handler in _upload_config_handlers:
        try:
            handler(self)
        except Exception as e:
            # A misbehaving subscriber must not break configuration updates.
            logger.warning(f"Warning: UploadConfig handler failed: {e}")

def _migrate_cache(self) -> None:
    """Migrate the cache from the old location to the new location.

    Best-effort: any failure is logged and the uploader continues with an
    empty cache instead of raising.
    """
    old_cache = Path.home() / ".rapidata" / "upload_cache"
    new_cache = self.cacheLocation
    # Only migrate when the legacy directory exists and nothing occupies
    # the new location yet — never clobber an existing cache.
    if old_cache.exists() and not new_cache.exists():
        try:
            logger.info(f"Migrating cache from {old_cache} to {self.cacheLocation}")
            self.cacheLocation.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(old_cache), str(self.cacheLocation))

            # Clean up old directory if empty
            if old_cache.parent.exists() and not any(old_cache.parent.iterdir()):
                old_cache.parent.rmdir()
            logger.info("Cache migration completed successfully")
        except Exception as e:
            logger.warning(
                f"Failed to migrate cache from {old_cache} to {new_cache}: {e}. "
                "Starting with empty cache. You may want to manually move the old cache."
            )
31 changes: 1 addition & 30 deletions src/rapidata/rapidata_client/datapoints/_asset_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
import os
from typing import TYPE_CHECKING

from rapidata.rapidata_client.config.upload_config import (
register_upload_config_handler,
UploadConfig,
)
from rapidata.service.openapi_service import OpenAPIService
from rapidata.rapidata_client.config import logger, rapidata_config, tracer
from rapidata.rapidata_client.datapoints._single_flight_cache import SingleFlightCache
Expand All @@ -22,39 +18,14 @@ class AssetUploader:
"File cache",
storage=FanoutCache(
rapidata_config.upload.cacheLocation,
shards=rapidata_config.upload.maxWorkers,
shards=rapidata_config.upload.cacheShards,
timeout=rapidata_config.upload.cacheTimeout,
size_limit=rapidata_config.upload.cacheSizeLimit,
),
)
_url_cache: SingleFlightCache = SingleFlightCache("URL cache")

def __init__(self, openapi_service: OpenAPIService):
    """Store the API service and subscribe this class to upload-config updates."""
    self.openapi_service = openapi_service
    # Re-registers the classmethod on every instance; registration is
    # module-global, so duplicate callbacks accumulate — presumably benign
    # since the handler is idempotent, but verify against the registry.
    register_upload_config_handler(self._handle_config_update)

@classmethod
def _handle_config_update(cls, config: UploadConfig):
    """Rebuild the shared file-cache storage from *config* after a config update."""
    logger.debug("Updating AssetUploader file cache with new config")
    try:
        # Construction stays inside the try: a bad cache directory or
        # FanoutCache failure is downgraded to a warning, not a crash.
        storage = FanoutCache(
            config.cacheLocation,
            shards=config.maxWorkers,
            timeout=config.cacheTimeout,
            size_limit=config.cacheSizeLimit,
        )
        cls._file_cache.set_storage(storage)
        logger.info(
            "AssetUploader file cache updated: location=%s, shards=%s, timeout=%s, size_limit=%s",
            config.cacheLocation,
            config.maxWorkers,
            config.cacheTimeout,
            config.cacheSizeLimit,
        )
    except Exception as e:
        logger.warning(f"Failed to update AssetUploader file cache: {e}")

def _get_file_cache_key(self, asset: str) -> str:
"""Generate cache key for a file, including environment."""
Expand Down