1 + | /*
|
2 + | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 + | * SPDX-License-Identifier: Apache-2.0
|
4 + | */
|
5 + |
|
6 + | //! OpenTelemetry based implementations of the Smithy Observability Meter traits.
|
7 + |
|
8 + | use std::ops::Deref;
|
9 + |
|
10 + | use crate::attributes::kv_from_option_attr;
|
11 + | use aws_smithy_observability::attributes::{Attributes, Context};
|
12 + | use aws_smithy_observability::error::{ErrorKind, ObservabilityError};
|
13 + | pub use aws_smithy_observability::meter::{
|
14 + | AsyncMeasurement, Histogram, Meter, MeterProvider, MonotonicCounter, UpDownCounter,
|
15 + | };
|
16 + | pub use aws_smithy_observability::provider::TelemetryProvider;
|
17 + | use opentelemetry::metrics::{
|
18 + | AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram,
|
19 + | Meter as OtelMeter, MeterProvider as OtelMeterProvider,
|
20 + | ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge,
|
21 + | ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter,
|
22 + | };
|
23 + | use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider;
|
24 + |
|
25 + | struct UpDownCounterWrap(OtelUpDownCounter<i64>);
|
26 + | impl UpDownCounter for UpDownCounterWrap {
|
27 + | fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
28 + | self.0.add(value, &kv_from_option_attr(attributes));
|
29 + | }
|
30 + | }
|
31 + |
|
32 + | struct HistogramWrap(OtelHistogram<f64>);
|
33 + | impl Histogram for HistogramWrap {
|
34 + | fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
35 + | self.0.record(value, &kv_from_option_attr(attributes));
|
36 + | }
|
37 + | }
|
38 + |
|
39 + | struct MonotonicCounterWrap(OtelCounter<u64>);
|
40 + | impl MonotonicCounter for MonotonicCounterWrap {
|
41 + | fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&dyn Context>) {
|
42 + | self.0.add(value, &kv_from_option_attr(attributes));
|
43 + | }
|
44 + | }
|
45 + |
|
46 + | struct GaugeWrap(OtelObservableGauge<f64>);
|
47 + | impl AsyncMeasurement for GaugeWrap {
|
48 + | type Value = f64;
|
49 + |
|
50 + | fn record(
|
51 + | &self,
|
52 + | value: Self::Value,
|
53 + | attributes: Option<&Attributes>,
|
54 + | _context: Option<&dyn Context>,
|
55 + | ) {
|
56 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
57 + | }
|
58 + |
|
59 + | // OTel rust does not currently support unregistering callbacks
|
60 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
61 + | fn stop(&self) {}
|
62 + | }
|
63 + |
|
64 + | struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter<i64>);
|
65 + | impl AsyncMeasurement for AsyncUpDownCounterWrap {
|
66 + | type Value = i64;
|
67 + |
|
68 + | fn record(
|
69 + | &self,
|
70 + | value: Self::Value,
|
71 + | attributes: Option<&Attributes>,
|
72 + | _context: Option<&dyn Context>,
|
73 + | ) {
|
74 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
75 + | }
|
76 + |
|
77 + | // OTel rust does not currently support unregistering callbacks
|
78 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
79 + | fn stop(&self) {}
|
80 + | }
|
81 + |
|
82 + | struct AsyncMonotonicCounterWrap(OtelObservableCounter<u64>);
|
83 + | impl AsyncMeasurement for AsyncMonotonicCounterWrap {
|
84 + | type Value = u64;
|
85 + |
|
86 + | fn record(
|
87 + | &self,
|
88 + | value: Self::Value,
|
89 + | attributes: Option<&Attributes>,
|
90 + | _context: Option<&dyn Context>,
|
91 + | ) {
|
92 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
93 + | }
|
94 + |
|
95 + | // OTel rust does not currently support unregistering callbacks
|
96 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
97 + | fn stop(&self) {}
|
98 + | }
|
99 + |
|
100 + | struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument<T> + Send + Sync));
|
101 + | impl<T> AsyncMeasurement for AsyncInstrumentWrap<'_, T> {
|
102 + | type Value = T;
|
103 + |
|
104 + | fn record(
|
105 + | &self,
|
106 + | value: Self::Value,
|
107 + | attributes: Option<&Attributes>,
|
108 + | _context: Option<&dyn Context>,
|
109 + | ) {
|
110 + | self.0.observe(value, &kv_from_option_attr(attributes));
|
111 + | }
|
112 + |
|
113 + | // OTel rust does not currently support unregistering callbacks
|
114 + | // https://github.com/open-telemetry/opentelemetry-rust/issues/2245
|
115 + | fn stop(&self) {}
|
116 + | }
|
117 + |
|
118 + | struct MeterWrap(OtelMeter);
|
119 + | impl Deref for MeterWrap {
|
120 + | type Target = OtelMeter;
|
121 + |
|
122 + | fn deref(&self) -> &Self::Target {
|
123 + | &self.0
|
124 + | }
|
125 + | }
|
126 + |
|
127 + | impl Meter for MeterWrap {
|
128 + | fn create_gauge(
|
129 + | &self,
|
130 + | name: String,
|
131 + | callback: Box<dyn Fn(&dyn AsyncMeasurement<Value = f64>) + Send + Sync>,
|
132 + | units: Option<String>,
|
133 + | description: Option<String>,
|
134 + | ) -> Box<dyn AsyncMeasurement<Value = f64>> {
|
135 + | let mut builder = self.f64_observable_gauge(name).with_callback(
|
136 + | move |input: &dyn OtelAsyncInstrument<f64>| {
|
137 + | callback(&AsyncInstrumentWrap(input));
|
138 + | },
|
139 + | );
|
140 + |
|
141 + | if let Some(desc) = description {
|
142 + | builder = builder.with_description(desc);
|
143 + | }
|
144 + |
|
145 + | if let Some(u) = units {
|
146 + | builder = builder.with_unit(u);
|
147 + | }
|
148 + |
|
149 + | Box::new(GaugeWrap(builder.init()))
|
150 + | }
|
151 + |
|
152 + | fn create_up_down_counter(
|
153 + | &self,
|
154 + | name: String,
|
155 + | units: Option<String>,
|
156 + | description: Option<String>,
|
157 + | ) -> Box<dyn UpDownCounter> {
|
158 + | let mut builder = self.i64_up_down_counter(name);
|
159 + | if let Some(desc) = description {
|
160 + | builder = builder.with_description(desc);
|
161 + | }
|
162 + |
|
163 + | if let Some(u) = units {
|
164 + | builder = builder.with_unit(u);
|
165 + | }
|
166 + |
|
167 + | Box::new(UpDownCounterWrap(builder.init()))
|
168 + | }
|
169 + |
|
170 + | fn create_async_up_down_counter(
|
171 + | &self,
|
172 + | name: String,
|
173 + | callback: Box<dyn Fn(&dyn AsyncMeasurement<Value = i64>) + Send + Sync>,
|
174 + | units: Option<String>,
|
175 + | description: Option<String>,
|
176 + | ) -> Box<dyn AsyncMeasurement<Value = i64>> {
|
177 + | let mut builder = self.i64_observable_up_down_counter(name).with_callback(
|
178 + | move |input: &dyn OtelAsyncInstrument<i64>| {
|
179 + | callback(&AsyncInstrumentWrap(input));
|
180 + | },
|
181 + | );
|
182 + |
|
183 + | if let Some(desc) = description {
|
184 + | builder = builder.with_description(desc);
|
185 + | }
|
186 + |
|
187 + | if let Some(u) = units {
|
188 + | builder = builder.with_unit(u);
|
189 + | }
|
190 + |
|
191 + | Box::new(AsyncUpDownCounterWrap(builder.init()))
|
192 + | }
|
193 + |
|
194 + | fn create_monotonic_counter(
|
195 + | &self,
|
196 + | name: String,
|
197 + | units: Option<String>,
|
198 + | description: Option<String>,
|
199 + | ) -> Box<dyn MonotonicCounter> {
|
200 + | let mut builder = self.u64_counter(name);
|
201 + | if let Some(desc) = description {
|
202 + | builder = builder.with_description(desc);
|
203 + | }
|
204 + |
|
205 + | if let Some(u) = units {
|
206 + | builder = builder.with_unit(u);
|
207 + | }
|
208 + |
|
209 + | Box::new(MonotonicCounterWrap(builder.init()))
|
210 + | }
|
211 + |
|
212 + | fn create_async_monotonic_counter(
|
213 + | &self,
|
214 + | name: String,
|
215 + | callback: Box<dyn Fn(&dyn AsyncMeasurement<Value = u64>) + Send + Sync>,
|
216 + | units: Option<String>,
|
217 + | description: Option<String>,
|
218 + | ) -> Box<dyn AsyncMeasurement<Value = u64>> {
|
219 + | let mut builder = self.u64_observable_counter(name).with_callback(
|
220 + | move |input: &dyn OtelAsyncInstrument<u64>| {
|
221 + | callback(&AsyncInstrumentWrap(input));
|
222 + | },
|
223 + | );
|
224 + |
|
225 + | if let Some(desc) = description {
|
226 + | builder = builder.with_description(desc);
|
227 + | }
|
228 + |
|
229 + | if let Some(u) = units {
|
230 + | builder = builder.with_unit(u);
|
231 + | }
|
232 + |
|
233 + | Box::new(AsyncMonotonicCounterWrap(builder.init()))
|
234 + | }
|
235 + |
|
236 + | fn create_histogram(
|
237 + | &self,
|
238 + | name: String,
|
239 + | units: Option<String>,
|
240 + | description: Option<String>,
|
241 + | ) -> Box<dyn Histogram> {
|
242 + | let mut builder = self.f64_histogram(name);
|
243 + | if let Some(desc) = description {
|
244 + | builder = builder.with_description(desc);
|
245 + | }
|
246 + |
|
247 + | if let Some(u) = units {
|
248 + | builder = builder.with_unit(u);
|
249 + | }
|
250 + |
|
251 + | Box::new(HistogramWrap(builder.init()))
|
252 + | }
|
253 + | }
|
254 + |
|
255 + | /// An OpenTelemetry based implementation of the AWS SDK's [MeterProvider] trait
|
256 + | #[non_exhaustive]
|
257 + | #[derive(Debug)]
|
258 + | pub struct AwsSdkOtelMeterProvider {
|
259 + | meter_provider: OtelSdkMeterProvider,
|
260 + | }
|
261 + |
|
262 + | impl AwsSdkOtelMeterProvider {
|
263 + | /// Create a new [AwsSdkOtelMeterProvider] from an [OtelSdkMeterProvider].
|
264 + | pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
|
265 + | Self {
|
266 + | meter_provider: otel_meter_provider,
|
267 + | }
|
268 + | }
|
269 + | }
|
270 + |
|
271 + | impl MeterProvider for AwsSdkOtelMeterProvider {
|
272 + | fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Box<dyn Meter> {
|
273 + | Box::new(MeterWrap(self.meter_provider.meter(scope)))
|
274 + | }
|
275 + |
|
276 + | fn flush(&self) -> Result<(), ObservabilityError> {
|
277 + | match self.meter_provider.force_flush() {
|
278 + | Ok(_) => Ok(()),
|
279 + | Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsFlush, err)),
|
280 + | }
|
281 + | }
|
282 + |
|
283 + | fn shutdown(&self) -> Result<(), ObservabilityError> {
|
284 + | match self.meter_provider.force_flush() {
|
285 + | Ok(_) => Ok(()),
|
286 + | Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsShutdown, err)),
|
287 + | }
|
288 + | }
|
289 + | }
|
290 + |
|
// End-to-end tests: instruments are created and recorded through the Smithy
// trait objects, then the exported OTel data is read back from an in-memory
// exporter and compared against the recorded values.
#[cfg(test)]
mod tests {

    use aws_smithy_observability::attributes::{AttributeValue, Attributes};
    use aws_smithy_observability::meter::AsyncMeasurement;
    use aws_smithy_observability::provider::TelemetryProvider;
    use opentelemetry_sdk::metrics::{
        data::{Gauge, Histogram, Sum},
        PeriodicReader, SdkMeterProvider,
    };
    use opentelemetry_sdk::runtime::Tokio;
    use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;

    use super::AwsSdkOtelMeterProvider;

    // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn sync_instruments_work() {
        // Create the OTel metrics objects
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();

        // Create the SDK metrics types from the OTel objects
        let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
        let sdk_tp = TelemetryProvider::builder()
            .meter_provider(Box::new(sdk_mp))
            .build();

        // Get the dyn versions of the SDK metrics objects
        let dyn_sdk_mp = sdk_tp.meter_provider();
        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);

        // Create all 3 sync instruments and record some data for each.
        // The creation order matters: the assertions below index the exported
        // metrics as 0 (monotonic counter), 1 (up-down counter), 2 (histogram).
        let mono_counter =
            dyn_sdk_meter.create_monotonic_counter("TestMonoCounter".to_string(), None, None);
        mono_counter.add(4, None, None);
        let ud_counter =
            dyn_sdk_meter.create_up_down_counter("TestUpDownCounter".to_string(), None, None);
        ud_counter.add(-6, None, None);
        let histogram = dyn_sdk_meter.create_histogram("TestHistogram".to_string(), None, None);
        histogram.record(1.234, None, None);

        // Call shutdown on the provider so the recorded metrics are pushed through the
        // pipeline to the exporter before they are inspected
        dyn_sdk_mp.shutdown().unwrap();

        // Extract the metrics from the exporter and assert that they are what we expect
        let finished_metrics = exporter.get_finished_metrics().unwrap();
        let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(extracted_mono_counter_data, &4);

        let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(extracted_ud_counter_data, &-6);

        // For the histogram only the sum of the single recorded value is checked
        let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
            .data
            .as_any()
            .downcast_ref::<Histogram<f64>>()
            .unwrap()
            .data_points[0]
            .sum;
        assert_eq!(extracted_histogram_data, &1.234);
    }

    // Uses the same tokio settings as `sync_instruments_work`
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_instrument_work() {
        // Create the OTel metrics objects
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
        let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();

        // Create the SDK metrics types from the OTel objects
        let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp);
        let sdk_tp = TelemetryProvider::builder()
            .meter_provider(Box::new(sdk_mp))
            .build();

        // Get the dyn versions of the SDK metrics objects
        let dyn_sdk_mp = sdk_tp.meter_provider();
        let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);

        // Create all async instruments and record some data.
        // The creation order matters: the assertions below index the exported
        // metrics as 0 (gauge), 1 (up-down counter), 2 (monotonic counter).
        let gauge = dyn_sdk_meter.create_gauge(
            "TestGauge".to_string(),
            // The callback records a second value under a distinct attribute set, so it is
            // exported as its own data point (index 1 below) instead of being merged with
            // the attribute-less value recorded directly on the returned handle
            Box::new(|measurement: &dyn AsyncMeasurement<Value = f64>| {
                let mut attrs = Attributes::new();
                attrs.set(
                    "TestGaugeAttr".into(),
                    AttributeValue::String("TestGaugeAttr".into()),
                );
                measurement.record(6.789, Some(&attrs), None);
            }),
            None,
            None,
        );
        gauge.record(1.234, None, None);

        let async_ud_counter = dyn_sdk_meter.create_async_up_down_counter(
            "TestAsyncUpDownCounter".to_string(),
            // Same pattern as the gauge: the callback value carries its own attribute
            Box::new(|measurement: &dyn AsyncMeasurement<Value = i64>| {
                let mut attrs = Attributes::new();
                attrs.set(
                    "TestAsyncUpDownCounterAttr".into(),
                    AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
                );
                measurement.record(12, Some(&attrs), None);
            }),
            None,
            None,
        );
        async_ud_counter.record(-6, None, None);

        let async_mono_counter = dyn_sdk_meter.create_async_monotonic_counter(
            "TestAsyncMonoCounter".to_string(),
            // Same pattern as the gauge: the callback value carries its own attribute
            Box::new(|measurement: &dyn AsyncMeasurement<Value = u64>| {
                let mut attrs = Attributes::new();
                attrs.set(
                    "TestAsyncMonoCounterAttr".into(),
                    AttributeValue::String("TestAsyncMonoCounterAttr".into()),
                );
                measurement.record(123, Some(&attrs), None);
            }),
            None,
            None,
        );
        async_mono_counter.record(4, None, None);

        // Flush and then shut down the metrics provider so all metrics are pushed through
        // the pipeline to the exporter before they are inspected
        dyn_sdk_mp.flush().unwrap();
        dyn_sdk_mp.shutdown().unwrap();

        // Extract the metrics from the exporter
        let finished_metrics = exporter.get_finished_metrics().unwrap();

        // Assert that the values recorded directly on the returned handles are what we
        // expect (data point 0 of each metric)
        let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
            .data
            .as_any()
            .downcast_ref::<Gauge<f64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(extracted_gauge_data, &1.234);

        let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(extracted_async_ud_counter_data, &-6);

        let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points[0]
            .value;
        assert_eq!(extracted_async_mono_data, &4);

        // Assert that the async callbacks ran: their values are expected at data point 1
        // of each metric
        let finished_metrics = exporter.get_finished_metrics().unwrap();
        let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
            .data
            .as_any()
            .downcast_ref::<Gauge<f64>>()
            .unwrap()
            .data_points[1]
            .value;
        assert_eq!(extracted_gauge_data, &6.789);

        let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
            .data
            .as_any()
            .downcast_ref::<Sum<i64>>()
            .unwrap()
            .data_points[1]
            .value;
        assert_eq!(extracted_async_ud_counter_data, &12);

        let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
            .data
            .as_any()
            .downcast_ref::<Sum<u64>>()
            .unwrap()
            .data_points[1]
            .value;
        assert_eq!(extracted_async_mono_data, &123);
    }
}
|