From d1b5b00467d8e281b17e8f893672e51c2566d4f5 Mon Sep 17 00:00:00 2001 From: Wes Date: Mon, 23 Feb 2026 13:23:41 -0700 Subject: [PATCH] fix(acp-client): normalize remote ACP proxy stdout --- crates/acp-client/src/driver.rs | 166 +++++++++++++++++++++++++++++++- 1 file changed, 165 insertions(+), 1 deletion(-) diff --git a/crates/acp-client/src/driver.rs b/crates/acp-client/src/driver.rs index dbd5b5fe..5c4d7696 100644 --- a/crates/acp-client/src/driver.rs +++ b/crates/acp-client/src/driver.rs @@ -20,6 +20,7 @@ use agent_client_protocol::{ SelectedPermissionOutcome, SessionNotification, SessionUpdate, TextContent, }; use async_trait::async_trait; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use tokio::sync::Mutex; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; @@ -96,6 +97,8 @@ pub struct AcpDriver { is_remote: bool, } +const REMOTE_ACP_MAX_PENDING_LINE_BYTES: usize = 256 * 1024; + impl AcpDriver { /// Create a driver for the given provider ID (e.g. "goose", "claude"). /// @@ -177,7 +180,20 @@ impl AgentDriver for AcpDriver { .ok_or_else(|| "Failed to get stdout".to_string())?; let stdin_compat = stdin.compat_write(); - let stdout_compat = stdout.compat(); + let incoming_reader: Box = if self.is_remote { + let (normalized_stdout_writer, normalized_stdout_reader) = tokio::io::duplex(64 * 1024); + tokio::task::spawn_local(async move { + if let Err(error) = + normalize_remote_acp_stdout(stdout, normalized_stdout_writer).await + { + log::error!("remote ACP stdout normalization failed: {error}"); + } + }); + Box::new(normalized_stdout_reader) + } else { + Box::new(stdout) + }; + let stdout_compat = incoming_reader.compat(); let is_resuming = agent_session_id.is_some(); let handler = Arc::new(AcpNotificationHandler::new(Arc::clone(writer), is_resuming)); @@ -219,6 +235,106 @@ impl AgentDriver for AcpDriver { } } +#[derive(Debug, PartialEq, Eq)] +enum RemoteLineOutcome { + Emit(String), + Pending, + Dropped, +} + +fn sanitize_remote_acp_chunk(chunk: &str) -> String { + chunk + .chars() + .filter(|ch| *ch != '\0' && *ch != '\u{1e}') + .collect() +} + +fn consume_remote_acp_line(pending: &mut String, raw_line: &str) -> RemoteLineOutcome { + let line = raw_line.trim_end_matches(['\r', '\n']); + if line.is_empty() { + return RemoteLineOutcome::Pending; + } + + let chunk = sanitize_remote_acp_chunk(line); + if chunk.is_empty() { + return RemoteLineOutcome::Pending; + } + + pending.push_str(&chunk); + + match serde_json::from_str::(pending) { + Ok(_) => RemoteLineOutcome::Emit(std::mem::take(pending)), + Err(error) if error.is_eof() => { + if pending.len() > REMOTE_ACP_MAX_PENDING_LINE_BYTES { + pending.clear(); + RemoteLineOutcome::Dropped + } else { + RemoteLineOutcome::Pending + } + } + Err(_) => { + // Recovery path: pending may contain stale/corrupted bytes. If the + // current chunk is a standalone JSON payload, emit it and reset. + match serde_json::from_str::(&chunk) { + Ok(_) => { + pending.clear(); + RemoteLineOutcome::Emit(chunk) + } + Err(chunk_error) if chunk_error.is_eof() => { + pending.clear(); + pending.push_str(&chunk); + if pending.len() > REMOTE_ACP_MAX_PENDING_LINE_BYTES { + pending.clear(); + RemoteLineOutcome::Dropped + } else { + RemoteLineOutcome::Pending + } + } + Err(_) => { + pending.clear(); + RemoteLineOutcome::Dropped + } + } + } + } +} + +async fn normalize_remote_acp_stdout( + stdout: tokio::process::ChildStdout, + mut writer: tokio::io::DuplexStream, +) -> Result<(), std::io::Error> { + let mut reader = BufReader::new(stdout); + let mut raw_line = String::new(); + let mut pending = String::new(); + + loop { + raw_line.clear(); + let bytes_read = reader.read_line(&mut raw_line).await?; + if bytes_read == 0 { + break; + } + + match consume_remote_acp_line(&mut pending, &raw_line) { + RemoteLineOutcome::Emit(line) => { + writer.write_all(line.as_bytes()).await?; + writer.write_all(b"\n").await?; + } + RemoteLineOutcome::Pending => {} + RemoteLineOutcome::Dropped => { + if !raw_line.trim().is_empty() { + log::warn!("Dropped malformed ACP proxy output line"); + } + } + } + } + + if !pending.is_empty() { + log::warn!("Dropped incomplete ACP proxy output at EOF"); + } + + writer.shutdown().await +} + // ============================================================================= // ACP notification handler // ============================================================================= @@ -475,3 +591,51 @@ impl MessageWriter for BasicMessageWriter { current.push_str(&format!("\n[Result: {}]\n", content)); } } + +#[cfg(test)] +mod tests { + use super::{consume_remote_acp_line, sanitize_remote_acp_chunk, RemoteLineOutcome}; + + #[test] + fn consumes_wrapped_json_line_across_multiple_chunks() { + let mut pending = String::new(); + let first = r#"{"jsonrpc":"2.0","id":1,"result":{"text":"Bypass all permiss"#; + let second = r#"ion checks"}}"#; + + assert_eq!( + consume_remote_acp_line(&mut pending, first), + RemoteLineOutcome::Pending + ); + + assert_eq!( + consume_remote_acp_line(&mut pending, second), + RemoteLineOutcome::Emit(format!("{first}{second}")) + ); + } + + #[test] + fn strips_record_separator_and_nul_bytes() { + let chunk = "\u{1e}{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}\0"; + assert_eq!( + sanitize_remote_acp_chunk(chunk), + "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}" + ); + } + + #[test] + fn drops_noise_and_recovers_with_next_valid_json_message() { + let mut pending = String::new(); + assert_eq!( + consume_remote_acp_line(&mut pending, "this is not json"), + RemoteLineOutcome::Dropped + ); + + assert_eq!( + consume_remote_acp_line( + &mut pending, + "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}" + ), + RemoteLineOutcome::Emit("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}".to_string()) + ); + } +}