aws_smithy_observability_otel/
meter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! OpenTelemetry based implementations of the Smithy Observability Meter traits.
7
8use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14    AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15    ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21    AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22    Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23    ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24    ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31    fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32        self.0.add(value, &kv_from_option_attr(attributes));
33    }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39    fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40        self.0.record(value, &kv_from_option_attr(attributes));
41    }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47    fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48        self.0.add(value, &kv_from_option_attr(attributes));
49    }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55    type Value = f64;
56
57    fn record(
58        &self,
59        _value: Self::Value,
60        _attributes: Option<&Attributes>,
61        _context: Option<&dyn Context>,
62    ) {
63        // In OpenTelemetry 0.27+, observable instruments don't have an observe() method.
64        // Recording is done through callbacks registered during instrument creation.
65        // This method is a no-op as the callback handles all observations.
66    }
67
68    // OTel rust does not currently support unregistering callbacks
69    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
70    fn stop(&self) {}
71}
72
73#[derive(Debug)]
74struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
75impl AsyncMeasure for AsyncUpDownCounterWrap {
76    type Value = i64;
77
78    fn record(
79        &self,
80        _value: Self::Value,
81        _attributes: Option<&Attributes>,
82        _context: Option<&dyn Context>,
83    ) {
84        // In OpenTelemetry 0.27+, observable instruments don't have an observe() method.
85        // Recording is done through callbacks registered during instrument creation.
86        // This method is a no-op as the callback handles all observations.
87    }
88
89    // OTel rust does not currently support unregistering callbacks
90    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
91    fn stop(&self) {}
92}
93
94#[derive(Debug)]
95struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
96impl AsyncMeasure for AsyncMonotonicCounterWrap {
97    type Value = u64;
98
99    fn record(
100        &self,
101        _value: Self::Value,
102        _attributes: Option<&Attributes>,
103        _context: Option<&dyn Context>,
104    ) {
105        // In OpenTelemetry 0.27+, observable instruments don't have an observe() method.
106        // Recording is done through callbacks registered during instrument creation.
107        // This method is a no-op as the callback handles all observations.
108    }
109
110    // OTel rust does not currently support unregistering callbacks
111    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
112    fn stop(&self) {}
113}
114
115struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
116impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
117    type Value = T;
118
119    fn record(
120        &self,
121        value: Self::Value,
122        attributes: Option<&Attributes>,
123        _context: Option<&dyn Context>,
124    ) {
125        self.0.observe(value, &kv_from_option_attr(attributes));
126    }
127
128    // OTel rust does not currently support unregistering callbacks
129    // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
130    fn stop(&self) {}
131}
132
133// The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version
134// for our wrapper struct
135impl<T> Debug for AsyncInstrumentWrap<'_, T> {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        f.debug_tuple("AsyncInstrumentWrap").finish()
138    }
139}
140
141#[derive(Debug)]
142struct MeterWrap(OtelMeter);
143impl Deref for MeterWrap {
144    type Target = OtelMeter;
145
146    fn deref(&self) -> &Self::Target {
147        &self.0
148    }
149}
150
151impl ProvideInstrument for MeterWrap {
152    fn create_gauge(
153        &self,
154        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
155    ) -> Arc<dyn AsyncMeasure<Value = f64>> {
156        let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
157
158        if let Some(desc) = builder.get_description() {
159            otel_builder = otel_builder.with_description(desc.clone());
160        }
161
162        if let Some(u) = builder.get_units() {
163            otel_builder = otel_builder.with_unit(u.clone());
164        }
165
166        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
167            let f = builder.callback.clone();
168            f(&AsyncInstrumentWrap(input));
169        });
170
171        Arc::new(GaugeWrap(otel_builder.build()))
172    }
173
174    fn create_up_down_counter(
175        &self,
176        builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
177    ) -> Arc<dyn UpDownCounter> {
178        let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
179        if let Some(desc) = builder.get_description() {
180            otel_builder = otel_builder.with_description(desc.clone());
181        }
182
183        if let Some(u) = builder.get_units() {
184            otel_builder = otel_builder.with_unit(u.clone());
185        }
186
187        Arc::new(UpDownCounterWrap(otel_builder.build()))
188    }
189
190    fn create_async_up_down_counter(
191        &self,
192        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
193    ) -> Arc<dyn AsyncMeasure<Value = i64>> {
194        let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
195
196        if let Some(desc) = builder.get_description() {
197            otel_builder = otel_builder.with_description(desc.clone());
198        }
199
200        if let Some(u) = builder.get_units() {
201            otel_builder = otel_builder.with_unit(u.clone());
202        }
203
204        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
205            let f = builder.callback.clone();
206            f(&AsyncInstrumentWrap(input));
207        });
208
209        Arc::new(AsyncUpDownCounterWrap(otel_builder.build()))
210    }
211
212    fn create_monotonic_counter(
213        &self,
214        builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
215    ) -> Arc<dyn MonotonicCounter> {
216        let mut otel_builder = self.u64_counter(builder.get_name().clone());
217        if let Some(desc) = builder.get_description() {
218            otel_builder = otel_builder.with_description(desc.clone());
219        }
220
221        if let Some(u) = builder.get_units() {
222            otel_builder = otel_builder.with_unit(u.clone());
223        }
224
225        Arc::new(MonotonicCounterWrap(otel_builder.build()))
226    }
227
228    fn create_async_monotonic_counter(
229        &self,
230        builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
231    ) -> Arc<dyn AsyncMeasure<Value = u64>> {
232        let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
233
234        if let Some(desc) = builder.get_description() {
235            otel_builder = otel_builder.with_description(desc.clone());
236        }
237
238        if let Some(u) = builder.get_units() {
239            otel_builder = otel_builder.with_unit(u.clone());
240        }
241
242        otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
243            let f = builder.callback.clone();
244            f(&AsyncInstrumentWrap(input));
245        });
246
247        Arc::new(AsyncMonotonicCounterWrap(otel_builder.build()))
248    }
249
250    fn create_histogram(
251        &self,
252        builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
253    ) -> Arc<dyn Histogram> {
254        let mut otel_builder = self.f64_histogram(builder.get_name().clone());
255        if let Some(desc) = builder.get_description() {
256            otel_builder = otel_builder.with_description(desc.clone());
257        }
258
259        if let Some(u) = builder.get_units() {
260            otel_builder = otel_builder.with_unit(u.clone());
261        }
262
263        Arc::new(HistogramWrap(otel_builder.build()))
264    }
265}
266
267/// An OpenTelemetry based implementation of the AWS SDK's [ProvideMeter] trait
268#[non_exhaustive]
269#[derive(Debug)]
270pub struct OtelMeterProvider {
271    meter_provider: OtelSdkMeterProvider,
272}
273
274impl OtelMeterProvider {
275    /// Create a new [OtelMeterProvider] from an [OtelSdkMeterProvider].
276    pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
277        Self {
278            meter_provider: otel_meter_provider,
279        }
280    }
281
282    /// Flush the metric pipeline.
283    pub fn flush(&self) -> Result<(), ObservabilityError> {
284        match self.meter_provider.force_flush() {
285            Ok(_) => Ok(()),
286            Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
287        }
288    }
289}
290
291impl ProvideMeter for OtelMeterProvider {
292    fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
293        Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
294    }
295
296    fn as_any(&self) -> &dyn std::any::Any {
297        self
298    }
299}
300
301#[cfg(test)]
302mod tests {
303
304    use std::sync::Arc;
305
306    use aws_smithy_observability::instruments::AsyncMeasure;
307    use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
308    use opentelemetry_sdk::metrics::{
309        data::{Gauge, Histogram, Sum},
310        PeriodicReader, SdkMeterProvider,
311    };
312    use opentelemetry_sdk::runtime::Tokio;
313    use opentelemetry_sdk::testing::metrics::InMemoryMetricExporter;
314
315    use super::OtelMeterProvider;
316
317    // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
318    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
319    async fn sync_instruments_work() {
320        // Create the OTel metrics objects
321        let exporter = InMemoryMetricExporter::default();
322        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
323        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
324
325        // Create the SDK metrics types from the OTel objects
326        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
327        let sdk_ref = sdk_mp.clone();
328        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
329
330        // Get the dyn versions of the SDK metrics objects
331        let dyn_sdk_mp = sdk_tp.meter_provider();
332        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
333
334        //Create all 3 sync instruments and record some data for each
335        let mono_counter = dyn_sdk_meter
336            .create_monotonic_counter("TestMonoCounter")
337            .build();
338        mono_counter.add(4, None, None);
339        let ud_counter = dyn_sdk_meter
340            .create_up_down_counter("TestUpDownCounter")
341            .build();
342        ud_counter.add(-6, None, None);
343        let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
344        histogram.record(1.234, None, None);
345
346        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
347        sdk_ref.flush().unwrap();
348
349        // Extract the metrics from the exporter and assert that they are what we expect
350        let finished_metrics = exporter.get_finished_metrics().unwrap();
351        let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
352            .data
353            .as_any()
354            .downcast_ref::<Sum<u64>>()
355            .unwrap()
356            .data_points[0]
357            .value;
358        assert_eq!(extracted_mono_counter_data, &4);
359
360        let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
361            .data
362            .as_any()
363            .downcast_ref::<Sum<i64>>()
364            .unwrap()
365            .data_points[0]
366            .value;
367        assert_eq!(extracted_ud_counter_data, &-6);
368
369        let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
370            .data
371            .as_any()
372            .downcast_ref::<Histogram<f64>>()
373            .unwrap()
374            .data_points[0]
375            .sum;
376        assert_eq!(extracted_histogram_data, &1.234);
377    }
378
379    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
380    async fn async_instrument_work() {
381        // Create the OTel metrics objects
382        let exporter = InMemoryMetricExporter::default();
383        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
384        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
385
386        // Create the SDK metrics types from the OTel objects
387        let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
388        let sdk_ref = sdk_mp.clone();
389        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
390
391        // Get the dyn versions of the SDK metrics objects
392        let dyn_sdk_mp = sdk_tp.meter_provider();
393        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
394
395        //Create all async instruments - in OpenTelemetry 0.27+, async instruments only work through callbacks
396        let _gauge = dyn_sdk_meter
397            .create_gauge(
398                "TestGauge".to_string(),
399                // Callback function records the value
400                |measurement: &dyn AsyncMeasure<Value = f64>| {
401                    let mut attrs = Attributes::new();
402                    attrs.set(
403                        "TestGaugeAttr",
404                        AttributeValue::String("TestGaugeAttr".into()),
405                    );
406                    measurement.record(6.789, Some(&attrs), None);
407                },
408            )
409            .build();
410
411        let _async_ud_counter = dyn_sdk_meter
412            .create_async_up_down_counter(
413                "TestAsyncUpDownCounter".to_string(),
414                |measurement: &dyn AsyncMeasure<Value = i64>| {
415                    let mut attrs = Attributes::new();
416                    attrs.set(
417                        "TestAsyncUpDownCounterAttr",
418                        AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
419                    );
420                    measurement.record(12, Some(&attrs), None);
421                },
422            )
423            .build();
424
425        let _async_mono_counter = dyn_sdk_meter
426            .create_async_monotonic_counter(
427                "TestAsyncMonoCounter".to_string(),
428                |measurement: &dyn AsyncMeasure<Value = u64>| {
429                    let mut attrs = Attributes::new();
430                    attrs.set(
431                        "TestAsyncMonoCounterAttr",
432                        AttributeValue::String("TestAsyncMonoCounterAttr".into()),
433                    );
434                    measurement.record(123, Some(&attrs), None);
435                },
436            )
437            .build();
438
439        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
440        sdk_ref.flush().unwrap();
441
442        // Extract the metrics from the exporter
443        let finished_metrics = exporter.get_finished_metrics().unwrap();
444
445        // Assert that the async callbacks ran and recorded the expected values
446        // In OpenTelemetry 0.27+, async instruments only work through callbacks
447        let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
448            .data
449            .as_any()
450            .downcast_ref::<Gauge<f64>>()
451            .unwrap()
452            .data_points[0]
453            .value;
454        assert_eq!(extracted_gauge_data, &6.789);
455
456        let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
457            .data
458            .as_any()
459            .downcast_ref::<Sum<i64>>()
460            .unwrap()
461            .data_points[0]
462            .value;
463        assert_eq!(extracted_async_ud_counter_data, &12);
464
465        let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
466            .data
467            .as_any()
468            .downcast_ref::<Sum<u64>>()
469            .unwrap()
470            .data_points[0]
471            .value;
472        assert_eq!(extracted_async_mono_data, &123);
473    }
474}