Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sqlspec/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ def _read_file_content(self, path: str | Path) -> str:
if file_path and len(file_path) > 2 and file_path[2] == ":": # noqa: PLR2004
file_path = file_path[1:]
filename = Path(file_path).name
return backend.read_text(filename, encoding=self.encoding)
return backend.read_text(path_str, encoding=self.encoding)
return backend.read_text_sync(filename, encoding=self.encoding)
return backend.read_text_sync(path_str, encoding=self.encoding)
except KeyError as e:
raise SQLFileNotFoundError(path_str) from e
except FileNotFoundInStorageError as e:
Expand Down
73 changes: 38 additions & 35 deletions sqlspec/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,83 +476,86 @@ def get_data(self) -> Any: ...

@runtime_checkable
class ObjectStoreProtocol(Protocol):
"""Protocol for object storage operations."""
"""Protocol for object storage operations.

All synchronous methods use the `*_sync` suffix for consistency with async methods.
"""

protocol: str
backend_type: str

def __init__(self, uri: str, **kwargs: Any) -> None:
return

def read_bytes(self, path: "str | Path", **kwargs: Any) -> bytes:
"""Read bytes from an object."""
def read_bytes_sync(self, path: "str | Path", **kwargs: Any) -> bytes:
"""Read bytes from an object synchronously."""
return b""

def write_bytes(self, path: "str | Path", data: bytes, **kwargs: Any) -> None:
"""Write bytes to an object."""
def write_bytes_sync(self, path: "str | Path", data: bytes, **kwargs: Any) -> None:
"""Write bytes to an object synchronously."""
return

def read_text(self, path: "str | Path", encoding: str = "utf-8", **kwargs: Any) -> str:
"""Read text from an object."""
def read_text_sync(self, path: "str | Path", encoding: str = "utf-8", **kwargs: Any) -> str:
"""Read text from an object synchronously."""
return ""

def write_text(self, path: "str | Path", data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
"""Write text to an object."""
def write_text_sync(self, path: "str | Path", data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
"""Write text to an object synchronously."""
return

def exists(self, path: "str | Path", **kwargs: Any) -> bool:
"""Check if an object exists."""
def exists_sync(self, path: "str | Path", **kwargs: Any) -> bool:
"""Check if an object exists synchronously."""
return False

def delete(self, path: "str | Path", **kwargs: Any) -> None:
"""Delete an object."""
def delete_sync(self, path: "str | Path", **kwargs: Any) -> None:
"""Delete an object synchronously."""
return

def copy(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
"""Copy an object."""
def copy_sync(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
"""Copy an object synchronously."""
return

def move(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
"""Move an object."""
def move_sync(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
"""Move an object synchronously."""
return

def list_objects(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> list[str]:
"""List objects with optional prefix."""
def list_objects_sync(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> list[str]:
"""List objects with optional prefix synchronously."""
return []

def glob(self, pattern: str, **kwargs: Any) -> list[str]:
"""Find objects matching a glob pattern."""
def glob_sync(self, pattern: str, **kwargs: Any) -> list[str]:
"""Find objects matching a glob pattern synchronously."""
return []

def is_object(self, path: "str | Path") -> bool:
"""Check if path points to an object."""
def is_object_sync(self, path: "str | Path") -> bool:
"""Check if path points to an object synchronously."""
return False

def is_path(self, path: "str | Path") -> bool:
"""Check if path points to a prefix (directory-like)."""
def is_path_sync(self, path: "str | Path") -> bool:
"""Check if path points to a prefix (directory-like) synchronously."""
return False

def get_metadata(self, path: "str | Path", **kwargs: Any) -> dict[str, object]:
"""Get object metadata."""
def get_metadata_sync(self, path: "str | Path", **kwargs: Any) -> dict[str, object]:
"""Get object metadata synchronously."""
return {}

def read_arrow(self, path: "str | Path", **kwargs: Any) -> "ArrowTable":
"""Read an Arrow table from storage."""
def read_arrow_sync(self, path: "str | Path", **kwargs: Any) -> "ArrowTable":
"""Read an Arrow table from storage synchronously."""
msg = "Arrow reading not implemented"
raise NotImplementedError(msg)

def write_arrow(self, path: "str | Path", table: "ArrowTable", **kwargs: Any) -> None:
"""Write an Arrow table to storage."""
def write_arrow_sync(self, path: "str | Path", table: "ArrowTable", **kwargs: Any) -> None:
"""Write an Arrow table to storage synchronously."""
msg = "Arrow writing not implemented"
raise NotImplementedError(msg)

def stream_arrow(self, pattern: str, **kwargs: Any) -> "Iterator[ArrowRecordBatch]":
"""Stream Arrow record batches from matching objects."""
def stream_arrow_sync(self, pattern: str, **kwargs: Any) -> "Iterator[ArrowRecordBatch]":
"""Stream Arrow record batches from matching objects synchronously."""
msg = "Arrow streaming not implemented"
raise NotImplementedError(msg)

def stream_read(self, path: "str | Path", chunk_size: "int | None" = None, **kwargs: Any) -> "Iterator[bytes]":
"""Stream bytes from an object."""
def stream_read_sync(self, path: "str | Path", chunk_size: "int | None" = None, **kwargs: Any) -> "Iterator[bytes]":
"""Stream bytes from an object synchronously."""
msg = "Stream reading not implemented"
raise NotImplementedError(msg)

Expand Down
120 changes: 79 additions & 41 deletions sqlspec/storage/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing_extensions import Self

from sqlspec.typing import ArrowRecordBatch, ArrowTable
from sqlspec.utils.sync_tools import CapacityLimiter

__all__ = (
"AsyncArrowBatchIterator",
Expand All @@ -17,8 +18,38 @@
"AsyncObStoreStreamIterator",
"AsyncThreadedBytesIterator",
"ObjectStoreBase",
"storage_limiter",
)

# Dedicated capacity limiter for storage I/O operations (100 concurrent ops)
# This is shared across all storage backends to prevent overwhelming the system
storage_limiter = CapacityLimiter(100)


class _ExhaustedSentinel:
"""Sentinel value to signal iterator exhaustion across thread boundaries.

StopIteration cannot be raised into asyncio Futures, so we use this sentinel
to signal iterator exhaustion from the thread pool back to the async context.
"""

__slots__ = ()


_EXHAUSTED = _ExhaustedSentinel()


def _next_or_sentinel(iterator: "Iterator[Any]") -> "Any":
"""Get next item or return sentinel if exhausted.

This helper wraps next() to catch StopIteration in the thread,
since StopIteration cannot propagate through asyncio Futures.
"""
try:
return next(iterator)
except StopIteration:
return _EXHAUSTED


class AsyncArrowBatchIterator:
"""Async iterator wrapper for sync Arrow batch iterators.
Expand Down Expand Up @@ -47,16 +78,19 @@ def __aiter__(self) -> "AsyncArrowBatchIterator":
async def __anext__(self) -> "ArrowRecordBatch":
"""Get the next item from the iterator asynchronously.

Uses asyncio.to_thread to offload the blocking next() call
to a thread pool, preventing event loop blocking.

Returns:
The next Arrow record batch.

Raises:
StopAsyncIteration: When the iterator is exhausted.
"""
try:
return next(self._sync_iter)
except StopIteration:
raise StopAsyncIteration from None
result = await asyncio.to_thread(_next_or_sentinel, self._sync_iter)
if result is _EXHAUSTED:
raise StopAsyncIteration
return cast("ArrowRecordBatch", result)


class AsyncBytesIterator:
Expand Down Expand Up @@ -309,93 +343,97 @@ async def __anext__(self) -> bytes:

@mypyc_attr(allow_interpreted_subclasses=True)
class ObjectStoreBase(ABC):
"""Base class for storage backends."""
"""Base class for storage backends.

All synchronous methods follow the *_sync naming convention for consistency
with their async counterparts.
"""

__slots__ = ()

@abstractmethod
def read_bytes(self, path: str, **kwargs: Any) -> bytes:
"""Read bytes from storage."""
def read_bytes_sync(self, path: str, **kwargs: Any) -> bytes:
"""Read bytes from storage synchronously."""
raise NotImplementedError

@abstractmethod
def write_bytes(self, path: str, data: bytes, **kwargs: Any) -> None:
"""Write bytes to storage."""
def write_bytes_sync(self, path: str, data: bytes, **kwargs: Any) -> None:
"""Write bytes to storage synchronously."""
raise NotImplementedError

@abstractmethod
def stream_read(self, path: str, chunk_size: "int | None" = None, **kwargs: Any) -> Iterator[bytes]:
"""Stream bytes from storage."""
def stream_read_sync(self, path: str, chunk_size: "int | None" = None, **kwargs: Any) -> Iterator[bytes]:
"""Stream bytes from storage synchronously."""
raise NotImplementedError

@abstractmethod
def read_text(self, path: str, encoding: str = "utf-8", **kwargs: Any) -> str:
"""Read text from storage."""
def read_text_sync(self, path: str, encoding: str = "utf-8", **kwargs: Any) -> str:
"""Read text from storage synchronously."""
raise NotImplementedError

@abstractmethod
def write_text(self, path: str, data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
"""Write text to storage."""
def write_text_sync(self, path: str, data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
"""Write text to storage synchronously."""
raise NotImplementedError

@abstractmethod
def list_objects(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
"""List objects in storage."""
def list_objects_sync(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
"""List objects in storage synchronously."""
raise NotImplementedError

@abstractmethod
def exists(self, path: str, **kwargs: Any) -> bool:
"""Check if object exists in storage."""
def exists_sync(self, path: str, **kwargs: Any) -> bool:
"""Check if object exists in storage synchronously."""
raise NotImplementedError

@abstractmethod
def delete(self, path: str, **kwargs: Any) -> None:
"""Delete object from storage."""
def delete_sync(self, path: str, **kwargs: Any) -> None:
"""Delete object from storage synchronously."""
raise NotImplementedError

@abstractmethod
def copy(self, source: str, destination: str, **kwargs: Any) -> None:
"""Copy object within storage."""
def copy_sync(self, source: str, destination: str, **kwargs: Any) -> None:
"""Copy object within storage synchronously."""
raise NotImplementedError

@abstractmethod
def move(self, source: str, destination: str, **kwargs: Any) -> None:
"""Move object within storage."""
def move_sync(self, source: str, destination: str, **kwargs: Any) -> None:
"""Move object within storage synchronously."""
raise NotImplementedError

@abstractmethod
def glob(self, pattern: str, **kwargs: Any) -> "list[str]":
"""Find objects matching pattern."""
def glob_sync(self, pattern: str, **kwargs: Any) -> "list[str]":
"""Find objects matching pattern synchronously."""
raise NotImplementedError

@abstractmethod
def get_metadata(self, path: str, **kwargs: Any) -> "dict[str, object]":
"""Get object metadata from storage."""
def get_metadata_sync(self, path: str, **kwargs: Any) -> "dict[str, object]":
"""Get object metadata from storage synchronously."""
raise NotImplementedError

@abstractmethod
def is_object(self, path: str) -> bool:
"""Check if path points to an object."""
def is_object_sync(self, path: str) -> bool:
"""Check if path points to an object synchronously."""
raise NotImplementedError

@abstractmethod
def is_path(self, path: str) -> bool:
"""Check if path points to a directory."""
def is_path_sync(self, path: str) -> bool:
"""Check if path points to a directory synchronously."""
raise NotImplementedError

@abstractmethod
def read_arrow(self, path: str, **kwargs: Any) -> ArrowTable:
"""Read Arrow table from storage."""
def read_arrow_sync(self, path: str, **kwargs: Any) -> ArrowTable:
"""Read Arrow table from storage synchronously."""
raise NotImplementedError

@abstractmethod
def write_arrow(self, path: str, table: ArrowTable, **kwargs: Any) -> None:
"""Write Arrow table to storage."""
def write_arrow_sync(self, path: str, table: ArrowTable, **kwargs: Any) -> None:
"""Write Arrow table to storage synchronously."""
raise NotImplementedError

@abstractmethod
def stream_arrow(self, pattern: str, **kwargs: Any) -> Iterator[ArrowRecordBatch]:
"""Stream Arrow record batches from storage."""
def stream_arrow_sync(self, pattern: str, **kwargs: Any) -> Iterator[ArrowRecordBatch]:
"""Stream Arrow record batches from storage synchronously."""
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -426,7 +464,7 @@ async def stream_read_async(
raise NotImplementedError

@abstractmethod
def list_objects_async(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
async def list_objects_async(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
"""List objects in storage asynchronously."""
raise NotImplementedError

Expand All @@ -451,7 +489,7 @@ async def move_async(self, source: str, destination: str, **kwargs: Any) -> None
raise NotImplementedError

@abstractmethod
def get_metadata_async(self, path: str, **kwargs: Any) -> "dict[str, object]":
async def get_metadata_async(self, path: str, **kwargs: Any) -> "dict[str, object]":
"""Get object metadata from storage asynchronously."""
raise NotImplementedError

Expand Down
Loading
Loading