aws_smithy_runtime/client/
metrics.rs1use aws_smithy_async::time::{SharedTimeSource, TimeSource};
7use aws_smithy_observability::{
8 global::get_telemetry_provider, instruments::Histogram, AttributeValue, Attributes,
9 ObservabilityError,
10};
11use aws_smithy_runtime_api::client::{
12 interceptors::{dyn_dispatch_hint, Intercept, SharedInterceptor},
13 orchestrator::Metadata,
14 runtime_components::RuntimeComponentsBuilder,
15 runtime_plugin::RuntimePlugin,
16};
17use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace};
18use std::{borrow::Cow, sync::Arc, time::SystemTime};
19
20#[derive(Debug, Clone)]
22pub(crate) struct MeasurementsContainer {
23 call_start: SystemTime,
24 attempts: u32,
25 attempt_start: SystemTime,
26}
27
28impl Storable for MeasurementsContainer {
29 type Storer = StoreReplace<Self>;
30}
31
32#[derive(Debug, Clone)]
34pub(crate) struct OperationTelemetry {
35 pub(crate) operation_duration: Arc<dyn Histogram>,
36 pub(crate) attempt_duration: Arc<dyn Histogram>,
37}
38
39impl OperationTelemetry {
40 pub(crate) fn new(scope: &'static str) -> Result<Self, ObservabilityError> {
41 let meter = get_telemetry_provider()?
42 .meter_provider()
43 .get_meter(scope, None);
44
45 Ok(Self{
46 operation_duration: meter
47 .create_histogram("smithy.client.call.duration")
48 .set_units("s")
49 .set_description("Overall call duration (including retries and time to send or receive request and response body)")
50 .build(),
51 attempt_duration: meter
52 .create_histogram("smithy.client.call.attempt.duration")
53 .set_units("s")
54 .set_description("The time it takes to connect to the service, send the request, and get back HTTP status code and headers (including time queued waiting to be sent)")
55 .build(),
56 })
57 }
58}
59
60impl Storable for OperationTelemetry {
61 type Storer = StoreReplace<Self>;
62}
63
64#[derive(Debug)]
65pub(crate) struct MetricsInterceptor {
66 time_source: SharedTimeSource,
70}
71
72impl MetricsInterceptor {
73 pub(crate) fn new(time_source: SharedTimeSource) -> Result<Self, ObservabilityError> {
74 Ok(MetricsInterceptor { time_source })
75 }
76
77 pub(crate) fn get_attrs_from_cfg(
78 &self,
79 cfg: &aws_smithy_types::config_bag::ConfigBag,
80 ) -> Option<Attributes> {
81 let operation_metadata = cfg.load::<Metadata>();
82
83 if let Some(md) = operation_metadata {
84 let mut attributes = Attributes::new();
85 attributes.set("rpc.service", AttributeValue::String(md.service().into()));
86 attributes.set("rpc.method", AttributeValue::String(md.name().into()));
87
88 Some(attributes)
89 } else {
90 None
91 }
92 }
93
94 pub(crate) fn get_measurements_and_instruments<'a>(
95 &self,
96 cfg: &'a aws_smithy_types::config_bag::ConfigBag,
97 ) -> (&'a MeasurementsContainer, &'a OperationTelemetry) {
98 let measurements = cfg
99 .load::<MeasurementsContainer>()
100 .expect("set in `read_before_execution`");
101
102 let instruments = cfg
103 .load::<OperationTelemetry>()
104 .expect("set in RuntimePlugin");
105
106 (measurements, instruments)
107 }
108}
109
110#[dyn_dispatch_hint]
111impl Intercept for MetricsInterceptor {
112 fn name(&self) -> &'static str {
113 "MetricsInterceptor"
114 }
115
116 fn read_before_execution(
117 &self,
118 _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>,
119 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
120 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
121 cfg.interceptor_state().store_put(MeasurementsContainer {
122 call_start: self.time_source.now(),
123 attempts: 0,
124 attempt_start: SystemTime::UNIX_EPOCH,
125 });
126
127 Ok(())
128 }
129
130 fn read_after_execution(
131 &self,
132 _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
133 _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
134 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
135 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
136 let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
137
138 let attributes = self.get_attrs_from_cfg(cfg);
139
140 if let Some(attrs) = attributes {
141 let call_end = self.time_source.now();
142 let call_duration = call_end.duration_since(measurements.call_start);
143 if let Ok(elapsed) = call_duration {
144 instruments
145 .operation_duration
146 .record(elapsed.as_secs_f64(), Some(&attrs), None);
147 }
148 }
149
150 Ok(())
151 }
152
153 fn read_before_attempt(
154 &self,
155 _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef<'_>,
156 _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
157 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
158 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
159 let measurements = cfg
160 .get_mut::<MeasurementsContainer>()
161 .expect("set in `read_before_execution`");
162
163 measurements.attempts += 1;
164 measurements.attempt_start = self.time_source.now();
165
166 Ok(())
167 }
168
169 fn read_after_attempt(
170 &self,
171 _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
172 _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
173 cfg: &mut aws_smithy_types::config_bag::ConfigBag,
174 ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
175 let (measurements, instruments) = self.get_measurements_and_instruments(cfg);
176
177 let attempt_end = self.time_source.now();
178 let attempt_duration = attempt_end.duration_since(measurements.attempt_start);
179 let attributes = self.get_attrs_from_cfg(cfg);
180
181 if let (Ok(elapsed), Some(mut attrs)) = (attempt_duration, attributes) {
182 attrs.set("attempt", AttributeValue::I64(measurements.attempts.into()));
183
184 instruments
185 .attempt_duration
186 .record(elapsed.as_secs_f64(), Some(&attrs), None);
187 }
188 Ok(())
189 }
190}
191
192#[derive(Debug, Default)]
194pub struct MetricsRuntimePlugin {
195 scope: &'static str,
196 time_source: SharedTimeSource,
197 metadata: Option<Metadata>,
198}
199
200impl MetricsRuntimePlugin {
201 pub fn builder() -> MetricsRuntimePluginBuilder {
203 MetricsRuntimePluginBuilder::default()
204 }
205}
206
207impl RuntimePlugin for MetricsRuntimePlugin {
208 fn runtime_components(
209 &self,
210 _current_components: &RuntimeComponentsBuilder,
211 ) -> Cow<'_, RuntimeComponentsBuilder> {
212 let interceptor = MetricsInterceptor::new(self.time_source.clone());
213 if let Ok(interceptor) = interceptor {
214 Cow::Owned(
215 RuntimeComponentsBuilder::new("Metrics")
216 .with_interceptor(SharedInterceptor::permanent(interceptor)),
217 )
218 } else {
219 Cow::Owned(RuntimeComponentsBuilder::new("Metrics"))
220 }
221 }
222
223 fn config(&self) -> Option<FrozenLayer> {
224 let instruments = OperationTelemetry::new(self.scope);
225
226 if let Ok(instruments) = instruments {
227 let mut cfg = Layer::new("Metrics");
228 cfg.store_put(instruments);
229
230 if let Some(metadata) = &self.metadata {
231 cfg.store_put(metadata.clone());
232 }
233
234 Some(cfg.freeze())
235 } else {
236 None
237 }
238 }
239}
240
241#[derive(Debug, Default)]
243pub struct MetricsRuntimePluginBuilder {
244 scope: Option<&'static str>,
245 time_source: Option<SharedTimeSource>,
246 metadata: Option<Metadata>,
247}
248
249impl MetricsRuntimePluginBuilder {
250 pub fn with_scope(mut self, scope: &'static str) -> Self {
252 self.scope = Some(scope);
253 self
254 }
255
256 pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
258 self.time_source = Some(SharedTimeSource::new(time_source));
259 self
260 }
261
262 pub fn with_metadata(mut self, metadata: Metadata) -> Self {
267 self.metadata = Some(metadata);
268 self
269 }
270
271 pub fn build(
273 self,
274 ) -> Result<MetricsRuntimePlugin, aws_smithy_runtime_api::box_error::BoxError> {
275 if let Some(scope) = self.scope {
276 Ok(MetricsRuntimePlugin {
277 scope,
278 time_source: self.time_source.unwrap_or_default(),
279 metadata: self.metadata,
280 })
281 } else {
282 Err("Scope is required for MetricsRuntimePlugin.".into())
283 }
284 }
285}