Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cdf9fb7
docs: update worker events for protobuf
mitchhs12 Jan 29, 2026
ed78014
WIP: feat(worker): add Kafka event streaming for sync progress
mitchhs12 Jan 29, 2026
06ad093
feat(worker): emit lifecycle events per-table
mitchhs12 Jan 30, 2026
2bfe5ba
feat(kafka): make partition count configurable
mitchhs12 Jan 30, 2026
5981c96
test(tests): fix E2E test to expect per-table events
mitchhs12 Jan 30, 2026
0d46881
chore(monitoring): add kafka_client to AMP_CRATES list
mitchhs12 Jan 30, 2026
9cc9587
chore(deps): update Cargo.lock
mitchhs12 Jan 30, 2026
d518bed
refactor(worker): addresses PR feedback
mitchhs12 Jan 30, 2026
ee9cc53
fix(kafka): add error_source to error logs
mitchhs12 Feb 2, 2026
86315a1
refactor(worker): uses proto types instead for event streaming
mitchhs12 Feb 2, 2026
4797bb1
refactor(kafka): consolidated kafka config into amp-config
mitchhs12 Feb 2, 2026
4b91a97
docs(kafka): add status field and preqres section
mitchhs12 Feb 2, 2026
bccfb46
refactor(worker): rename ProgressCallback to ProgressReporter
mitchhs12 Feb 3, 2026
dad6cf8
refactor(worker): change progress events from percentage-based to tim…
mitchhs12 Feb 4, 2026
1ee1283
fix(dump): suppress clippy too_many_arguments warning
mitchhs12 Feb 4, 2026
7990dfc
feat(worker): add SASL and TLS support for Kafka
mitchhs12 Feb 4, 2026
d1eb876
docs(worker-events): align feature doc with implementation
mitchhs12 Feb 4, 2026
7cae6d2
add missing amp-config dep
mitchhs12 Feb 4, 2026
d1c794f
docs: fix config key and proto comment accuracy
mitchhs12 Feb 4, 2026
92d3e01
fix(worker): clippy
mitchhs12 Feb 4, 2026
05297fe
refactor(worker): address PR feedback
mitchhs12 Feb 4, 2026
44e70c1
fix(worker): CI formatting
mitchhs12 Feb 4, 2026
61dc963
refactor(worker): address PR feedback on patterns and conventions
mitchhs12 Feb 5, 2026
2d2a59f
chore: trigger CI rerun
mitchhs12 Feb 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 92 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ reqwest = { version = "0.13", default-features = false, features = [
"rustls",
"stream",
] }
rustls = { version = "0.23", features = ["ring"] }
schemars = "1.0.4"
semver = { version = "1.0.18", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down Expand Up @@ -142,6 +143,7 @@ url = { version = "2.5.0", features = ["serde"] }
uuid = { version = "1.11.0", features = ["v7"] }
nix = { version = "0.30.1", default-features = false, features = ["signal"] }
percent-encoding = "2.3"
webpki-roots = "0.26"
which = "8.0.0"
zstd = "0.13.3"

Expand Down
1 change: 1 addition & 0 deletions crates/bin/ampd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ amp-providers-registry = { path = "../../core/providers-registry" }
metadata-db = { path = "../../core/metadata-db" }
metadata-db-postgres = { path = "../../services/metadata-db-postgres" }
monitoring = { path = "../../core/monitoring" }
rustls.workspace = true
server = { path = "../../services/server" }
snmalloc-rs = { version = "0.3.4", optional = true }
thiserror.workspace = true
Expand Down
6 changes: 6 additions & 0 deletions crates/bin/ampd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ async fn main() {
}

async fn main_inner() -> Result<(), Error> {
// Install rustls crypto provider before any TLS operations.
// Required when both ring and aws-lc-rs are available in the dependency tree.
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install default crypto provider");

// Initialize tokio-console subscriber if feature is enabled
#[cfg(feature = "console-subscriber")]
{
Expand Down
1 change: 1 addition & 0 deletions crates/bin/ampd/src/solo_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub async fn run(
dataset_store,
meter,
worker_id,
None, // Use config-based event emitter
)
.await
.map_err(Error::WorkerInit)?;
Expand Down
2 changes: 2 additions & 0 deletions crates/bin/ampd/src/worker_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub async fn run(
dataset_store,
meter,
node_id,
None, // Use config-based event emitter
)
.await
.map_err(Error::Init)?;
Expand Down Expand Up @@ -126,5 +127,6 @@ pub(crate) fn config_from_common(config: &Config) -> worker::config::Config {
query_max_mem_mb: config.query_max_mem_mb,
spill_location: config.spill_location.clone(),
parquet: config.parquet.clone(),
events_config: config.worker_events.clone(),
}
}
8 changes: 8 additions & 0 deletions crates/config/src/config_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,17 @@ pub struct ConfigFile {
// Writer/Parquet configuration
#[serde(default)]
pub writer: ParquetConfig,

// Worker event streaming configuration
#[serde(default)]
pub worker_events: crate::WorkerEventsConfig,
}

impl ConfigFile {
/// Returns the worker events configuration.
pub fn worker_events(&self) -> &crate::WorkerEventsConfig {
&self.worker_events
}
/// Returns the data directory path where Parquet files are stored.
pub fn data_dir(&self) -> &str {
&self.data_dir
Expand Down
Loading
Loading