Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ebed4c3
Split WS into v1 and v2; make various changes in v2.
gefjon Jan 13, 2026
4fc9dc1
Merge remote-tracking branch 'origin/master' into phoebe/websocket-v2
gefjon Jan 14, 2026
7a1258a
Fix some copy-paste typos
gefjon Jan 14, 2026
05478c8
Nix `NoSuccessNotify`, add doc comments
gefjon Jan 15, 2026
4a4b10b
Update references to v1 WS format
gefjon Jan 15, 2026
dd020e6
Add doc comment to `TableUpdateRows`
gefjon Jan 21, 2026
e4e46c0
Fix compilation issue
jsdt Jan 29, 2026
f28f81e
Move compression into common and add new protocol option for v2
jsdt Jan 29, 2026
b621b2a
WIP on new subscription update format
jsdt Feb 4, 2026
8ca7795
WIP on subscription update stuff, and add UnsubscribeFlags
jsdt Feb 4, 2026
89a09c2
Move BsatnRowList to common and use it in v2
jsdt Feb 4, 2026
d9e9bfe
Update module subscription manager to group updates by client/query.
jsdt Feb 5, 2026
9437a55
Merge origin/master into jsdt/ws-v2
cloutiertyler Feb 5, 2026
0906a38
Derive Ord for TableName and ReducerName
cloutiertyler Feb 5, 2026
6a68b54
Fix Rust SDK import of RowListLen from common module
cloutiertyler Feb 6, 2026
aaa8b97
fix a issue from having missing files when merging
jsdt Feb 6, 2026
34328a8
Plumb reducer return value
jsdt Feb 6, 2026
a21b07d
finish send worker for v2
jsdt Feb 7, 2026
39dac16
send reducer result even if the client isn't getting something
jsdt Feb 7, 2026
bc1ebdd
More subscription manager
jsdt Feb 7, 2026
760379e
Cleanup
jsdt Feb 8, 2026
d9a8b0c
Merge branch 'master' into jsdt/ws-v2
jsdt Feb 8, 2026
b53b97c
Fix reducer return issue
jsdt Feb 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 19 additions & 16 deletions crates/cli/src/subcommands/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::{Sink, SinkExt, TryStream, TryStreamExt};
use http::header;
use reqwest::Url;
use serde_json::Value;
use spacetimedb_client_api_messages::websocket::{self as ws, JsonFormat};
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
use spacetimedb_data_structures::map::HashMap;
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
use spacetimedb_lib::de::serde::{DeserializeWrapper, SeedWrapper};
Expand Down Expand Up @@ -71,16 +71,16 @@ pub fn cli() -> clap::Command {
.arg(common_args::server().help("The nickname, host name or URL of the server hosting the database"))
}

fn parse_msg_json(msg: &WsMessage) -> Option<ws::ServerMessage<JsonFormat>> {
fn parse_msg_json(msg: &WsMessage) -> Option<ws_v1::ServerMessage<ws_v1::JsonFormat>> {
let WsMessage::Text(msg) = msg else { return None };
serde_json::from_str::<DeserializeWrapper<ws::ServerMessage<JsonFormat>>>(msg)
serde_json::from_str::<DeserializeWrapper<ws_v1::ServerMessage<ws_v1::JsonFormat>>>(msg)
.inspect_err(|e| eprintln!("couldn't parse message from server: {e}"))
.map(|wrapper| wrapper.0)
.ok()
}

fn reformat_update<'a>(
msg: &'a ws::DatabaseUpdate<JsonFormat>,
msg: &'a ws_v1::DatabaseUpdate<ws_v1::JsonFormat>,
schema: &RawModuleDefV9,
) -> anyhow::Result<HashMap<&'a str, SubscriptionTable>> {
msg.tables
Expand Down Expand Up @@ -152,7 +152,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
let mut req = url.into_client_request()?;
req.headers_mut().insert(
header::SEC_WEBSOCKET_PROTOCOL,
http::HeaderValue::from_static(ws::TEXT_PROTOCOL),
http::HeaderValue::from_static(ws_v1::TEXT_PROTOCOL),
);
// Add the authorization header, if any.
if let Some(auth_header) = api.con.auth_header.to_header() {
Expand Down Expand Up @@ -241,8 +241,8 @@ async fn subscribe<S>(ws: &mut S, query_strings: Box<[Box<str>]>) -> Result<(),
where
S: Sink<WsMessage, Error = WsError> + Unpin,
{
let msg = serde_json::to_string(&SerializeWrapper::new(ws::ClientMessage::<()>::Subscribe(
ws::Subscribe {
let msg = serde_json::to_string(&SerializeWrapper::new(ws_v1::ClientMessage::<()>::Subscribe(
ws_v1::Subscribe {
query_strings,
request_id: 0,
},
Expand All @@ -262,22 +262,22 @@ where
while let Some(msg) = ws.try_next().await.map_err(|source| Error::Websocket { source })? {
let Some(msg) = parse_msg_json(&msg) else { continue };
match msg {
ws::ServerMessage::InitialSubscription(sub) => {
ws_v1::ServerMessage::InitialSubscription(sub) => {
if let Some(module_def) = module_def {
let output = format_output_json(&sub.database_update, module_def)?;
tokio::io::stdout().write_all(output.as_bytes()).await?
}
break;
}
ws::ServerMessage::TransactionUpdate(ws::TransactionUpdate { status, .. }) => {
ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate { status, .. }) => {
return Err(match status {
ws::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg },
ws_v1::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg },
_ => Error::Protocol {
details: RECV_TX_UPDATE,
},
})
}
ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { .. }) => {
ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { .. }) => {
return Err(Error::Protocol {
details: RECV_TX_UPDATE,
})
Expand Down Expand Up @@ -310,14 +310,14 @@ where

let Some(msg) = parse_msg_json(&msg) else { continue };
match msg {
ws::ServerMessage::InitialSubscription(_) => {
ws_v1::ServerMessage::InitialSubscription(_) => {
return Err(Error::Protocol {
details: "received a second initial subscription update",
})
}
ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { update, .. })
| ws::ServerMessage::TransactionUpdate(ws::TransactionUpdate {
status: ws::UpdateStatus::Committed(update),
ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { update, .. })
| ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate {
status: ws_v1::UpdateStatus::Committed(update),
..
}) => {
let output = format_output_json(&update, module_def)?;
Expand All @@ -329,7 +329,10 @@ where
}
}

fn format_output_json(msg: &ws::DatabaseUpdate<JsonFormat>, schema: &RawModuleDefV9) -> Result<String, Error> {
fn format_output_json(
msg: &ws_v1::DatabaseUpdate<ws_v1::JsonFormat>,
schema: &RawModuleDefV9,
) -> Result<String, Error> {
let formatted = reformat_update(msg, schema).map_err(|source| Error::Reformat { source })?;
let output = serde_json::to_string(&formatted)? + "\n";

Expand Down
2 changes: 1 addition & 1 deletion crates/client-api-messages/examples/get_ws_schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use spacetimedb_client_api_messages::websocket::{BsatnFormat, ClientMessage, ServerMessage};
use spacetimedb_client_api_messages::websocket::v1::{BsatnFormat, ClientMessage, ServerMessage};
use spacetimedb_lib::ser::serde::SerializeWrapper;
use spacetimedb_lib::{RawModuleDef, RawModuleDefV8};

Expand Down
Loading
Loading