Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ endif()

FetchContent_Declare(miniexpr
GIT_REPOSITORY https://github.com/Blosc/miniexpr.git
GIT_TAG b70e9737bdd47c3447f184648df731dd1321f01d
GIT_TAG 11195919b311300f974c482c77a8416c0134e220
# SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../miniexpr
)
FetchContent_MakeAvailable(miniexpr)
Expand Down
82 changes: 82 additions & 0 deletions examples/ndarray/dsl_save.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
#######################################################################

# Demonstrate saving and reloading DSL kernels.
#
# We run a simplified, heat-diffusion-flavoured update on a 2-D grid:
# every point of `u` is relaxed element-wise towards a source term `v`
# (u + alpha * (v - u)); no neighbour access is involved. The DSL kernel
# is saved to disk and reloaded via blosc2.open() – the reloaded function
# is again a DSLKernel, so the miniexpr fast path stays available.

import numpy as np

import blosc2
from blosc2.dsl_kernel import DSLKernel

shape = (128, 128)

# ── operands: Blosc2-native arrays, persisted on disk ────────────────
# A saved LazyUDF records its operands by urlpath, so both must live in files.
u = blosc2.linspace(
    0.0,
    1.0,
    shape=shape,
    dtype=np.float64,
    urlpath="u.b2nd",
    mode="w",
)
# The source term is a lazy sin() over [0, 2*pi], materialised to disk.
angles = blosc2.linspace(0.0, 2 * np.pi, shape=shape, dtype=np.float64)
vexpr = blosc2.sin(angles)
v = vexpr.compute(urlpath="v.b2nd", mode="w")


# ── DSL kernel: one element-wise relaxation step ────────────────────
# `u` holds the current temperature field; `v` is a source/sink term.
# Despite the heat-diffusion framing this is not a neighbour stencil:
# index-based neighbour access is intentionally avoided here so the
# expression stays in the simple DSL subset that miniexpr can JIT.
@blosc2.dsl_kernel
def heat_step(u, v):
# Move every element of u a fraction alpha of the way towards v:
# u + alpha * (v - u) is the same as (1 - alpha) * u + alpha * v.
# No neighbouring elements are read.
alpha = 0.1
return u + alpha * (v - u)


# ── build and save the lazy computation ────────────────────────────
# Every file this demo creates. They are removed in the `finally` clause
# below so that a failed check does not leave stray .b2nd files behind.
demo_paths = ["u.b2nd", "v.b2nd", "u2.b2nd", "heat_step.b2nd", "heat_step2.b2nd"]

try:
    lazy = blosc2.lazyudf(heat_step, (u, v), dtype=np.float64)
    lazy.save(urlpath="heat_step.b2nd")
    print("LazyUDF saved to heat_step.b2nd")

    # ── reload in a 'fresh' context (no reference to heat_step) ─────────
    reloaded = blosc2.open("heat_step.b2nd")
    assert isinstance(reloaded, blosc2.LazyUDF), "Expected a LazyUDF after open()"
    assert isinstance(reloaded.func, DSLKernel), "func must be a DSLKernel after reload"
    assert reloaded.func.dsl_source is not None, "dsl_source must survive the round-trip"
    print(f"Reloaded DSL source:\n{reloaded.func.dsl_source}\n")

    # ── evaluate and verify ──────────────────────────────────────────────
    result = reloaded.compute()
    # Decompress each array once and reuse the NumPy copies below.
    u_np = u[()]
    v_np = v[()]
    result_np = result[()]
    expected = u_np + 0.1 * (v_np - u_np)
    assert np.allclose(result_np, expected), "Numerical mismatch after reload!"
    print("Max absolute error vs NumPy reference:", np.max(np.abs(result_np - expected)))

    # ── chain two steps: save the first result and run a second step ─────
    u2 = result.copy(urlpath="u2.b2nd", mode="w")

    lazy2 = blosc2.lazyudf(heat_step, (u2, v), dtype=np.float64)
    lazy2.save(urlpath="heat_step2.b2nd")

    reloaded2 = blosc2.open("heat_step2.b2nd")
    result2 = reloaded2.compute()
    u2_np = u2[()]
    expected2 = u2_np + 0.1 * (v_np - u2_np)
    assert np.allclose(result2[()], expected2)
    print("Two-step heat diffusion matches NumPy reference. ✓")

    # ── getitem also works on the reloaded kernel (full-array access) ────
    full_result = reloaded[()]
    assert np.allclose(full_result, expected)
    print("Full-array getitem on reloaded LazyUDF works correctly. ✓")
finally:
    # ── tidy up ─────────────────────────────────────────────────────────
    # Runs on success and on failure alike; files that were never created
    # (because an earlier step failed) are expected to be skipped silently.
    for path in demo_paths:
        blosc2.remove_urlpath(path)

print("\nDSL kernel save/reload demo completed successfully!")
126 changes: 70 additions & 56 deletions src/blosc2/lazyexpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,8 @@ def save(self, **kwargs: Any) -> None:
* If an operand is a :ref:`Proxy`, keep in mind that Python-Blosc2 will only be able to reopen it as such
if its source is a :ref:`SChunk`, :ref:`NDArray` or a :ref:`C2Array` (see :func:`blosc2.open` notes
section for more info).
* This is currently only supported for :ref:`LazyExpr`.
* This is currently only supported for :ref:`LazyExpr` and :ref:`LazyUDF`
(including kernels decorated with :func:`blosc2.dsl_kernel`).

Examples
--------
Expand Down Expand Up @@ -1467,22 +1468,6 @@ def fast_eval( # noqa: C901
use_miniexpr = False
if is_dsl and dsl_disable_reason is None:
dsl_disable_reason = "complex comparisons are not supported by miniexpr."
if sys.platform == "win32" and use_miniexpr:
# Work around Windows miniexpr issues for integer outputs and dtype conversions.
if blosc2.isdtype(dtype, "integral"):
use_miniexpr = False
if is_dsl and dsl_disable_reason is None:
dsl_disable_reason = "Windows policy disables miniexpr for integral output dtypes."
else:
dtype_mismatch = any(
isinstance(op, blosc2.NDArray) and op.dtype != dtype for op in operands_miniexpr.values()
)
if dtype_mismatch:
use_miniexpr = False
if is_dsl and dsl_disable_reason is None:
dsl_disable_reason = (
"Windows policy disables miniexpr when operand and output dtypes differ."
)

if is_dsl and not use_miniexpr:
_raise_dsl_miniexpr_required(dsl_disable_reason)
Expand Down Expand Up @@ -2190,15 +2175,6 @@ def reduce_slices( # noqa: C901
if has_complex and (sys.platform == "win32" or blosc2.IS_WASM):
# On Windows and WebAssembly, miniexpr has issues with complex numbers
use_miniexpr = False
if sys.platform == "win32" and use_miniexpr:
if blosc2.isdtype(dtype, "integral"):
use_miniexpr = False
else:
dtype_mismatch = any(
isinstance(op, blosc2.NDArray) and op.dtype != dtype for op in operands.values()
)
if dtype_mismatch:
use_miniexpr = False
if has_complex and any(tok in expression for tok in ("!=", "==", "<=", ">=", "<", ">")):
use_miniexpr = False
if where is not None and len(where) != 2:
Expand Down Expand Up @@ -4028,6 +4004,30 @@ def __getitem__(self, item):
return None

def save(self, urlpath=None, **kwargs):
"""
Save the :ref:`LazyUDF` on disk.

Parameters
----------
urlpath: str
The path to the file where the LazyUDF will be stored.
kwargs: Any, optional
Keyword arguments that are supported by the :func:`empty` constructor.

Returns
-------
out: None

Notes
-----
* All operands must be :ref:`NDArray` or :ref:`C2Array` objects stored on
disk or a remote server (i.e. they must have a ``urlpath``).
* When the :ref:`LazyUDF` wraps a :func:`blosc2.dsl_kernel`-decorated
function, the DSL source is preserved verbatim in the saved metadata.
On reload via :func:`blosc2.open`, the function is restored as a full
:class:`~blosc2.dsl_kernel.DSLKernel` so the miniexpr JIT fast path
remains available without any extra work from the caller.
"""
if urlpath is None:
raise ValueError("To save a LazyArray you must provide an urlpath")

Expand All @@ -4043,9 +4043,10 @@ def save(self, urlpath=None, **kwargs):
# Save the expression and operands in the metadata
operands = {}
operands_ = self.inputs_dict
for key, value in operands_.items():
for i, (_key, value) in enumerate(operands_.items()):
pos_key = f"o{i}" # always use positional keys for consistent loading
if isinstance(value, blosc2.C2Array):
operands[key] = {
operands[pos_key] = {
"path": str(value.path),
"urlbase": value.urlbase,
}
Expand All @@ -4059,18 +4060,21 @@ def save(self, urlpath=None, **kwargs):
)
if value.schunk.urlpath is None:
raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
operands[key] = value.schunk.urlpath
operands[pos_key] = value.schunk.urlpath
udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func
udf_name = getattr(udf_func, "__name__", self.func.__name__)
try:
udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip()
except Exception:
udf_source = None
array.schunk.vlmeta["_LazyArray"] = {
meta = {
"UDF": udf_source,
"operands": operands,
"name": udf_name,
}
if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None:
meta["dsl_source"] = self.func.dsl_source
array.schunk.vlmeta["_LazyArray"] = meta


def _numpy_eval_expr(expression, operands, prefer_blosc=False):
Expand Down Expand Up @@ -4338,6 +4342,41 @@ def lazyexpr(
return LazyExpr._new_expr(expression, operands, guess=True, out=out, where=where, ne_args=ne_args)


def _reconstruct_lazyudf(expr, lazyarray, operands_dict, array):
    """
    Rebuild a :ref:`LazyUDF` (including DSL kernels) from saved metadata.

    Parameters
    ----------
    expr: str
        Python source of the user-defined function. It is compiled and
        executed to recreate the function object.
    lazyarray: dict
        The ``_LazyArray`` metadata. ``"name"`` selects the function defined
        by ``expr``; an optional ``"dsl_source"`` entry marks the function as
        a DSL kernel and carries its verbatim DSL source.
    operands_dict: dict
        Operands keyed positionally as ``"o0"``, ``"o1"``, ...; they are
        passed to the function in that order.
    array: object
        The opened container; its ``shape``, ``dtype`` and ``cparams`` are
        reused for the rebuilt lazy array.

    Returns
    -------
    out:
        The object returned by :func:`blosc2.lazyudf`.

    Raises
    ------
    KeyError
        If ``expr`` does not define a top-level object called
        ``lazyarray["name"]``, or a positional operand key is missing.

    Notes
    -----
    ``expr`` is executed with :func:`exec`. Removing ``__import__`` from the
    builtins only blocks plain ``import`` statements; it is *not* a sandbox.
    Only open files that come from a trusted source.
    """
    name = lazyarray["name"]
    # The pseudo-filename is the linecache key that inspect uses to find the
    # source again (e.g. when the reloaded UDF is re-saved). Include a digest
    # of the source so that two different UDFs sharing a function name do not
    # overwrite each other's entry and hand back the wrong source.
    filename = f"<{name}-{hash(expr) & 0xFFFFFFFF:08x}>"

    safe_builtins = {key: obj for key, obj in builtins.__dict__.items() if key != "__import__"}
    safe_globals = {
        "__builtins__": safe_builtins,
        "np": np,
        "blosc2": blosc2,
    }
    if blosc2._HAS_NUMBA:
        safe_globals["numba"] = numba

    # Register the source so inspect can find it
    linecache.cache[filename] = (len(expr), None, expr.splitlines(True), filename)

    # SECURITY: this runs code read from the file being opened (see Notes).
    local_ns = {}
    exec(compile(expr, filename, "exec"), safe_globals, local_ns)
    func = local_ns[name]

    # If the saved LazyUDF was a DSL kernel, re-wrap and restore the dsl_source
    if "dsl_source" in lazyarray:
        if not isinstance(func, DSLKernel):
            func = DSLKernel(func)
        if func.dsl_source is None:
            # Re-extraction from linecache failed; use the saved verbatim dsl_source
            func.dsl_source = lazyarray["dsl_source"]

    # Operands were saved under positional keys, so rebuild them in order.
    ordered_operands = tuple(operands_dict[f"o{n}"] for n in range(len(operands_dict)))
    # TODO: make more robust for general kwargs (not just cparams)
    return blosc2.lazyudf(
        func,
        ordered_operands,
        shape=array.shape,
        dtype=array.dtype,
        cparams=array.cparams,
    )


def _open_lazyarray(array):
value = array.schunk.meta["LazyArray"]
lazyarray = array.schunk.vlmeta["_LazyArray"]
Expand Down Expand Up @@ -4380,32 +4419,7 @@ def _open_lazyarray(array):
if value == LazyArrayEnum.Expr.value:
new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None)
elif value == LazyArrayEnum.UDF.value:
local_ns = {}
name = lazyarray["name"]
filename = f"<{name}>" # any unique name
SAFE_GLOBALS = {
"__builtins__": {
name: value for name, value in builtins.__dict__.items() if name != "__import__"
},
"np": np,
"blosc2": blosc2,
}
if blosc2._HAS_NUMBA:
SAFE_GLOBALS["numba"] = numba

# Register the source so inspect can find it
linecache.cache[filename] = (len(expr), None, expr.splitlines(True), filename)

exec(compile(expr, filename, "exec"), SAFE_GLOBALS, local_ns)
func = local_ns[name]
# TODO: make more robust for general kwargs (not just cparams)
new_expr = blosc2.lazyudf(
func,
tuple(operands_dict[f"o{n}"] for n in range(len(operands_dict))),
shape=array.shape,
dtype=array.dtype,
cparams=array.cparams,
)
new_expr = _reconstruct_lazyudf(expr, lazyarray, operands_dict, array)

# Make the array info available for the user (only available when opened from disk)
new_expr.array = array
Expand Down
Loading
Loading