155 161 |
|
156 162 | if let Some(u) = builder.get_units() {
|
157 163 | otel_builder = otel_builder.with_unit(u.clone());
|
158 164 | }
|
159 165 |
|
160 166 | otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<f64>| {
|
161 167 | let f = builder.callback.clone();
|
162 168 | f(&AsyncInstrumentWrap(input));
|
163 169 | });
|
164 170 |
|
165 - | Arc::new(GaugeWrap(otel_builder.init()))
|
171 + | Arc::new(GaugeWrap(otel_builder.build()))
|
166 172 | }
|
167 173 |
|
168 174 | fn create_up_down_counter( // Build an `UpDownCounter` backed by the OTel `i64_up_down_counter` instrument.
|
169 175 | &self,
|
170 176 | builder: InstrumentBuilder<'_, Arc<dyn UpDownCounter>>,
|
171 177 | ) -> Arc<dyn UpDownCounter> {
|
172 178 | let mut otel_builder = self.i64_up_down_counter(builder.get_name().clone()); // start from the instrument name
|
173 179 | if let Some(desc) = builder.get_description() { // description is optional; forwarded only when set
|
174 180 | otel_builder = otel_builder.with_description(desc.clone());
|
175 181 | }
|
176 182 |
|
177 183 | if let Some(u) = builder.get_units() { // units are optional; forwarded only when set
|
178 184 | otel_builder = otel_builder.with_unit(u.clone());
|
179 185 | }
|
180 186 |
|
181 - | Arc::new(UpDownCounterWrap(otel_builder.init()))
|
187 + | Arc::new(UpDownCounterWrap(otel_builder.build())) // this change swaps `init()` for `build()`; the result is wrapped and returned in an `Arc`
|
182 188 | }
|
183 189 |
|
184 190 | fn create_async_up_down_counter( // Build an `AsyncMeasure<Value = i64>` backed by the OTel `i64_observable_up_down_counter` instrument.
|
185 191 | &self,
|
186 192 | builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = i64>>, i64>,
|
187 193 | ) -> Arc<dyn AsyncMeasure<Value = i64>> {
|
188 194 | let mut otel_builder = self.i64_observable_up_down_counter(builder.get_name().clone()); // start from the instrument name
|
189 195 |
|
190 196 | if let Some(desc) = builder.get_description() { // description is optional; forwarded only when set
|
191 197 | otel_builder = otel_builder.with_description(desc.clone());
|
192 198 | }
|
193 199 |
|
194 200 | if let Some(u) = builder.get_units() { // units are optional; forwarded only when set
|
195 201 | otel_builder = otel_builder.with_unit(u.clone());
|
196 202 | }
|
197 203 |
|
198 204 | otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<i64>| { // the closure takes ownership of `builder` (`move`)
|
199 205 | let f = builder.callback.clone(); // clone the SDK callback on every invocation
|
200 206 | f(&AsyncInstrumentWrap(input)); // expose the OTel instrument to the SDK callback through `AsyncInstrumentWrap`
|
201 207 | });
|
202 208 |
|
203 - | Arc::new(AsyncUpDownCounterWrap(otel_builder.init()))
|
209 + | Arc::new(AsyncUpDownCounterWrap(otel_builder.build())) // this change swaps `init()` for `build()`; the result is wrapped and returned in an `Arc`
|
204 210 | }
|
205 211 |
|
206 212 | fn create_monotonic_counter( // Build a `MonotonicCounter` backed by the OTel `u64_counter` instrument.
|
207 213 | &self,
|
208 214 | builder: InstrumentBuilder<'_, Arc<dyn MonotonicCounter>>,
|
209 215 | ) -> Arc<dyn MonotonicCounter> {
|
210 216 | let mut otel_builder = self.u64_counter(builder.get_name().clone()); // start from the instrument name
|
211 217 | if let Some(desc) = builder.get_description() { // description is optional; forwarded only when set
|
212 218 | otel_builder = otel_builder.with_description(desc.clone());
|
213 219 | }
|
214 220 |
|
215 221 | if let Some(u) = builder.get_units() { // units are optional; forwarded only when set
|
216 222 | otel_builder = otel_builder.with_unit(u.clone());
|
217 223 | }
|
218 224 |
|
219 - | Arc::new(MonotonicCounterWrap(otel_builder.init()))
|
225 + | Arc::new(MonotonicCounterWrap(otel_builder.build())) // this change swaps `init()` for `build()`; the result is wrapped and returned in an `Arc`
|
220 226 | }
|
221 227 |
|
222 228 | fn create_async_monotonic_counter( // Build an `AsyncMeasure<Value = u64>` backed by the OTel `u64_observable_counter` instrument.
|
223 229 | &self,
|
224 230 | builder: AsyncInstrumentBuilder<'_, Arc<dyn AsyncMeasure<Value = u64>>, u64>,
|
225 231 | ) -> Arc<dyn AsyncMeasure<Value = u64>> {
|
226 232 | let mut otel_builder = self.u64_observable_counter(builder.get_name().clone()); // start from the instrument name
|
227 233 |
|
228 234 | if let Some(desc) = builder.get_description() { // description is optional; forwarded only when set
|
229 235 | otel_builder = otel_builder.with_description(desc.clone());
|
230 236 | }
|
231 237 |
|
232 238 | if let Some(u) = builder.get_units() { // units are optional; forwarded only when set
|
233 239 | otel_builder = otel_builder.with_unit(u.clone());
|
234 240 | }
|
235 241 |
|
236 242 | otel_builder = otel_builder.with_callback(move |input: &dyn OtelAsyncInstrument<u64>| { // the closure takes ownership of `builder` (`move`)
|
237 243 | let f = builder.callback.clone(); // clone the SDK callback on every invocation
|
238 244 | f(&AsyncInstrumentWrap(input)); // expose the OTel instrument to the SDK callback through `AsyncInstrumentWrap`
|
239 245 | });
|
240 246 |
|
241 - | Arc::new(AsyncMonotonicCounterWrap(otel_builder.init()))
|
247 + | Arc::new(AsyncMonotonicCounterWrap(otel_builder.build())) // this change swaps `init()` for `build()`; the result is wrapped and returned in an `Arc`
|
242 248 | }
|
243 249 |
|
244 250 | fn create_histogram( // Build a `Histogram` backed by the OTel `f64_histogram` instrument.
|
245 251 | &self,
|
246 252 | builder: InstrumentBuilder<'_, Arc<dyn Histogram>>,
|
247 253 | ) -> Arc<dyn Histogram> {
|
248 254 | let mut otel_builder = self.f64_histogram(builder.get_name().clone()); // start from the instrument name
|
249 255 | if let Some(desc) = builder.get_description() { // description is optional; forwarded only when set
|
250 256 | otel_builder = otel_builder.with_description(desc.clone());
|
251 257 | }
|
252 258 |
|
253 259 | if let Some(u) = builder.get_units() { // units are optional; forwarded only when set
|
254 260 | otel_builder = otel_builder.with_unit(u.clone());
|
255 261 | }
|
256 262 |
|
257 - | Arc::new(HistogramWrap(otel_builder.init()))
|
263 + | Arc::new(HistogramWrap(otel_builder.build())) // this change swaps `init()` for `build()`; the result is wrapped and returned in an `Arc`
|
258 264 | }
|
259 265 | }
|
260 266 |
|
261 267 | /// An OpenTelemetry-based implementation of the AWS SDK's [ProvideMeter] trait, wrapping an [OtelSdkMeterProvider].
|
262 268 | #[non_exhaustive]
|
263 269 | #[derive(Debug)]
|
264 270 | pub struct OtelMeterProvider {
|
265 271 | meter_provider: OtelSdkMeterProvider, // the wrapped OTel SDK meter provider
|
266 272 | }
|
267 273 |
|
268 274 | impl OtelMeterProvider {
|
269 275 | /// Create a new [OtelMeterProvider] that takes ownership of the given [OtelSdkMeterProvider].
|
270 276 | pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self {
|
271 277 | Self {
|
272 278 | meter_provider: otel_meter_provider,
|
273 279 | }
|
274 280 | }
|
275 281 |
|
276 282 | /// Flush the metric pipeline by calling `force_flush` on the wrapped provider; a failure is returned as an [ObservabilityError] of kind `ErrorKind::Other`.
|
277 283 | pub fn flush(&self) -> Result<(), ObservabilityError> {
|
278 284 | match self.meter_provider.force_flush() {
|
279 285 | Ok(_) => Ok(()), // the success payload is discarded
|
280 286 | Err(err) => Err(ObservabilityError::new(ErrorKind::Other, err)), // the underlying error is passed along as the source
|
281 287 | }
|
282 288 | }
|
283 289 | }
|
284 290 |
|
285 291 | impl ProvideMeter for OtelMeterProvider {
|
286 292 | fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Meter { // `_attributes` is accepted but not used here
|
287 293 | Meter::new(Arc::new(MeterWrap(self.meter_provider.meter(scope)))) // wrap the OTel meter for `scope` in `MeterWrap`
|
288 294 | }
|
295 + |
|
296 + | fn as_any(&self) -> &dyn std::any::Any { // added in this change: exposes the provider as `&dyn Any`
|
297 + | self
|
298 + | }
|
289 299 | }
|
290 300 |
|
291 301 | #[cfg(test)]
|
292 302 | mod tests {
|
293 303 |
|
294 304 | use std::sync::Arc;
|
295 305 |
|
296 306 | use aws_smithy_observability::instruments::AsyncMeasure;
|
297 307 | use aws_smithy_observability::{AttributeValue, Attributes, TelemetryProvider};
|
298 308 | use opentelemetry_sdk::metrics::{
|
299 309 | data::{Gauge, Histogram, Sum},
|
300 310 | PeriodicReader, SdkMeterProvider,
|
301 311 | };
|
302 312 | use opentelemetry_sdk::runtime::Tokio;
|
303 - | use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;
|
313 + | use opentelemetry_sdk::testing::metrics::InMemoryMetricExporter;
|
304 314 |
|
305 315 | use super::OtelMeterProvider;
|
306 316 |
|
307 317 | // Without these tokio settings this test just stalls forever on flushing the metrics pipeline
|
308 318 | #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
309 319 | async fn sync_instruments_work() {
|
310 320 | // Create the OTel metrics objects
|
311 - | let exporter = InMemoryMetricsExporter::default();
|
321 + | let exporter = InMemoryMetricExporter::default();
|
312 322 | let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
|
313 323 | let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
|
314 324 |
|
315 325 | // Create the SDK metrics types from the OTel objects
|
316 326 | let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
|
317 327 | let sdk_ref = sdk_mp.clone(); // keep a second handle so `flush` can be called after `sdk_mp` is moved into the builder
|
318 328 | let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
|
319 329 |
|
320 330 | // Get the dyn versions of the SDK metrics objects
|
321 331 | let dyn_sdk_mp = sdk_tp.meter_provider();
|
322 332 | let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
|
323 333 |
|
324 334 | // Create all 3 sync instruments and record one value for each
|
325 335 | let mono_counter = dyn_sdk_meter
|
326 336 | .create_monotonic_counter("TestMonoCounter")
|
327 337 | .build();
|
328 338 | mono_counter.add(4, None, None);
|
329 339 | let ud_counter = dyn_sdk_meter
|
330 340 | .create_up_down_counter("TestUpDownCounter")
|
331 341 | .build();
|
332 342 | ud_counter.add(-6, None, None);
|
333 343 | let histogram = dyn_sdk_meter.create_histogram("TestHistogram").build();
|
334 344 | histogram.record(1.234, None, None);
|
335 345 |
|
336 346 | // Flush the metrics provider (this is a flush, not a shutdown) so all recorded metrics go through the pipeline
|
337 347 | sdk_ref.flush().unwrap();
|
338 348 |
|
339 349 | // Extract the metrics from the exporter and assert that they are what we expect
|
340 350 | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
341 351 | let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0] // NOTE(review): indices 0..2 assume metrics are exported in creation order - confirm
|
342 352 | .data
|
343 353 | .as_any()
|
344 354 | .downcast_ref::<Sum<u64>>()
|
345 355 | .unwrap()
|
346 356 | .data_points[0]
|
347 357 | .value;
|
348 358 | assert_eq!(extracted_mono_counter_data, &4);
|
349 359 |
|
350 360 | let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
351 361 | .data
|
352 362 | .as_any()
|
353 363 | .downcast_ref::<Sum<i64>>()
|
354 364 | .unwrap()
|
355 365 | .data_points[0]
|
356 366 | .value;
|
357 367 | assert_eq!(extracted_ud_counter_data, &-6);
|
358 368 |
|
359 369 | let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
360 370 | .data
|
361 371 | .as_any()
|
362 372 | .downcast_ref::<Histogram<f64>>()
|
363 373 | .unwrap()
|
364 374 | .data_points[0]
|
365 375 | .sum; // only one value was recorded, so the sum equals that value
|
366 376 | assert_eq!(extracted_histogram_data, &1.234);
|
367 377 | }
|
368 378 |
|
369 379 | #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
370 380 | async fn async_instrument_work() {
|
371 381 | // Create the OTel metrics objects
|
372 - | let exporter = InMemoryMetricsExporter::default();
|
382 + | let exporter = InMemoryMetricExporter::default();
|
373 383 | let reader = PeriodicReader::builder(exporter.clone(), Tokio).build();
|
374 384 | let otel_mp = SdkMeterProvider::builder().with_reader(reader).build();
|
375 385 |
|
376 386 | // Create the SDK metrics types from the OTel objects
|
377 387 | let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp));
|
378 388 | let sdk_ref = sdk_mp.clone(); // keep a second handle so `flush` can be called after `sdk_mp` is moved into the builder
|
379 389 | let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build();
|
380 390 |
|
381 391 | // Get the dyn versions of the SDK metrics objects
|
382 392 | let dyn_sdk_mp = sdk_tp.meter_provider();
|
383 393 | let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None);
|
384 394 |
|
385 - | //Create all async instruments and record some data
|
386 - | let gauge = dyn_sdk_meter
|
395 + | // Create all async instruments - in OpenTelemetry 0.27+, async instruments only work through callbacks
|
396 + | let _gauge = dyn_sdk_meter
|
387 397 | .create_gauge(
|
388 398 | "TestGauge".to_string(),
|
389 - | // Callback function records another value with different attributes so it is deduped
|
399 + | // Callback function records the value (with a single attribute attached)
|
390 400 | |measurement: &dyn AsyncMeasure<Value = f64>| {
|
391 401 | let mut attrs = Attributes::new();
|
392 402 | attrs.set(
|
393 403 | "TestGaugeAttr",
|
394 404 | AttributeValue::String("TestGaugeAttr".into()),
|
395 405 | );
|
396 406 | measurement.record(6.789, Some(&attrs), None);
|
397 407 | },
|
398 408 | )
|
399 409 | .build();
|
400 - | gauge.record(1.234, None, None);
|
401 410 |
|
402 - | let async_ud_counter = dyn_sdk_meter
|
411 + | let _async_ud_counter = dyn_sdk_meter
|
403 412 | .create_async_up_down_counter(
|
404 413 | "TestAsyncUpDownCounter".to_string(),
|
405 414 | |measurement: &dyn AsyncMeasure<Value = i64>| {
|
406 415 | let mut attrs = Attributes::new();
|
407 416 | attrs.set(
|
408 417 | "TestAsyncUpDownCounterAttr",
|
409 418 | AttributeValue::String("TestAsyncUpDownCounterAttr".into()),
|
410 419 | );
|
411 420 | measurement.record(12, Some(&attrs), None);
|
412 421 | },
|
413 422 | )
|
414 423 | .build();
|
415 - | async_ud_counter.record(-6, None, None);
|
416 424 |
|
417 - | let async_mono_counter = dyn_sdk_meter
|
425 + | let _async_mono_counter = dyn_sdk_meter
|
418 426 | .create_async_monotonic_counter(
|
419 427 | "TestAsyncMonoCounter".to_string(),
|
420 428 | |measurement: &dyn AsyncMeasure<Value = u64>| {
|
421 429 | let mut attrs = Attributes::new();
|
422 430 | attrs.set(
|
423 431 | "TestAsyncMonoCounterAttr",
|
424 432 | AttributeValue::String("TestAsyncMonoCounterAttr".into()),
|
425 433 | );
|
426 434 | measurement.record(123, Some(&attrs), None);
|
427 435 | },
|
428 436 | )
|
429 437 | .build();
|
430 - | async_mono_counter.record(4, None, None);
|
431 438 |
|
432 439 | // Flush the metrics provider (this is a flush, not a shutdown) so all recorded metrics go through the pipeline
|
433 440 | sdk_ref.flush().unwrap();
|
434 441 |
|
435 442 | // Extract the metrics from the exporter
|
436 443 | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
437 444 |
|
438 - | // Assert that the reported metrics are what we expect
|
445 + | // Assert that the async callbacks ran and recorded the expected values
|
446 + | // In OpenTelemetry 0.27+, async instruments only work through callbacks
|
439 447 | let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0] // NOTE(review): indices 0..2 assume metrics are exported in creation order - confirm
|
440 448 | .data
|
441 449 | .as_any()
|
442 450 | .downcast_ref::<Gauge<f64>>()
|
443 451 | .unwrap()
|
444 452 | .data_points[0]
|
445 453 | .value;
|
446 - | assert_eq!(extracted_gauge_data, &1.234);
|
447 - |
|
448 - | let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
449 - | .data
|
450 - | .as_any()
|
451 - | .downcast_ref::<Sum<i64>>()
|
452 - | .unwrap()
|
453 - | .data_points[0]
|
454 - | .value;
|
455 - | assert_eq!(extracted_async_ud_counter_data, &-6);
|
456 - |
|
457 - | let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
458 - | .data
|
459 - | .as_any()
|
460 - | .downcast_ref::<Sum<u64>>()
|
461 - | .unwrap()
|
462 - | .data_points[0]
|
463 - | .value;
|
464 - | assert_eq!(extracted_async_mono_data, &4);
|
465 - |
|
466 - | // Assert that the async callbacks ran
|
467 - | let finished_metrics = exporter.get_finished_metrics().unwrap();
|
468 - | let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0]
|
469 - | .data
|
470 - | .as_any()
|
471 - | .downcast_ref::<Gauge<f64>>()
|
472 - | .unwrap()
|
473 - | .data_points[1]
|
474 - | .value;
|
475 454 | assert_eq!(extracted_gauge_data, &6.789);
|
476 455 |
|
477 456 | let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1]
|
478 457 | .data
|
479 458 | .as_any()
|
480 459 | .downcast_ref::<Sum<i64>>()
|
481 460 | .unwrap()
|
482 - | .data_points[1]
|
461 + | .data_points[0]
|
483 462 | .value;
|
484 463 | assert_eq!(extracted_async_ud_counter_data, &12);
|
485 464 |
|
486 465 | let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2]
|
487 466 | .data
|
488 467 | .as_any()
|
489 468 | .downcast_ref::<Sum<u64>>()
|
490 469 | .unwrap()
|
491 - | .data_points[1]
|
470 + | .data_points[0]
|
492 471 | .value;
|
493 472 | assert_eq!(extracted_async_mono_data, &123);
|
494 473 | }
|
495 474 | }
|