Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/bindings-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ mod sym {
symbol!(unique);
symbol!(update);
symbol!(default);
symbol!(event);

symbol!(u8);
symbol!(i8);
Expand Down
18 changes: 18 additions & 0 deletions crates/bindings-macro/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) struct TableArgs {
scheduled: Option<ScheduledArg>,
name: Ident,
indices: Vec<IndexArg>,
event: Option<Span>,
}

enum TableAccess {
Expand Down Expand Up @@ -71,6 +72,7 @@ impl TableArgs {
let mut scheduled = None;
let mut name = None;
let mut indices = Vec::new();
let mut event = None;
syn::meta::parser(|meta| {
match_meta!(match meta {
sym::public => {
Expand All @@ -91,6 +93,10 @@ impl TableArgs {
check_duplicate(&scheduled, &meta)?;
scheduled = Some(ScheduledArg::parse_meta(meta)?);
}
sym::event => {
check_duplicate(&event, &meta)?;
event = Some(meta.path.span());
}
});
Ok(())
})
Expand All @@ -107,6 +113,7 @@ impl TableArgs {
scheduled,
name,
indices,
event,
})
}
}
Expand Down Expand Up @@ -841,6 +848,14 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
);

let table_access = args.access.iter().map(|acc| acc.to_value());
let is_event = args.event.iter().map(|_| quote!(const IS_EVENT: bool = true;));
let can_be_lookup_impl = if args.event.is_none() {
quote! {
impl spacetimedb::query_builder::CanBeLookupTable for #original_struct_ident {}
}
} else {
quote! {}
};
let unique_col_ids = unique_columns.iter().map(|col| col.index);
let primary_col_id = primary_key_column.clone().into_iter().map(|col| col.index);
let sequence_col_ids = sequenced_columns.iter().map(|col| col.index);
Expand Down Expand Up @@ -966,6 +981,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
const TABLE_NAME: &'static str = #table_name;
// the default value if not specified is Private
#(const TABLE_ACCESS: spacetimedb::table::TableAccess = #table_access;)*
#(#is_event)*
const UNIQUE_COLUMNS: &'static [u16] = &[#(#unique_col_ids),*];
const INDEXES: &'static [spacetimedb::table::IndexDesc<'static>] = &[#(#index_descs),*];
#(const PRIMARY_KEY: Option<u16> = Some(#primary_col_id);)*
Expand Down Expand Up @@ -1077,6 +1093,8 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
}
}

#can_be_lookup_impl

};

let table_query_handle_def = quote! {
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ pub mod raw {
/// would be where you would initialize the interepreter and load the user module into it.
fn __setup__() -> Result;
/// Required. Runs after `__setup__`; returns all the exports for the module.
fn __describe_module__() -> Encoded<ModuleDef>;
fn __describe_module_v10__() -> Encoded<ModuleDef>;
/// Required. id is an index into the `ModuleDef.reducers` returned from `__describe_module__`.
/// args is a bsatn-encoded product value defined by the schema at `reducers[id]`.
fn __call_reducer__(
Expand Down
31 changes: 20 additions & 11 deletions crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use crate::query_builder::Query;
use crate::table::IndexAlgo;
use crate::{sys, AnonymousViewContext, IterBuf, ReducerContext, ReducerResult, SpacetimeType, Table, ViewContext};
use spacetimedb_lib::bsatn::EncodeError;
use spacetimedb_lib::db::raw_def::v10::RawModuleDefV10Builder;
pub use spacetimedb_lib::db::raw_def::v9::Lifecycle as LifecycleReducer;
use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, RawModuleDefV9Builder, TableType, ViewResultHeader};
use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, TableType, ViewResultHeader};
use spacetimedb_lib::de::{self, Deserialize, DeserializeOwned, Error as _, SeqProductAccess};
use spacetimedb_lib::sats::typespace::TypespaceBuilder;
use spacetimedb_lib::sats::{impl_deserialize, impl_serialize, ProductTypeElement};
Expand Down Expand Up @@ -674,7 +675,7 @@ pub trait RowLevelSecurityInfo {
}

/// A function which will be registered by [`register_describer`] into [`DESCRIBERS`],
/// which will be called by [`__describe_module__`] to construct a module definition.
/// which will be called by [`__describe_module_v10__`] to construct a module definition.
///
/// May be a closure over static data, so that e.g.
/// [`register_row_level_security`] doesn't need to take a type parameter.
Expand All @@ -700,12 +701,20 @@ pub fn register_reftype<T: SpacetimeType>() {
pub fn register_table<T: Table>() {
register_describer(|module| {
let product_type_ref = *T::Row::make_type(&mut module.inner).as_ref().unwrap();
if let Some(schedule) = T::SCHEDULE {
module.inner.add_schedule(
T::TABLE_NAME,
schedule.scheduled_at_column,
schedule.reducer_or_procedure_name,
);
}

let mut table = module
.inner
.build_table(T::TABLE_NAME, product_type_ref)
.with_type(TableType::User)
.with_access(T::TABLE_ACCESS);
.with_access(T::TABLE_ACCESS)
.with_event(T::IS_EVENT);

for &col in T::UNIQUE_COLUMNS {
table = table.with_unique_constraint(col);
Expand All @@ -719,10 +728,6 @@ pub fn register_table<T: Table>() {
for &col in T::SEQUENCES {
table = table.with_column_sequence(col);
}
if let Some(schedule) = T::SCHEDULE {
table = table.with_schedule(schedule.reducer_or_procedure_name, schedule.scheduled_at_column);
}

for col in T::get_default_col_values().iter_mut() {
table = table.with_default_column_value(col.col_id, col.value.clone())
}
Expand All @@ -749,7 +754,11 @@ impl From<IndexAlgo<'_>> for RawIndexAlgorithm {
pub fn register_reducer<'a, A: Args<'a>, I: FnInfo<Invoke = ReducerFn>>(_: impl Reducer<'a, A>) {
register_describer(|module| {
let params = A::schema::<I>(&mut module.inner);
module.inner.add_reducer(I::NAME, params, I::LIFECYCLE);
if let Some(lifecycle) = I::LIFECYCLE {
module.inner.add_lifecycle_reducer(lifecycle, I::NAME, params);
} else {
module.inner.add_reducer(I::NAME, params);
}
module.reducers.push(I::INVOKE);
})
}
Expand Down Expand Up @@ -814,7 +823,7 @@ pub fn register_row_level_security(sql: &'static str) {
#[derive(Default)]
pub struct ModuleBuilder {
/// The module definition.
inner: RawModuleDefV9Builder,
inner: RawModuleDefV10Builder,
/// The reducers of the module.
reducers: Vec<ReducerFn>,
/// The procedures of the module.
Expand Down Expand Up @@ -863,7 +872,7 @@ static ANONYMOUS_VIEWS: OnceLock<Vec<AnonymousFn>> = OnceLock::new();
/// including when modules are updated (re-publishing).
/// After initialization, the module cannot alter the schema.
#[no_mangle]
extern "C" fn __describe_module__(description: BytesSink) {
extern "C" fn __describe_module_v10__(description: BytesSink) {
// Collect the `module`.
let mut module = ModuleBuilder::default();
for describer in &mut *DESCRIBERS.lock().unwrap() {
Expand All @@ -872,7 +881,7 @@ extern "C" fn __describe_module__(description: BytesSink) {

// Serialize the module to bsatn.
let module_def = module.inner.finish();
let module_def = RawModuleDef::V9(module_def);
let module_def = RawModuleDef::V10(module_def);
let bytes = bsatn::to_vec(&module_def).expect("unable to serialize typespace");

// Write the sets of reducers, procedures and views.
Expand Down
1 change: 1 addition & 0 deletions crates/bindings/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub trait TableInternal: Sized {
const PRIMARY_KEY: Option<u16> = None;
const SEQUENCES: &'static [u16];
const SCHEDULE: Option<ScheduleDesc<'static>> = None;
const IS_EVENT: bool = false;

/// Returns the ID of this table.
fn table_id() -> TableId;
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ pub(crate) mod tests {
access,
None,
None,
false,
),
)?;
let schema = db.schema_for_table_mut(tx, table_id)?;
Expand Down
24 changes: 24 additions & 0 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,12 @@ impl CommittedState {
schema: &Arc<TableSchema>,
row: &ProductValue,
) -> Result<()> {
// Event table rows in the commitlog are preserved for future replay features
// but don't rebuild state — event tables have no committed state.
if schema.is_event {
return Ok(());
}

let (table, blob_store, pool) = self.get_table_and_blob_store_or_create(table_id, schema);

let (_, row_ref) = match table.insert(pool, blob_store, row) {
Expand Down Expand Up @@ -1187,6 +1193,24 @@ impl CommittedState {
// and the fullness of the page.

for (table_id, tx_table) in insert_tables {
// Event tables: record in TxData for commitlog persistence and subscription dispatch,
// but do NOT merge into committed state.
// NOTE: There is no need to call `get_table_and_blob_store_or_create` here.
// The logic for collecting inserts is duplicated, but it's cleaner this way.
if tx_table.get_schema().is_event {
let mut inserts = Vec::with_capacity(tx_table.row_count as usize);
for row_ref in tx_table.scan_rows(&tx_blob_store) {
inserts.push(row_ref.to_product_value());
}
if !inserts.is_empty() {
let table_name = &tx_table.get_schema().table_name;
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());
}
let (_schema, _indexes, pages) = tx_table.consume_for_merge();
self.page_pool.put_many(pages);
continue;
}

let (commit_table, commit_blob_store, page_pool) =
self.get_table_and_blob_store_or_create(table_id, tx_table.get_schema());

Expand Down
1 change: 1 addition & 0 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,7 @@ mod tests {
StAccess::Public,
schedule,
pk,
false,
)
}

Expand Down
13 changes: 10 additions & 3 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use crate::{
system_tables::{
with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields,
StConstraintRow, StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow,
StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable,
ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID,
ST_SEQUENCE_ID, ST_TABLE_ID,
StEventTableRow, StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow,
SystemTable, ST_CLIENT_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ID,
ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
},
};
use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow};
Expand Down Expand Up @@ -656,6 +656,7 @@ impl MutTxId {
self.insert_st_column(table_schema.columns())?;

let schedule = table_schema.schedule.clone();
let is_event = table_schema.is_event;
let mut schema_internal = table_schema;
// Extract all indexes, constraints, and sequences from the schema.
// We will add them back later with correct ids.
Expand Down Expand Up @@ -685,6 +686,12 @@ impl MutTxId {
table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id);
}

// Insert into st_event_table if this is an event table.
if is_event {
let row = StEventTableRow { table_id };
self.insert_via_serialize_bsatn(ST_EVENT_TABLE_ID, &row)?;
}

// Create the indexes for the table.
for index in indices {
let col_set = ColSet::from(index.index_algorithm.columns());
Expand Down
11 changes: 9 additions & 2 deletions crates/datastore/src/locking_tx_datastore/state_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::system_tables::{
ConnectionIdViaU128, StColumnFields, StColumnRow, StConnectionCredentialsFields, StConnectionCredentialsRow,
StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, StScheduledRow,
StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewFields, StViewParamFields, StViewRow,
SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID,
ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID,
StEventTableFields, SystemTable, ST_COLUMN_ID, ST_CONNECTION_CREDENTIALS_ID, ST_CONSTRAINT_ID,
ST_EVENT_TABLE_ID, ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID,
};
use anyhow::anyhow;
use core::ops::RangeBounds;
Expand Down Expand Up @@ -159,6 +159,12 @@ pub trait StateView {
.unwrap_or(None)
.transpose()?;

// Check if this table is an event table by looking up st_event_table.
let is_event = self
.iter_by_col_eq(ST_EVENT_TABLE_ID, StEventTableFields::TableId, value_eq)
.map(|mut iter| iter.next().is_some())
.unwrap_or(false);

Ok(TableSchema::new(
table_id,
table_name,
Expand All @@ -171,6 +177,7 @@ pub trait StateView {
table_access,
schedule,
table_primary_key,
is_event,
))
}

Expand Down
Loading
Loading