aws_smithy_observability_otel/
meter.rs1use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14 AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15 ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21 AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22 Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23 ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24 ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31 fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32 self.0.add(value, &kv_from_option_attr(attributes));
33 }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39 fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40 self.0.record(value, &kv_from_option_attr(attributes));
41 }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47 fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48 self.0.add(value, &kv_from_option_attr(attributes));
49 }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55 type Value = f64;
56
57 fn record(
58 &self,
59 value: Self::Value,
60 attributes: Option<&Attributes>,
61 _context: Option<&dyn Context>,
62 ) {
63 self.0.observe(value, &kv_from_option_attr(attributes));
64 }
65
66 fn stop(&self) {}
69}
70
71#[derive(Debug)]
72struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
73impl AsyncMeasure for AsyncUpDownCounterWrap {
74 type Value = i64;
75
76 fn record(
77 &self,
78 value: Self::Value,
79 attributes: Option<&Attributes>,
80 _context: Option<&dyn Context>,
81 ) {
82 self.0.observe(value, &kv_from_option_attr(attributes));
83 }
84
85 fn stop(&self) {}
88}
89
90#[derive(Debug)]
91struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
92impl AsyncMeasure for AsyncMonotonicCounterWrap {
93 type Value = u64;
94
95 fn record(
96 &self,
97 value: Self::Value,
98 attributes: Option<&Attributes>,
99 _context: Option<&dyn Context>,
100 ) {
101 self.0.observe(value, &kv_from_option_attr(attributes));
102 }
103
104 fn stop(&self) {}
107}
108
109struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
110impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
111 type Value = T;
112
113 fn record(
114 &self,
115 value: Self::Value,
116 attributes: Option<&Attributes>,
117 _context: Option<&dyn Context>,
118 ) {
119 self.0.observe(value, &kv_from_option_attr(attributes));
120 }
121
122 fn stop(&self) {}
125}
126
127impl<T> Debug for AsyncInstrumentWrap<'_, T> {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_tuple("AsyncInstrumentWrap").finish()
132 }
133}
134
135#[derive(Debug)]
136struct MeterWrap(OtelMeter);
137impl Deref for MeterWrap {
138 type Target = OtelMeter;
139
140 fn deref(&self) -> &Self::Target {
141 &self.0
142 }
143}
144
145impl ProvideInstrument for MeterWrap {
146 fn create_gauge(
147 &self,
148 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
149 ) -> Arc<dyn AsyncMeasure<Value = f64>> {
150 let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
151
152 if let Some(desc) = builder.get_description() {
153 otel_builder = otel_builder.with_description(desc.clone());
154 }
155
156 if let Some(u) = builder.get_units() {
157 otel_builder = otel_builder.with_unit(u.clone());
158 }
159
160 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
161 let f = builder.callback.clone();
162 f(&AsyncInstrumentWrap(input));
163 });
164
165 Arc::new(GaugeWrap(otel_builder.init()))
166 }
167
168 fn create_up_down_counter(
169 &self,
170 builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
171 ) -> Arc<dyn UpDownCounter> {
172 let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
173 if let Some(desc) = builder.get_description() {
174 otel_builder = otel_builder.with_description(desc.clone());
175 }
176
177 if let Some(u) = builder.get_units() {
178 otel_builder = otel_builder.with_unit(u.clone());
179 }
180
181 Arc::new(UpDownCounterWrap(otel_builder.init()))
182 }
183
184 fn create_async_up_down_counter(
185 &self,
186 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
187 ) -> Arc<dyn AsyncMeasure<Value = i64>> {
188 let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
189
190 if let Some(desc) = builder.get_description() {
191 otel_builder = otel_builder.with_description(desc.clone());
192 }
193
194 if let Some(u) = builder.get_units() {
195 otel_builder = otel_builder.with_unit(u.clone());
196 }
197
198 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
199 let f = builder.callback.clone();
200 f(&AsyncInstrumentWrap(input));
201 });
202
203 Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
204 }
205
206 fn create_monotonic_counter(
207 &self,
208 builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
209 ) -> Arc<dyn MonotonicCounter> {
210 let mut otel_builder = self.u64_counter(builder.get_name().clone());
211 if let Some(desc) = builder.get_description() {
212 otel_builder = otel_builder.with_description(desc.clone());
213 }
214
215 if let Some(u) = builder.get_units() {
216 otel_builder = otel_builder.with_unit(u.clone());
217 }
218
219 Arc::new(MonotonicCounterWrap(otel_builder.init()))
220 }
221
222 fn create_async_monotonic_counter(
223 &self,
224 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
225 ) -> Arc<dyn AsyncMeasure<Value = u64>> {
226 let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
227
228 if let Some(desc) = builder.get_description() {
229 otel_builder = otel_builder.with_description(desc.clone());
230 }
231
232 if let Some(u) = builder.get_units() {
233 otel_builder = otel_builder.with_unit(u.clone());
234 }
235
236 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
237 let f = builder.callback.clone();
238 f(&AsyncInstrumentWrap(input));
239 });
240
241 Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
242 }
243
244 fn create_histogram(
245 &self,
246 builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
247 ) -> Arc<dyn Histogram> {
248 let mut otel_builder = self.f64_histogram(builder.get_name().clone());
249 if let Some(desc) = builder.get_description() {
250 otel_builder = otel_builder.with_description(desc.clone());
251 }
252
253 if let Some(u) = builder.get_units() {
254 otel_builder = otel_builder.with_unit(u.clone());
255 }
256
257 Arc::new(HistogramWrap(otel_builder.init()))
258 }
259}
260
261#[non_exhaustive]
263#[derive(Debug)]
264pub struct OtelMeterProvider {
265 meter_provider: OtelSdkMeterProvider,
266}
267
268impl OtelMeterProvider {
269 pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
271 Self {
272 meter_provider: otel_meter_provider,
273 }
274 }
275
276 pub fn flush(&self) -> Result<(), ObservabilityError> {
278 match self.meter_provider.force_flush() {
279 Ok(_) => Ok(()),
280 Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
281 }
282 }
283}
284
285impl ProvideMeter for OtelMeterProvider {
286 fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
287 Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
288 }
289
290 fn as_any(&self) -> &dyn std::any::Any {
291 self
292 }
293}
294
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use aws_smithy_observability::instruments::AsyncMeasure;
    use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
    use opentelemetry_sdk::metrics::{
        data::{Gauge, Histogram, Sum},
        PeriodicReader, SdkMeterProvider,
    };
    use opentelemetry_sdk::runtime::Tokio;
    use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;

    use super::OtelMeterProvider;

    // NOTE(review): the multi-thread flavor is presumably required for the Tokio-driven
    // `PeriodicReader` to make progress during `flush` — confirm before changing it.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn sync_instruments_work() {
        // OTel side: an in-memory exporter fed by a periodic reader.
        let exporter = InMemoryMetricsExporter::default();
        let periodic_reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_provider = SdkMeterProvider::builder()
            .with_reader(periodic_reader)
            .build();

        // Smithy side: wrap the OTel provider; keep a second handle for flushing.
        let provider = Arc::new(OtelMeterProvider::new(otel_provider));
        let flush_handle = provider.clone();
        let telemetry = TelemetryProvider::builder()
            .meter_provider(provider)
            .build();

        // Go through the type-erased Smithy interfaces from here on.
        let dyn_provider = telemetry.meter_provider();
        let meter = dyn_provider.get_meter("TestMeter", None);

        // Create each synchronous instrument and record one value on it.
        let mono_counter = meter.create_monotonic_counter("TestMonoCounter").build();
        mono_counter.add(4, None, None);
        let ud_counter = meter.create_up_down_counter("TestUpDownCounter").build();
        ud_counter.add(-6, None, None);
        let histogram = meter.create_histogram("TestHistogram").build();
        histogram.record(1.234, None, None);

        // Force a flush so the recorded data reaches the exporter before it is inspected.
        flush_handle.flush().unwrap();

        // The assertions assume metrics are exported in instrument creation order.
        let exported = exporter.get_finished_metrics().unwrap();
        let metrics = &exported[0].scope_metrics[0].metrics;

        let mono_points = &metrics[0]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points;
        assert_eq!(mono_points[0].value, 4);

        let ud_points = &metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points;
        assert_eq!(ud_points[0].value, -6);

        let histogram_points = &metrics[2]
            .data
            .as_any()
            .downcast_ref::<Histogram<f64>>()
            .unwrap()
            .data_points;
        assert_eq!(histogram_points[0].sum, 1.234);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_instrument_work() {
        // OTel side: an in-memory exporter fed by a periodic reader.
        let exporter = InMemoryMetricsExporter::default();
        let periodic_reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_provider = SdkMeterProvider::builder()
            .with_reader(periodic_reader)
            .build();

        // Smithy side: wrap the OTel provider; keep a second handle for flushing.
        let provider = Arc::new(OtelMeterProvider::new(otel_provider));
        let flush_handle = provider.clone();
        let telemetry = TelemetryProvider::builder()
            .meter_provider(provider)
            .build();

        // Go through the type-erased Smithy interfaces from here on.
        let dyn_provider = telemetry.meter_provider();
        let meter = dyn_provider.get_meter("TestMeter", None);

        // Each async instrument gets a callback that records an attributed value, and
        // additionally has one value recorded directly on the returned handle.
        let gauge = meter
            .create_gauge(
                "TestGauge".to_string(),
                |measurement: &dyn AsyncMeasure<Value = f64>| {
                    let mut callback_attrs = Attributes::new();
                    callback_attrs.set(
                        "TestGaugeAttr",
                        AttributeValue::String("TestGaugeAttr".into()),
                    );
                    measurement.record(6.789, Some(&callback_attrs), None);
                },
            )
            .build();
        gauge.record(1.234, None, None);

        let async_ud_counter = meter
            .create_async_up_down_counter(
                "TestAsyncUpDownCounter".to_string(),
                |measurement: &dyn AsyncMeasure<Value = i64>| {
                    let mut callback_attrs = Attributes::new();
                    callback_attrs.set(
                        "TestAsyncUpDownCounterAttr",
                        AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
                    );
                    measurement.record(12, Some(&callback_attrs), None);
                },
            )
            .build();
        async_ud_counter.record(-6, None, None);

        let async_mono_counter = meter
            .create_async_monotonic_counter(
                "TestAsyncMonoCounter".to_string(),
                |measurement: &dyn AsyncMeasure<Value = u64>| {
                    let mut callback_attrs = Attributes::new();
                    callback_attrs.set(
                        "TestAsyncMonoCounterAttr",
                        AttributeValue::String("TestAsyncMonoCounterAttr".into()),
                    );
                    measurement.record(123, Some(&callback_attrs), None);
                },
            )
            .build();
        async_mono_counter.record(4, None, None);

        // Force a flush so the recorded data reaches the exporter before it is inspected.
        flush_handle.flush().unwrap();

        // The assertions assume metrics are exported in instrument creation order.
        let exported = exporter.get_finished_metrics().unwrap();
        let metrics = &exported[0].scope_metrics[0].metrics;

        // Data point 0 of each metric is expected to hold the directly recorded value.
        let gauge_points = &metrics[0]
            .data
            .as_any()
            .downcast_ref::<Gauge<f64>>()
            .unwrap()
            .data_points;
        assert_eq!(gauge_points[0].value, 1.234);

        let ud_points = &metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points;
        assert_eq!(ud_points[0].value, -6);

        let mono_points = &metrics[2]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points;
        assert_eq!(mono_points[0].value, 4);

        // Data point 1 of each metric is expected to hold the callback-recorded value.
        let exported = exporter.get_finished_metrics().unwrap();
        let metrics = &exported[0].scope_metrics[0].metrics;

        let gauge_points = &metrics[0]
            .data
            .as_any()
            .downcast_ref::<Gauge<f64>>()
            .unwrap()
            .data_points;
        assert_eq!(gauge_points[1].value, 6.789);

        let ud_points = &metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points;
        assert_eq!(ud_points[1].value, 12);

        let mono_points = &metrics[2]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points;
        assert_eq!(mono_points[1].value, 123);
    }
}