diff --git a/src/client/conn.rs b/src/client/conn.rs new file mode 100644 index 00000000..61ffe63b --- /dev/null +++ b/src/client/conn.rs @@ -0,0 +1,401 @@ +//! Tower layers and services for HTTP/1 and HTTP/2 client connections. +//! +//! This module provides Tower-compatible layers that wrap Hyper's low-level +//! HTTP client connection types, making them easier to compose with other +//! middleware and connection pooling strategies. + +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use http::{Request, Response}; +use tower_service::Service; + +use crate::common::future::poll_fn; + +type BoxError = Box; + +/// A Tower [`Layer`](tower_layer::Layer) for creating HTTP/1 client connections. +/// +/// This layer wraps a connection service (typically a TCP or TLS connector) and +/// performs the HTTP/1 handshake, producing an [`Http1ClientService`] that can +/// send requests. +/// +/// Use [`http1()`] to create a layer with default settings, or construct from +/// a [`hyper::client::conn::http1::Builder`] for custom configuration. +/// +/// # Example +/// +/// ```ignore +/// use hyper_util::client::conn::http1; +/// use hyper::{client::connect::HttpConnector, body::Bytes}; +/// use tower:: ServiceBuilder; +/// use http_body_util::Empty; +/// +/// let connector = HttpConnector::new(); +/// let layer: Http1Layer> = http1(); +/// let client = ServiceBuilder::new() +/// .layer(layer) +/// .service(connector); +/// ``` +#[cfg(feature = "http1")] +pub struct Http1Layer { + builder: hyper::client::conn::http1::Builder, + _body: PhantomData, +} + +/// Creates an [`Http1Layer`] with default HTTP/1 settings. +/// +/// For custom settings, construct an [`Http1Layer`] from a +/// [`hyper::client::conn::http1::Builder`] using `.into()`. +#[cfg(feature = "http1")] +pub fn http1() -> Http1Layer { + Http1Layer { + builder: hyper::client::conn::http1::Builder::new(), + _body: PhantomData, + } +} + +#[cfg(feature = "http1")] +impl tower_layer::Layer for Http1Layer { + type Service = Http1Connect; + fn layer(&self, inner: M) -> Self::Service { + Http1Connect { + inner, + builder: self.builder.clone(), + _body: self._body, + } + } +} + +#[cfg(feature = "http1")] +impl Clone for Http1Layer { + fn clone(&self) -> Self { + Self { + builder: self.builder.clone(), + _body: self._body.clone(), + } + } +} + +#[cfg(feature = "http1")] +impl From for Http1Layer { + fn from(builder: hyper::client::conn::http1::Builder) -> Self { + Self { + builder, + _body: PhantomData, + } + } +} + +/// A Tower [`Service`] that establishes HTTP/1 connections. +/// +/// This service wraps an underlying connection service (e.g., TCP or TLS) and +/// performs the HTTP/1 handshake when called. The resulting service can be used +/// to send HTTP requests over the established connection. +#[cfg(feature = "http1")] +pub struct Http1Connect { + inner: M, + builder: hyper::client::conn::http1::Builder, + _body: PhantomData, +} + +#[cfg(feature = "http1")] +impl Service for Http1Connect +where + M: Service, + M::Future: Send + 'static, + M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, + M::Error: Into, + B: hyper::body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into>, +{ + type Response = Http1ClientService; + type Error = BoxError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, dst: Dst) -> Self::Future { + let fut = self.inner.call(dst); + let builder = self.builder.clone(); + Box::pin(async move { + let io = fut.await.map_err(Into::into)?; + let (mut tx, conn) = builder.handshake(io).await?; + //todo: pass in Executor + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("connection error: {:?}", e); + } + }); + // todo: wait for ready? or factor out to other middleware? + poll_fn(|cx| tx.poll_ready(cx)).await?; + + Ok(Http1ClientService::new(tx)) + }) + } +} + +#[cfg(feature = "http1")] +impl Clone for Http1Connect { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + builder: self.builder.clone(), + _body: self._body.clone(), + } + } +} + +/// A Tower [`Layer`](tower_layer::Layer) for creating HTTP/2 client connections. +/// +/// This layer wraps a connection service (typically a TCP or TLS connector) and +/// performs the HTTP/2 handshake, producing an [`Http2ClientService`] that can +/// send requests. +/// +/// Use [`http2()`] to create a layer with a specific executor, or construct from +/// a [`hyper::client::conn::http2::Builder`] for custom configuration. +/// +/// # Example +/// +/// ```ignore +/// use hyper_util::client::conn::http2; +/// use hyper::{client::connect::HttpConnector, body::Bytes}; +/// use tower:: ServiceBuilder; +/// use http_body_util::Empty; +/// +/// let connector = HttpConnector::new(); +/// let layer: Http2Layer> = http2(); +/// let client = ServiceBuilder::new() +/// .layer(layer) +/// .service(connector); +/// ``` +#[cfg(feature = "http2")] +pub struct Http2Layer { + builder: hyper::client::conn::http2::Builder, + _body: PhantomData, +} + +/// Creates an [`Http2Layer`] with default HTTP/1 settings. +/// +/// For custom settings, construct an [`Http2Layer`] from a +/// [`hyper::client::conn::http2::Builder`] using `.into()`. +#[cfg(feature = "http2")] +pub fn http2(executor: E) -> Http2Layer +where + E: Clone, +{ + Http2Layer { + builder: hyper::client::conn::http2::Builder::new(executor), + _body: PhantomData, + } +} + +#[cfg(feature = "http2")] +impl tower_layer::Layer for Http2Layer +where + E: Clone, +{ + type Service = Http2Connect; + fn layer(&self, inner: M) -> Self::Service { + Http2Connect { + inner, + builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()), + _body: self._body, + } + } +} + +#[cfg(feature = "http2")] +impl Clone for Http2Layer { + fn clone(&self) -> Self { + Self { + builder: self.builder.clone(), + _body: self._body.clone(), + } + } +} + +#[cfg(feature = "http2")] +impl From> for Http2Layer { + fn from(builder: hyper::client::conn::http2::Builder) -> Self { + Self { + builder, + _body: PhantomData, + } + } +} + +/// A Tower [`Service`] that establishes HTTP/2 connections. +/// +/// This service wraps an underlying connection service (e.g., TCP or TLS) and +/// performs the HTTP/2 handshake when called. The resulting service can be used +/// to send HTTP requests over the established connection. +#[cfg(feature = "http2")] +#[derive(Debug)] +pub struct Http2Connect { + inner: M, + builder: hyper::client::conn::http2::Builder, + _body: PhantomData, +} + +#[cfg(feature = "http2")] +impl Service for Http2Connect +where + M: Service, + M::Future: Send + 'static, + M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, + M::Error: Into, + B: hyper::body::Body + Unpin + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + E: hyper::rt::bounds::Http2ClientConnExec + Unpin + Clone + Send + 'static, +{ + type Response = Http2ClientService; + type Error = BoxError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, dst: Dst) -> Self::Future { + let fut = self.inner.call(dst); + let builder = self.builder.clone(); + Box::pin(async move { + let io = fut.await.map_err(Into::into)?; + let (mut tx, conn) = builder.handshake(io).await?; + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("connection error: {:?}", e); + } + }); + + // todo: wait for ready? or factor out to other middleware? + poll_fn(|cx| tx.poll_ready(cx)).await?; + Ok(Http2ClientService::new(tx)) + }) + } +} + +#[cfg(feature = "http2")] +impl Clone for Http2Connect { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + builder: self.builder.clone(), + _body: self._body.clone(), + } + } +} + +/// A Tower [`Service`] that sends HTTP/1 requests over an established connection. +/// +/// This is a thin wrapper around [`hyper::client::conn::http1::SendRequest`] that implements +/// the Tower `Service` trait, making it composable with other Tower middleware. +/// +/// The service maintains a single HTTP/1 connection and can be used to send multiple +/// sequential requests. For concurrent requests or connection pooling, wrap this service +/// with appropriate middleware. +#[cfg(feature = "http1")] +#[derive(Debug)] +pub struct Http1ClientService { + tx: hyper::client::conn::http1::SendRequest, +} + +#[cfg(feature = "http1")] +impl Http1ClientService { + /// Constructs a new HTTP/1 client service from a Hyper `SendRequest`. + /// + /// Typically you won't call this directly; instead, use [`Http1Connect`] to + /// establish connections and produce this service. + pub fn new(tx: hyper::client::conn::http1::SendRequest) -> Self { + Self { tx } + } + + /// Checks if the connection side has been closed. + pub fn is_closed(&self) -> bool { + self.tx.is_closed() + } +} + +#[cfg(feature = "http1")] +impl Service> for Http1ClientService +where + B: hyper::body::Body + Send + 'static, +{ + type Response = Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tx.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let fut = self.tx.send_request(req); + Box::pin(fut) + } +} + +/// A Tower [`Service`] that sends HTTP/2 requests over an established connection. +/// +/// This is a thin wrapper around [`hyper::client::conn::http2::SendRequest`] that implements +/// the Tower `Service` trait, making it composable with other Tower middleware. +/// +/// The service maintains a single HTTP/2 connection and supports multiplexing multiple +/// concurrent requests over that connection. The service can be cloned to send requests +/// concurrently, or used with the [`Singleton`](crate::client::pool::singleton::Singleton) pool service. +#[cfg(feature = "http2")] +#[derive(Debug)] +pub struct Http2ClientService { + tx: hyper::client::conn::http2::SendRequest, +} + +#[cfg(feature = "http2")] +impl Http2ClientService { + /// Constructs a new HTTP/2 client service from a Hyper `SendRequest`. + /// + /// Typically you won't call this directly; instead, use [`Http2Connect`] to + /// establish connections and produce this service. + pub fn new(tx: hyper::client::conn::http2::SendRequest) -> Self { + Self { tx } + } + + /// Checks if the connection side has been closed. + pub fn is_closed(&self) -> bool { + self.tx.is_closed() + } +} + +#[cfg(feature = "http2")] +impl Service> for Http2ClientService +where + B: hyper::body::Body + Send + 'static, +{ + type Response = Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tx.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let fut = self.tx.send_request(req); + Box::pin(fut) + } +} + +#[cfg(feature = "http2")] +impl Clone for Http2ClientService { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + } + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 268cadf0..290db246 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,8 @@ //! HTTP client utilities +#[cfg(any(feature = "http1", feature = "http2"))] +pub mod conn; + /// Legacy implementations of `connect` module and `Client` #[cfg(feature = "client-legacy")] pub mod legacy; diff --git a/src/client/pool/map.rs b/src/client/pool/map.rs index 83b56480..8ff4179d 100644 --- a/src/client/pool/map.rs +++ b/src/client/pool/map.rs @@ -48,7 +48,7 @@ where // impl Map impl Map { - /// Create a [`Builder`] to configure a new `Map`. + /// Create a `Builder` to configure a new `Map`. pub fn builder() -> builder::Builder { builder::Builder::new()