Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 165 additions & 1 deletion crates/acp-client/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use agent_client_protocol::{
SelectedPermissionOutcome, SessionNotification, SessionUpdate, TextContent,
};
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::Mutex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
Expand Down Expand Up @@ -96,6 +97,8 @@ pub struct AcpDriver {
is_remote: bool,
}

const REMOTE_ACP_MAX_PENDING_LINE_BYTES: usize = 256 * 1024;

impl AcpDriver {
/// Create a driver for the given provider ID (e.g. "goose", "claude").
///
Expand Down Expand Up @@ -177,7 +180,20 @@ impl AgentDriver for AcpDriver {
.ok_or_else(|| "Failed to get stdout".to_string())?;

let stdin_compat = stdin.compat_write();
let stdout_compat = stdout.compat();
let incoming_reader: Box<dyn tokio::io::AsyncRead + Unpin> = if self.is_remote {
let (normalized_stdout_writer, normalized_stdout_reader) = tokio::io::duplex(64 * 1024);
tokio::task::spawn_local(async move {
if let Err(error) =
normalize_remote_acp_stdout(stdout, normalized_stdout_writer).await
{
log::error!("remote ACP stdout normalization failed: {error}");
}
});
Box::new(normalized_stdout_reader)
} else {
Box::new(stdout)
};
let stdout_compat = incoming_reader.compat();

let is_resuming = agent_session_id.is_some();
let handler = Arc::new(AcpNotificationHandler::new(Arc::clone(writer), is_resuming));
Expand Down Expand Up @@ -219,6 +235,106 @@ impl AgentDriver for AcpDriver {
}
}

#[derive(Debug, PartialEq, Eq)]
enum RemoteLineOutcome {
Emit(String),
Pending,
Dropped,
}

fn sanitize_remote_acp_chunk(chunk: &str) -> String {
chunk
.chars()
.filter(|ch| *ch != '\0' && *ch != '\u{1e}')
.collect()
}

fn consume_remote_acp_line(pending: &mut String, raw_line: &str) -> RemoteLineOutcome {
let line = raw_line.trim_end_matches(['\r', '\n']);
if line.is_empty() {
return RemoteLineOutcome::Pending;
}

let chunk = sanitize_remote_acp_chunk(line);
if chunk.is_empty() {
return RemoteLineOutcome::Pending;
}

pending.push_str(&chunk);

match serde_json::from_str::<serde_json::Value>(pending) {
Ok(_) => RemoteLineOutcome::Emit(std::mem::take(pending)),
Err(error) if error.is_eof() => {
if pending.len() > REMOTE_ACP_MAX_PENDING_LINE_BYTES {
pending.clear();
RemoteLineOutcome::Dropped
} else {
RemoteLineOutcome::Pending
}
}
Err(_) => {
// Recovery path: pending may contain stale/corrupted bytes. If the
// current chunk is a standalone JSON payload, emit it and reset.
match serde_json::from_str::<serde_json::Value>(&chunk) {
Ok(_) => {
pending.clear();
RemoteLineOutcome::Emit(chunk)
}
Err(chunk_error) if chunk_error.is_eof() => {
pending.clear();
pending.push_str(&chunk);
if pending.len() > REMOTE_ACP_MAX_PENDING_LINE_BYTES {
pending.clear();
RemoteLineOutcome::Dropped
} else {
RemoteLineOutcome::Pending
}
}
Err(_) => {
pending.clear();
RemoteLineOutcome::Dropped
}
}
}
}
}

async fn normalize_remote_acp_stdout(
stdout: tokio::process::ChildStdout,
mut writer: tokio::io::DuplexStream,
) -> Result<(), std::io::Error> {
let mut reader = BufReader::new(stdout);
let mut raw_line = String::new();
let mut pending = String::new();

loop {
raw_line.clear();
let bytes_read = reader.read_line(&mut raw_line).await?;
if bytes_read == 0 {
break;
}

match consume_remote_acp_line(&mut pending, &raw_line) {
RemoteLineOutcome::Emit(line) => {
writer.write_all(line.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
RemoteLineOutcome::Pending => {}
RemoteLineOutcome::Dropped => {
if !raw_line.trim().is_empty() {
log::warn!("Dropped malformed ACP proxy output line");
}
}
}
}

if !pending.is_empty() {
log::warn!("Dropped incomplete ACP proxy output at EOF");
}

writer.shutdown().await
}

// =============================================================================
// ACP notification handler
// =============================================================================
Expand Down Expand Up @@ -475,3 +591,51 @@ impl MessageWriter for BasicMessageWriter {
current.push_str(&format!("\n[Result: {}]\n", content));
}
}

#[cfg(test)]
mod tests {
use super::{consume_remote_acp_line, sanitize_remote_acp_chunk, RemoteLineOutcome};

#[test]
fn consumes_wrapped_json_line_across_multiple_chunks() {
let mut pending = String::new();
let first = r#"{"jsonrpc":"2.0","id":1,"result":{"text":"Bypass all permiss"#;
let second = r#"ion checks"}}"#;

assert_eq!(
consume_remote_acp_line(&mut pending, first),
RemoteLineOutcome::Pending
);

assert_eq!(
consume_remote_acp_line(&mut pending, second),
RemoteLineOutcome::Emit(format!("{first}{second}"))
);
}

#[test]
fn strips_record_separator_and_nul_bytes() {
let chunk = "\u{1e}{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}\0";
assert_eq!(
sanitize_remote_acp_chunk(chunk),
"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}"
);
}

#[test]
fn drops_noise_and_recovers_with_next_valid_json_message() {
let mut pending = String::new();
assert_eq!(
consume_remote_acp_line(&mut pending, "this is not json"),
RemoteLineOutcome::Dropped
);

assert_eq!(
consume_remote_acp_line(
&mut pending,
"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}"
),
RemoteLineOutcome::Emit("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":null}".to_string())
);
}
}