aws_smithy_observability_otel/
meter.rs1use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14 AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15 ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21 AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22 Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23 ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24 ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31 fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32 self.0.add(value, &kv_from_option_attr(attributes));
33 }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39 fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40 self.0.record(value, &kv_from_option_attr(attributes));
41 }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47 fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48 self.0.add(value, &kv_from_option_attr(attributes));
49 }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55 type Value = f64;
56
57 fn record(
58 &self,
59 value: Self::Value,
60 attributes: Option<&Attributes>,
61 _context: Option<&dyn Context>,
62 ) {
63 self.0.observe(value, &kv_from_option_attr(attributes));
64 }
65
66 fn stop(&self) {}
69}
70
71#[derive(Debug)]
72struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
73impl AsyncMeasure for AsyncUpDownCounterWrap {
74 type Value = i64;
75
76 fn record(
77 &self,
78 value: Self::Value,
79 attributes: Option<&Attributes>,
80 _context: Option<&dyn Context>,
81 ) {
82 self.0.observe(value, &kv_from_option_attr(attributes));
83 }
84
85 fn stop(&self) {}
88}
89
90#[derive(Debug)]
91struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
92impl AsyncMeasure for AsyncMonotonicCounterWrap {
93 type Value = u64;
94
95 fn record(
96 &self,
97 value: Self::Value,
98 attributes: Option<&Attributes>,
99 _context: Option<&dyn Context>,
100 ) {
101 self.0.observe(value, &kv_from_option_attr(attributes));
102 }
103
104 fn stop(&self) {}
107}
108
109struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
110impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
111 type Value = T;
112
113 fn record(
114 &self,
115 value: Self::Value,
116 attributes: Option<&Attributes>,
117 _context: Option<&dyn Context>,
118 ) {
119 self.0.observe(value, &kv_from_option_attr(attributes));
120 }
121
122 fn stop(&self) {}
125}
126
127impl<T> Debug for AsyncInstrumentWrap<'_, T> {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_tuple("AsyncInstrumentWrap").finish()
132 }
133}
134
135#[derive(Debug)]
136struct MeterWrap(OtelMeter);
137impl Deref for MeterWrap {
138 type Target = OtelMeter;
139
140 fn deref(&self) -> &Self::Target {
141 &self.0
142 }
143}
144
145impl ProvideInstrument for MeterWrap {
146 fn create_gauge(
147 &self,
148 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
149 ) -> Arc<dyn AsyncMeasure<Value = f64>> {
150 let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
151
152 if let Some(desc) = builder.get_description() {
153 otel_builder = otel_builder.with_description(desc.clone());
154 }
155
156 if let Some(u) = builder.get_units() {
157 otel_builder = otel_builder.with_unit(u.clone());
158 }
159
160 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
161 let f = builder.callback.clone();
162 f(&AsyncInstrumentWrap(input));
163 });
164
165 Arc::new(GaugeWrap(otel_builder.init()))
166 }
167
168 fn create_up_down_counter(
169 &self,
170 builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
171 ) -> Arc<dyn UpDownCounter> {
172 let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
173 if let Some(desc) = builder.get_description() {
174 otel_builder = otel_builder.with_description(desc.clone());
175 }
176
177 if let Some(u) = builder.get_units() {
178 otel_builder = otel_builder.with_unit(u.clone());
179 }
180
181 Arc::new(UpDownCounterWrap(otel_builder.init()))
182 }
183
184 fn create_async_up_down_counter(
185 &self,
186 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
187 ) -> Arc<dyn AsyncMeasure<Value = i64>> {
188 let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
189
190 if let Some(desc) = builder.get_description() {
191 otel_builder = otel_builder.with_description(desc.clone());
192 }
193
194 if let Some(u) = builder.get_units() {
195 otel_builder = otel_builder.with_unit(u.clone());
196 }
197
198 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
199 let f = builder.callback.clone();
200 f(&AsyncInstrumentWrap(input));
201 });
202
203 Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
204 }
205
206 fn create_monotonic_counter(
207 &self,
208 builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
209 ) -> Arc<dyn MonotonicCounter> {
210 let mut otel_builder = self.u64_counter(builder.get_name().clone());
211 if let Some(desc) = builder.get_description() {
212 otel_builder = otel_builder.with_description(desc.clone());
213 }
214
215 if let Some(u) = builder.get_units() {
216 otel_builder = otel_builder.with_unit(u.clone());
217 }
218
219 Arc::new(MonotonicCounterWrap(otel_builder.init()))
220 }
221
222 fn create_async_monotonic_counter(
223 &self,
224 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
225 ) -> Arc<dyn AsyncMeasure<Value = u64>> {
226 let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
227
228 if let Some(desc) = builder.get_description() {
229 otel_builder = otel_builder.with_description(desc.clone());
230 }
231
232 if let Some(u) = builder.get_units() {
233 otel_builder = otel_builder.with_unit(u.clone());
234 }
235
236 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
237 let f = builder.callback.clone();
238 f(&AsyncInstrumentWrap(input));
239 });
240
241 Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
242 }
243
244 fn create_histogram(
245 &self,
246 builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
247 ) -> Arc<dyn Histogram> {
248 let mut otel_builder = self.f64_histogram(builder.get_name().clone());
249 if let Some(desc) = builder.get_description() {
250 otel_builder = otel_builder.with_description(desc.clone());
251 }
252
253 if let Some(u) = builder.get_units() {
254 otel_builder = otel_builder.with_unit(u.clone());
255 }
256
257 Arc::new(HistogramWrap(otel_builder.init()))
258 }
259}
260
261#[non_exhaustive]
263#[derive(Debug)]
264pub struct OtelMeterProvider {
265 meter_provider: OtelSdkMeterProvider,
266}
267
268impl OtelMeterProvider {
269 pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
271 Self {
272 meter_provider: otel_meter_provider,
273 }
274 }
275
276 pub fn flush(&self) -> Result<(), ObservabilityError> {
278 match self.meter_provider.force_flush() {
279 Ok(_) => Ok(()),
280 Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
281 }
282 }
283}
284
285impl ProvideMeter for OtelMeterProvider {
286 fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
287 Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
288 }
289}
290
291#[cfg(test)]
292mod tests {
293
294 use std::sync::Arc;
295
296 use aws_smithy_observability::instruments::AsyncMeasure;
297 use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
298 use opentelemetry_sdk::metrics::{
299 data::{Gauge, Histogram, Sum},
300 PeriodicReader, SdkMeterProvider,
301 };
302 use opentelemetry_sdk::runtime::Tokio;
303 use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
304
305 use super::OtelMeterProvider;
306
307 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
309 async fn sync_instruments_work() {
310 let exporter = InMemoryMetricsExporter::default();
312 let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
313 let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
314
315 let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
317 let sdk_ref = sdk_mp.clone();
318 let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
319
320 let dyn_sdk_mp = sdk_tp.meter_provider();
322 let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
323
324 let mono_counter = dyn_sdk_meter
326 .create_monotonic_counter("TestMonoCounter")
327 .build();
328 mono_counter.add(4, None, None);
329 let ud_counter = dyn_sdk_meter
330 .create_up_down_counter("TestUpDownCounter")
331 .build();
332 ud_counter.add(-6, None, None);
333 let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
334 histogram.record(1.234, None, None);
335
336 sdk_ref.flush().unwrap();
338
339 let finished_metrics = exporter.get_finished_metrics().unwrap();
341 let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
342 .data
343 .as_any()
344 .downcast_ref::<Sum<u64>>()
345 .unwrap()
346 .data_points[0]
347 .value;
348 assert_eq!(extracted_mono_counter_data, &4);
349
350 let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
351 .data
352 .as_any()
353 .downcast_ref::<Sum<i64>>()
354 .unwrap()
355 .data_points[0]
356 .value;
357 assert_eq!(extracted_ud_counter_data, &-6);
358
359 let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
360 .data
361 .as_any()
362 .downcast_ref::<Histogram<f64>>()
363 .unwrap()
364 .data_points[0]
365 .sum;
366 assert_eq!(extracted_histogram_data, &1.234);
367 }
368
369 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
370 async fn async_instrument_work() {
371 let exporter = InMemoryMetricsExporter::default();
373 let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
374 let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
375
376 let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
378 let sdk_ref = sdk_mp.clone();
379 let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
380
381 let dyn_sdk_mp = sdk_tp.meter_provider();
383 let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
384
385 let gauge = dyn_sdk_meter
387 .create_gauge(
388 "TestGauge".to_string(),
389 |measurement: &dyn AsyncMeasure<Value = f64>| {
391 let mut attrs = Attributes::new();
392 attrs.set(
393 "TestGaugeAttr",
394 AttributeValue::String("TestGaugeAttr".into()),
395 );
396 measurement.record(6.789, Some(&attrs), None);
397 },
398 )
399 .build();
400 gauge.record(1.234, None, None);
401
402 let async_ud_counter = dyn_sdk_meter
403 .create_async_up_down_counter(
404 "TestAsyncUpDownCounter".to_string(),
405 |measurement: &dyn AsyncMeasure<Value = i64>| {
406 let mut attrs = Attributes::new();
407 attrs.set(
408 "TestAsyncUpDownCounterAttr",
409 AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
410 );
411 measurement.record(12, Some(&attrs), None);
412 },
413 )
414 .build();
415 async_ud_counter.record(-6, None, None);
416
417 let async_mono_counter = dyn_sdk_meter
418 .create_async_monotonic_counter(
419 "TestAsyncMonoCounter".to_string(),
420 |measurement: &dyn AsyncMeasure<Value = u64>| {
421 let mut attrs = Attributes::new();
422 attrs.set(
423 "TestAsyncMonoCounterAttr",
424 AttributeValue::String("TestAsyncMonoCounterAttr".into()),
425 );
426 measurement.record(123, Some(&attrs), None);
427 },
428 )
429 .build();
430 async_mono_counter.record(4, None, None);
431
432 sdk_ref.flush().unwrap();
434
435 let finished_metrics = exporter.get_finished_metrics().unwrap();
437
438 let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
440 .data
441 .as_any()
442 .downcast_ref::<Gauge<f64>>()
443 .unwrap()
444 .data_points[0]
445 .value;
446 assert_eq!(extracted_gauge_data, &1.234);
447
448 let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
449 .data
450 .as_any()
451 .downcast_ref::<Sum<i64>>()
452 .unwrap()
453 .data_points[0]
454 .value;
455 assert_eq!(extracted_async_ud_counter_data, &-6);
456
457 let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
458 .data
459 .as_any()
460 .downcast_ref::<Sum<u64>>()
461 .unwrap()
462 .data_points[0]
463 .value;
464 assert_eq!(extracted_async_mono_data, &4);
465
466 let finished_metrics = exporter.get_finished_metrics().unwrap();
468 let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
469 .data
470 .as_any()
471 .downcast_ref::<Gauge<f64>>()
472 .unwrap()
473 .data_points[1]
474 .value;
475 assert_eq!(extracted_gauge_data, &6.789);
476
477 let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
478 .data
479 .as_any()
480 .downcast_ref::<Sum<i64>>()
481 .unwrap()
482 .data_points[1]
483 .value;
484 assert_eq!(extracted_async_ud_counter_data, &12);
485
486 let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
487 .data
488 .as_any()
489 .downcast_ref::<Sum<u64>>()
490 .unwrap()
491 .data_points[1]
492 .value;
493 assert_eq!(extracted_async_mono_data, &123);
494 }
495}