scuffle_bootstrap_telemetry/
lib.rs1#![cfg_attr(docsrs, feature(doc_cfg))]
110#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
111
112use anyhow::Context;
113use bytes::Bytes;
114#[cfg(feature = "opentelemetry-logs")]
115#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-logs")))]
116pub use opentelemetry_appender_tracing;
117#[cfg(feature = "opentelemetry")]
118#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
119pub use opentelemetry_sdk;
120#[cfg(feature = "prometheus")]
121#[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
122pub use prometheus_client;
123use scuffle_bootstrap::global::Global;
124use scuffle_bootstrap::service::Service;
125#[cfg(feature = "opentelemetry-traces")]
126pub use tracing_opentelemetry;
127
128#[cfg(feature = "opentelemetry")]
129#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
130pub mod opentelemetry;
131
/// The telemetry service.
///
/// When run, it serves an HTTP server on [`TelemetryConfig::bind_address`]
/// exposing `/health` and, depending on the enabled features, `/metrics`,
/// `/pprof/cpu` and `/opentelemetry/flush`.
///
/// The type carries no state; all configuration comes from the global
/// implementing [`TelemetryConfig`].
#[derive(Debug, Clone, Copy, Default)]
pub struct TelemetrySvc;

181pub trait TelemetryConfig: Global {
183 fn enabled(&self) -> bool {
185 true
186 }
187
188 fn bind_address(&self) -> Option<std::net::SocketAddr> {
190 None
191 }
192
193 fn http_server_name(&self) -> &str {
195 "scuffle-bootstrap-telemetry"
196 }
197
198 fn health_check(&self) -> impl std::future::Future<Output = Result<(), anyhow::Error>> + Send {
202 std::future::ready(Ok(()))
203 }
204
205 #[cfg(feature = "prometheus")]
212 #[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
213 fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
214 None
215 }
216
217 #[cfg(feature = "opentelemetry")]
224 #[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
225 fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
226 None
227 }
228}
229
230impl<Global: TelemetryConfig> Service<Global> for TelemetrySvc {
231 async fn enabled(&self, global: &std::sync::Arc<Global>) -> anyhow::Result<bool> {
232 Ok(global.enabled())
233 }
234
235 async fn run(self, global: std::sync::Arc<Global>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
236 if let Some(bind_addr) = global.bind_address() {
237 let global = global.clone();
238
239 let service = scuffle_http::service::fn_http_service(move |req| {
240 let global = global.clone();
241 async move {
242 match req.uri().path() {
243 "/health" => health_check(&global, req).await,
244 #[cfg(feature = "prometheus")]
245 "/metrics" => metrics(&global, req).await,
246 #[cfg(feature = "pprof")]
247 "/pprof/cpu" => pprof(&global, req).await,
248 #[cfg(feature = "opentelemetry")]
249 "/opentelemetry/flush" => opentelemetry_flush(&global).await,
250 _ => Ok(http::Response::builder()
251 .status(http::StatusCode::NOT_FOUND)
252 .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?),
253 }
254 }
255 });
256
257 scuffle_http::HttpServer::builder()
258 .bind(bind_addr)
259 .ctx(ctx)
260 .service_factory(scuffle_http::service::service_clone_factory(service))
261 .build()
262 .run()
263 .await
264 .context("server run")?;
265 } else {
266 ctx.done().await;
267 }
268
269 #[cfg(feature = "opentelemetry")]
270 if let Some(opentelemetry) = global.opentelemetry().cloned() {
271 if opentelemetry.is_enabled() {
272 tokio::task::spawn_blocking(move || opentelemetry.shutdown())
273 .await
274 .context("opentelemetry shutdown spawn")?
275 .context("opentelemetry shutdown")?;
276 }
277 }
278
279 Ok(())
280 }
281}
282
283async fn health_check<G: TelemetryConfig>(
284 global: &std::sync::Arc<G>,
285 _: http::Request<scuffle_http::body::IncomingBody>,
286) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
287 if let Err(err) = global.health_check().await {
288 tracing::error!("health check failed: {err}");
289 Ok(http::Response::builder()
290 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
291 .body(http_body_util::Full::new(format!("{err:#}").into()))?)
292 } else {
293 Ok(http::Response::builder()
294 .status(http::StatusCode::OK)
295 .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
296 }
297}
298
/// Handles the `/metrics` route.
///
/// Encodes [`TelemetryConfig::prometheus_metrics_registry`] with
/// `prometheus_client`'s text encoder and answers `200 OK` with an OpenMetrics
/// `Content-Type`. If no registry is configured it answers `404 Not Found`,
/// and if encoding fails it answers `500 Internal Server Error`.
#[cfg(feature = "prometheus")]
async fn metrics<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
    _: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    // Scrapers choose a parser from the Content-Type. Prometheus 3.x fails
    // scrapes with a missing Content-Type unless a fallback protocol is set.
    const OPENMETRICS_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";

    let Some(registry) = global.prometheus_metrics_registry() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    let mut buf = String::new();
    if prometheus_client::encoding::text::encode(&mut buf, registry).is_err() {
        tracing::error!("metrics encode failed");
        return http::Response::builder()
            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
            .body(http_body_util::Full::new("metrics encode failed".to_string().into()));
    }

    http::Response::builder()
        .status(http::StatusCode::OK)
        .header(http::header::CONTENT_TYPE, OPENMETRICS_CONTENT_TYPE)
        .body(http_body_util::Full::new(Bytes::from(buf)))
}

/// Handles the `/pprof/cpu` route by capturing a CPU profile.
///
/// Query parameters:
/// - `freq`: sampling frequency; must be greater than 0 (default `100`).
/// - `duration`: capture length in whole seconds (default `5`).
/// - `ignore`: may be repeated; each value is added to the ignore list passed
///   to `scuffle_pprof::Cpu::new`.
///
/// Responses:
/// - `400 Bad Request` for an unparsable or non-positive `freq`, or an
///   unparsable `duration`.
/// - `500 Internal Server Error` if the capture or its blocking task fails.
/// - `200 OK` with the captured profile bytes otherwise.
#[cfg(feature = "pprof")]
async fn pprof<G: TelemetryConfig>(
    _: &std::sync::Arc<G>,
    req: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    // Builds a `400 Bad Request` response with `message` as the body.
    fn bad_request(message: String) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
        http::Response::builder()
            .status(http::StatusCode::BAD_REQUEST)
            .body(http_body_util::Full::new(message.into()))
    }

    // Builds a `500 Internal Server Error` response with `message` as the body.
    fn internal_error(message: String) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
        http::Response::builder()
            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
            .body(http_body_util::Full::new(message.into()))
    }

    let mut freq = 100;
    let mut duration = std::time::Duration::from_secs(5);
    let mut ignore_list = Vec::new();

    let query = req.uri().query().map(querystring::querify).into_iter().flatten();
    for (key, value) in query {
        match key {
            "freq" => {
                freq = match value.parse() {
                    Ok(v) => v,
                    Err(err) => return bad_request(format!("invalid freq: {err:#}")),
                };
            }
            "duration" => {
                duration = match value.parse() {
                    Ok(v) => std::time::Duration::from_secs(v),
                    Err(err) => return bad_request(format!("invalid duration: {err:#}")),
                };
            }
            "ignore" => ignore_list.push(value),
            _ => {}
        }
    }

    // A sampling frequency of zero or below is meaningless, so reject it here
    // instead of handing it to the profiler.
    if freq <= 0 {
        return bad_request("invalid freq: must be greater than 0".to_string());
    }

    let cpu = scuffle_pprof::Cpu::new(freq, &ignore_list);

    // Capturing blocks for `duration`, so run it off the async executor.
    match tokio::task::spawn_blocking(move || cpu.capture(duration)).await {
        Ok(Ok(data)) => http::Response::builder()
            .status(http::StatusCode::OK)
            .body(http_body_util::Full::new(Bytes::from(data))),
        Ok(Err(err)) => {
            tracing::error!("cpu capture failed: {err:#}");
            internal_error(format!("{err:#}"))
        }
        Err(err) => {
            tracing::error!("cpu capture spawn failed: {err:#}");
            internal_error(format!("{err:#}"))
        }
    }
}

/// Handles the `/opentelemetry/flush` route.
///
/// Responses:
/// - `404 Not Found` when [`TelemetryConfig::opentelemetry`] returns `None`.
/// - `200 OK` without flushing when the instance is present but disabled.
/// - Otherwise the instance is flushed on a blocking thread. The route answers
///   `200 OK` on success, or `500 Internal Server Error` with the formatted
///   error if the flush or its blocking task fails.
#[cfg(feature = "opentelemetry")]
async fn opentelemetry_flush<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let respond = |status: http::StatusCode, body: Bytes| {
        http::Response::builder()
            .status(status)
            .body(http_body_util::Full::new(body))
    };

    let Some(otel) = global.opentelemetry().cloned() else {
        return respond(http::StatusCode::NOT_FOUND, Bytes::from_static(b"not found"));
    };

    if !otel.is_enabled() {
        return respond(http::StatusCode::OK, Bytes::from_static(b"ok"));
    }

    let failure = match tokio::task::spawn_blocking(move || otel.flush()).await {
        Ok(Ok(())) => return respond(http::StatusCode::OK, Bytes::from_static(b"ok")),
        Ok(Err(err)) => {
            tracing::error!("opentelemetry flush failed: {err:#}");
            format!("{err:#}")
        }
        Err(err) => {
            tracing::error!("opentelemetry flush spawn failed: {err:#}");
            format!("{err:#}")
        }
    };

    respond(http::StatusCode::INTERNAL_SERVER_ERROR, Bytes::from(failure))
}

#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
#[cfg(all(
    feature = "opentelemetry-metrics",
    feature = "opentelemetry-traces",
    feature = "opentelemetry-logs",
    // `telemetry_http_server` uses the Prometheus registry unconditionally, and
    // both tests expect `/pprof/cpu` to exist, so both features are required.
    feature = "prometheus",
    feature = "pprof"
))]
mod tests {
    use std::net::SocketAddr;
    use std::sync::Arc;

    use bytes::Bytes;
    #[cfg(feature = "opentelemetry-logs")]
    use opentelemetry_sdk::logs::SdkLoggerProvider;
    #[cfg(feature = "opentelemetry-metrics")]
    use opentelemetry_sdk::metrics::SdkMeterProvider;
    #[cfg(feature = "opentelemetry-traces")]
    use opentelemetry_sdk::trace::SdkTracerProvider;
    use scuffle_bootstrap::{GlobalWithoutConfig, Service};

    use crate::{TelemetryConfig, TelemetrySvc};

    // NOTE(review): both tests capture a CPU profile through `/pprof/cpu`. If
    // the profiler only allows one capture per process at a time, running them
    // on parallel threads (plain `cargo test`) could make one capture fail.
    // Confirm whether these tests are meant to run one per process (e.g. nextest).

    /// GETs `/metrics` and returns the body, or the HTTP status error.
    /// Panics if the request cannot be sent at all.
    async fn request_metrics(addr: SocketAddr) -> reqwest::Result<String> {
        reqwest::get(format!("http://{addr}/metrics"))
            .await
            .unwrap()
            .error_for_status()?
            .text()
            .await
    }

    /// GETs `/health`, panicking unless it answers with a success status.
    async fn request_health(addr: SocketAddr) -> String {
        reqwest::get(format!("http://{addr}/health"))
            .await
            .unwrap()
            .error_for_status()
            .expect("health check failed")
            .text()
            .await
            .expect("health check text")
    }

    /// GETs `/pprof/cpu` with the given raw `freq` and `duration` query values.
    async fn request_pprof(addr: SocketAddr, freq: &str, duration: &str) -> reqwest::Result<Bytes> {
        reqwest::get(format!("http://{addr}/pprof/cpu?freq={freq}&duration={duration}"))
            .await
            .unwrap()
            .error_for_status()?
            .bytes()
            .await
    }

    /// GETs `/opentelemetry/flush` and returns the response, or the HTTP status error.
    async fn flush_opentelemetry(addr: SocketAddr) -> reqwest::Result<reqwest::Response> {
        reqwest::get(format!("http://{addr}/opentelemetry/flush"))
            .await
            .unwrap()
            .error_for_status()
    }

    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn telemetry_http_server() {
        struct TestGlobal {
            bind_addr: SocketAddr,
            prometheus: prometheus_client::registry::Registry,
            open_telemetry: crate::opentelemetry::OpenTelemetry,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Bind to port 0 only to obtain a free port for the server.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                let mut prometheus = prometheus_client::registry::Registry::default();

                let exporter = scuffle_metrics::prometheus::exporter().build();
                prometheus.register_collector(exporter.collector());

                let metrics = SdkMeterProvider::builder().with_reader(exporter).build();
                opentelemetry::global::set_meter_provider(metrics.clone());

                let tracer = SdkTracerProvider::default();
                opentelemetry::global::set_tracer_provider(tracer.clone());

                let logger = SdkLoggerProvider::builder().build();

                let open_telemetry = crate::opentelemetry::OpenTelemetry::new()
                    .with_metrics(metrics)
                    .with_traces(tracer)
                    .with_logs(logger);

                Ok(Arc::new(TestGlobal {
                    bind_addr,
                    prometheus,
                    open_telemetry,
                }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }

            fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
                Some(&self.prometheus)
            }

            fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
                Some(&self.open_telemetry)
            }
        }

        #[scuffle_metrics::metrics]
        mod example {
            use scuffle_metrics::{CounterU64, MetricEnum};

            #[derive(MetricEnum)]
            pub enum Kind {
                Http,
                Grpc,
            }

            #[metrics(unit = "requests")]
            pub fn request(kind: Kind) -> CounterU64;
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        // Use a context owned by this test rather than the global one, so that
        // shutting it down cannot cancel the other test's server.
        let (ctx, handler) = scuffle_context::Context::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, ctx));

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.starts_with("# HELP target Information about the target\n"));
        assert!(metrics.contains("# TYPE target info\n"));
        assert!(metrics.contains("service_name=\"unknown_service\""));
        assert!(metrics.contains("telemetry_sdk_language=\"rust\""));
        assert!(metrics.contains("telemetry_sdk_name=\"opentelemetry\""));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 1\n"));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 2\n"));
        assert!(metrics.ends_with("# EOF\n"));

        let timer = std::time::Instant::now();
        assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
        assert!(timer.elapsed() > std::time::Duration::from_secs(2));

        let res = request_pprof(bind_addr, "invalid", "2").await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

        let res = request_pprof(bind_addr, "100", "invalid").await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

        assert!(flush_opentelemetry(bind_addr).await.is_ok());

        let res = reqwest::get(format!("http://{bind_addr}/not_found")).await.unwrap();
        assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }

    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn empty_telemetry_http_server() {
        struct TestGlobal {
            bind_addr: SocketAddr,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Bind to port 0 only to obtain a free port for the server.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                Ok(Arc::new(TestGlobal { bind_addr }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        // Use a context owned by this test rather than the global one, so that
        // shutting it down cannot cancel the other test's server.
        let (ctx, handler) = scuffle_context::Context::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, ctx));
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let res = request_metrics(bind_addr).await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::NOT_FOUND));

        let timer = std::time::Instant::now();
        assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
        assert!(timer.elapsed() > std::time::Duration::from_secs(2));

        let err = flush_opentelemetry(bind_addr).await.expect_err("error expected");
        assert!(err.is_status());
        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }
}