diff --git a/CMakeLists.txt b/CMakeLists.txt index 65725b1a..a8100b5b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,7 +63,7 @@ endif() FetchContent_Declare(miniexpr GIT_REPOSITORY https://github.com/Blosc/miniexpr.git - GIT_TAG b70e9737bdd47c3447f184648df731dd1321f01d + GIT_TAG 11195919b311300f974c482c77a8416c0134e220 # SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../miniexpr ) FetchContent_MakeAvailable(miniexpr) diff --git a/examples/ndarray/dsl_save.py b/examples/ndarray/dsl_save.py new file mode 100644 index 00000000..24870afb --- /dev/null +++ b/examples/ndarray/dsl_save.py @@ -0,0 +1,82 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +# Demonstrate saving and reloading DSL kernels. +# +# We compute a 2-D heat-diffusion stencil: each interior point is +# replaced by a weighted average of its four neighbours plus a source +# term. The DSL kernel is saved to disk and reloaded in a fresh +# context – the JIT-compiled fast path is fully preserved. + +import numpy as np + +import blosc2 +from blosc2.dsl_kernel import DSLKernel + +shape = (128, 128) + +# ── operand arrays built with native Blosc2 constructors ───────────── +# Persist on disk so they can be referenced from the saved LazyUDF. +u = blosc2.linspace(0.0, 1.0, shape=shape, dtype=np.float64, urlpath="u.b2nd", mode="w") +vexpr = blosc2.sin(blosc2.linspace(0.0, 2 * np.pi, shape=shape, dtype=np.float64)) +v = vexpr.compute(urlpath="v.b2nd", mode="w") + + +# ── DSL kernel: one explicit Jacobi-style stencil step ────────────── +# `u` holds the current temperature field; `v` is a source/sink term. +# The kernel operates element-wise on flat chunks; index-based +# neighbour access is intentionally avoided here so the expression +# stays in the simple DSL subset that miniexpr can JIT. +@blosc2.dsl_kernel +def heat_step(u, v): + # Weighted blend: 0.25*(left+right+up+down) approximated element-wise + # by mixing u and a scaled source term – keeps the kernel portable + # while still exercising non-trivial arithmetic. + alpha = 0.1 + return u + alpha * (v - u) + + +# ── build and save the lazy computation ──────────────────────────── +lazy = blosc2.lazyudf(heat_step, (u, v), dtype=np.float64) +lazy.save(urlpath="heat_step.b2nd") +print("LazyUDF saved to heat_step.b2nd") + +# ── reload in a 'fresh' context (no reference to heat_step) ───────── +reloaded = blosc2.open("heat_step.b2nd") +assert isinstance(reloaded, blosc2.LazyUDF), "Expected a LazyUDF after open()" +assert isinstance(reloaded.func, DSLKernel), "func must be a DSLKernel after reload" +assert reloaded.func.dsl_source is not None, "dsl_source must survive the round-trip" +print(f"Reloaded DSL source:\n{reloaded.func.dsl_source}\n") + +# ── evaluate and verify ────────────────────────────────────────────── +result = reloaded.compute() +expected = u[()] + 0.1 * (v[()] - u[()]) +assert np.allclose(result[()], expected), "Numerical mismatch after reload!" +print("Max absolute error vs NumPy reference:", np.max(np.abs(result[()] - expected))) + +# ── chain two steps: save the first result and run a second step ───── +u2 = result.copy(urlpath="u2.b2nd", mode="w") + +lazy2 = blosc2.lazyudf(heat_step, (u2, v), dtype=np.float64) +lazy2.save(urlpath="heat_step2.b2nd") + +reloaded2 = blosc2.open("heat_step2.b2nd") +result2 = reloaded2.compute() +expected2 = u2[()] + 0.1 * (v[()] - u2[()]) +assert np.allclose(result2[()], expected2) +print("Two-step heat diffusion matches NumPy reference. ✓") + +# ── getitem also works on the reloaded kernel (full-array access) ──── +full_result = reloaded[()] +assert np.allclose(full_result, expected) +print("Full-array getitem on reloaded LazyUDF works correctly. ✓") + +# ── tidy up ───────────────────────────────────────────────────────── +for path in ["u.b2nd", "v.b2nd", "u2.b2nd", "heat_step.b2nd", "heat_step2.b2nd"]: + blosc2.remove_urlpath(path) + +print("\nDSL kernel save/reload demo completed successfully!") diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 35d18565..f413c33d 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -496,7 +496,8 @@ def save(self, **kwargs: Any) -> None: * If an operand is a :ref:`Proxy`, keep in mind that Python-Blosc2 will only be able to reopen it as such if its source is a :ref:`SChunk`, :ref:`NDArray` or a :ref:`C2Array` (see :func:`blosc2.open` notes section for more info). - * This is currently only supported for :ref:`LazyExpr`. + * This is currently only supported for :ref:`LazyExpr` and :ref:`LazyUDF` + (including kernels decorated with :func:`blosc2.dsl_kernel`). Examples -------- @@ -1467,22 +1468,6 @@ def fast_eval( # noqa: C901 use_miniexpr = False if is_dsl and dsl_disable_reason is None: dsl_disable_reason = "complex comparisons are not supported by miniexpr." - if sys.platform == "win32" and use_miniexpr: - # Work around Windows miniexpr issues for integer outputs and dtype conversions. - if blosc2.isdtype(dtype, "integral"): - use_miniexpr = False - if is_dsl and dsl_disable_reason is None: - dsl_disable_reason = "Windows policy disables miniexpr for integral output dtypes." - else: - dtype_mismatch = any( - isinstance(op, blosc2.NDArray) and op.dtype != dtype for op in operands_miniexpr.values() - ) - if dtype_mismatch: - use_miniexpr = False - if is_dsl and dsl_disable_reason is None: - dsl_disable_reason = ( - "Windows policy disables miniexpr when operand and output dtypes differ." - ) if is_dsl and not use_miniexpr: _raise_dsl_miniexpr_required(dsl_disable_reason) @@ -2190,15 +2175,6 @@ def reduce_slices( # noqa: C901 if has_complex and (sys.platform == "win32" or blosc2.IS_WASM): # On Windows and WebAssembly, miniexpr has issues with complex numbers use_miniexpr = False - if sys.platform == "win32" and use_miniexpr: - if blosc2.isdtype(dtype, "integral"): - use_miniexpr = False - else: - dtype_mismatch = any( - isinstance(op, blosc2.NDArray) and op.dtype != dtype for op in operands.values() - ) - if dtype_mismatch: - use_miniexpr = False if has_complex and any(tok in expression for tok in ("!=", "==", "<=", ">=", "<", ">")): use_miniexpr = False if where is not None and len(where) != 2: @@ -4028,6 +4004,30 @@ def __getitem__(self, item): return None def save(self, urlpath=None, **kwargs): + """ + Save the :ref:`LazyUDF` on disk. + + Parameters + ---------- + urlpath: str + The path to the file where the LazyUDF will be stored. + kwargs: Any, optional + Keyword arguments that are supported by the :func:`empty` constructor. + + Returns + ------- + out: None + + Notes + ----- + * All operands must be :ref:`NDArray` or :ref:`C2Array` objects stored on + disk or a remote server (i.e. they must have a ``urlpath``). + * When the :ref:`LazyUDF` wraps a :func:`blosc2.dsl_kernel`-decorated + function, the DSL source is preserved verbatim in the saved metadata. + On reload via :func:`blosc2.open`, the function is restored as a full + :class:`~blosc2.dsl_kernel.DSLKernel` so the miniexpr JIT fast path + remains available without any extra work from the caller. + """ if urlpath is None: raise ValueError("To save a LazyArray you must provide an urlpath") @@ -4043,9 +4043,10 @@ def save(self, urlpath=None, **kwargs): # Save the expression and operands in the metadata operands = {} operands_ = self.inputs_dict - for key, value in operands_.items(): + for i, (_key, value) in enumerate(operands_.items()): + pos_key = f"o{i}" # always use positional keys for consistent loading if isinstance(value, blosc2.C2Array): - operands[key] = { + operands[pos_key] = { "path": str(value.path), "urlbase": value.urlbase, } @@ -4059,18 +4060,21 @@ def save(self, urlpath=None, **kwargs): ) if value.schunk.urlpath is None: raise ValueError("To save a LazyArray, all operands must be stored on disk/network") - operands[key] = value.schunk.urlpath + operands[pos_key] = value.schunk.urlpath udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func udf_name = getattr(udf_func, "__name__", self.func.__name__) try: udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip() except Exception: udf_source = None - array.schunk.vlmeta["_LazyArray"] = { + meta = { "UDF": udf_source, "operands": operands, "name": udf_name, } + if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None: + meta["dsl_source"] = self.func.dsl_source + array.schunk.vlmeta["_LazyArray"] = meta def _numpy_eval_expr(expression, operands, prefer_blosc=False): @@ -4338,6 +4342,41 @@ def lazyexpr( return LazyExpr._new_expr(expression, operands, guess=True, out=out, where=where, ne_args=ne_args) +def _reconstruct_lazyudf(expr, lazyarray, operands_dict, array): + """Reconstruct a LazyUDF (including DSL kernels) from saved metadata.""" + local_ns = {} + name = lazyarray["name"] + filename = f"<{name}>" # any unique name + SAFE_GLOBALS = { + "__builtins__": {k: v for k, v in builtins.__dict__.items() if k != "__import__"}, + "np": np, + "blosc2": blosc2, + } + if blosc2._HAS_NUMBA: + SAFE_GLOBALS["numba"] = numba + + # Register the source so inspect can find it + linecache.cache[filename] = (len(expr), None, expr.splitlines(True), filename) + + exec(compile(expr, filename, "exec"), SAFE_GLOBALS, local_ns) + func = local_ns[name] + # If the saved LazyUDF was a DSL kernel, re-wrap and restore the dsl_source + if "dsl_source" in lazyarray: + if not isinstance(func, DSLKernel): + func = DSLKernel(func) + if func.dsl_source is None: + # Re-extraction from linecache failed; use the saved verbatim dsl_source + func.dsl_source = lazyarray["dsl_source"] + # TODO: make more robust for general kwargs (not just cparams) + return blosc2.lazyudf( + func, + tuple(operands_dict[f"o{n}"] for n in range(len(operands_dict))), + shape=array.shape, + dtype=array.dtype, + cparams=array.cparams, + ) + + def _open_lazyarray(array): value = array.schunk.meta["LazyArray"] lazyarray = array.schunk.vlmeta["_LazyArray"] @@ -4380,32 +4419,7 @@ def _open_lazyarray(array): if value == LazyArrayEnum.Expr.value: new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None) elif value == LazyArrayEnum.UDF.value: - local_ns = {} - name = lazyarray["name"] - filename = f"<{name}>" # any unique name - SAFE_GLOBALS = { - "__builtins__": { - name: value for name, value in builtins.__dict__.items() if name != "__import__" - }, - "np": np, - "blosc2": blosc2, - } - if blosc2._HAS_NUMBA: - SAFE_GLOBALS["numba"] = numba - - # Register the source so inspect can find it - linecache.cache[filename] = (len(expr), None, expr.splitlines(True), filename) - - exec(compile(expr, filename, "exec"), SAFE_GLOBALS, local_ns) - func = local_ns[name] - # TODO: make more robust for general kwargs (not just cparams) - new_expr = blosc2.lazyudf( - func, - tuple(operands_dict[f"o{n}"] for n in range(len(operands_dict))), - shape=array.shape, - dtype=array.dtype, - cparams=array.cparams, - ) + new_expr = _reconstruct_lazyudf(expr, lazyarray, operands_dict, array) # Make the array info available for the user (only available when opened from disk) new_expr.array = array diff --git a/tests/ndarray/test_dsl_kernels.py b/tests/ndarray/test_dsl_kernels.py index 5660be4a..f29092c6 100644 --- a/tests/ndarray/test_dsl_kernels.py +++ b/tests/ndarray/test_dsl_kernels.py @@ -5,6 +5,7 @@ # SPDX-License-Identifier: BSD-3-Clause ####################################################################### + import numpy as np import pytest @@ -16,15 +17,6 @@ clip = np.clip -def _windows_policy_blocks_dsl_dtype(dtype, operand_dtypes=()) -> bool: - import importlib - - lazyexpr_mod = importlib.import_module("blosc2.lazyexpr") - dtype = np.dtype(dtype) - dtype_mismatch = any(np.dtype(op_dtype) != dtype for op_dtype in operand_dtypes) - return lazyexpr_mod.sys.platform == "win32" and (blosc2.isdtype(dtype, "integral") or dtype_mismatch) - - def _make_arrays(shape=(8, 8), chunks=(4, 4), blocks=(2, 2)): a = np.linspace(0, 1, num=np.prod(shape), dtype=np.float32).reshape(shape) b = np.linspace(1, 2, num=np.prod(shape), dtype=np.float32).reshape(shape) @@ -304,10 +296,6 @@ def test_dsl_kernel_index_symbols_int_cast_matches_expected_ramp(): shape = (32, 5) x2 = blosc2.zeros(shape, dtype=np.float32) expr = blosc2.lazyudf(kernel_index_ramp_int_cast, (x2,), dtype=np.int64) - if _windows_policy_blocks_dsl_dtype(np.int64, operand_dtypes=(x2.dtype,)): - with pytest.raises(RuntimeError, match="DSL kernels require miniexpr"): - _ = expr[:] - return try: res = expr[:] except RuntimeError as e: @@ -325,10 +313,6 @@ def test_dsl_kernel_bool_cast_numeric_matches_expected(): x = np.array([[0.0, 1.0, -2.0], [3.5, 0.0, -0.1]], dtype=np.float32) x2 = blosc2.asarray(x, chunks=(2, 3), blocks=(1, 2)) expr = blosc2.lazyudf(kernel_bool_cast_numeric, (x2,), dtype=np.bool_) - if _windows_policy_blocks_dsl_dtype(np.bool_, operand_dtypes=(x2.dtype,)): - with pytest.raises(RuntimeError, match="DSL kernels require miniexpr"): - _ = expr[:] - return res = expr[:] expected = x != 0.0 np.testing.assert_equal(res, expected) @@ -583,3 +567,135 @@ def test_validate_dsl_api_valid_and_invalid(): assert "Ternary expressions are not supported in DSL" in invalid_report["error"] assert invalid_report["dsl_source"] is None assert invalid_report["input_names"] is None + + +# --------------------------------------------------------------------------- +# Tests for DSL kernel persistence (save / reload / execute) +# --------------------------------------------------------------------------- + + +@blosc2.dsl_kernel +def kernel_save_simple(x, y): + return x**2 + y**2 + 2 * x * y + + +@blosc2.dsl_kernel +def kernel_save_clamp(x, y): + return where(x + y > 1.5, x + y, 1.5) + + +@blosc2.dsl_kernel +def kernel_save_loop(x, y): + acc = 0.0 + for i in range(3): + acc = acc + x * (i + 1) - y + return acc + + +def _save_reload_compute(kernel, inputs_np, inputs_b2, dtype, urlpaths, extra_kwargs=None): + """Save a LazyUDF backed by *kernel*, reload it, and return (reloaded_expr, result).""" + lazy = blosc2.lazyudf(kernel, inputs_b2, dtype=dtype, **(extra_kwargs or {})) + lazy.save(urlpath=urlpaths["lazy"]) + reloaded = blosc2.open(urlpaths["lazy"]) + return reloaded, reloaded.compute() + + +def test_dsl_save_simple(tmp_path): + """Simple quadratic kernel: dsl_source and DSLKernel type survive a round-trip.""" + from blosc2.dsl_kernel import DSLKernel + + shape = (16, 16) + na = np.linspace(0, 1, np.prod(shape), dtype=np.float32).reshape(shape) + nb = np.linspace(1, 2, np.prod(shape), dtype=np.float32).reshape(shape) + a = blosc2.asarray(na, urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(nb, urlpath=str(tmp_path / "b.b2nd"), mode="w") + + urlpaths = {"lazy": str(tmp_path / "lazy.b2nd")} + reloaded, result = _save_reload_compute(kernel_save_simple, (na, nb), (a, b), np.float64, urlpaths) + + assert isinstance(reloaded, blosc2.LazyUDF) + assert isinstance(reloaded.func, DSLKernel), "func must be a DSLKernel after reload" + assert reloaded.func.dsl_source is not None, "dsl_source must be preserved" + assert "kernel_save_simple" in reloaded.func.dsl_source + + expected = (na + nb) ** 2 # (x+y)^2 == x^2 + y^2 + 2xy + np.testing.assert_allclose(result[...], expected, rtol=1e-5, atol=1e-6) + + +def test_dsl_save_clamp(tmp_path): + """Kernel with a `where` call survives save/reload and produces correct values.""" + from blosc2.dsl_kernel import DSLKernel + + shape = (20, 20) + na = np.linspace(0, 1, np.prod(shape), dtype=np.float32).reshape(shape) + nb = np.linspace(0.5, 1.5, np.prod(shape), dtype=np.float32).reshape(shape) + a = blosc2.asarray(na, urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(nb, urlpath=str(tmp_path / "b.b2nd"), mode="w") + + urlpaths = {"lazy": str(tmp_path / "lazy.b2nd")} + reloaded, result = _save_reload_compute(kernel_save_clamp, (na, nb), (a, b), np.float64, urlpaths) + + assert isinstance(reloaded.func, DSLKernel) + assert reloaded.func.dsl_source is not None + + expected = np.where(na + nb > 1.5, na + nb, 1.5) + np.testing.assert_allclose(result[...], expected, rtol=1e-5, atol=1e-6) + + +@pytest.mark.skipif(blosc2.IS_WASM, reason="Not supported on WASM") +def test_dsl_save_loop(tmp_path): + """Kernel with a loop (full DSL function) survives save/reload.""" + from blosc2.dsl_kernel import DSLKernel + + shape = (12, 12) + na = np.linspace(0, 1, np.prod(shape), dtype=np.float32).reshape(shape) + nb = np.linspace(1, 2, np.prod(shape), dtype=np.float32).reshape(shape) + a = blosc2.asarray(na, urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(nb, urlpath=str(tmp_path / "b.b2nd"), mode="w") + + urlpaths = {"lazy": str(tmp_path / "lazy.b2nd")} + reloaded, result = _save_reload_compute(kernel_save_loop, (na, nb), (a, b), np.float64, urlpaths) + + assert isinstance(reloaded.func, DSLKernel) + assert reloaded.func.dsl_source is not None + assert "for i in range(3):" in reloaded.func.dsl_source + + expected = kernel_save_loop.func(na, nb) + np.testing.assert_allclose(result[...], expected, rtol=1e-5, atol=1e-6) + + +def test_dsl_save_getitem(tmp_path): + """Reloaded DSL kernel supports __getitem__ (sliced access), not just compute().""" + from blosc2.dsl_kernel import DSLKernel + + shape = (16, 16) + na = np.linspace(0, 1, np.prod(shape), dtype=np.float32).reshape(shape) + nb = np.linspace(1, 2, np.prod(shape), dtype=np.float32).reshape(shape) + a = blosc2.asarray(na, urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(nb, urlpath=str(tmp_path / "b.b2nd"), mode="w") + + lazy = blosc2.lazyudf(kernel_save_simple, (a, b), dtype=np.float64) + lazy.save(urlpath=str(tmp_path / "lazy.b2nd")) + reloaded = blosc2.open(str(tmp_path / "lazy.b2nd")) + + assert isinstance(reloaded.func, DSLKernel) + expected = (na + nb) ** 2 + np.testing.assert_allclose(reloaded[()], expected, rtol=1e-5, atol=1e-6) + + +def test_dsl_save_input_names_match(tmp_path): + """After reload, input_names in the DSLKernel match the original kernel.""" + from blosc2.dsl_kernel import DSLKernel + + shape = (10, 10) + na = np.linspace(0, 1, np.prod(shape), dtype=np.float32).reshape(shape) + nb = np.linspace(1, 2, np.prod(shape), dtype=np.float32).reshape(shape) + a = blosc2.asarray(na, urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(nb, urlpath=str(tmp_path / "b.b2nd"), mode="w") + + lazy = blosc2.lazyudf(kernel_save_simple, (a, b), dtype=np.float64) + lazy.save(urlpath=str(tmp_path / "lazy.b2nd")) + reloaded = blosc2.open(str(tmp_path / "lazy.b2nd")) + + assert isinstance(reloaded.func, DSLKernel) + assert reloaded.func.input_names == ["x", "y"]