aws_smithy_observability_otel/
meter.rs1use std::fmt::Debug;
9use std::ops::Deref;
10use std::sync::Arc;
11
12use crate::attributes::kv_from_option_attr;
13use aws_smithy_observability::instruments::{
14 AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
15 ProvideInstrument, UpDownCounter,
16};
17pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
18
19use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
20use opentelemetry::metrics::{
21 AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
22 Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
23 ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
24 ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
25};
26use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
27
28#[derive(Debug)]
29struct UpDownCounterWrap(OtelUpDownCounter<i64>);
30impl UpDownCounter for UpDownCounterWrap {
31 fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
32 self.0.add(value, &kv_from_option_attr(attributes));
33 }
34}
35
36#[derive(Debug)]
37struct HistogramWrap(OtelHistogram<f64>);
38impl Histogram for HistogramWrap {
39 fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
40 self.0.record(value, &kv_from_option_attr(attributes));
41 }
42}
43
44#[derive(Debug)]
45struct MonotonicCounterWrap(OtelCounter<u64>);
46impl MonotonicCounter for MonotonicCounterWrap {
47 fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
48 self.0.add(value, &kv_from_option_attr(attributes));
49 }
50}
51
52#[derive(Debug)]
53struct GaugeWrap(OtelObservableGauge<f64>);
54impl AsyncMeasure for GaugeWrap {
55 type Value = f64;
56
57 fn record(
58 &self,
59 _value: Self::Value,
60 _attributes: Option<&Attributes>,
61 _context: Option<&dyn Context>,
62 ) {
63 }
67
68 fn stop(&self) {}
71}
72
73#[derive(Debug)]
74struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
75impl AsyncMeasure for AsyncUpDownCounterWrap {
76 type Value = i64;
77
78 fn record(
79 &self,
80 _value: Self::Value,
81 _attributes: Option<&Attributes>,
82 _context: Option<&dyn Context>,
83 ) {
84 }
88
89 fn stop(&self) {}
92}
93
94#[derive(Debug)]
95struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
96impl AsyncMeasure for AsyncMonotonicCounterWrap {
97 type Value = u64;
98
99 fn record(
100 &self,
101 _value: Self::Value,
102 _attributes: Option<&Attributes>,
103 _context: Option<&dyn Context>,
104 ) {
105 }
109
110 fn stop(&self) {}
113}
114
115struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
116impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
117 type Value = T;
118
119 fn record(
120 &self,
121 value: Self::Value,
122 attributes: Option<&Attributes>,
123 _context: Option<&dyn Context>,
124 ) {
125 self.0.observe(value, &kv_from_option_attr(attributes));
126 }
127
128 fn stop(&self) {}
131}
132
133impl<T> Debug for AsyncInstrumentWrap<'_, T> {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 f.debug_tuple("AsyncInstrumentWrap").finish()
138 }
139}
140
141#[derive(Debug)]
142struct MeterWrap(OtelMeter);
143impl Deref for MeterWrap {
144 type Target = OtelMeter;
145
146 fn deref(&self) -> &Self::Target {
147 &self.0
148 }
149}
150
151impl ProvideInstrument for MeterWrap {
152 fn create_gauge(
153 &self,
154 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
155 ) -> Arc<dyn AsyncMeasure<Value = f64>> {
156 let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
157
158 if let Some(desc) = builder.get_description() {
159 otel_builder = otel_builder.with_description(desc.clone());
160 }
161
162 if let Some(u) = builder.get_units() {
163 otel_builder = otel_builder.with_unit(u.clone());
164 }
165
166 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
167 let f = builder.callback.clone();
168 f(&AsyncInstrumentWrap(input));
169 });
170
171 Arc::new(GaugeWrap(otel_builder.build()))
172 }
173
174 fn create_up_down_counter(
175 &self,
176 builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
177 ) -> Arc<dyn UpDownCounter> {
178 let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
179 if let Some(desc) = builder.get_description() {
180 otel_builder = otel_builder.with_description(desc.clone());
181 }
182
183 if let Some(u) = builder.get_units() {
184 otel_builder = otel_builder.with_unit(u.clone());
185 }
186
187 Arc::new(UpDownCounterWrap(otel_builder.build()))
188 }
189
190 fn create_async_up_down_counter(
191 &self,
192 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
193 ) -> Arc<dyn AsyncMeasure<Value = i64>> {
194 let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
195
196 if let Some(desc) = builder.get_description() {
197 otel_builder = otel_builder.with_description(desc.clone());
198 }
199
200 if let Some(u) = builder.get_units() {
201 otel_builder = otel_builder.with_unit(u.clone());
202 }
203
204 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
205 let f = builder.callback.clone();
206 f(&AsyncInstrumentWrap(input));
207 });
208
209 Arc::new(AsyncUpDownCounterWrap(otel_builder.build()))
210 }
211
212 fn create_monotonic_counter(
213 &self,
214 builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
215 ) -> Arc<dyn MonotonicCounter> {
216 let mut otel_builder = self.u64_counter(builder.get_name().clone());
217 if let Some(desc) = builder.get_description() {
218 otel_builder = otel_builder.with_description(desc.clone());
219 }
220
221 if let Some(u) = builder.get_units() {
222 otel_builder = otel_builder.with_unit(u.clone());
223 }
224
225 Arc::new(MonotonicCounterWrap(otel_builder.build()))
226 }
227
228 fn create_async_monotonic_counter(
229 &self,
230 builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
231 ) -> Arc<dyn AsyncMeasure<Value = u64>> {
232 let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
233
234 if let Some(desc) = builder.get_description() {
235 otel_builder = otel_builder.with_description(desc.clone());
236 }
237
238 if let Some(u) = builder.get_units() {
239 otel_builder = otel_builder.with_unit(u.clone());
240 }
241
242 otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
243 let f = builder.callback.clone();
244 f(&AsyncInstrumentWrap(input));
245 });
246
247 Arc::new(AsyncMonotonicCounterWrap(otel_builder.build()))
248 }
249
250 fn create_histogram(
251 &self,
252 builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
253 ) -> Arc<dyn Histogram> {
254 let mut otel_builder = self.f64_histogram(builder.get_name().clone());
255 if let Some(desc) = builder.get_description() {
256 otel_builder = otel_builder.with_description(desc.clone());
257 }
258
259 if let Some(u) = builder.get_units() {
260 otel_builder = otel_builder.with_unit(u.clone());
261 }
262
263 Arc::new(HistogramWrap(otel_builder.build()))
264 }
265}
266
267#[non_exhaustive]
269#[derive(Debug)]
270pub struct OtelMeterProvider {
271 meter_provider: OtelSdkMeterProvider,
272}
273
274impl OtelMeterProvider {
275 pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
277 Self {
278 meter_provider: otel_meter_provider,
279 }
280 }
281
282 pub fn flush(&self) -> Result<(), ObservabilityError> {
284 match self.meter_provider.force_flush() {
285 Ok(_) => Ok(()),
286 Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
287 }
288 }
289}
290
291impl ProvideMeter for OtelMeterProvider {
292 fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
293 Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
294 }
295
296 fn as_any(&self) -> &dyn std::any::Any {
297 self
298 }
299}
300
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use aws_smithy_observability::instruments::AsyncMeasure;
    use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
    use opentelemetry_sdk::metrics::{
        data::{Gauge, Histogram, Sum},
        PeriodicReader, SdkMeterProvider,
    };
    use opentelemetry_sdk::runtime::Tokio;
    use opentelemetry_sdk::testing::metrics::InMemoryMetricExporter;

    use super::OtelMeterProvider;

    /// Builds an in-memory exporter together with an `OtelMeterProvider` whose
    /// periodic reader exports into it.
    fn exporter_and_provider() -> (InMemoryMetricExporter, Arc<OtelMeterProvider>) {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
        (exporter, Arc::new(OtelMeterProvider::new(otel_mp)))
    }

    // NOTE(review): the multi-threaded runtime is presumably needed so that the
    // periodic reader can make progress while `flush` is running — confirm.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn sync_instruments_work() {
        let (exporter, sdk_mp) = exporter_and_provider();

        // Keep a concrete handle for `flush`; the other one is type-erased
        // inside the telemetry provider.
        let sdk_ref = Arc::clone(&sdk_mp);
        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();

        // Obtain the meter through the dynamic provider interface.
        let dyn_sdk_mp = sdk_tp.meter_provider();
        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);

        // Create one of each synchronous instrument and record a single value.
        let mono_counter = dyn_sdk_meter
            .create_monotonic_counter("TestMonoCounter")
            .build();
        mono_counter.add(4, None, None);

        let ud_counter = dyn_sdk_meter
            .create_up_down_counter("TestUpDownCounter")
            .build();
        ud_counter.add(-6, None, None);

        let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
        histogram.record(1.234, None, None);

        // Push everything recorded so far to the exporter.
        sdk_ref.flush().unwrap();

        // The assertions rely on metrics being exported in creation order.
        let finished_metrics = exporter.get_finished_metrics().unwrap();
        let metrics = &finished_metrics[0].scope_metrics[0].metrics;

        let mono_counter_sum = metrics[0]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap();
        assert_eq!(mono_counter_sum.data_points[0].value, 4);

        let ud_counter_sum = metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap();
        assert_eq!(ud_counter_sum.data_points[0].value, -6);

        let histogram_data = metrics[2]
            .data
            .as_any()
            .downcast_ref::<Histogram<f64>>()
            .unwrap();
        assert_eq!(histogram_data.data_points[0].sum, 1.234);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_instrument_work() {
        let (exporter, sdk_mp) = exporter_and_provider();

        // Keep a concrete handle for `flush`; the other one is type-erased
        // inside the telemetry provider.
        let sdk_ref = Arc::clone(&sdk_mp);
        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();

        // Obtain the meter through the dynamic provider interface.
        let dyn_sdk_mp = sdk_tp.meter_provider();
        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);

        // Each async instrument reports one attributed value from its callback.
        let _gauge = dyn_sdk_meter
            .create_gauge(
                "TestGauge".to_string(),
                |measurement: &dyn AsyncMeasure<Value = f64>| {
                    let mut attrs = Attributes::new();
                    attrs.set(
                        "TestGaugeAttr",
                        AttributeValue::String("TestGaugeAttr".into()),
                    );
                    measurement.record(6.789, Some(&attrs), None);
                },
            )
            .build();

        let _async_ud_counter = dyn_sdk_meter
            .create_async_up_down_counter(
                "TestAsyncUpDownCounter".to_string(),
                |measurement: &dyn AsyncMeasure<Value = i64>| {
                    let mut attrs = Attributes::new();
                    attrs.set(
                        "TestAsyncUpDownCounterAttr",
                        AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
                    );
                    measurement.record(12, Some(&attrs), None);
                },
            )
            .build();

        let _async_mono_counter = dyn_sdk_meter
            .create_async_monotonic_counter(
                "TestAsyncMonoCounter".to_string(),
                |measurement: &dyn AsyncMeasure<Value = u64>| {
                    let mut attrs = Attributes::new();
                    attrs.set(
                        "TestAsyncMonoCounterAttr",
                        AttributeValue::String("TestAsyncMonoCounterAttr".into()),
                    );
                    measurement.record(123, Some(&attrs), None);
                },
            )
            .build();

        // Flushing triggers a collection, which in turn runs the callbacks.
        sdk_ref.flush().unwrap();

        // The assertions rely on metrics being exported in creation order.
        let finished_metrics = exporter.get_finished_metrics().unwrap();
        let metrics = &finished_metrics[0].scope_metrics[0].metrics;

        let gauge_data = metrics[0]
            .data
            .as_any()
            .downcast_ref::<Gauge<f64>>()
            .unwrap();
        assert_eq!(gauge_data.data_points[0].value, 6.789);

        let async_ud_counter_sum = metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap();
        assert_eq!(async_ud_counter_sum.data_points[0].value, 12);

        let async_mono_counter_sum = metrics[2]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap();
        assert_eq!(async_mono_counter_sum.data_points[0].value, 123);
    }
}