From 2cdc723548785dc7b07c21f1e477bdfacddd42df Mon Sep 17 00:00:00 2001 From: Lino Giger <68745352+LinoGiger@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:26:56 +0100 Subject: [PATCH 1/3] added threadsafty to instantiating the cache --- .../rapidata_client/config/upload_config.py | 39 +++++++++-- .../datapoints/_asset_uploader.py | 66 +++++++++++++------ 2 files changed, 79 insertions(+), 26 deletions(-) diff --git a/src/rapidata/rapidata_client/config/upload_config.py b/src/rapidata/rapidata_client/config/upload_config.py index f79736924..189bca07c 100644 --- a/src/rapidata/rapidata_client/config/upload_config.py +++ b/src/rapidata/rapidata_client/config/upload_config.py @@ -1,4 +1,5 @@ from pathlib import Path +import threading from typing import Callable from pydantic import BaseModel, Field, field_validator from rapidata.rapidata_client.config import logger @@ -6,19 +7,22 @@ # Type alias for config update handlers UploadConfigUpdateHandler = Callable[["UploadConfig"], None] -# Global list to store registered handlers +# Global list to store registered handlers with thread-safe access +_handlers_lock = threading.Lock() _upload_config_handlers: list[UploadConfigUpdateHandler] = [] def register_upload_config_handler(handler: UploadConfigUpdateHandler) -> None: """Register a handler to be called when the upload configuration updates.""" - _upload_config_handlers.append(handler) + with _handlers_lock: + _upload_config_handlers.append(handler) def unregister_upload_config_handler(handler: UploadConfigUpdateHandler) -> None: """Unregister a previously registered handler.""" - if handler in _upload_config_handlers: - _upload_config_handlers.remove(handler) + with _handlers_lock: + if handler in _upload_config_handlers: + _upload_config_handlers.remove(handler) class UploadConfig(BaseModel): @@ -26,8 +30,14 @@ class UploadConfig(BaseModel): Holds the configuration for the upload process. Attributes: - maxWorkers (int): The maximum number of worker threads for processing media paths. Defaults to 25. + maxWorkers (int): The maximum number of worker threads for concurrent uploads. Defaults to 25. maxRetries (int): The maximum number of retries for failed uploads. Defaults to 3. + cacheUploads (bool): Enable/disable upload caching. Defaults to True. + cacheTimeout (float): Cache operation timeout in seconds. Defaults to 0.1. + cacheLocation (Path): Directory for cache storage. Defaults to ~/.rapidata/upload_cache. + cacheSizeLimit (int): Maximum total cache size in bytes. Defaults to 100MB. + cacheShards (int): Number of cache shards for parallel access. Defaults to 128. + Higher values improve concurrency but increase file handles. Must be positive. """ maxWorkers: int = Field(default=25) @@ -36,6 +46,7 @@ class UploadConfig(BaseModel): cacheTimeout: float = Field(default=0.1) cacheLocation: Path = Field(default=Path.home() / ".rapidata" / "upload_cache") cacheSizeLimit: int = Field(default=100_000_000) # 100MB + cacheShards: int = Field(default=128) @field_validator("maxWorkers") @classmethod @@ -47,6 +58,17 @@ def validate_max_workers(cls, v: int) -> int: ) return v + @field_validator("cacheShards") + @classmethod + def validate_cache_shards(cls, v: int) -> int: + if v < 1: + raise ValueError("cacheShards must be at least 1") + if v & (v - 1) != 0: + logger.warning( + f"cacheShards={v} is not a power of 2. Power-of-2 values provide better hash distribution." + ) + return v + def __init__(self, **kwargs): super().__init__(**kwargs) self._notify_handlers() @@ -57,7 +79,12 @@ def __setattr__(self, name: str, value) -> None: def _notify_handlers(self) -> None: """Notify all registered handlers that the configuration has updated.""" - for handler in _upload_config_handlers: + # Snapshot handlers under lock to prevent modifications during iteration + with _handlers_lock: + handlers = _upload_config_handlers.copy() + + # Execute handlers outside lock to avoid deadlocks + for handler in handlers: try: handler(self) except Exception as e: diff --git a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py index 594d1ff0d..320462989 100644 --- a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py +++ b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py @@ -2,6 +2,7 @@ import re import os +import threading from typing import TYPE_CHECKING from rapidata.rapidata_client.config.upload_config import ( @@ -18,11 +19,12 @@ class AssetUploader: + _cache_update_lock = threading.RLock() # RLock allows reentrant locking _file_cache: SingleFlightCache = SingleFlightCache( "File cache", storage=FanoutCache( rapidata_config.upload.cacheLocation, - shards=rapidata_config.upload.maxWorkers, + shards=rapidata_config.upload.cacheShards, timeout=rapidata_config.upload.cacheTimeout, size_limit=rapidata_config.upload.cacheSizeLimit, ), @@ -31,30 +33,50 @@ class AssetUploader: def __init__(self, openapi_service: OpenAPIService): self.openapi_service = openapi_service - register_upload_config_handler(self._handle_config_update) @classmethod def _handle_config_update(cls, config: UploadConfig): - """Handle updates to the upload config by re-creating the file cache storage.""" - logger.debug("Updating AssetUploader file cache with new config") - try: - cls._file_cache.set_storage( - FanoutCache( + """Handle updates to the upload config by re-creating the file cache storage if needed.""" + with cls._cache_update_lock: + # Only recreate cache if cache-related parameters changed + # maxWorkers changes don't affect cache structure + logger.debug("Upload config updated, checking if cache recreation needed") + try: + # Get current cache config + current_storage = cls._file_cache._storage + if not isinstance(current_storage, FanoutCache): + return + + # Check if cache parameters changed + needs_recreation = ( + current_storage.directory != str(config.cacheLocation) + or current_storage.timeout != config.cacheTimeout + or current_storage.size_limit != config.cacheSizeLimit / config.cacheShards + or len(current_storage._shards) != config.cacheShards + ) + + if not needs_recreation: + logger.debug("Cache parameters unchanged, skipping recreation") + return + + logger.info("Cache parameters changed, recreating cache") + cls._file_cache.set_storage( + FanoutCache( + config.cacheLocation, + shards=config.cacheShards, + timeout=config.cacheTimeout, + size_limit=config.cacheSizeLimit, + ) + ) + logger.info( + "AssetUploader file cache updated: location=%s, shards=%s, timeout=%s, size_limit=%s", config.cacheLocation, - shards=config.maxWorkers, - timeout=config.cacheTimeout, - size_limit=config.cacheSizeLimit, + config.cacheShards, + config.cacheTimeout, + config.cacheSizeLimit, ) - ) - logger.info( - "AssetUploader file cache updated: location=%s, shards=%s, timeout=%s, size_limit=%s", - config.cacheLocation, - config.maxWorkers, - config.cacheTimeout, - config.cacheSizeLimit, - ) - except Exception as e: - logger.warning(f"Failed to update AssetUploader file cache: {e}") + except Exception as e: + logger.warning(f"Failed to update AssetUploader file cache: {e}") def _get_file_cache_key(self, asset: str) -> str: """Generate cache key for a file, including environment.""" @@ -130,3 +152,7 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"AssetUploader(openapi_service={self.openapi_service})" + + +# Register the config update handler at module level (once, not per instance) +register_upload_config_handler(AssetUploader._handle_config_update) From 4416021fa97892b6515ef10cc63727669cdcf386 Mon Sep 17 00:00:00 2001 From: Lino Giger <68745352+LinoGiger@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:32:07 +0100 Subject: [PATCH 2/3] migrated the cache location as in 2.x --- .../rapidata_client/config/upload_config.py | 23 +++++++++++++++---- .../datapoints/_asset_uploader.py | 6 +---- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/rapidata/rapidata_client/config/upload_config.py b/src/rapidata/rapidata_client/config/upload_config.py index 189bca07c..58cb30e4f 100644 --- a/src/rapidata/rapidata_client/config/upload_config.py +++ b/src/rapidata/rapidata_client/config/upload_config.py @@ -1,5 +1,6 @@ from pathlib import Path import threading +import shutil from typing import Callable from pydantic import BaseModel, Field, field_validator from rapidata.rapidata_client.config import logger @@ -34,8 +35,7 @@ class UploadConfig(BaseModel): maxRetries (int): The maximum number of retries for failed uploads. Defaults to 3. cacheUploads (bool): Enable/disable upload caching. Defaults to True. cacheTimeout (float): Cache operation timeout in seconds. Defaults to 0.1. - cacheLocation (Path): Directory for cache storage. Defaults to ~/.rapidata/upload_cache. - cacheSizeLimit (int): Maximum total cache size in bytes. Defaults to 100MB. + cacheLocation (Path): Directory for cache storage. Defaults to ~/.cache/rapidata/upload_cache. cacheShards (int): Number of cache shards for parallel access. Defaults to 128. Higher values improve concurrency but increase file handles. Must be positive. """ @@ -44,8 +44,9 @@ class UploadConfig(BaseModel): maxRetries: int = Field(default=3) cacheUploads: bool = Field(default=True) cacheTimeout: float = Field(default=0.1) - cacheLocation: Path = Field(default=Path.home() / ".rapidata" / "upload_cache") - cacheSizeLimit: int = Field(default=100_000_000) # 100MB + cacheLocation: Path = Field( + default=Path.home() / ".cache" / "rapidata" / "upload_cache" + ) cacheShards: int = Field(default=128) @field_validator("maxWorkers") @@ -72,6 +73,7 @@ def validate_cache_shards(cls, v: int) -> int: def __init__(self, **kwargs): super().__init__(**kwargs) self._notify_handlers() + self._migrate_cache() def __setattr__(self, name: str, value) -> None: super().__setattr__(name, value) @@ -89,3 +91,16 @@ def _notify_handlers(self) -> None: handler(self) except Exception as e: logger.warning(f"Warning: UploadConfig handler failed: {e}") + + def _migrate_cache(self) -> None: + """Migrate the cache from the old location to the new location.""" + old_cache = Path.home() / ".rapidata" / "upload_cache" + new_cache = self.cacheLocation + if old_cache.exists() and not new_cache.exists(): + logger.info(f"Migrating cache from {old_cache} to {self.cacheLocation}") + self.cacheLocation.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(old_cache), str(self.cacheLocation)) + + # Clean up old directory if empty + if old_cache.parent.exists() and not any(old_cache.parent.iterdir()): + old_cache.parent.rmdir() diff --git a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py index 320462989..51ccab4b5 100644 --- a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py +++ b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py @@ -26,7 +26,6 @@ class AssetUploader: rapidata_config.upload.cacheLocation, shards=rapidata_config.upload.cacheShards, timeout=rapidata_config.upload.cacheTimeout, - size_limit=rapidata_config.upload.cacheSizeLimit, ), ) _url_cache: SingleFlightCache = SingleFlightCache("URL cache") @@ -51,7 +50,6 @@ def _handle_config_update(cls, config: UploadConfig): needs_recreation = ( current_storage.directory != str(config.cacheLocation) or current_storage.timeout != config.cacheTimeout - or current_storage.size_limit != config.cacheSizeLimit / config.cacheShards or len(current_storage._shards) != config.cacheShards ) @@ -65,15 +63,13 @@ def _handle_config_update(cls, config: UploadConfig): config.cacheLocation, shards=config.cacheShards, timeout=config.cacheTimeout, - size_limit=config.cacheSizeLimit, ) ) logger.info( - "AssetUploader file cache updated: location=%s, shards=%s, timeout=%s, size_limit=%s", + "AssetUploader file cache updated: location=%s, shards=%s, timeout=%s", config.cacheLocation, config.cacheShards, config.cacheTimeout, - config.cacheSizeLimit, ) except Exception as e: logger.warning(f"Failed to update AssetUploader file cache: {e}") From dbf6d0014e9d17dd97170b3cd5faed297a7b86c0 Mon Sep 17 00:00:00 2001 From: Lino Giger <68745352+LinoGiger@users.noreply.github.com> Date: Mon, 26 Jan 2026 17:11:10 +0100 Subject: [PATCH 3/3] simplified update --- .../rapidata_client/config/upload_config.py | 71 ++++++------------- .../datapoints/_asset_uploader.py | 51 ------------- 2 files changed, 22 insertions(+), 100 deletions(-) diff --git a/src/rapidata/rapidata_client/config/upload_config.py b/src/rapidata/rapidata_client/config/upload_config.py index 58cb30e4f..b2981c390 100644 --- a/src/rapidata/rapidata_client/config/upload_config.py +++ b/src/rapidata/rapidata_client/config/upload_config.py @@ -1,30 +1,8 @@ from pathlib import Path -import threading import shutil -from typing import Callable from pydantic import BaseModel, Field, field_validator from rapidata.rapidata_client.config import logger -# Type alias for config update handlers -UploadConfigUpdateHandler = Callable[["UploadConfig"], None] - -# Global list to store registered handlers with thread-safe access -_handlers_lock = threading.Lock() -_upload_config_handlers: list[UploadConfigUpdateHandler] = [] - - -def register_upload_config_handler(handler: UploadConfigUpdateHandler) -> None: - """Register a handler to be called when the upload configuration updates.""" - with _handlers_lock: - _upload_config_handlers.append(handler) - - -def unregister_upload_config_handler(handler: UploadConfigUpdateHandler) -> None: - """Unregister a previously registered handler.""" - with _handlers_lock: - if handler in _upload_config_handlers: - _upload_config_handlers.remove(handler) - class UploadConfig(BaseModel): """ @@ -36,8 +14,10 @@ class UploadConfig(BaseModel): cacheUploads (bool): Enable/disable upload caching. Defaults to True. cacheTimeout (float): Cache operation timeout in seconds. Defaults to 0.1. cacheLocation (Path): Directory for cache storage. Defaults to ~/.cache/rapidata/upload_cache. + This is immutable cacheShards (int): Number of cache shards for parallel access. Defaults to 128. Higher values improve concurrency but increase file handles. Must be positive. + This is immutable """ maxWorkers: int = Field(default=25) @@ -45,9 +25,13 @@ class UploadConfig(BaseModel): cacheUploads: bool = Field(default=True) cacheTimeout: float = Field(default=0.1) cacheLocation: Path = Field( - default=Path.home() / ".cache" / "rapidata" / "upload_cache" + default=Path.home() / ".cache" / "rapidata" / "upload_cache", + frozen=True, + ) + cacheShards: int = Field( + default=128, + frozen=True, ) - cacheShards: int = Field(default=128) @field_validator("maxWorkers") @classmethod @@ -72,35 +56,24 @@ def validate_cache_shards(cls, v: int) -> int: def __init__(self, **kwargs): super().__init__(**kwargs) - self._notify_handlers() self._migrate_cache() - def __setattr__(self, name: str, value) -> None: - super().__setattr__(name, value) - self._notify_handlers() - - def _notify_handlers(self) -> None: - """Notify all registered handlers that the configuration has updated.""" - # Snapshot handlers under lock to prevent modifications during iteration - with _handlers_lock: - handlers = _upload_config_handlers.copy() - - # Execute handlers outside lock to avoid deadlocks - for handler in handlers: - try: - handler(self) - except Exception as e: - logger.warning(f"Warning: UploadConfig handler failed: {e}") - def _migrate_cache(self) -> None: """Migrate the cache from the old location to the new location.""" old_cache = Path.home() / ".rapidata" / "upload_cache" new_cache = self.cacheLocation if old_cache.exists() and not new_cache.exists(): - logger.info(f"Migrating cache from {old_cache} to {self.cacheLocation}") - self.cacheLocation.parent.mkdir(parents=True, exist_ok=True) - shutil.move(str(old_cache), str(self.cacheLocation)) - - # Clean up old directory if empty - if old_cache.parent.exists() and not any(old_cache.parent.iterdir()): - old_cache.parent.rmdir() + try: + logger.info(f"Migrating cache from {old_cache} to {self.cacheLocation}") + self.cacheLocation.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(old_cache), str(self.cacheLocation)) + + # Clean up old directory if empty + if old_cache.parent.exists() and not any(old_cache.parent.iterdir()): + old_cache.parent.rmdir() + logger.info("Cache migration completed successfully") + except Exception as e: + logger.warning( + f"Failed to migrate cache from {old_cache} to {new_cache}: {e}. " + "Starting with empty cache. You may want to manually move the old cache." + ) diff --git a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py index 51ccab4b5..5be916f79 100644 --- a/src/rapidata/rapidata_client/datapoints/_asset_uploader.py +++ b/src/rapidata/rapidata_client/datapoints/_asset_uploader.py @@ -2,13 +2,8 @@ import re import os -import threading from typing import TYPE_CHECKING -from rapidata.rapidata_client.config.upload_config import ( - register_upload_config_handler, - UploadConfig, -) from rapidata.service.openapi_service import OpenAPIService from rapidata.rapidata_client.config import logger, rapidata_config, tracer from rapidata.rapidata_client.datapoints._single_flight_cache import SingleFlightCache @@ -19,7 +14,6 @@ class AssetUploader: - _cache_update_lock = threading.RLock() # RLock allows reentrant locking _file_cache: SingleFlightCache = SingleFlightCache( "File cache", storage=FanoutCache( @@ -33,47 +27,6 @@ class AssetUploader: def __init__(self, openapi_service: OpenAPIService): self.openapi_service = openapi_service - @classmethod - def _handle_config_update(cls, config: UploadConfig): - """Handle updates to the upload config by re-creating the file cache storage if needed.""" - with cls._cache_update_lock: - # Only recreate cache if cache-related parameters changed - # maxWorkers changes don't affect cache structure - logger.debug("Upload config updated, checking if cache recreation needed") - try: - # Get current cache config - current_storage = cls._file_cache._storage - if not isinstance(current_storage, FanoutCache): - return - - # Check if cache parameters changed - needs_recreation = ( - current_storage.directory != str(config.cacheLocation) - or current_storage.timeout != config.cacheTimeout - or len(current_storage._shards) != config.cacheShards - ) - - if not needs_recreation: - logger.debug("Cache parameters unchanged, skipping recreation") - return - - logger.info("Cache parameters changed, recreating cache") - cls._file_cache.set_storage( - FanoutCache( - config.cacheLocation, - shards=config.cacheShards, - timeout=config.cacheTimeout, - ) - ) - logger.info( - "AssetUploader file cache updated: location=%s, shards=%s, timeout=%s", - config.cacheLocation, - config.cacheShards, - config.cacheTimeout, - ) - except Exception as e: - logger.warning(f"Failed to update AssetUploader file cache: {e}") - def _get_file_cache_key(self, asset: str) -> str: """Generate cache key for a file, including environment.""" env = self.openapi_service.environment @@ -148,7 +101,3 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"AssetUploader(openapi_service={self.openapi_service})" - - -# Register the config update handler at module level (once, not per instance) -register_upload_config_handler(AssetUploader._handle_config_update)