aws_smithy_runtime/client/
metrics.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use aws_smithy_async::time::{SharedTimeSource, TimeSource};
7use aws_smithy_observability::{
8    global::get_telemetry_provider, instruments::Histogram, AttributeValue, Attributes,
9    ObservabilityError,
10};
11use aws_smithy_runtime_api::client::{
12    interceptors::{dyn_dispatch_hint, Intercept, SharedInterceptor},
13    orchestrator::Metadata,
14    runtime_components::RuntimeComponentsBuilder,
15    runtime_plugin::RuntimePlugin,
16};
17use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace};
18use std::{borrow::Cow, sync::Arc, time::SystemTime};
19
/// Timing state for one in-flight call, kept in the `ConfigBag`.
#[derive(Debug, Clone)]
pub(crate) struct MeasurementsContainer {
    /// Timestamp taken when the overall call began.
    call_start: SystemTime,
    /// How many attempts have been started for this call so far.
    attempts: u32,
    /// Timestamp taken when the most recent attempt began.
    attempt_start: SystemTime,
}
27
28impl Storable for MeasurementsContainer {
29    type Storer = StoreReplace<Self>;
30}
31
32/// Instruments for recording a single operation
33#[derive(Debug, Clone)]
34pub(crate) struct OperationTelemetry {
35    pub(crate) operation_duration: Arc<dyn Histogram>,
36    pub(crate) attempt_duration: Arc<dyn Histogram>,
37}
38
39impl OperationTelemetry {
40    pub(crate) fn new(scope: &'static str) -> Result<Self, ObservabilityError> {
41        let meter = get_telemetry_provider()?
42            .meter_provider()
43            .get_meter(scope, None);
44
45        Ok(Self{
46            operation_duration: meter
47                .create_histogram("smithy.client.call.duration")
48                .set_units("s")
49                .set_description("Overall call duration (including retries and time to send or receive request and response body)")
50                .build(),
51            attempt_duration: meter
52                .create_histogram("smithy.client.call.attempt.duration")
53                .set_units("s")
54                .set_description("The time it takes to connect to the service, send the request, and get back HTTP status code and headers (including time queued waiting to be sent)")
55                .build(),
56        })
57    }
58}
59
60impl Storable for OperationTelemetry {
61    type Storer = StoreReplace<Self>;
62}
63
64#[derive(Debug)]
65pub(crate) struct MetricsInterceptor {
66    // Holding a TimeSource here isn't ideal, but RuntimeComponents aren't available in
67    // the read_before_execution hook and that is when we need to start the timer for
68    // the operation.
69    time_source: SharedTimeSource,
70}
71
72impl MetricsInterceptor {
73    pub(crate) fn new(time_source: SharedTimeSource) -> Result<Self, ObservabilityError> {
74        Ok(MetricsInterceptor { time_source })
75    }
76
77    pub(crate) fn get_attrs_from_cfg(
78        &self,
79        cfg: &aws_smithy_types::config_bag::ConfigBag,
80    ) -> Option<Attributes> {
81        let operation_metadata = cfg.load::<Metadata>();
82
83        if let Some(md) = operation_metadata {
84            let mut attributes = Attributes::new();
85            attributes.set("rpc.service", AttributeValue::String(md.service().into()));
86            attributes.set("rpc.method", AttributeValue::String(md.name().into()));
87
88            Some(attributes)
89        } else {
90            None
91        }
92    }
93
94    pub(crate) fn get_measurements_and_instruments<'a>(
95        &self,
96        cfg: &'a aws_smithy_types::config_bag::ConfigBag,
97    ) -> (&'a MeasurementsContainer, &'a OperationTelemetry) {
98        let measurements = cfg
99            .load::<MeasurementsContainer>()
100            .expect("set in `read_before_execution`");
101
102        let instruments = cfg
103            .load::<OperationTelemetry>()
104            .expect("set in RuntimePlugin");
105
106        (measurements, instruments)
107    }
108}
109
110#[dyn_dispatch_hint]
111impl Intercept for MetricsInterceptor {
112    fn name(&self) -> &'static str {
113        "MetricsInterceptor"
114    }
115
116    fn read_before_execution(
117        &self,
118        _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>,
119        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
120    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
121        cfg.interceptor_state().store_put(MeasurementsContainer {
122            call_start: self.time_source.now(),
123            attempts: 0,
124            attempt_start: SystemTime::UNIX_EPOCH,
125        });
126
127        Ok(())
128    }
129
130    fn read_after_execution(
131        &self,
132        _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
133        _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
134        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
135    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
136        let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
137
138        let attributes = self.get_attrs_from_cfg(cfg);
139
140        if let Some(attrs) = attributes {
141            let call_end = self.time_source.now();
142            let call_duration = call_end.duration_since(measurements.call_start);
143            if let Ok(elapsed) = call_duration {
144                instruments
145                    .operation_duration
146                    .record(elapsed.as_secs_f64(), Some(&attrs), None);
147            }
148        }
149
150        Ok(())
151    }
152
153    fn read_before_attempt(
154        &self,
155        _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef<'_>,
156        _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
157        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
158    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
159        let measurements = cfg
160            .get_mut::<MeasurementsContainer>()
161            .expect("set in `read_before_execution`");
162
163        measurements.attempts += 1;
164        measurements.attempt_start = self.time_source.now();
165
166        Ok(())
167    }
168
169    fn read_after_attempt(
170        &self,
171        _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
172        _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
173        cfg: &mut aws_smithy_types::config_bag::ConfigBag,
174    ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
175        let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
176
177        let attempt_end = self.time_source.now();
178        let attempt_duration = attempt_end.duration_since(measurements.attempt_start);
179        let attributes = self.get_attrs_from_cfg(cfg);
180
181        if let (Ok(elapsed), Some(mut attrs)) = (attempt_duration, attributes) {
182            attrs.set("attempt", AttributeValue::I64(measurements.attempts.into()));
183
184            instruments
185                .attempt_duration
186                .record(elapsed.as_secs_f64(), Some(&attrs), None);
187        }
188        Ok(())
189    }
190}
191
192/// Runtime plugin that adds an interceptor for collecting metrics
193#[derive(Debug, Default)]
194pub struct MetricsRuntimePlugin {
195    scope: &'static str,
196    time_source: SharedTimeSource,
197    metadata: Option<Metadata>,
198}
199
200impl MetricsRuntimePlugin {
201    /// Create a [MetricsRuntimePluginBuilder]
202    pub fn builder() -> MetricsRuntimePluginBuilder {
203        MetricsRuntimePluginBuilder::default()
204    }
205}
206
207impl RuntimePlugin for MetricsRuntimePlugin {
208    fn runtime_components(
209        &self,
210        _current_components: &RuntimeComponentsBuilder,
211    ) -> Cow<'_, RuntimeComponentsBuilder> {
212        let interceptor = MetricsInterceptor::new(self.time_source.clone());
213        if let Ok(interceptor) = interceptor {
214            Cow::Owned(
215                RuntimeComponentsBuilder::new("Metrics")
216                    .with_interceptor(SharedInterceptor::permanent(interceptor)),
217            )
218        } else {
219            Cow::Owned(RuntimeComponentsBuilder::new("Metrics"))
220        }
221    }
222
223    fn config(&self) -> Option<FrozenLayer> {
224        let instruments = OperationTelemetry::new(self.scope);
225
226        if let Ok(instruments) = instruments {
227            let mut cfg = Layer::new("Metrics");
228            cfg.store_put(instruments);
229
230            if let Some(metadata) = &self.metadata {
231                cfg.store_put(metadata.clone());
232            }
233
234            Some(cfg.freeze())
235        } else {
236            None
237        }
238    }
239}
240
241/// Builder for [MetricsRuntimePlugin]
242#[derive(Debug, Default)]
243pub struct MetricsRuntimePluginBuilder {
244    scope: Option<&'static str>,
245    time_source: Option<SharedTimeSource>,
246    metadata: Option<Metadata>,
247}
248
249impl MetricsRuntimePluginBuilder {
250    /// Set the scope for the metrics
251    pub fn with_scope(mut self, scope: &'static str) -> Self {
252        self.scope = Some(scope);
253        self
254    }
255
256    /// Set the [TimeSource] for the metrics
257    pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
258        self.time_source = Some(SharedTimeSource::new(time_source));
259        self
260    }
261
262    /// Set the [Metadata] for the metrics.
263    ///
264    /// Note: the Metadata is optional, most operations set it themselves, but this is useful
265    /// for operations that do not, like some of the credential providers.
266    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
267        self.metadata = Some(metadata);
268        self
269    }
270
271    /// Build a [MetricsRuntimePlugin]
272    pub fn build(
273        self,
274    ) -> Result<MetricsRuntimePlugin, aws_smithy_runtime_api::box_error::BoxError> {
275        if let Some(scope) = self.scope {
276            Ok(MetricsRuntimePlugin {
277                scope,
278                time_source: self.time_source.unwrap_or_default(),
279                metadata: self.metadata,
280            })
281        } else {
282            Err("Scope is required for MetricsRuntimePlugin.".into())
283        }
284    }
285}