aws_smithy_observability_otel/
meter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! OpenTelemetry based implementations of the Smithy Observability Meter traits.
7
8use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14    AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15    ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21    AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22    Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23    ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24    ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31    fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32        self.0.add(value, &kv_from_option_attr(attributes));
33    }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39    fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40        self.0.record(value, &kv_from_option_attr(attributes));
41    }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47    fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48        self.0.add(value, &kv_from_option_attr(attributes));
49    }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55    type Value = f64;
56
57    fn record(
58        &self,
59        value: Self::Value,
60        attributes: Option<&Attributes>,
61        _context: Option<&dyn Context>,
62    ) {
63        self.0.observe(value, &kv_from_option_attr(attributes));
64    }
65
66    // OTel rust does not currently support unregistering callbacks
67    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
68    fn stop(&self) {}
69}
70
71#[derive(Debug)]
72struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
73impl AsyncMeasure for AsyncUpDownCounterWrap {
74    type Value = i64;
75
76    fn record(
77        &self,
78        value: Self::Value,
79        attributes: Option<&Attributes>,
80        _context: Option<&dyn Context>,
81    ) {
82        self.0.observe(value, &kv_from_option_attr(attributes));
83    }
84
85    // OTel rust does not currently support unregistering callbacks
86    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
87    fn stop(&self) {}
88}
89
90#[derive(Debug)]
91struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
92impl AsyncMeasure for AsyncMonotonicCounterWrap {
93    type Value = u64;
94
95    fn record(
96        &self,
97        value: Self::Value,
98        attributes: Option<&Attributes>,
99        _context: Option<&dyn Context>,
100    ) {
101        self.0.observe(value, &kv_from_option_attr(attributes));
102    }
103
104    // OTel rust does not currently support unregistering callbacks
105    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
106    fn stop(&self) {}
107}
108
109struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
110impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
111    type Value = T;
112
113    fn record(
114        &self,
115        value: Self::Value,
116        attributes: Option<&Attributes>,
117        _context: Option<&dyn Context>,
118    ) {
119        self.0.observe(value, &kv_from_option_attr(attributes));
120    }
121
122    // OTel rust does not currently support unregistering callbacks
123    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
124    fn stop(&self) {}
125}
126
127// The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version
128// for our wrapper struct
129impl<T> Debug for AsyncInstrumentWrap<'_, T> {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_tuple("AsyncInstrumentWrap").finish()
132    }
133}
134
135#[derive(Debug)]
136struct MeterWrap(OtelMeter);
137impl Deref for MeterWrap {
138    type Target = OtelMeter;
139
140    fn deref(&self) -> &Self::Target {
141        &self.0
142    }
143}
144
145impl ProvideInstrument for MeterWrap {
146    fn create_gauge(
147        &self,
148        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
149    ) -> Arc<dyn AsyncMeasure<Value = f64>> {
150        let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
151
152        if let Some(desc) = builder.get_description() {
153            otel_builder = otel_builder.with_description(desc.clone());
154        }
155
156        if let Some(u) = builder.get_units() {
157            otel_builder = otel_builder.with_unit(u.clone());
158        }
159
160        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
161            let f = builder.callback.clone();
162            f(&AsyncInstrumentWrap(input));
163        });
164
165        Arc::new(GaugeWrap(otel_builder.init()))
166    }
167
168    fn create_up_down_counter(
169        &self,
170        builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
171    ) -> Arc<dyn UpDownCounter> {
172        let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
173        if let Some(desc) = builder.get_description() {
174            otel_builder = otel_builder.with_description(desc.clone());
175        }
176
177        if let Some(u) = builder.get_units() {
178            otel_builder = otel_builder.with_unit(u.clone());
179        }
180
181        Arc::new(UpDownCounterWrap(otel_builder.init()))
182    }
183
184    fn create_async_up_down_counter(
185        &self,
186        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
187    ) -> Arc<dyn AsyncMeasure<Value = i64>> {
188        let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
189
190        if let Some(desc) = builder.get_description() {
191            otel_builder = otel_builder.with_description(desc.clone());
192        }
193
194        if let Some(u) = builder.get_units() {
195            otel_builder = otel_builder.with_unit(u.clone());
196        }
197
198        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
199            let f = builder.callback.clone();
200            f(&AsyncInstrumentWrap(input));
201        });
202
203        Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
204    }
205
206    fn create_monotonic_counter(
207        &self,
208        builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
209    ) -> Arc<dyn MonotonicCounter> {
210        let mut otel_builder = self.u64_counter(builder.get_name().clone());
211        if let Some(desc) = builder.get_description() {
212            otel_builder = otel_builder.with_description(desc.clone());
213        }
214
215        if let Some(u) = builder.get_units() {
216            otel_builder = otel_builder.with_unit(u.clone());
217        }
218
219        Arc::new(MonotonicCounterWrap(otel_builder.init()))
220    }
221
222    fn create_async_monotonic_counter(
223        &self,
224        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
225    ) -> Arc<dyn AsyncMeasure<Value = u64>> {
226        let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
227
228        if let Some(desc) = builder.get_description() {
229            otel_builder = otel_builder.with_description(desc.clone());
230        }
231
232        if let Some(u) = builder.get_units() {
233            otel_builder = otel_builder.with_unit(u.clone());
234        }
235
236        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
237            let f = builder.callback.clone();
238            f(&AsyncInstrumentWrap(input));
239        });
240
241        Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
242    }
243
244    fn create_histogram(
245        &self,
246        builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
247    ) -> Arc<dyn Histogram> {
248        let mut otel_builder = self.f64_histogram(builder.get_name().clone());
249        if let Some(desc) = builder.get_description() {
250            otel_builder = otel_builder.with_description(desc.clone());
251        }
252
253        if let Some(u) = builder.get_units() {
254            otel_builder = otel_builder.with_unit(u.clone());
255        }
256
257        Arc::new(HistogramWrap(otel_builder.init()))
258    }
259}
260
261/// An OpenTelemetry based implementation of the AWS SDK's [ProvideMeter] trait
262#[non_exhaustive]
263#[derive(Debug)]
264pub struct OtelMeterProvider {
265    meter_provider: OtelSdkMeterProvider,
266}
267
268impl OtelMeterProvider {
269    /// Create a new [OtelMeterProvider] from an [OtelSdkMeterProvider].
270    pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
271        Self {
272            meter_provider: otel_meter_provider,
273        }
274    }
275
276    /// Flush the metric pipeline.
277    pub fn flush(&self) -> Result<(), ObservabilityError> {
278        match self.meter_provider.force_flush() {
279            Ok(_) => Ok(()),
280            Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
281        }
282    }
283}
284
285impl ProvideMeter for OtelMeterProvider {
286    fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
287        Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
288    }
289
290    fn as_any(&self) -> &dyn std::any::Any {
291        self
292    }
293
294    fn provider_name(&self) -> &'static str {
295        "otel"
296    }
297}
298
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use aws_smithy_observability::instruments::AsyncMeasure;
    use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
    use opentelemetry_sdk::metrics::{
        data::{Gauge, Histogram, Sum},
        PeriodicReader, SdkMeterProvider,
    };
    use opentelemetry_sdk::runtime::Tokio;
    use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;

    use super::OtelMeterProvider;

    // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn sync_instruments_work() {
        // OTel side: an in-memory exporter fed by a periodic reader
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();

        // SDK side: keep our own handle to the provider so it can be flushed later
        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
        let sdk_tp = TelemetryProvider::builder()
            .meter_provider(sdk_mp.clone())
            .build();

        // Go through the dyn SDK interfaces from here on
        let meter = sdk_tp.meter_provider().get_meter("TestMeter", None);

        // One measurement on each of the three sync instruments
        let mono_counter = meter.create_monotonic_counter("TestMonoCounter").build();
        mono_counter.add(4, None, None);

        let ud_counter = meter.create_up_down_counter("TestUpDownCounter").build();
        ud_counter.add(-6, None, None);

        let histogram = meter.create_histogram("TestHistogram").build();
        histogram.record(1.234, None, None);

        // Flush so every measurement reaches the exporter
        sdk_mp.flush().unwrap();

        // Inspect what the exporter received, in instrument creation order
        let finished_metrics = exporter.get_finished_metrics().unwrap();
        let metrics = &finished_metrics[0].scope_metrics[0].metrics;

        let mono_sum = metrics[0].data.as_any().downcast_ref::<Sum<u64>>().unwrap();
        assert_eq!(mono_sum.data_points[0].value, 4);

        let ud_sum = metrics[1].data.as_any().downcast_ref::<Sum<i64>>().unwrap();
        assert_eq!(ud_sum.data_points[0].value, -6);

        let histogram_data = metrics[2]
            .data
            .as_any()
            .downcast_ref::<Histogram<f64>>()
            .unwrap();
        assert_eq!(histogram_data.data_points[0].sum, 1.234);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_instrument_work() {
        // OTel side: an in-memory exporter fed by a periodic reader
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();

        // SDK side: keep our own handle to the provider so it can be flushed later
        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
        let sdk_tp = TelemetryProvider::builder()
            .meter_provider(sdk_mp.clone())
            .build();

        // Go through the dyn SDK interfaces from here on
        let meter = sdk_tp.meter_provider().get_meter("TestMeter", None);

        // Each async instrument gets one direct record (no attributes) plus a callback that
        // records a second value under its own attribute, so the two are separate data points
        let gauge = meter
            .create_gauge(
                String::from("TestGauge"),
                |measurement: &dyn AsyncMeasure<Value = f64>| {
                    let mut callback_attrs = Attributes::new();
                    callback_attrs.set(
                        "TestGaugeAttr",
                        AttributeValue::String("TestGaugeAttr".into()),
                    );
                    measurement.record(6.789, Some(&callback_attrs), None);
                },
            )
            .build();
        gauge.record(1.234, None, None);

        let async_ud_counter = meter
            .create_async_up_down_counter(
                String::from("TestAsyncUpDownCounter"),
                |measurement: &dyn AsyncMeasure<Value = i64>| {
                    let mut callback_attrs = Attributes::new();
                    callback_attrs.set(
                        "TestAsyncUpDownCounterAttr",
                        AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
                    );
                    measurement.record(12, Some(&callback_attrs), None);
                },
            )
            .build();
        async_ud_counter.record(-6, None, None);

        let async_mono_counter = meter
            .create_async_monotonic_counter(
                String::from("TestAsyncMonoCounter"),
                |measurement: &dyn AsyncMeasure<Value = u64>| {
                    let mut callback_attrs = Attributes::new();
                    callback_attrs.set(
                        "TestAsyncMonoCounterAttr",
                        AttributeValue::String("TestAsyncMonoCounterAttr".into()),
                    );
                    measurement.record(123, Some(&callback_attrs), None);
                },
            )
            .build();
        async_mono_counter.record(4, None, None);

        // Flush so every measurement reaches the exporter
        sdk_mp.flush().unwrap();

        // First pass: the directly recorded values are the first data point of each metric
        let finished_metrics = exporter.get_finished_metrics().unwrap();
        let metrics = &finished_metrics[0].scope_metrics[0].metrics;

        let gauge_data = metrics[0].data.as_any().downcast_ref::<Gauge<f64>>().unwrap();
        assert_eq!(gauge_data.data_points[0].value, 1.234);

        let ud_sum = metrics[1].data.as_any().downcast_ref::<Sum<i64>>().unwrap();
        assert_eq!(ud_sum.data_points[0].value, -6);

        let mono_sum = metrics[2].data.as_any().downcast_ref::<Sum<u64>>().unwrap();
        assert_eq!(mono_sum.data_points[0].value, 4);

        // Second pass on a fresh snapshot: the callback-recorded values are the second data point
        let finished_metrics = exporter.get_finished_metrics().unwrap();
        let metrics = &finished_metrics[0].scope_metrics[0].metrics;

        let gauge_data = metrics[0].data.as_any().downcast_ref::<Gauge<f64>>().unwrap();
        assert_eq!(gauge_data.data_points[1].value, 6.789);

        let ud_sum = metrics[1].data.as_any().downcast_ref::<Sum<i64>>().unwrap();
        assert_eq!(ud_sum.data_points[1].value, 12);

        let mono_sum = metrics[2].data.as_any().downcast_ref::<Sum<u64>>().unwrap();
        assert_eq!(mono_sum.data_points[1].value, 123);
    }
}