1 + | /*
|
2 + | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 + | * SPDX-License-Identifier: Apache-2.0
|
4 + | */
|
5 + |
|
6 + | //! OpenTelemetry based implementations of the Smithy Observability Meter traits.
|
7 + |
|
8 + | use std::fmt::Debug;
|
9 + | use std::ops::Deref;
|
10 + |
|
11 + | use crate::attributes::kv_from_option_attr;
|
12 + | use aws_smithy_observability::attributes::{Attributes, Context};
|
13 + | use aws_smithy_observability::error::{ErrorKind, ObservabilityError};
|
14 + | pub use aws_smithy_observability::meter::{
|
15 + | AsyncMeasurement, Histogram, Meter, MeterProvider, MonotonicCounter, UpDownCounter,
|
16 + | };
|
17 + | pub use aws_smithy_observability::provider::TelemetryProvider;
|
18 + | use opentelemetry::metrics::{
|
19 + | AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
|
20 + | Meter as OtelMeter, MeterProvider as OtelMeterProvider,
|
21 + | ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
|
22 + | ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
|
23 + | };
|
24 + | use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
|
25 + |
|
26 + | #[derive(Debug)]
|
27 + | struct UpDownCounterWrap(OtelUpDownCounter<i64>);
|
28 + | impl UpDownCounter for UpDownCounterWrap {
|
29 + | fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
30 + | self.0.add(value, &kv_from_option_attr(attributes));
|
31 + | }
|
32 + | }
|
33 + |
|
34 + | #[derive(Debug)]
|
35 + | struct HistogramWrap(OtelHistogram<f64>);
|
36 + | impl Histogram for HistogramWrap {
|
37 + | fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
38 + | self.0.record(value, &kv_from_option_attr(attributes));
|
39 + | }
|
40 + | }
|
41 + |
|
42 + | #[derive(Debug)]
|
43 + | struct MonotonicCounterWrap(OtelCounter<u64>);
|
44 + | impl MonotonicCounter for MonotonicCounterWrap {
|
45 + | fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
46 + | self.0.add(value, &kv_from_option_attr(attributes));
|
47 + | }
|
48 + | }
|
49 + |
|
50 + | #[derive(Debug)]
|
51 + | struct GaugeWrap(OtelObservableGauge<f64>);
|
52 + | impl AsyncMeasurement for GaugeWrap {
|
53 + | type Value = f64;
|
54 + |
|
55 + | fn record(
|
56 + | &self,
|
57 + | value: Self::Value,
|
58 + | attributes: Option<&Attributes>,
|
59 + | _context: Option<&dyn Context>,
|
60 + | ) {
|
61 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
62 + | }
|
63 + |
|
64 + | // OTel rust does not currently support unregistering callbacks
|
65 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
66 + | fn stop(&self) {}
|
67 + | }
|
68 + |
|
69 + | #[derive(Debug)]
|
70 + | struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
|
71 + | impl AsyncMeasurement for AsyncUpDownCounterWrap {
|
72 + | type Value = i64;
|
73 + |
|
74 + | fn record(
|
75 + | &self,
|
76 + | value: Self::Value,
|
77 + | attributes: Option<&Attributes>,
|
78 + | _context: Option<&dyn Context>,
|
79 + | ) {
|
80 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
81 + | }
|
82 + |
|
83 + | // OTel rust does not currently support unregistering callbacks
|
84 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
85 + | fn stop(&self) {}
|
86 + | }
|
87 + |
|
88 + | #[derive(Debug)]
|
89 + | struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
|
90 + | impl AsyncMeasurement for AsyncMonotonicCounterWrap {
|
91 + | type Value = u64;
|
92 + |
|
93 + | fn record(
|
94 + | &self,
|
95 + | value: Self::Value,
|
96 + | attributes: Option<&Attributes>,
|
97 + | _context: Option<&dyn Context>,
|
98 + | ) {
|
99 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
100 + | }
|
101 + |
|
102 + | // OTel rust does not currently support unregistering callbacks
|
103 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
104 + | fn stop(&self) {}
|
105 + | }
|
106 + |
|
107 + | struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
|
108 + | impl<T> AsyncMeasurement for AsyncInstrumentWrap<'_, T> {
|
109 + | type Value = T;
|
110 + |
|
111 + | fn record(
|
112 + | &self,
|
113 + | value: Self::Value,
|
114 + | attributes: Option<&Attributes>,
|
115 + | _context: Option<&dyn Context>,
|
116 + | ) {
|
117 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
118 + | }
|
119 + |
|
120 + | // OTel rust does not currently support unregistering callbacks
|
121 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
122 + | fn stop(&self) {}
|
123 + | }
|
124 + |
|
125 + | // The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version
|
126 + | // for our wrapper struct
|
127 + | impl<T> Debug for AsyncInstrumentWrap<'_, T> {
|
128 + | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
129 + | f.debug_tuple("AsyncInstrumentWrap").finish()
|
130 + | }
|
131 + | }
|
132 + |
|
133 + | #[derive(Debug)]
|
134 + | struct MeterWrap(OtelMeter);
|
135 + | impl Deref for MeterWrap {
|
136 + | type Target = OtelMeter;
|
137 + |
|
138 + | fn deref(&self) -> &Self::Target {
|
139 + | &self.0
|
140 + | }
|
141 + | }
|
142 + |
|
143 + | impl Meter for MeterWrap {
|
144 + | fn create_gauge(
|
145 + | &self,
|
146 + | name: String,
|
147 + | callback: Box<dyn Fn(&dyn AsyncMeasurement<Value = f64>) + Send + Sync>,
|
148 + | units: Option<String>,
|
149 + | description: Option<String>,
|
150 + | ) -> Box<dyn AsyncMeasurement<Value = f64>> {
|
151 + | let mut builder = self.f64_observable_gauge(name).with_callback(
|
152 + | move |input: &dyn OtelAsyncInstrument<f64>| {
|
153 + | callback(&AsyncInstrumentWrap(input));
|
154 + | },
|
155 + | );
|
156 + |
|
157 + | if let Some(desc) = description {
|
158 + | builder = builder.with_description(desc);
|
159 + | }
|
160 + |
|
161 + | if let Some(u) = units {
|
162 + | builder = builder.with_unit(u);
|
163 + | }
|
164 + |
|
165 + | Box::new(GaugeWrap(builder.init()))
|
166 + | }
|
167 + |
|
168 + | fn create_up_down_counter(
|
169 + | &self,
|
170 + | name: String,
|
171 + | units: Option<String>,
|
172 + | description: Option<String>,
|
173 + | ) -> Box<dyn UpDownCounter> {
|
174 + | let mut builder = self.i64_up_down_counter(name);
|
175 + | if let Some(desc) = description {
|
176 + | builder = builder.with_description(desc);
|
177 + | }
|
178 + |
|
179 + | if let Some(u) = units {
|
180 + | builder = builder.with_unit(u);
|
181 + | }
|
182 + |
|
183 + | Box::new(UpDownCounterWrap(builder.init()))
|
184 + | }
|
185 + |
|
186 + | fn create_async_up_down_counter(
|
187 + | &self,
|
188 + | name: String,
|
189 + | callback: Box<dyn Fn(&dyn AsyncMeasurement<Value = i64>) + Send + Sync>,
|
190 + | units: Option<String>,
|
191 + | description: Option<String>,
|
192 + | ) -> Box<dyn AsyncMeasurement<Value = i64>> {
|
193 + | let mut builder = self.i64_observable_up_down_counter(name).with_callback(
|
194 + | move |input: &dyn OtelAsyncInstrument<i64>| {
|
195 + | callback(&AsyncInstrumentWrap(input));
|
196 + | },
|
197 + | );
|
198 + |
|
199 + | if let Some(desc) = description {
|
200 + | builder = builder.with_description(desc);
|
201 + | }
|
202 + |
|
203 + | if let Some(u) = units {
|
204 + | builder = builder.with_unit(u);
|
205 + | }
|
206 + |
|
207 + | Box::new(AsyncUpDownCounterWrap(builder.init()))
|
208 + | }
|
209 + |
|
210 + | fn create_monotonic_counter(
|
211 + | &self,
|
212 + | name: String,
|
213 + | units: Option<String>,
|
214 + | description: Option<String>,
|
215 + | ) -> Box<dyn MonotonicCounter> {
|
216 + | let mut builder = self.u64_counter(name);
|
217 + | if let Some(desc) = description {
|
218 + | builder = builder.with_description(desc);
|
219 + | }
|
220 + |
|
221 + | if let Some(u) = units {
|
222 + | builder = builder.with_unit(u);
|
223 + | }
|
224 + |
|
225 + | Box::new(MonotonicCounterWrap(builder.init()))
|
226 + | }
|
227 + |
|
228 + | fn create_async_monotonic_counter(
|
229 + | &self,
|
230 + | name: String,
|
231 + | callback: Box<dyn Fn(&dyn AsyncMeasurement<Value = u64>) + Send + Sync>,
|
232 + | units: Option<String>,
|
233 + | description: Option<String>,
|
234 + | ) -> Box<dyn AsyncMeasurement<Value = u64>> {
|
235 + | let mut builder = self.u64_observable_counter(name).with_callback(
|
236 + | move |input: &dyn OtelAsyncInstrument<u64>| {
|
237 + | callback(&AsyncInstrumentWrap(input));
|
238 + | },
|
239 + | );
|
240 + |
|
241 + | if let Some(desc) = description {
|
242 + | builder = builder.with_description(desc);
|
243 + | }
|
244 + |
|
245 + | if let Some(u) = units {
|
246 + | builder = builder.with_unit(u);
|
247 + | }
|
248 + |
|
249 + | Box::new(AsyncMonotonicCounterWrap(builder.init()))
|
250 + | }
|
251 + |
|
252 + | fn create_histogram(
|
253 + | &self,
|
254 + | name: String,
|
255 + | units: Option<String>,
|
256 + | description: Option<String>,
|
257 + | ) -> Box<dyn Histogram> {
|
258 + | let mut builder = self.f64_histogram(name);
|
259 + | if let Some(desc) = description {
|
260 + | builder = builder.with_description(desc);
|
261 + | }
|
262 + |
|
263 + | if let Some(u) = units {
|
264 + | builder = builder.with_unit(u);
|
265 + | }
|
266 + |
|
267 + | Box::new(HistogramWrap(builder.init()))
|
268 + | }
|
269 + | }
|
270 + |
|
271 + | /// An OpenTelemetry based implementation of the AWS SDK's [MeterProvider] trait
|
272 + | #[non_exhaustive]
|
273 + | #[derive(Debug)]
|
274 + | pub struct AwsSdkOtelMeterProvider {
|
275 + | meter_provider: OtelSdkMeterProvider,
|
276 + | }
|
277 + |
|
278 + | impl AwsSdkOtelMeterProvider {
|
279 + | /// Create a new [AwsSdkOtelMeterProvider] from an [OtelSdkMeterProvider].
|
280 + | pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
|
281 + | Self {
|
282 + | meter_provider: otel_meter_provider,
|
283 + | }
|
284 + | }
|
285 + |
|
286 + | /// Flush the metric pipeline.
|
287 + | pub fn flush(&self) -> Result<(), ObservabilityError> {
|
288 + | match self.meter_provider.force_flush() {
|
289 + | Ok(_) => Ok(()),
|
290 + | Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsFlush, err)),
|
291 + | }
|
292 + | }
|
293 + |
|
294 + | /// Gracefully shutdown the metric pipeline.
|
295 + | pub fn shutdown(&self) -> Result<(), ObservabilityError> {
|
296 + | match self.meter_provider.force_flush() {
|
297 + | Ok(_) => Ok(()),
|
298 + | Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsShutdown, err)),
|
299 + | }
|
300 + | }
|
301 + | }
|
302 + |
|
303 + | impl MeterProvider for AwsSdkOtelMeterProvider {
|
304 + | fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Box<dyn Meter> {
|
305 + | Box::new(MeterWrap(self.meter_provider.meter(scope)))
|
306 + | }
|
307 + |
|
308 + | fn as_any(&self) -> &dyn std::any::Any {
|
309 + | self
|
310 + | }
|
311 + | }
|
312 + |
|
#[cfg(test)]
mod tests {
    use aws_smithy_observability::attributes::{AttributeValue, Attributes};
    use aws_smithy_observability::meter::AsyncMeasurement;
    use aws_smithy_observability::provider::TelemetryProvider;
    use opentelemetry_sdk::metrics::{
        data::{Gauge, Histogram, Sum},
        PeriodicReader, SdkMeterProvider,
    };
    use opentelemetry_sdk::runtime::Tokio;
    use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;

    use super::AwsSdkOtelMeterProvider;

    /// Builds the OTel metrics objects around an in-memory exporter and wraps them in the SDK
    /// telemetry types. Returns the exporter (for inspecting what was exported) together with
    /// the telemetry provider under test.
    fn in_memory_telemetry() -> (InMemoryMetricsExporter, TelemetryProvider) {
        // Create the OTel metrics objects
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();

        // Create the SDK metrics types from the OTel objects
        let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
        let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();

        (exporter, sdk_tp)
    }

    // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn sync_instruments_work() {
        let (exporter, sdk_tp) = in_memory_telemetry();

        // Get the dyn versions of the SDK metrics objects
        let meter_provider = sdk_tp.meter_provider();
        let meter = meter_provider.get_meter("TestMeter", None);

        // Create all 3 sync instruments and record some data for each
        let mono_counter =
            meter.create_monotonic_counter("TestMonoCounter".to_string(), None, None);
        mono_counter.add(4, None, None);
        let ud_counter = meter.create_up_down_counter("TestUpDownCounter".to_string(), None, None);
        ud_counter.add(-6, None, None);
        let histogram = meter.create_histogram("TestHistogram".to_string(), None, None);
        histogram.record(1.234, None, None);

        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
        meter_provider
            .as_any()
            .downcast_ref::<AwsSdkOtelMeterProvider>()
            .unwrap()
            .shutdown()
            .unwrap();

        // Extract the metrics from the exporter and assert that they are what we expect
        let exported = exporter.get_finished_metrics().unwrap();
        let metrics = &exported[0].scope_metrics[0].metrics;

        let mono_counter_value = metrics[0]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(mono_counter_value, 4);

        let ud_counter_value = metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(ud_counter_value, -6);

        let histogram_sum = metrics[2]
            .data
            .as_any()
            .downcast_ref::<Histogram<f64>>()
            .unwrap()
            .data_points[0]
            .sum;
        assert_eq!(histogram_sum, 1.234);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_instrument_work() {
        let (exporter, sdk_tp) = in_memory_telemetry();

        // Get the dyn versions of the SDK metrics objects
        let meter_provider = sdk_tp.meter_provider();
        let meter = meter_provider.get_meter("TestMeter", None);

        // Create all async instruments and record some data
        let gauge = meter.create_gauge(
            "TestGauge".to_string(),
            // The callback records a second value under its own attribute set; the assertions
            // below expect it as a separate data point from the direct `record` call.
            Box::new(|measurement: &dyn AsyncMeasurement<Value = f64>| {
                let mut attrs = Attributes::new();
                attrs.set(
                    "TestGaugeAttr",
                    AttributeValue::String("TestGaugeAttr".into()),
                );
                measurement.record(6.789, Some(&attrs), None);
            }),
            None,
            None,
        );
        gauge.record(1.234, None, None);

        let async_ud_counter = meter.create_async_up_down_counter(
            "TestAsyncUpDownCounter".to_string(),
            Box::new(|measurement: &dyn AsyncMeasurement<Value = i64>| {
                let mut attrs = Attributes::new();
                attrs.set(
                    "TestAsyncUpDownCounterAttr",
                    AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
                );
                measurement.record(12, Some(&attrs), None);
            }),
            None,
            None,
        );
        async_ud_counter.record(-6, None, None);

        let async_mono_counter = meter.create_async_monotonic_counter(
            "TestAsyncMonoCounter".to_string(),
            Box::new(|measurement: &dyn AsyncMeasurement<Value = u64>| {
                let mut attrs = Attributes::new();
                attrs.set(
                    "TestAsyncMonoCounterAttr",
                    AttributeValue::String("TestAsyncMonoCounterAttr".into()),
                );
                measurement.record(123, Some(&attrs), None);
            }),
            None,
            None,
        );
        async_mono_counter.record(4, None, None);

        // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
        meter_provider
            .as_any()
            .downcast_ref::<AwsSdkOtelMeterProvider>()
            .unwrap()
            .shutdown()
            .unwrap();

        // Extract the metrics from the exporter
        let exported = exporter.get_finished_metrics().unwrap();
        let metrics = &exported[0].scope_metrics[0].metrics;

        // Assert that the directly recorded values are what we expect
        let gauge_value = metrics[0]
            .data
            .as_any()
            .downcast_ref::<Gauge<f64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(gauge_value, 1.234);

        let async_ud_counter_value = metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(async_ud_counter_value, -6);

        let async_mono_value = metrics[2]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(async_mono_value, 4);

        // Assert that the async callbacks ran (checked against a fresh snapshot of the exporter)
        let exported = exporter.get_finished_metrics().unwrap();
        let metrics = &exported[0].scope_metrics[0].metrics;

        let gauge_callback_value = metrics[0]
            .data
            .as_any()
            .downcast_ref::<Gauge<f64>>()
            .unwrap()
            .data_points[1]
            .value;
        assert_eq!(gauge_callback_value, 6.789);

        let async_ud_counter_callback_value = metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points[1]
            .value;
        assert_eq!(async_ud_counter_callback_value, 12);

        let async_mono_callback_value = metrics[2]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points[1]
            .value;
        assert_eq!(async_mono_callback_value, 123);
    }
}
|