scuffle_bootstrap_telemetry/
lib.rs

1//! A crate used to add telemetry to applications built with the
2//! [`scuffle-bootstrap`](../scuffle_bootstrap) crate.
3//!
4//! Emit metrics using the [`scuffle-metrics`](../scuffle_metrics)
5//! crate.
6//!
7//! ## Feature Flags
8//!
9//! - `prometheus`: Enable Prometheus support.
10//! - `pprof`: Enable pprof support.
11//! - `opentelemetry-metrics`: Enable OpenTelemetry metrics support.
12//! - `opentelemetry-traces`: Enable OpenTelemetry traces support.
13//! - `opentelemetry-logs`: Enable OpenTelemetry logs support.
14//!
15//! All features are enabled by default.
16//!
17//! See [`TelemetrySvc`] for more details.
18//!
19//! ## Example
20//!
21//! ```rust
22//! use std::net::SocketAddr;
23//! use std::sync::Arc;
24//!
25//! use scuffle_bootstrap::global::GlobalWithoutConfig;
26//! use scuffle_bootstrap_telemetry::{prometheus_client, opentelemetry, opentelemetry_sdk, TelemetryConfig, TelemetrySvc};
27//!
28//! struct Global {
29//!     prometheus: prometheus_client::registry::Registry,
30//!     open_telemetry: opentelemetry::OpenTelemetry,
31//! }
32//!
33//! impl GlobalWithoutConfig for Global {
34//!     async fn init() -> anyhow::Result<Arc<Self>> {
35//!         // Initialize the Prometheus metrics registry.
36//!         let mut prometheus = prometheus_client::registry::Registry::default();
37//!         // The exporter converts opentelemetry metrics into the Prometheus format.
38//!         let exporter = scuffle_metrics::prometheus::exporter().build();
39//!         // Register the exporter as a data source for the Prometheus registry.
40//!         prometheus.register_collector(exporter.collector());
41//!
42//!         // Initialize the OpenTelemetry metrics provider and add the Prometheus exporter as a reader.
43//!         let metrics = opentelemetry_sdk::metrics::SdkMeterProvider::builder().with_reader(exporter).build();
44//!         opentelemetry::global::set_meter_provider(metrics.clone());
45//!
46//!         // Initialize the OpenTelemetry configuration instance.
47//!         let open_telemetry = opentelemetry::OpenTelemetry::new().with_metrics(metrics);
48//!
49//!         Ok(Arc::new(Self {
50//!             prometheus,
51//!             open_telemetry,
52//!         }))
53//!     }
54//! }
55//!
56//! impl TelemetryConfig for Global {
57//!     fn bind_address(&self) -> Option<SocketAddr> {
58//!         // Tells the http server to bind to port 8080 on localhost.
59//!         Some(SocketAddr::from(([127, 0, 0, 1], 8080)))
60//!     }
61//!
62//!     fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
63//!         Some(&self.prometheus)
64//!     }
65//!
66//!     fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
67//!         Some(&self.open_telemetry)
68//!     }
69//! }
70//!
71//! #[scuffle_metrics::metrics]
72//! mod example {
73//!     use scuffle_metrics::{CounterU64, MetricEnum};
74//!
75//!     #[derive(MetricEnum)]
76//!     pub enum Kind {
77//!         Http,
78//!         Grpc,
79//!     }
80//!
81//!     #[metrics(unit = "requests")]
82//!     pub fn request(kind: Kind) -> CounterU64;
83//! }
84//!
85//! // Now emit metrics from anywhere in your code using the `example` module.
86//! example::request(example::Kind::Http).incr();
87//!
88//! scuffle_bootstrap::main! {
89//!     Global {
90//!         TelemetrySvc,
91//!     }
92//! };
93//! ```
94//!
95//! ## Status
96//!
//! This crate is currently under development and is not yet stable.
//!
//! Unit tests are not yet fully implemented. Use at your own risk.
101//!
102//! ## License
103//!
104//! This project is licensed under the [MIT](./LICENSE.MIT) or
105//! [Apache-2.0](./LICENSE.Apache-2.0) license. You can choose between one of
106//! them if you use this work.
107//!
108//! `SPDX-License-Identifier: MIT OR Apache-2.0`
109#![cfg_attr(docsrs, feature(doc_cfg))]
110#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
111
112use anyhow::Context;
113use bytes::Bytes;
114#[cfg(feature = "opentelemetry-logs")]
115#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-logs")))]
116pub use opentelemetry_appender_tracing;
117#[cfg(feature = "opentelemetry")]
118#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
119pub use opentelemetry_sdk;
120#[cfg(feature = "prometheus")]
121#[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
122pub use prometheus_client;
123use scuffle_bootstrap::global::Global;
124use scuffle_bootstrap::service::Service;
125#[cfg(feature = "opentelemetry-traces")]
126pub use tracing_opentelemetry;
127
128#[cfg(feature = "opentelemetry")]
129#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
130pub mod opentelemetry;
131
/// The telemetry service.
///
/// This is supposed to be used with the `scuffle-bootstrap` crate.
///
/// # HTTP Server
///
/// This service provides an http server which will bind to the address provided
/// by the config. (See [`TelemetryConfig`]) If the config provides no bind
/// address, no server is started and the service only waits for its context
/// to be done.
///
/// When the service stops, an enabled OpenTelemetry instance provided by the
/// config is shut down.
///
/// ## Endpoints
///
/// The server provides the following endpoints. Any other path is answered
/// with `404 Not Found`.
///
/// ### `/health`
///
/// Health check endpoint.
///
/// This endpoint calls the health check function provided by the config and
/// responds with `200 OK` if the health check returns `Ok(())`. If the health
/// check returns an error, the endpoint returns `500 Internal Server Error`
/// along with the error message.
///
/// ### `/metrics`
///
/// Metrics endpoint which can be used by Prometheus to scrape metrics.
///
/// This endpoint is only enabled if the `prometheus` feature flag is enabled
/// and a metrics registry is provided through the config. Without a registry
/// it responds with `404 Not Found`; if encoding the registry fails it
/// responds with `500 Internal Server Error`.
///
/// ### `/pprof/cpu`
///
/// pprof cpu endpoint to capture a cpu profile.
///
/// #### Query Parameters
///
/// - `freq`: Sampling frequency in Hz. Defaults to `100`.
/// - `duration`: Duration the profile should be captured for in s. Defaults
///   to `5`.
/// - `ignore`: Function to exclude from the profile. May be given multiple
///   times to exclude several functions.
///
/// A `freq` or `duration` value that cannot be parsed is answered with
/// `400 Bad Request`.
///
/// This endpoint is only enabled if the `pprof` feature flag is enabled.
///
/// ### `/opentelemetry/flush`
///
/// OpenTelemetry flush endpoint.
///
/// Responds with `200 OK` once the flush succeeded (or if the provided
/// instance is not enabled), with `500 Internal Server Error` and the error
/// message if the flush failed, and with `404 Not Found` if the config
/// provides no OpenTelemetry instance.
///
/// This endpoint is only enabled if one of the `opentelemetry` feature flags is
/// enabled and an OpenTelemetry config is provided through the config.
#[derive(Debug, Clone, Copy, Default)]
pub struct TelemetrySvc;
180
181/// Implement this trait to configure the telemetry service.
182pub trait TelemetryConfig: Global {
183    /// Return true if the service is enabled.
184    fn enabled(&self) -> bool {
185        true
186    }
187
188    /// Return the bind address for the http server.
189    fn bind_address(&self) -> Option<std::net::SocketAddr> {
190        None
191    }
192
193    /// Return the http server name.
194    fn http_server_name(&self) -> &str {
195        "scuffle-bootstrap-telemetry"
196    }
197
198    /// Return a health check to determine if the service is healthy.
199    ///
200    /// Always healthy by default.
201    fn health_check(&self) -> impl std::future::Future<Output = Result<(), anyhow::Error>> + Send {
202        std::future::ready(Ok(()))
203    }
204
205    /// Return a Prometheus metrics registry to scrape metrics from.
206    ///
207    /// Returning `Some` will enable the `/metrics` http endpoint which can be
208    /// used by Prometheus to scrape metrics.
209    ///
210    /// Disabled (`None`) by default.
211    #[cfg(feature = "prometheus")]
212    #[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
213    fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
214        None
215    }
216
217    /// Pass an OpenTelemetry instance to the service.
218    ///
219    /// If provided the service will flush and shutdown the OpenTelemetry
220    /// instance when it shuts down.
221    /// Additionally, the service provides the `/opentelemetry/flush` http
222    /// endpoint to manually flush the data.
223    #[cfg(feature = "opentelemetry")]
224    #[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
225    fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
226        None
227    }
228}
229
230impl<Global: TelemetryConfig> Service<Global> for TelemetrySvc {
231    async fn enabled(&self, global: &std::sync::Arc<Global>) -> anyhow::Result<bool> {
232        Ok(global.enabled())
233    }
234
235    async fn run(self, global: std::sync::Arc<Global>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
236        if let Some(bind_addr) = global.bind_address() {
237            let global = global.clone();
238
239            let service = scuffle_http::service::fn_http_service(move |req| {
240                let global = global.clone();
241                async move {
242                    match req.uri().path() {
243                        "/health" => health_check(&global, req).await,
244                        #[cfg(feature = "prometheus")]
245                        "/metrics" => metrics(&global, req).await,
246                        #[cfg(feature = "pprof")]
247                        "/pprof/cpu" => pprof(&global, req).await,
248                        #[cfg(feature = "opentelemetry")]
249                        "/opentelemetry/flush" => opentelemetry_flush(&global).await,
250                        _ => Ok(http::Response::builder()
251                            .status(http::StatusCode::NOT_FOUND)
252                            .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?),
253                    }
254                }
255            });
256
257            scuffle_http::HttpServer::builder()
258                .bind(bind_addr)
259                .ctx(ctx)
260                .service_factory(scuffle_http::service::service_clone_factory(service))
261                .build()
262                .run()
263                .await
264                .context("server run")?;
265        } else {
266            ctx.done().await;
267        }
268
269        #[cfg(feature = "opentelemetry")]
270        if let Some(opentelemetry) = global.opentelemetry().cloned() {
271            if opentelemetry.is_enabled() {
272                tokio::task::spawn_blocking(move || opentelemetry.shutdown())
273                    .await
274                    .context("opentelemetry shutdown spawn")?
275                    .context("opentelemetry shutdown")?;
276            }
277        }
278
279        Ok(())
280    }
281}
282
283async fn health_check<G: TelemetryConfig>(
284    global: &std::sync::Arc<G>,
285    _: http::Request<scuffle_http::body::IncomingBody>,
286) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
287    if let Err(err) = global.health_check().await {
288        tracing::error!("health check failed: {err}");
289        Ok(http::Response::builder()
290            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
291            .body(http_body_util::Full::new(format!("{err:#}").into()))?)
292    } else {
293        Ok(http::Response::builder()
294            .status(http::StatusCode::OK)
295            .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
296    }
297}
298
/// Handles the `/metrics` endpoint.
///
/// Encodes the Prometheus registry provided by the config into the text
/// format and responds with `200 OK`. The response carries the OpenMetrics
/// `Content-Type` so that scrapers can tell which exposition format the body
/// uses.
///
/// Responds with `404 Not Found` when the config provides no registry and
/// with `500 Internal Server Error` when encoding fails.
///
/// # Errors
///
/// Fails only if building the http response fails.
#[cfg(feature = "prometheus")]
async fn metrics<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
    _: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let Some(registry) = global.prometheus_metrics_registry() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    let mut buf = String::new();
    if prometheus_client::encoding::text::encode(&mut buf, registry).is_err() {
        tracing::error!("metrics encode failed");
        return http::Response::builder()
            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
            .body(http_body_util::Full::new(Bytes::from_static(b"metrics encode failed")));
    }

    http::Response::builder()
        .status(http::StatusCode::OK)
        // Content type for the OpenMetrics text format produced by the encoder.
        .header(
            http::header::CONTENT_TYPE,
            "application/openmetrics-text; version=1.0.0; charset=utf-8",
        )
        .body(http_body_util::Full::new(Bytes::from(buf)))
}
322
/// Handles the `/pprof/cpu` endpoint.
///
/// Reads the optional `freq` (Hz, default `100`), `duration` (seconds,
/// default `5`) and `ignore` (may be repeated) query parameters, captures a
/// cpu profile on the blocking thread pool and responds with the captured
/// data.
///
/// Responds with `400 Bad Request` when `freq` or `duration` cannot be parsed
/// and with `500 Internal Server Error` when the capture fails.
///
/// # Errors
///
/// Fails only if building the http response fails.
#[cfg(feature = "pprof")]
async fn pprof<G: TelemetryConfig>(
    _: &std::sync::Arc<G>,
    req: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    // Builds a response with the given status and `message` as the body.
    let respond = |status: http::StatusCode, message: String| {
        http::Response::builder()
            .status(status)
            .body(http_body_util::Full::new(Bytes::from(message)))
    };

    let mut freq = 100;
    let mut duration = std::time::Duration::from_secs(5);
    let mut ignore_list = Vec::new();

    let params = req.uri().query().map(querystring::querify).unwrap_or_default();
    for (key, value) in params {
        match key {
            "freq" => match value.parse() {
                Ok(hz) => freq = hz,
                Err(err) => {
                    return respond(http::StatusCode::BAD_REQUEST, format!("invalid freq: {err:#}"));
                }
            },
            "duration" => match value.parse() {
                Ok(secs) => duration = std::time::Duration::from_secs(secs),
                Err(err) => {
                    return respond(http::StatusCode::BAD_REQUEST, format!("invalid duration: {err:#}"));
                }
            },
            "ignore" => ignore_list.push(value),
            // Unknown parameters are ignored.
            _ => {}
        }
    }

    let cpu = scuffle_pprof::Cpu::new(freq, &ignore_list);

    // The capture is run on the blocking thread pool; the outer error comes from the
    // spawned task, the inner one from the capture itself.
    let failure = match tokio::task::spawn_blocking(move || cpu.capture(duration)).await {
        Ok(Ok(data)) => {
            return http::Response::builder()
                .status(http::StatusCode::OK)
                .body(http_body_util::Full::new(Bytes::from(data)));
        }
        Ok(Err(err)) => format!("{err:#}"),
        Err(err) => format!("{err:#}"),
    };

    tracing::error!("cpu capture failed: {failure}");
    respond(http::StatusCode::INTERNAL_SERVER_ERROR, failure)
}
379
/// Handles the `/opentelemetry/flush` endpoint.
///
/// Flushes the OpenTelemetry instance provided by the config on the blocking
/// thread pool and responds with `200 OK` on success. An instance that is not
/// enabled is not flushed and also answered with `200 OK`.
///
/// Responds with `404 Not Found` when the config provides no instance and
/// with `500 Internal Server Error` when the flush fails.
///
/// # Errors
///
/// Fails only if building the http response fails.
#[cfg(feature = "opentelemetry")]
async fn opentelemetry_flush<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let Some(otel) = global.opentelemetry().cloned() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    // Nothing to flush.
    if !otel.is_enabled() {
        return http::Response::builder()
            .status(http::StatusCode::OK)
            .body(http_body_util::Full::new(Bytes::from_static(b"ok")));
    }

    // The outer error comes from the spawned task, the inner one from the flush itself.
    let failure = match tokio::task::spawn_blocking(move || otel.flush()).await {
        Ok(Ok(())) => {
            return http::Response::builder()
                .status(http::StatusCode::OK)
                .body(http_body_util::Full::new(Bytes::from_static(b"ok")));
        }
        Ok(Err(err)) => {
            tracing::error!("opentelemetry flush failed: {err:#}");
            format!("{err:#}")
        }
        Err(err) => {
            tracing::error!("opentelemetry flush spawn failed: {err:#}");
            format!("{err:#}")
        }
    };

    http::Response::builder()
        .status(http::StatusCode::INTERNAL_SERVER_ERROR)
        .body(http_body_util::Full::new(failure.into()))
}
414
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
// The tests hit every endpoint, so every feature backing one of them has to be enabled.
#[cfg(all(
    feature = "prometheus",
    feature = "pprof",
    feature = "opentelemetry-metrics",
    feature = "opentelemetry-traces",
    feature = "opentelemetry-logs"
))]
mod tests {
    use std::net::SocketAddr;
    use std::sync::Arc;

    use bytes::Bytes;
    use opentelemetry_sdk::logs::SdkLoggerProvider;
    use opentelemetry_sdk::metrics::SdkMeterProvider;
    use opentelemetry_sdk::trace::SdkTracerProvider;
    use scuffle_bootstrap::{GlobalWithoutConfig, Service};

    use crate::{TelemetryConfig, TelemetrySvc};

    /// Fetches `/metrics` and returns the body, or an error for a non-success status.
    async fn request_metrics(addr: SocketAddr) -> reqwest::Result<String> {
        reqwest::get(format!("http://{addr}/metrics"))
            .await
            .unwrap()
            .error_for_status()?
            .text()
            .await
    }

    /// Fetches `/health` and returns the body, panicking on any failure.
    async fn request_health(addr: SocketAddr) -> String {
        reqwest::get(format!("http://{addr}/health"))
            .await
            .unwrap()
            .error_for_status()
            .expect("health check failed")
            .text()
            .await
            .expect("health check text")
    }

    /// Fetches `/pprof/cpu` with the given query values and returns the body,
    /// or an error for a non-success status.
    async fn request_pprof(addr: SocketAddr, freq: &str, duration: &str) -> reqwest::Result<Bytes> {
        reqwest::get(format!("http://{addr}/pprof/cpu?freq={freq}&duration={duration}"))
            .await
            .unwrap()
            .error_for_status()?
            .bytes()
            .await
    }

    /// Requests `/opentelemetry/flush` and returns an error for a non-success status.
    async fn flush_opentelemetry(addr: SocketAddr) -> reqwest::Result<reqwest::Response> {
        reqwest::get(format!("http://{addr}/opentelemetry/flush"))
            .await
            .unwrap()
            .error_for_status()
    }

    /// Runs the service with a Prometheus registry and an OpenTelemetry
    /// instance and checks every endpoint.
    #[cfg(not(valgrind))] // test is time-sensitive
    #[tokio::test]
    async fn telemetry_http_server() {
        struct TestGlobal {
            bind_addr: SocketAddr,
            prometheus: prometheus_client::registry::Registry,
            open_telemetry: crate::opentelemetry::OpenTelemetry,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Bind to port 0 to obtain an address with a free port. The listener is
                // dropped when `init` returns; only the address is kept for the service.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                let mut prometheus = prometheus_client::registry::Registry::default();

                let exporter = scuffle_metrics::prometheus::exporter().build();
                prometheus.register_collector(exporter.collector());

                let metrics = SdkMeterProvider::builder().with_reader(exporter).build();
                opentelemetry::global::set_meter_provider(metrics.clone());

                let tracer = SdkTracerProvider::default();
                opentelemetry::global::set_tracer_provider(tracer.clone());

                let logger = SdkLoggerProvider::builder().build();

                let open_telemetry = crate::opentelemetry::OpenTelemetry::new()
                    .with_metrics(metrics)
                    .with_traces(tracer)
                    .with_logs(logger);

                Ok(Arc::new(TestGlobal {
                    bind_addr,
                    prometheus,
                    open_telemetry,
                }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }

            fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
                Some(&self.prometheus)
            }

            fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
                Some(&self.open_telemetry)
            }
        }

        #[scuffle_metrics::metrics]
        mod example {
            use scuffle_metrics::{CounterU64, MetricEnum};

            #[derive(MetricEnum)]
            pub enum Kind {
                Http,
                Grpc,
            }

            #[metrics(unit = "requests")]
            pub fn request(kind: Kind) -> CounterU64;
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        let task_handle = tokio::spawn(TelemetrySvc.run(global, scuffle_context::Context::global()));

        // Give the spawned server some time to start.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        // Before any metric is emitted only the target info is expected.
        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.starts_with("# HELP target Information about the target\n"));
        assert!(metrics.contains("# TYPE target info\n"));
        assert!(metrics.contains("service_name=\"unknown_service\""));
        assert!(metrics.contains("telemetry_sdk_language=\"rust\""));
        assert!(metrics.contains("telemetry_sdk_name=\"opentelemetry\""));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // The counter shows up with a value of 1 ...
        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 1\n"));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // ... and with a value of 2 after the second increment.
        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 2\n"));
        assert!(metrics.ends_with("# EOF\n"));

        // A profile over 2 s has to take longer than 2 s to be returned.
        let timer = std::time::Instant::now();
        assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
        assert!(timer.elapsed() > std::time::Duration::from_secs(2));

        let res = request_pprof(bind_addr, "invalid", "2").await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

        let res = request_pprof(bind_addr, "100", "invalid").await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

        assert!(flush_opentelemetry(bind_addr).await.is_ok());

        // Not found
        let res = reqwest::get(format!("http://{bind_addr}/not_found")).await.unwrap();
        assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);

        // The service was started with the global context, so shutting down the global
        // handler stops it.
        scuffle_context::Handler::global().shutdown().await;

        task_handle.await.unwrap().unwrap();
    }

    /// Runs the service with nothing but a bind address and checks that the
    /// optional endpoints respond with `404 Not Found`.
    #[cfg(not(valgrind))] // test is time-sensitive
    #[tokio::test]
    async fn empty_telemetry_http_server() {
        struct TestGlobal {
            bind_addr: SocketAddr,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Bind to port 0 to obtain an address with a free port. The listener is
                // dropped when `init` returns; only the address is kept for the service.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                Ok(Arc::new(TestGlobal { bind_addr }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        let task_handle = tokio::spawn(TelemetrySvc.run(global, scuffle_context::Context::global()));
        // Give the spawned server some time to start.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        // No registry is configured.
        let res = request_metrics(bind_addr).await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::NOT_FOUND));

        // A profile over 2 s has to take longer than 2 s to be returned.
        let timer = std::time::Instant::now();
        assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
        assert!(timer.elapsed() > std::time::Duration::from_secs(2));

        // No OpenTelemetry instance is configured.
        let err = flush_opentelemetry(bind_addr).await.expect_err("error expected");
        assert!(err.is_status());
        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));

        // The service was started with the global context, so shutting down the global
        // handler stops it.
        scuffle_context::Handler::global().shutdown().await;

        task_handle.await.unwrap().unwrap();
    }
}