diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1edf1fb..d8f5fa2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,26 @@
 # Changelog

-## Version 0.4 (development)
+## Version 0.5.0
+
+- SQLAlchemy session management
+  * Implemented proper session handling
+  * Fixed `DetachedInstanceError` issues and added the helper method `_get_detached_resource` for consistent session management
+  * Improved transaction handling with explicit commits and rollbacks
+
+- New features
+  * Added cache statistics with the `get_stats()` method
+  * Implemented resource tagging
+  * Added cache size management
+  * Added support for file compression
+  * Added resource validation with checksums
+  * Improved search (exact and substring matching on resource fields)
+  * Added metadata export/import functionality
+
+## Version 0.4.1
+
+- Added a method to list all resources.
+
+## Version 0.4

 - Migrate the schema to match R/Bioconductor's BiocFileCache (Check out [this issue](https://github.com/BiocPy/pyBiocFileCache/issues/11)). Thanks to [@khoroshevskyi ](https://github.com/khoroshevskyi) for the PR.
diff --git a/README.md b/README.md
index 976823d..85713ca 100644
--- a/README.md
+++ b/README.md
@@ -4,74 +4,92 @@

 # pyBiocFileCache

-File system based cache for resources & metadata. Compatible with [BiocFileCache R package](https://github.com/Bioconductor/BiocFileCache)
+`pyBiocFileCache` is a Python package that provides a robust file caching system with resource validation, cache size management, file compression, and resource tagging. Compatible with the [BiocFileCache R package](https://github.com/Bioconductor/BiocFileCache).

-***Note: Package is in development. Use with caution!!***
+## Installation

-### Installation
+Install from [PyPI](https://pypi.org/project/pyBiocFileCache/):

-Package is published to [PyPI](https://pypi.org/project/pyBiocFileCache/)
-
-```
+```bash
 pip install pybiocfilecache
 ```

-#### Initialize a cache directory
+## Quick Start

-```
-from pybiocfilecache import BiocFileCache
-import os
-
-bfc = BiocFileCache(cache_dir = os.getcwd() + "/cache")
-```
+```python
+from pybiocfilecache import BiocFileCache

-Once the cache directory is created, the library provides methods to
-- `add`: Add a resource or artifact to cache
-- `get`: Get the resource from cache
-- `remove`: Remove a resource from cache
-- `update`: update the resource in cache
-- `purge`: purge the entire cache, removes all files in the cache directory
+# Initialize cache
+cache = BiocFileCache("path/to/cache/directory")

-### Add a resource to cache
+# Add a file to cache
+resource = cache.add("myfile", "path/to/file.txt")

-(for testing use the temp files in the `tests/data` directory)
+# Retrieve a file from cache
+resource = cache.get("myfile")

-```
-rec = bfc.add("test1", os.getcwd() + "/test1.txt")
-print(rec)
+# Use the cached file
+print(resource.rpath)  # Path to cached file
 ```

-### Get resource from cache
+## Advanced Usage

-```
-rec = bfc.get("test1")
-print(rec)
-```
+### Configuration

-### Remove resource from cache
+```python
+from pybiocfilecache import BiocFileCache
+from pybiocfilecache.config import CacheConfig
+from datetime import timedelta
+from pathlib import Path

-```
-rec = bfc.remove("test1")
-print(rec)
+# Create custom configuration
+config = CacheConfig(
+    cache_dir=Path("cache_directory"),
+    max_size_bytes=1024 * 1024 * 1024,  # 1GB
+    cleanup_interval=timedelta(days=7),
+    compression=True
+)
+
+# Initialize cache with configuration
+cache = BiocFileCache(config=config)
 ```

-### Update resource in cache
+### Resource Management

-```
-rec = bfc.get("test1"m os.getcwd() + "test2.txt")
-print(rec)
-```
+```python
+# Add a file with tags and expiration
+from datetime import datetime, timedelta

-### purge the cache
+resource = cache.add(
+    "myfile",
+    "path/to/file.txt",
+    tags=["data", "raw"],
+    expires=datetime.now() + timedelta(days=30)
+)

-```
-bfc.purge()
+# List resources by tag
+resources = cache.list_resources(tag="data")
+
+# Search resources
+results = cache.search("myfile", field="rname")
+
+# Update resource
+cache.update("myfile", "path/to/new_file.txt")
+
+# Remove resource
+cache.remove("myfile")
 ```

+### Cache Statistics and Maintenance
+
+```python
+# Get cache statistics
+stats = cache.get_stats()
+print(stats)

-## Note
+# Clean up expired resources
+removed_count = cache.cleanup()

-This project has been set up using PyScaffold 4.1. For details and usage
-information on PyScaffold see https://pyscaffold.org/.
+# Purge entire cache
+cache.purge()
+```
diff --git a/docs/best_practices.md b/docs/best_practices.md
new file mode 100644
index 0000000..0a134a9
--- /dev/null
+++ b/docs/best_practices.md
@@ -0,0 +1,33 @@
+# Best Practices
+
+1. Use context managers for automatic cleanup:
+```python
+from pybiocfilecache import BiocFileCache
+
+with BiocFileCache("cache_directory") as cache:
+    cache.add("myfile", "path/to/file.txt")
+```
+
+2. Add tags for better organization (resource names must match the configured `rname_pattern`, which by default allows only letters, digits, underscores, and hyphens):
+```python
+cache.add("raw_data_2024", "data.csv", tags=["raw", "csv", "2024"])
+```
+
+3. Set expiration dates for temporary files:
+```python
+from datetime import datetime, timedelta
+
+cache.add("temp_data", "temp.txt", expires=datetime.now() + timedelta(hours=1))
+```
+
+4. Perform regular maintenance:
+```python
+# Periodically clean up expired resources
+cache.cleanup()
+
+# Monitor cache size against a threshold of your choosing
+threshold = 1024 * 1024 * 1024  # for example, 1GB
+stats = cache.get_stats()
+if stats["cache_size_bytes"] > threshold:
+    cache.cleanup()
+```
diff --git a/setup.cfg b/setup.cfg
index 5ec4014..55ad6f5 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -47,7 +47,7 @@ package_dir =
 # For more information, check out https://semver.org/.
 install_requires =
     importlib-metadata; python_version<"3.8"
-    sqlalchemy>=2,<2.1
+    sqlalchemy

 [options.packages.find]
 where = src
diff --git a/src/pybiocfilecache/BiocFileCache.py b/src/pybiocfilecache/BiocFileCache.py
deleted file mode 100644
index e8dfaa2..0000000
--- a/src/pybiocfilecache/BiocFileCache.py
+++ /dev/null
@@ -1,274 +0,0 @@
-"""Python Implementation of BiocFileCache."""
-
-import os
-from pathlib import Path
-from time import sleep, time
-from typing import List, Literal, Optional, Union
-
-from sqlalchemy import func
-from sqlalchemy.orm import Session
-
-from ._exceptions import NoFpathError, RnameExistsError, RpathTimeoutError
-from .db.db_config import create_schema, Resource
-from .utils import copy_or_move, create_tmp_dir, generate_id
-
-__author__ = "Jayaram Kancherla"
-__copyright__ = "jkanche"
-__license__ = "MIT"
-
-
-class BiocFileCache:
-    """Class to manage and cache files."""
-
-    def __init__(self, cacheDirOrPath: Union[str, Path] = create_tmp_dir()):
-        """Initialize BiocFileCache.
-
-        Args:
-            cacheDirOrPath:
-                Path to cache directory.
-
-                Defaults to tmp location, :py:func:`~.utils.create_tmp_dir`.
-
-        Raises:
-            Exception: Failed to initialize cache.
- """ - if isinstance(cacheDirOrPath, str): - cacheDirOrPath = Path(cacheDirOrPath) - - if not cacheDirOrPath.exists(): - mode = 0o777 - try: - cacheDirOrPath.mkdir(mode=mode, parents=True, exist_ok=True) - except Exception as e: - raise Exception(f"Failed to created directory {cacheDirOrPath}") from e - - self.cache = str(cacheDirOrPath) - - # create/access sqlite file - self.db_cache = f"{self.cache}/BiocFileCache.sqlite" - (self.engine, self.sessionLocal) = create_schema(self.db_cache) - - def add( - self, - rname: str, - fpath: Union[str, Path], - rtype: Literal["local", "web", "relative"] = "local", - action: Literal["copy", "move", "asis"] = "copy", - ext: bool = False, - ) -> Resource: - """Add a resource from the provided `fpath` to cache as `rname`. - - Args: - rname: - Name of the resource to add to cache. - - fpath: - Location of the resource. - - rtype: - One of ``local``, ``web``, or ``relative``. - Defaults to ``local``. - - action: - Either ``copy``, ``move`` or ``asis``. - Defaults to ``copy``. - - ext: - Whether to use filepath extension when storing in cache. - Defaults to `False`. - - Raises: - NoFpathError: - When the `fpath` does not exist. - - RnameExistsError: - When the `rname` already exists in the cache. - sqlalchemy exceptions: When something is up with the cache. - - Returns: - Database record of the new resource in cache. - """ - if isinstance(fpath, str): - fpath = Path(fpath) - - if not fpath.exists(): - raise NoFpathError(f"Resource at '{fpath}' does not exist.") - - rid = generate_id() - rpath = f"{self.cache}/{rid}" + (f".{fpath.suffix}" if ext else "") if action != "asis" else str(fpath) - - # create new record in the database - res = Resource( - **dict( - rid=rid, - rname=rname, - rpath=rpath, - rtype=rtype, - fpath=str(fpath), - ) - ) - - # If this was higher up a parallel process could have added the key to - # the cache in the meantime as the above takes a bit, so checking here - # reduces the odds of this happening - # Redirecting to update was removed as it is a scenario better handled - # by the caller. - if self.get(rname) is not None: - raise RnameExistsError("Resource already exists in cache!") - - with self.sessionLocal() as session: - session.add(res) - session.commit() - session.refresh(res) - - # In the "move" scenario if we move the file to rpath before rpath is - # part of the cache and then when trying to add it to the cache an - # exception is raised (such as if it is locked by another process) the - # data essentially disappears to rpath with no way of retrieving its - # location. Thus we add rpath to the cache first, then move the data - # into it so that the data at source does not disappear if accessing - # the cache raises an exception. - copy_or_move(str(fpath), rpath, rname, action) - - return res - - def query(self, query: str, field: str = "rname") -> List[Resource]: - """Search cache for a resource. - - Args: - query: - Query string or keywords to search. - - field: - Field to search. - Defaults to "rname". - - Returns: - List of matching resources from cache. - """ - with self.sessionLocal() as session: - return session.query(Resource).filter(Resource[field].ilike("%{}%".format(query))).all() - - def _get(self, session: Session, rname: str) -> Optional[Resource]: - """Get a resource with `rname` from given `Session`. - - Args: - session: - The `Session` object to use. - - rname: - The `rname` of the `Resource` to get. - - Returns: - The `Resource` for the `rname` if available. 
- """ - resource: Optional[Resource] = session.query(Resource).filter(Resource.rname == rname).first() - - if resource is not None: - # `Resource` may exist but `rpath` could still be being - # moved/copied into by `add`, wait until `rpath` exists - start = time() - timeout = 30 - while not Path(str(resource.rpath)).exists(): - if time() - start >= timeout: - raise RpathTimeoutError( - f"For resource: '{rname}' the rpath does not exist " f"after {timeout} seconds." - ) - sleep(0.1) - - return resource - - def get(self, rname: str) -> Optional[Resource]: - """Get resource by name from cache. - - Args: - rname: - Name of the file to search. - - Returns: - Matched `Resource` from cache if exists. - """ - return self._get(self.sessionLocal(), rname) - - def remove(self, rname: str) -> None: - """Remove a resource from cache by name. - - Args: - rname: - Name of the resource to remove. - """ - with self.sessionLocal() as session: - res: Optional[Resource] = self._get(session, rname) - - if res is not None: - session.delete(res) - session.commit() - # remove file - Path(res.rpath).unlink() - - def purge(self): - """Remove all files from cache.""" - for file in os.scandir(self.cache): - os.remove(file.path) - - return True - - def update( - self, - rname: str, - fpath: Union[str, Path], - action: Literal["copy", "move", "asis"] = "copy", - ) -> Resource: - """Update a resource in cache. - - Args: - rname: - Name of the resource in cache. - - fpath: - New resource to replace existing file in cache. - - action: - Either ``copy``, ``move`` or ``asis``. - - Defaults to ``copy``. - - Returns: - Updated resource record in cache. - """ - - if isinstance(fpath, str): - fpath = Path(fpath) - - if not fpath.exists(): - raise Exception(f"File: '{fpath}' does not exist") - - with self.sessionLocal() as session: - res = self._get(session, rname) - - if res is not None: - if action != "asis": - # copy the file to cache - copy_or_move(str(fpath), str(res.rpath), rname, action) - else: - res.rpath = str(fpath) - - res.access_time = res.last_modified_time = func.now() - session.merge(res) - session.commit() - session.refresh(res) - else: - # technically an error since update shouldn't be called on - # non-existent resources in cache. - # but lets just add it to the cache. - res = self.add(rname=rname, fpath=fpath, action=action) - return res - - def list_all(self) -> List[Resource]: - """List all resources currently in the cache. - - Returns: - List of all Resource objects in the cache. 
- """ - with self.sessionLocal() as session: - return session.query(Resource).all() diff --git a/src/pybiocfilecache/__init__.py b/src/pybiocfilecache/__init__.py index ee5910d..9479535 100644 --- a/src/pybiocfilecache/__init__.py +++ b/src/pybiocfilecache/__init__.py @@ -15,11 +15,4 @@ finally: del version, PackageNotFoundError -from .BiocFileCache import BiocFileCache as BiocFileCache - -from .db.db_config import Metadata as Metadata -from .db.db_config import Resource as Resource - -from ._exceptions import NoFpathError as NoFpathError -from ._exceptions import RnameExistsError as RnameExistsError -from ._exceptions import RpathTimeoutError as RpathTimeoutError +from .cache import BiocFileCache diff --git a/src/pybiocfilecache/_exceptions.py b/src/pybiocfilecache/_exceptions.py deleted file mode 100644 index f46bba7..0000000 --- a/src/pybiocfilecache/_exceptions.py +++ /dev/null @@ -1,15 +0,0 @@ -__author__ = "jkanche" -__copyright__ = "jkanche" -__license__ = "MIT" - - -class NoFpathError(Exception): - """An error for when the source file does not exist.""" - - -class RnameExistsError(Exception): - """An error for when the key already exists in the cache.""" - - -class RpathTimeoutError(Exception): - """An error for when the 'rpath' does not exist after a timeout.""" diff --git a/src/pybiocfilecache/cache.py b/src/pybiocfilecache/cache.py new file mode 100644 index 0000000..6179e80 --- /dev/null +++ b/src/pybiocfilecache/cache.py @@ -0,0 +1,633 @@ +import json +import logging +from contextlib import contextmanager +from datetime import datetime +from pathlib import Path +from time import sleep, time +from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union + +from sqlalchemy import create_engine, func +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.pool import QueuePool + +from .config import CacheConfig +from .exceptions import ( + BiocCacheError, + CacheSizeLimitError, + InvalidRnameError, + NoFpathError, + RnameExistsError, + RpathTimeoutError, +) +from .models import Base, Resource +from .utils import ( + calculate_file_hash, + copy_or_move, + create_tmp_dir, + generate_id, + get_file_size, + validate_rname, +) + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + +logger = logging.getLogger(__name__) + + +class BiocFileCache: + """Enhanced file caching module. + + Features: + - Resource validation and integrity checking + - Cache size management + - File compression + - Resource tagging + - Automatic cleanup of expired resources + """ + + def __init__(self, cache_dir: Optional[Union[str, Path]] = None, config: Optional[CacheConfig] = None): + """Initialize cache with optional configuration. + + Args: + cache_dir: + Path to cache directory. + Defaults to tmp location, :py:func:`~.utils.create_tmp_dir`. + Ignored if config already contains the path to the cache directory. + + config: + Optional configuration. 
+ + """ + if config is None: + cache_dir = Path(cache_dir) if cache_dir else create_tmp_dir() + config = CacheConfig(cache_dir=cache_dir) + + self.config = config + self._setup_cache_dir() + self._setup_database() + self._last_cleanup = datetime.now() + + def _setup_cache_dir(self) -> None: + if not self.config.cache_dir.exists(): + self.config.cache_dir.mkdir(parents=True, exist_ok=True) + + def _setup_database(self) -> None: + db_path = self.config.cache_dir / "BiocFileCache.sqlite" + self.engine = create_engine( + f"sqlite:///{db_path}", + poolclass=QueuePool, + pool_size=5, + max_overflow=10, + pool_timeout=30, + connect_args={"check_same_thread": False}, + ) + + Base.metadata.create_all(self.engine) + self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine) + + def _get_detached_resource(self, session: Session, resource: Resource) -> Optional[Resource]: + """Get a detached copy of a resource.""" + if resource is None: + return None + session.refresh(resource) + session.expunge(resource) + return resource + + def __enter__(self) -> "BiocFileCache": + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + def close(self) -> None: + """Clean up resources.""" + self.engine.dispose() + + @contextmanager + def get_session(self) -> Iterator[Session]: + """Provide database session with automatic cleanup.""" + session = self.SessionLocal() + try: + yield session + session.commit() + except: + session.rollback() + raise + finally: + session.close() + + def _check_cache_size(self, new_size: int) -> None: + """Verify cache size limit won't be exceeded.""" + if self.config.max_size_bytes is None: + return + + current_size = self.get_cache_size() + if current_size + new_size > self.config.max_size_bytes: + raise CacheSizeLimitError( + f"Adding {new_size} bytes would exceed cache limit of " f"{self.config.max_size_bytes} bytes" + ) + + def _validate_rname(self, rname: str) -> None: + """Validate resource name format.""" + if not validate_rname(rname, self.config.rname_pattern): + raise InvalidRnameError(f"Resource name '{rname}' doesn't match pattern " f"'{self.config.rname_pattern}'") + + def _should_cleanup(self) -> bool: + """Check if cache cleanup should be performed. + + Returns: + True if `cleanup_interval` is set and time since last cleanup exceeds it. + """ + if self.config.cleanup_interval is None: + return False + + return datetime.now() - self._last_cleanup > self.config.cleanup_interval + + def cleanup(self) -> int: + """Remove expired resources from the cache. + + Returns: + Number of resources removed. + + Note: + - If `cleanup_interval` is None, this method will still run if called explicitly. + - Only removes resources with non-None expiration dates. 
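+
+        Example (a sketch; assumes an initialized ``cache``):
+
+            removed = cache.cleanup()
+            print(f"Removed {removed} expired resources")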
+ """ + if not any([self.config.cleanup_interval, self._should_cleanup()]): + return 0 # Early return if automatic cleanup is disabled + + removed = 0 + with self.get_session() as session: + # Only query resources that have expiration dates + expired = ( + session.query(Resource) + .filter( + Resource.expires.isnot(None), # Only check resources with expiration + Resource.expires < datetime.now(), + ) + .all() + ) + + for resource in expired: + try: + Path(resource.rpath).unlink(missing_ok=True) + session.delete(resource) + removed += 1 + except Exception as e: + logger.error(f"Failed to remove expired resource: {resource.rname}", exc_info=e) + + session.commit() + + self._last_cleanup = datetime.now() + return removed + + def get(self, rname: str) -> Optional[Resource]: + """Get resource by name from cache. + + Args: + rname: + Name to identify the resource in cache. + + """ + with self.get_session() as session: + resource = session.query(Resource).filter(Resource.rname == rname).first() + + if resource is not None: + # Check if path exists with timeout + start = time() + timeout = 30 + while not Path(str(resource.rpath)).exists(): + if time() - start >= timeout: + raise RpathTimeoutError( + f"For resource: '{rname}' the rpath does not exist " f"after {timeout} seconds." + ) + sleep(0.1) + + # Update access time + resource.access_time = datetime.now() + session.commit() + return self._get_detached_resource(session, resource) + + return None + + def add( + self, + rname: str, + fpath: Union[str, Path], + rtype: Literal["local", "web", "relative"] = "local", + action: Literal["copy", "move", "asis"] = "copy", + tags: Optional[List[str]] = None, + expires: Optional[datetime] = None, + ext: bool = False, + ) -> Resource: + """Add a resource to the cache. + + Args: + rname: + Name to identify the resource in cache. + + fpath: + Path to the source file. + + rtype: + Type of resource. + One of ``local``, ``web``, or ``relative``. + Defaults to ``local``. + + action: + How to handle the file ("copy", "move", or "asis"). + Defaults to ``copy``. + + tags: + Optional list of tags for categorization. + + expires: + Optional expiration datetime. + If None, resource never expires. + + ext: + Whether to use filepath extension when storing in cache. + Defaults to `False`. + + Returns: + The `Resource` object added to the cache. 
+ """ + self._validate_rname(rname) + fpath = Path(fpath) + + if not fpath.exists(): + raise NoFpathError(f"Resource at '{fpath}' does not exist") + + if self.get(rname) is not None: + raise RnameExistsError(f"Resource '{rname}' already exists") + + # Generate paths and check size + rid = generate_id() + rpath = self.config.cache_dir / f"{rid}{fpath.suffix if ext else ''}" if action != "asis" else fpath + + self._check_cache_size(get_file_size(fpath)) + + # Create resource record + resource = Resource( + rid=rid, + rname=rname, + rpath=str(rpath), + rtype=rtype, + fpath=str(fpath), + tags=",".join(tags) if tags else None, + expires=expires, + size_bytes=get_file_size(fpath), + ) + + # Store file and update database + with self.get_session() as session: + session.add(resource) + session.commit() + + try: + copy_or_move(fpath, rpath, rname, action, self.config.compression) + + # Calculate and store checksum + resource.etag = calculate_file_hash(rpath, self.config.hash_algorithm) + session.commit() + result = self._get_detached_resource(session, resource) + return result + + except Exception as e: + session.delete(resource) + session.commit() + raise BiocCacheError("Failed to add resource") from e + + def add_batch(self, resources: List[Dict[str, Any]]) -> List[Resource]: + """Add multiple resources in a single transaction. + + Args: + resources: + List of resources to add. + """ + results = [] + with self.get_session() as session: + for resource_info in resources: + try: + resource = self.add(**resource_info) + results.append(resource) + except Exception as e: + logger.error(f"Failed to add resource: {resource_info.get('rname')}", exc_info=e) + session.rollback() + raise + return results + + def update( + self, + rname: str, + fpath: Union[str, Path], + action: Literal["copy", "move", "asis"] = "copy", + tags: Optional[List[str]] = None, + ) -> Resource: + """Update an existing resource. + + Args: + rname: + Name to identify the resource in cache. + + fpath: + Path to the new source file. + + action: + Either ``copy``, ``move`` or ``asis``. + Defaults to ``copy``. + + tags: + Optional new list of tags. + + Returns: + Updated `Resource` object. + + """ + fpath = Path(fpath) + if not fpath.exists(): + raise NoFpathError(f"File '{fpath}' does not exist") + + with self.get_session() as session: + resource = session.query(Resource).filter(Resource.rname == rname).first() + + if resource is None: + return self.add(rname=rname, fpath=fpath, action=action) + + old_path = Path(resource.rpath) + try: + copy_or_move(fpath, old_path, rname, action, self.config.compression) + + resource.last_modified_time = datetime.now() + resource.etag = calculate_file_hash(old_path, self.config.hash_algorithm) + resource.size_bytes = get_file_size(old_path) + + if tags is not None: + resource.tags = ",".join(tags) + + session.commit() + return self._get_detached_resource(session, resource) + + except Exception as e: + session.rollback() + raise BiocCacheError("Failed to update resource") from e + + def remove(self, rname: str) -> None: + """Remove a resource from cache by name. + + Removes both the cached file and its database entry. + + Args: + rname: + Name to identify the resource in cache. 
+
+        Raises:
+            BiocCacheError: If resource removal fails.
+        """
+        with self.get_session() as session:
+            resource = session.query(Resource).filter(Resource.rname == rname).first()
+
+            if resource is not None:
+                try:
+                    # Try to remove the file first
+                    rpath = Path(resource.rpath)
+                    if rpath.exists():
+                        rpath.unlink()
+
+                    # Then remove from database
+                    session.delete(resource)
+                    session.commit()
+
+                except Exception as e:
+                    session.rollback()
+                    raise BiocCacheError(f"Failed to remove resource '{rname}'") from e
+
+    def list_resources(
+        self, tag: Optional[str] = None, rtype: Optional[str] = None, expired: Optional[bool] = None
+    ) -> List[Resource]:
+        """List resources in the cache with optional filtering.
+
+        Args:
+            tag:
+                Filter resources by tag.
+
+            rtype:
+                Filter resources by type.
+
+            expired:
+                Filter by expiration status:
+                True for only expired resources,
+                False for only non-expired resources,
+                None for all resources.
+                Resources with no expiration are always considered non-expired.
+
+        Returns:
+            List of `Resource` objects matching the filters.
+        """
+        with self.get_session() as session:
+            query = session.query(Resource)
+
+            if tag:
+                query = query.filter(Resource.tags.like(f"%{tag}%"))
+            if rtype:
+                query = query.filter(Resource.rtype == rtype)
+            if expired is not None:
+                if expired:
+                    query = query.filter(
+                        Resource.expires.isnot(None),  # Only check resources with expiration
+                        Resource.expires < datetime.now(),
+                    )
+                else:
+                    query = query.filter(
+                        (Resource.expires.is_(None))  # Never expires
+                        | (Resource.expires > datetime.now())  # Not yet expired
+                    )
+
+            resources = query.all()
+            return [self._get_detached_resource(session, r) for r in resources]
+
+    def validate_resource(self, resource: Resource) -> bool:
+        """Validate resource integrity.
+
+        Args:
+            resource:
+                Resource to validate.
+
+        Returns:
+            True if resource is valid, False otherwise.
+        """
+        if not resource.etag:
+            return True  # No validation if no checksum
+
+        try:
+            current_hash = calculate_file_hash(Path(resource.rpath), self.config.hash_algorithm)
+            return current_hash == resource.etag
+        except Exception as e:
+            logger.error(f"Failed to validate resource: {resource.rname}", exc_info=e)
+            return False
+
+    def get_cache_size(self) -> int:
+        """Get total size of cached files in bytes."""
+        with self.get_session() as session:
+            return session.query(func.sum(Resource.size_bytes)).scalar() or 0
+
+    def export_metadata(self, path: Path) -> None:
+        """Export cache metadata to a JSON file."""
+        data = {
+            "resources": [
+                {
+                    "rname": r.rname,
+                    "rtype": r.rtype,
+                    "tags": r.tags,
+                    "expires": r.expires.isoformat() if r.expires else None,
+                    "etag": r.etag,
+                    "size_bytes": r.size_bytes,
+                }
+                for r in self.list_resources()
+            ],
+            "cache_size": self.get_cache_size(),
+            "export_time": datetime.now().isoformat(),
+        }
+
+        with open(path, "w") as f:
+            json.dump(data, f, indent=2)
+
+    def import_metadata(self, path: Path) -> None:
+        """Import cache metadata from a JSON file."""
+        with open(path) as f:
+            data = json.load(f)
+
+        with self.get_session() as session:
+            for resource_data in data["resources"]:
+                resource = session.query(Resource).filter(Resource.rname == resource_data["rname"]).first()
+                if resource:
+                    resource.tags = resource_data["tags"]
+                    resource.expires = (
+                        datetime.fromisoformat(resource_data["expires"]) if resource_data["expires"] else None
+                    )
+                    session.merge(resource)
+            session.commit()
+
+    def verify_cache(self) -> Tuple[int, int]:
+        """Verify integrity of all cached resources.
+
+        Returns:
+            Tuple of (valid_count, invalid_count).
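+
+        Example (a sketch; assumes an initialized ``cache`` and the module-level ``logger``):
+
+            valid, invalid = cache.verify_cache()
+            if invalid:
+                logger.warning("%d resources failed checksum validation", invalid)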
+ """ + valid = invalid = 0 + for resource in self.list_resources(): + if self.validate_resource(resource): + valid += 1 + else: + invalid += 1 + return valid, invalid + + def search(self, query: str, field: str = "rname", exact: bool = False) -> List[Resource]: + """Search for resources by field value. + + Args: + query: + Search string. + + field: + Resource field to search ("rname", "rtype", "tags", etc.). + + exact: + Whether to require exact match. + + Returns: + List of matching resources. + """ + with self.get_session() as session: + if exact: + resources = session.query(Resource).filter(Resource[field] == query).all() + else: + resources = session.query(Resource).filter(Resource[field].ilike(f"%{query}%")).all() + + return [self._get_detached_resource(session, r) for r in resources] + + def get_stats(self) -> Dict[str, Any]: + """Get statistics about the cache.""" + with self.get_session() as session: + total = session.query(Resource).count() + expired = ( + session.query(Resource) + .filter( + Resource.expires.isnot(None), # Only check resources with expiration + Resource.expires < datetime.now(), + ) + .count() + ) + types = dict(session.query(Resource.rtype, func.count(Resource.id)).group_by(Resource.rtype).all()) + + return { + "total_resources": total, + "expired_resources": expired, + "cache_size_bytes": self.get_cache_size(), + "resource_types": types, + "last_cleanup": self._last_cleanup.isoformat(), + "cleanup_enabled": self.config.cleanup_interval is not None, + } + + def purge(self, force: bool = False) -> bool: + """Remove all resources from cache and reset database. + + Args: + force: + If True, skip validation and remove all files + even if database operations fail. + + Returns: + True if purge was successful, False otherwise. + + Raises: + BiocCacheError: If purge fails and force=False. + """ + try: + with self.get_session() as session: + resources = session.query(Resource).all() + session.query(Resource).delete() + + for resource in resources: + try: + Path(resource.rpath).unlink(missing_ok=True) + except Exception as e: + if not force: + session.rollback() + raise BiocCacheError(f"Failed to remove file for resource '{resource.rname}'") from e + logger.warning(f"Failed to remove file for resource '{resource.rname}': {e}") + + session.commit() + + if force: + for file in self.config.cache_dir.iterdir(): + if file.name != "BiocFileCache.sqlite": + try: + if file.is_file(): + file.unlink() + elif file.is_dir(): + file.rmdir() + except Exception as e: + logger.warning(f"Failed to remove {file}: {e}") + + self._last_cleanup = datetime.now() + return True + + except Exception as e: + if not force: + raise BiocCacheError("Failed to purge cache") from e + + logger.error("Database cleanup failed, forcing file removal", exc_info=e) + for file in self.config.cache_dir.iterdir(): + if file.name != "BiocFileCache.sqlite": + try: + if file.is_file(): + file.unlink() + elif file.is_dir(): + file.rmdir() + except Exception as file_e: + logger.warning(f"Failed to remove {file}: {file_e}") + + return False diff --git a/src/pybiocfilecache/config.py b/src/pybiocfilecache/config.py new file mode 100644 index 0000000..9f120ad --- /dev/null +++ b/src/pybiocfilecache/config.py @@ -0,0 +1,42 @@ +from dataclasses import dataclass +from datetime import timedelta +from pathlib import Path +from typing import Optional + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +@dataclass +class CacheConfig: + """Configuration for BiocFileCache. 
+ + Attributes: + cache_dir: + Directory to store cached files. + + max_size_bytes: + Maximum total size of cache. + None for unlimited. + + cleanup_interval: + How often to run expired resource cleanup. + None for no cleanup. + + rname_pattern: + Regex pattern for valid resource names. + + hash_algorithm: + Algorithm to use for file checksums. + + compression: + Whether to compress cached files. + """ + + cache_dir: Path + max_size_bytes: Optional[int] = None + cleanup_interval: Optional[timedelta] = None # timedelta(days=30) + rname_pattern: str = r"^[a-zA-Z0-9_-]+$" + hash_algorithm: str = "md5" + compression: bool = False diff --git a/src/pybiocfilecache/db/__init__.py b/src/pybiocfilecache/db/__init__.py deleted file mode 100644 index cf6783a..0000000 --- a/src/pybiocfilecache/db/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -__author__ = "jkanche" -__copyright__ = "jkanche" -__license__ = "MIT" - -from .db_config import create_schema diff --git a/src/pybiocfilecache/db/db_config.py b/src/pybiocfilecache/db/db_config.py deleted file mode 100644 index 638433a..0000000 --- a/src/pybiocfilecache/db/db_config.py +++ /dev/null @@ -1,82 +0,0 @@ -from typing import Tuple - -from sqlalchemy import create_engine, select, Column, Integer, Text, DateTime, func -from sqlalchemy.engine import Engine -from sqlalchemy.orm.session import Session - -from sqlalchemy.orm import declarative_base, sessionmaker - -from ..const import SCHEMA_VERSION - -__author__ = "jkanche" -__copyright__ = "jkanche" -__license__ = "MIT" - -Base = declarative_base() - - -class Metadata(Base): - __tablename__ = "metadata" - key = Column(Text(), primary_key=True, index=True) - value = Column(Text()) - - def __repr__(self): - return "" % (self.key, self.value) - - -class Resource(Base): - __tablename__ = "resource" - id = Column(Integer, primary_key=True, index=True, autoincrement=True) - rid = Column(Text()) - rname = Column(Text()) - create_time = Column(DateTime, server_default=func.now()) - access_time = Column(DateTime, server_default=func.now()) - rpath = Column(Text()) - rtype = Column(Text()) - fpath = Column(Text()) - last_modified_time = Column(DateTime, onupdate=func.now()) - etag = Column(Text()) - expires = Column(DateTime) - - def __repr__(self): - return "" % (self.id, self.rname) - - -def add_metadata(key: str, value: str, engine: Engine) -> None: - """Add metadata to the database. - - Args: - key: - Key of the metadata. - value: - Value of the metadata. - engine: - Engine - """ - with Session(engine) as session: - if session.scalar(select(Metadata).where(Metadata.key == key)): - pass - else: - new_metadata = Metadata(key=key, value=value) - session.add(new_metadata) - session.commit() - - -def create_schema(cache_dir: str) -> Tuple[Engine, sessionmaker]: - """Create the schema in the sqlite database. - - Args: - cache_dir: - Location where the cache directory. - - Returns: - A tuple of sqlalchemy engine and session maker. 
- """ - engine = create_engine(f"sqlite:///{cache_dir}", connect_args={"check_same_thread": False}) - - Base.metadata.create_all(bind=engine, checkfirst=True) - sessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) - - add_metadata("schema_version", SCHEMA_VERSION, engine) - - return (engine, sessionLocal) diff --git a/src/pybiocfilecache/exceptions.py b/src/pybiocfilecache/exceptions.py new file mode 100644 index 0000000..7bfd11c --- /dev/null +++ b/src/pybiocfilecache/exceptions.py @@ -0,0 +1,31 @@ +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +class BiocCacheError(Exception): + """Base exception for BiocFileCache errors.""" + + +class NoFpathError(BiocCacheError): + """Source file does not exist.""" + + +class RnameExistsError(BiocCacheError): + """Resource name already exists in cache.""" + + +class RpathTimeoutError(BiocCacheError): + """Resource path does not exist after timeout.""" + + +class CacheSizeLimitError(BiocCacheError): + """Cache size limit would be exceeded.""" + + +class ResourceValidationError(BiocCacheError): + """Resource failed validation check.""" + + +class InvalidRnameError(BiocCacheError): + """Invalid resource name format.""" diff --git a/src/pybiocfilecache/models.py b/src/pybiocfilecache/models.py new file mode 100644 index 0000000..71c0998 --- /dev/null +++ b/src/pybiocfilecache/models.py @@ -0,0 +1,84 @@ +from sqlalchemy import Column, DateTime, Integer, Text, func +from sqlalchemy.orm import declarative_base + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + +Base = declarative_base() + + +class Metadata(Base): + """Database metadata information.""" + + __tablename__ = "metadata" + + key = Column(Text(), primary_key=True, index=True) + value = Column(Text()) + + def __repr__(self) -> str: + return f"" + + +class Resource(Base): + """Resource information stored in cache. 
+ + Attributes: + id: + Auto-incrementing primary key + + rid: + Unique resource identifier (UUID) + + rname: + User-provided resource name + + create_time: + When the resource was first added + + access_time: + Last time the resource was accessed + + rpath: + Path to the resource in the cache + + rtype: + Type of resource (local, web, relative) + + fpath: + Original file path + + last_modified_time: + Last time the resource was modified + + etag: + Checksum/hash of the resource + + expires: + When the resource should be considered expired + + tags: + Optional comma-separated tags for categorization + + size_bytes: + Size of the resource in bytes + """ + + __tablename__ = "resource" + + id = Column(Integer, primary_key=True, index=True, autoincrement=True) + rid = Column(Text(), index=True) + rname = Column(Text(), index=True, unique=True) + create_time = Column(DateTime, server_default=func.now()) + access_time = Column(DateTime, server_default=func.now()) + rpath = Column(Text()) + rtype = Column(Text()) + fpath = Column(Text()) + last_modified_time = Column(DateTime, onupdate=func.now()) + etag = Column(Text()) + expires = Column(DateTime) + tags = Column(Text()) + size_bytes = Column(Integer) + + def __repr__(self) -> str: + return f"" diff --git a/src/pybiocfilecache/utils.py b/src/pybiocfilecache/utils.py index 442d762..9dda562 100644 --- a/src/pybiocfilecache/utils.py +++ b/src/pybiocfilecache/utils.py @@ -1,85 +1,83 @@ +import hashlib import logging -import sys +import re import tempfile import uuid +import zlib from pathlib import Path from shutil import copy2, move -from typing import Literal, Union +from typing import Literal + +from .exceptions import BiocCacheError __author__ = "Jayaram Kancherla" -__copyright__ = "jkanche" +__copyright__ = "Jayaram Kancherla" __license__ = "MIT" +logger = logging.getLogger(__name__) -def create_tmp_dir() -> str: - """Create a temporary directory. - Returns: - Temporary path to the directory. - """ - return tempfile.mkdtemp() +def create_tmp_dir() -> Path: + """Create a temporary directory.""" + return Path(tempfile.mkdtemp()) def generate_id() -> str: - """Generate uuid. - - Returns: - Unique string for use as id. - """ + """Generate unique identifier.""" return uuid.uuid4().hex -def copy_or_move( - source: Union[str, Path], - target: Union[str, Path], - rname: str, - action: Literal["copy", "move", "asis"] = "copy", -) -> None: - """Copy or move a resource from ``source`` to ``target``. +def validate_rname(rname: str, pattern: str) -> bool: + """Validate resource name format.""" + return bool(re.match(pattern, rname)) + + +def calculate_file_hash(path: Path, algorithm: str = "md5") -> str: + """Calculate file checksum.""" + hasher = hashlib.new(algorithm) + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hasher.update(chunk) + return hasher.hexdigest() - Args: - source: - Source location of the resource to copy of move. - target: - Destination to copy of move to. +def get_file_size(path: Path) -> int: + """Get file size in bytes.""" + return path.stat().st_size - rname: - Name of resource to add to cache. - action: - Copy of move file from source. - Defaults to copy. +def compress_file(source: Path, target: Path) -> None: + """Compress file using zlib.""" + with open(source, "rb") as sf, open(target, "wb") as tf: + tf.write(zlib.compress(sf.read())) - Raises: - ValueError: - If action is not `copy`, `move` or `asis`. - Exception: - Error storing resource in the cache directory. 
- """ +def decompress_file(source: Path, target: Path) -> None: + """Decompress file using zlib.""" + with open(source, "rb") as sf, open(target, "wb") as tf: + tf.write(zlib.decompress(sf.read())) + +def copy_or_move( + source: Path, target: Path, rname: str, action: Literal["copy", "move", "asis"] = "copy", compress: bool = False +) -> None: + """Copy or move a resource.""" if action not in ["copy", "move", "asis"]: - raise ValueError(f"Action must be either 'move', 'copy' or 'asis', provided {action}.") + raise ValueError(f"Invalid action: {action}") try: if action == "copy": - copy2(source, target) + if compress: + compress_file(source, target) + else: + copy2(source, target) elif action == "move": - move(str(source), target) + if compress: + compress_file(source, target) + source.unlink() + else: + move(str(source), target) elif action == "asis": pass except Exception as e: - raise Exception( - f"Error storing resource: '{rname}' from: '{source}' in '{target}'.", - ) from e - - -def setup_logging(loglevel): - """Setup basic logging. - - Args: - loglevel (int): minimum loglevel for emitting messages - """ - logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" - logging.basicConfig(level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S") + raise BiocCacheError(f"Failed to store resource '{rname}' from '{source}' to '{target}'") from e diff --git a/tests/test_cache.py b/tests/test_cache.py index 941a8ed..8ce25ce 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -1,6 +1,7 @@ import os +import shutil -from pybiocfilecache.BiocFileCache import BiocFileCache +from pybiocfilecache import BiocFileCache __author__ = "jkanche" __copyright__ = "jkanche" @@ -19,8 +20,10 @@ def test_create_cache(): def test_add_get_operations(): bfc = BiocFileCache(CACHE_DIR) - bfc.add("test1", os.getcwd() + "/tests/data/test1.txt") + rtrip = bfc.add("test1", os.getcwd() + "/tests/data/test1.txt") + print("rtrip: ", rtrip) rec1 = bfc.get("test1") + print("rec1: ", rec1) assert rec1 is not None bfc.add("test2", os.getcwd() + "/tests/data/test2.txt") @@ -33,15 +36,16 @@ def test_add_get_operations(): frec2 = open(rec2.rpath, "r").read().strip() assert frec2 == "test2" - bfc.add("test3_asis", os.getcwd() + "/tests/data/test2.txt", action="asis") + shutil.copy(os.getcwd() + "/tests/data/test2.txt", os.getcwd() + "/tests/data/test3.txt") + bfc.add("test3_asis", os.getcwd() + "/tests/data/test3.txt", action="asis") rec3 = bfc.get("test3_asis") assert rec3 is not None - assert rec3.rpath == os.getcwd() + "/tests/data/test2.txt" + assert rec3.rpath == os.getcwd() + "/tests/data/test3.txt" frec3 = open(rec3.rpath, "r").read().strip() assert frec3 == "test2" - rtrip = bfc.list_all() + rtrip = bfc.list_resources() assert len(rtrip) == 3 bfc.purge()