1use std::borrow::Cow;
2use std::sync::Arc;
3
4use opentelemetry::{otel_error, otel_warn, InstrumentationScope, KeyValue};
5use opentelemetry_sdk::metrics::data::{Gauge, Histogram, ResourceMetrics, Sum};
6use opentelemetry_sdk::metrics::reader::MetricReader;
7use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder};
8use opentelemetry_sdk::Resource;
9use prometheus_client::encoding::{EncodeCounterValue, EncodeGaugeValue, NoLabelSet};
10use prometheus_client::metrics::MetricType;
11use prometheus_client::registry::Unit;
12
13#[derive(Debug, Clone)]
27pub struct PrometheusExporter {
28 reader: Arc<ManualReader>,
29 prometheus_full_utf8: bool,
30}
31
32impl PrometheusExporter {
33 pub fn builder() -> PrometheusExporterBuilder {
35 PrometheusExporterBuilder::default()
36 }
37
38 pub fn collector(&self) -> Box<dyn prometheus_client::collector::Collector> {
41 Box::new(self.clone())
42 }
43}
44
45impl MetricReader for PrometheusExporter {
46 fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
47 self.reader.register_pipeline(pipeline)
48 }
49
50 fn collect(
51 &self,
52 rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
53 ) -> opentelemetry_sdk::metrics::MetricResult<()> {
54 self.reader.collect(rm)
55 }
56
57 fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
58 self.reader.force_flush()
59 }
60
61 fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult {
62 self.reader.shutdown()
63 }
64
65 fn temporality(&self, kind: opentelemetry_sdk::metrics::InstrumentKind) -> opentelemetry_sdk::metrics::Temporality {
66 self.reader.temporality(kind)
67 }
68}
69
70#[derive(Default)]
72pub struct PrometheusExporterBuilder {
73 reader: ManualReaderBuilder,
74 prometheus_full_utf8: bool,
75}
76
77impl PrometheusExporterBuilder {
78 pub fn with_temporality(mut self, temporality: opentelemetry_sdk::metrics::Temporality) -> Self {
80 self.reader = self.reader.with_temporality(temporality);
81 self
82 }
83
84 pub fn with_prometheus_full_utf8(mut self, prometheus_full_utf8: bool) -> Self {
89 self.prometheus_full_utf8 = prometheus_full_utf8;
90 self
91 }
92
93 pub fn build(self) -> PrometheusExporter {
95 PrometheusExporter {
96 reader: Arc::new(self.reader.build()),
97 prometheus_full_utf8: self.prometheus_full_utf8,
98 }
99 }
100}
101
102pub fn exporter() -> PrometheusExporterBuilder {
104 PrometheusExporter::builder()
105}
106
107#[derive(Debug, Clone, Copy)]
108enum RawNumber {
109 U64(u64),
110 I64(i64),
111 F64(f64),
112 #[cfg(feature = "extended-numbers")]
113 U32(u32),
114 #[cfg(feature = "extended-numbers")]
115 U16(u16),
116 #[cfg(feature = "extended-numbers")]
117 U8(u8),
118 #[cfg(feature = "extended-numbers")]
119 I32(i32),
120 #[cfg(feature = "extended-numbers")]
121 I16(i16),
122 #[cfg(feature = "extended-numbers")]
123 I8(i8),
124 #[cfg(feature = "extended-numbers")]
125 F32(f32),
126}
127
128impl RawNumber {
129 fn as_f64(&self) -> f64 {
130 match *self {
131 RawNumber::U64(value) => value as f64,
132 RawNumber::I64(value) => value as f64,
133 RawNumber::F64(value) => value,
134 #[cfg(feature = "extended-numbers")]
135 RawNumber::U32(value) => value as f64,
136 #[cfg(feature = "extended-numbers")]
137 RawNumber::U16(value) => value as f64,
138 #[cfg(feature = "extended-numbers")]
139 RawNumber::U8(value) => value as f64,
140 #[cfg(feature = "extended-numbers")]
141 RawNumber::I32(value) => value as f64,
142 #[cfg(feature = "extended-numbers")]
143 RawNumber::I16(value) => value as f64,
144 #[cfg(feature = "extended-numbers")]
145 RawNumber::I8(value) => value as f64,
146 #[cfg(feature = "extended-numbers")]
147 RawNumber::F32(value) => value as f64,
148 }
149 }
150}
151
152impl EncodeGaugeValue for RawNumber {
153 fn encode(&self, encoder: &mut prometheus_client::encoding::GaugeValueEncoder) -> Result<(), std::fmt::Error> {
154 match *self {
155 RawNumber::U64(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
156 RawNumber::I64(value) => EncodeGaugeValue::encode(&value, encoder),
157 RawNumber::F64(value) => EncodeGaugeValue::encode(&value, encoder),
158 #[cfg(feature = "extended-numbers")]
159 RawNumber::U32(value) => EncodeGaugeValue::encode(&value, encoder),
160 #[cfg(feature = "extended-numbers")]
161 RawNumber::U16(value) => EncodeGaugeValue::encode(&(value as u32), encoder),
162 #[cfg(feature = "extended-numbers")]
163 RawNumber::U8(value) => EncodeGaugeValue::encode(&(value as u32), encoder),
164 #[cfg(feature = "extended-numbers")]
165 RawNumber::I32(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
166 #[cfg(feature = "extended-numbers")]
167 RawNumber::I16(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
168 #[cfg(feature = "extended-numbers")]
169 RawNumber::I8(value) => EncodeGaugeValue::encode(&(value as i64), encoder),
170 #[cfg(feature = "extended-numbers")]
171 RawNumber::F32(value) => EncodeGaugeValue::encode(&(value as f64), encoder),
172 }
173 }
174}
175
176impl EncodeCounterValue for RawNumber {
177 fn encode(&self, encoder: &mut prometheus_client::encoding::CounterValueEncoder) -> Result<(), std::fmt::Error> {
178 match *self {
179 RawNumber::U64(value) => EncodeCounterValue::encode(&value, encoder),
180 RawNumber::I64(value) => EncodeCounterValue::encode(&(value as f64), encoder),
181 RawNumber::F64(value) => EncodeCounterValue::encode(&value, encoder),
182 #[cfg(feature = "extended-numbers")]
183 RawNumber::U32(value) => EncodeCounterValue::encode(&(value as u64), encoder),
184 #[cfg(feature = "extended-numbers")]
185 RawNumber::U16(value) => EncodeCounterValue::encode(&(value as u64), encoder),
186 #[cfg(feature = "extended-numbers")]
187 RawNumber::U8(value) => EncodeCounterValue::encode(&(value as u64), encoder),
188 #[cfg(feature = "extended-numbers")]
189 RawNumber::I32(value) => EncodeCounterValue::encode(&(value as f64), encoder),
190 #[cfg(feature = "extended-numbers")]
191 RawNumber::I16(value) => EncodeCounterValue::encode(&(value as f64), encoder),
192 #[cfg(feature = "extended-numbers")]
193 RawNumber::I8(value) => EncodeCounterValue::encode(&(value as f64), encoder),
194 #[cfg(feature = "extended-numbers")]
195 RawNumber::F32(value) => EncodeCounterValue::encode(&(value as f64), encoder),
196 }
197 }
198}
199
200macro_rules! impl_raw_number {
201 ($t:ty, $variant:ident) => {
202 impl From<$t> for RawNumber {
203 fn from(value: $t) -> Self {
204 RawNumber::$variant(value)
205 }
206 }
207 };
208}
209
210impl_raw_number!(u64, U64);
211impl_raw_number!(i64, I64);
212impl_raw_number!(f64, F64);
213
214#[cfg(feature = "extended-numbers")]
215const _: () = {
216 impl_raw_number!(u32, U32);
217 impl_raw_number!(u16, U16);
218 impl_raw_number!(u8, U8);
219 impl_raw_number!(i32, I32);
220 impl_raw_number!(i16, I16);
221 impl_raw_number!(i8, I8);
222 impl_raw_number!(f32, F32);
223};
224
225enum KnownMetricT<'a, T> {
226 Gauge(&'a Gauge<T>),
227 Sum(&'a Sum<T>),
228 Histogram(&'a Histogram<T>),
229}
230
231impl<'a, T: 'static> KnownMetricT<'a, T>
232where
233 RawNumber: From<T>,
234 T: Copy,
235{
236 fn from_any(any: &'a dyn std::any::Any) -> Option<Self> {
237 if let Some(gauge) = any.downcast_ref::<Gauge<T>>() {
238 Some(KnownMetricT::Gauge(gauge))
239 } else if let Some(sum) = any.downcast_ref::<Sum<T>>() {
240 Some(KnownMetricT::Sum(sum))
241 } else {
242 any.downcast_ref::<Histogram<T>>()
243 .map(|histogram| KnownMetricT::Histogram(histogram))
244 }
245 }
246
247 fn metric_type(&self) -> MetricType {
248 match self {
249 KnownMetricT::Gauge(_) => MetricType::Gauge,
250 KnownMetricT::Sum(sum) => {
251 if sum.is_monotonic {
252 MetricType::Counter
253 } else {
254 MetricType::Gauge
255 }
256 }
257 KnownMetricT::Histogram(_) => MetricType::Histogram,
258 }
259 }
260
261 fn encode(
262 &self,
263 mut encoder: prometheus_client::encoding::MetricEncoder,
264 labels: KeyValueEncoder<'a>,
265 ) -> Result<(), std::fmt::Error> {
266 match self {
267 KnownMetricT::Gauge(gauge) => {
268 for data_point in &gauge.data_points {
269 let number = RawNumber::from(data_point.value);
270 encoder
271 .encode_family(&labels.with_attrs(Some(&data_point.attributes)))?
272 .encode_gauge(&number)?;
273 }
274 }
275 KnownMetricT::Sum(sum) => {
276 for data_point in &sum.data_points {
277 let number = RawNumber::from(data_point.value);
278 let attrs = labels.with_attrs(Some(&data_point.attributes));
279 let mut encoder = encoder.encode_family(&attrs)?;
280
281 if sum.is_monotonic {
282 encoder.encode_counter::<NoLabelSet, _, f64>(&number, None)?;
284 } else {
285 encoder.encode_gauge(&number)?;
286 }
287 }
288 }
289 KnownMetricT::Histogram(histogram) => {
290 for data_point in &histogram.data_points {
291 let attrs = labels.with_attrs(Some(&data_point.attributes));
292 let mut encoder = encoder.encode_family(&attrs)?;
293
294 let sum = RawNumber::from(data_point.sum);
295
296 let buckets = data_point
297 .bounds
298 .iter()
299 .copied()
300 .zip(data_point.bucket_counts.iter().copied())
301 .collect::<Vec<_>>();
302
303 encoder.encode_histogram::<NoLabelSet>(sum.as_f64(), data_point.count, &buckets, None)?;
304 }
305 }
306 }
307
308 Ok(())
309 }
310}
311
312enum KnownMetric<'a> {
313 U64(KnownMetricT<'a, u64>),
314 I64(KnownMetricT<'a, i64>),
315 F64(KnownMetricT<'a, f64>),
316 #[cfg(feature = "extended-numbers")]
317 U32(KnownMetricT<'a, u32>),
318 #[cfg(feature = "extended-numbers")]
319 U16(KnownMetricT<'a, u16>),
320 #[cfg(feature = "extended-numbers")]
321 U8(KnownMetricT<'a, u8>),
322 #[cfg(feature = "extended-numbers")]
323 I32(KnownMetricT<'a, i32>),
324 #[cfg(feature = "extended-numbers")]
325 I16(KnownMetricT<'a, i16>),
326 #[cfg(feature = "extended-numbers")]
327 I8(KnownMetricT<'a, i8>),
328 #[cfg(feature = "extended-numbers")]
329 F32(KnownMetricT<'a, f32>),
330}
331
332impl<'a> KnownMetric<'a> {
333 fn from_any(any: &'a dyn std::any::Any) -> Option<Self> {
334 macro_rules! try_decode {
335 ($t:ty, $variant:ident) => {
336 if let Some(metric) = KnownMetricT::<$t>::from_any(any) {
337 return Some(KnownMetric::$variant(metric));
338 }
339 };
340 }
341
342 try_decode!(u64, U64);
343 try_decode!(i64, I64);
344 try_decode!(f64, F64);
345 #[cfg(feature = "extended-numbers")]
346 try_decode!(u32, U32);
347 #[cfg(feature = "extended-numbers")]
348 try_decode!(i32, I32);
349 #[cfg(feature = "extended-numbers")]
350 try_decode!(u16, U16);
351 #[cfg(feature = "extended-numbers")]
352 try_decode!(i16, I16);
353 #[cfg(feature = "extended-numbers")]
354 try_decode!(u8, U8);
355 #[cfg(feature = "extended-numbers")]
356 try_decode!(i8, I8);
357 #[cfg(feature = "extended-numbers")]
358 try_decode!(f32, F32);
359
360 None
361 }
362
363 fn metric_type(&self) -> MetricType {
364 match self {
365 KnownMetric::U64(metric) => metric.metric_type(),
366 KnownMetric::I64(metric) => metric.metric_type(),
367 KnownMetric::F64(metric) => metric.metric_type(),
368 #[cfg(feature = "extended-numbers")]
369 KnownMetric::U32(metric) => metric.metric_type(),
370 #[cfg(feature = "extended-numbers")]
371 KnownMetric::I32(metric) => metric.metric_type(),
372 #[cfg(feature = "extended-numbers")]
373 KnownMetric::U16(metric) => metric.metric_type(),
374 #[cfg(feature = "extended-numbers")]
375 KnownMetric::I16(metric) => metric.metric_type(),
376 #[cfg(feature = "extended-numbers")]
377 KnownMetric::U8(metric) => metric.metric_type(),
378 #[cfg(feature = "extended-numbers")]
379 KnownMetric::I8(metric) => metric.metric_type(),
380 #[cfg(feature = "extended-numbers")]
381 KnownMetric::F32(metric) => metric.metric_type(),
382 }
383 }
384
385 fn encode(
386 &self,
387 encoder: prometheus_client::encoding::MetricEncoder,
388 labels: KeyValueEncoder<'a>,
389 ) -> Result<(), std::fmt::Error> {
390 match self {
391 KnownMetric::U64(metric) => metric.encode(encoder, labels),
392 KnownMetric::I64(metric) => metric.encode(encoder, labels),
393 KnownMetric::F64(metric) => metric.encode(encoder, labels),
394 #[cfg(feature = "extended-numbers")]
395 KnownMetric::U32(metric) => metric.encode(encoder, labels),
396 #[cfg(feature = "extended-numbers")]
397 KnownMetric::I32(metric) => metric.encode(encoder, labels),
398 #[cfg(feature = "extended-numbers")]
399 KnownMetric::U16(metric) => metric.encode(encoder, labels),
400 #[cfg(feature = "extended-numbers")]
401 KnownMetric::I16(metric) => metric.encode(encoder, labels),
402 #[cfg(feature = "extended-numbers")]
403 KnownMetric::U8(metric) => metric.encode(encoder, labels),
404 #[cfg(feature = "extended-numbers")]
405 KnownMetric::I8(metric) => metric.encode(encoder, labels),
406 #[cfg(feature = "extended-numbers")]
407 KnownMetric::F32(metric) => metric.encode(encoder, labels),
408 }
409 }
410}
411
412impl prometheus_client::collector::Collector for PrometheusExporter {
413 fn encode(&self, mut encoder: prometheus_client::encoding::DescriptorEncoder) -> Result<(), std::fmt::Error> {
414 let mut metrics = ResourceMetrics {
415 resource: Resource::builder_empty().build(),
416 scope_metrics: vec![],
417 };
418
419 if let Err(err) = self.reader.collect(&mut metrics) {
420 otel_error!(name: "prometheus_collector_collect_error", error = err.to_string());
421 return Err(std::fmt::Error);
422 }
423
424 let labels = KeyValueEncoder::new(self.prometheus_full_utf8);
425
426 encoder
427 .encode_descriptor("target", "Information about the target", None, MetricType::Info)?
428 .encode_info(&labels.with_resource(Some(&metrics.resource)))?;
429
430 for scope_metrics in &metrics.scope_metrics {
431 for metric in &scope_metrics.metrics {
432 let Some(known_metric) = KnownMetric::from_any(metric.data.as_any()) else {
433 otel_warn!(name: "prometheus_collector_unknown_metric_type", metric_name = metric.name.as_ref());
434 continue;
435 };
436
437 let unit = if metric.unit.is_empty() {
438 None
439 } else {
440 Some(Unit::Other(metric.unit.to_string()))
441 };
442
443 known_metric.encode(
444 encoder.encode_descriptor(
445 &metric.name,
446 &metric.description,
447 unit.as_ref(),
448 known_metric.metric_type(),
449 )?,
450 labels.with_scope(Some(&scope_metrics.scope)),
451 )?;
452 }
453 }
454
455 Ok(())
456 }
457}
458
459fn scope_to_iter(scope: &InstrumentationScope) -> impl Iterator<Item = (&str, Cow<'_, str>)> {
460 [
461 ("otel.scope.name", Some(Cow::Borrowed(scope.name()))),
462 ("otel.scope.version", scope.version().map(Cow::Borrowed)),
463 ("otel.scope.schema_url", scope.schema_url().map(Cow::Borrowed)),
464 ]
465 .into_iter()
466 .chain(scope.attributes().map(|kv| (kv.key.as_str(), Some(kv.value.as_str()))))
467 .filter_map(|(key, value)| value.map(|v| (key, v)))
468}
469
470#[derive(Debug, Clone, Copy)]
471struct KeyValueEncoder<'a> {
472 resource: Option<&'a Resource>,
473 scope: Option<&'a InstrumentationScope>,
474 attrs: Option<&'a [KeyValue]>,
475 prometheus_full_utf8: bool,
476}
477
478impl<'a> KeyValueEncoder<'a> {
479 fn new(prometheus_full_utf8: bool) -> Self {
480 Self {
481 resource: None,
482 scope: None,
483 attrs: None,
484 prometheus_full_utf8,
485 }
486 }
487
488 pub fn with_resource(self, resource: Option<&'a Resource>) -> Self {
489 Self { resource, ..self }
490 }
491
492 pub fn with_scope(self, scope: Option<&'a InstrumentationScope>) -> Self {
493 Self { scope, ..self }
494 }
495
496 pub fn with_attrs(self, attrs: Option<&'a [KeyValue]>) -> Self {
497 Self { attrs, ..self }
498 }
499}
500
501fn escape_key(s: &str) -> Cow<'_, str> {
502 let mut prefix = "";
504
505 if let Some((replace_idx, _)) = s.char_indices().find(|(i, c)| {
507 if *i == 0 && c.is_ascii_digit() {
508 prefix = "_";
510 true
511 } else {
512 !c.is_alphanumeric() && *c != '_' && *c != ':'
514 }
515 }) {
516 let (valid, rest) = s.split_at(replace_idx);
518 Cow::Owned(
519 prefix
520 .chars()
521 .chain(valid.chars())
522 .chain(rest.chars().map(|c| {
523 if c.is_ascii_alphanumeric() || c == '_' || c == ':' {
524 c
525 } else {
526 '_'
527 }
528 }))
529 .collect(),
530 )
531 } else {
532 Cow::Borrowed(s) }
534}
535
536impl prometheus_client::encoding::EncodeLabelSet for KeyValueEncoder<'_> {
537 fn encode(&self, mut encoder: prometheus_client::encoding::LabelSetEncoder) -> Result<(), std::fmt::Error> {
538 use std::fmt::Write;
539
540 fn write_kv(
541 encoder: &mut prometheus_client::encoding::LabelSetEncoder,
542 key: &str,
543 value: &str,
544 prometheus_full_utf8: bool,
545 ) -> Result<(), std::fmt::Error> {
546 let mut label = encoder.encode_label();
547 let mut key_encoder = label.encode_label_key()?;
548 if prometheus_full_utf8 {
549 write!(&mut key_encoder, "{}", key)?;
552 } else {
553 write!(&mut key_encoder, "{}", escape_key(key))?;
554 }
555
556 let mut value_encoder = key_encoder.encode_label_value()?;
557 write!(&mut value_encoder, "{}", value)?;
558
559 value_encoder.finish()
560 }
561
562 if let Some(resource) = self.resource {
563 for (key, value) in resource.iter() {
564 write_kv(&mut encoder, key.as_str(), value.as_str().as_ref(), self.prometheus_full_utf8)?;
565 }
566 }
567
568 if let Some(scope) = self.scope {
569 for (key, value) in scope_to_iter(scope) {
570 write_kv(&mut encoder, key, value.as_ref(), self.prometheus_full_utf8)?;
571 }
572 }
573
574 if let Some(attrs) = self.attrs {
575 for kv in attrs {
576 write_kv(
577 &mut encoder,
578 kv.key.as_str(),
579 kv.value.as_str().as_ref(),
580 self.prometheus_full_utf8,
581 )?;
582 }
583 }
584
585 Ok(())
586 }
587}