diff --git a/README.md b/README.md index a56c9aa..ed9419c 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Currently this library is not on maven so you have to add it via git deps (note: ```clojure andersmurphy/sqlite4clj {:git/url "https://github.com/andersmurphy/sqlite4clj" - :git/sha "7cc1fc180c683a27aa3f91d434729eb07f9702f6"} + :git/sha "ecf9876626985d61bd27605ea6748aed25eaad87"} ``` Initialise a db: @@ -220,8 +220,9 @@ sqlite4clj automatically encodes any EDN object you pass it: ``` This effectively lets you use SQLite as an EDN document store. +When decoding BLOB values, sqlite4clj reads the exact SQLite BLOB size before parsing EDN. -Encoding is done with [fast-edn](https://github.com/tonsky/fast-edn) as text and then converted into bytes. From my testing this was faster than both [deed](https://github.com/igrishaev/deed) and [nippy](https://github.com/taoensso/nippy) despite being a text format. Being a text format it is stable and can be swapped out for faster EDN text serialises without breaking changes. Of course, this also means only EDN data is support and not arbitrary Java classes. +Encoding is done with [fast-edn](https://github.com/tonsky/fast-edn) as text and then converted into bytes. From my testing this was faster than both [deed](https://github.com/igrishaev/deed) and [nippy](https://github.com/taoensso/nippy) despite being a text format. Being a text format it is stable and can be swapped out for faster EDN text serialises without breaking changes. Of course, this also means only EDN data is supported and not arbitrary Java classes. ## Application functions @@ -240,7 +241,55 @@ Declaring and using an application function: ;; [["46536a4a-0b1e-4749-9c01-f44f73de3b91" {:type "foo", :a 3, :b 3}]] ``` -When dealing with columns that are encoded EDN blobs they will automatically decoded. +When dealing with columns that are encoded EDN blobs they will automatically be decoded. + +You can unregister scalar functions with: + +```clojure +(d/remove-function db "entity_type") +;; optional arity-specific removal: +(d/remove-function db "entity_type" 1) +``` + +## Application aggregates + +SQLite also supports [Application-Defined Aggregate Functions](https://www.sqlite.org/appfunc.html). sqlite4clj exposes these via `d/create-aggregate`. + +The aggregate step callback receives `[state & sql-args]`, and arity is inferred as `step-arity - 1` by default: + +```clojure +(d/create-aggregate db "sum_n" + (fn [state n] + (+ (or state 0) n)) + (fn [state] state) + {:deterministic? true}) + +(d/q (:reader db) ["SELECT sum_n(checks) FROM session"]) +;; => +;; [42] +``` + +You can also: + +- pass vars (for repl-driven redefinition), e.g. `#'sum-step` and `#'sum-final` +- set `:arity` explicitly (`-1` for variadic aggregates) +- set `:initial-state` to control empty-input results +- remove aggregates with `d/remove-aggregate` (optionally by arity) + +```clojure +(d/create-aggregate db "sum_empty_init" + (fn [state n] (+ (or state 0) n)) + (fn [state] state) + {:initial-state 10}) + +(d/q (:writer db) ["SELECT sum_empty_init(n) FROM (SELECT 1 AS n WHERE 0)"]) +;; => +;; [10] + +(d/remove-aggregate db "sum_n") +;; optional arity-specific removal: +(d/remove-aggregate db "sum_n" 1) +``` ## Indexing on encoded edn blobs diff --git a/src/sqlite4clj/core.clj b/src/sqlite4clj/core.clj index c7055b8..b70b3c0 100644 --- a/src/sqlite4clj/core.clj +++ b/src/sqlite4clj/core.clj @@ -4,6 +4,7 @@ (:require [sqlite4clj.impl.api :as api] [sqlite4clj.impl.functions :as funcs] + [sqlite4clj.impl.functions-aggregates :as aggs] [clojure.core.cache.wrapped :as cache]) (:import (java.util.concurrent BlockingQueue LinkedBlockingQueue))) @@ -240,7 +241,8 @@ :reader reader ;; Prevents application function callback pointers from getting ;; garbage collected. - :internal {:app-functions (atom {})}})) + :internal {:app-functions (atom {}) + :app-aggregates (atom {})}})) (defmacro with-conn "Use the same connection for a series of queries (not a transaction) without @@ -355,3 +357,42 @@ (funcs/remove-function db name)) ([db name arity] (funcs/remove-function db name arity))) + +(defn create-aggregate + "register a user-defined aggregate function with sqlite on all connections. + + parameters: + - db: database from init-db! + - name: string function name + - step-f-or-var: step callback function or var + - final-f-or-var: final callback function or var + - opts: a map of options that can include: + + the sqlite function flags (see https://www.sqlite.org/c3ref/c_deterministic.html) + - :deterministic? (boolean) + - :direct-only? (boolean) + - :innocuous? (boolean) + - :sub-type? (boolean) + - :result-sub-type? (boolean) + - :self-order1? (boolean) + + aggregate options: + - :arity (int): SQL arity. -1 means variadic. + - :initial-state: initial step state used for empty inputs when provided. + + by default SQL arity is inferred from step-f-or-var as: + sql arity = (step arity - 1) where the first argument is aggregate state. + step callbacks must have signature: + (fn [state & sql-args] new-state) + final callbacks must have signature: + (fn [state] result)" + [db name step-f-or-var final-f-or-var & {:as opts}] + (aggs/create-aggregate db name step-f-or-var final-f-or-var opts)) + +(defn remove-aggregate + "unregister a user-defined aggregate from sqlite on all connections. + if an arity is not provided, it will unregister all arities for the aggregate." + ([db name] + (aggs/remove-aggregate db name)) + ([db name arity] + (aggs/remove-aggregate db name arity))) diff --git a/src/sqlite4clj/impl/api.clj b/src/sqlite4clj/impl/api.clj index 51c1edf..1a3602e 100644 --- a/src/sqlite4clj/impl/api.clj +++ b/src/sqlite4clj/impl/api.clj @@ -234,6 +234,12 @@ ::mem/pointer] ;; xDestroy (destructor) ::mem/int) +(defcfn aggregate-context + sqlite3_aggregate_context + [::mem/pointer ;; sqlite3_context* + ::mem/int] ;; nBytes + ::mem/pointer) + (defcfn value-text sqlite3_value_text [::mem/pointer] ::mem/c-string) diff --git a/src/sqlite4clj/impl/functions_aggregates.clj b/src/sqlite4clj/impl/functions_aggregates.clj new file mode 100644 index 0000000..f98835d --- /dev/null +++ b/src/sqlite4clj/impl/functions_aggregates.clj @@ -0,0 +1,256 @@ +(ns sqlite4clj.impl.functions-aggregates + (:require + [coffi.ffi :as ffi] + [coffi.mem :as mem] + [sqlite4clj.impl.api :as api] + [sqlite4clj.impl.functions :as funcs]) + (:import + [java.util.concurrent ConcurrentHashMap] + [java.util.concurrent.atomic AtomicReference])) + +(defn app-aggregates [db] + (when-let [aggs (get-in db [:internal :app-aggregates])] + @aggs)) + +(defn get-aggregate + ([db name] + (get-in (app-aggregates db) [name])) + ([db name arity] + (get-in (app-aggregates db) [name arity]))) + +(defn unregister-aggregate-callback [db name arity flags] + (funcs/doto-connections db + (fn [conn] + (let [pdb (:pdb conn) + ;; "To delete an existing SQL function or aggregate, pass NULL pointers for all three function callbacks." + code (api/create-function-v2 pdb name arity flags mem/null + mem/null mem/null mem/null mem/null)] + (when-not (api/sqlite-ok? code) + (throw (api/sqlite-ex-info pdb code {:aggregate name}))))))) + +(defn- build-removal-update + [agg-data arity] + (let [arities-to-remove (if arity + (when (get agg-data arity) #{arity}) + (set (keys (dissoc agg-data :meta)))) + remaining-arities (if arity + (disj (set (keys (dissoc agg-data :meta))) arity) + #{})] + {:remove-arities arities-to-remove + :remove-all? (empty? remaining-arities)})) + +(defn- clear-aggregate-arities + [db name] + (when-let [agg-data (get-aggregate db name)] + (let [{:keys [remove-arities]} (build-removal-update agg-data nil)] + (doseq [a remove-arities + :let [{:keys [flags]} (get agg-data a)]] + (unregister-aggregate-callback db name a flags)) + (swap! (get-in db [:internal :app-aggregates]) + #(update % name (fn [agg-entry] + (reduce dissoc agg-entry remove-arities))))))) + +(defn- aggregate-error-message [^Throwable e] + (or (.getMessage e) + (str "Unexpected " (.getSimpleName (class e))))) + +(defn wrap-aggregate-step + [step-f initial-state state-by-context] + (fn [context argc argv] + (try + (let [ctx-ptr (api/aggregate-context context 8)] + (if (mem/null? ctx-ptr) + (api/result-error context "Failed to allocate aggregate context") + (let [ctx-key (Long/valueOf (mem/address-of ctx-ptr)) + args (funcs/deserialize-argv argv argc) + clj-args (mapv funcs/value->clj args) + new-state (AtomicReference. initial-state) + state-ref (or (.putIfAbsent state-by-context ctx-key new-state) + new-state) + next-state (apply step-f (.get state-ref) clj-args)] + (.set state-ref next-state)))) + (catch Throwable e + (when-let [ctx-ptr (let [ptr (api/aggregate-context context 0)] + (when-not (mem/null? ptr) ptr))] + (.remove state-by-context (Long/valueOf (mem/address-of ctx-ptr)))) + (api/result-error context (aggregate-error-message e)))))) + +(defn wrap-aggregate-final + [final-f initial-state initial-state? state-by-context] + (fn [context] + (try + (let [ctx-ptr (api/aggregate-context context 0) + state-ref (when-not (mem/null? ctx-ptr) + (.remove state-by-context + (Long/valueOf (mem/address-of ctx-ptr)))) + has-state? (or (some? state-ref) initial-state?)] + (if has-state? + (let [state (if state-ref (.get ^AtomicReference state-ref) initial-state) + result (final-f state) + result-fn (funcs/result->result-fn result)] + (result-fn context result)) + (api/result-null context))) + (catch Throwable e + (api/result-error context (aggregate-error-message e)))))) + +(defn aggregate-arities + [step-f {:keys [arity] :as opts}] + (if (contains? opts :arity) + (cond + (= arity -1) [:variadic] + (and (int? arity) (>= arity 0)) [arity] + :else + (throw (ex-info "Aggregate :arity must be an integer >= 0 or -1 for variadic" + {:arity arity}))) + (let [step-arities (funcs/infer-arity step-f)] + (when-not step-arities + (throw (ex-info "Could not infer aggregate arity from step function" + {:step-fn step-f}))) + (mapv (fn [n] + (if (= n :variadic) + :variadic + (let [sql-arity (dec n)] + (when (neg? sql-arity) + (throw (ex-info "Aggregate step function must accept at least a state argument" + {:step-arity n}))) + sql-arity))) + step-arities)))) + +(defn- do-register-aggregate + [db name step-f final-f arities flags-bitmask + {:keys [initial-state] :as opts} + {:keys [step-source final-source watch-keys]}] + (let [registrations (vec + (for [n arities] + (let [arity (if (= n :variadic) -1 n) + state-by-context (ConcurrentHashMap.) + step-callback (wrap-aggregate-step step-f initial-state state-by-context) + step-callback-ptr (mem/serialize step-callback + [::ffi/fn + [::mem/pointer ::mem/int ::mem/pointer] + ::mem/void + :raw-fn? true] + (mem/global-arena)) + final-callback (wrap-aggregate-final final-f initial-state + (contains? opts :initial-state) + state-by-context) + final-callback-ptr (mem/serialize final-callback + [::ffi/fn + [::mem/pointer] + ::mem/void + :raw-fn? true] + (mem/global-arena))] + (funcs/doto-connections db + (fn [conn] + (let [pdb (:pdb conn) + code (api/create-function-v2 pdb name arity flags-bitmask + mem/null + mem/null + step-callback-ptr + final-callback-ptr + mem/null)] + (when-not (api/sqlite-ok? code) + (throw (api/sqlite-ex-info pdb code {:aggregate name})))))) + {:arity arity + :data {:flags flags-bitmask + :step-callback step-callback + :step-callback-ptr step-callback-ptr + :final-callback final-callback + :final-callback-ptr final-callback-ptr + :state-by-context state-by-context}}))) + metadata (when (or (var? step-source) (var? final-source)) + {:step-source step-source + :final-source final-source + :watch-keys watch-keys + :opts opts})] + (swap! (get-in db [:internal :app-aggregates]) + update name + (fn [existing] + (let [new-arities (into {} + (for [{:keys [arity data]} registrations] + [arity data]))] + (cond-> (merge existing new-arities) + metadata (assoc :meta metadata))))))) + +(defn- aggregate-var-updated + [db name opts step-source final-source watch-keys _watch-key _var _old-val _new-val] + (clear-aggregate-arities db name) + (let [step-f (if (var? step-source) (var-get step-source) step-source) + final-f (if (var? final-source) (var-get final-source) final-source)] + (when (and (fn? step-f) (fn? final-f)) + (do-register-aggregate db name step-f final-f + (aggregate-arities step-f opts) + (funcs/function-flags->bitmask opts) + opts + {:step-source step-source + :final-source final-source + :watch-keys watch-keys})))) + +(defn register-aggregate + [db name step-f final-f & {:as opts}] + (when-not (fn? step-f) + (throw (ex-info "step-f must be a function" {:step-f step-f}))) + (when-not (fn? final-f) + (throw (ex-info "final-f must be a function" {:final-f final-f}))) + (do-register-aggregate db name step-f final-f + (aggregate-arities step-f opts) + (funcs/function-flags->bitmask opts) + opts + {:step-source step-f + :final-source final-f + :watch-keys {}})) + +(defn register-aggregate-vars + [db name step-source final-source & {:as opts}] + (let [step-f (if (var? step-source) (var-get step-source) step-source) + final-f (if (var? final-source) (var-get final-source) final-source) + watch-keys {:step (when (var? step-source) + (keyword (str "sqlite4clj-" name "-aggregate-step-var"))) + :final (when (var? final-source) + (keyword (str "sqlite4clj-" name "-aggregate-final-var")))}] + (when-not (fn? step-f) + (throw (ex-info "step-f source must resolve to a function" + {:step-source step-source}))) + (when-not (fn? final-f) + (throw (ex-info "final-f source must resolve to a function" + {:final-source final-source}))) + (do-register-aggregate db name step-f final-f + (aggregate-arities step-f opts) + (funcs/function-flags->bitmask opts) + opts + {:step-source step-source + :final-source final-source + :watch-keys watch-keys}) + (when-let [watch-key (:step watch-keys)] + (add-watch step-source watch-key + (partial aggregate-var-updated db name opts step-source final-source watch-keys))) + (when-let [watch-key (:final watch-keys)] + (add-watch final-source watch-key + (partial aggregate-var-updated db name opts step-source final-source watch-keys))))) + +(defn create-aggregate + [db name step-f-or-var final-f-or-var & {:as opts}] + (if (or (var? step-f-or-var) (var? final-f-or-var)) + (register-aggregate-vars db name step-f-or-var final-f-or-var opts) + (register-aggregate db name step-f-or-var final-f-or-var opts))) + +(defn remove-aggregate + ([db name] + (remove-aggregate db name nil)) + ([db name arity] + (when-let [agg-data (get-aggregate db name)] + (let [{:keys [remove-arities remove-all?]} (build-removal-update agg-data arity)] + (doseq [a remove-arities + :let [{:keys [flags]} (get agg-data a)]] + (unregister-aggregate-callback db name a flags)) + (when (and remove-all? (:meta agg-data)) + (let [{:keys [step-source final-source watch-keys]} (:meta agg-data)] + (when-let [watch-key (:step watch-keys)] + (remove-watch step-source watch-key)) + (when-let [watch-key (:final watch-keys)] + (remove-watch final-source watch-key)))) + (swap! (get-in db [:internal :app-aggregates]) + (if remove-all? + #(dissoc % name) + #(update % name (fn [agg-entry] + (reduce dissoc agg-entry remove-arities))))))))) diff --git a/test/sqlite4clj/impl/functions_aggregates_test.clj b/test/sqlite4clj/impl/functions_aggregates_test.clj new file mode 100644 index 0000000..2b525c8 --- /dev/null +++ b/test/sqlite4clj/impl/functions_aggregates_test.clj @@ -0,0 +1,213 @@ +(ns sqlite4clj.impl.functions-aggregates-test + (:require + [clojure.test :refer [deftest is testing use-fixtures]] + [sqlite4clj.core :as d] + [sqlite4clj.impl.api :as api] + [sqlite4clj.impl.functions-aggregates :as aggs] + [sqlite4clj.test-common :refer [test-db test-fixture with-db]])) + +(use-fixtures :once test-fixture) + +(deftest basic-aggregate-registration + (testing "Can register and call an aggregate on writer and reader pools" + (with-db [db (test-db)] + (d/q (:writer db) ["CREATE TABLE agg_basic_nums (n INTEGER)"]) + (d/q (:writer db) ["INSERT INTO agg_basic_nums VALUES (1), (2), (3), (4)"]) + (d/create-aggregate db "sum_n" + (fn [state n] + (+ (or state 0) n)) + (fn [state] state) + {:deterministic? true}) + + (is (= [10] (d/q (:writer db) ["SELECT sum_n(n) FROM agg_basic_nums"]))) + (is (= [10] (d/q (:reader db) ["SELECT sum_n(n) FROM agg_basic_nums"])))))) + +(deftest aggregate-type-handling + (testing "Aggregate step receives decoded SQLite values" + (with-db [db (test-db)] + (d/q (:writer db) ["CREATE TABLE agg_type_data (v BLOB)"]) + (d/q (:writer db) ["INSERT INTO agg_type_data VALUES (42), (3.14), ('hello'), (NULL), (?)" + (byte-array [1 2 3])]) + + (d/create-aggregate db "collect_types" + (fn [state v] + (conj (or state []) + (cond + (nil? v) "NULL" + (integer? v) "INTEGER" + (double? v) "REAL" + (string? v) "TEXT" + (bytes? v) "BLOB" + :else "UNKNOWN"))) + (fn [state] state)) + + (is (= [["INTEGER" "REAL" "TEXT" "NULL" "BLOB"]] + (d/q (:writer db) ["SELECT collect_types(v) FROM (SELECT v FROM agg_type_data ORDER BY rowid)"])))))) + +(deftest aggregate-arity-inference + (testing "Arity is inferred from step signature and supports variadic callbacks" + (with-db [db (test-db)] + (d/q (:writer db) ["CREATE TABLE agg_arity_nums (n INTEGER)"]) + (d/q (:writer db) ["INSERT INTO agg_arity_nums VALUES (1), (2), (3)"]) + + (d/create-aggregate db "count_rows" + (fn [state] + (inc (or state 0))) + (fn [state] state)) + (is (= [3] (d/q (:writer db) ["SELECT count_rows() FROM agg_arity_nums"]))) + + (d/create-aggregate db "sum_pairs" + (fn [state a b] + (+ (or state 0) a b)) + (fn [state] state)) + (is (= [12] (d/q (:writer db) ["SELECT sum_pairs(n, n) FROM agg_arity_nums"]))) + + (d/create-aggregate db "count_variadic" + (fn [state & args] + (+ (or state 0) (count args))) + (fn [state] state)) + (is (= [6] (d/q (:writer db) ["SELECT count_variadic(n, n) FROM agg_arity_nums"]))) + + (d/create-aggregate db "count_variadic_arity" + (fn [state & args] + (+ (or state 0) (count args))) + (fn [state] state) + {:arity 1}) + (is (= [3] (d/q (:writer db) ["SELECT count_variadic_arity(n) FROM agg_arity_nums"])))))) + +(deftest aggregate-initial-state-and-empty-input + (testing "Empty input returns NULL by default and finalized initial state when provided" + (with-db [db (test-db)] + (d/create-aggregate db "sum_empty_default" + (fn [state n] + (+ (or state 0) n)) + (fn [state] state)) + + (d/create-aggregate db "sum_empty_init" + (fn [state n] + (+ (or state 0) n)) + (fn [state] state) + {:initial-state 10}) + + (is (= [nil] + (d/q (:writer db) ["SELECT sum_empty_default(n) FROM (SELECT 1 AS n WHERE 0)"]))) + (is (= [10] + (d/q (:writer db) ["SELECT sum_empty_init(n) FROM (SELECT 1 AS n WHERE 0)"])))))) + +(deftest aggregate-exception-handling + (testing "Step and final callback exceptions are surfaced as SQLite errors" + (with-db [db (test-db)] + (d/q (:writer db) ["CREATE TABLE agg_exception_nums (n INTEGER)"]) + (d/q (:writer db) ["INSERT INTO agg_exception_nums VALUES (1), (2), (3)"]) + + (d/create-aggregate db "explode_step" + (fn [state n] + (if (= n 2) + (throw (Exception. "explode step")) + (+ (or state 0) n))) + (fn [state] state)) + (is (thrown-with-msg? Exception #"explode step" + (d/q (:writer db) ["SELECT explode_step(n) FROM agg_exception_nums"]))) + + (d/create-aggregate db "explode_final" + (fn [state n] + (+ (or state 0) n)) + (fn [_] + (throw (Exception. "explode final")))) + (is (thrown-with-msg? Exception #"explode final" + (d/q (:writer db) ["SELECT explode_final(n) FROM agg_exception_nums"])))))) + +(defn get-flags [db name arity] + (:flags (aggs/get-aggregate db name arity))) + +(defn has-flag? [db name arity flag] + (let [flags (get-flags db name arity)] + (and flags (bit-test flags flag)))) + +(deftest aggregate-flags + (testing "Aggregate flags are captured in registration metadata" + (with-db [db (test-db)] + (d/create-aggregate db "all_agg_flags" + (fn [state x] + (+ (or state 0) x)) + (fn [state] state) + {:deterministic? true + :innocuous? true + :direct-only? true + :sub-type? true + :result-sub-type? true + :self-order1? true}) + (doseq [flag [api/SQLITE_DETERMINISTIC + api/SQLITE_INNOCUOUS + api/SQLITE_DIRECTONLY + api/SQLITE_SUBTYPE + api/SQLITE_RESULT_SUBTYPE + api/SQLITE_SELFORDER1]] + (is (has-flag? db "all_agg_flags" 1 flag)))))) + +(deftest removing-aggregates + (testing "Aggregates can be removed by arity or by name" + (with-db [db (test-db)] + (d/q (:writer db) ["CREATE TABLE agg_remove_nums (n INTEGER)"]) + (d/q (:writer db) ["INSERT INTO agg_remove_nums VALUES (1), (2), (3)"]) + + (d/create-aggregate db "sum_any" + (fn [state x] + (+ (or state 0) x)) + (fn [state] state)) + (d/create-aggregate db "sum_any" + (fn [state x y] + (+ (or state 0) x y)) + (fn [state] state)) + + (is (= [6] (d/q (:writer db) ["SELECT sum_any(n) FROM agg_remove_nums"]))) + (is (= [12] (d/q (:writer db) ["SELECT sum_any(n, n) FROM agg_remove_nums"]))) + (is (some? (aggs/get-aggregate db "sum_any" 1))) + (is (some? (aggs/get-aggregate db "sum_any" 2))) + + (d/remove-aggregate db "sum_any" 1) + (is (nil? (aggs/get-aggregate db "sum_any" 1))) + (is (some? (aggs/get-aggregate db "sum_any" 2))) + (is (= [12] (d/q (:writer db) ["SELECT sum_any(n, n) FROM agg_remove_nums"]))) + (is (thrown-with-msg? Exception #"wrong number of arguments to function sum_any\(\)" + (d/q (:writer db) ["SELECT sum_any(n) FROM agg_remove_nums"]))) + + (d/remove-aggregate db "sum_any") + (is (nil? (aggs/get-aggregate db "sum_any" 2))) + (is (thrown-with-msg? Exception #"no such function: sum_any" + (d/q (:writer db) ["SELECT sum_any(n, n) FROM agg_remove_nums"])))))) + +(deftest registering-aggregate-vars + (testing "Aggregate vars can be redefined and stop updating after removal" + #_{:clj-kondo/ignore [:inline-def]} + (defn watched-agg-step [state x] + (+ (or state 0) x)) + #_{:clj-kondo/ignore [:inline-def]} + (defn watched-agg-final [state] + state) + (with-db [db (test-db)] + (d/q (:writer db) ["CREATE TABLE agg_watch_nums (n INTEGER)"]) + (d/q (:writer db) ["INSERT INTO agg_watch_nums VALUES (1), (2), (3)"]) + + (d/create-aggregate db "watched_sum" #'watched-agg-step #'watched-agg-final) + (is (= [6] (d/q (:writer db) ["SELECT watched_sum(n) FROM agg_watch_nums"]))) + + (alter-var-root #'watched-agg-step + (fn [_] + (fn [state x] + (+ (or state 0) (* 2 x))))) + (is (= [12] (d/q (:writer db) ["SELECT watched_sum(n) FROM agg_watch_nums"]))) + + (alter-var-root #'watched-agg-final + (fn [_] + (fn [state] + (inc state)))) + (is (= [13] (d/q (:writer db) ["SELECT watched_sum(n) FROM agg_watch_nums"]))) + + (d/remove-aggregate db "watched_sum") + (alter-var-root #'watched-agg-step + (fn [_] + (fn [state x] + (+ (or state 0) (* 3 x))))) + (is (thrown-with-msg? Exception #"no such function: watched_sum" + (d/q (:writer db) ["SELECT watched_sum(n) FROM agg_watch_nums"]))))))