Skip to content
Open
1 change: 1 addition & 0 deletions changes/3638.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add methods for reading stored objects as bytes and JSON-decoded bytes to store classes.
212 changes: 210 additions & 2 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

import asyncio
import json
from abc import ABC, abstractmethod
from asyncio import gather
from dataclasses import dataclass
from itertools import starmap
from typing import TYPE_CHECKING, Literal, Protocol, runtime_checkable

from zarr.core.sync import sync

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
from types import TracebackType
Expand Down Expand Up @@ -206,6 +209,211 @@ async def get(
"""
...

async def get_bytes(
self, key: str, *, prototype: BufferPrototype, byte_range: ByteRequest | None = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the output of this method is a bytes object (CPU memory), I don't see the value of specifying a prototype in this method. can anyone suggest a situation where specifying the prototype would be useful here?

cc @TomAugspurger

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree with you, that a prototype probably isn't needed. But I'd suggest including it for consistency with the other methods.

) -> bytes:
"""
Retrieve raw bytes from the store asynchronously.

This is a convenience method that wraps ``get()`` and converts the result
to bytes. Use this when you need the raw byte content of a stored value.

Parameters
----------
key : str
The key identifying the data to retrieve.
prototype : BufferPrototype
The buffer prototype to use for reading the data.
byte_range : ByteRequest, optional
If specified, only retrieve a portion of the stored data.
Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.

Returns
-------
bytes
The raw bytes stored at the given key.

Raises
------
FileNotFoundError
If the key does not exist in the store.

See Also
--------
get : Lower-level method that returns a Buffer object.
get_bytes : Synchronous version of this method.
get_json : Asynchronous method for retrieving and parsing JSON data.

Examples
--------
>>> store = await MemoryStore.open()
>>> await store.set("data", Buffer.from_bytes(b"hello world"))
>>> data = await store.get_bytes("data", prototype=default_buffer_prototype())
>>> print(data)
b'hello world'
"""
buffer = await self.get(key, prototype, byte_range)
if buffer is None:
raise FileNotFoundError(key)
return buffer.to_bytes()

def get_bytes_sync(
self, key: str = "", *, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> bytes:
"""
Retrieve raw bytes from the store synchronously.

This is a synchronous wrapper around ``get_bytes()``. It should only
be called from non-async code. For async contexts, use ``get_bytes()``
instead.

Parameters
----------
key : str, optional
The key identifying the data to retrieve. Defaults to an empty string.
prototype : BufferPrototype
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll want to align this with what we do in the async version.

The buffer prototype to use for reading the data.
byte_range : ByteRequest, optional
If specified, only retrieve a portion of the stored data.
Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.

Returns
-------
bytes
The raw bytes stored at the given key.

Raises
------
FileNotFoundError
If the key does not exist in the store.

Warnings
--------
Do not call this method from async functions. Use ``get_bytes()`` instead
to avoid blocking the event loop.

See Also
--------
get_bytes : Asynchronous version of this method.
get_json_sync : Synchronous method for retrieving and parsing JSON data.

Examples
--------
>>> store = MemoryStore()
>>> await store.set("data", Buffer.from_bytes(b"hello world"))
>>> data = store.get_bytes_sync("data", prototype=default_buffer_prototype())
>>> print(data)
b'hello world'
"""

return sync(self.get_bytes(key, prototype=prototype, byte_range=byte_range))

async def get_json(
self, key: str, *, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> Any:
"""
Retrieve and parse JSON data from the store asynchronously.

This is a convenience method that retrieves bytes from the store and
parses them as JSON.

Parameters
----------
key : str
The key identifying the JSON data to retrieve.
prototype : BufferPrototype
The buffer prototype to use for reading the data.
byte_range : ByteRequest, optional
If specified, only retrieve a portion of the stored data.
Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.
Note: Using byte ranges with JSON may result in invalid JSON.

Returns
-------
Any
The parsed JSON data. This follows the behavior of ``json.loads()`` and
can be any JSON-serializable type: dict, list, str, int, float, bool, or None.

Raises
------
FileNotFoundError
If the key does not exist in the store.
json.JSONDecodeError
If the stored data is not valid JSON.

See Also
--------
get_bytes : Method for retrieving raw bytes.
get_json_sync : Synchronous version of this method.

Examples
--------
>>> store = await MemoryStore.open()
>>> metadata = {"zarr_format": 3, "node_type": "array"}
>>> await store.set("zarr.json", Buffer.from_bytes(json.dumps(metadata).encode()))
>>> data = await store.get_json("zarr.json", prototype=default_buffer_prototype())
>>> print(data)
{'zarr_format': 3, 'node_type': 'array'}
"""

return json.loads(await self.get_bytes(key, prototype=prototype, byte_range=byte_range))

def get_json_sync(
self, key: str = "", *, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> Any:
"""
Retrieve and parse JSON data from the store synchronously.

This is a synchronous wrapper around ``get_json()``. It should only
be called from non-async code. For async contexts, use ``get_json()``
instead.

Parameters
----------
key : str, optional
The key identifying the JSON data to retrieve. Defaults to an empty string.
prototype : BufferPrototype
The buffer prototype to use for reading the data.
byte_range : ByteRequest, optional
If specified, only retrieve a portion of the stored data.
Can be a ``RangeByteRequest``, ``OffsetByteRequest``, or ``SuffixByteRequest``.
Note: Using byte ranges with JSON may result in invalid JSON.

Returns
-------
Any
The parsed JSON data. This follows the behavior of ``json.loads()`` and
can be any JSON-serializable type: dict, list, str, int, float, bool, or None.

Raises
------
FileNotFoundError
If the key does not exist in the store.
json.JSONDecodeError
If the stored data is not valid JSON.

Warnings
--------
Do not call this method from async functions. Use ``get_json()`` instead
to avoid blocking the event loop.

See Also
--------
get_json : Asynchronous version of this method.
get_bytes_sync : Synchronous method for retrieving raw bytes without parsing.

Examples
--------
>>> store = MemoryStore()
>>> metadata = {"zarr_format": 3, "node_type": "array"}
>>> store.set("zarr.json", Buffer.from_bytes(json.dumps(metadata).encode()))
>>> data = store.get_json_sync("zarr.json", prototype=default_buffer_prototype())
>>> print(data)
{'zarr_format': 3, 'node_type': 'array'}
"""

return sync(self.get_json(key, prototype=prototype, byte_range=byte_range))

@abstractmethod
async def get_partial_values(
self,
Expand Down Expand Up @@ -278,7 +486,7 @@ async def _set_many(self, values: Iterable[tuple[str, Buffer]]) -> None:
"""
Insert multiple (key, value) pairs into storage.
"""
await gather(*starmap(self.set, values))
await asyncio.gather(*starmap(self.set, values))

@property
def supports_consolidated_metadata(self) -> bool:
Expand Down
Loading