scuffle_metrics/prometheus/
mod.rs

1use std::borrow::Cow;
2use std::sync::Arc;
3
4use opentelemetry::{otel_error, otel_warn, InstrumentationScope, KeyValue};
5use opentelemetry_sdk::metrics::data::{Gauge, Histogram, ResourceMetrics, Sum};
6use opentelemetry_sdk::metrics::reader::MetricReader;
7use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder};
8use opentelemetry_sdk::Resource;
9use prometheus_client::encoding::{EncodeCounterValue, EncodeGaugeValue, NoLabelSet};
10use prometheus_client::metrics::MetricType;
11use prometheus_client::registry::Unit;
12
13/// A Prometheus exporter for OpenTelemetry metrics.
14///
15/// Responsible for encoding OpenTelemetry metrics into Prometheus format.
16/// The exporter implements the
17/// [`opentelemetry_sdk::metrics::reader::MetricReader`](https://docs.rs/opentelemetry_sdk/0.27.0/opentelemetry_sdk/metrics/reader/trait.MetricReader.html)
18/// trait and therefore can be passed to a
19/// [`opentelemetry_sdk::metrics::SdkMeterProvider`](https://docs.rs/opentelemetry_sdk/0.27.0/opentelemetry_sdk/metrics/struct.SdkMeterProvider.html).
20///
21/// Use [`collector`](PrometheusExporter::collector) to get a
22/// [`prometheus_client::collector::Collector`](https://docs.rs/prometheus-client/0.22.3/prometheus_client/collector/trait.Collector.html)
23/// that can be registered with a
24/// [`prometheus_client::registry::Registry`](https://docs.rs/prometheus-client/0.22.3/prometheus_client/registry/struct.Registry.html)
25/// to provide metrics to Prometheus.
26#[derive(Debug, Clone)]
27pub struct PrometheusExporter {
28    reader: Arc<ManualReader>,
29    prometheus_full_utf8: bool,
30}
31
32impl PrometheusExporter {
33    /// Returns a new [`PrometheusExporterBuilder`] to configure a [`PrometheusExporter`].
34    pub fn builder() -> PrometheusExporterBuilder {
35        PrometheusExporterBuilder::default()
36    }
37
38    /// Returns a [`prometheus_client::collector::Collector`] that can be registered
39    /// with a [`prometheus_client::registry::Registry`] to provide metrics to Prometheus.
40    pub fn collector(&self) -> Box<dyn prometheus_client::collector::Collector> {
41        Box::new(self.clone())
42    }
43}
44
45impl MetricReader for PrometheusExporter {
46    fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
47        self.reader.register_pipeline(pipeline)
48    }
49
50    fn collect(
51        &self,
52        rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
53    ) -> opentelemetry_sdk::metrics::MetricResult<()> {
54        self.reader.collect(rm)
55    }
56
57    fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
58        self.reader.force_flush()
59    }
60
61    fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult {
62        self.reader.shutdown()
63    }
64
65    fn temporality(&self, kind: opentelemetry_sdk::metrics::InstrumentKind) -> opentelemetry_sdk::metrics::Temporality {
66        self.reader.temporality(kind)
67    }
68}
69
70/// Builder for [`PrometheusExporter`].
71#[derive(Default)]
72pub struct PrometheusExporterBuilder {
73    reader: ManualReaderBuilder,
74    prometheus_full_utf8: bool,
75}
76
77impl PrometheusExporterBuilder {
78    /// Set the reader temporality.
79    pub fn with_temporality(mut self, temporality: opentelemetry_sdk::metrics::Temporality) -> Self {
80        self.reader = self.reader.with_temporality(temporality);
81        self
82    }
83
84    /// Allow full UTF-8 labels in Prometheus.
85    ///
86    /// This is disabled by default however if you are using a newer version of
87    /// Prometheus that supports full UTF-8 labels you may enable this feature.
88    pub fn with_prometheus_full_utf8(mut self, prometheus_full_utf8: bool) -> Self {
89        self.prometheus_full_utf8 = prometheus_full_utf8;
90        self
91    }
92
93    /// Build the [`PrometheusExporter`].
94    pub fn build(self) -> PrometheusExporter {
95        PrometheusExporter {
96            reader: Arc::new(self.reader.build()),
97            prometheus_full_utf8: self.prometheus_full_utf8,
98        }
99    }
100}
101
102/// Returns a new [`PrometheusExporterBuilder`] to configure a [`PrometheusExporter`].
103pub fn exporter() -> PrometheusExporterBuilder {
104    PrometheusExporter::builder()
105}
106
/// A metric value of any numeric type the exporter knows how to encode.
///
/// The narrower variants are only available with the `extended-numbers` feature.
#[derive(Debug, Clone, Copy)]
enum RawNumber {
    U64(u64),
    I64(i64),
    F64(f64),
    #[cfg(feature = "extended-numbers")]
    U32(u32),
    #[cfg(feature = "extended-numbers")]
    U16(u16),
    #[cfg(feature = "extended-numbers")]
    U8(u8),
    #[cfg(feature = "extended-numbers")]
    I32(i32),
    #[cfg(feature = "extended-numbers")]
    I16(i16),
    #[cfg(feature = "extended-numbers")]
    I8(i8),
    #[cfg(feature = "extended-numbers")]
    F32(f32),
}

impl RawNumber {
    /// Converts the stored value to an `f64`.
    ///
    /// 64-bit integers are converted with `as`, which rounds values that are
    /// not exactly representable; every other variant converts losslessly.
    fn as_f64(&self) -> f64 {
        match self {
            Self::F64(float) => *float,
            Self::U64(unsigned) => *unsigned as f64,
            Self::I64(signed) => *signed as f64,
            #[cfg(feature = "extended-numbers")]
            Self::F32(float) => f64::from(*float),
            #[cfg(feature = "extended-numbers")]
            Self::U32(unsigned) => f64::from(*unsigned),
            #[cfg(feature = "extended-numbers")]
            Self::U16(unsigned) => f64::from(*unsigned),
            #[cfg(feature = "extended-numbers")]
            Self::U8(unsigned) => f64::from(*unsigned),
            #[cfg(feature = "extended-numbers")]
            Self::I32(signed) => f64::from(*signed),
            #[cfg(feature = "extended-numbers")]
            Self::I16(signed) => f64::from(*signed),
            #[cfg(feature = "extended-numbers")]
            Self::I8(signed) => f64::from(*signed),
        }
    }
}
151
152impl EncodeGaugeValue for RawNumber {
153    fn encode(&self, encoder: &mut prometheus_client::encoding::GaugeValueEncoder) -> Result<(), std::fmt::Error> {
154        match *self {
155            RawNumber::U64(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
156            RawNumber::I64(value) => EncodeGaugeValue::encode(&value, encoder),
157            RawNumber::F64(value) => EncodeGaugeValue::encode(&value, encoder),
158            #[cfg(feature = "extended-numbers")]
159            RawNumber::U32(value) => EncodeGaugeValue::encode(&value, encoder),
160            #[cfg(feature = "extended-numbers")]
161            RawNumber::U16(value) => EncodeGaugeValue::encode(&(value as u32), encoder),
162            #[cfg(feature = "extended-numbers")]
163            RawNumber::U8(value) => EncodeGaugeValue::encode(&(value as u32), encoder),
164            #[cfg(feature = "extended-numbers")]
165            RawNumber::I32(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
166            #[cfg(feature = "extended-numbers")]
167            RawNumber::I16(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
168            #[cfg(feature = "extended-numbers")]
169            RawNumber::I8(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
170            #[cfg(feature = "extended-numbers")]
171            RawNumber::F32(value) => EncodeGaugeValue::encode(&(value as f64), encoder),
172        }
173    }
174}
175
176impl EncodeCounterValue for RawNumber {
177    fn encode(&self, encoder: &mut prometheus_client::encoding::CounterValueEncoder) -> Result<(), std::fmt::Error> {
178        match *self {
179            RawNumber::U64(value) => EncodeCounterValue::encode(&value, encoder),
180            RawNumber::I64(value) => EncodeCounterValue::encode(&(value as f64), encoder),
181            RawNumber::F64(value) => EncodeCounterValue::encode(&value, encoder),
182            #[cfg(feature = "extended-numbers")]
183            RawNumber::U32(value) => EncodeCounterValue::encode(&(value as u64), encoder),
184            #[cfg(feature = "extended-numbers")]
185            RawNumber::U16(value) => EncodeCounterValue::encode(&(value as u64), encoder),
186            #[cfg(feature = "extended-numbers")]
187            RawNumber::U8(value) => EncodeCounterValue::encode(&(value as u64), encoder),
188            #[cfg(feature = "extended-numbers")]
189            RawNumber::I32(value) => EncodeCounterValue::encode(&(value as f64), encoder),
190            #[cfg(feature = "extended-numbers")]
191            RawNumber::I16(value) => EncodeCounterValue::encode(&(value as f64), encoder),
192            #[cfg(feature = "extended-numbers")]
193            RawNumber::I8(value) => EncodeCounterValue::encode(&(value as f64), encoder),
194            #[cfg(feature = "extended-numbers")]
195            RawNumber::F32(value) => EncodeCounterValue::encode(&(value as f64), encoder),
196        }
197    }
198}
199
200macro_rules! impl_raw_number {
201    ($t:ty, $variant:ident) => {
202        impl From<$t> for RawNumber {
203            fn from(value: $t) -> Self {
204                RawNumber::$variant(value)
205            }
206        }
207    };
208}
209
210impl_raw_number!(u64, U64);
211impl_raw_number!(i64, I64);
212impl_raw_number!(f64, F64);
213
214#[cfg(feature = "extended-numbers")]
215const _: () = {
216    impl_raw_number!(u32, U32);
217    impl_raw_number!(u16, U16);
218    impl_raw_number!(u8, U8);
219    impl_raw_number!(i32, I32);
220    impl_raw_number!(i16, I16);
221    impl_raw_number!(i8, I8);
222    impl_raw_number!(f32, F32);
223};
224
225enum KnownMetricT<'a, T> {
226    Gauge(&'a Gauge<T>),
227    Sum(&'a Sum<T>),
228    Histogram(&'a Histogram<T>),
229}
230
231impl<'a, T: 'static> KnownMetricT<'a, T>
232where
233    RawNumber: From<T>,
234    T: Copy,
235{
236    fn from_any(any: &'a dyn std::any::Any) -> Option<Self> {
237        if let Some(gauge) = any.downcast_ref::<Gauge<T>>() {
238            Some(KnownMetricT::Gauge(gauge))
239        } else if let Some(sum) = any.downcast_ref::<Sum<T>>() {
240            Some(KnownMetricT::Sum(sum))
241        } else {
242            any.downcast_ref::<Histogram<T>>()
243                .map(|histogram| KnownMetricT::Histogram(histogram))
244        }
245    }
246
247    fn metric_type(&self) -> MetricType {
248        match self {
249            KnownMetricT::Gauge(_) => MetricType::Gauge,
250            KnownMetricT::Sum(sum) => {
251                if sum.is_monotonic {
252                    MetricType::Counter
253                } else {
254                    MetricType::Gauge
255                }
256            }
257            KnownMetricT::Histogram(_) => MetricType::Histogram,
258        }
259    }
260
261    fn encode(
262        &self,
263        mut encoder: prometheus_client::encoding::MetricEncoder,
264        labels: KeyValueEncoder<'a>,
265    ) -> Result<(), std::fmt::Error> {
266        match self {
267            KnownMetricT::Gauge(gauge) => {
268                for data_point in &gauge.data_points {
269                    let number = RawNumber::from(data_point.value);
270                    encoder
271                        .encode_family(&labels.with_attrs(Some(&data_point.attributes)))?
272                        .encode_gauge(&number)?;
273                }
274            }
275            KnownMetricT::Sum(sum) => {
276                for data_point in &sum.data_points {
277                    let number = RawNumber::from(data_point.value);
278                    let attrs = labels.with_attrs(Some(&data_point.attributes));
279                    let mut encoder = encoder.encode_family(&attrs)?;
280
281                    if sum.is_monotonic {
282                        // TODO(troy): Exemplar support
283                        encoder.encode_counter::<NoLabelSet, _, f64>(&number, None)?;
284                    } else {
285                        encoder.encode_gauge(&number)?;
286                    }
287                }
288            }
289            KnownMetricT::Histogram(histogram) => {
290                for data_point in &histogram.data_points {
291                    let attrs = labels.with_attrs(Some(&data_point.attributes));
292                    let mut encoder = encoder.encode_family(&attrs)?;
293
294                    let sum = RawNumber::from(data_point.sum);
295
296                    let buckets = data_point
297                        .bounds
298                        .iter()
299                        .copied()
300                        .zip(data_point.bucket_counts.iter().copied())
301                        .collect::<Vec<_>>();
302
303                    encoder.encode_histogram::<NoLabelSet>(sum.as_f64(), data_point.count, &buckets, None)?;
304                }
305            }
306        }
307
308        Ok(())
309    }
310}
311
312enum KnownMetric<'a> {
313    U64(KnownMetricT<'a, u64>),
314    I64(KnownMetricT<'a, i64>),
315    F64(KnownMetricT<'a, f64>),
316    #[cfg(feature = "extended-numbers")]
317    U32(KnownMetricT<'a, u32>),
318    #[cfg(feature = "extended-numbers")]
319    U16(KnownMetricT<'a, u16>),
320    #[cfg(feature = "extended-numbers")]
321    U8(KnownMetricT<'a, u8>),
322    #[cfg(feature = "extended-numbers")]
323    I32(KnownMetricT<'a, i32>),
324    #[cfg(feature = "extended-numbers")]
325    I16(KnownMetricT<'a, i16>),
326    #[cfg(feature = "extended-numbers")]
327    I8(KnownMetricT<'a, i8>),
328    #[cfg(feature = "extended-numbers")]
329    F32(KnownMetricT<'a, f32>),
330}
331
332impl<'a> KnownMetric<'a> {
333    fn from_any(any: &'a dyn std::any::Any) -> Option<Self> {
334        macro_rules! try_decode {
335            ($t:ty, $variant:ident) => {
336                if let Some(metric) = KnownMetricT::<$t>::from_any(any) {
337                    return Some(KnownMetric::$variant(metric));
338                }
339            };
340        }
341
342        try_decode!(u64, U64);
343        try_decode!(i64, I64);
344        try_decode!(f64, F64);
345        #[cfg(feature = "extended-numbers")]
346        try_decode!(u32, U32);
347        #[cfg(feature = "extended-numbers")]
348        try_decode!(i32, I32);
349        #[cfg(feature = "extended-numbers")]
350        try_decode!(u16, U16);
351        #[cfg(feature = "extended-numbers")]
352        try_decode!(i16, I16);
353        #[cfg(feature = "extended-numbers")]
354        try_decode!(u8, U8);
355        #[cfg(feature = "extended-numbers")]
356        try_decode!(i8, I8);
357        #[cfg(feature = "extended-numbers")]
358        try_decode!(f32, F32);
359
360        None
361    }
362
363    fn metric_type(&self) -> MetricType {
364        match self {
365            KnownMetric::U64(metric) => metric.metric_type(),
366            KnownMetric::I64(metric) => metric.metric_type(),
367            KnownMetric::F64(metric) => metric.metric_type(),
368            #[cfg(feature = "extended-numbers")]
369            KnownMetric::U32(metric) => metric.metric_type(),
370            #[cfg(feature = "extended-numbers")]
371            KnownMetric::I32(metric) => metric.metric_type(),
372            #[cfg(feature = "extended-numbers")]
373            KnownMetric::U16(metric) => metric.metric_type(),
374            #[cfg(feature = "extended-numbers")]
375            KnownMetric::I16(metric) => metric.metric_type(),
376            #[cfg(feature = "extended-numbers")]
377            KnownMetric::U8(metric) => metric.metric_type(),
378            #[cfg(feature = "extended-numbers")]
379            KnownMetric::I8(metric) => metric.metric_type(),
380            #[cfg(feature = "extended-numbers")]
381            KnownMetric::F32(metric) => metric.metric_type(),
382        }
383    }
384
385    fn encode(
386        &self,
387        encoder: prometheus_client::encoding::MetricEncoder,
388        labels: KeyValueEncoder<'a>,
389    ) -> Result<(), std::fmt::Error> {
390        match self {
391            KnownMetric::U64(metric) => metric.encode(encoder, labels),
392            KnownMetric::I64(metric) => metric.encode(encoder, labels),
393            KnownMetric::F64(metric) => metric.encode(encoder, labels),
394            #[cfg(feature = "extended-numbers")]
395            KnownMetric::U32(metric) => metric.encode(encoder, labels),
396            #[cfg(feature = "extended-numbers")]
397            KnownMetric::I32(metric) => metric.encode(encoder, labels),
398            #[cfg(feature = "extended-numbers")]
399            KnownMetric::U16(metric) => metric.encode(encoder, labels),
400            #[cfg(feature = "extended-numbers")]
401            KnownMetric::I16(metric) => metric.encode(encoder, labels),
402            #[cfg(feature = "extended-numbers")]
403            KnownMetric::U8(metric) => metric.encode(encoder, labels),
404            #[cfg(feature = "extended-numbers")]
405            KnownMetric::I8(metric) => metric.encode(encoder, labels),
406            #[cfg(feature = "extended-numbers")]
407            KnownMetric::F32(metric) => metric.encode(encoder, labels),
408        }
409    }
410}
411
412impl prometheus_client::collector::Collector for PrometheusExporter {
413    fn encode(&self, mut encoder: prometheus_client::encoding::DescriptorEncoder) -> Result<(), std::fmt::Error> {
414        let mut metrics = ResourceMetrics {
415            resource: Resource::builder_empty().build(),
416            scope_metrics: vec![],
417        };
418
419        if let Err(err) = self.reader.collect(&mut metrics) {
420            otel_error!(name: "prometheus_collector_collect_error", error = err.to_string());
421            return Err(std::fmt::Error);
422        }
423
424        let labels = KeyValueEncoder::new(self.prometheus_full_utf8);
425
426        encoder
427            .encode_descriptor("target", "Information about the target", None, MetricType::Info)?
428            .encode_info(&labels.with_resource(Some(&metrics.resource)))?;
429
430        for scope_metrics in &metrics.scope_metrics {
431            for metric in &scope_metrics.metrics {
432                let Some(known_metric) = KnownMetric::from_any(metric.data.as_any()) else {
433                    otel_warn!(name: "prometheus_collector_unknown_metric_type", metric_name = metric.name.as_ref());
434                    continue;
435                };
436
437                let unit = if metric.unit.is_empty() {
438                    None
439                } else {
440                    Some(Unit::Other(metric.unit.to_string()))
441                };
442
443                known_metric.encode(
444                    encoder.encode_descriptor(
445                        &metric.name,
446                        &metric.description,
447                        unit.as_ref(),
448                        known_metric.metric_type(),
449                    )?,
450                    labels.with_scope(Some(&scope_metrics.scope)),
451                )?;
452            }
453        }
454
455        Ok(())
456    }
457}
458
459fn scope_to_iter(scope: &InstrumentationScope) -> impl Iterator<Item = (&str, Cow<'_, str>)> {
460    [
461        ("otel.scope.name", Some(Cow::Borrowed(scope.name()))),
462        ("otel.scope.version", scope.version().map(Cow::Borrowed)),
463        ("otel.scope.schema_url", scope.schema_url().map(Cow::Borrowed)),
464    ]
465    .into_iter()
466    .chain(scope.attributes().map(|kv| (kv.key.as_str(), Some(kv.value.as_str()))))
467    .filter_map(|(key, value)| value.map(|v| (key, v)))
468}
469
470#[derive(Debug, Clone, Copy)]
471struct KeyValueEncoder<'a> {
472    resource: Option<&'a Resource>,
473    scope: Option<&'a InstrumentationScope>,
474    attrs: Option<&'a [KeyValue]>,
475    prometheus_full_utf8: bool,
476}
477
478impl<'a> KeyValueEncoder<'a> {
479    fn new(prometheus_full_utf8: bool) -> Self {
480        Self {
481            resource: None,
482            scope: None,
483            attrs: None,
484            prometheus_full_utf8,
485        }
486    }
487
488    pub fn with_resource(self, resource: Option<&'a Resource>) -> Self {
489        Self { resource, ..self }
490    }
491
492    pub fn with_scope(self, scope: Option<&'a InstrumentationScope>) -> Self {
493        Self { scope, ..self }
494    }
495
496    pub fn with_attrs(self, attrs: Option<&'a [KeyValue]>) -> Self {
497        Self { attrs, ..self }
498    }
499}
500
/// Escapes a label key so it only contains ASCII letters, ASCII digits, `_` and `:`.
///
/// Every other character, including non-ASCII letters and digits, is replaced
/// by `_`. A key that starts with an ASCII digit is prefixed with `_`. A key
/// that needs no change is returned borrowed, without allocating.
///
/// The validity scan and the replacement use the same ASCII-only predicate.
/// Scanning with the Unicode-aware `char::is_alphanumeric` instead would let
/// characters such as `é` through unescaped whenever they appear before the
/// first otherwise-invalid character.
fn escape_key(s: &str) -> Cow<'_, str> {
    /// Whether `c` may appear unchanged in an escaped key.
    fn is_valid(c: char) -> bool {
        c.is_ascii_alphanumeric() || c == '_' || c == ':'
    }

    let starts_with_digit = s.starts_with(|c: char| c.is_ascii_digit());

    if !starts_with_digit && s.chars().all(is_valid) {
        // Nothing to escape, hand back the original key.
        return Cow::Borrowed(s);
    }

    // Each replaced char shrinks to one byte, so `len + 1` (for the optional
    // prefix) is always enough capacity.
    let mut escaped = String::with_capacity(s.len() + 1);
    if starts_with_digit {
        escaped.push('_');
    }
    escaped.extend(s.chars().map(|c| if is_valid(c) { c } else { '_' }));

    Cow::Owned(escaped)
}
535
536impl prometheus_client::encoding::EncodeLabelSet for KeyValueEncoder<'_> {
537    fn encode(&self, mut encoder: prometheus_client::encoding::LabelSetEncoder) -> Result<(), std::fmt::Error> {
538        use std::fmt::Write;
539
540        fn write_kv(
541            encoder: &mut prometheus_client::encoding::LabelSetEncoder,
542            key: &str,
543            value: &str,
544            prometheus_full_utf8: bool,
545        ) -> Result<(), std::fmt::Error> {
546            let mut label = encoder.encode_label();
547            let mut key_encoder = label.encode_label_key()?;
548            if prometheus_full_utf8 {
549                // TODO(troy): I am not sure if this is correct.
550                // See: https://github.com/prometheus/client_rust/issues/251
551                write!(&mut key_encoder, "{}", key)?;
552            } else {
553                write!(&mut key_encoder, "{}", escape_key(key))?;
554            }
555
556            let mut value_encoder = key_encoder.encode_label_value()?;
557            write!(&mut value_encoder, "{}", value)?;
558
559            value_encoder.finish()
560        }
561
562        if let Some(resource) = self.resource {
563            for (key, value) in resource.iter() {
564                write_kv(&mut encoder, key.as_str(), value.as_str().as_ref(), self.prometheus_full_utf8)?;
565            }
566        }
567
568        if let Some(scope) = self.scope {
569            for (key, value) in scope_to_iter(scope) {
570                write_kv(&mut encoder, key, value.as_ref(), self.prometheus_full_utf8)?;
571            }
572        }
573
574        if let Some(attrs) = self.attrs {
575            for kv in attrs {
576                write_kv(
577                    &mut encoder,
578                    kv.key.as_str(),
579                    kv.value.as_str().as_ref(),
580                    self.prometheus_full_utf8,
581                )?;
582            }
583        }
584
585        Ok(())
586    }
587}