aws_smithy_runtime/client/
orchestrator.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::client::interceptors::Interceptors;
7use crate::client::orchestrator::http::{log_response_body, read_body};
8use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
9use crate::client::{
10    http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
11    orchestrator::endpoints::orchestrate_endpoint,
12};
13use auth::{resolve_identity, sign_request};
14use aws_smithy_async::rt::sleep::AsyncSleep;
15use aws_smithy_runtime_api::box_error::BoxError;
16use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
17use aws_smithy_runtime_api::client::interceptors::context::{
18    Error, Input, InterceptorContext, Output, RewindResult,
19};
20use aws_smithy_runtime_api::client::orchestrator::{
21    HttpResponse, LoadedRequestBody, OrchestratorError,
22};
23use aws_smithy_runtime_api::client::result::SdkError;
24use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
25use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
26use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
27use aws_smithy_runtime_api::client::ser_de::{
28    DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
29};
30use aws_smithy_types::body::SdkBody;
31use aws_smithy_types::byte_stream::ByteStream;
32use aws_smithy_types::config_bag::ConfigBag;
33use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
34use endpoints::apply_endpoint;
35use std::mem;
36use tracing::{debug, debug_span, instrument, trace, Instrument};
37
38mod auth;
39pub use auth::AuthSchemeAndEndpointOrchestrationV2;
40
41/// Defines types that implement a trait for endpoint resolution
42pub mod endpoints;
43
44/// Defines types that work with HTTP types
45mod http;
46
47/// Utility for making one-off unmodeled requests with the orchestrator.
48pub mod operation;
49
/// Records the given error as the failure on the context and returns from the enclosing function.
///
/// Invoked as `halt!([ctx] => error)`. The error is converted with `.into()` before it is handed
/// to the context's `fail` method, and a debug event is logged first.
macro_rules! halt {
    ([$context:ident] => $error:expr) => {{
        debug!("encountered orchestrator error; halting");
        $context.fail($error.into());
        return;
    }};
}
57
/// Unwraps a `Result`, halting the enclosing function on `Err`.
///
/// Invoked as `halt_on_err!([ctx] => result)`. On `Ok` the macro evaluates to the inner value;
/// on `Err` the error is passed to `halt!`, which fails the context and returns.
macro_rules! halt_on_err {
    ([$context:ident] => $result:expr) => {
        match $result {
            Err(error) => halt!([$context] => error),
            Ok(value) => value,
        }
    };
}
66
/// Records an error on the context without leaving the enclosing function.
///
/// Invoked as `continue_on_err!([ctx] => result)`. When the result is `Err`, the error is logged
/// at debug level and stored on the context via `fail`; execution then carries on with the next
/// statement. An `Ok` result is ignored.
macro_rules! continue_on_err {
    ([$context:ident] => $result:expr) => {
        if let Err(error) = $result {
            debug!(err = ?error, "encountered orchestrator error; continuing");
            $context.fail(error.into());
        }
    };
}
75
/// Runs one or more interceptor hooks with a chosen error policy.
///
/// Two policies are accepted, `continue_on_err:` and `halt_on_err:`, each followed either by a
/// single `hook(ctx, rc, cfg)` call or by a braced list of `hook(ctx, rc, cfg);` calls. Every
/// hook is dispatched through `Interceptors::new(rc.interceptors())` and its result is handed to
/// the macro of the same name as the policy.
macro_rules! run_interceptors {
    // A braced list under `continue_on_err` expands to one single-hook invocation per entry.
    (continue_on_err: { $($hook:ident($context:ident, $components:ident, $bag:ident);)+ }) => {
        $(run_interceptors!(continue_on_err: $hook($context, $components, $bag));)+
    };
    (continue_on_err: $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        continue_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $bag)))
    };
    // A braced list under `halt_on_err` expands the same way.
    (halt_on_err: { $($hook:ident($context:ident, $components:ident, $bag:ident);)+ }) => {
        $(run_interceptors!(halt_on_err: $hook($context, $components, $bag));)+
    };
    (halt_on_err: $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        halt_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $bag)))
    };
    // Internal rule: the actual hook call shared by both policies.
    (__private $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        Interceptors::new($components.interceptors()).$hook($context, $components, $bag)
    };
}
93
94/// Orchestrates the execution of a request and handling of a response.
95///
96/// The given `runtime_plugins` will be used to generate a `ConfigBag` for this request,
97/// and then the given `input` will be serialized and transmitted. When a response is
98/// received, it will be deserialized and returned.
99///
100/// This orchestration handles retries, endpoint resolution, identity resolution, and signing.
101/// Each of these are configurable via the config and runtime components given by the runtime
102/// plugins.
103pub async fn invoke(
104    service_name: &str,
105    operation_name: &str,
106    input: Input,
107    runtime_plugins: &RuntimePlugins,
108) -> Result<Output, SdkError<Error, HttpResponse>> {
109    invoke_with_stop_point(
110        service_name,
111        operation_name,
112        input,
113        runtime_plugins,
114        StopPoint::None,
115    )
116    .await?
117    .finalize()
118}
119
/// Selects a point at which orchestration returns early instead of running to completion.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StopPoint {
    /// Run the whole orchestration; never stop early.
    None,

    /// Stop right before the request would be transmitted.
    BeforeTransmit,
}
130
131/// Same as [`invoke`], but allows for returning early at different points during orchestration.
132///
133/// Orchestration will cease at the point specified by `stop_point`. This is useful for orchestrations
134/// that don't need to actually transmit requests, such as for generating presigned requests.
135///
136/// See the docs on [`invoke`] for more details.
137pub async fn invoke_with_stop_point(
138    // NOTE: service_name and operation_name were at one point used for instrumentation that is now
139    // handled as part of codegen. Manually constructed operations (e.g. via Operation::builder())
140    // are handled as part of Operation::invoke
141    _service_name: &str,
142    _operation_name: &str,
143    input: Input,
144    runtime_plugins: &RuntimePlugins,
145    stop_point: StopPoint,
146) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
147    async move {
148        let mut cfg = ConfigBag::base();
149        let cfg = &mut cfg;
150
151        let mut ctx = InterceptorContext::new(input);
152
153        let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
154            .map_err(SdkError::construction_failure)?;
155        trace!(runtime_components = ?runtime_components);
156
157        let operation_timeout_config =
158            MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
159        trace!(operation_timeout_config = ?operation_timeout_config);
160        async {
161            // If running the pre-execution interceptors failed, then we skip running the op and run the
162            // final interceptors instead.
163            if !ctx.is_failed() {
164                try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
165            }
166            finally_op(&mut ctx, cfg, &runtime_components).await;
167            if ctx.is_failed() {
168                Err(ctx.finalize().expect_err("it is failed"))
169            } else {
170                Ok(ctx)
171            }
172        }
173        .maybe_timeout(operation_timeout_config)
174        .await
175    }
176    .await
177}
178
179/// Apply configuration is responsible for apply runtime plugins to the config bag, as well as running
180/// `read_before_execution` interceptors. If a failure occurs due to config construction, `invoke`
181/// will raise it to the user. If an interceptor fails, then `invoke`
182#[instrument(skip_all, level = "debug")]
183fn apply_configuration(
184    ctx: &mut InterceptorContext,
185    cfg: &mut ConfigBag,
186    runtime_plugins: &RuntimePlugins,
187) -> Result<RuntimeComponents, BoxError> {
188    let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
189    continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
190
191    let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
192    continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
193
194    // The order below is important. Client interceptors must run before operation interceptors.
195    let components = client_rc_builder
196        .merge_from(&operation_rc_builder)
197        .build()?;
198
199    // In an ideal world, we'd simply update `cfg.load` to behave this way. Unfortunately, we can't
200    // do that without a breaking change. By overwriting the value in the config bag with a merged
201    // version, we can achieve a very similar behavior. `MergeTimeoutConfig`
202    let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
203    debug!(
204        "timeout settings for this operation: {:?}",
205        resolved_timeout_config
206    );
207    cfg.interceptor_state().store_put(resolved_timeout_config);
208
209    components.validate_final_config(cfg)?;
210    Ok(components)
211}
212
213#[instrument(skip_all, level = "debug")]
214async fn try_op(
215    ctx: &mut InterceptorContext,
216    cfg: &mut ConfigBag,
217    runtime_components: &RuntimeComponents,
218    stop_point: StopPoint,
219) {
220    // Before serialization
221    run_interceptors!(halt_on_err: {
222        modify_before_serialization(ctx, runtime_components, cfg);
223        read_before_serialization(ctx, runtime_components, cfg);
224    });
225
226    // Serialization
227    ctx.enter_serialization_phase();
228    {
229        let _span = debug_span!("serialization").entered();
230        let request_serializer = cfg
231            .load::<SharedRequestSerializer>()
232            .expect("request serializer must be in the config bag")
233            .clone();
234        let input = ctx.take_input().expect("input set at this point");
235        let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
236        ctx.set_request(request);
237    }
238
239    // Load the request body into memory if configured to do so
240    if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
241        debug!("loading request body into memory");
242        let mut body = SdkBody::taken();
243        mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
244        let loaded_body = halt_on_err!([ctx] =>
245            ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
246        )
247        .into_bytes();
248        *ctx.request_mut().as_mut().expect("set above").body_mut() =
249            SdkBody::from(loaded_body.clone());
250        cfg.interceptor_state()
251            .store_put(LoadedRequestBody::Loaded(loaded_body));
252    }
253
254    // Before transmit
255    ctx.enter_before_transmit_phase();
256    run_interceptors!(halt_on_err: {
257        read_after_serialization(ctx, runtime_components, cfg);
258        modify_before_retry_loop(ctx, runtime_components, cfg);
259    });
260
261    // If we got a retry strategy from the bag, ask it what to do.
262    // Otherwise, assume we should attempt the initial request.
263    let should_attempt = runtime_components
264        .retry_strategy()
265        .should_attempt_initial_request(runtime_components, cfg);
266    match should_attempt {
267        // Yes, let's make a request
268        Ok(ShouldAttempt::Yes) => debug!("retry strategy has OKed initial request"),
269        // No, this request shouldn't be sent
270        Ok(ShouldAttempt::No) => {
271            let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
272            halt!([ctx] => OrchestratorError::other(err));
273        }
274        // No, we shouldn't make a request because...
275        Err(err) => halt!([ctx] => OrchestratorError::other(err)),
276        Ok(ShouldAttempt::YesAfterDelay(delay)) => {
277            let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
278                "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
279            )));
280            debug!("retry strategy has OKed initial request after a {delay:?} delay");
281            sleep_impl.sleep(delay).await;
282        }
283    }
284
285    // Save a request checkpoint before we make the request. This will allow us to "rewind"
286    // the request in the case of retry attempts.
287    ctx.save_checkpoint();
288    let mut retry_delay = None;
289    for i in 1u32.. {
290        // Break from the loop if we can't rewind the request's state. This will always succeed the
291        // first time, but will fail on subsequent iterations if the request body wasn't retryable.
292        trace!("checking if context can be rewound for attempt #{i}");
293        if let RewindResult::Impossible = ctx.rewind(cfg) {
294            debug!("request cannot be retried since the request body cannot be cloned");
295            break;
296        }
297        // Track which attempt we're currently on.
298        cfg.interceptor_state()
299            .store_put::<RequestAttempts>(i.into());
300        // Backoff time should not be included in the attempt timeout
301        if let Some((delay, sleep)) = retry_delay.take() {
302            debug!("delaying for {delay:?}");
303            sleep.await;
304        }
305        let attempt_timeout_config =
306            MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
307        trace!(attempt_timeout_config = ?attempt_timeout_config);
308        let maybe_timeout = async {
309            debug!("beginning attempt #{i}");
310            try_attempt(ctx, cfg, runtime_components, stop_point)
311                .instrument(debug_span!("try_attempt", "attempt" = i))
312                .await;
313            finally_attempt(ctx, cfg, runtime_components)
314                .instrument(debug_span!("finally_attempt", "attempt" = i))
315                .await;
316            Result::<_, SdkError<Error, HttpResponse>>::Ok(())
317        }
318        .maybe_timeout(attempt_timeout_config)
319        .await
320        .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));
321
322        // We continue when encountering a timeout error. The retry classifier will decide what to do with it.
323        continue_on_err!([ctx] => maybe_timeout);
324
325        // If we got a retry strategy from the bag, ask it what to do.
326        // If no strategy was set, we won't retry.
327        let should_attempt = halt_on_err!([ctx] => runtime_components
328            .retry_strategy()
329            .should_attempt_retry(ctx, runtime_components, cfg)
330            .map_err(OrchestratorError::other));
331        match should_attempt {
332            // Yes, let's retry the request
333            ShouldAttempt::Yes => continue,
334            // No, this request shouldn't be retried
335            ShouldAttempt::No => {
336                debug!("a retry is either unnecessary or not possible, exiting attempt loop");
337                break;
338            }
339            ShouldAttempt::YesAfterDelay(delay) => {
340                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
341                    "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
342                )));
343                retry_delay = Some((delay, sleep_impl.sleep(delay)));
344                continue;
345            }
346        }
347    }
348}
349
350async fn try_attempt(
351    ctx: &mut InterceptorContext,
352    cfg: &mut ConfigBag,
353    runtime_components: &RuntimeComponents,
354    stop_point: StopPoint,
355) {
356    run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));
357
358    let (scheme_id, identity, endpoint) = halt_on_err!([ctx] => resolve_identity(runtime_components, cfg).await.map_err(OrchestratorError::other));
359
360    match endpoint {
361        Some(endpoint) => {
362            // This branch is for backward compatibility when `AuthSchemeAndEndpointOrchestrationV2` is not present in the config bag.
363            // `resolve_identity` internally resolved an endpoint to determine the most suitable scheme ID, and returned that endpoint.
364            halt_on_err!([ctx] => apply_endpoint(&endpoint, ctx, cfg).map_err(OrchestratorError::other));
365            // Make the endpoint config available to interceptors
366            cfg.interceptor_state().store_put(endpoint);
367        }
368        None => {
369            halt_on_err!([ctx] => orchestrate_endpoint(identity.clone(), ctx, runtime_components, cfg)
370				    .instrument(debug_span!("orchestrate_endpoint"))
371				    .await
372				    .map_err(OrchestratorError::other));
373        }
374    }
375
376    run_interceptors!(halt_on_err: {
377        modify_before_signing(ctx, runtime_components, cfg);
378        read_before_signing(ctx, runtime_components, cfg);
379    });
380
381    halt_on_err!([ctx] => sign_request(&scheme_id, &identity, ctx, runtime_components, cfg).map_err(OrchestratorError::other));
382
383    run_interceptors!(halt_on_err: {
384        read_after_signing(ctx, runtime_components, cfg);
385        modify_before_transmit(ctx, runtime_components, cfg);
386        read_before_transmit(ctx, runtime_components, cfg);
387    });
388
389    // Return early if a stop point is set for before transmit
390    if let StopPoint::BeforeTransmit = stop_point {
391        debug!("ending orchestration early because the stop point is `BeforeTransmit`");
392        return;
393    }
394
395    // The connection consumes the request but we need to keep a copy of it
396    // within the interceptor context, so we clone it here.
397    ctx.enter_transmit_phase();
398    let response = halt_on_err!([ctx] => {
399        let request = ctx.take_request().expect("set during serialization");
400        trace!(request = ?request, "transmitting request");
401        let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
402            OrchestratorError::other("No HTTP client was available to send this request. \
403                Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
404        ));
405        let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
406        let settings = {
407            let mut builder = HttpConnectorSettings::builder();
408            builder.set_connect_timeout(timeout_config.connect_timeout());
409            builder.set_read_timeout(timeout_config.read_timeout());
410            builder.build()
411        };
412        let connector = http_client.http_connector(&settings, runtime_components);
413        let response_future = MaybeUploadThroughputCheckFuture::new(
414            cfg,
415            runtime_components,
416            connector.call(request),
417        );
418        response_future.await.map_err(OrchestratorError::connector)
419    });
420    trace!(response = ?response, "received response from service");
421    ctx.set_response(response);
422    ctx.enter_before_deserialization_phase();
423
424    run_interceptors!(halt_on_err: {
425        read_after_transmit(ctx, runtime_components, cfg);
426        modify_before_deserialization(ctx, runtime_components, cfg);
427        read_before_deserialization(ctx, runtime_components, cfg);
428    });
429
430    ctx.enter_deserialization_phase();
431    let output_or_error = async {
432        let response = ctx.response_mut().expect("set during transmit");
433        let response_deserializer = cfg
434            .load::<SharedResponseDeserializer>()
435            .expect("a request deserializer must be in the config bag");
436        let maybe_deserialized = {
437            let _span = debug_span!("deserialize_streaming").entered();
438            response_deserializer.deserialize_streaming(response)
439        };
440        match maybe_deserialized {
441            Some(output_or_error) => output_or_error,
442            None => read_body(response)
443                .instrument(debug_span!("read_body"))
444                .await
445                .map_err(OrchestratorError::response)
446                .and_then(|_| {
447                    let _span = debug_span!("deserialize_nonstreaming").entered();
448                    log_response_body(response, cfg);
449                    response_deserializer.deserialize_nonstreaming(response)
450                }),
451        }
452    }
453    .instrument(debug_span!("deserialization"))
454    .await;
455    trace!(output_or_error = ?output_or_error);
456    ctx.set_output_or_error(output_or_error);
457
458    ctx.enter_after_deserialization_phase();
459    run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
460}
461
462async fn finally_attempt(
463    ctx: &mut InterceptorContext,
464    cfg: &mut ConfigBag,
465    runtime_components: &RuntimeComponents,
466) {
467    run_interceptors!(continue_on_err: {
468        modify_before_attempt_completion(ctx, runtime_components, cfg);
469        read_after_attempt(ctx, runtime_components, cfg);
470    });
471}
472
473#[instrument(skip_all, level = "debug")]
474async fn finally_op(
475    ctx: &mut InterceptorContext,
476    cfg: &mut ConfigBag,
477    runtime_components: &RuntimeComponents,
478) {
479    run_interceptors!(continue_on_err: {
480        modify_before_completion(ctx, runtime_components, cfg);
481        read_after_execution(ctx, runtime_components, cfg);
482    });
483}
484
485#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
486mod tests {
487    use crate::client::auth::no_auth::{NoAuthRuntimePluginV2, NO_AUTH_SCHEME_ID};
488    use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
489    use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
490    use crate::client::retries::strategy::NeverRetryStrategy;
491    use crate::client::test_util::{
492        deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
493    };
494    use aws_smithy_http_client::test_util::NeverClient;
495    use aws_smithy_runtime_api::box_error::BoxError;
496    use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
497    use aws_smithy_runtime_api::client::auth::{
498        AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
499    };
500    use aws_smithy_runtime_api::client::endpoint::{
501        EndpointResolverParams, SharedEndpointResolver,
502    };
503    use aws_smithy_runtime_api::client::http::{
504        http_client_fn, HttpConnector, HttpConnectorFuture,
505    };
506    use aws_smithy_runtime_api::client::interceptors::context::{
507        AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
508        BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
509        BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
510        BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
511        FinalizerInterceptorContextRef, Input, Output,
512    };
513    use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
514    use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
515    use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
516    use aws_smithy_runtime_api::client::runtime_components::{
517        RuntimeComponents, RuntimeComponentsBuilder,
518    };
519    use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
520    use aws_smithy_runtime_api::client::ser_de::{
521        SharedRequestSerializer, SharedResponseDeserializer,
522    };
523    use aws_smithy_runtime_api::shared::IntoShared;
524    use aws_smithy_types::body::SdkBody;
525    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
526    use aws_smithy_types::timeout::TimeoutConfig;
527    use http_1x::{Response, StatusCode};
528    use std::borrow::Cow;
529    use std::sync::atomic::{AtomicBool, Ordering};
530    use std::sync::Arc;
531    use tracing_test::traced_test;
532
533    fn new_request_serializer() -> CannedRequestSerializer {
534        CannedRequestSerializer::success(HttpRequest::empty())
535    }
536
537    fn new_response_deserializer() -> CannedResponseDeserializer {
538        CannedResponseDeserializer::new(
539            Response::builder()
540                .status(StatusCode::OK)
541                .body(SdkBody::empty())
542                .map_err(|err| OrchestratorError::other(Box::new(err)))
543                .map(Output::erase),
544        )
545    }
546
547    #[derive(Debug, Default)]
548    struct OkConnector {}
549
550    impl OkConnector {
551        fn new() -> Self {
552            Self::default()
553        }
554    }
555
556    impl HttpConnector for OkConnector {
557        fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
558            HttpConnectorFuture::ready(Ok(http_1x::Response::builder()
559                .status(200)
560                .body(SdkBody::empty())
561                .expect("OK response is valid")
562                .try_into()
563                .unwrap()))
564        }
565    }
566
567    #[derive(Debug)]
568    struct TestOperationRuntimePlugin {
569        builder: RuntimeComponentsBuilder,
570    }
571
572    impl TestOperationRuntimePlugin {
573        fn new() -> Self {
574            Self {
575                builder: RuntimeComponentsBuilder::for_tests()
576                    .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
577                    .with_endpoint_resolver(Some(SharedEndpointResolver::new(
578                        StaticUriEndpointResolver::http_localhost(8080),
579                    )))
580                    .with_http_client(Some(http_client_fn(|_, _| {
581                        OkConnector::new().into_shared()
582                    })))
583                    .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
584                        StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
585                    ))),
586            }
587        }
588    }
589
590    impl RuntimePlugin for TestOperationRuntimePlugin {
591        fn config(&self) -> Option<FrozenLayer> {
592            let mut layer = Layer::new("TestOperationRuntimePlugin");
593            layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
594            layer.store_put(EndpointResolverParams::new("dontcare"));
595            layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
596            layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
597            layer.store_put(TimeoutConfig::builder().build());
598            Some(layer.freeze())
599        }
600
601        fn runtime_components(
602            &self,
603            _: &RuntimeComponentsBuilder,
604        ) -> Cow<'_, RuntimeComponentsBuilder> {
605            Cow::Borrowed(&self.builder)
606        }
607    }
608
609    macro_rules! interceptor_error_handling_test {
610        (read_before_execution, $ctx:ty, $expected:expr,) => {
611            interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
612        };
613        ($interceptor:ident, $ctx:ty, $expected:expr) => {
614            interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
615        };
616        (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
617            #[derive(Debug)]
618            struct FailingInterceptorA;
619            impl Intercept for FailingInterceptorA {
620                fn name(&self) -> &'static str { "FailingInterceptorA" }
621
622                fn $interceptor(
623                    &self,
624                    _ctx: $ctx,
625                    $($rc_arg)*
626                    _cfg: &mut ConfigBag,
627                ) -> Result<(), BoxError> {
628                    tracing::debug!("FailingInterceptorA called!");
629                    Err("FailingInterceptorA".into())
630                }
631            }
632
633            #[derive(Debug)]
634            struct FailingInterceptorB;
635            impl Intercept for FailingInterceptorB {
636                fn name(&self) -> &'static str { "FailingInterceptorB" }
637
638                fn $interceptor(
639                    &self,
640                    _ctx: $ctx,
641                    $($rc_arg)*
642                    _cfg: &mut ConfigBag,
643                ) -> Result<(), BoxError> {
644                    tracing::debug!("FailingInterceptorB called!");
645                    Err("FailingInterceptorB".into())
646                }
647            }
648
649            #[derive(Debug)]
650            struct FailingInterceptorC;
651            impl Intercept for FailingInterceptorC {
652                fn name(&self) -> &'static str { "FailingInterceptorC" }
653
654                fn $interceptor(
655                    &self,
656                    _ctx: $ctx,
657                    $($rc_arg)*
658                    _cfg: &mut ConfigBag,
659                ) -> Result<(), BoxError> {
660                    tracing::debug!("FailingInterceptorC called!");
661                    Err("FailingInterceptorC".into())
662                }
663            }
664
665            #[derive(Debug)]
666            struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
667            impl FailingInterceptorsClientRuntimePlugin {
668                fn new() -> Self {
669                    Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
670                }
671            }
672            impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
673                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
674                    Cow::Borrowed(&self.0)
675                }
676            }
677
678            #[derive(Debug)]
679            struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
680            impl FailingInterceptorsOperationRuntimePlugin {
681                fn new() -> Self {
682                    Self(
683                        RuntimeComponentsBuilder::new("test")
684                            .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
685                            .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
686                    )
687                }
688            }
689            impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
690                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
691                    Cow::Borrowed(&self.0)
692                }
693            }
694
695            let input = Input::doesnt_matter();
696            let runtime_plugins = RuntimePlugins::new()
697                .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
698                .with_operation_plugin(TestOperationRuntimePlugin::new())
699                .with_operation_plugin(NoAuthRuntimePluginV2::new())
700                .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
701            let actual = invoke("test", "test", input, &runtime_plugins)
702                .await
703                .expect_err("should error");
704            let actual = format!("{:?}", actual);
705            assert!(
706                actual.starts_with(&$expected),
707                "\nActual error:      {actual}\nShould start with: {}\n",
708                $expected
709            );
710
711            assert!(logs_contain("FailingInterceptorA called!"));
712            assert!(logs_contain("FailingInterceptorB called!"));
713            assert!(logs_contain("FailingInterceptorC called!"));
714        };
715    }
716
717    #[tokio::test]
718    #[traced_test]
719    async fn test_read_before_execution_error_handling() {
720        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
721        interceptor_error_handling_test!(
722            read_before_execution,
723            &BeforeSerializationInterceptorContextRef<'_>,
724            expected,
725        );
726    }
727
728    #[tokio::test]
729    #[traced_test]
730    async fn test_modify_before_serialization_error_handling() {
731        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
732        interceptor_error_handling_test!(
733            modify_before_serialization,
734            &mut BeforeSerializationInterceptorContextMut<'_>,
735            expected
736        );
737    }
738
739    #[tokio::test]
740    #[traced_test]
741    async fn test_read_before_serialization_error_handling() {
742        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
743        interceptor_error_handling_test!(
744            read_before_serialization,
745            &BeforeSerializationInterceptorContextRef<'_>,
746            expected
747        );
748    }
749
750    #[tokio::test]
751    #[traced_test]
752    async fn test_read_after_serialization_error_handling() {
753        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
754        interceptor_error_handling_test!(
755            read_after_serialization,
756            &BeforeTransmitInterceptorContextRef<'_>,
757            expected
758        );
759    }
760
761    #[tokio::test]
762    #[traced_test]
763    async fn test_modify_before_retry_loop_error_handling() {
764        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
765        interceptor_error_handling_test!(
766            modify_before_retry_loop,
767            &mut BeforeTransmitInterceptorContextMut<'_>,
768            expected
769        );
770    }
771
772    #[tokio::test]
773    #[traced_test]
774    async fn test_read_before_attempt_error_handling() {
775        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
776        interceptor_error_handling_test!(
777            read_before_attempt,
778            &BeforeTransmitInterceptorContextRef<'_>,
779            expected
780        );
781    }
782
783    #[tokio::test]
784    #[traced_test]
785    async fn test_modify_before_signing_error_handling() {
786        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
787        interceptor_error_handling_test!(
788            modify_before_signing,
789            &mut BeforeTransmitInterceptorContextMut<'_>,
790            expected
791        );
792    }
793
794    #[tokio::test]
795    #[traced_test]
796    async fn test_read_before_signing_error_handling() {
797        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
798        interceptor_error_handling_test!(
799            read_before_signing,
800            &BeforeTransmitInterceptorContextRef<'_>,
801            expected
802        );
803    }
804
805    #[tokio::test]
806    #[traced_test]
807    async fn test_read_after_signing_error_handling() {
808        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
809        interceptor_error_handling_test!(
810            read_after_signing,
811            &BeforeTransmitInterceptorContextRef<'_>,
812            expected
813        );
814    }
815
816    #[tokio::test]
817    #[traced_test]
818    async fn test_modify_before_transmit_error_handling() {
819        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
820        interceptor_error_handling_test!(
821            modify_before_transmit,
822            &mut BeforeTransmitInterceptorContextMut<'_>,
823            expected
824        );
825    }
826
827    #[tokio::test]
828    #[traced_test]
829    async fn test_read_before_transmit_error_handling() {
830        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
831        interceptor_error_handling_test!(
832            read_before_transmit,
833            &BeforeTransmitInterceptorContextRef<'_>,
834            expected
835        );
836    }
837
838    #[tokio::test]
839    #[traced_test]
840    async fn test_read_after_transmit_error_handling() {
841        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
842        interceptor_error_handling_test!(
843            read_after_transmit,
844            &BeforeDeserializationInterceptorContextRef<'_>,
845            expected
846        );
847    }
848
849    #[tokio::test]
850    #[traced_test]
851    async fn test_modify_before_deserialization_error_handling() {
852        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
853        interceptor_error_handling_test!(
854            modify_before_deserialization,
855            &mut BeforeDeserializationInterceptorContextMut<'_>,
856            expected
857        );
858    }
859
860    #[tokio::test]
861    #[traced_test]
862    async fn test_read_before_deserialization_error_handling() {
863        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
864        interceptor_error_handling_test!(
865            read_before_deserialization,
866            &BeforeDeserializationInterceptorContextRef<'_>,
867            expected
868        );
869    }
870
871    #[tokio::test]
872    #[traced_test]
873    async fn test_read_after_deserialization_error_handling() {
874        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
875        interceptor_error_handling_test!(
876            read_after_deserialization,
877            &AfterDeserializationInterceptorContextRef<'_>,
878            expected
879        );
880    }
881
882    #[tokio::test]
883    #[traced_test]
884    async fn test_modify_before_attempt_completion_error_handling() {
885        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
886        interceptor_error_handling_test!(
887            modify_before_attempt_completion,
888            &mut FinalizerInterceptorContextMut<'_>,
889            expected
890        );
891    }
892
893    #[tokio::test]
894    #[traced_test]
895    async fn test_read_after_attempt_error_handling() {
896        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
897        interceptor_error_handling_test!(
898            read_after_attempt,
899            &FinalizerInterceptorContextRef<'_>,
900            expected
901        );
902    }
903
904    #[tokio::test]
905    #[traced_test]
906    async fn test_modify_before_completion_error_handling() {
907        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
908        interceptor_error_handling_test!(
909            modify_before_completion,
910            &mut FinalizerInterceptorContextMut<'_>,
911            expected
912        );
913    }
914
915    #[tokio::test]
916    #[traced_test]
917    async fn test_read_after_execution_error_handling() {
918        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
919        interceptor_error_handling_test!(
920            read_after_execution,
921            &FinalizerInterceptorContextRef<'_>,
922            expected
923        );
924    }
925
926    macro_rules! interceptor_error_redirection_test {
927        (read_before_execution, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
928            interceptor_error_redirection_test!(__private read_before_execution, $origin_ctx, $destination_interceptor, $destination_ctx, $expected,);
929        };
930        ($origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
931            interceptor_error_redirection_test!(__private $origin_interceptor, $origin_ctx, $destination_interceptor, $destination_ctx, $expected, _rc: &RuntimeComponents,);
932        };
933        (__private $origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
934            #[derive(Debug)]
935            struct OriginInterceptor;
936            impl Intercept for OriginInterceptor {
937                fn name(&self) -> &'static str { "OriginInterceptor" }
938
939                fn $origin_interceptor(
940                    &self,
941                    _ctx: $origin_ctx,
942                    $($rc_arg)*
943                    _cfg: &mut ConfigBag,
944                ) -> Result<(), BoxError> {
945                    tracing::debug!("OriginInterceptor called!");
946                    Err("OriginInterceptor".into())
947                }
948            }
949
950            #[derive(Debug)]
951            struct DestinationInterceptor;
952            impl Intercept for DestinationInterceptor {
953                fn name(&self) -> &'static str { "DestinationInterceptor" }
954
955                fn $destination_interceptor(
956                    &self,
957                    _ctx: $destination_ctx,
958                    _runtime_components: &RuntimeComponents,
959                    _cfg: &mut ConfigBag,
960                ) -> Result<(), BoxError> {
961                    tracing::debug!("DestinationInterceptor called!");
962                    Err("DestinationInterceptor".into())
963                }
964            }
965
966            #[derive(Debug)]
967            struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
968            impl InterceptorsTestOperationRuntimePlugin {
969                fn new() -> Self {
970                    Self(
971                        RuntimeComponentsBuilder::new("test")
972                            .with_interceptor(SharedInterceptor::new(OriginInterceptor))
973                            .with_interceptor(SharedInterceptor::new(DestinationInterceptor))
974                    )
975                }
976            }
977            impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
978                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
979                    Cow::Borrowed(&self.0)
980                }
981            }
982
983            let input = Input::doesnt_matter();
984            let runtime_plugins = RuntimePlugins::new()
985                .with_operation_plugin(TestOperationRuntimePlugin::new())
986                .with_operation_plugin(NoAuthRuntimePluginV2::new())
987                .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());
988            let actual = invoke("test", "test", input, &runtime_plugins)
989                .await
990                .expect_err("should error");
991            let actual = format!("{:?}", actual);
992            assert!(
993                actual.starts_with(&$expected),
994                "\nActual error:      {actual}\nShould start with: {}\n",
995                $expected
996            );
997
998            assert!(logs_contain("OriginInterceptor called!"));
999            assert!(logs_contain("DestinationInterceptor called!"));
1000        };
1001    }
1002
1003    #[tokio::test]
1004    #[traced_test]
1005    async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
1006        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1007        interceptor_error_redirection_test!(
1008            read_before_execution,
1009            &BeforeSerializationInterceptorContextRef<'_>,
1010            modify_before_completion,
1011            &mut FinalizerInterceptorContextMut<'_>,
1012            expected
1013        );
1014    }
1015
1016    #[tokio::test]
1017    #[traced_test]
1018    async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
1019        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1020        interceptor_error_redirection_test!(
1021            modify_before_serialization,
1022            &mut BeforeSerializationInterceptorContextMut<'_>,
1023            modify_before_completion,
1024            &mut FinalizerInterceptorContextMut<'_>,
1025            expected
1026        );
1027    }
1028
1029    #[tokio::test]
1030    #[traced_test]
1031    async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1032        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1033        interceptor_error_redirection_test!(
1034            read_before_serialization,
1035            &BeforeSerializationInterceptorContextRef<'_>,
1036            modify_before_completion,
1037            &mut FinalizerInterceptorContextMut<'_>,
1038            expected
1039        );
1040    }
1041
1042    #[tokio::test]
1043    #[traced_test]
1044    async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1045        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1046        interceptor_error_redirection_test!(
1047            read_after_serialization,
1048            &BeforeTransmitInterceptorContextRef<'_>,
1049            modify_before_completion,
1050            &mut FinalizerInterceptorContextMut<'_>,
1051            expected
1052        );
1053    }
1054
1055    #[tokio::test]
1056    #[traced_test]
1057    async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1058        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1059        interceptor_error_redirection_test!(
1060            modify_before_retry_loop,
1061            &mut BeforeTransmitInterceptorContextMut<'_>,
1062            modify_before_completion,
1063            &mut FinalizerInterceptorContextMut<'_>,
1064            expected
1065        );
1066    }
1067
1068    #[tokio::test]
1069    #[traced_test]
1070    async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1071        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1072        interceptor_error_redirection_test!(
1073            read_before_attempt,
1074            &BeforeTransmitInterceptorContextRef<'_>,
1075            modify_before_attempt_completion,
1076            &mut FinalizerInterceptorContextMut<'_>,
1077            expected
1078        );
1079    }
1080
1081    #[tokio::test]
1082    #[traced_test]
1083    async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1084        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1085        interceptor_error_redirection_test!(
1086            modify_before_signing,
1087            &mut BeforeTransmitInterceptorContextMut<'_>,
1088            modify_before_attempt_completion,
1089            &mut FinalizerInterceptorContextMut<'_>,
1090            expected
1091        );
1092    }
1093
1094    #[tokio::test]
1095    #[traced_test]
1096    async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1097        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1098        interceptor_error_redirection_test!(
1099            read_before_signing,
1100            &BeforeTransmitInterceptorContextRef<'_>,
1101            modify_before_attempt_completion,
1102            &mut FinalizerInterceptorContextMut<'_>,
1103            expected
1104        );
1105    }
1106
1107    #[tokio::test]
1108    #[traced_test]
1109    async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1110        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1111        interceptor_error_redirection_test!(
1112            read_after_signing,
1113            &BeforeTransmitInterceptorContextRef<'_>,
1114            modify_before_attempt_completion,
1115            &mut FinalizerInterceptorContextMut<'_>,
1116            expected
1117        );
1118    }
1119
1120    #[tokio::test]
1121    #[traced_test]
1122    async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1123        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1124        interceptor_error_redirection_test!(
1125            modify_before_transmit,
1126            &mut BeforeTransmitInterceptorContextMut<'_>,
1127            modify_before_attempt_completion,
1128            &mut FinalizerInterceptorContextMut<'_>,
1129            expected
1130        );
1131    }
1132
1133    #[tokio::test]
1134    #[traced_test]
1135    async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1136        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1137        interceptor_error_redirection_test!(
1138            read_before_transmit,
1139            &BeforeTransmitInterceptorContextRef<'_>,
1140            modify_before_attempt_completion,
1141            &mut FinalizerInterceptorContextMut<'_>,
1142            expected
1143        );
1144    }
1145
1146    #[tokio::test]
1147    #[traced_test]
1148    async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1149        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1150        interceptor_error_redirection_test!(
1151            read_after_transmit,
1152            &BeforeDeserializationInterceptorContextRef<'_>,
1153            modify_before_attempt_completion,
1154            &mut FinalizerInterceptorContextMut<'_>,
1155            expected
1156        );
1157    }
1158
1159    #[tokio::test]
1160    #[traced_test]
1161    async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1162    ) {
1163        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1164        interceptor_error_redirection_test!(
1165            modify_before_deserialization,
1166            &mut BeforeDeserializationInterceptorContextMut<'_>,
1167            modify_before_attempt_completion,
1168            &mut FinalizerInterceptorContextMut<'_>,
1169            expected
1170        );
1171    }
1172
1173    #[tokio::test]
1174    #[traced_test]
1175    async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1176    ) {
1177        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1178        interceptor_error_redirection_test!(
1179            read_before_deserialization,
1180            &BeforeDeserializationInterceptorContextRef<'_>,
1181            modify_before_attempt_completion,
1182            &mut FinalizerInterceptorContextMut<'_>,
1183            expected
1184        );
1185    }
1186
1187    #[tokio::test]
1188    #[traced_test]
1189    async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1190    {
1191        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1192        interceptor_error_redirection_test!(
1193            read_after_deserialization,
1194            &AfterDeserializationInterceptorContextRef<'_>,
1195            modify_before_attempt_completion,
1196            &mut FinalizerInterceptorContextMut<'_>,
1197            expected
1198        );
1199    }
1200
1201    #[tokio::test]
1202    #[traced_test]
1203    async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1204        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1205        interceptor_error_redirection_test!(
1206            modify_before_attempt_completion,
1207            &mut FinalizerInterceptorContextMut<'_>,
1208            read_after_attempt,
1209            &FinalizerInterceptorContextRef<'_>,
1210            expected
1211        );
1212    }
1213
1214    #[tokio::test]
1215    #[traced_test]
1216    async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1217        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1218        interceptor_error_redirection_test!(
1219            modify_before_completion,
1220            &mut FinalizerInterceptorContextMut<'_>,
1221            read_after_execution,
1222            &FinalizerInterceptorContextRef<'_>,
1223            expected
1224        );
1225    }
1226
1227    #[tokio::test]
1228    async fn test_stop_points() {
1229        let runtime_plugins = || {
1230            RuntimePlugins::new()
1231                .with_operation_plugin(TestOperationRuntimePlugin::new())
1232                .with_operation_plugin(NoAuthRuntimePluginV2::new())
1233        };
1234
1235        // StopPoint::None should result in a response getting set since orchestration doesn't stop
1236        let context = invoke_with_stop_point(
1237            "test",
1238            "test",
1239            Input::doesnt_matter(),
1240            &runtime_plugins(),
1241            StopPoint::None,
1242        )
1243        .await
1244        .expect("success");
1245        assert!(context.response().is_some());
1246
1247        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1248        let context = invoke_with_stop_point(
1249            "test",
1250            "test",
1251            Input::doesnt_matter(),
1252            &runtime_plugins(),
1253            StopPoint::BeforeTransmit,
1254        )
1255        .await
1256        .expect("success");
1257        assert!(context.response().is_none());
1258    }
1259
1260    /// The "finally" interceptors should run upon error when the StopPoint is set to BeforeTransmit
1261    #[tokio::test]
1262    async fn test_stop_points_error_handling() {
1263        #[derive(Debug, Default)]
1264        struct Inner {
1265            modify_before_retry_loop_called: AtomicBool,
1266            modify_before_completion_called: AtomicBool,
1267            read_after_execution_called: AtomicBool,
1268        }
1269        #[derive(Clone, Debug, Default)]
1270        struct TestInterceptor {
1271            inner: Arc<Inner>,
1272        }
1273
1274        impl Intercept for TestInterceptor {
1275            fn name(&self) -> &'static str {
1276                "TestInterceptor"
1277            }
1278
1279            fn modify_before_retry_loop(
1280                &self,
1281                _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1282                _rc: &RuntimeComponents,
1283                _cfg: &mut ConfigBag,
1284            ) -> Result<(), BoxError> {
1285                self.inner
1286                    .modify_before_retry_loop_called
1287                    .store(true, Ordering::Relaxed);
1288                Err("test error".into())
1289            }
1290
1291            fn modify_before_completion(
1292                &self,
1293                _context: &mut FinalizerInterceptorContextMut<'_>,
1294                _rc: &RuntimeComponents,
1295                _cfg: &mut ConfigBag,
1296            ) -> Result<(), BoxError> {
1297                self.inner
1298                    .modify_before_completion_called
1299                    .store(true, Ordering::Relaxed);
1300                Ok(())
1301            }
1302
1303            fn read_after_execution(
1304                &self,
1305                _context: &FinalizerInterceptorContextRef<'_>,
1306                _rc: &RuntimeComponents,
1307                _cfg: &mut ConfigBag,
1308            ) -> Result<(), BoxError> {
1309                self.inner
1310                    .read_after_execution_called
1311                    .store(true, Ordering::Relaxed);
1312                Ok(())
1313            }
1314        }
1315
1316        #[derive(Debug)]
1317        struct TestInterceptorRuntimePlugin {
1318            builder: RuntimeComponentsBuilder,
1319        }
1320
1321        impl RuntimePlugin for TestInterceptorRuntimePlugin {
1322            fn runtime_components(
1323                &self,
1324                _: &RuntimeComponentsBuilder,
1325            ) -> Cow<'_, RuntimeComponentsBuilder> {
1326                Cow::Borrowed(&self.builder)
1327            }
1328        }
1329
1330        let interceptor = TestInterceptor::default();
1331        let client = NeverClient::new();
1332        let runtime_plugins = || {
1333            RuntimePlugins::new()
1334                .with_operation_plugin(TestOperationRuntimePlugin::new())
1335                .with_operation_plugin(NoAuthRuntimePluginV2::new())
1336                .with_operation_plugin(TestInterceptorRuntimePlugin {
1337                    builder: RuntimeComponentsBuilder::new("test")
1338                        .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1339                        .with_http_client(Some(client.clone())),
1340                })
1341        };
1342
1343        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1344        let _err = invoke_with_stop_point(
1345            "test",
1346            "test",
1347            Input::doesnt_matter(),
1348            &runtime_plugins(),
1349            StopPoint::BeforeTransmit,
1350        )
1351        .await
1352        .expect_err("an error was returned");
1353        assert_eq!(client.num_calls(), 0);
1354
1355        assert!(interceptor
1356            .inner
1357            .modify_before_retry_loop_called
1358            .load(Ordering::Relaxed));
1359        assert!(interceptor
1360            .inner
1361            .modify_before_completion_called
1362            .load(Ordering::Relaxed));
1363        assert!(interceptor
1364            .inner
1365            .read_after_execution_called
1366            .load(Ordering::Relaxed));
1367    }
1368}