aws_smithy_observability_otel/
meter.rs1use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14 AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15 ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21 AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22 Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23 ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24 ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31 fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32 self.0.add(value, &kv_from_option_attr(attributes));
33 }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39 fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40 self.0.record(value, &kv_from_option_attr(attributes));
41 }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47 fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48 self.0.add(value, &kv_from_option_attr(attributes));
49 }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55 type Value = f64;
56
57 fn record(
58 &self,
59 value: Self::Value,
60 attributes: Option<&Attributes>,
61 _context: Option<&dyn Context>,
62 ) {
63 self.0.observe(value, &kv_from_option_attr(attributes));
64 }
65
66 fn stop(&self) {}
69}
70
71#[derive(Debug)]
72struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
73impl AsyncMeasure for AsyncUpDownCounterWrap {
74 type Value = i64;
75
76 fn record(
77 &self,
78 value: Self::Value,
79 attributes: Option<&Attributes>,
80 _context: Option<&dyn Context>,
81 ) {
82 self.0.observe(value, &kv_from_option_attr(attributes));
83 }
84
85 fn stop(&self) {}
88}
89
90#[derive(Debug)]
91struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
92impl AsyncMeasure for AsyncMonotonicCounterWrap {
93 type Value = u64;
94
95 fn record(
96 &self,
97 value: Self::Value,
98 attributes: Option<&Attributes>,
99 _context: Option<&dyn Context>,
100 ) {
101 self.0.observe(value, &kv_from_option_attr(attributes));
102 }
103
104 fn stop(&self) {}
107}
108
109struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
110impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
111 type Value = T;
112
113 fn record(
114 &self,
115 value: Self::Value,
116 attributes: Option<&Attributes>,
117 _context: Option<&dyn Context>,
118 ) {
119 self.0.observe(value, &kv_from_option_attr(attributes));
120 }
121
122 fn stop(&self) {}
125}
126
127impl<T> Debug for AsyncInstrumentWrap<'_, T> {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_tuple("AsyncInstrumentWrap").finish()
132 }
133}
134
135#[derive(Debug)]
136struct MeterWrap(OtelMeter);
137impl Deref for MeterWrap {
138 type Target = OtelMeter;
139
140 fn deref(&self) -> &Self::Target {
141 &self.0
142 }
143}
144
145impl ProvideInstrument for MeterWrap {
146 fn create_gauge(
147 &self,
148 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
149 ) -> Arc<dyn AsyncMeasure<Value = f64>> {
150 let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
151
152 if let Some(desc) = builder.get_description() {
153 otel_builder = otel_builder.with_description(desc.clone());
154 }
155
156 if let Some(u) = builder.get_units() {
157 otel_builder = otel_builder.with_unit(u.clone());
158 }
159
160 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
161 let f = builder.callback.clone();
162 f(&AsyncInstrumentWrap(input));
163 });
164
165 Arc::new(GaugeWrap(otel_builder.init()))
166 }
167
168 fn create_up_down_counter(
169 &self,
170 builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
171 ) -> Arc<dyn UpDownCounter> {
172 let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
173 if let Some(desc) = builder.get_description() {
174 otel_builder = otel_builder.with_description(desc.clone());
175 }
176
177 if let Some(u) = builder.get_units() {
178 otel_builder = otel_builder.with_unit(u.clone());
179 }
180
181 Arc::new(UpDownCounterWrap(otel_builder.init()))
182 }
183
184 fn create_async_up_down_counter(
185 &self,
186 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
187 ) -> Arc<dyn AsyncMeasure<Value = i64>> {
188 let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
189
190 if let Some(desc) = builder.get_description() {
191 otel_builder = otel_builder.with_description(desc.clone());
192 }
193
194 if let Some(u) = builder.get_units() {
195 otel_builder = otel_builder.with_unit(u.clone());
196 }
197
198 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
199 let f = builder.callback.clone();
200 f(&AsyncInstrumentWrap(input));
201 });
202
203 Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
204 }
205
206 fn create_monotonic_counter(
207 &self,
208 builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
209 ) -> Arc<dyn MonotonicCounter> {
210 let mut otel_builder = self.u64_counter(builder.get_name().clone());
211 if let Some(desc) = builder.get_description() {
212 otel_builder = otel_builder.with_description(desc.clone());
213 }
214
215 if let Some(u) = builder.get_units() {
216 otel_builder = otel_builder.with_unit(u.clone());
217 }
218
219 Arc::new(MonotonicCounterWrap(otel_builder.init()))
220 }
221
222 fn create_async_monotonic_counter(
223 &self,
224 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
225 ) -> Arc<dyn AsyncMeasure<Value = u64>> {
226 let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
227
228 if let Some(desc) = builder.get_description() {
229 otel_builder = otel_builder.with_description(desc.clone());
230 }
231
232 if let Some(u) = builder.get_units() {
233 otel_builder = otel_builder.with_unit(u.clone());
234 }
235
236 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
237 let f = builder.callback.clone();
238 f(&AsyncInstrumentWrap(input));
239 });
240
241 Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
242 }
243
244 fn create_histogram(
245 &self,
246 builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
247 ) -> Arc<dyn Histogram> {
248 let mut otel_builder = self.f64_histogram(builder.get_name().clone());
249 if let Some(desc) = builder.get_description() {
250 otel_builder = otel_builder.with_description(desc.clone());
251 }
252
253 if let Some(u) = builder.get_units() {
254 otel_builder = otel_builder.with_unit(u.clone());
255 }
256
257 Arc::new(HistogramWrap(otel_builder.init()))
258 }
259}
260
261#[non_exhaustive]
263#[derive(Debug)]
264pub struct OtelMeterProvider {
265 meter_provider: OtelSdkMeterProvider,
266}
267
268impl OtelMeterProvider {
269 pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
271 Self {
272 meter_provider: otel_meter_provider,
273 }
274 }
275
276 pub fn flush(&self) -> Result<(), ObservabilityError> {
278 match self.meter_provider.force_flush() {
279 Ok(_) => Ok(()),
280 Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
281 }
282 }
283}
284
285impl ProvideMeter for OtelMeterProvider {
286 fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
287 Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
288 }
289
290 fn as_any(&self) -> &dyn std::any::Any {
291 self
292 }
293
294 fn provider_name(&self) -> &'static str {
295 "otel"
296 }
297}
298
299#[cfg(test)]
300mod tests {
301
302 use std::sync::Arc;
303
304 use aws_smithy_observability::instruments::AsyncMeasure;
305 use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
306 use opentelemetry_sdk::metrics::{
307 data::{Gauge, Histogram, Sum},
308 PeriodicReader, SdkMeterProvider,
309 };
310 use opentelemetry_sdk::runtime::Tokio;
311 use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
312
313 use super::OtelMeterProvider;
314
315 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
317 async fn sync_instruments_work() {
318 let exporter = InMemoryMetricsExporter::default();
320 let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
321 let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
322
323 let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
325 let sdk_ref = sdk_mp.clone();
326 let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
327
328 let dyn_sdk_mp = sdk_tp.meter_provider();
330 let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
331
332 let mono_counter = dyn_sdk_meter
334 .create_monotonic_counter("TestMonoCounter")
335 .build();
336 mono_counter.add(4, None, None);
337 let ud_counter = dyn_sdk_meter
338 .create_up_down_counter("TestUpDownCounter")
339 .build();
340 ud_counter.add(-6, None, None);
341 let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
342 histogram.record(1.234, None, None);
343
344 sdk_ref.flush().unwrap();
346
347 let finished_metrics = exporter.get_finished_metrics().unwrap();
349 let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
350 .data
351 .as_any()
352 .downcast_ref::<Sum<u64>>()
353 .unwrap()
354 .data_points[0]
355 .value;
356 assert_eq!(extracted_mono_counter_data, &4);
357
358 let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
359 .data
360 .as_any()
361 .downcast_ref::<Sum<i64>>()
362 .unwrap()
363 .data_points[0]
364 .value;
365 assert_eq!(extracted_ud_counter_data, &-6);
366
367 let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
368 .data
369 .as_any()
370 .downcast_ref::<Histogram<f64>>()
371 .unwrap()
372 .data_points[0]
373 .sum;
374 assert_eq!(extracted_histogram_data, &1.234);
375 }
376
377 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
378 async fn async_instrument_work() {
379 let exporter = InMemoryMetricsExporter::default();
381 let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
382 let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
383
384 let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
386 let sdk_ref = sdk_mp.clone();
387 let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
388
389 let dyn_sdk_mp = sdk_tp.meter_provider();
391 let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
392
393 let gauge = dyn_sdk_meter
395 .create_gauge(
396 "TestGauge".to_string(),
397 |measurement: &dyn AsyncMeasure<Value = f64>| {
399 let mut attrs = Attributes::new();
400 attrs.set(
401 "TestGaugeAttr",
402 AttributeValue::String("TestGaugeAttr".into()),
403 );
404 measurement.record(6.789, Some(&attrs), None);
405 },
406 )
407 .build();
408 gauge.record(1.234, None, None);
409
410 let async_ud_counter = dyn_sdk_meter
411 .create_async_up_down_counter(
412 "TestAsyncUpDownCounter".to_string(),
413 |measurement: &dyn AsyncMeasure<Value = i64>| {
414 let mut attrs = Attributes::new();
415 attrs.set(
416 "TestAsyncUpDownCounterAttr",
417 AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
418 );
419 measurement.record(12, Some(&attrs), None);
420 },
421 )
422 .build();
423 async_ud_counter.record(-6, None, None);
424
425 let async_mono_counter = dyn_sdk_meter
426 .create_async_monotonic_counter(
427 "TestAsyncMonoCounter".to_string(),
428 |measurement: &dyn AsyncMeasure<Value = u64>| {
429 let mut attrs = Attributes::new();
430 attrs.set(
431 "TestAsyncMonoCounterAttr",
432 AttributeValue::String("TestAsyncMonoCounterAttr".into()),
433 );
434 measurement.record(123, Some(&attrs), None);
435 },
436 )
437 .build();
438 async_mono_counter.record(4, None, None);
439
440 sdk_ref.flush().unwrap();
442
443 let finished_metrics = exporter.get_finished_metrics().unwrap();
445
446 let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
448 .data
449 .as_any()
450 .downcast_ref::<Gauge<f64>>()
451 .unwrap()
452 .data_points[0]
453 .value;
454 assert_eq!(extracted_gauge_data, &1.234);
455
456 let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
457 .data
458 .as_any()
459 .downcast_ref::<Sum<i64>>()
460 .unwrap()
461 .data_points[0]
462 .value;
463 assert_eq!(extracted_async_ud_counter_data, &-6);
464
465 let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
466 .data
467 .as_any()
468 .downcast_ref::<Sum<u64>>()
469 .unwrap()
470 .data_points[0]
471 .value;
472 assert_eq!(extracted_async_mono_data, &4);
473
474 let finished_metrics = exporter.get_finished_metrics().unwrap();
476 let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
477 .data
478 .as_any()
479 .downcast_ref::<Gauge<f64>>()
480 .unwrap()
481 .data_points[1]
482 .value;
483 assert_eq!(extracted_gauge_data, &6.789);
484
485 let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
486 .data
487 .as_any()
488 .downcast_ref::<Sum<i64>>()
489 .unwrap()
490 .data_points[1]
491 .value;
492 assert_eq!(extracted_async_ud_counter_data, &12);
493
494 let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
495 .data
496 .as_any()
497 .downcast_ref::<Sum<u64>>()
498 .unwrap()
499 .data_points[1]
500 .value;
501 assert_eq!(extracted_async_mono_data, &123);
502 }
503}