1 + | /*
|
2 + | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 + | * SPDX-License-Identifier: Apache-2.0
|
4 + | */
|
5 + |
|
6 + | //! OpenTelemetry based implementations of the Smithy Observability Meter traits.
|
7 + |
|
8 + | use std::fmt::Debug;
|
9 + | use std::ops::Deref;
|
10 + | use std::sync::Arc;
|
11 + |
|
12 + | use crate::attributes::kv_from_option_attr;
|
13 + | use aws_smithy_observability::instruments::{
|
14 + | AsyncInstrumentBuilder, AsyncMeasure, Histogram, InstrumentBuilder, MonotonicCounter,
|
15 + | ProvideInstrument, UpDownCounter,
|
16 + | };
|
17 + | pub use aws_smithy_observability::meter::{Meter, ProvideMeter};
|
18 + |
|
19 + | use aws_smithy_observability::{Attributes, Context, ErrorKind, ObservabilityError};
|
20 + | use opentelemetry::metrics::{
|
21 + | AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
|
22 + | Meter as OtelMeter, MeterProvider as OtelMeterProviderTrait,
|
23 + | ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
|
24 + | ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
|
25 + | };
|
26 + | use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
|
27 + |
|
28 + | #[derive(Debug)]
|
29 + | struct UpDownCounterWrap(OtelUpDownCounter<i64>);
|
30 + | impl UpDownCounter for UpDownCounterWrap {
|
31 + | fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
32 + | self.0.add(value, &kv_from_option_attr(attributes));
|
33 + | }
|
34 + | }
|
35 + |
|
36 + | #[derive(Debug)]
|
37 + | struct HistogramWrap(OtelHistogram<f64>);
|
38 + | impl Histogram for HistogramWrap {
|
39 + | fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
40 + | self.0.record(value, &kv_from_option_attr(attributes));
|
41 + | }
|
42 + | }
|
43 + |
|
44 + | #[derive(Debug)]
|
45 + | struct MonotonicCounterWrap(OtelCounter<u64>);
|
46 + | impl MonotonicCounter for MonotonicCounterWrap {
|
47 + | fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
48 + | self.0.add(value, &kv_from_option_attr(attributes));
|
49 + | }
|
50 + | }
|
51 + |
|
52 + | #[derive(Debug)]
|
53 + | struct GaugeWrap(OtelObservableGauge<f64>);
|
54 + | impl AsyncMeasure for GaugeWrap {
|
55 + | type Value = f64;
|
56 + |
|
57 + | fn record(
|
58 + | &self,
|
59 + | value: Self::Value,
|
60 + | attributes: Option<&Attributes>,
|
61 + | _context: Option<&dyn Context>,
|
62 + | ) {
|
63 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
64 + | }
|
65 + |
|
66 + | // OTel rust does not currently support unregistering callbacks
|
67 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
68 + | fn stop(&self) {}
|
69 + | }
|
70 + |
|
71 + | #[derive(Debug)]
|
72 + | struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
|
73 + | impl AsyncMeasure for AsyncUpDownCounterWrap {
|
74 + | type Value = i64;
|
75 + |
|
76 + | fn record(
|
77 + | &self,
|
78 + | value: Self::Value,
|
79 + | attributes: Option<&Attributes>,
|
80 + | _context: Option<&dyn Context>,
|
81 + | ) {
|
82 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
83 + | }
|
84 + |
|
85 + | // OTel rust does not currently support unregistering callbacks
|
86 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
87 + | fn stop(&self) {}
|
88 + | }
|
89 + |
|
90 + | #[derive(Debug)]
|
91 + | struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
|
92 + | impl AsyncMeasure for AsyncMonotonicCounterWrap {
|
93 + | type Value = u64;
|
94 + |
|
95 + | fn record(
|
96 + | &self,
|
97 + | value: Self::Value,
|
98 + | attributes: Option<&Attributes>,
|
99 + | _context: Option<&dyn Context>,
|
100 + | ) {
|
101 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
102 + | }
|
103 + |
|
104 + | // OTel rust does not currently support unregistering callbacks
|
105 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
106 + | fn stop(&self) {}
|
107 + | }
|
108 + |
|
109 + | struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
|
110 + | impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
|
111 + | type Value = T;
|
112 + |
|
113 + | fn record(
|
114 + | &self,
|
115 + | value: Self::Value,
|
116 + | attributes: Option<&Attributes>,
|
117 + | _context: Option<&dyn Context>,
|
118 + | ) {
|
119 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
120 + | }
|
121 + |
|
122 + | // OTel rust does not currently support unregistering callbacks
|
123 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
124 + | fn stop(&self) {}
|
125 + | }
|
126 + |
|
127 + | // The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version
|
128 + | // for our wrapper struct
|
129 + | impl<T> Debug for AsyncInstrumentWrap<'_, T> {
|
130 + | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
131 + | f.debug_tuple("AsyncInstrumentWrap").finish()
|
132 + | }
|
133 + | }
|
134 + |
|
135 + | #[derive(Debug)]
|
136 + | struct MeterWrap(OtelMeter);
|
137 + | impl Deref for MeterWrap {
|
138 + | type Target = OtelMeter;
|
139 + |
|
140 + | fn deref(&self) -> &Self::Target {
|
141 + | &self.0
|
142 + | }
|
143 + | }
|
144 + |
|
145 + | impl ProvideInstrument for MeterWrap {
|
146 + | fn create_gauge(
|
147 + | &self,
|
148 + | builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = f64>>, f64>,
|
149 + | ) -> Arc<dyn AsyncMeasure<Value = f64>> {
|
150 + | let mut otel_builder = self.f64_observable_gauge(builder.get_name().clone());
|
151 + |
|
152 + | if let Some(desc) = builder.get_description() {
|
153 + | otel_builder = otel_builder.with_description(desc.clone());
|
154 + | }
|
155 + |
|
156 + | if let Some(u) = builder.get_units() {
|
157 + | otel_builder = otel_builder.with_unit(u.clone());
|
158 + | }
|
159 + |
|
160 + | otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
|
161 + | let f = builder.callback.clone();
|
162 + | f(&AsyncInstrumentWrap(input));
|
163 + | });
|
164 + |
|
165 + | Arc::new(GaugeWrap(otel_builder.init()))
|
166 + | }
|
167 + |
|
168 + | fn create_up_down_counter(
|
169 + | &self,
|
170 + | builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
|
171 + | ) -> Arc<dyn UpDownCounter> {
|
172 + | let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone());
|
173 + | if let Some(desc) = builder.get_description() {
|
174 + | otel_builder = otel_builder.with_description(desc.clone());
|
175 + | }
|
176 + |
|
177 + | if let Some(u) = builder.get_units() {
|
178 + | otel_builder = otel_builder.with_unit(u.clone());
|
179 + | }
|
180 + |
|
181 + | Arc::new(UpDownCounterWrap(otel_builder.init()))
|
182 + | }
|
183 + |
|
184 + | fn create_async_up_down_counter(
|
185 + | &self,
|
186 + | builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
|
187 + | ) -> Arc<dyn AsyncMeasure<Value = i64>> {
|
188 + | let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone());
|
189 + |
|
190 + | if let Some(desc) = builder.get_description() {
|
191 + | otel_builder = otel_builder.with_description(desc.clone());
|
192 + | }
|
193 + |
|
194 + | if let Some(u) = builder.get_units() {
|
195 + | otel_builder = otel_builder.with_unit(u.clone());
|
196 + | }
|
197 + |
|
198 + | otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| {
|
199 + | let f = builder.callback.clone();
|
200 + | f(&AsyncInstrumentWrap(input));
|
201 + | });
|
202 + |
|
203 + | Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
|
204 + | }
|
205 + |
|
206 + | fn create_monotonic_counter(
|
207 + | &self,
|
208 + | builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
|
209 + | ) -> Arc<dyn MonotonicCounter> {
|
210 + | let mut otel_builder = self.u64_counter(builder.get_name().clone());
|
211 + | if let Some(desc) = builder.get_description() {
|
212 + | otel_builder = otel_builder.with_description(desc.clone());
|
213 + | }
|
214 + |
|
215 + | if let Some(u) = builder.get_units() {
|
216 + | otel_builder = otel_builder.with_unit(u.clone());
|
217 + | }
|
218 + |
|
219 + | Arc::new(MonotonicCounterWrap(otel_builder.init()))
|
220 + | }
|
221 + |
|
222 + | fn create_async_monotonic_counter(
|
223 + | &self,
|
224 + | builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
|
225 + | ) -> Arc<dyn AsyncMeasure<Value = u64>> {
|
226 + | let mut otel_builder = self.u64_observable_counter(builder.get_name().clone());
|
227 + |
|
228 + | if let Some(desc) = builder.get_description() {
|
229 + | otel_builder = otel_builder.with_description(desc.clone());
|
230 + | }
|
231 + |
|
232 + | if let Some(u) = builder.get_units() {
|
233 + | otel_builder = otel_builder.with_unit(u.clone());
|
234 + | }
|
235 + |
|
236 + | otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| {
|
237 + | let f = builder.callback.clone();
|
238 + | f(&AsyncInstrumentWrap(input));
|
239 + | });
|
240 + |
|
241 + | Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
|
242 + | }
|
243 + |
|
244 + | fn create_histogram(
|
245 + | &self,
|
246 + | builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
|
247 + | ) -> Arc<dyn Histogram> {
|
248 + | let mut otel_builder = self.f64_histogram(builder.get_name().clone());
|
249 + | if let Some(desc) = builder.get_description() {
|
250 + | otel_builder = otel_builder.with_description(desc.clone());
|
251 + | }
|
252 + |
|
253 + | if let Some(u) = builder.get_units() {
|
254 + | otel_builder = otel_builder.with_unit(u.clone());
|
255 + | }
|
256 + |
|
257 + | Arc::new(HistogramWrap(otel_builder.init()))
|
258 + | }
|
259 + | }
|
260 + |
|
261 + | /// An OpenTelemetry based implementation of the AWS SDK's [ProvideMeter] trait
|
262 + | #[non_exhaustive]
|
263 + | #[derive(Debug)]
|
264 + | pub struct OtelMeterProvider {
|
265 + | meter_provider: OtelSdkMeterProvider,
|
266 + | }
|
267 + |
|
268 + | impl OtelMeterProvider {
|
269 + | /// Create a new [OtelMeterProvider] from an [OtelSdkMeterProvider].
|
270 + | pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
|
271 + | Self {
|
272 + | meter_provider: otel_meter_provider,
|
273 + | }
|
274 + | }
|
275 + |
|
276 + | /// Flush the metric pipeline.
|
277 + | pub fn flush(&self) -> Result<(), ObservabilityError> {
|
278 + | match self.meter_provider.force_flush() {
|
279 + | Ok(_) => Ok(()),
|
280 + | Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)),
|
281 + | }
|
282 + | }
|
283 + | }
|
284 + |
|
285 + | impl ProvideMeter for OtelMeterProvider {
|
286 + | fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter {
|
287 + | Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope))))
|
288 + | }
|
289 + | }
|
290 + |
|
291 + | #[cfg(test)]
|
292 + | mod tests {
|
293 + |
|
294 + | use std::sync::Arc;
|
295 + |
|
296 + | use aws_smithy_observability::instruments::AsyncMeasure;
|
297 + | use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
|
298 + | use opentelemetry_sdk::metrics::{
|
299 + | data::{Gauge, Histogram, Sum},
|
300 + | PeriodicReader, SdkMeterProvider,
|
301 + | };
|
302 + | use opentelemetry_sdk::runtime::Tokio;
|
303 + | use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
|
304 + |
|
305 + | use super::OtelMeterProvider;
|
306 + |
|
307 + | // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
|
308 + | #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
309 + | async fn sync_instruments_work() {
|
310 + | // Create the OTel metrics objects
|
311 + | let exporter = InMemoryMetricsExporter::default();
|
312 + | let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
|
313 + | let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
|
314 + |
|
315 + | // Create the SDK metrics types from the OTel objects
|
316 + | let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
|
317 + | let sdk_ref = sdk_mp.clone();
|
318 + | let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
|
319 + |
|
320 + | // Get the dyn versions of the SDK metrics objects
|
321 + | let dyn_sdk_mp = sdk_tp.meter_provider();
|
322 + | let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
|
323 + |
|
324 + | //Create all 3 sync instruments and record some data for each
|
325 + | let mono_counter = dyn_sdk_meter
|
326 + | .create_monotonic_counter("TestMonoCounter")
|
327 + | .build();
|
328 + | mono_counter.add(4, None, None);
|
329 + | let ud_counter = dyn_sdk_meter
|
330 + | .create_up_down_counter("TestUpDownCounter")
|
331 + | .build();
|
332 + | ud_counter.add(-6, None, None);
|
333 + | let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
|
334 + | histogram.record(1.234, None, None);
|
335 + |
|
336 + | // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
|
337 + | sdk_ref.flush().unwrap();
|
338 + |
|
339 + | // Extract the metrics from the exporter and assert that they are what we expect
|
340 + | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
341 + | let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
|
342 + | .data
|
343 + | .as_any()
|
344 + | .downcast_ref::<Sum<u64>>()
|
345 + | .unwrap()
|
346 + | .data_points[0]
|
347 + | .value;
|
348 + | assert_eq!(extracted_mono_counter_data, &4);
|
349 + |
|
350 + | let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
351 + | .data
|
352 + | .as_any()
|
353 + | .downcast_ref::<Sum<i64>>()
|
354 + | .unwrap()
|
355 + | .data_points[0]
|
356 + | .value;
|
357 + | assert_eq!(extracted_ud_counter_data, &-6);
|
358 + |
|
359 + | let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
360 + | .data
|
361 + | .as_any()
|
362 + | .downcast_ref::<Histogram<f64>>()
|
363 + | .unwrap()
|
364 + | .data_points[0]
|
365 + | .sum;
|
366 + | assert_eq!(extracted_histogram_data, &1.234);
|
367 + | }
|
368 + |
|
369 + | #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
370 + | async fn async_instrument_work() {
|
371 + | // Create the OTel metrics objects
|
372 + | let exporter = InMemoryMetricsExporter::default();
|
373 + | let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
|
374 + | let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
|
375 + |
|
376 + | // Create the SDK metrics types from the OTel objects
|
377 + | let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
|
378 + | let sdk_ref = sdk_mp.clone();
|
379 + | let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
|
380 + |
|
381 + | // Get the dyn versions of the SDK metrics objects
|
382 + | let dyn_sdk_mp = sdk_tp.meter_provider();
|
383 + | let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
|
384 + |
|
385 + | //Create all async instruments and record some data
|
386 + | let gauge = dyn_sdk_meter
|
387 + | .create_gauge(
|
388 + | "TestGauge".to_string(),
|
389 + | // Callback function records another value with different attributes so it is deduped
|
390 + | |measurement: &dyn AsyncMeasure<Value = f64>| {
|
391 + | let mut attrs = Attributes::new();
|
392 + | attrs.set(
|
393 + | "TestGaugeAttr",
|
394 + | AttributeValue::String("TestGaugeAttr".into()),
|
395 + | );
|
396 + | measurement.record(6.789, Some(&attrs), None);
|
397 + | },
|
398 + | )
|
399 + | .build();
|
400 + | gauge.record(1.234, None, None);
|
401 + |
|
402 + | let async_ud_counter = dyn_sdk_meter
|
403 + | .create_async_up_down_counter(
|
404 + | "TestAsyncUpDownCounter".to_string(),
|
405 + | |measurement: &dyn AsyncMeasure<Value = i64>| {
|
406 + | let mut attrs = Attributes::new();
|
407 + | attrs.set(
|
408 + | "TestAsyncUpDownCounterAttr",
|
409 + | AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
|
410 + | );
|
411 + | measurement.record(12, Some(&attrs), None);
|
412 + | },
|
413 + | )
|
414 + | .build();
|
415 + | async_ud_counter.record(-6, None, None);
|
416 + |
|
417 + | let async_mono_counter = dyn_sdk_meter
|
418 + | .create_async_monotonic_counter(
|
419 + | "TestAsyncMonoCounter".to_string(),
|
420 + | |measurement: &dyn AsyncMeasure<Value = u64>| {
|
421 + | let mut attrs = Attributes::new();
|
422 + | attrs.set(
|
423 + | "TestAsyncMonoCounterAttr",
|
424 + | AttributeValue::String("TestAsyncMonoCounterAttr".into()),
|
425 + | );
|
426 + | measurement.record(123, Some(&attrs), None);
|
427 + | },
|
428 + | )
|
429 + | .build();
|
430 + | async_mono_counter.record(4, None, None);
|
431 + |
|
432 + | // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
|
433 + | sdk_ref.flush().unwrap();
|
434 + |
|
435 + | // Extract the metrics from the exporter
|
436 + | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
437 + |
|
438 + | // Assert that the reported metrics are what we expect
|
439 + | let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
|
440 + | .data
|
441 + | .as_any()
|
442 + | .downcast_ref::<Gauge<f64>>()
|
443 + | .unwrap()
|
444 + | .data_points[0]
|
445 + | .value;
|
446 + | assert_eq!(extracted_gauge_data, &1.234);
|
447 + |
|
448 + | let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
449 + | .data
|
450 + | .as_any()
|
451 + | .downcast_ref::<Sum<i64>>()
|
452 + | .unwrap()
|
453 + | .data_points[0]
|
454 + | .value;
|
455 + | assert_eq!(extracted_async_ud_counter_data, &-6);
|
456 + |
|
457 + | let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
458 + | .data
|
459 + | .as_any()
|
460 + | .downcast_ref::<Sum<u64>>()
|
461 + | .unwrap()
|
462 + | .data_points[0]
|
463 + | .value;
|
464 + | assert_eq!(extracted_async_mono_data, &4);
|
465 + |
|
466 + | // Assert that the async callbacks ran
|
467 + | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
468 + | let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
|
469 + | .data
|
470 + | .as_any()
|
471 + | .downcast_ref::<Gauge<f64>>()
|
472 + | .unwrap()
|
473 + | .data_points[1]
|
474 + | .value;
|
475 + | assert_eq!(extracted_gauge_data, &6.789);
|
476 + |
|
477 + | let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
478 + | .data
|
479 + | .as_any()
|
480 + | .downcast_ref::<Sum<i64>>()
|
481 + | .unwrap()
|
482 + | .data_points[1]
|
483 + | .value;
|
484 + | assert_eq!(extracted_async_ud_counter_data, &12);
|
485 + |
|
486 + | let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
487 + | .data
|
488 + | .as_any()
|
489 + | .downcast_ref::<Sum<u64>>()
|
490 + | .unwrap()
|
491 + | .data_points[1]
|
492 + | .value;
|
493 + | assert_eq!(extracted_async_mono_data, &123);
|
494 + | }
|
495 + | }
|