Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "ddtrace"
version = "0.1.1"
version = "0.2.0"
authors = ["David Steiner <david_j_steiner@yahoo.co.nz", "Fergus Strangways-Dixon <fergusdixon101@gmail.com>"]
edition = "2021"
license = "MIT"
description = "utilities for integrating Datadog with tracing"
description = "Utilities for integrating Datadog with tracing"
readme = "README.md"
homepage = "https://github.com/Validus-Risk-Management/ddtrace"
repository = "https://github.com/Validus-Risk-Management/ddtrace"
Expand All @@ -13,19 +13,25 @@ categories = ["web-programming"]
exclude = [".pre-commit-config.yaml"]

[features]
axum = ["dep:axum", "dep:tokio", "dep:axum-tracing-opentelemetry"]
axum = ["dep:axum-tracing-opentelemetry"]
tonic = ["dep:tonic-tracing-opentelemetry"]
tracing_level_info = [
"axum-tracing-opentelemetry/tracing_level_info",
"tonic-tracing-opentelemetry/tracing_level_info",
]

[dependencies]
axum = { version = "^0.6.10", optional = true }
axum-tracing-opentelemetry = { version = "^0.11.0", optional = true }
chrono = "^0.4.24"
opentelemetry = { version = "^0.19.0", features = ["rt-tokio"] }
opentelemetry-datadog = "^0.7.0"
opentelemetry-otlp = { version = "^0.12.0" }
serde = { version = "^1.0.156", features = ["derive"] }
serde_json = "^1.0.95"
tokio = { version = "^1.26.0", features = ["signal"], optional = true }
tracing = "^0.1.37"
tracing-opentelemetry = "^0.19.0"
tracing-serde = "^0.1.3"
tracing-subscriber = { version = "^0.3.16", features = ["env-filter", "json"] }
axum-tracing-opentelemetry = { version = "0.29", optional = true }
chrono = "0.4"
opentelemetry = "0.30"
opentelemetry_sdk = "0.30"
opentelemetry-datadog = "0.18"
opentelemetry-otlp = { version = "0.30", features = ["grpc-tonic", "http-proto"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "2.0.12"
tonic-tracing-opentelemetry = { version = "0.29", optional = true }
tracing = "0.1"
tracing-opentelemetry = "0.31"
tracing-serde = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
38 changes: 21 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ and Datadog.
# Features

`ddtrace` has the following features:

1. tracing: utilities for building an OpenTelemetry tracer/layer that sends traces to the Datadog agent
2. log correlation: a log formatter that converts the trace ID and span ID to the Datadog native format and injects them into the `dd.trace_id` and `dd.span_id` fields
2. log correlation: a log formatter that converts the trace ID and span ID to the Datadog native format and injects them
into the `dd.trace_id` and `dd.span_id` fields
([more information](https://docs.datadoghq.com/tracing/other_telemetry/connect_logs_and_traces/opentelemetry/))
3. propagation: a utility function to set the Datadog propagator as the global propagator
4. axum (enabled via the `axum` feature): re-exposing the functionality of [axum-tracing-opentelemetry](https://github.com/davidB/axum-tracing-opentelemetry)
4. axum (enabled via the `axum` feature): re-exposing the functionality
of [axum-tracing-opentelemetry](https://github.com/davidB/axum-tracing-opentelemetry)

# A Complete Example

Expand All @@ -32,17 +35,18 @@ use std::net::SocketAddr;
use std::time::Duration;

use axum::{routing::get, Router};
use ddtrace::axum::opentelemetry_tracing_layer;
use ddtrace::axum::OtelAxumLayer;
use ddtrace::error::Error;
use ddtrace::formatter::DatadogFormatter;
use ddtrace::set_global_propagator;
use ddtrace::tracer::{build_layer, TraceResult};
use ddtrace::tracer::build_layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() -> TraceResult<()> {
async fn main() -> Result<(), Error> {
let service_name = std::env::var("DD_SERVICE").unwrap_or("my-service".to_string());
let tracing_layer = build_layer(&service_name)?;
let (tracing_layer, _guard) = build_layer(service_name)?;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
Expand All @@ -58,16 +62,13 @@ async fn main() -> TraceResult<()> {

let app = Router::new()
.route("/", get(root))
.layer(opentelemetry_tracing_layer())
.layer(OtelAxumLayer::default())
.route("/health", get(health));

let addr = SocketAddr::from(([0, 0, 0, 0], 3025));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
tracing::info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(ddtrace::axum::shutdown_signal())
.await
.unwrap();

Ok(())
}
Expand Down Expand Up @@ -109,13 +110,14 @@ to set up the agent.
# Further Context and Rationale

## Exporting Traces

For traces, the official Datadog agent
[can ingest OTel trace data](https://docs.datadoghq.com/opentelemetry/)
with the correct environment variable settings. The traces can be sent
with the correct environment variable settings. The traces can be sent
via either HTTP or gRPC. More information on this can be found
[here](https://docs.datadoghq.com/opentelemetry/otlp_ingest_in_the_agent/?tab=docker).

OpenTelemetry has an official Rust crate with extensions for major
OpenTelemetry has an official Rust crate with extensions for major
formats/providers. This includes a Datadog exporter. We have found
this exporter to be less reliable than the standard OTel exporter
sending data to the OTel endpoint of the Datadog agent, though.
Expand All @@ -124,7 +126,8 @@ This crate builds on the OTel exporter.
## Propagation

Two commonly used propagation standards are `B3` (OpenZipkin's propagation style)
and Jaeger. OpenTelemetry [supports both](https://opentelemetry.io/docs/reference/specification/context/api-propagators/#propagators-distribution).
and Jaeger.
OpenTelemetry [supports both](https://opentelemetry.io/docs/reference/specification/context/api-propagators/#propagators-distribution).

Most Datadog SDK's support both `B3` and the Datadog native propagation style.
For example, the Python `ddtrace` library supports `B3` but it
Expand All @@ -134,8 +137,8 @@ For ease of integration with services written in other languages that use the of
we opted for sticking with Datadog-style propagation over `B3`. This is set via the
`set_global_propagator` function.


# Reqwest Propagation

The Python library takes care of propagation of the trace context automatically.
Unfortunately, we need to do this manually in Rust.

Expand All @@ -154,7 +157,7 @@ use reqwest_tracing::TracingMiddleware;
async fn main() {
set_global_propagator();
client = get_http_client();

// configure tracing, setup your app and inject the client
}

Expand All @@ -166,4 +169,5 @@ fn get_http_client() -> ClientWithMiddleware {
```

[crates-badge]: https://img.shields.io/crates/v/ddtrace.svg

[docs-badge]: https://docs.rs/ddtrace/badge.svg
8 changes: 4 additions & 4 deletions examples/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
axum = "^0.6.12"
axum = "0.8.4"
ddtrace = { path = "../..", features = ["axum"] }
tokio = { version = "^1.26.0", features = ["macros", "rt-multi-thread"] }
tracing = "^0.1.37"
tracing-subscriber = { version = "^0.3.16", features = ["env-filter", "json"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
18 changes: 8 additions & 10 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ use std::net::SocketAddr;
use std::time::Duration;

use axum::{routing::get, Router};
use ddtrace::axum::opentelemetry_tracing_layer;
use ddtrace::axum::OtelAxumLayer;
use ddtrace::error::Error;
use ddtrace::formatter::DatadogFormatter;
use ddtrace::set_global_propagator;
use ddtrace::tracer::{build_layer, TraceResult};
use ddtrace::tracer::build_layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() -> TraceResult<()> {
async fn main() -> Result<(), Error> {
let service_name = std::env::var("DD_SERVICE").unwrap_or("my-service".to_string());
let tracing_layer = build_layer(&service_name)?;
let (tracing_layer, _guard) = build_layer(service_name)?;
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
Expand All @@ -28,16 +29,13 @@ async fn main() -> TraceResult<()> {

let app = Router::new()
.route("/", get(root))
.layer(opentelemetry_tracing_layer())
.layer(OtelAxumLayer::default())
.route("/health", get(health));

let addr = SocketAddr::from(([0, 0, 0, 0], 3025));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
tracing::info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(ddtrace::axum::shutdown_signal())
.await
.unwrap();

Ok(())
}
Expand Down
14 changes: 2 additions & 12 deletions src/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,6 @@
//! This module re-exposes the middleware layers provided by the
//! [`axum-tracing-opentelemetry`] project.
//!
//! [`axum-tracing-opentelemetry`]: https://github.com/davidB/axum-tracing-opentelemetry
//! [`axum-tracing-opentelemetry`]: https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk

pub use axum_tracing_opentelemetry::opentelemetry_tracing_layer;
pub use axum_tracing_opentelemetry::opentelemetry_tracing_layer_grpc;

pub async fn shutdown_signal() {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;

opentelemetry::global::shutdown_tracer_provider();
}
pub use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
ExporterBuildError(#[from] opentelemetry_otlp::ExporterBuildError),
}
9 changes: 3 additions & 6 deletions src/formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct TraceInfo {

impl From<TraceId> for DatadogId {
fn from(value: TraceId) -> Self {
let bytes = &value.to_bytes()[std::mem::size_of::<u64>()..std::mem::size_of::<u128>()];
let bytes = &value.to_bytes()[size_of::<u64>()..size_of::<u128>()];
Self(u64::from_be_bytes(bytes.try_into().unwrap()))
}
}
Expand Down Expand Up @@ -106,12 +106,9 @@ impl<'a> io::Write for WriteAdaptor<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let s =
std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.fmt_write.write_str(s).map_err(io::Error::other)?;

self.fmt_write
.write_str(s)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(s.as_bytes().len())
Ok(s.len())
}

fn flush(&mut self) -> io::Result<()> {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@

#[cfg(feature = "axum")]
pub mod axum;
pub mod error;
pub mod formatter;
mod propagator;
#[cfg(feature = "tonic")]
pub mod tonic;
pub mod tracer;

pub use propagator::set_global_propagator;
9 changes: 9 additions & 0 deletions src/tonic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! Tonic utilities.
//!
//! This module re-exposes the middleware layers provided by the
//! [`tonic-tracing-opentelemetry`] project.
//!
//! [`tonic-tracing-opentelemetry`]: https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk

pub use tonic_tracing_opentelemetry::middleware::filters::reject_healthcheck;
pub use tonic_tracing_opentelemetry::middleware::server::{Filter, OtelGrpcLayer, OtelGrpcService};
60 changes: 36 additions & 24 deletions src/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,53 @@
//! to send traces to the Datadog agent in batches over gRPC.
//!
//! It also contains a convenience function to build a layer with the tracer.
use opentelemetry::sdk::trace::{RandomIdGenerator, Sampler, Tracer};
use opentelemetry::sdk::{trace, Resource};
pub use opentelemetry::trace::{TraceError, TraceResult};
use opentelemetry::KeyValue;
use crate::error::Error;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracerProvider, Tracer};
use opentelemetry_sdk::Resource;
use std::time::Duration;
use tracing::Subscriber;
use tracing_opentelemetry::{OpenTelemetryLayer, PreSampledTracer};
use tracing_subscriber::registry::LookupSpan;

pub fn build_tracer(service_name: &str) -> TraceResult<Tracer> {
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_timeout(Duration::from_secs(3));
pub struct ProviderGuard {
tracer_provider: SdkTracerProvider,
}

impl Drop for ProviderGuard {
fn drop(&mut self) {
let _ = self.tracer_provider.force_flush();
let _ = self.tracer_provider.shutdown();
}
}

pub fn build_tracer_provider(service_name: String) -> Result<SdkTracerProvider, Error> {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_timeout(Duration::from_secs(3))
.build()?;
let resource = Resource::builder().with_service_name(service_name).build();

let provider = SdkTracerProvider::builder()
.with_sampler(Sampler::AlwaysOn)
.with_resource(resource)
.with_id_generator(RandomIdGenerator::default())
.with_batch_exporter(exporter)
.build();

opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
trace::config()
.with_sampler(Sampler::AlwaysOn)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
service_name.to_string(),
)]))
.with_id_generator(RandomIdGenerator::default()),
)
.with_exporter(exporter)
.install_batch(opentelemetry::runtime::Tokio)
Ok(provider)
}

pub fn build_layer<S>(service_name: &str) -> TraceResult<OpenTelemetryLayer<S, Tracer>>
pub fn build_layer<S>(
service_name: String,
) -> Result<(OpenTelemetryLayer<S, Tracer>, ProviderGuard), Error>
where
Tracer: opentelemetry::trace::Tracer + PreSampledTracer + 'static,
S: Subscriber + for<'span> LookupSpan<'span>,
{
let tracer = build_tracer(service_name)?;
Ok(tracing_opentelemetry::layer().with_tracer(tracer))
let tracer_provider = build_tracer_provider(service_name)?;
let layer = tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer(""));
let guard = ProviderGuard { tracer_provider };
Ok((layer, guard))
}