1 + | /*
|
2 + | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 + | * SPDX-License-Identifier: Apache-2.0
|
4 + | */
|
5 + |
|
6 + | //! OpenTelemetry based implementations of the Smithy Observability Meter traits.
|
7 + |
|
8 + | use std::fmt::Debug;
|
9 + | use std::ops::Deref;
|
10 + | use std::sync::Arc;
|
11 + |
|
12 + | use crate::attributes::kv_from_option_attr;
|
13 + | use aws_smithy_observability::attributes::{Attributes, Context};
|
14 + | use aws_smithy_observability::error::{ErrorKind, ObservabilityError};
|
15 + | pub use aws_smithy_observability::meter::{
|
16 + | AsyncMeasure, Histogram, Meter, MonotonicCounter, ProvideMeter, UpDownCounter,
|
17 + | };
|
18 + | pub use aws_smithy_observability::provider::TelemetryProvider;
|
19 + | use opentelemetry::metrics::{
|
20 + | AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
|
21 + | Meter as OtelMeter, MeterProvider as OtelMeterProvider,
|
22 + | ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
|
23 + | ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
|
24 + | };
|
25 + | use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
|
26 + |
|
27 + | #[derive(Debug)]
|
28 + | struct UpDownCounterWrap(OtelUpDownCounter<i64>);
|
29 + | impl UpDownCounter for UpDownCounterWrap {
|
30 + | fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
31 + | self.0.add(value, &kv_from_option_attr(attributes));
|
32 + | }
|
33 + | }
|
34 + |
|
35 + | #[derive(Debug)]
|
36 + | struct HistogramWrap(OtelHistogram<f64>);
|
37 + | impl Histogram for HistogramWrap {
|
38 + | fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
39 + | self.0.record(value, &kv_from_option_attr(attributes));
|
40 + | }
|
41 + | }
|
42 + |
|
43 + | #[derive(Debug)]
|
44 + | struct MonotonicCounterWrap(OtelCounter<u64>);
|
45 + | impl MonotonicCounter for MonotonicCounterWrap {
|
46 + | fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
47 + | self.0.add(value, &kv_from_option_attr(attributes));
|
48 + | }
|
49 + | }
|
50 + |
|
51 + | #[derive(Debug)]
|
52 + | struct GaugeWrap(OtelObservableGauge<f64>);
|
53 + | impl AsyncMeasure for GaugeWrap {
|
54 + | type Value = f64;
|
55 + |
|
56 + | fn record(
|
57 + | &self,
|
58 + | value: Self::Value,
|
59 + | attributes: Option<&Attributes>,
|
60 + | _context: Option<&dyn Context>,
|
61 + | ) {
|
62 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
63 + | }
|
64 + |
|
65 + | // OTel rust does not currently support unregistering callbacks
|
66 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
67 + | fn stop(&self) {}
|
68 + | }
|
69 + |
|
70 + | #[derive(Debug)]
|
71 + | struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
|
72 + | impl AsyncMeasure for AsyncUpDownCounterWrap {
|
73 + | type Value = i64;
|
74 + |
|
75 + | fn record(
|
76 + | &self,
|
77 + | value: Self::Value,
|
78 + | attributes: Option<&Attributes>,
|
79 + | _context: Option<&dyn Context>,
|
80 + | ) {
|
81 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
82 + | }
|
83 + |
|
84 + | // OTel rust does not currently support unregistering callbacks
|
85 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
86 + | fn stop(&self) {}
|
87 + | }
|
88 + |
|
89 + | #[derive(Debug)]
|
90 + | struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
|
91 + | impl AsyncMeasure for AsyncMonotonicCounterWrap {
|
92 + | type Value = u64;
|
93 + |
|
94 + | fn record(
|
95 + | &self,
|
96 + | value: Self::Value,
|
97 + | attributes: Option<&Attributes>,
|
98 + | _context: Option<&dyn Context>,
|
99 + | ) {
|
100 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
101 + | }
|
102 + |
|
103 + | // OTel rust does not currently support unregistering callbacks
|
104 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
105 + | fn stop(&self) {}
|
106 + | }
|
107 + |
|
108 + | struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
|
109 + | impl<T> AsyncMeasure for AsyncInstrumentWrap<'_, T> {
|
110 + | type Value = T;
|
111 + |
|
112 + | fn record(
|
113 + | &self,
|
114 + | value: Self::Value,
|
115 + | attributes: Option<&Attributes>,
|
116 + | _context: Option<&dyn Context>,
|
117 + | ) {
|
118 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
119 + | }
|
120 + |
|
121 + | // OTel rust does not currently support unregistering callbacks
|
122 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
123 + | fn stop(&self) {}
|
124 + | }
|
125 + |
|
126 + | // The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version
|
127 + | // for our wrapper struct
|
128 + | impl<T> Debug for AsyncInstrumentWrap<'_, T> {
|
129 + | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
130 + | f.debug_tuple("AsyncInstrumentWrap").finish()
|
131 + | }
|
132 + | }
|
133 + |
|
134 + | #[derive(Debug)]
|
135 + | struct MeterWrap(OtelMeter);
|
136 + | impl Deref for MeterWrap {
|
137 + | type Target = OtelMeter;
|
138 + |
|
139 + | fn deref(&self) -> &Self::Target {
|
140 + | &self.0
|
141 + | }
|
142 + | }
|
143 + |
|
144 + | impl Meter for MeterWrap {
|
145 + | fn create_gauge(
|
146 + | &self,
|
147 + | name: String,
|
148 + | callback: Box<dyn Fn(&dyn AsyncMeasure<Value = f64>) + Send + Sync>,
|
149 + | units: Option<String>,
|
150 + | description: Option<String>,
|
151 + | ) -> Arc<dyn AsyncMeasure<Value = f64>> {
|
152 + | let mut builder = self.f64_observable_gauge(name).with_callback(
|
153 + | move |input: &dyn OtelAsyncInstrument<f64>| {
|
154 + | callback(&AsyncInstrumentWrap(input));
|
155 + | },
|
156 + | );
|
157 + |
|
158 + | if let Some(desc) = description {
|
159 + | builder = builder.with_description(desc);
|
160 + | }
|
161 + |
|
162 + | if let Some(u) = units {
|
163 + | builder = builder.with_unit(u);
|
164 + | }
|
165 + |
|
166 + | Arc::new(GaugeWrap(builder.init()))
|
167 + | }
|
168 + |
|
169 + | fn create_up_down_counter(
|
170 + | &self,
|
171 + | name: String,
|
172 + | units: Option<String>,
|
173 + | description: Option<String>,
|
174 + | ) -> Arc<dyn UpDownCounter> {
|
175 + | let mut builder = self.i64_up_down_counter(name);
|
176 + | if let Some(desc) = description {
|
177 + | builder = builder.with_description(desc);
|
178 + | }
|
179 + |
|
180 + | if let Some(u) = units {
|
181 + | builder = builder.with_unit(u);
|
182 + | }
|
183 + |
|
184 + | Arc::new(UpDownCounterWrap(builder.init()))
|
185 + | }
|
186 + |
|
187 + | fn create_async_up_down_counter(
|
188 + | &self,
|
189 + | name: String,
|
190 + | callback: Box<dyn Fn(&dyn AsyncMeasure<Value = i64>) + Send + Sync>,
|
191 + | units: Option<String>,
|
192 + | description: Option<String>,
|
193 + | ) -> Arc<dyn AsyncMeasure<Value = i64>> {
|
194 + | let mut builder = self.i64_observable_up_down_counter(name).with_callback(
|
195 + | move |input: &dyn OtelAsyncInstrument<i64>| {
|
196 + | callback(&AsyncInstrumentWrap(input));
|
197 + | },
|
198 + | );
|
199 + |
|
200 + | if let Some(desc) = description {
|
201 + | builder = builder.with_description(desc);
|
202 + | }
|
203 + |
|
204 + | if let Some(u) = units {
|
205 + | builder = builder.with_unit(u);
|
206 + | }
|
207 + |
|
208 + | Arc::new(AsyncUpDownCounterWrap(builder.init()))
|
209 + | }
|
210 + |
|
211 + | fn create_monotonic_counter(
|
212 + | &self,
|
213 + | name: String,
|
214 + | units: Option<String>,
|
215 + | description: Option<String>,
|
216 + | ) -> Arc<dyn MonotonicCounter> {
|
217 + | let mut builder = self.u64_counter(name);
|
218 + | if let Some(desc) = description {
|
219 + | builder = builder.with_description(desc);
|
220 + | }
|
221 + |
|
222 + | if let Some(u) = units {
|
223 + | builder = builder.with_unit(u);
|
224 + | }
|
225 + |
|
226 + | Arc::new(MonotonicCounterWrap(builder.init()))
|
227 + | }
|
228 + |
|
229 + | fn create_async_monotonic_counter(
|
230 + | &self,
|
231 + | name: String,
|
232 + | callback: Box<dyn Fn(&dyn AsyncMeasure<Value = u64>) + Send + Sync>,
|
233 + | units: Option<String>,
|
234 + | description: Option<String>,
|
235 + | ) -> Arc<dyn AsyncMeasure<Value = u64>> {
|
236 + | let mut builder = self.u64_observable_counter(name).with_callback(
|
237 + | move |input: &dyn OtelAsyncInstrument<u64>| {
|
238 + | callback(&AsyncInstrumentWrap(input));
|
239 + | },
|
240 + | );
|
241 + |
|
242 + | if let Some(desc) = description {
|
243 + | builder = builder.with_description(desc);
|
244 + | }
|
245 + |
|
246 + | if let Some(u) = units {
|
247 + | builder = builder.with_unit(u);
|
248 + | }
|
249 + |
|
250 + | Arc::new(AsyncMonotonicCounterWrap(builder.init()))
|
251 + | }
|
252 + |
|
253 + | fn create_histogram(
|
254 + | &self,
|
255 + | name: String,
|
256 + | units: Option<String>,
|
257 + | description: Option<String>,
|
258 + | ) -> Arc<dyn Histogram> {
|
259 + | let mut builder = self.f64_histogram(name);
|
260 + | if let Some(desc) = description {
|
261 + | builder = builder.with_description(desc);
|
262 + | }
|
263 + |
|
264 + | if let Some(u) = units {
|
265 + | builder = builder.with_unit(u);
|
266 + | }
|
267 + |
|
268 + | Arc::new(HistogramWrap(builder.init()))
|
269 + | }
|
270 + | }
|
271 + |
|
272 + | /// An OpenTelemetry based implementation of the AWS SDK's [ProvideMeter] trait
|
273 + | #[non_exhaustive]
|
274 + | #[derive(Debug)]
|
275 + | pub struct AwsSdkOtelMeterProvider {
|
276 + | meter_provider: OtelSdkMeterProvider,
|
277 + | }
|
278 + |
|
279 + | impl AwsSdkOtelMeterProvider {
|
280 + | /// Create a new [AwsSdkOtelMeterProvider] from an [OtelSdkMeterProvider].
|
281 + | pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
|
282 + | Self {
|
283 + | meter_provider: otel_meter_provider,
|
284 + | }
|
285 + | }
|
286 + |
|
287 + | /// Flush the metric pipeline.
|
288 + | pub fn flush(&self) -> Result<(), ObservabilityError> {
|
289 + | match self.meter_provider.force_flush() {
|
290 + | Ok(_) => Ok(()),
|
291 + | Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsFlush, err)),
|
292 + | }
|
293 + | }
|
294 + |
|
295 + | /// Gracefully shutdown the metric pipeline.
|
296 + | pub fn shutdown(&self) -> Result<(), ObservabilityError> {
|
297 + | match self.meter_provider.force_flush() {
|
298 + | Ok(_) => Ok(()),
|
299 + | Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsShutdown, err)),
|
300 + | }
|
301 + | }
|
302 + | }
|
303 + |
|
304 + | impl ProvideMeter for AwsSdkOtelMeterProvider {
|
305 + | fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Arc<dyn Meter> {
|
306 + | Arc::new(MeterWrap(self.meter_provider.meter(scope)))
|
307 + | }
|
308 + |
|
309 + | fn as_any(&self) -> &dyn std::any::Any {
|
310 + | self
|
311 + | }
|
312 + | }
|
313 + |
|
314 + | #[cfg(test)]
|
315 + | mod tests {
|
316 + |
|
317 + | use aws_smithy_observability::attributes::{AttributeValue, Attributes};
|
318 + | use aws_smithy_observability::meter::AsyncMeasure;
|
319 + | use aws_smithy_observability::provider::TelemetryProvider;
|
320 + | use opentelemetry_sdk::metrics::{
|
321 + | data::{Gauge, Histogram, Sum},
|
322 + | PeriodicReader, SdkMeterProvider,
|
323 + | };
|
324 + | use opentelemetry_sdk::runtime::Tokio;
|
325 + | use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
|
326 + |
|
327 + | use super::AwsSdkOtelMeterProvider;
|
328 + |
|
329 + | // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
|
330 + | #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
331 + | async fn sync_instruments_work() {
|
332 + | // Create the OTel metrics objects
|
333 + | let exporter = InMemoryMetricsExporter::default();
|
334 + | let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
|
335 + | let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
|
336 + |
|
337 + | // Create the SDK metrics types from the OTel objects
|
338 + | let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
|
339 + | let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
|
340 + |
|
341 + | // Get the dyn versions of the SDK metrics objects
|
342 + | let dyn_sdk_mp = sdk_tp.meter_provider();
|
343 + | let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
|
344 + |
|
345 + | //Create all 3 sync instruments and record some data for each
|
346 + | let mono_counter =
|
347 + | dyn_sdk_meter.create_monotonic_counter("TestMonoCounter".to_string(), None, None);
|
348 + | mono_counter.add(4, None, None);
|
349 + | let ud_counter =
|
350 + | dyn_sdk_meter.create_up_down_counter("TestUpDownCounter".to_string(), None, None);
|
351 + | ud_counter.add(-6, None, None);
|
352 + | let histogram = dyn_sdk_meter.create_histogram("TestHistogram".to_string(), None, None);
|
353 + | histogram.record(1.234, None, None);
|
354 + |
|
355 + | // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
|
356 + | dyn_sdk_mp
|
357 + | .as_any()
|
358 + | .downcast_ref::<AwsSdkOtelMeterProvider>()
|
359 + | .unwrap()
|
360 + | .shutdown()
|
361 + | .unwrap();
|
362 + |
|
363 + | // Extract the metrics from the exporter and assert that they are what we expect
|
364 + | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
365 + | let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
|
366 + | .data
|
367 + | .as_any()
|
368 + | .downcast_ref::<Sum<u64>>()
|
369 + | .unwrap()
|
370 + | .data_points[0]
|
371 + | .value;
|
372 + | assert_eq!(extracted_mono_counter_data, &4);
|
373 + |
|
374 + | let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
375 + | .data
|
376 + | .as_any()
|
377 + | .downcast_ref::<Sum<i64>>()
|
378 + | .unwrap()
|
379 + | .data_points[0]
|
380 + | .value;
|
381 + | assert_eq!(extracted_ud_counter_data, &-6);
|
382 + |
|
383 + | let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
384 + | .data
|
385 + | .as_any()
|
386 + | .downcast_ref::<Histogram<f64>>()
|
387 + | .unwrap()
|
388 + | .data_points[0]
|
389 + | .sum;
|
390 + | assert_eq!(extracted_histogram_data, &1.234);
|
391 + | }
|
392 + |
|
393 + | #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
394 + | async fn async_instrument_work() {
|
395 + | // Create the OTel metrics objects
|
396 + | let exporter = InMemoryMetricsExporter::default();
|
397 + | let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
|
398 + | let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
|
399 + |
|
400 + | // Create the SDK metrics types from the OTel objects
|
401 + | let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
|
402 + | let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
|
403 + |
|
404 + | // Get the dyn versions of the SDK metrics objects
|
405 + | let dyn_sdk_mp = sdk_tp.meter_provider();
|
406 + | let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
|
407 + |
|
408 + | //Create all async instruments and record some data
|
409 + | let gauge = dyn_sdk_meter.create_gauge(
|
410 + | "TestGauge".to_string(),
|
411 + | // Callback function records another value with different attributes so it is deduped
|
412 + | Box::new(|measurement: &dyn AsyncMeasure<Value = f64>| {
|
413 + | let mut attrs = Attributes::new();
|
414 + | attrs.set(
|
415 + | "TestGaugeAttr",
|
416 + | AttributeValue::String("TestGaugeAttr".into()),
|
417 + | );
|
418 + | measurement.record(6.789, Some(&attrs), None);
|
419 + | }),
|
420 + | None,
|
421 + | None,
|
422 + | );
|
423 + | gauge.record(1.234, None, None);
|
424 + |
|
425 + | let async_ud_counter = dyn_sdk_meter.create_async_up_down_counter(
|
426 + | "TestAsyncUpDownCounter".to_string(),
|
427 + | Box::new(|measurement: &dyn AsyncMeasure<Value = i64>| {
|
428 + | let mut attrs = Attributes::new();
|
429 + | attrs.set(
|
430 + | "TestAsyncUpDownCounterAttr",
|
431 + | AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
|
432 + | );
|
433 + | measurement.record(12, Some(&attrs), None);
|
434 + | }),
|
435 + | None,
|
436 + | None,
|
437 + | );
|
438 + | async_ud_counter.record(-6, None, None);
|
439 + |
|
440 + | let async_mono_counter = dyn_sdk_meter.create_async_monotonic_counter(
|
441 + | "TestAsyncMonoCounter".to_string(),
|
442 + | Box::new(|measurement: &dyn AsyncMeasure<Value = u64>| {
|
443 + | let mut attrs = Attributes::new();
|
444 + | attrs.set(
|
445 + | "TestAsyncMonoCounterAttr",
|
446 + | AttributeValue::String("TestAsyncMonoCounterAttr".into()),
|
447 + | );
|
448 + | measurement.record(123, Some(&attrs), None);
|
449 + | }),
|
450 + | None,
|
451 + | None,
|
452 + | );
|
453 + | async_mono_counter.record(4, None, None);
|
454 + |
|
455 + | // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline
|
456 + | dyn_sdk_mp
|
457 + | .as_any()
|
458 + | .downcast_ref::<AwsSdkOtelMeterProvider>()
|
459 + | .unwrap()
|
460 + | .shutdown()
|
461 + | .unwrap();
|
462 + |
|
463 + | // Extract the metrics from the exporter
|
464 + | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
465 + |
|
466 + | // Assert that the reported metrics are what we expect
|
467 + | let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
|
468 + | .data
|
469 + | .as_any()
|
470 + | .downcast_ref::<Gauge<f64>>()
|
471 + | .unwrap()
|
472 + | .data_points[0]
|
473 + | .value;
|
474 + | assert_eq!(extracted_gauge_data, &1.234);
|
475 + |
|
476 + | let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
477 + | .data
|
478 + | .as_any()
|
479 + | .downcast_ref::<Sum<i64>>()
|
480 + | .unwrap()
|
481 + | .data_points[0]
|
482 + | .value;
|
483 + | assert_eq!(extracted_async_ud_counter_data, &-6);
|
484 + |
|
485 + | let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
486 + | .data
|
487 + | .as_any()
|
488 + | .downcast_ref::<Sum<u64>>()
|
489 + | .unwrap()
|
490 + | .data_points[0]
|
491 + | .value;
|
492 + | assert_eq!(extracted_async_mono_data, &4);
|
493 + |
|
494 + | // Assert that the async callbacks ran
|
495 + | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
496 + | let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
|
497 + | .data
|
498 + | .as_any()
|
499 + | .downcast_ref::<Gauge<f64>>()
|
500 + | .unwrap()
|
501 + | .data_points[1]
|
502 + | .value;
|
503 + | assert_eq!(extracted_gauge_data, &6.789);
|
504 + |
|
505 + | let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
506 + | .data
|
507 + | .as_any()
|
508 + | .downcast_ref::<Sum<i64>>()
|
509 + | .unwrap()
|
510 + | .data_points[1]
|
511 + | .value;
|
512 + | assert_eq!(extracted_async_ud_counter_data, &12);
|
513 + |
|
514 + | let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
515 + | .data
|
516 + | .as_any()
|
517 + | .downcast_ref::<Sum<u64>>()
|
518 + | .unwrap()
|
519 + | .data_points[1]
|
520 + | .value;
|
521 + | assert_eq!(extracted_async_mono_data, &123);
|
522 + | }
|
523 + | }
|