aws_smithy_runtime/client/
orchestrator.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::client::interceptors::Interceptors;
7use crate::client::orchestrator::http::{log_response_body, read_body};
8use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
9use crate::client::{
10    http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
11    orchestrator::endpoints::orchestrate_endpoint,
12};
13use auth::{resolve_identity, sign_request};
14use aws_smithy_async::rt::sleep::AsyncSleep;
15use aws_smithy_runtime_api::box_error::BoxError;
16use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
17use aws_smithy_runtime_api::client::interceptors::context::{
18    Error, Input, InterceptorContext, Output, RewindResult,
19};
20use aws_smithy_runtime_api::client::orchestrator::{
21    HttpResponse, LoadedRequestBody, OrchestratorError,
22};
23use aws_smithy_runtime_api::client::result::SdkError;
24use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
25use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
26use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
27use aws_smithy_runtime_api::client::ser_de::{
28    DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
29};
30use aws_smithy_types::body::SdkBody;
31use aws_smithy_types::byte_stream::ByteStream;
32use aws_smithy_types::config_bag::ConfigBag;
33use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
34use endpoints::apply_endpoint;
35use std::mem;
36use tracing::{debug, debug_span, instrument, trace, Instrument};
37
38mod auth;
39pub use auth::AuthSchemeAndEndpointOrchestrationV2;
40
41/// Defines types that implement a trait for endpoint resolution
42pub mod endpoints;
43
44/// Defines types that work with HTTP types
45mod http;
46
47/// Utility for making one-off unmodeled requests with the orchestrator.
48pub mod operation;
49
// Fails the interceptor context `$ctx` with `$err` (converted with `.into()`), logs a debug
// message, and then returns from the *enclosing function*. Because it expands to a bare
// `return;`, it can only be used inside functions that return `()`.
50macro_rules! halt {
51    ([$ctx:ident] => $err:expr) => {{
52        debug!("encountered orchestrator error; halting");
53        $ctx.fail($err.into());
54        return;
55    }};
56}
57
// Evaluates `$expr`, which must be a `Result`. An `Ok` value becomes the value of the macro
// expression; an `Err` is handed to `halt!`, which fails `$ctx` with it and returns from the
// enclosing function.
58macro_rules! halt_on_err {
59    ([$ctx:ident] => $expr:expr) => {
60        match $expr {
61            Ok(ok) => ok,
62            Err(err) => halt!([$ctx] => err),
63        }
64    };
65}
66
// Evaluates `$expr`, which must be a `Result`. On `Err`, the error is logged at debug level and
// recorded on `$ctx` via `fail`, after which execution simply continues with the next
// statement (no early return). An `Ok` value is discarded.
67macro_rules! continue_on_err {
68    ([$ctx:ident] => $expr:expr) => {
69        if let Err(err) = $expr {
70            debug!(err = ?err, "encountered orchestrator error; continuing");
71            $ctx.fail(err.into());
72        }
73    };
74}
75
// Invokes a named interceptor hook through `Interceptors::new($rc.interceptors())` and routes
// the result through one of the error-handling macros above.
//
// Accepted forms:
// - `continue_on_err: { hook_a(ctx, rc, cfg); hook_b(ctx, rc, cfg); }` runs each hook in order;
//   a failing hook is recorded on `ctx` and the following hooks still run.
// - `continue_on_err: hook(ctx, rc, cfg)` is the single-hook version of the above.
// - `halt_on_err: { ... }` / `halt_on_err: hook(ctx, rc, cfg)` behave the same way, except that
//   the first failing hook fails `ctx` and returns from the enclosing function.
// - `__private hook(ctx, rc, cfg)` is the internal arm that produces the actual method call.
76macro_rules! run_interceptors {
    // Braced list, continue-on-error: expand to one single-hook invocation per entry.
77    (continue_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
78        $(run_interceptors!(continue_on_err: $interceptor($ctx, $rc, $cfg));)+
79    };
    // Single hook, continue-on-error.
80    (continue_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
81        continue_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
82    };
    // Braced list, halt-on-error: expand to one single-hook invocation per entry.
83    (halt_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
84        $(run_interceptors!(halt_on_err: $interceptor($ctx, $rc, $cfg));)+
85    };
    // Single hook, halt-on-error.
86    (halt_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
87        halt_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
88    };
    // Internal: the call that actually runs the hook over the registered interceptors.
89    (__private $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
90        Interceptors::new($rc.interceptors()).$interceptor($ctx, $rc, $cfg)
91    };
92}
93
94/// Orchestrates the execution of a request and handling of a response.
95///
96/// The given `runtime_plugins` will be used to generate a `ConfigBag` for this request,
97/// and then the given `input` will be serialized and transmitted. When a response is
98/// received, it will be deserialized and returned.
99///
100/// This orchestration handles retries, endpoint resolution, identity resolution, and signing.
101/// Each of these are configurable via the config and runtime components given by the runtime
102/// plugins.
103pub async fn invoke(
104    service_name: &str,
105    operation_name: &str,
106    input: Input,
107    runtime_plugins: &RuntimePlugins,
108) -> Result<Output, SdkError<Error, HttpResponse>> {
109    invoke_with_stop_point(
110        service_name,
111        operation_name,
112        input,
113        runtime_plugins,
114        StopPoint::None,
115    )
116    .await?
117    .finalize()
118}
119
/// Selects a point during orchestration at which execution should return early.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum StopPoint {
    /// Run the orchestration through without stopping early.
    None,

    /// Halt the orchestrator just before the request would be transmitted.
    BeforeTransmit,
}
130
131/// Same as [`invoke`], but allows for returning early at different points during orchestration.
132///
133/// Orchestration will cease at the point specified by `stop_point`. This is useful for orchestrations
134/// that don't need to actually transmit requests, such as for generating presigned requests.
135///
136/// See the docs on [`invoke`] for more details.
137pub async fn invoke_with_stop_point(
138    // NOTE: service_name and operation_name were at one point used for instrumentation that is now
139    // handled as part of codegen. Manually constructed operations (e.g. via Operation::builder())
140    // are handled as part of Operation::invoke
141    _service_name: &str,
142    _operation_name: &str,
143    input: Input,
144    runtime_plugins: &RuntimePlugins,
145    stop_point: StopPoint,
146) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
147    async move {
148        let mut cfg = ConfigBag::base();
149        let cfg = &mut cfg;
150
151        let mut ctx = InterceptorContext::new(input);
152
153        let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
154            .map_err(SdkError::construction_failure)?;
155        trace!(runtime_components = ?runtime_components);
156
157        let operation_timeout_config =
158            MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
159        trace!(operation_timeout_config = ?operation_timeout_config);
160        async {
161            // If running the pre-execution interceptors failed, then we skip running the op and run the
162            // final interceptors instead.
163            if !ctx.is_failed() {
164                try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
165            }
166            finally_op(&mut ctx, cfg, &runtime_components).await;
167            if ctx.is_failed() {
168                Err(ctx.finalize().expect_err("it is failed"))
169            } else {
170                Ok(ctx)
171            }
172        }
173        .maybe_timeout(operation_timeout_config)
174        .await
175    }
176    .await
177}
178
179/// Apply configuration is responsible for apply runtime plugins to the config bag, as well as running
180/// `read_before_execution` interceptors. If a failure occurs due to config construction, `invoke`
181/// will raise it to the user. If an interceptor fails, then `invoke`
182#[instrument(skip_all, level = "debug")]
183fn apply_configuration(
184    ctx: &mut InterceptorContext,
185    cfg: &mut ConfigBag,
186    runtime_plugins: &RuntimePlugins,
187) -> Result<RuntimeComponents, BoxError> {
188    let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
189    continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
190
191    let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
192    continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
193
194    // The order below is important. Client interceptors must run before operation interceptors.
195    let components = RuntimeComponents::builder("merged orchestrator components")
196        .merge_from(&client_rc_builder)
197        .merge_from(&operation_rc_builder)
198        .build()?;
199
200    // In an ideal world, we'd simply update `cfg.load` to behave this way. Unfortunately, we can't
201    // do that without a breaking change. By overwriting the value in the config bag with a merged
202    // version, we can achieve a very similar behavior. `MergeTimeoutConfig`
203    let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
204    debug!(
205        "timeout settings for this operation: {:?}",
206        resolved_timeout_config
207    );
208    cfg.interceptor_state().store_put(resolved_timeout_config);
209
210    components.validate_final_config(cfg)?;
211    Ok(components)
212}
213
/// Runs the main body of an operation: serialization, the pre-transmit interceptors, and the
/// attempt/retry loop.
///
/// Nothing is returned. Any error is recorded on `ctx` (via the `halt!`, `halt_on_err!` and
/// `continue_on_err!` macros), and the halting macros make this function return early.
///
/// Steps, in order:
/// 1. `modify_before_serialization` / `read_before_serialization` interceptors.
/// 2. Serialize the input with the `SharedRequestSerializer` from the config bag.
/// 3. If `LoadedRequestBody::Requested` is set, read the whole request body into memory.
/// 4. `read_after_serialization` / `modify_before_retry_loop` interceptors.
/// 5. Ask the retry strategy whether the initial request may be sent.
/// 6. Save a checkpoint, then loop: run `try_attempt` and `finally_attempt` under the
///    per-attempt timeout and ask the retry strategy whether to go around again.
214#[instrument(skip_all, level = "debug")]
215async fn try_op(
216    ctx: &mut InterceptorContext,
217    cfg: &mut ConfigBag,
218    runtime_components: &RuntimeComponents,
219    stop_point: StopPoint,
220) {
221    // Before serialization
222    run_interceptors!(halt_on_err: {
223        modify_before_serialization(ctx, runtime_components, cfg);
224        read_before_serialization(ctx, runtime_components, cfg);
225    });
226
227    // Serialization
228    ctx.enter_serialization_phase();
229    {
230        let _span = debug_span!("serialization").entered();
231        let request_serializer = cfg
232            .load::<SharedRequestSerializer>()
233            .expect("request serializer must be in the config bag")
234            .clone();
235        let input = ctx.take_input().expect("input set at this point");
        // A serialization failure fails the context and returns from `try_op`.
236        let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
237        ctx.set_request(request);
238    }
239
240    // Load the request body into memory if configured to do so
241    if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
242        debug!("loading request body into memory");
        // Swap the real body out of the request so it can be consumed and collected.
243        let mut body = SdkBody::taken();
244        mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
245        let loaded_body = halt_on_err!([ctx] =>
246            ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
247        )
248        .into_bytes();
        // Put an in-memory copy back on the request and store the loaded bytes in the config bag.
249        *ctx.request_mut().as_mut().expect("set above").body_mut() =
250            SdkBody::from(loaded_body.clone());
251        cfg.interceptor_state()
252            .store_put(LoadedRequestBody::Loaded(loaded_body));
253    }
254
255    // Before transmit
256    ctx.enter_before_transmit_phase();
257    run_interceptors!(halt_on_err: {
258        read_after_serialization(ctx, runtime_components, cfg);
259        modify_before_retry_loop(ctx, runtime_components, cfg);
260    });
261
262    // If we got a retry strategy from the bag, ask it what to do.
263    // Otherwise, assume we should attempt the initial request.
264    let should_attempt = runtime_components
265        .retry_strategy()
266        .should_attempt_initial_request(runtime_components, cfg);
267    match should_attempt {
268        // Yes, let's make a request
269        Ok(ShouldAttempt::Yes) => debug!("retry strategy has OKed initial request"),
270        // No, this request shouldn't be sent
271        Ok(ShouldAttempt::No) => {
272            let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
273            halt!([ctx] => OrchestratorError::other(err));
274        }
275        // No, we shouldn't make a request because...
276        Err(err) => halt!([ctx] => OrchestratorError::other(err)),
        // Yes, but only after waiting; this requires an async sleep implementation.
277        Ok(ShouldAttempt::YesAfterDelay(delay)) => {
278            let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
279                "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
280            )));
281            debug!("retry strategy has OKed initial request after a {delay:?} delay");
282            sleep_impl.sleep(delay).await;
283        }
284    }
285
286    // Save a request checkpoint before we make the request. This will allow us to "rewind"
287    // the request in the case of retry attempts.
288    ctx.save_checkpoint();
    // Holds the delay and sleep future requested at the end of the previous iteration, if any.
289    let mut retry_delay = None;
    // Attempts are numbered starting at 1; the loop ends only through `break` or an early return.
290    for i in 1u32.. {
291        // Break from the loop if we can't rewind the request's state. This will always succeed the
292        // first time, but will fail on subsequent iterations if the request body wasn't retryable.
293        trace!("checking if context can be rewound for attempt #{i}");
294        if let RewindResult::Impossible = ctx.rewind(cfg) {
295            debug!("request cannot be retried since the request body cannot be cloned");
296            break;
297        }
298        // Track which attempt we're currently on.
299        cfg.interceptor_state()
300            .store_put::<RequestAttempts>(i.into());
301        // Backoff time should not be included in the attempt timeout
302        if let Some((delay, sleep)) = retry_delay.take() {
303            debug!("delaying for {delay:?}");
304            sleep.await;
305        }
306        let attempt_timeout_config =
307            MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
308        trace!(attempt_timeout_config = ?attempt_timeout_config);
        // One attempt = `try_attempt` followed by `finally_attempt`, both inside the attempt timeout.
309        let maybe_timeout = async {
310            debug!("beginning attempt #{i}");
311            try_attempt(ctx, cfg, runtime_components, stop_point)
312                .instrument(debug_span!("try_attempt", "attempt" = i))
313                .await;
314            finally_attempt(ctx, cfg, runtime_components)
315                .instrument(debug_span!("finally_attempt", "attempt" = i))
316                .await;
317            Result::<_, SdkError<Error, HttpResponse>>::Ok(())
318        }
319        .maybe_timeout(attempt_timeout_config)
320        .await
        // NOTE(review): the `unwrap` presumably relies on a timeout error always carrying a
        // source; confirm against `SdkError::into_source`.
321        .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));
322
323        // We continue when encountering a timeout error. The retry classifier will decide what to do with it.
324        continue_on_err!([ctx] => maybe_timeout);
325
326        // If we got a retry strategy from the bag, ask it what to do.
327        // If no strategy was set, we won't retry.
328        let should_attempt = halt_on_err!([ctx] => runtime_components
329            .retry_strategy()
330            .should_attempt_retry(ctx, runtime_components, cfg)
331            .map_err(OrchestratorError::other));
332        match should_attempt {
333            // Yes, let's retry the request
334            ShouldAttempt::Yes => continue,
335            // No, this request shouldn't be retried
336            ShouldAttempt::No => {
337                debug!("a retry is either unnecessary or not possible, exiting attempt loop");
338                break;
339            }
            // Retry after a delay: the sleep future is created here and awaited at the top of
            // the next iteration, before the attempt timeout is set up.
340            ShouldAttempt::YesAfterDelay(delay) => {
341                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
342                    "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
343                )));
344                retry_delay = Some((delay, sleep_impl.sleep(delay)));
345                continue;
346            }
347        }
348    }
349}
350
/// Performs a single attempt of the operation.
///
/// In order: runs `read_before_attempt`, resolves the identity (and possibly the endpoint),
/// applies or orchestrates the endpoint, runs the signing interceptors and signs the request,
/// runs the pre-transmit interceptors, sends the request through the HTTP connector, runs the
/// deserialization interceptors, and deserializes the response into an output or error that is
/// stored on `ctx`.
///
/// Nothing is returned. Every failure is recorded on `ctx` and makes the function return
/// early. If `stop_point` is `StopPoint::BeforeTransmit`, the function returns right after the
/// `read_before_transmit` interceptors without sending anything.
351async fn try_attempt(
352    ctx: &mut InterceptorContext,
353    cfg: &mut ConfigBag,
354    runtime_components: &RuntimeComponents,
355    stop_point: StopPoint,
356) {
357    run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));
358
359    let (scheme_id, identity, endpoint) = halt_on_err!([ctx] => resolve_identity(runtime_components, cfg).await.map_err(OrchestratorError::other));
360
361    match endpoint {
362        Some(endpoint) => {
363            // This branch is for backward compatibility when `AuthSchemeAndEndpointOrchestrationV2` is not present in the config bag.
364            // `resolve_identity` internally resolved an endpoint to determine the most suitable scheme ID, and returned that endpoint.
365            halt_on_err!([ctx] => apply_endpoint(&endpoint, ctx, cfg).map_err(OrchestratorError::other));
366            // Make the endpoint config available to interceptors
367            cfg.interceptor_state().store_put(endpoint);
368        }
        // No endpoint came back from `resolve_identity`, so resolve it here. The identity is
        // cloned because it is needed again for signing below.
369        None => {
370            halt_on_err!([ctx] => orchestrate_endpoint(identity.clone(), ctx, runtime_components, cfg)
371				    .instrument(debug_span!("orchestrate_endpoint"))
372				    .await
373				    .map_err(OrchestratorError::other));
374        }
375    }
376
377    run_interceptors!(halt_on_err: {
378        modify_before_signing(ctx, runtime_components, cfg);
379        read_before_signing(ctx, runtime_components, cfg);
380    });
381
382    halt_on_err!([ctx] => sign_request(&scheme_id, &identity, ctx, runtime_components, cfg).map_err(OrchestratorError::other));
383
384    run_interceptors!(halt_on_err: {
385        read_after_signing(ctx, runtime_components, cfg);
386        modify_before_transmit(ctx, runtime_components, cfg);
387        read_before_transmit(ctx, runtime_components, cfg);
388    });
389
390    // Return early if a stop point is set for before transmit
391    if let StopPoint::BeforeTransmit = stop_point {
392        debug!("ending orchestration early because the stop point is `BeforeTransmit`");
393        return;
394    }
395
396    // The connector consumes the request, so it is taken out of the interceptor context here
397    // (`take_request`) rather than cloned.
    // NOTE(review): an earlier version of this comment said the request is cloned at this point;
    // presumably the checkpoint saved in `try_op` is what keeps a copy for retries. Confirm.
398    ctx.enter_transmit_phase();
399    let response = halt_on_err!([ctx] => {
400        let request = ctx.take_request().expect("set during serialization");
401        trace!(request = ?request, "transmitting request");
402        let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
403            OrchestratorError::other("No HTTP client was available to send this request. \
404                Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
405        ));
        // Connect and read timeouts from the config bag are passed on to the connector settings.
406        let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
407        let settings = {
408            let mut builder = HttpConnectorSettings::builder();
409            builder.set_connect_timeout(timeout_config.connect_timeout());
410            builder.set_read_timeout(timeout_config.read_timeout());
411            builder.build()
412        };
413        let connector = http_client.http_connector(&settings, runtime_components);
        // The connector call is wrapped in `MaybeUploadThroughputCheckFuture` before being awaited.
414        let response_future = MaybeUploadThroughputCheckFuture::new(
415            cfg,
416            runtime_components,
417            connector.call(request),
418        );
419        response_future.await.map_err(OrchestratorError::connector)
420    });
421    trace!(response = ?response, "received response from service");
422    ctx.set_response(response);
423    ctx.enter_before_deserialization_phase();
424
425    run_interceptors!(halt_on_err: {
426        read_after_transmit(ctx, runtime_components, cfg);
427        modify_before_deserialization(ctx, runtime_components, cfg);
428        read_before_deserialization(ctx, runtime_components, cfg);
429    });
430
431    ctx.enter_deserialization_phase();
432    let output_or_error = async {
433        let response = ctx.response_mut().expect("set during transmit");
434        let response_deserializer = cfg
435            .load::<SharedResponseDeserializer>()
436            .expect("a request deserializer must be in the config bag");
        // Streaming deserialization is tried first. `None` means the deserializer did not handle
        // the response that way, so the body is read fully and deserialized non-streaming.
437        let maybe_deserialized = {
438            let _span = debug_span!("deserialize_streaming").entered();
439            response_deserializer.deserialize_streaming(response)
440        };
441        match maybe_deserialized {
442            Some(output_or_error) => output_or_error,
443            None => read_body(response)
444                .instrument(debug_span!("read_body"))
445                .await
446                .map_err(OrchestratorError::response)
447                .and_then(|_| {
448                    let _span = debug_span!("deserialize_nonstreaming").entered();
449                    log_response_body(response, cfg);
450                    response_deserializer.deserialize_nonstreaming(response)
451                }),
452        }
453    }
454    .instrument(debug_span!("deserialization"))
455    .await;
456    trace!(output_or_error = ?output_or_error);
    // Both success and failure are stored on the context; no early return happens here.
457    ctx.set_output_or_error(output_or_error);
458
459    ctx.enter_after_deserialization_phase();
460    run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
461}
462
463async fn finally_attempt(
464    ctx: &mut InterceptorContext,
465    cfg: &mut ConfigBag,
466    runtime_components: &RuntimeComponents,
467) {
468    run_interceptors!(continue_on_err: {
469        modify_before_attempt_completion(ctx, runtime_components, cfg);
470        read_after_attempt(ctx, runtime_components, cfg);
471    });
472}
473
474#[instrument(skip_all, level = "debug")]
475async fn finally_op(
476    ctx: &mut InterceptorContext,
477    cfg: &mut ConfigBag,
478    runtime_components: &RuntimeComponents,
479) {
480    run_interceptors!(continue_on_err: {
481        modify_before_completion(ctx, runtime_components, cfg);
482        read_after_execution(ctx, runtime_components, cfg);
483    });
484}
485
486#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
487mod tests {
488    use crate::client::auth::no_auth::{NoAuthRuntimePlugin, NO_AUTH_SCHEME_ID};
489    use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
490    use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
491    use crate::client::retries::strategy::NeverRetryStrategy;
492    use crate::client::test_util::{
493        deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
494    };
495    use aws_smithy_http_client::test_util::NeverClient;
496    use aws_smithy_runtime_api::box_error::BoxError;
497    use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
498    use aws_smithy_runtime_api::client::auth::{
499        AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
500    };
501    use aws_smithy_runtime_api::client::endpoint::{
502        EndpointResolverParams, SharedEndpointResolver,
503    };
504    use aws_smithy_runtime_api::client::http::{
505        http_client_fn, HttpConnector, HttpConnectorFuture,
506    };
507    use aws_smithy_runtime_api::client::interceptors::context::{
508        AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
509        BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
510        BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
511        BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
512        FinalizerInterceptorContextRef, Input, Output,
513    };
514    use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
515    use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
516    use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
517    use aws_smithy_runtime_api::client::runtime_components::{
518        RuntimeComponents, RuntimeComponentsBuilder,
519    };
520    use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
521    use aws_smithy_runtime_api::client::ser_de::{
522        SharedRequestSerializer, SharedResponseDeserializer,
523    };
524    use aws_smithy_runtime_api::shared::IntoShared;
525    use aws_smithy_types::body::SdkBody;
526    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
527    use aws_smithy_types::timeout::TimeoutConfig;
528    use http_02x::{Response, StatusCode};
529    use std::borrow::Cow;
530    use std::sync::atomic::{AtomicBool, Ordering};
531    use std::sync::Arc;
532    use tracing_test::traced_test;
533
534    fn new_request_serializer() -> CannedRequestSerializer {
535        CannedRequestSerializer::success(HttpRequest::empty())
536    }
537
538    fn new_response_deserializer() -> CannedResponseDeserializer {
539        CannedResponseDeserializer::new(
540            Response::builder()
541                .status(StatusCode::OK)
542                .body(SdkBody::empty())
543                .map_err(|err| OrchestratorError::other(Box::new(err)))
544                .map(Output::erase),
545        )
546    }
547
548    #[derive(Debug, Default)]
549    struct OkConnector {}
550
551    impl OkConnector {
552        fn new() -> Self {
553            Self::default()
554        }
555    }
556
557    impl HttpConnector for OkConnector {
558        fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
559            HttpConnectorFuture::ready(Ok(http_02x::Response::builder()
560                .status(200)
561                .body(SdkBody::empty())
562                .expect("OK response is valid")
563                .try_into()
564                .unwrap()))
565        }
566    }
567
568    #[derive(Debug)]
569    struct TestOperationRuntimePlugin {
570        builder: RuntimeComponentsBuilder,
571    }
572
573    impl TestOperationRuntimePlugin {
574        fn new() -> Self {
575            Self {
576                builder: RuntimeComponentsBuilder::for_tests()
577                    .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
578                    .with_endpoint_resolver(Some(SharedEndpointResolver::new(
579                        StaticUriEndpointResolver::http_localhost(8080),
580                    )))
581                    .with_http_client(Some(http_client_fn(|_, _| {
582                        OkConnector::new().into_shared()
583                    })))
584                    .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
585                        StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
586                    ))),
587            }
588        }
589    }
590
591    impl RuntimePlugin for TestOperationRuntimePlugin {
592        fn config(&self) -> Option<FrozenLayer> {
593            let mut layer = Layer::new("TestOperationRuntimePlugin");
594            layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
595            layer.store_put(EndpointResolverParams::new("dontcare"));
596            layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
597            layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
598            layer.store_put(TimeoutConfig::builder().build());
599            Some(layer.freeze())
600        }
601
602        fn runtime_components(
603            &self,
604            _: &RuntimeComponentsBuilder,
605        ) -> Cow<'_, RuntimeComponentsBuilder> {
606            Cow::Borrowed(&self.builder)
607        }
608    }
609
    // Generates the body of an interceptor error-handling test for the hook `$interceptor`.
    //
    // The expansion defines three interceptors (A, B, C) whose `$interceptor` hook logs a message
    // and then fails. A is registered through a client plugin, B and C through an operation
    // plugin. It then calls `invoke`, asserts that the `Debug` rendering of the resulting error
    // starts with `$expected`, and asserts that all three interceptors logged their message,
    // i.e. that every one of them ran even though earlier ones failed.
    //
    // `$ctx` is the context type the hook receives. NOTE(review): `logs_contain` is not defined
    // in this macro; presumably it is provided by `#[traced_test]` on the calling test. Confirm.
610    macro_rules! interceptor_error_handling_test {
        // `read_before_execution` is the one hook expanded without a `RuntimeComponents`
        // parameter. Note that this arm requires a trailing comma after `$expected`.
611        (read_before_execution, $ctx:ty, $expected:expr,) => {
612            interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
613        };
        // Every other hook additionally takes `_rc: &RuntimeComponents`.
614        ($interceptor:ident, $ctx:ty, $expected:expr) => {
615            interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
616        };
        // Internal arm: `$rc_arg` is either empty or the `_rc: &RuntimeComponents,` parameter.
617        (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
618            #[derive(Debug)]
619            struct FailingInterceptorA;
620            impl Intercept for FailingInterceptorA {
621                fn name(&self) -> &'static str { "FailingInterceptorA" }
622
623                fn $interceptor(
624                    &self,
625                    _ctx: $ctx,
626                    $($rc_arg)*
627                    _cfg: &mut ConfigBag,
628                ) -> Result<(), BoxError> {
629                    tracing::debug!("FailingInterceptorA called!");
630                    Err("FailingInterceptorA".into())
631                }
632            }
633
634            #[derive(Debug)]
635            struct FailingInterceptorB;
636            impl Intercept for FailingInterceptorB {
637                fn name(&self) -> &'static str { "FailingInterceptorB" }
638
639                fn $interceptor(
640                    &self,
641                    _ctx: $ctx,
642                    $($rc_arg)*
643                    _cfg: &mut ConfigBag,
644                ) -> Result<(), BoxError> {
645                    tracing::debug!("FailingInterceptorB called!");
646                    Err("FailingInterceptorB".into())
647                }
648            }
649
650            #[derive(Debug)]
651            struct FailingInterceptorC;
652            impl Intercept for FailingInterceptorC {
653                fn name(&self) -> &'static str { "FailingInterceptorC" }
654
655                fn $interceptor(
656                    &self,
657                    _ctx: $ctx,
658                    $($rc_arg)*
659                    _cfg: &mut ConfigBag,
660                ) -> Result<(), BoxError> {
661                    tracing::debug!("FailingInterceptorC called!");
662                    Err("FailingInterceptorC".into())
663                }
664            }
665
            // Client-level plugin carrying interceptor A.
666            #[derive(Debug)]
667            struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
668            impl FailingInterceptorsClientRuntimePlugin {
669                fn new() -> Self {
670                    Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
671                }
672            }
673            impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
674                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
675                    Cow::Borrowed(&self.0)
676                }
677            }
678
            // Operation-level plugin carrying interceptors B and C, registered in that order.
679            #[derive(Debug)]
680            struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
681            impl FailingInterceptorsOperationRuntimePlugin {
682                fn new() -> Self {
683                    Self(
684                        RuntimeComponentsBuilder::new("test")
685                            .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
686                            .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
687                    )
688                }
689            }
690            impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
691                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
692                    Cow::Borrowed(&self.0)
693                }
694            }
695
696            let input = Input::doesnt_matter();
697            let runtime_plugins = RuntimePlugins::new()
698                .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
699                .with_operation_plugin(TestOperationRuntimePlugin::new())
700                .with_operation_plugin(NoAuthRuntimePlugin::new())
701                .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
702            let actual = invoke("test", "test", input, &runtime_plugins)
703                .await
704                .expect_err("should error");
            // Only a prefix of the `Debug` output is compared.
705            let actual = format!("{:?}", actual);
706            assert!(
707                actual.starts_with(&$expected),
708                "\nActual error:      {actual}\nShould start with: {}\n",
709                $expected
710            );
711
            // All three interceptors must have been invoked despite the earlier failures.
712            assert!(logs_contain("FailingInterceptorA called!"));
713            assert!(logs_contain("FailingInterceptorB called!"));
714            assert!(logs_contain("FailingInterceptorC called!"));
715        };
716    }
717
718    #[tokio::test]
719    #[traced_test]
720    async fn test_read_before_execution_error_handling() {
721        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
722        interceptor_error_handling_test!(
723            read_before_execution,
724            &BeforeSerializationInterceptorContextRef<'_>,
725            expected,
726        );
727    }
728
729    #[tokio::test]
730    #[traced_test]
731    async fn test_modify_before_serialization_error_handling() {
732        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
733        interceptor_error_handling_test!(
734            modify_before_serialization,
735            &mut BeforeSerializationInterceptorContextMut<'_>,
736            expected
737        );
738    }
739
740    #[tokio::test]
741    #[traced_test]
742    async fn test_read_before_serialization_error_handling() {
743        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
744        interceptor_error_handling_test!(
745            read_before_serialization,
746            &BeforeSerializationInterceptorContextRef<'_>,
747            expected
748        );
749    }
750
751    #[tokio::test]
752    #[traced_test]
753    async fn test_read_after_serialization_error_handling() {
754        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
755        interceptor_error_handling_test!(
756            read_after_serialization,
757            &BeforeTransmitInterceptorContextRef<'_>,
758            expected
759        );
760    }
761
762    #[tokio::test]
763    #[traced_test]
764    async fn test_modify_before_retry_loop_error_handling() {
765        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
766        interceptor_error_handling_test!(
767            modify_before_retry_loop,
768            &mut BeforeTransmitInterceptorContextMut<'_>,
769            expected
770        );
771    }
772
773    #[tokio::test]
774    #[traced_test]
775    async fn test_read_before_attempt_error_handling() {
776        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
777        interceptor_error_handling_test!(
778            read_before_attempt,
779            &BeforeTransmitInterceptorContextRef<'_>,
780            expected
781        );
782    }
783
784    #[tokio::test]
785    #[traced_test]
786    async fn test_modify_before_signing_error_handling() {
787        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
788        interceptor_error_handling_test!(
789            modify_before_signing,
790            &mut BeforeTransmitInterceptorContextMut<'_>,
791            expected
792        );
793    }
794
795    #[tokio::test]
796    #[traced_test]
797    async fn test_read_before_signing_error_handling() {
798        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
799        interceptor_error_handling_test!(
800            read_before_signing,
801            &BeforeTransmitInterceptorContextRef<'_>,
802            expected
803        );
804    }
805
806    #[tokio::test]
807    #[traced_test]
808    async fn test_read_after_signing_error_handling() {
809        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
810        interceptor_error_handling_test!(
811            read_after_signing,
812            &BeforeTransmitInterceptorContextRef<'_>,
813            expected
814        );
815    }
816
817    #[tokio::test]
818    #[traced_test]
819    async fn test_modify_before_transmit_error_handling() {
820        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
821        interceptor_error_handling_test!(
822            modify_before_transmit,
823            &mut BeforeTransmitInterceptorContextMut<'_>,
824            expected
825        );
826    }
827
828    #[tokio::test]
829    #[traced_test]
830    async fn test_read_before_transmit_error_handling() {
831        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
832        interceptor_error_handling_test!(
833            read_before_transmit,
834            &BeforeTransmitInterceptorContextRef<'_>,
835            expected
836        );
837    }
838
839    #[tokio::test]
840    #[traced_test]
841    async fn test_read_after_transmit_error_handling() {
842        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
843        interceptor_error_handling_test!(
844            read_after_transmit,
845            &BeforeDeserializationInterceptorContextRef<'_>,
846            expected
847        );
848    }
849
850    #[tokio::test]
851    #[traced_test]
852    async fn test_modify_before_deserialization_error_handling() {
853        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
854        interceptor_error_handling_test!(
855            modify_before_deserialization,
856            &mut BeforeDeserializationInterceptorContextMut<'_>,
857            expected
858        );
859    }
860
861    #[tokio::test]
862    #[traced_test]
863    async fn test_read_before_deserialization_error_handling() {
864        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
865        interceptor_error_handling_test!(
866            read_before_deserialization,
867            &BeforeDeserializationInterceptorContextRef<'_>,
868            expected
869        );
870    }
871
872    #[tokio::test]
873    #[traced_test]
874    async fn test_read_after_deserialization_error_handling() {
875        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
876        interceptor_error_handling_test!(
877            read_after_deserialization,
878            &AfterDeserializationInterceptorContextRef<'_>,
879            expected
880        );
881    }
882
883    #[tokio::test]
884    #[traced_test]
885    async fn test_modify_before_attempt_completion_error_handling() {
886        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
887        interceptor_error_handling_test!(
888            modify_before_attempt_completion,
889            &mut FinalizerInterceptorContextMut<'_>,
890            expected
891        );
892    }
893
894    #[tokio::test]
895    #[traced_test]
896    async fn test_read_after_attempt_error_handling() {
897        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
898        interceptor_error_handling_test!(
899            read_after_attempt,
900            &FinalizerInterceptorContextRef<'_>,
901            expected
902        );
903    }
904
905    #[tokio::test]
906    #[traced_test]
907    async fn test_modify_before_completion_error_handling() {
908        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
909        interceptor_error_handling_test!(
910            modify_before_completion,
911            &mut FinalizerInterceptorContextMut<'_>,
912            expected
913        );
914    }
915
916    #[tokio::test]
917    #[traced_test]
918    async fn test_read_after_execution_error_handling() {
919        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
920        interceptor_error_handling_test!(
921            read_after_execution,
922            &FinalizerInterceptorContextRef<'_>,
923            expected
924        );
925    }
926
/// Generates the body of an "error redirection" test.
///
/// Two interceptors are registered on an operation plugin, in this order:
/// `OriginInterceptor`, which fails inside the origin hook, and `DestinationInterceptor`,
/// which fails inside the destination hook. The generated code invokes the orchestrator,
/// requires an error whose `Debug` rendering starts with the expected prefix, and checks
/// the captured logs to confirm that both hooks actually ran.
///
/// Arguments: origin hook name, origin context type, destination hook name,
/// destination context type, expected `Debug` prefix of the resulting error.
macro_rules! interceptor_error_redirection_test {
    // `read_before_execution` is generated without a `RuntimeComponents` parameter.
    (read_before_execution, $origin_ctx_ty:ty, $dest_hook:ident, $dest_ctx_ty:ty, $expected_prefix:expr) => {
        interceptor_error_redirection_test!(__private read_before_execution, $origin_ctx_ty, $dest_hook, $dest_ctx_ty, $expected_prefix,);
    };
    // Every other origin hook additionally receives `_rc: &RuntimeComponents`.
    ($origin_hook:ident, $origin_ctx_ty:ty, $dest_hook:ident, $dest_ctx_ty:ty, $expected_prefix:expr) => {
        interceptor_error_redirection_test!(__private $origin_hook, $origin_ctx_ty, $dest_hook, $dest_ctx_ty, $expected_prefix, _rc: &RuntimeComponents,);
    };
    // Shared implementation; `$extra_param` carries the optional extra origin-hook parameter.
    (__private $origin_hook:ident, $origin_ctx_ty:ty, $dest_hook:ident, $dest_ctx_ty:ty, $expected_prefix:expr, $($extra_param:tt)*) => {
        // Fails in the hook the orchestrator is expected to jump to.
        #[derive(Debug)]
        struct DestinationInterceptor;
        impl Intercept for DestinationInterceptor {
            fn name(&self) -> &'static str {
                "DestinationInterceptor"
            }

            fn $dest_hook(
                &self,
                _ctx: $dest_ctx_ty,
                _runtime_components: &RuntimeComponents,
                _cfg: &mut ConfigBag,
            ) -> Result<(), BoxError> {
                tracing::debug!("DestinationInterceptor called!");
                Err("DestinationInterceptor".into())
            }
        }

        // Fails in the hook where the first error originates.
        #[derive(Debug)]
        struct OriginInterceptor;
        impl Intercept for OriginInterceptor {
            fn name(&self) -> &'static str {
                "OriginInterceptor"
            }

            fn $origin_hook(
                &self,
                _ctx: $origin_ctx_ty,
                $($extra_param)*
                _cfg: &mut ConfigBag,
            ) -> Result<(), BoxError> {
                tracing::debug!("OriginInterceptor called!");
                Err("OriginInterceptor".into())
            }
        }

        // Operation-level plugin that registers the origin interceptor, then the destination one.
        #[derive(Debug)]
        struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
        impl InterceptorsTestOperationRuntimePlugin {
            fn new() -> Self {
                let builder = RuntimeComponentsBuilder::new("test")
                    .with_interceptor(SharedInterceptor::new(OriginInterceptor))
                    .with_interceptor(SharedInterceptor::new(DestinationInterceptor));
                Self(builder)
            }
        }
        impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
            fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
                Cow::Borrowed(&self.0)
            }
        }

        let input = Input::doesnt_matter();
        let runtime_plugins = RuntimePlugins::new()
            .with_operation_plugin(TestOperationRuntimePlugin::new())
            .with_operation_plugin(NoAuthRuntimePlugin::new())
            .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());
        let failure = invoke("test", "test", input, &runtime_plugins)
            .await
            .expect_err("should error");
        let actual = format!("{failure:?}");
        assert!(
            actual.starts_with(&$expected_prefix),
            "\nActual error:      {actual}\nShould start with: {}\n",
            $expected_prefix
        );

        // Both hooks must have executed, not just the one reported in the error.
        assert!(logs_contain("OriginInterceptor called!"));
        assert!(logs_contain("DestinationInterceptor called!"));
    };
}
1003
1004    #[tokio::test]
1005    #[traced_test]
1006    async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
1007        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1008        interceptor_error_redirection_test!(
1009            read_before_execution,
1010            &BeforeSerializationInterceptorContextRef<'_>,
1011            modify_before_completion,
1012            &mut FinalizerInterceptorContextMut<'_>,
1013            expected
1014        );
1015    }
1016
1017    #[tokio::test]
1018    #[traced_test]
1019    async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
1020        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1021        interceptor_error_redirection_test!(
1022            modify_before_serialization,
1023            &mut BeforeSerializationInterceptorContextMut<'_>,
1024            modify_before_completion,
1025            &mut FinalizerInterceptorContextMut<'_>,
1026            expected
1027        );
1028    }
1029
1030    #[tokio::test]
1031    #[traced_test]
1032    async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1033        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1034        interceptor_error_redirection_test!(
1035            read_before_serialization,
1036            &BeforeSerializationInterceptorContextRef<'_>,
1037            modify_before_completion,
1038            &mut FinalizerInterceptorContextMut<'_>,
1039            expected
1040        );
1041    }
1042
1043    #[tokio::test]
1044    #[traced_test]
1045    async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1046        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1047        interceptor_error_redirection_test!(
1048            read_after_serialization,
1049            &BeforeTransmitInterceptorContextRef<'_>,
1050            modify_before_completion,
1051            &mut FinalizerInterceptorContextMut<'_>,
1052            expected
1053        );
1054    }
1055
1056    #[tokio::test]
1057    #[traced_test]
1058    async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1059        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1060        interceptor_error_redirection_test!(
1061            modify_before_retry_loop,
1062            &mut BeforeTransmitInterceptorContextMut<'_>,
1063            modify_before_completion,
1064            &mut FinalizerInterceptorContextMut<'_>,
1065            expected
1066        );
1067    }
1068
1069    #[tokio::test]
1070    #[traced_test]
1071    async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1072        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1073        interceptor_error_redirection_test!(
1074            read_before_attempt,
1075            &BeforeTransmitInterceptorContextRef<'_>,
1076            modify_before_attempt_completion,
1077            &mut FinalizerInterceptorContextMut<'_>,
1078            expected
1079        );
1080    }
1081
1082    #[tokio::test]
1083    #[traced_test]
1084    async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1085        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1086        interceptor_error_redirection_test!(
1087            modify_before_signing,
1088            &mut BeforeTransmitInterceptorContextMut<'_>,
1089            modify_before_attempt_completion,
1090            &mut FinalizerInterceptorContextMut<'_>,
1091            expected
1092        );
1093    }
1094
1095    #[tokio::test]
1096    #[traced_test]
1097    async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1098        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1099        interceptor_error_redirection_test!(
1100            read_before_signing,
1101            &BeforeTransmitInterceptorContextRef<'_>,
1102            modify_before_attempt_completion,
1103            &mut FinalizerInterceptorContextMut<'_>,
1104            expected
1105        );
1106    }
1107
1108    #[tokio::test]
1109    #[traced_test]
1110    async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1111        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1112        interceptor_error_redirection_test!(
1113            read_after_signing,
1114            &BeforeTransmitInterceptorContextRef<'_>,
1115            modify_before_attempt_completion,
1116            &mut FinalizerInterceptorContextMut<'_>,
1117            expected
1118        );
1119    }
1120
1121    #[tokio::test]
1122    #[traced_test]
1123    async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1124        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1125        interceptor_error_redirection_test!(
1126            modify_before_transmit,
1127            &mut BeforeTransmitInterceptorContextMut<'_>,
1128            modify_before_attempt_completion,
1129            &mut FinalizerInterceptorContextMut<'_>,
1130            expected
1131        );
1132    }
1133
1134    #[tokio::test]
1135    #[traced_test]
1136    async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1137        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1138        interceptor_error_redirection_test!(
1139            read_before_transmit,
1140            &BeforeTransmitInterceptorContextRef<'_>,
1141            modify_before_attempt_completion,
1142            &mut FinalizerInterceptorContextMut<'_>,
1143            expected
1144        );
1145    }
1146
1147    #[tokio::test]
1148    #[traced_test]
1149    async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1150        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1151        interceptor_error_redirection_test!(
1152            read_after_transmit,
1153            &BeforeDeserializationInterceptorContextRef<'_>,
1154            modify_before_attempt_completion,
1155            &mut FinalizerInterceptorContextMut<'_>,
1156            expected
1157        );
1158    }
1159
1160    #[tokio::test]
1161    #[traced_test]
1162    async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1163    ) {
1164        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1165        interceptor_error_redirection_test!(
1166            modify_before_deserialization,
1167            &mut BeforeDeserializationInterceptorContextMut<'_>,
1168            modify_before_attempt_completion,
1169            &mut FinalizerInterceptorContextMut<'_>,
1170            expected
1171        );
1172    }
1173
1174    #[tokio::test]
1175    #[traced_test]
1176    async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1177    ) {
1178        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1179        interceptor_error_redirection_test!(
1180            read_before_deserialization,
1181            &BeforeDeserializationInterceptorContextRef<'_>,
1182            modify_before_attempt_completion,
1183            &mut FinalizerInterceptorContextMut<'_>,
1184            expected
1185        );
1186    }
1187
1188    #[tokio::test]
1189    #[traced_test]
1190    async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1191    {
1192        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1193        interceptor_error_redirection_test!(
1194            read_after_deserialization,
1195            &AfterDeserializationInterceptorContextRef<'_>,
1196            modify_before_attempt_completion,
1197            &mut FinalizerInterceptorContextMut<'_>,
1198            expected
1199        );
1200    }
1201
1202    #[tokio::test]
1203    #[traced_test]
1204    async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1205        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1206        interceptor_error_redirection_test!(
1207            modify_before_attempt_completion,
1208            &mut FinalizerInterceptorContextMut<'_>,
1209            read_after_attempt,
1210            &FinalizerInterceptorContextRef<'_>,
1211            expected
1212        );
1213    }
1214
1215    #[tokio::test]
1216    #[traced_test]
1217    async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1218        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1219        interceptor_error_redirection_test!(
1220            modify_before_completion,
1221            &mut FinalizerInterceptorContextMut<'_>,
1222            read_after_execution,
1223            &FinalizerInterceptorContextRef<'_>,
1224            expected
1225        );
1226    }
1227
1228    #[tokio::test]
1229    async fn test_stop_points() {
1230        let runtime_plugins = || {
1231            RuntimePlugins::new()
1232                .with_operation_plugin(TestOperationRuntimePlugin::new())
1233                .with_operation_plugin(NoAuthRuntimePlugin::new())
1234        };
1235
1236        // StopPoint::None should result in a response getting set since orchestration doesn't stop
1237        let context = invoke_with_stop_point(
1238            "test",
1239            "test",
1240            Input::doesnt_matter(),
1241            &runtime_plugins(),
1242            StopPoint::None,
1243        )
1244        .await
1245        .expect("success");
1246        assert!(context.response().is_some());
1247
1248        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1249        let context = invoke_with_stop_point(
1250            "test",
1251            "test",
1252            Input::doesnt_matter(),
1253            &runtime_plugins(),
1254            StopPoint::BeforeTransmit,
1255        )
1256        .await
1257        .expect("success");
1258        assert!(context.response().is_none());
1259    }
1260
1261    /// The "finally" interceptors should run upon error when the StopPoint is set to BeforeTransmit
1262    #[tokio::test]
1263    async fn test_stop_points_error_handling() {
1264        #[derive(Debug, Default)]
1265        struct Inner {
1266            modify_before_retry_loop_called: AtomicBool,
1267            modify_before_completion_called: AtomicBool,
1268            read_after_execution_called: AtomicBool,
1269        }
1270        #[derive(Clone, Debug, Default)]
1271        struct TestInterceptor {
1272            inner: Arc<Inner>,
1273        }
1274
1275        impl Intercept for TestInterceptor {
1276            fn name(&self) -> &'static str {
1277                "TestInterceptor"
1278            }
1279
1280            fn modify_before_retry_loop(
1281                &self,
1282                _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1283                _rc: &RuntimeComponents,
1284                _cfg: &mut ConfigBag,
1285            ) -> Result<(), BoxError> {
1286                self.inner
1287                    .modify_before_retry_loop_called
1288                    .store(true, Ordering::Relaxed);
1289                Err("test error".into())
1290            }
1291
1292            fn modify_before_completion(
1293                &self,
1294                _context: &mut FinalizerInterceptorContextMut<'_>,
1295                _rc: &RuntimeComponents,
1296                _cfg: &mut ConfigBag,
1297            ) -> Result<(), BoxError> {
1298                self.inner
1299                    .modify_before_completion_called
1300                    .store(true, Ordering::Relaxed);
1301                Ok(())
1302            }
1303
1304            fn read_after_execution(
1305                &self,
1306                _context: &FinalizerInterceptorContextRef<'_>,
1307                _rc: &RuntimeComponents,
1308                _cfg: &mut ConfigBag,
1309            ) -> Result<(), BoxError> {
1310                self.inner
1311                    .read_after_execution_called
1312                    .store(true, Ordering::Relaxed);
1313                Ok(())
1314            }
1315        }
1316
1317        #[derive(Debug)]
1318        struct TestInterceptorRuntimePlugin {
1319            builder: RuntimeComponentsBuilder,
1320        }
1321
1322        impl RuntimePlugin for TestInterceptorRuntimePlugin {
1323            fn runtime_components(
1324                &self,
1325                _: &RuntimeComponentsBuilder,
1326            ) -> Cow<'_, RuntimeComponentsBuilder> {
1327                Cow::Borrowed(&self.builder)
1328            }
1329        }
1330
1331        let interceptor = TestInterceptor::default();
1332        let client = NeverClient::new();
1333        let runtime_plugins = || {
1334            RuntimePlugins::new()
1335                .with_operation_plugin(TestOperationRuntimePlugin::new())
1336                .with_operation_plugin(NoAuthRuntimePlugin::new())
1337                .with_operation_plugin(TestInterceptorRuntimePlugin {
1338                    builder: RuntimeComponentsBuilder::new("test")
1339                        .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1340                        .with_http_client(Some(client.clone())),
1341                })
1342        };
1343
1344        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1345        let _err = invoke_with_stop_point(
1346            "test",
1347            "test",
1348            Input::doesnt_matter(),
1349            &runtime_plugins(),
1350            StopPoint::BeforeTransmit,
1351        )
1352        .await
1353        .expect_err("an error was returned");
1354        assert_eq!(client.num_calls(), 0);
1355
1356        assert!(interceptor
1357            .inner
1358            .modify_before_retry_loop_called
1359            .load(Ordering::Relaxed));
1360        assert!(interceptor
1361            .inner
1362            .modify_before_completion_called
1363            .load(Ordering::Relaxed));
1364        assert!(interceptor
1365            .inner
1366            .read_after_execution_called
1367            .load(Ordering::Relaxed));
1368    }
1369}