aws_smithy_runtime/client/
orchestrator.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::client::interceptors::Interceptors;
7use crate::client::orchestrator::http::{log_response_body, read_body};
8use crate::client::retries::LongPollingBackoff;
9use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
10use crate::client::{
11    http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
12    orchestrator::endpoints::orchestrate_endpoint,
13};
14use auth::{resolve_identity, sign_request};
15use aws_smithy_async::rt::sleep::AsyncSleep;
16use aws_smithy_runtime_api::box_error::BoxError;
17use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
18use aws_smithy_runtime_api::client::interceptors::context::{
19    Error, Input, InterceptorContext, Output, RewindResult,
20};
21use aws_smithy_runtime_api::client::orchestrator::{
22    HttpResponse, LoadedRequestBody, OrchestratorError,
23};
24use aws_smithy_runtime_api::client::result::SdkError;
25use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
26use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
27use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
28use aws_smithy_runtime_api::client::ser_de::{
29    DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
30};
31use aws_smithy_types::body::SdkBody;
32use aws_smithy_types::byte_stream::ByteStream;
33use aws_smithy_types::config_bag::ConfigBag;
34use aws_smithy_types::retry::{MergeRetryConfig, RetryConfig};
35use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
36use endpoints::apply_endpoint;
37use std::mem;
38use tracing::{debug, debug_span, instrument, trace, Instrument};
39
40mod auth;
41pub use auth::AuthSchemeAndEndpointOrchestrationV2;
42
43/// Defines types that implement a trait for endpoint resolution
44pub mod endpoints;
45
46/// Defines types that work with HTTP types
47mod http;
48
49/// Utility for making one-off unmodeled requests with the orchestrator.
50pub mod operation;
51
52macro_rules! halt {
53    ([$ctx:ident] => $err:expr) => {{
54        debug!("encountered orchestrator error; halting");
55        $ctx.fail($err.into());
56        return;
57    }};
58}
59
60macro_rules! halt_on_err {
61    ([$ctx:ident] => $expr:expr) => {
62        match $expr {
63            Ok(ok) => ok,
64            Err(err) => halt!([$ctx] => err),
65        }
66    };
67}
68
69macro_rules! continue_on_err {
70    ([$ctx:ident] => $expr:expr) => {
71        if let Err(err) = $expr {
72            debug!(err = ?err, "encountered orchestrator error; continuing");
73            $ctx.fail(err.into());
74        }
75    };
76}
77
78macro_rules! run_interceptors {
79    (continue_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
80        $(run_interceptors!(continue_on_err: $interceptor($ctx, $rc, $cfg));)+
81    };
82    (continue_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
83        continue_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
84    };
85    (halt_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
86        $(run_interceptors!(halt_on_err: $interceptor($ctx, $rc, $cfg));)+
87    };
88    (halt_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
89        halt_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
90    };
91    (__private $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
92        Interceptors::new($rc.interceptors()).$interceptor($ctx, $rc, $cfg)
93    };
94}
95
96/// Orchestrates the execution of a request and handling of a response.
97///
98/// The given `runtime_plugins` will be used to generate a `ConfigBag` for this request,
99/// and then the given `input` will be serialized and transmitted. When a response is
100/// received, it will be deserialized and returned.
101///
102/// This orchestration handles retries, endpoint resolution, identity resolution, and signing.
103/// Each of these are configurable via the config and runtime components given by the runtime
104/// plugins.
105pub async fn invoke(
106    service_name: &str,
107    operation_name: &str,
108    input: Input,
109    runtime_plugins: &RuntimePlugins,
110) -> Result<Output, SdkError<Error, HttpResponse>> {
111    invoke_with_stop_point(
112        service_name,
113        operation_name,
114        input,
115        runtime_plugins,
116        StopPoint::None,
117    )
118    .await?
119    .finalize()
120}
121
122/// Allows for returning early at different points during orchestration.
123#[non_exhaustive]
124#[derive(Debug, Clone, Copy, PartialEq, Eq)]
125pub enum StopPoint {
126    /// Don't stop orchestration early
127    None,
128
129    /// Stop the orchestrator before transmitting the request
130    BeforeTransmit,
131}
132
133/// Same as [`invoke`], but allows for returning early at different points during orchestration.
134///
135/// Orchestration will cease at the point specified by `stop_point`. This is useful for orchestrations
136/// that don't need to actually transmit requests, such as for generating presigned requests.
137///
138/// See the docs on [`invoke`] for more details.
139pub async fn invoke_with_stop_point(
140    // NOTE: service_name and operation_name were at one point used for instrumentation that is now
141    // handled as part of codegen. Manually constructed operations (e.g. via Operation::builder())
142    // are handled as part of Operation::invoke
143    _service_name: &str,
144    _operation_name: &str,
145    input: Input,
146    runtime_plugins: &RuntimePlugins,
147    stop_point: StopPoint,
148) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
149    async move {
150        let mut cfg = ConfigBag::base();
151        let cfg = &mut cfg;
152
153        let mut ctx = InterceptorContext::new(input);
154
155        let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
156            .map_err(SdkError::construction_failure)?;
157        trace!(runtime_components = ?runtime_components);
158
159        let operation_timeout_config =
160            MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
161        trace!(operation_timeout_config = ?operation_timeout_config);
162        async {
163            // If running the pre-execution interceptors failed, then we skip running the op and run the
164            // final interceptors instead.
165            if !ctx.is_failed() {
166                try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
167            }
168            finally_op(&mut ctx, cfg, &runtime_components).await;
169            if ctx.is_failed() {
170                Err(ctx.finalize().expect_err("it is failed"))
171            } else {
172                Ok(ctx)
173            }
174        }
175        .maybe_timeout(operation_timeout_config)
176        .await
177    }
178    .await
179}
180
181/// Apply configuration is responsible for apply runtime plugins to the config bag, as well as running
182/// `read_before_execution` interceptors. If a failure occurs due to config construction, `invoke`
183/// will raise it to the user. If an interceptor fails, then `invoke`
184#[instrument(skip_all, level = "debug")]
185fn apply_configuration(
186    ctx: &mut InterceptorContext,
187    cfg: &mut ConfigBag,
188    runtime_plugins: &RuntimePlugins,
189) -> Result<RuntimeComponents, BoxError> {
190    let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
191    continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
192
193    let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
194    continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
195
196    // The order below is important. Client interceptors must run before operation interceptors.
197    let components = client_rc_builder
198        .merge_from(&operation_rc_builder)
199        .build()?;
200
201    // In an ideal world, we'd simply update `cfg.load` to behave this way. Unfortunately, we can't
202    // do that without a breaking change. By overwriting the value in the config bag with a merged
203    // version, we can achieve a very similar behavior. `MergeTimeoutConfig`
204    let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
205    debug!(
206        "timeout settings for this operation: {:?}",
207        resolved_timeout_config
208    );
209    cfg.interceptor_state().store_put(resolved_timeout_config);
210
211    let resolved_retry_config = cfg.load::<MergeRetryConfig>();
212    debug!(
213        "retry settings for this operation: {:?}",
214        resolved_retry_config
215    );
216    cfg.interceptor_state().store_put(resolved_retry_config);
217
218    components.validate_final_config(cfg)?;
219    Ok(components)
220}
221
222#[instrument(skip_all, level = "debug")]
223async fn try_op(
224    ctx: &mut InterceptorContext,
225    cfg: &mut ConfigBag,
226    runtime_components: &RuntimeComponents,
227    stop_point: StopPoint,
228) {
229    // Before serialization
230    run_interceptors!(halt_on_err: {
231        modify_before_serialization(ctx, runtime_components, cfg);
232        read_before_serialization(ctx, runtime_components, cfg);
233    });
234
235    // Serialization
236    ctx.enter_serialization_phase();
237    {
238        let _span = debug_span!("serialization").entered();
239        let request_serializer = cfg
240            .load::<SharedRequestSerializer>()
241            .expect("request serializer must be in the config bag")
242            .clone();
243        let input = ctx.take_input().expect("input set at this point");
244        let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
245        ctx.set_request(request);
246    }
247
248    // Load the request body into memory if configured to do so
249    if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
250        debug!("loading request body into memory");
251        let mut body = SdkBody::taken();
252        mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
253        let loaded_body = halt_on_err!([ctx] =>
254            ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
255        )
256        .into_bytes();
257        *ctx.request_mut().as_mut().expect("set above").body_mut() =
258            SdkBody::from(loaded_body.clone());
259        cfg.interceptor_state()
260            .store_put(LoadedRequestBody::Loaded(loaded_body));
261    }
262
263    // Before transmit
264    ctx.enter_before_transmit_phase();
265    run_interceptors!(halt_on_err: {
266        read_after_serialization(ctx, runtime_components, cfg);
267        modify_before_retry_loop(ctx, runtime_components, cfg);
268    });
269
270    // Loop to acquire a send token from the adaptive rate limiter.
271    // When capacity is insufficient, the strategy returns YesAfterDelay.
272    // After sleeping, we re-check because other tasks may have consumed
273    // tokens during our sleep.
274    let retry_strategy = runtime_components.retry_strategy();
275    loop {
276        let should_attempt = retry_strategy.should_attempt_initial_request(runtime_components, cfg);
277        match should_attempt {
278            Ok(ShouldAttempt::Yes) => {
279                debug!("retry strategy has OKed initial request");
280                break;
281            }
282            Ok(ShouldAttempt::No) => {
283                let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
284                halt!([ctx] => OrchestratorError::other(err));
285            }
286            Err(err) => halt!([ctx] => OrchestratorError::other(err)),
287            Ok(ShouldAttempt::YesAfterDelay(delay)) => {
288                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
289                    "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
290                )));
291                debug!("retry strategy has OKed initial request after a {delay:?} delay");
292                sleep_impl.sleep(delay).await;
293                continue;
294            }
295        }
296    }
297
298    // Save a request checkpoint before we make the request. This will allow us to "rewind"
299    // the request in the case of retry attempts.
300    ctx.save_checkpoint();
301    // For long-polling operations, seed a shared slot so the retry strategy can
302    // communicate a backoff delay when the token bucket is empty.
303    if cfg
304        .load::<RetryConfig>()
305        .and_then(|rc| rc.retry_spec())
306        .is_some_and(|s| s.long_polling())
307    {
308        cfg.interceptor_state()
309            .store_put(LongPollingBackoff::default());
310    }
311    let mut retry_delay = None;
312    for i in 1u32.. {
313        // Break from the loop if we can't rewind the request's state. This will always succeed the
314        // first time, but will fail on subsequent iterations if the request body wasn't retryable.
315        trace!("checking if context can be rewound for attempt #{i}");
316        if let RewindResult::Impossible = ctx.rewind(cfg) {
317            debug!("request cannot be retried since the request body cannot be cloned");
318            break;
319        }
320        // Track which attempt we're currently on.
321        cfg.interceptor_state()
322            .store_put::<RequestAttempts>(i.into());
323        // Backoff time should not be included in the attempt timeout
324        if let Some((delay, sleep)) = retry_delay.take() {
325            debug!("delaying for {delay:?}");
326            sleep.await;
327        }
328        let attempt_timeout_config =
329            MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
330        trace!(attempt_timeout_config = ?attempt_timeout_config);
331        let maybe_timeout = async {
332            debug!("beginning attempt #{i}");
333            try_attempt(ctx, cfg, runtime_components, stop_point)
334                .instrument(debug_span!("try_attempt", "attempt" = i))
335                .await;
336            finally_attempt(ctx, cfg, runtime_components)
337                .instrument(debug_span!("finally_attempt", "attempt" = i))
338                .await;
339            Result::<_, SdkError<Error, HttpResponse>>::Ok(())
340        }
341        .maybe_timeout(attempt_timeout_config)
342        .await
343        .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));
344
345        // We continue when encountering a timeout error. The retry classifier will decide what to do with it.
346        continue_on_err!([ctx] => maybe_timeout);
347
348        // If we got a retry strategy from the bag, ask it what to do.
349        // If no strategy was set, we won't retry.
350        let should_attempt = halt_on_err!([ctx] => runtime_components
351            .retry_strategy()
352            .should_attempt_retry(ctx, runtime_components, cfg)
353            .map_err(OrchestratorError::other));
354        match should_attempt {
355            // Yes, let's retry the request
356            ShouldAttempt::Yes => continue,
357            // No, this request shouldn't be retried
358            ShouldAttempt::No => {
359                debug!("a retry is either unnecessary or not possible, exiting attempt loop");
360                if let Some(delay) = cfg.load::<LongPollingBackoff>().and_then(|h| h.take()) {
361                    if let Some(sleep_impl) = runtime_components.sleep_impl() {
362                        // This sleep is inside the operation timeout, so it gets cancelled
363                        // if one is set. Without an operation timeout, the delay is bounded
364                        // by `max_backoff` (enforced upstream in `calculate_backoff`).
365                        debug!("backing off {delay:?} before returning (no retry quota available)");
366                        sleep_impl.sleep(delay).await;
367                    }
368                }
369                break;
370            }
371            ShouldAttempt::YesAfterDelay(delay) => {
372                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
373                    "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
374                )));
375                retry_delay = Some((delay, sleep_impl.sleep(delay)));
376                continue;
377            }
378        }
379    }
380}
381
382async fn try_attempt(
383    ctx: &mut InterceptorContext,
384    cfg: &mut ConfigBag,
385    runtime_components: &RuntimeComponents,
386    stop_point: StopPoint,
387) {
388    run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));
389
390    let (scheme_id, identity, endpoint) = halt_on_err!([ctx] => resolve_identity(runtime_components, cfg).await.map_err(OrchestratorError::other));
391
392    match endpoint {
393        Some(endpoint) => {
394            // This branch is for backward compatibility when `AuthSchemeAndEndpointOrchestrationV2` is not present in the config bag.
395            // `resolve_identity` internally resolved an endpoint to determine the most suitable scheme ID, and returned that endpoint.
396            halt_on_err!([ctx] => apply_endpoint(&endpoint, ctx, cfg).map_err(OrchestratorError::other));
397            // Make the endpoint config available to interceptors
398            cfg.interceptor_state().store_put(endpoint);
399        }
400        None => {
401            halt_on_err!([ctx] => orchestrate_endpoint(identity.clone(), ctx, runtime_components, cfg)
402				    .instrument(debug_span!("orchestrate_endpoint"))
403				    .await
404				    .map_err(OrchestratorError::other));
405        }
406    }
407
408    run_interceptors!(halt_on_err: {
409        modify_before_signing(ctx, runtime_components, cfg);
410        read_before_signing(ctx, runtime_components, cfg);
411    });
412
413    halt_on_err!([ctx] => sign_request(&scheme_id, &identity, ctx, runtime_components, cfg).map_err(OrchestratorError::other));
414
415    run_interceptors!(halt_on_err: {
416        read_after_signing(ctx, runtime_components, cfg);
417        modify_before_transmit(ctx, runtime_components, cfg);
418        read_before_transmit(ctx, runtime_components, cfg);
419    });
420
421    // Return early if a stop point is set for before transmit
422    if let StopPoint::BeforeTransmit = stop_point {
423        debug!("ending orchestration early because the stop point is `BeforeTransmit`");
424        return;
425    }
426
427    // The connection consumes the request but we need to keep a copy of it
428    // within the interceptor context, so we clone it here.
429    ctx.enter_transmit_phase();
430    let response = halt_on_err!([ctx] => {
431        let request = ctx.take_request().expect("set during serialization");
432        trace!(request = ?request, "transmitting request");
433        let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
434            OrchestratorError::other("No HTTP client was available to send this request. \
435                Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
436        ));
437        let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
438        let settings = {
439            let mut builder = HttpConnectorSettings::builder();
440            builder.set_connect_timeout(timeout_config.connect_timeout());
441            builder.set_read_timeout(timeout_config.read_timeout());
442            builder.build()
443        };
444        let connector = http_client.http_connector(&settings, runtime_components);
445        let response_future = MaybeUploadThroughputCheckFuture::new(
446            cfg,
447            runtime_components,
448            connector.call(request),
449        );
450        response_future.await.map_err(OrchestratorError::connector)
451    });
452    trace!(response = ?response, "received response from service");
453    ctx.set_response(response);
454    ctx.enter_before_deserialization_phase();
455
456    run_interceptors!(halt_on_err: {
457        read_after_transmit(ctx, runtime_components, cfg);
458        modify_before_deserialization(ctx, runtime_components, cfg);
459        read_before_deserialization(ctx, runtime_components, cfg);
460    });
461
462    ctx.enter_deserialization_phase();
463    let output_or_error = async {
464        let response = ctx.response_mut().expect("set during transmit");
465        let response_deserializer = cfg
466            .load::<SharedResponseDeserializer>()
467            .expect("a request deserializer must be in the config bag");
468        let maybe_deserialized = {
469            let _span = debug_span!("deserialize_streaming").entered();
470            response_deserializer.deserialize_streaming_with_config(response, cfg)
471        };
472        match maybe_deserialized {
473            Some(output_or_error) => output_or_error,
474            None => read_body(response)
475                .instrument(debug_span!("read_body"))
476                .await
477                .map_err(OrchestratorError::response)
478                .and_then(|_| {
479                    let _span = debug_span!("deserialize_nonstreaming").entered();
480                    log_response_body(response, cfg);
481                    response_deserializer.deserialize_nonstreaming_with_config(response, cfg)
482                }),
483        }
484    }
485    .instrument(debug_span!("deserialization"))
486    .await;
487    trace!(output_or_error = ?output_or_error);
488    ctx.set_output_or_error(output_or_error);
489
490    ctx.enter_after_deserialization_phase();
491    run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
492}
493
494async fn finally_attempt(
495    ctx: &mut InterceptorContext,
496    cfg: &mut ConfigBag,
497    runtime_components: &RuntimeComponents,
498) {
499    run_interceptors!(continue_on_err: {
500        modify_before_attempt_completion(ctx, runtime_components, cfg);
501        read_after_attempt(ctx, runtime_components, cfg);
502    });
503}
504
505#[instrument(skip_all, level = "debug")]
506async fn finally_op(
507    ctx: &mut InterceptorContext,
508    cfg: &mut ConfigBag,
509    runtime_components: &RuntimeComponents,
510) {
511    run_interceptors!(continue_on_err: {
512        modify_before_completion(ctx, runtime_components, cfg);
513        read_after_execution(ctx, runtime_components, cfg);
514    });
515}
516
517#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
518mod tests {
519    use crate::client::auth::no_auth::{NoAuthRuntimePluginV2, NO_AUTH_SCHEME_ID};
520    use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
521    use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
522    use crate::client::retries::strategy::NeverRetryStrategy;
523    use crate::client::test_util::{
524        deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
525    };
526    use aws_smithy_http_client::test_util::NeverClient;
527    use aws_smithy_runtime_api::box_error::BoxError;
528    use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
529    use aws_smithy_runtime_api::client::auth::{
530        AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
531    };
532    use aws_smithy_runtime_api::client::endpoint::{
533        EndpointResolverParams, SharedEndpointResolver,
534    };
535    use aws_smithy_runtime_api::client::http::{
536        http_client_fn, HttpConnector, HttpConnectorFuture,
537    };
538    use aws_smithy_runtime_api::client::interceptors::context::{
539        AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
540        BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
541        BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
542        BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
543        FinalizerInterceptorContextRef, Input, Output,
544    };
545    use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
546    use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
547    use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
548    use aws_smithy_runtime_api::client::runtime_components::{
549        RuntimeComponents, RuntimeComponentsBuilder,
550    };
551    use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
552    use aws_smithy_runtime_api::client::ser_de::{
553        SharedRequestSerializer, SharedResponseDeserializer,
554    };
555    use aws_smithy_runtime_api::shared::IntoShared;
556    use aws_smithy_types::body::SdkBody;
557    use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
558    use aws_smithy_types::timeout::TimeoutConfig;
559    use http_1x::{Response, StatusCode};
560    use std::borrow::Cow;
561    use std::sync::atomic::{AtomicBool, Ordering};
562    use std::sync::Arc;
563    use tracing_test::traced_test;
564
565    fn new_request_serializer() -> CannedRequestSerializer {
566        CannedRequestSerializer::success(HttpRequest::empty())
567    }
568
569    fn new_response_deserializer() -> CannedResponseDeserializer {
570        CannedResponseDeserializer::new(
571            Response::builder()
572                .status(StatusCode::OK)
573                .body(SdkBody::empty())
574                .map_err(|err| OrchestratorError::other(Box::new(err)))
575                .map(Output::erase),
576        )
577    }
578
579    #[derive(Debug, Default)]
580    struct OkConnector {}
581
582    impl OkConnector {
583        fn new() -> Self {
584            Self::default()
585        }
586    }
587
588    impl HttpConnector for OkConnector {
589        fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
590            HttpConnectorFuture::ready(Ok(http_1x::Response::builder()
591                .status(200)
592                .body(SdkBody::empty())
593                .expect("OK response is valid")
594                .try_into()
595                .unwrap()))
596        }
597    }
598
599    #[derive(Debug)]
600    struct TestOperationRuntimePlugin {
601        builder: RuntimeComponentsBuilder,
602    }
603
604    impl TestOperationRuntimePlugin {
605        fn new() -> Self {
606            Self {
607                builder: RuntimeComponentsBuilder::for_tests()
608                    .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
609                    .with_endpoint_resolver(Some(SharedEndpointResolver::new(
610                        StaticUriEndpointResolver::http_localhost(8080),
611                    )))
612                    .with_http_client(Some(http_client_fn(|_, _| {
613                        OkConnector::new().into_shared()
614                    })))
615                    .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
616                        StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
617                    ))),
618            }
619        }
620    }
621
622    impl RuntimePlugin for TestOperationRuntimePlugin {
623        fn config(&self) -> Option<FrozenLayer> {
624            let mut layer = Layer::new("TestOperationRuntimePlugin");
625            layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
626            layer.store_put(EndpointResolverParams::new("dontcare"));
627            layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
628            layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
629            layer.store_put(TimeoutConfig::builder().build());
630            Some(layer.freeze())
631        }
632
633        fn runtime_components(
634            &self,
635            _: &RuntimeComponentsBuilder,
636        ) -> Cow<'_, RuntimeComponentsBuilder> {
637            Cow::Borrowed(&self.builder)
638        }
639    }
640
641    macro_rules! interceptor_error_handling_test {
642        (read_before_execution, $ctx:ty, $expected:expr,) => {
643            interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
644        };
645        ($interceptor:ident, $ctx:ty, $expected:expr) => {
646            interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
647        };
648        (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
649            #[derive(Debug)]
650            struct FailingInterceptorA;
651            impl Intercept for FailingInterceptorA {
652                fn name(&self) -> &'static str { "FailingInterceptorA" }
653
654                fn $interceptor(
655                    &self,
656                    _ctx: $ctx,
657                    $($rc_arg)*
658                    _cfg: &mut ConfigBag,
659                ) -> Result<(), BoxError> {
660                    tracing::debug!("FailingInterceptorA called!");
661                    Err("FailingInterceptorA".into())
662                }
663            }
664
665            #[derive(Debug)]
666            struct FailingInterceptorB;
667            impl Intercept for FailingInterceptorB {
668                fn name(&self) -> &'static str { "FailingInterceptorB" }
669
670                fn $interceptor(
671                    &self,
672                    _ctx: $ctx,
673                    $($rc_arg)*
674                    _cfg: &mut ConfigBag,
675                ) -> Result<(), BoxError> {
676                    tracing::debug!("FailingInterceptorB called!");
677                    Err("FailingInterceptorB".into())
678                }
679            }
680
681            #[derive(Debug)]
682            struct FailingInterceptorC;
683            impl Intercept for FailingInterceptorC {
684                fn name(&self) -> &'static str { "FailingInterceptorC" }
685
686                fn $interceptor(
687                    &self,
688                    _ctx: $ctx,
689                    $($rc_arg)*
690                    _cfg: &mut ConfigBag,
691                ) -> Result<(), BoxError> {
692                    tracing::debug!("FailingInterceptorC called!");
693                    Err("FailingInterceptorC".into())
694                }
695            }
696
697            #[derive(Debug)]
698            struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
699            impl FailingInterceptorsClientRuntimePlugin {
700                fn new() -> Self {
701                    Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
702                }
703            }
704            impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
705                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
706                    Cow::Borrowed(&self.0)
707                }
708            }
709
710            #[derive(Debug)]
711            struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
712            impl FailingInterceptorsOperationRuntimePlugin {
713                fn new() -> Self {
714                    Self(
715                        RuntimeComponentsBuilder::new("test")
716                            .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
717                            .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
718                    )
719                }
720            }
721            impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
722                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
723                    Cow::Borrowed(&self.0)
724                }
725            }
726
727            let input = Input::doesnt_matter();
728            let runtime_plugins = RuntimePlugins::new()
729                .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
730                .with_operation_plugin(TestOperationRuntimePlugin::new())
731                .with_operation_plugin(NoAuthRuntimePluginV2::new())
732                .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
733            let actual = invoke("test", "test", input, &runtime_plugins)
734                .await
735                .expect_err("should error");
736            let actual = format!("{:?}", actual);
737            assert!(
738                actual.starts_with(&$expected),
739                "\nActual error:      {actual}\nShould start with: {}\n",
740                $expected
741            );
742
743            assert!(logs_contain("FailingInterceptorA called!"));
744            assert!(logs_contain("FailingInterceptorB called!"));
745            assert!(logs_contain("FailingInterceptorC called!"));
746        };
747    }
748
749    #[tokio::test]
750    #[traced_test]
751    async fn test_read_before_execution_error_handling() {
752        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
753        interceptor_error_handling_test!(
754            read_before_execution,
755            &BeforeSerializationInterceptorContextRef<'_>,
756            expected,
757        );
758    }
759
760    #[tokio::test]
761    #[traced_test]
762    async fn test_modify_before_serialization_error_handling() {
763        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
764        interceptor_error_handling_test!(
765            modify_before_serialization,
766            &mut BeforeSerializationInterceptorContextMut<'_>,
767            expected
768        );
769    }
770
771    #[tokio::test]
772    #[traced_test]
773    async fn test_read_before_serialization_error_handling() {
774        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
775        interceptor_error_handling_test!(
776            read_before_serialization,
777            &BeforeSerializationInterceptorContextRef<'_>,
778            expected
779        );
780    }
781
782    #[tokio::test]
783    #[traced_test]
784    async fn test_read_after_serialization_error_handling() {
785        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
786        interceptor_error_handling_test!(
787            read_after_serialization,
788            &BeforeTransmitInterceptorContextRef<'_>,
789            expected
790        );
791    }
792
793    #[tokio::test]
794    #[traced_test]
795    async fn test_modify_before_retry_loop_error_handling() {
796        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
797        interceptor_error_handling_test!(
798            modify_before_retry_loop,
799            &mut BeforeTransmitInterceptorContextMut<'_>,
800            expected
801        );
802    }
803
804    #[tokio::test]
805    #[traced_test]
806    async fn test_read_before_attempt_error_handling() {
807        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
808        interceptor_error_handling_test!(
809            read_before_attempt,
810            &BeforeTransmitInterceptorContextRef<'_>,
811            expected
812        );
813    }
814
815    #[tokio::test]
816    #[traced_test]
817    async fn test_modify_before_signing_error_handling() {
818        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
819        interceptor_error_handling_test!(
820            modify_before_signing,
821            &mut BeforeTransmitInterceptorContextMut<'_>,
822            expected
823        );
824    }
825
826    #[tokio::test]
827    #[traced_test]
828    async fn test_read_before_signing_error_handling() {
829        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
830        interceptor_error_handling_test!(
831            read_before_signing,
832            &BeforeTransmitInterceptorContextRef<'_>,
833            expected
834        );
835    }
836
837    #[tokio::test]
838    #[traced_test]
839    async fn test_read_after_signing_error_handling() {
840        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
841        interceptor_error_handling_test!(
842            read_after_signing,
843            &BeforeTransmitInterceptorContextRef<'_>,
844            expected
845        );
846    }
847
848    #[tokio::test]
849    #[traced_test]
850    async fn test_modify_before_transmit_error_handling() {
851        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
852        interceptor_error_handling_test!(
853            modify_before_transmit,
854            &mut BeforeTransmitInterceptorContextMut<'_>,
855            expected
856        );
857    }
858
859    #[tokio::test]
860    #[traced_test]
861    async fn test_read_before_transmit_error_handling() {
862        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
863        interceptor_error_handling_test!(
864            read_before_transmit,
865            &BeforeTransmitInterceptorContextRef<'_>,
866            expected
867        );
868    }
869
870    #[tokio::test]
871    #[traced_test]
872    async fn test_read_after_transmit_error_handling() {
873        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
874        interceptor_error_handling_test!(
875            read_after_transmit,
876            &BeforeDeserializationInterceptorContextRef<'_>,
877            expected
878        );
879    }
880
881    #[tokio::test]
882    #[traced_test]
883    async fn test_modify_before_deserialization_error_handling() {
884        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
885        interceptor_error_handling_test!(
886            modify_before_deserialization,
887            &mut BeforeDeserializationInterceptorContextMut<'_>,
888            expected
889        );
890    }
891
892    #[tokio::test]
893    #[traced_test]
894    async fn test_read_before_deserialization_error_handling() {
895        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
896        interceptor_error_handling_test!(
897            read_before_deserialization,
898            &BeforeDeserializationInterceptorContextRef<'_>,
899            expected
900        );
901    }
902
903    #[tokio::test]
904    #[traced_test]
905    async fn test_read_after_deserialization_error_handling() {
906        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
907        interceptor_error_handling_test!(
908            read_after_deserialization,
909            &AfterDeserializationInterceptorContextRef<'_>,
910            expected
911        );
912    }
913
914    #[tokio::test]
915    #[traced_test]
916    async fn test_modify_before_attempt_completion_error_handling() {
917        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
918        interceptor_error_handling_test!(
919            modify_before_attempt_completion,
920            &mut FinalizerInterceptorContextMut<'_>,
921            expected
922        );
923    }
924
925    #[tokio::test]
926    #[traced_test]
927    async fn test_read_after_attempt_error_handling() {
928        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
929        interceptor_error_handling_test!(
930            read_after_attempt,
931            &FinalizerInterceptorContextRef<'_>,
932            expected
933        );
934    }
935
936    #[tokio::test]
937    #[traced_test]
938    async fn test_modify_before_completion_error_handling() {
939        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
940        interceptor_error_handling_test!(
941            modify_before_completion,
942            &mut FinalizerInterceptorContextMut<'_>,
943            expected
944        );
945    }
946
947    #[tokio::test]
948    #[traced_test]
949    async fn test_read_after_execution_error_handling() {
950        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
951        interceptor_error_handling_test!(
952            read_after_execution,
953            &FinalizerInterceptorContextRef<'_>,
954            expected
955        );
956    }
957
958    macro_rules! interceptor_error_redirection_test {
959        (read_before_execution, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
960            interceptor_error_redirection_test!(__private read_before_execution, $origin_ctx, $destination_interceptor, $destination_ctx, $expected,);
961        };
962        ($origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
963            interceptor_error_redirection_test!(__private $origin_interceptor, $origin_ctx, $destination_interceptor, $destination_ctx, $expected, _rc: &RuntimeComponents,);
964        };
965        (__private $origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
966            #[derive(Debug)]
967            struct OriginInterceptor;
968            impl Intercept for OriginInterceptor {
969                fn name(&self) -> &'static str { "OriginInterceptor" }
970
971                fn $origin_interceptor(
972                    &self,
973                    _ctx: $origin_ctx,
974                    $($rc_arg)*
975                    _cfg: &mut ConfigBag,
976                ) -> Result<(), BoxError> {
977                    tracing::debug!("OriginInterceptor called!");
978                    Err("OriginInterceptor".into())
979                }
980            }
981
982            #[derive(Debug)]
983            struct DestinationInterceptor;
984            impl Intercept for DestinationInterceptor {
985                fn name(&self) -> &'static str { "DestinationInterceptor" }
986
987                fn $destination_interceptor(
988                    &self,
989                    _ctx: $destination_ctx,
990                    _runtime_components: &RuntimeComponents,
991                    _cfg: &mut ConfigBag,
992                ) -> Result<(), BoxError> {
993                    tracing::debug!("DestinationInterceptor called!");
994                    Err("DestinationInterceptor".into())
995                }
996            }
997
998            #[derive(Debug)]
999            struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
1000            impl InterceptorsTestOperationRuntimePlugin {
1001                fn new() -> Self {
1002                    Self(
1003                        RuntimeComponentsBuilder::new("test")
1004                            .with_interceptor(SharedInterceptor::new(OriginInterceptor))
1005                            .with_interceptor(SharedInterceptor::new(DestinationInterceptor))
1006                    )
1007                }
1008            }
1009            impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
1010                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
1011                    Cow::Borrowed(&self.0)
1012                }
1013            }
1014
1015            let input = Input::doesnt_matter();
1016            let runtime_plugins = RuntimePlugins::new()
1017                .with_operation_plugin(TestOperationRuntimePlugin::new())
1018                .with_operation_plugin(NoAuthRuntimePluginV2::new())
1019                .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());
1020            let actual = invoke("test", "test", input, &runtime_plugins)
1021                .await
1022                .expect_err("should error");
1023            let actual = format!("{:?}", actual);
1024            assert!(
1025                actual.starts_with(&$expected),
1026                "\nActual error:      {actual}\nShould start with: {}\n",
1027                $expected
1028            );
1029
1030            assert!(logs_contain("OriginInterceptor called!"));
1031            assert!(logs_contain("DestinationInterceptor called!"));
1032        };
1033    }
1034
1035    #[tokio::test]
1036    #[traced_test]
1037    async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
1038        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1039        interceptor_error_redirection_test!(
1040            read_before_execution,
1041            &BeforeSerializationInterceptorContextRef<'_>,
1042            modify_before_completion,
1043            &mut FinalizerInterceptorContextMut<'_>,
1044            expected
1045        );
1046    }
1047
1048    #[tokio::test]
1049    #[traced_test]
1050    async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
1051        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1052        interceptor_error_redirection_test!(
1053            modify_before_serialization,
1054            &mut BeforeSerializationInterceptorContextMut<'_>,
1055            modify_before_completion,
1056            &mut FinalizerInterceptorContextMut<'_>,
1057            expected
1058        );
1059    }
1060
1061    #[tokio::test]
1062    #[traced_test]
1063    async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1064        let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1065        interceptor_error_redirection_test!(
1066            read_before_serialization,
1067            &BeforeSerializationInterceptorContextRef<'_>,
1068            modify_before_completion,
1069            &mut FinalizerInterceptorContextMut<'_>,
1070            expected
1071        );
1072    }
1073
1074    #[tokio::test]
1075    #[traced_test]
1076    async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1077        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1078        interceptor_error_redirection_test!(
1079            read_after_serialization,
1080            &BeforeTransmitInterceptorContextRef<'_>,
1081            modify_before_completion,
1082            &mut FinalizerInterceptorContextMut<'_>,
1083            expected
1084        );
1085    }
1086
1087    #[tokio::test]
1088    #[traced_test]
1089    async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1090        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1091        interceptor_error_redirection_test!(
1092            modify_before_retry_loop,
1093            &mut BeforeTransmitInterceptorContextMut<'_>,
1094            modify_before_completion,
1095            &mut FinalizerInterceptorContextMut<'_>,
1096            expected
1097        );
1098    }
1099
1100    #[tokio::test]
1101    #[traced_test]
1102    async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1103        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1104        interceptor_error_redirection_test!(
1105            read_before_attempt,
1106            &BeforeTransmitInterceptorContextRef<'_>,
1107            modify_before_attempt_completion,
1108            &mut FinalizerInterceptorContextMut<'_>,
1109            expected
1110        );
1111    }
1112
1113    #[tokio::test]
1114    #[traced_test]
1115    async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1116        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1117        interceptor_error_redirection_test!(
1118            modify_before_signing,
1119            &mut BeforeTransmitInterceptorContextMut<'_>,
1120            modify_before_attempt_completion,
1121            &mut FinalizerInterceptorContextMut<'_>,
1122            expected
1123        );
1124    }
1125
1126    #[tokio::test]
1127    #[traced_test]
1128    async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1129        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1130        interceptor_error_redirection_test!(
1131            read_before_signing,
1132            &BeforeTransmitInterceptorContextRef<'_>,
1133            modify_before_attempt_completion,
1134            &mut FinalizerInterceptorContextMut<'_>,
1135            expected
1136        );
1137    }
1138
1139    #[tokio::test]
1140    #[traced_test]
1141    async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1142        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1143        interceptor_error_redirection_test!(
1144            read_after_signing,
1145            &BeforeTransmitInterceptorContextRef<'_>,
1146            modify_before_attempt_completion,
1147            &mut FinalizerInterceptorContextMut<'_>,
1148            expected
1149        );
1150    }
1151
1152    #[tokio::test]
1153    #[traced_test]
1154    async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1155        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1156        interceptor_error_redirection_test!(
1157            modify_before_transmit,
1158            &mut BeforeTransmitInterceptorContextMut<'_>,
1159            modify_before_attempt_completion,
1160            &mut FinalizerInterceptorContextMut<'_>,
1161            expected
1162        );
1163    }
1164
1165    #[tokio::test]
1166    #[traced_test]
1167    async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1168        let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1169        interceptor_error_redirection_test!(
1170            read_before_transmit,
1171            &BeforeTransmitInterceptorContextRef<'_>,
1172            modify_before_attempt_completion,
1173            &mut FinalizerInterceptorContextMut<'_>,
1174            expected
1175        );
1176    }
1177
1178    #[tokio::test]
1179    #[traced_test]
1180    async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1181        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1182        interceptor_error_redirection_test!(
1183            read_after_transmit,
1184            &BeforeDeserializationInterceptorContextRef<'_>,
1185            modify_before_attempt_completion,
1186            &mut FinalizerInterceptorContextMut<'_>,
1187            expected
1188        );
1189    }
1190
1191    #[tokio::test]
1192    #[traced_test]
1193    async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1194    ) {
1195        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1196        interceptor_error_redirection_test!(
1197            modify_before_deserialization,
1198            &mut BeforeDeserializationInterceptorContextMut<'_>,
1199            modify_before_attempt_completion,
1200            &mut FinalizerInterceptorContextMut<'_>,
1201            expected
1202        );
1203    }
1204
1205    #[tokio::test]
1206    #[traced_test]
1207    async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1208    ) {
1209        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1210        interceptor_error_redirection_test!(
1211            read_before_deserialization,
1212            &BeforeDeserializationInterceptorContextRef<'_>,
1213            modify_before_attempt_completion,
1214            &mut FinalizerInterceptorContextMut<'_>,
1215            expected
1216        );
1217    }
1218
1219    #[tokio::test]
1220    #[traced_test]
1221    async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1222    {
1223        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1224        interceptor_error_redirection_test!(
1225            read_after_deserialization,
1226            &AfterDeserializationInterceptorContextRef<'_>,
1227            modify_before_attempt_completion,
1228            &mut FinalizerInterceptorContextMut<'_>,
1229            expected
1230        );
1231    }
1232
1233    #[tokio::test]
1234    #[traced_test]
1235    async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1236        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1237        interceptor_error_redirection_test!(
1238            modify_before_attempt_completion,
1239            &mut FinalizerInterceptorContextMut<'_>,
1240            read_after_attempt,
1241            &FinalizerInterceptorContextRef<'_>,
1242            expected
1243        );
1244    }
1245
1246    #[tokio::test]
1247    #[traced_test]
1248    async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1249        let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1250        interceptor_error_redirection_test!(
1251            modify_before_completion,
1252            &mut FinalizerInterceptorContextMut<'_>,
1253            read_after_execution,
1254            &FinalizerInterceptorContextRef<'_>,
1255            expected
1256        );
1257    }
1258
1259    #[tokio::test]
1260    async fn test_stop_points() {
1261        let runtime_plugins = || {
1262            RuntimePlugins::new()
1263                .with_operation_plugin(TestOperationRuntimePlugin::new())
1264                .with_operation_plugin(NoAuthRuntimePluginV2::new())
1265        };
1266
1267        // StopPoint::None should result in a response getting set since orchestration doesn't stop
1268        let context = invoke_with_stop_point(
1269            "test",
1270            "test",
1271            Input::doesnt_matter(),
1272            &runtime_plugins(),
1273            StopPoint::None,
1274        )
1275        .await
1276        .expect("success");
1277        assert!(context.response().is_some());
1278
1279        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1280        let context = invoke_with_stop_point(
1281            "test",
1282            "test",
1283            Input::doesnt_matter(),
1284            &runtime_plugins(),
1285            StopPoint::BeforeTransmit,
1286        )
1287        .await
1288        .expect("success");
1289        assert!(context.response().is_none());
1290    }
1291
1292    /// The "finally" interceptors should run upon error when the StopPoint is set to BeforeTransmit
1293    #[tokio::test]
1294    async fn test_stop_points_error_handling() {
1295        #[derive(Debug, Default)]
1296        struct Inner {
1297            modify_before_retry_loop_called: AtomicBool,
1298            modify_before_completion_called: AtomicBool,
1299            read_after_execution_called: AtomicBool,
1300        }
1301        #[derive(Clone, Debug, Default)]
1302        struct TestInterceptor {
1303            inner: Arc<Inner>,
1304        }
1305
1306        impl Intercept for TestInterceptor {
1307            fn name(&self) -> &'static str {
1308                "TestInterceptor"
1309            }
1310
1311            fn modify_before_retry_loop(
1312                &self,
1313                _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1314                _rc: &RuntimeComponents,
1315                _cfg: &mut ConfigBag,
1316            ) -> Result<(), BoxError> {
1317                self.inner
1318                    .modify_before_retry_loop_called
1319                    .store(true, Ordering::Relaxed);
1320                Err("test error".into())
1321            }
1322
1323            fn modify_before_completion(
1324                &self,
1325                _context: &mut FinalizerInterceptorContextMut<'_>,
1326                _rc: &RuntimeComponents,
1327                _cfg: &mut ConfigBag,
1328            ) -> Result<(), BoxError> {
1329                self.inner
1330                    .modify_before_completion_called
1331                    .store(true, Ordering::Relaxed);
1332                Ok(())
1333            }
1334
1335            fn read_after_execution(
1336                &self,
1337                _context: &FinalizerInterceptorContextRef<'_>,
1338                _rc: &RuntimeComponents,
1339                _cfg: &mut ConfigBag,
1340            ) -> Result<(), BoxError> {
1341                self.inner
1342                    .read_after_execution_called
1343                    .store(true, Ordering::Relaxed);
1344                Ok(())
1345            }
1346        }
1347
1348        #[derive(Debug)]
1349        struct TestInterceptorRuntimePlugin {
1350            builder: RuntimeComponentsBuilder,
1351        }
1352
1353        impl RuntimePlugin for TestInterceptorRuntimePlugin {
1354            fn runtime_components(
1355                &self,
1356                _: &RuntimeComponentsBuilder,
1357            ) -> Cow<'_, RuntimeComponentsBuilder> {
1358                Cow::Borrowed(&self.builder)
1359            }
1360        }
1361
1362        let interceptor = TestInterceptor::default();
1363        let client = NeverClient::new();
1364        let runtime_plugins = || {
1365            RuntimePlugins::new()
1366                .with_operation_plugin(TestOperationRuntimePlugin::new())
1367                .with_operation_plugin(NoAuthRuntimePluginV2::new())
1368                .with_operation_plugin(TestInterceptorRuntimePlugin {
1369                    builder: RuntimeComponentsBuilder::new("test")
1370                        .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1371                        .with_http_client(Some(client.clone())),
1372                })
1373        };
1374
1375        // StopPoint::BeforeTransmit will exit right before sending the request, so there should be no response
1376        let _err = invoke_with_stop_point(
1377            "test",
1378            "test",
1379            Input::doesnt_matter(),
1380            &runtime_plugins(),
1381            StopPoint::BeforeTransmit,
1382        )
1383        .await
1384        .expect_err("an error was returned");
1385        assert_eq!(client.num_calls(), 0);
1386
1387        assert!(interceptor
1388            .inner
1389            .modify_before_retry_loop_called
1390            .load(Ordering::Relaxed));
1391        assert!(interceptor
1392            .inner
1393            .modify_before_completion_called
1394            .load(Ordering::Relaxed));
1395        assert!(interceptor
1396            .inner
1397            .read_after_execution_called
1398            .load(Ordering::Relaxed));
1399    }
1400}