aws_smithy_observability_otel/
meter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! OpenTelemetry based implementations of the Smithy Observability Meter traits.
7
8use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14    AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15    ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21    AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22    Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23    ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24    ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31    fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32        self.0.add(value, &kv_from_option_attr(attributes));
33    }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39    fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40        self.0.record(value, &kv_from_option_attr(attributes));
41    }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47    fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48        self.0.add(value, &kv_from_option_attr(attributes));
49    }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55    type Value = f64;
56
57    fn record(
58        &self,
59        value: Self::Value,
60        attributes: Option<&Attributes>,
61        _context: Option<&dyn Context>,
62    ) {
63        self.0.observe(value, &kv_from_option_attr(attributes));
64    }
65
66    // OTel rust does not currently support unregistering callbacks
67    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
68    fn stop(&self) {}
69}
70
71#[derive(Debug)]
72struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
73impl AsyncMeasure for AsyncUpDownCounterWrap {
74    type Value = i64;
75
76    fn record(
77        &self,
78        value: Self::Value,
79        attributes: Option<&Attributes>,
80        _context: Option<&dyn Context>,
81    ) {
82        self.0.observe(value, &kv_from_option_attr(attributes));
83    }
84
85    // OTel rust does not currently support unregistering callbacks
86    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
87    fn stop(&self) {}
88}
89
90#[derive(Debug)]
91struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
92impl AsyncMeasure for AsyncMonotonicCounterWrap {
93    type Value = u64;
94
95    fn record(
96        &self,
97        value: Self::Value,
98        attributes: Option<&Attributes>,
99        _context: Option<&dyn Context>,
100    ) {
101        self.0.observe(value, &kv_from_option_attr(attributes));
102    }
103
104    // OTel rust does not currently support unregistering callbacks
105    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
106    fn stop(&self) {}
107}
108
109struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
110impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
111    type Value = T;
112
113    fn record(
114        &self,
115        value: Self::Value,
116        attributes: Option<&Attributes>,
117        _context: Option<&dyn Context>,
118    ) {
119        self.0.observe(value, &kv_from_option_attr(attributes));
120    }
121
122    // OTel rust does not currently support unregistering callbacks
123    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
124    fn stop(&self) {}
125}
126
127// The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version
128// for our wrapper struct
129impl<T> Debug for AsyncInstrumentWrap<'_, T> {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_tuple("AsyncInstrumentWrap").finish()
132    }
133}
134
135#[derive(Debug)]
136struct MeterWrap(OtelMeter);
137impl Deref for MeterWrap {
138    type Target = OtelMeter;
139
140    fn deref(&self) -> &Self::Target {
141        &self.0
142    }
143}
144
145impl ProvideInstrument for MeterWrap {
146    fn create_gauge(
147        &self,
148        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
149    ) -> Arc<dyn AsyncMeasure<Value = f64>> {
150        let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
151
152        if let Some(desc) = builder.get_description() {
153            otel_builder = otel_builder.with_description(desc.clone());
154        }
155
156        if let Some(u) = builder.get_units() {
157            otel_builder = otel_builder.with_unit(u.clone());
158        }
159
160        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
161            let f = builder.callback.clone();
162            f(&AsyncInstrumentWrap(input));
163        });
164
165        Arc::new(GaugeWrap(otel_builder.init()))
166    }
167
168    fn create_up_down_counter(
169        &self,
170        builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
171    ) -> Arc<dyn UpDownCounter> {
172        let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
173        if let Some(desc) = builder.get_description() {
174            otel_builder = otel_builder.with_description(desc.clone());
175        }
176
177        if let Some(u) = builder.get_units() {
178            otel_builder = otel_builder.with_unit(u.clone());
179        }
180
181        Arc::new(UpDownCounterWrap(otel_builder.init()))
182    }
183
184    fn create_async_up_down_counter(
185        &self,
186        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
187    ) -> Arc<dyn AsyncMeasure<Value = i64>> {
188        let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
189
190        if let Some(desc) = builder.get_description() {
191            otel_builder = otel_builder.with_description(desc.clone());
192        }
193
194        if let Some(u) = builder.get_units() {
195            otel_builder = otel_builder.with_unit(u.clone());
196        }
197
198        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
199            let f = builder.callback.clone();
200            f(&AsyncInstrumentWrap(input));
201        });
202
203        Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
204    }
205
206    fn create_monotonic_counter(
207        &self,
208        builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
209    ) -> Arc<dyn MonotonicCounter> {
210        let mut otel_builder = self.u64_counter(builder.get_name().clone());
211        if let Some(desc) = builder.get_description() {
212            otel_builder = otel_builder.with_description(desc.clone());
213        }
214
215        if let Some(u) = builder.get_units() {
216            otel_builder = otel_builder.with_unit(u.clone());
217        }
218
219        Arc::new(MonotonicCounterWrap(otel_builder.init()))
220    }
221
222    fn create_async_monotonic_counter(
223        &self,
224        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
225    ) -> Arc<dyn AsyncMeasure<Value = u64>> {
226        let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
227
228        if let Some(desc) = builder.get_description() {
229            otel_builder = otel_builder.with_description(desc.clone());
230        }
231
232        if let Some(u) = builder.get_units() {
233            otel_builder = otel_builder.with_unit(u.clone());
234        }
235
236        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
237            let f = builder.callback.clone();
238            f(&AsyncInstrumentWrap(input));
239        });
240
241        Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
242    }
243
244    fn create_histogram(
245        &self,
246        builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
247    ) -> Arc<dyn Histogram> {
248        let mut otel_builder = self.f64_histogram(builder.get_name().clone());
249        if let Some(desc) = builder.get_description() {
250            otel_builder = otel_builder.with_description(desc.clone());
251        }
252
253        if let Some(u) = builder.get_units() {
254            otel_builder = otel_builder.with_unit(u.clone());
255        }
256
257        Arc::new(HistogramWrap(otel_builder.init()))
258    }
259}
260
261/// An OpenTelemetry based implementation of the AWS SDK's [ProvideMeter] trait
262#[non_exhaustive]
263#[derive(Debug)]
264pub struct OtelMeterProvider {
265    meter_provider: OtelSdkMeterProvider,
266}
267
268impl OtelMeterProvider {
269    /// Create a new [OtelMeterProvider] from an [OtelSdkMeterProvider].
270    pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
271        Self {
272            meter_provider: otel_meter_provider,
273        }
274    }
275
276    /// Flush the metric pipeline.
277    pub fn flush(&self) -> Result<(), ObservabilityError> {
278        match self.meter_provider.force_flush() {
279            Ok(_) => Ok(()),
280            Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
281        }
282    }
283}
284
285impl ProvideMeter for OtelMeterProvider {
286    fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
287        Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
288    }
289}
290
291#[cfg(test)]
292mod tests {
293
294    use std::sync::Arc;
295
296    use aws_smithy_observability::instruments::AsyncMeasure;
297    use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
298    use opentelemetry_sdk::metrics::{
299        data::{Gauge, Histogram, Sum},
300        PeriodicReader, SdkMeterProvider,
301    };
302    use opentelemetry_sdk::runtime::Tokio;
303    use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
304
305    use super::OtelMeterProvider;
306
307    // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
308    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
309    async fn sync_instruments_work() {
310        // Create the OTel metrics objects
311        let exporter = InMemoryMetricsExporter::default();
312        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
313        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
314
315        // Create the SDK metrics types from the OTel objects
316        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
317        let sdk_ref = sdk_mp.clone();
318        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
319
320        // Get the dyn versions of the SDK metrics objects
321        let dyn_sdk_mp = sdk_tp.meter_provider();
322        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
323
324        //Create all 3 sync instruments and record some data for each
325        let mono_counter = dyn_sdk_meter
326            .create_monotonic_counter("TestMonoCounter")
327            .build();
328        mono_counter.add(4, None, None);
329        let ud_counter = dyn_sdk_meter
330            .create_up_down_counter("TestUpDownCounter")
331            .build();
332        ud_counter.add(-6, None, None);
333        let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
334        histogram.record(1.234, None, None);
335
336        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
337        sdk_ref.flush().unwrap();
338
339        // Extract the metrics from the exporter and assert that they are what we expect
340        let finished_metrics = exporter.get_finished_metrics().unwrap();
341        let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
342            .data
343            .as_any()
344            .downcast_ref::<Sum<u64>>()
345            .unwrap()
346            .data_points[0]
347            .value;
348        assert_eq!(extracted_mono_counter_data, &4);
349
350        let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
351            .data
352            .as_any()
353            .downcast_ref::<Sum<i64>>()
354            .unwrap()
355            .data_points[0]
356            .value;
357        assert_eq!(extracted_ud_counter_data, &-6);
358
359        let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
360            .data
361            .as_any()
362            .downcast_ref::<Histogram<f64>>()
363            .unwrap()
364            .data_points[0]
365            .sum;
366        assert_eq!(extracted_histogram_data, &1.234);
367    }
368
369    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
370    async fn async_instrument_work() {
371        // Create the OTel metrics objects
372        let exporter = InMemoryMetricsExporter::default();
373        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
374        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
375
376        // Create the SDK metrics types from the OTel objects
377        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
378        let sdk_ref = sdk_mp.clone();
379        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
380
381        // Get the dyn versions of the SDK metrics objects
382        let dyn_sdk_mp = sdk_tp.meter_provider();
383        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
384
385        //Create all async instruments and record some data
386        let gauge = dyn_sdk_meter
387            .create_gauge(
388                "TestGauge".to_string(),
389                // Callback function records another value with different attributes so it is deduped
390                |measurement: &dyn AsyncMeasure<Value = f64>| {
391                    let mut attrs = Attributes::new();
392                    attrs.set(
393                        "TestGaugeAttr",
394                        AttributeValue::String("TestGaugeAttr".into()),
395                    );
396                    measurement.record(6.789, Some(&attrs), None);
397                },
398            )
399            .build();
400        gauge.record(1.234, None, None);
401
402        let async_ud_counter = dyn_sdk_meter
403            .create_async_up_down_counter(
404                "TestAsyncUpDownCounter".to_string(),
405                |measurement: &dyn AsyncMeasure<Value = i64>| {
406                    let mut attrs = Attributes::new();
407                    attrs.set(
408                        "TestAsyncUpDownCounterAttr",
409                        AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
410                    );
411                    measurement.record(12, Some(&attrs), None);
412                },
413            )
414            .build();
415        async_ud_counter.record(-6, None, None);
416
417        let async_mono_counter = dyn_sdk_meter
418            .create_async_monotonic_counter(
419                "TestAsyncMonoCounter".to_string(),
420                |measurement: &dyn AsyncMeasure<Value = u64>| {
421                    let mut attrs = Attributes::new();
422                    attrs.set(
423                        "TestAsyncMonoCounterAttr",
424                        AttributeValue::String("TestAsyncMonoCounterAttr".into()),
425                    );
426                    measurement.record(123, Some(&attrs), None);
427                },
428            )
429            .build();
430        async_mono_counter.record(4, None, None);
431
432        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
433        sdk_ref.flush().unwrap();
434
435        // Extract the metrics from the exporter
436        let finished_metrics = exporter.get_finished_metrics().unwrap();
437
438        // Assert that the reported metrics are what we expect
439        let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
440            .data
441            .as_any()
442            .downcast_ref::<Gauge<f64>>()
443            .unwrap()
444            .data_points[0]
445            .value;
446        assert_eq!(extracted_gauge_data, &1.234);
447
448        let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
449            .data
450            .as_any()
451            .downcast_ref::<Sum<i64>>()
452            .unwrap()
453            .data_points[0]
454            .value;
455        assert_eq!(extracted_async_ud_counter_data, &-6);
456
457        let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
458            .data
459            .as_any()
460            .downcast_ref::<Sum<u64>>()
461            .unwrap()
462            .data_points[0]
463            .value;
464        assert_eq!(extracted_async_mono_data, &4);
465
466        // Assert that the async callbacks ran
467        let finished_metrics = exporter.get_finished_metrics().unwrap();
468        let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
469            .data
470            .as_any()
471            .downcast_ref::<Gauge<f64>>()
472            .unwrap()
473            .data_points[1]
474            .value;
475        assert_eq!(extracted_gauge_data, &6.789);
476
477        let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
478            .data
479            .as_any()
480            .downcast_ref::<Sum<i64>>()
481            .unwrap()
482            .data_points[1]
483            .value;
484        assert_eq!(extracted_async_ud_counter_data, &12);
485
486        let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
487            .data
488            .as_any()
489            .downcast_ref::<Sum<u64>>()
490            .unwrap()
491            .data_points[1]
492            .value;
493        assert_eq!(extracted_async_mono_data, &123);
494    }
495}