aws_smithy_observability_otel/
meter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! OpenTelemetry based implementations of the Smithy Observability Meter traits.
7
8use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14    AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15    ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21    AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22    Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23    ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24    ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31    fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32        self.0.add(value, &kv_from_option_attr(attributes));
33    }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39    fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40        self.0.record(value, &kv_from_option_attr(attributes));
41    }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47    fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48        self.0.add(value, &kv_from_option_attr(attributes));
49    }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55    type Value = f64;
56
57    fn record(
58        &self,
59        value: Self::Value,
60        attributes: Option<&Attributes>,
61        _context: Option<&dyn Context>,
62    ) {
63        self.0.observe(value, &kv_from_option_attr(attributes));
64    }
65
66    // OTel rust does not currently support unregistering callbacks
67    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
68    fn stop(&self) {}
69}
70
71#[derive(Debug)]
72struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
73impl AsyncMeasure for AsyncUpDownCounterWrap {
74    type Value = i64;
75
76    fn record(
77        &self,
78        value: Self::Value,
79        attributes: Option<&Attributes>,
80        _context: Option<&dyn Context>,
81    ) {
82        self.0.observe(value, &kv_from_option_attr(attributes));
83    }
84
85    // OTel rust does not currently support unregistering callbacks
86    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
87    fn stop(&self) {}
88}
89
90#[derive(Debug)]
91struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
92impl AsyncMeasure for AsyncMonotonicCounterWrap {
93    type Value = u64;
94
95    fn record(
96        &self,
97        value: Self::Value,
98        attributes: Option<&Attributes>,
99        _context: Option<&dyn Context>,
100    ) {
101        self.0.observe(value, &kv_from_option_attr(attributes));
102    }
103
104    // OTel rust does not currently support unregistering callbacks
105    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
106    fn stop(&self) {}
107}
108
109struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
110impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
111    type Value = T;
112
113    fn record(
114        &self,
115        value: Self::Value,
116        attributes: Option<&Attributes>,
117        _context: Option<&dyn Context>,
118    ) {
119        self.0.observe(value, &kv_from_option_attr(attributes));
120    }
121
122    // OTel rust does not currently support unregistering callbacks
123    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
124    fn stop(&self) {}
125}
126
127// The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version
128// for our wrapper struct
129impl<T> Debug for AsyncInstrumentWrap<'_, T> {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_tuple("AsyncInstrumentWrap").finish()
132    }
133}
134
135#[derive(Debug)]
136struct MeterWrap(OtelMeter);
137impl Deref for MeterWrap {
138    type Target = OtelMeter;
139
140    fn deref(&self) -> &Self::Target {
141        &self.0
142    }
143}
144
145impl ProvideInstrument for MeterWrap {
146    fn create_gauge(
147        &self,
148        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
149    ) -> Arc<dyn AsyncMeasure<Value = f64>> {
150        let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
151
152        if let Some(desc) = builder.get_description() {
153            otel_builder = otel_builder.with_description(desc.clone());
154        }
155
156        if let Some(u) = builder.get_units() {
157            otel_builder = otel_builder.with_unit(u.clone());
158        }
159
160        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
161            let f = builder.callback.clone();
162            f(&AsyncInstrumentWrap(input));
163        });
164
165        Arc::new(GaugeWrap(otel_builder.init()))
166    }
167
168    fn create_up_down_counter(
169        &self,
170        builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
171    ) -> Arc<dyn UpDownCounter> {
172        let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
173        if let Some(desc) = builder.get_description() {
174            otel_builder = otel_builder.with_description(desc.clone());
175        }
176
177        if let Some(u) = builder.get_units() {
178            otel_builder = otel_builder.with_unit(u.clone());
179        }
180
181        Arc::new(UpDownCounterWrap(otel_builder.init()))
182    }
183
184    fn create_async_up_down_counter(
185        &self,
186        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
187    ) -> Arc<dyn AsyncMeasure<Value = i64>> {
188        let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
189
190        if let Some(desc) = builder.get_description() {
191            otel_builder = otel_builder.with_description(desc.clone());
192        }
193
194        if let Some(u) = builder.get_units() {
195            otel_builder = otel_builder.with_unit(u.clone());
196        }
197
198        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
199            let f = builder.callback.clone();
200            f(&AsyncInstrumentWrap(input));
201        });
202
203        Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
204    }
205
206    fn create_monotonic_counter(
207        &self,
208        builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
209    ) -> Arc<dyn MonotonicCounter> {
210        let mut otel_builder = self.u64_counter(builder.get_name().clone());
211        if let Some(desc) = builder.get_description() {
212            otel_builder = otel_builder.with_description(desc.clone());
213        }
214
215        if let Some(u) = builder.get_units() {
216            otel_builder = otel_builder.with_unit(u.clone());
217        }
218
219        Arc::new(MonotonicCounterWrap(otel_builder.init()))
220    }
221
222    fn create_async_monotonic_counter(
223        &self,
224        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
225    ) -> Arc<dyn AsyncMeasure<Value = u64>> {
226        let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
227
228        if let Some(desc) = builder.get_description() {
229            otel_builder = otel_builder.with_description(desc.clone());
230        }
231
232        if let Some(u) = builder.get_units() {
233            otel_builder = otel_builder.with_unit(u.clone());
234        }
235
236        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
237            let f = builder.callback.clone();
238            f(&AsyncInstrumentWrap(input));
239        });
240
241        Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
242    }
243
244    fn create_histogram(
245        &self,
246        builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
247    ) -> Arc<dyn Histogram> {
248        let mut otel_builder = self.f64_histogram(builder.get_name().clone());
249        if let Some(desc) = builder.get_description() {
250            otel_builder = otel_builder.with_description(desc.clone());
251        }
252
253        if let Some(u) = builder.get_units() {
254            otel_builder = otel_builder.with_unit(u.clone());
255        }
256
257        Arc::new(HistogramWrap(otel_builder.init()))
258    }
259}
260
261/// An OpenTelemetry based implementation of the AWS SDK's [ProvideMeter] trait
262#[non_exhaustive]
263#[derive(Debug)]
264pub struct OtelMeterProvider {
265    meter_provider: OtelSdkMeterProvider,
266}
267
268impl OtelMeterProvider {
269    /// Create a new [OtelMeterProvider] from an [OtelSdkMeterProvider].
270    pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
271        Self {
272            meter_provider: otel_meter_provider,
273        }
274    }
275
276    /// Flush the metric pipeline.
277    pub fn flush(&self) -> Result<(), ObservabilityError> {
278        match self.meter_provider.force_flush() {
279            Ok(_) => Ok(()),
280            Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
281        }
282    }
283}
284
285impl ProvideMeter for OtelMeterProvider {
286    fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
287        Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
288    }
289
290    fn as_any(&self) -> &dyn std::any::Any {
291        self
292    }
293}
294
295#[cfg(test)]
296mod tests {
297
298    use std::sync::Arc;
299
300    use aws_smithy_observability::instruments::AsyncMeasure;
301    use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
302    use opentelemetry_sdk::metrics::{
303        data::{Gauge, Histogram, Sum},
304        PeriodicReader, SdkMeterProvider,
305    };
306    use opentelemetry_sdk::runtime::Tokio;
307    use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
308
309    use super::OtelMeterProvider;
310
311    // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
312    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
313    async fn sync_instruments_work() {
314        // Create the OTel metrics objects
315        let exporter = InMemoryMetricsExporter::default();
316        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
317        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
318
319        // Create the SDK metrics types from the OTel objects
320        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
321        let sdk_ref = sdk_mp.clone();
322        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
323
324        // Get the dyn versions of the SDK metrics objects
325        let dyn_sdk_mp = sdk_tp.meter_provider();
326        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
327
328        //Create all 3 sync instruments and record some data for each
329        let mono_counter = dyn_sdk_meter
330            .create_monotonic_counter("TestMonoCounter")
331            .build();
332        mono_counter.add(4, None, None);
333        let ud_counter = dyn_sdk_meter
334            .create_up_down_counter("TestUpDownCounter")
335            .build();
336        ud_counter.add(-6, None, None);
337        let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
338        histogram.record(1.234, None, None);
339
340        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
341        sdk_ref.flush().unwrap();
342
343        // Extract the metrics from the exporter and assert that they are what we expect
344        let finished_metrics = exporter.get_finished_metrics().unwrap();
345        let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
346            .data
347            .as_any()
348            .downcast_ref::<Sum<u64>>()
349            .unwrap()
350            .data_points[0]
351            .value;
352        assert_eq!(extracted_mono_counter_data, &4);
353
354        let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
355            .data
356            .as_any()
357            .downcast_ref::<Sum<i64>>()
358            .unwrap()
359            .data_points[0]
360            .value;
361        assert_eq!(extracted_ud_counter_data, &-6);
362
363        let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
364            .data
365            .as_any()
366            .downcast_ref::<Histogram<f64>>()
367            .unwrap()
368            .data_points[0]
369            .sum;
370        assert_eq!(extracted_histogram_data, &1.234);
371    }
372
373    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
374    async fn async_instrument_work() {
375        // Create the OTel metrics objects
376        let exporter = InMemoryMetricsExporter::default();
377        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
378        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
379
380        // Create the SDK metrics types from the OTel objects
381        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
382        let sdk_ref = sdk_mp.clone();
383        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
384
385        // Get the dyn versions of the SDK metrics objects
386        let dyn_sdk_mp = sdk_tp.meter_provider();
387        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
388
389        //Create all async instruments and record some data
390        let gauge = dyn_sdk_meter
391            .create_gauge(
392                "TestGauge".to_string(),
393                // Callback function records another value with different attributes so it is deduped
394                |measurement: &dyn AsyncMeasure<Value = f64>| {
395                    let mut attrs = Attributes::new();
396                    attrs.set(
397                        "TestGaugeAttr",
398                        AttributeValue::String("TestGaugeAttr".into()),
399                    );
400                    measurement.record(6.789, Some(&attrs), None);
401                },
402            )
403            .build();
404        gauge.record(1.234, None, None);
405
406        let async_ud_counter = dyn_sdk_meter
407            .create_async_up_down_counter(
408                "TestAsyncUpDownCounter".to_string(),
409                |measurement: &dyn AsyncMeasure<Value = i64>| {
410                    let mut attrs = Attributes::new();
411                    attrs.set(
412                        "TestAsyncUpDownCounterAttr",
413                        AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
414                    );
415                    measurement.record(12, Some(&attrs), None);
416                },
417            )
418            .build();
419        async_ud_counter.record(-6, None, None);
420
421        let async_mono_counter = dyn_sdk_meter
422            .create_async_monotonic_counter(
423                "TestAsyncMonoCounter".to_string(),
424                |measurement: &dyn AsyncMeasure<Value = u64>| {
425                    let mut attrs = Attributes::new();
426                    attrs.set(
427                        "TestAsyncMonoCounterAttr",
428                        AttributeValue::String("TestAsyncMonoCounterAttr".into()),
429                    );
430                    measurement.record(123, Some(&attrs), None);
431                },
432            )
433            .build();
434        async_mono_counter.record(4, None, None);
435
436        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
437        sdk_ref.flush().unwrap();
438
439        // Extract the metrics from the exporter
440        let finished_metrics = exporter.get_finished_metrics().unwrap();
441
442        // Assert that the reported metrics are what we expect
443        let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
444            .data
445            .as_any()
446            .downcast_ref::<Gauge<f64>>()
447            .unwrap()
448            .data_points[0]
449            .value;
450        assert_eq!(extracted_gauge_data, &1.234);
451
452        let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
453            .data
454            .as_any()
455            .downcast_ref::<Sum<i64>>()
456            .unwrap()
457            .data_points[0]
458            .value;
459        assert_eq!(extracted_async_ud_counter_data, &-6);
460
461        let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
462            .data
463            .as_any()
464            .downcast_ref::<Sum<u64>>()
465            .unwrap()
466            .data_points[0]
467            .value;
468        assert_eq!(extracted_async_mono_data, &4);
469
470        // Assert that the async callbacks ran
471        let finished_metrics = exporter.get_finished_metrics().unwrap();
472        let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
473            .data
474            .as_any()
475            .downcast_ref::<Gauge<f64>>()
476            .unwrap()
477            .data_points[1]
478            .value;
479        assert_eq!(extracted_gauge_data, &6.789);
480
481        let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
482            .data
483            .as_any()
484            .downcast_ref::<Sum<i64>>()
485            .unwrap()
486            .data_points[1]
487            .value;
488        assert_eq!(extracted_async_ud_counter_data, &12);
489
490        let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
491            .data
492            .as_any()
493            .downcast_ref::<Sum<u64>>()
494            .unwrap()
495            .data_points[1]
496            .value;
497        assert_eq!(extracted_async_mono_data, &123);
498    }
499}