1use crate::client::interceptors::Interceptors;
7use crate::client::orchestrator::http::{log_response_body, read_body};
8use crate::client::retries::LongPollingBackoff;
9use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
10use crate::client::{
11 http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
12 orchestrator::endpoints::orchestrate_endpoint,
13};
14use auth::{resolve_identity, sign_request};
15use aws_smithy_async::rt::sleep::AsyncSleep;
16use aws_smithy_runtime_api::box_error::BoxError;
17use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
18use aws_smithy_runtime_api::client::interceptors::context::{
19 Error, Input, InterceptorContext, Output, RewindResult,
20};
21use aws_smithy_runtime_api::client::orchestrator::{
22 HttpResponse, LoadedRequestBody, OrchestratorError,
23};
24use aws_smithy_runtime_api::client::result::SdkError;
25use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
26use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
27use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
28use aws_smithy_runtime_api::client::ser_de::{
29 DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
30};
31use aws_smithy_types::body::SdkBody;
32use aws_smithy_types::byte_stream::ByteStream;
33use aws_smithy_types::config_bag::ConfigBag;
34use aws_smithy_types::retry::{MergeRetryConfig, RetryConfig};
35use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
36use endpoints::apply_endpoint;
37use std::mem;
38use tracing::{debug, debug_span, instrument, trace, Instrument};
39
40mod auth;
41pub use auth::AuthSchemeAndEndpointOrchestrationV2;
42
43pub mod endpoints;
45
46mod http;
48
49pub mod operation;
51
// Stops the current orchestration step: logs at debug level, hands the given error
// (converted with `.into()`) to `fail` on the context, and then `return`s from the
// function in which the macro is expanded. Only usable in functions returning `()`.
macro_rules! halt {
    ([$context:ident] => $failure:expr) => {{
        debug!("encountered orchestrator error; halting");
        $context.fail($failure.into());
        return;
    }};
}
59
// Evaluates a `Result` expression. On `Ok` the macro yields the unwrapped value; on
// `Err` it delegates to `halt!`, which fails the context and returns from the
// enclosing function.
macro_rules! halt_on_err {
    ([$context:ident] => $fallible:expr) => {
        match $fallible {
            Err(failure) => halt!([$context] => failure),
            Ok(value) => value,
        }
    };
}
68
// Evaluates a `Result` expression and, if it is an `Err`, logs it at debug level and
// passes it (converted with `.into()`) to `fail` on the context. Unlike `halt!`, this
// never returns from the enclosing function, so execution carries on afterwards.
macro_rules! continue_on_err {
    ([$context:ident] => $fallible:expr) => {
        match $fallible {
            Err(failure) => {
                debug!(err = ?failure, "encountered orchestrator error; continuing");
                $context.fail(failure.into());
            }
            Ok(_) => {}
        }
    };
}
77
// Invokes interceptor hooks by name through `Interceptors`.
//
// Two error policies are available, each in a single-hook and a `{ a(..); b(..); }`
// batch form:
//   * `halt_on_err:`     - a failing hook goes through `halt_on_err!` (fail + return).
//   * `continue_on_err:` - a failing hook goes through `continue_on_err!` (fail, keep going).
// The batch forms expand to one single-hook invocation per listed hook, in order.
macro_rules! run_interceptors {
    // Internal rule: the actual hook call, producing the hook's `Result`.
    (__private $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        Interceptors::new($components.interceptors()).$hook($context, $components, $bag)
    };
    (halt_on_err: $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        halt_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $bag)))
    };
    (continue_on_err: $hook:ident($context:ident, $components:ident, $bag:ident)) => {
        continue_on_err!([$context] => run_interceptors!(__private $hook($context, $components, $bag)))
    };
    (halt_on_err: { $($hook:ident($context:ident, $components:ident, $bag:ident);)+ }) => {
        $(run_interceptors!(halt_on_err: $hook($context, $components, $bag));)+
    };
    (continue_on_err: { $($hook:ident($context:ident, $components:ident, $bag:ident);)+ }) => {
        $(run_interceptors!(continue_on_err: $hook($context, $components, $bag));)+
    };
}
95
96pub async fn invoke(
106 service_name: &str,
107 operation_name: &str,
108 input: Input,
109 runtime_plugins: &RuntimePlugins,
110) -> Result<Output, SdkError<Error, HttpResponse>> {
111 invoke_with_stop_point(
112 service_name,
113 operation_name,
114 input,
115 runtime_plugins,
116 StopPoint::None,
117 )
118 .await?
119 .finalize()
120}
121
/// Selects how far the orchestrator runs before returning control to the caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum StopPoint {
    /// Do not stop early.
    None,

    /// Stop once the `read_before_transmit` interceptors have run, before the
    /// request is handed to the HTTP connector.
    BeforeTransmit,
}
132
133pub async fn invoke_with_stop_point(
140 _service_name: &str,
144 _operation_name: &str,
145 input: Input,
146 runtime_plugins: &RuntimePlugins,
147 stop_point: StopPoint,
148) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
149 async move {
150 let mut cfg = ConfigBag::base();
151 let cfg = &mut cfg;
152
153 let mut ctx = InterceptorContext::new(input);
154
155 let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
156 .map_err(SdkError::construction_failure)?;
157 trace!(runtime_components = ?runtime_components);
158
159 let operation_timeout_config =
160 MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
161 trace!(operation_timeout_config = ?operation_timeout_config);
162 async {
163 if !ctx.is_failed() {
166 try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
167 }
168 finally_op(&mut ctx, cfg, &runtime_components).await;
169 if ctx.is_failed() {
170 Err(ctx.finalize().expect_err("it is failed"))
171 } else {
172 Ok(ctx)
173 }
174 }
175 .maybe_timeout(operation_timeout_config)
176 .await
177 }
178 .await
179}
180
181#[instrument(skip_all, level = "debug")]
185fn apply_configuration(
186 ctx: &mut InterceptorContext,
187 cfg: &mut ConfigBag,
188 runtime_plugins: &RuntimePlugins,
189) -> Result<RuntimeComponents, BoxError> {
190 let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
191 continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
192
193 let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
194 continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
195
196 let components = client_rc_builder
198 .merge_from(&operation_rc_builder)
199 .build()?;
200
201 let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
205 debug!(
206 "timeout settings for this operation: {:?}",
207 resolved_timeout_config
208 );
209 cfg.interceptor_state().store_put(resolved_timeout_config);
210
211 let resolved_retry_config = cfg.load::<MergeRetryConfig>();
212 debug!(
213 "retry settings for this operation: {:?}",
214 resolved_retry_config
215 );
216 cfg.interceptor_state().store_put(resolved_retry_config);
217
218 components.validate_final_config(cfg)?;
219 Ok(components)
220}
221
/// Runs the body of an operation: serialization, then the attempt/retry loop.
///
/// Errors are not returned. They are recorded on `ctx` through the `halt!`,
/// `halt_on_err!` and `continue_on_err!` macros, and the `halt` variants return
/// from this function immediately.
#[instrument(skip_all, level = "debug")]
async fn try_op(
    ctx: &mut InterceptorContext,
    cfg: &mut ConfigBag,
    runtime_components: &RuntimeComponents,
    stop_point: StopPoint,
) {
    // Pre-serialization hooks; a failure here halts the operation.
    run_interceptors!(halt_on_err: {
        modify_before_serialization(ctx, runtime_components, cfg);
        read_before_serialization(ctx, runtime_components, cfg);
    });

    // Serialization: turn the input into a request and store it on the context.
    ctx.enter_serialization_phase();
    {
        let _span = debug_span!("serialization").entered();
        let request_serializer = cfg
            .load::<SharedRequestSerializer>()
            .expect("request serializer must be in the config bag")
            .clone();
        let input = ctx.take_input().expect("input set at this point");
        let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
        ctx.set_request(request);
    }

    // If the config bag holds `LoadedRequestBody::Requested`, buffer the whole body
    // in memory, put an in-memory body back on the request, and store the bytes in
    // interceptor state as `LoadedRequestBody::Loaded`.
    if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
        debug!("loading request body into memory");
        // Swap the real body out of the request, leaving a "taken" placeholder behind.
        let mut body = SdkBody::taken();
        mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
        let loaded_body = halt_on_err!([ctx] =>
            ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
        )
        .into_bytes();
        *ctx.request_mut().as_mut().expect("set above").body_mut() =
            SdkBody::from(loaded_body.clone());
        cfg.interceptor_state()
            .store_put(LoadedRequestBody::Loaded(loaded_body));
    }

    // Before-transmit hooks that run once per operation (not once per attempt).
    ctx.enter_before_transmit_phase();
    run_interceptors!(halt_on_err: {
        read_after_serialization(ctx, runtime_components, cfg);
        modify_before_retry_loop(ctx, runtime_components, cfg);
    });

    // Ask the retry strategy whether the initial request may be sent. A
    // `YesAfterDelay` answer sleeps and then asks again, so this loop exits only on
    // `Yes` (break) or by halting the operation.
    let retry_strategy = runtime_components.retry_strategy();
    loop {
        let should_attempt = retry_strategy.should_attempt_initial_request(runtime_components, cfg);
        match should_attempt {
            Ok(ShouldAttempt::Yes) => {
                debug!("retry strategy has OKed initial request");
                break;
            }
            Ok(ShouldAttempt::No) => {
                let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
                halt!([ctx] => OrchestratorError::other(err));
            }
            Err(err) => halt!([ctx] => OrchestratorError::other(err)),
            Ok(ShouldAttempt::YesAfterDelay(delay)) => {
                // Delaying needs an async sleep implementation; without one, halt.
                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
                    "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
                )));
                debug!("retry strategy has OKed initial request after a {delay:?} delay");
                sleep_impl.sleep(delay).await;
                continue;
            }
        }
    }

    // NOTE(review): presumably this checkpoint is what `ctx.rewind(cfg)` restores at
    // the top of each attempt below - confirm against `InterceptorContext`.
    ctx.save_checkpoint();
    // When the retry config's retry spec reports long polling, put a default
    // `LongPollingBackoff` into interceptor state; it is consulted in the
    // `ShouldAttempt::No` arm further down.
    if cfg
        .load::<RetryConfig>()
        .and_then(|rc| rc.retry_spec())
        .is_some_and(|s| s.long_polling())
    {
        cfg.interceptor_state()
            .store_put(LongPollingBackoff::default());
    }
    // Holds `(delay, sleep future)` scheduled by the previous attempt, if any.
    let mut retry_delay = None;
    // Attempt loop; `i` is the 1-based attempt number.
    for i in 1u32.. {
        trace!("checking if context can be rewound for attempt #{i}");
        if let RewindResult::Impossible = ctx.rewind(cfg) {
            debug!("request cannot be retried since the request body cannot be cloned");
            break;
        }
        // Publish the attempt number in interceptor state.
        cfg.interceptor_state()
            .store_put::<RequestAttempts>(i.into());
        // Wait out the delay requested after the previous attempt. The sleep future
        // was created when the delay was decided, not here.
        if let Some((delay, sleep)) = retry_delay.take() {
            debug!("delaying for {delay:?}");
            sleep.await;
        }
        let attempt_timeout_config =
            MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
        trace!(attempt_timeout_config = ?attempt_timeout_config);
        // One attempt plus its completion hooks, both inside the per-attempt timeout.
        let maybe_timeout = async {
            debug!("beginning attempt #{i}");
            try_attempt(ctx, cfg, runtime_components, stop_point)
                .instrument(debug_span!("try_attempt", "attempt" = i))
                .await;
            finally_attempt(ctx, cfg, runtime_components)
                .instrument(debug_span!("finally_attempt", "attempt" = i))
                .await;
            Result::<_, SdkError<Error, HttpResponse>>::Ok(())
        }
        .maybe_timeout(attempt_timeout_config)
        .await
        // The async block itself always yields `Ok`, so an `Err` here comes from
        // `maybe_timeout`; its source is re-wrapped as a timeout error.
        .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));

        // A timed-out attempt fails the context but still reaches the retry decision.
        continue_on_err!([ctx] => maybe_timeout);

        let should_attempt = halt_on_err!([ctx] => runtime_components
            .retry_strategy()
            .should_attempt_retry(ctx, runtime_components, cfg)
            .map_err(OrchestratorError::other));
        match should_attempt {
            ShouldAttempt::Yes => continue,
            ShouldAttempt::No => {
                debug!("a retry is either unnecessary or not possible, exiting attempt loop");
                // If a long-polling backoff delay is pending and a sleep
                // implementation exists, wait before returning.
                if let Some(delay) = cfg.load::<LongPollingBackoff>().and_then(|h| h.take()) {
                    if let Some(sleep_impl) = runtime_components.sleep_impl() {
                        debug!("backing off {delay:?} before returning (no retry quota available)");
                        sleep_impl.sleep(delay).await;
                    }
                }
                break;
            }
            ShouldAttempt::YesAfterDelay(delay) => {
                let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
                    "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
                )));
                // Create the sleep future now; it is awaited at the top of the next
                // iteration, after the rewind.
                retry_delay = Some((delay, sleep_impl.sleep(delay)));
                continue;
            }
        }
    }
}
381
382async fn try_attempt(
383 ctx: &mut InterceptorContext,
384 cfg: &mut ConfigBag,
385 runtime_components: &RuntimeComponents,
386 stop_point: StopPoint,
387) {
388 run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));
389
390 let (scheme_id, identity, endpoint) = halt_on_err!([ctx] => resolve_identity(runtime_components, cfg).await.map_err(OrchestratorError::other));
391
392 match endpoint {
393 Some(endpoint) => {
394 halt_on_err!([ctx] => apply_endpoint(&endpoint, ctx, cfg).map_err(OrchestratorError::other));
397 cfg.interceptor_state().store_put(endpoint);
399 }
400 None => {
401 halt_on_err!([ctx] => orchestrate_endpoint(identity.clone(), ctx, runtime_components, cfg)
402 .instrument(debug_span!("orchestrate_endpoint"))
403 .await
404 .map_err(OrchestratorError::other));
405 }
406 }
407
408 run_interceptors!(halt_on_err: {
409 modify_before_signing(ctx, runtime_components, cfg);
410 read_before_signing(ctx, runtime_components, cfg);
411 });
412
413 halt_on_err!([ctx] => sign_request(&scheme_id, &identity, ctx, runtime_components, cfg).map_err(OrchestratorError::other));
414
415 run_interceptors!(halt_on_err: {
416 read_after_signing(ctx, runtime_components, cfg);
417 modify_before_transmit(ctx, runtime_components, cfg);
418 read_before_transmit(ctx, runtime_components, cfg);
419 });
420
421 if let StopPoint::BeforeTransmit = stop_point {
423 debug!("ending orchestration early because the stop point is `BeforeTransmit`");
424 return;
425 }
426
427 ctx.enter_transmit_phase();
430 let response = halt_on_err!([ctx] => {
431 let request = ctx.take_request().expect("set during serialization");
432 trace!(request = ?request, "transmitting request");
433 let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
434 OrchestratorError::other("No HTTP client was available to send this request. \
435 Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
436 ));
437 let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
438 let settings = {
439 let mut builder = HttpConnectorSettings::builder();
440 builder.set_connect_timeout(timeout_config.connect_timeout());
441 builder.set_read_timeout(timeout_config.read_timeout());
442 builder.build()
443 };
444 let connector = http_client.http_connector(&settings, runtime_components);
445 let response_future = MaybeUploadThroughputCheckFuture::new(
446 cfg,
447 runtime_components,
448 connector.call(request),
449 );
450 response_future.await.map_err(OrchestratorError::connector)
451 });
452 trace!(response = ?response, "received response from service");
453 ctx.set_response(response);
454 ctx.enter_before_deserialization_phase();
455
456 run_interceptors!(halt_on_err: {
457 read_after_transmit(ctx, runtime_components, cfg);
458 modify_before_deserialization(ctx, runtime_components, cfg);
459 read_before_deserialization(ctx, runtime_components, cfg);
460 });
461
462 ctx.enter_deserialization_phase();
463 let output_or_error = async {
464 let response = ctx.response_mut().expect("set during transmit");
465 let response_deserializer = cfg
466 .load::<SharedResponseDeserializer>()
467 .expect("a request deserializer must be in the config bag");
468 let maybe_deserialized = {
469 let _span = debug_span!("deserialize_streaming").entered();
470 response_deserializer.deserialize_streaming(response)
471 };
472 match maybe_deserialized {
473 Some(output_or_error) => output_or_error,
474 None => read_body(response)
475 .instrument(debug_span!("read_body"))
476 .await
477 .map_err(OrchestratorError::response)
478 .and_then(|_| {
479 let _span = debug_span!("deserialize_nonstreaming").entered();
480 log_response_body(response, cfg);
481 response_deserializer.deserialize_nonstreaming(response)
482 }),
483 }
484 }
485 .instrument(debug_span!("deserialization"))
486 .await;
487 trace!(output_or_error = ?output_or_error);
488 ctx.set_output_or_error(output_or_error);
489
490 ctx.enter_after_deserialization_phase();
491 run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
492}
493
494async fn finally_attempt(
495 ctx: &mut InterceptorContext,
496 cfg: &mut ConfigBag,
497 runtime_components: &RuntimeComponents,
498) {
499 run_interceptors!(continue_on_err: {
500 modify_before_attempt_completion(ctx, runtime_components, cfg);
501 read_after_attempt(ctx, runtime_components, cfg);
502 });
503}
504
505#[instrument(skip_all, level = "debug")]
506async fn finally_op(
507 ctx: &mut InterceptorContext,
508 cfg: &mut ConfigBag,
509 runtime_components: &RuntimeComponents,
510) {
511 run_interceptors!(continue_on_err: {
512 modify_before_completion(ctx, runtime_components, cfg);
513 read_after_execution(ctx, runtime_components, cfg);
514 });
515}
516
517#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
518mod tests {
519 use crate::client::auth::no_auth::{NoAuthRuntimePluginV2, NO_AUTH_SCHEME_ID};
520 use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
521 use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
522 use crate::client::retries::strategy::NeverRetryStrategy;
523 use crate::client::test_util::{
524 deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
525 };
526 use aws_smithy_http_client::test_util::NeverClient;
527 use aws_smithy_runtime_api::box_error::BoxError;
528 use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
529 use aws_smithy_runtime_api::client::auth::{
530 AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
531 };
532 use aws_smithy_runtime_api::client::endpoint::{
533 EndpointResolverParams, SharedEndpointResolver,
534 };
535 use aws_smithy_runtime_api::client::http::{
536 http_client_fn, HttpConnector, HttpConnectorFuture,
537 };
538 use aws_smithy_runtime_api::client::interceptors::context::{
539 AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
540 BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
541 BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
542 BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
543 FinalizerInterceptorContextRef, Input, Output,
544 };
545 use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
546 use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
547 use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
548 use aws_smithy_runtime_api::client::runtime_components::{
549 RuntimeComponents, RuntimeComponentsBuilder,
550 };
551 use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
552 use aws_smithy_runtime_api::client::ser_de::{
553 SharedRequestSerializer, SharedResponseDeserializer,
554 };
555 use aws_smithy_runtime_api::shared::IntoShared;
556 use aws_smithy_types::body::SdkBody;
557 use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
558 use aws_smithy_types::timeout::TimeoutConfig;
559 use http_1x::{Response, StatusCode};
560 use std::borrow::Cow;
561 use std::sync::atomic::{AtomicBool, Ordering};
562 use std::sync::Arc;
563 use tracing_test::traced_test;
564
565 fn new_request_serializer() -> CannedRequestSerializer {
566 CannedRequestSerializer::success(HttpRequest::empty())
567 }
568
569 fn new_response_deserializer() -> CannedResponseDeserializer {
570 CannedResponseDeserializer::new(
571 Response::builder()
572 .status(StatusCode::OK)
573 .body(SdkBody::empty())
574 .map_err(|err| OrchestratorError::other(Box::new(err)))
575 .map(Output::erase),
576 )
577 }
578
579 #[derive(Debug, Default)]
580 struct OkConnector {}
581
582 impl OkConnector {
583 fn new() -> Self {
584 Self::default()
585 }
586 }
587
588 impl HttpConnector for OkConnector {
589 fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
590 HttpConnectorFuture::ready(Ok(http_1x::Response::builder()
591 .status(200)
592 .body(SdkBody::empty())
593 .expect("OK response is valid")
594 .try_into()
595 .unwrap()))
596 }
597 }
598
599 #[derive(Debug)]
600 struct TestOperationRuntimePlugin {
601 builder: RuntimeComponentsBuilder,
602 }
603
604 impl TestOperationRuntimePlugin {
605 fn new() -> Self {
606 Self {
607 builder: RuntimeComponentsBuilder::for_tests()
608 .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
609 .with_endpoint_resolver(Some(SharedEndpointResolver::new(
610 StaticUriEndpointResolver::http_localhost(8080),
611 )))
612 .with_http_client(Some(http_client_fn(|_, _| {
613 OkConnector::new().into_shared()
614 })))
615 .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
616 StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
617 ))),
618 }
619 }
620 }
621
622 impl RuntimePlugin for TestOperationRuntimePlugin {
623 fn config(&self) -> Option<FrozenLayer> {
624 let mut layer = Layer::new("TestOperationRuntimePlugin");
625 layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
626 layer.store_put(EndpointResolverParams::new("dontcare"));
627 layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
628 layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
629 layer.store_put(TimeoutConfig::builder().build());
630 Some(layer.freeze())
631 }
632
633 fn runtime_components(
634 &self,
635 _: &RuntimeComponentsBuilder,
636 ) -> Cow<'_, RuntimeComponentsBuilder> {
637 Cow::Borrowed(&self.builder)
638 }
639 }
640
    // Generates the body of an "interceptor error handling" test for one hook.
    //
    // Arguments: the hook's method name, the type of its context argument, and the
    // expected prefix of the `Debug` rendering of the resulting `SdkError`.
    //
    // The expansion defines three interceptors (A, B, C) that all fail in that hook.
    // A is registered on a client-level plugin, B and C on an operation-level plugin.
    // It then runs `invoke` and asserts that (1) the error's debug output starts with
    // the expected prefix and (2) all three interceptors were called, by checking the
    // captured logs with `logs_contain` (so the calling test must be `#[traced_test]`).
    macro_rules! interceptor_error_handling_test {
        // `read_before_execution` is generated without a runtime-components
        // parameter, so nothing is forwarded for `$rc_arg`. Note the trailing comma
        // required after `$expected` in this form.
        (read_before_execution, $ctx:ty, $expected:expr,) => {
            interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
        };
        // Every other hook also takes `&RuntimeComponents`; forward that parameter.
        ($interceptor:ident, $ctx:ty, $expected:expr) => {
            interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
        };
        // Internal rule; `$rc_arg` is spliced into each hook's parameter list.
        (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
            #[derive(Debug)]
            struct FailingInterceptorA;
            impl Intercept for FailingInterceptorA {
                fn name(&self) -> &'static str { "FailingInterceptorA" }

                fn $interceptor(
                    &self,
                    _ctx: $ctx,
                    $($rc_arg)*
                    _cfg: &mut ConfigBag,
                ) -> Result<(), BoxError> {
                    tracing::debug!("FailingInterceptorA called!");
                    Err("FailingInterceptorA".into())
                }
            }

            #[derive(Debug)]
            struct FailingInterceptorB;
            impl Intercept for FailingInterceptorB {
                fn name(&self) -> &'static str { "FailingInterceptorB" }

                fn $interceptor(
                    &self,
                    _ctx: $ctx,
                    $($rc_arg)*
                    _cfg: &mut ConfigBag,
                ) -> Result<(), BoxError> {
                    tracing::debug!("FailingInterceptorB called!");
                    Err("FailingInterceptorB".into())
                }
            }

            #[derive(Debug)]
            struct FailingInterceptorC;
            impl Intercept for FailingInterceptorC {
                fn name(&self) -> &'static str { "FailingInterceptorC" }

                fn $interceptor(
                    &self,
                    _ctx: $ctx,
                    $($rc_arg)*
                    _cfg: &mut ConfigBag,
                ) -> Result<(), BoxError> {
                    tracing::debug!("FailingInterceptorC called!");
                    Err("FailingInterceptorC".into())
                }
            }

            // Client-level plugin carrying interceptor A.
            #[derive(Debug)]
            struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
            impl FailingInterceptorsClientRuntimePlugin {
                fn new() -> Self {
                    Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
                }
            }
            impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
                    Cow::Borrowed(&self.0)
                }
            }

            // Operation-level plugin carrying interceptors B and C, in that order.
            #[derive(Debug)]
            struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
            impl FailingInterceptorsOperationRuntimePlugin {
                fn new() -> Self {
                    Self(
                        RuntimeComponentsBuilder::new("test")
                            .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
                            .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
                    )
                }
            }
            impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
                fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
                    Cow::Borrowed(&self.0)
                }
            }

            let input = Input::doesnt_matter();
            let runtime_plugins = RuntimePlugins::new()
                .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
                .with_operation_plugin(TestOperationRuntimePlugin::new())
                .with_operation_plugin(NoAuthRuntimePluginV2::new())
                .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
            let actual = invoke("test", "test", input, &runtime_plugins)
                .await
                .expect_err("should error");
            // Compare against the Debug rendering; only the prefix has to match.
            let actual = format!("{:?}", actual);
            assert!(
                actual.starts_with(&$expected),
                "\nActual error: {actual}\nShould start with: {}\n",
                $expected
            );

            // Every failing interceptor must have been invoked, not just the first.
            assert!(logs_contain("FailingInterceptorA called!"));
            assert!(logs_contain("FailingInterceptorB called!"));
            assert!(logs_contain("FailingInterceptorC called!"));
        };
    }
748
749 #[tokio::test]
750 #[traced_test]
751 async fn test_read_before_execution_error_handling() {
752 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
753 interceptor_error_handling_test!(
754 read_before_execution,
755 &BeforeSerializationInterceptorContextRef<'_>,
756 expected,
757 );
758 }
759
760 #[tokio::test]
761 #[traced_test]
762 async fn test_modify_before_serialization_error_handling() {
763 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
764 interceptor_error_handling_test!(
765 modify_before_serialization,
766 &mut BeforeSerializationInterceptorContextMut<'_>,
767 expected
768 );
769 }
770
771 #[tokio::test]
772 #[traced_test]
773 async fn test_read_before_serialization_error_handling() {
774 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
775 interceptor_error_handling_test!(
776 read_before_serialization,
777 &BeforeSerializationInterceptorContextRef<'_>,
778 expected
779 );
780 }
781
782 #[tokio::test]
783 #[traced_test]
784 async fn test_read_after_serialization_error_handling() {
785 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
786 interceptor_error_handling_test!(
787 read_after_serialization,
788 &BeforeTransmitInterceptorContextRef<'_>,
789 expected
790 );
791 }
792
793 #[tokio::test]
794 #[traced_test]
795 async fn test_modify_before_retry_loop_error_handling() {
796 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
797 interceptor_error_handling_test!(
798 modify_before_retry_loop,
799 &mut BeforeTransmitInterceptorContextMut<'_>,
800 expected
801 );
802 }
803
804 #[tokio::test]
805 #[traced_test]
806 async fn test_read_before_attempt_error_handling() {
807 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
808 interceptor_error_handling_test!(
809 read_before_attempt,
810 &BeforeTransmitInterceptorContextRef<'_>,
811 expected
812 );
813 }
814
815 #[tokio::test]
816 #[traced_test]
817 async fn test_modify_before_signing_error_handling() {
818 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
819 interceptor_error_handling_test!(
820 modify_before_signing,
821 &mut BeforeTransmitInterceptorContextMut<'_>,
822 expected
823 );
824 }
825
826 #[tokio::test]
827 #[traced_test]
828 async fn test_read_before_signing_error_handling() {
829 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
830 interceptor_error_handling_test!(
831 read_before_signing,
832 &BeforeTransmitInterceptorContextRef<'_>,
833 expected
834 );
835 }
836
837 #[tokio::test]
838 #[traced_test]
839 async fn test_read_after_signing_error_handling() {
840 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
841 interceptor_error_handling_test!(
842 read_after_signing,
843 &BeforeTransmitInterceptorContextRef<'_>,
844 expected
845 );
846 }
847
848 #[tokio::test]
849 #[traced_test]
850 async fn test_modify_before_transmit_error_handling() {
851 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
852 interceptor_error_handling_test!(
853 modify_before_transmit,
854 &mut BeforeTransmitInterceptorContextMut<'_>,
855 expected
856 );
857 }
858
859 #[tokio::test]
860 #[traced_test]
861 async fn test_read_before_transmit_error_handling() {
862 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
863 interceptor_error_handling_test!(
864 read_before_transmit,
865 &BeforeTransmitInterceptorContextRef<'_>,
866 expected
867 );
868 }
869
870 #[tokio::test]
871 #[traced_test]
872 async fn test_read_after_transmit_error_handling() {
873 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
874 interceptor_error_handling_test!(
875 read_after_transmit,
876 &BeforeDeserializationInterceptorContextRef<'_>,
877 expected
878 );
879 }
880
881 #[tokio::test]
882 #[traced_test]
883 async fn test_modify_before_deserialization_error_handling() {
884 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
885 interceptor_error_handling_test!(
886 modify_before_deserialization,
887 &mut BeforeDeserializationInterceptorContextMut<'_>,
888 expected
889 );
890 }
891
892 #[tokio::test]
893 #[traced_test]
894 async fn test_read_before_deserialization_error_handling() {
895 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
896 interceptor_error_handling_test!(
897 read_before_deserialization,
898 &BeforeDeserializationInterceptorContextRef<'_>,
899 expected
900 );
901 }
902
903 #[tokio::test]
904 #[traced_test]
905 async fn test_read_after_deserialization_error_handling() {
906 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
907 interceptor_error_handling_test!(
908 read_after_deserialization,
909 &AfterDeserializationInterceptorContextRef<'_>,
910 expected
911 );
912 }
913
914 #[tokio::test]
915 #[traced_test]
916 async fn test_modify_before_attempt_completion_error_handling() {
917 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
918 interceptor_error_handling_test!(
919 modify_before_attempt_completion,
920 &mut FinalizerInterceptorContextMut<'_>,
921 expected
922 );
923 }
924
925 #[tokio::test]
926 #[traced_test]
927 async fn test_read_after_attempt_error_handling() {
928 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
929 interceptor_error_handling_test!(
930 read_after_attempt,
931 &FinalizerInterceptorContextRef<'_>,
932 expected
933 );
934 }
935
936 #[tokio::test]
937 #[traced_test]
938 async fn test_modify_before_completion_error_handling() {
939 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
940 interceptor_error_handling_test!(
941 modify_before_completion,
942 &mut FinalizerInterceptorContextMut<'_>,
943 expected
944 );
945 }
946
947 #[tokio::test]
948 #[traced_test]
949 async fn test_read_after_execution_error_handling() {
950 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
951 interceptor_error_handling_test!(
952 read_after_execution,
953 &FinalizerInterceptorContextRef<'_>,
954 expected
955 );
956 }
957
/// Expands to the body of a test that checks where the orchestrator jumps after a hook fails.
///
/// Two interceptors are registered, in this order:
/// - `OriginInterceptor`, which logs and then fails in the `$origin_hook` hook;
/// - `DestinationInterceptor`, which logs and then fails in the `$destination_hook` hook.
///
/// The expansion invokes the test operation, requires it to fail, and asserts that the
/// `Debug` rendering of the error starts with `$expected` and that both hooks logged their
/// "called!" line.
///
/// `read_before_execution` has its own arm because it is expanded without a
/// `&RuntimeComponents` parameter; every other origin hook receives `_rc: &RuntimeComponents`.
/// The expansion uses `.await` and `logs_contain`, so it must be placed inside an async
/// function where `logs_contain` is in scope (the callers in this file are all
/// `#[traced_test]` tests).
macro_rules! interceptor_error_redirection_test {
    // Origin hook `read_before_execution`: nothing sits between the context and the config bag.
    (read_before_execution, $origin_ctx:ty, $destination_hook:ident, $destination_ctx:ty, $expected:expr) => {
        interceptor_error_redirection_test!(
            __private read_before_execution, $origin_ctx, $destination_hook, $destination_ctx, $expected,
        );
    };
    // Any other origin hook: forward an extra `&RuntimeComponents` parameter.
    ($origin_hook:ident, $origin_ctx:ty, $destination_hook:ident, $destination_ctx:ty, $expected:expr) => {
        interceptor_error_redirection_test!(
            __private $origin_hook, $origin_ctx, $destination_hook, $destination_ctx, $expected,
            _rc: &RuntimeComponents,
        );
    };
    // Shared implementation; `$extra_param` is spliced verbatim into the origin hook's signature.
    (__private $origin_hook:ident, $origin_ctx:ty, $destination_hook:ident, $destination_ctx:ty, $expected:expr, $($extra_param:tt)*) => {
        // Fails in the hook under test.
        #[derive(Debug)]
        struct OriginInterceptor;
        impl Intercept for OriginInterceptor {
            fn name(&self) -> &'static str {
                "OriginInterceptor"
            }

            fn $origin_hook(
                &self,
                _ctx: $origin_ctx,
                $($extra_param)*
                _cfg: &mut ConfigBag,
            ) -> Result<(), BoxError> {
                tracing::debug!("OriginInterceptor called!");
                Err("OriginInterceptor".into())
            }
        }

        // Fails in the hook the orchestrator is expected to jump to.
        #[derive(Debug)]
        struct DestinationInterceptor;
        impl Intercept for DestinationInterceptor {
            fn name(&self) -> &'static str {
                "DestinationInterceptor"
            }

            fn $destination_hook(
                &self,
                _ctx: $destination_ctx,
                _runtime_components: &RuntimeComponents,
                _cfg: &mut ConfigBag,
            ) -> Result<(), BoxError> {
                tracing::debug!("DestinationInterceptor called!");
                Err("DestinationInterceptor".into())
            }
        }

        // Runtime plugin whose only job is to register the two interceptors above.
        #[derive(Debug)]
        struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
        impl InterceptorsTestOperationRuntimePlugin {
            fn new() -> Self {
                // Origin is registered first, then destination.
                let components = RuntimeComponentsBuilder::new("test")
                    .with_interceptor(SharedInterceptor::new(OriginInterceptor))
                    .with_interceptor(SharedInterceptor::new(DestinationInterceptor));
                Self(components)
            }
        }
        impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
            fn runtime_components(
                &self,
                _: &RuntimeComponentsBuilder,
            ) -> Cow<'_, RuntimeComponentsBuilder> {
                let Self(components) = self;
                Cow::Borrowed(components)
            }
        }

        let input = Input::doesnt_matter();
        let runtime_plugins = RuntimePlugins::new()
            .with_operation_plugin(TestOperationRuntimePlugin::new())
            .with_operation_plugin(NoAuthRuntimePluginV2::new())
            .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());

        // The operation has to fail; only the `Debug` form of the error is inspected.
        let actual = invoke("test", "test", input, &runtime_plugins)
            .await
            .expect_err("should error");
        let actual = format!("{actual:?}");
        assert!(
            actual.starts_with(&$expected),
            "\nActual error: {actual}\nShould start with: {}\n",
            $expected
        );

        // Both hooks must have run.
        assert!(logs_contain("OriginInterceptor called!"));
        assert!(logs_contain("DestinationInterceptor called!"));
    };
}
1034
1035 #[tokio::test]
1036 #[traced_test]
1037 async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
1038 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1039 interceptor_error_redirection_test!(
1040 read_before_execution,
1041 &BeforeSerializationInterceptorContextRef<'_>,
1042 modify_before_completion,
1043 &mut FinalizerInterceptorContextMut<'_>,
1044 expected
1045 );
1046 }
1047
1048 #[tokio::test]
1049 #[traced_test]
1050 async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
1051 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1052 interceptor_error_redirection_test!(
1053 modify_before_serialization,
1054 &mut BeforeSerializationInterceptorContextMut<'_>,
1055 modify_before_completion,
1056 &mut FinalizerInterceptorContextMut<'_>,
1057 expected
1058 );
1059 }
1060
1061 #[tokio::test]
1062 #[traced_test]
1063 async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1064 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1065 interceptor_error_redirection_test!(
1066 read_before_serialization,
1067 &BeforeSerializationInterceptorContextRef<'_>,
1068 modify_before_completion,
1069 &mut FinalizerInterceptorContextMut<'_>,
1070 expected
1071 );
1072 }
1073
1074 #[tokio::test]
1075 #[traced_test]
1076 async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1077 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1078 interceptor_error_redirection_test!(
1079 read_after_serialization,
1080 &BeforeTransmitInterceptorContextRef<'_>,
1081 modify_before_completion,
1082 &mut FinalizerInterceptorContextMut<'_>,
1083 expected
1084 );
1085 }
1086
1087 #[tokio::test]
1088 #[traced_test]
1089 async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1090 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1091 interceptor_error_redirection_test!(
1092 modify_before_retry_loop,
1093 &mut BeforeTransmitInterceptorContextMut<'_>,
1094 modify_before_completion,
1095 &mut FinalizerInterceptorContextMut<'_>,
1096 expected
1097 );
1098 }
1099
1100 #[tokio::test]
1101 #[traced_test]
1102 async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1103 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1104 interceptor_error_redirection_test!(
1105 read_before_attempt,
1106 &BeforeTransmitInterceptorContextRef<'_>,
1107 modify_before_attempt_completion,
1108 &mut FinalizerInterceptorContextMut<'_>,
1109 expected
1110 );
1111 }
1112
1113 #[tokio::test]
1114 #[traced_test]
1115 async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1116 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1117 interceptor_error_redirection_test!(
1118 modify_before_signing,
1119 &mut BeforeTransmitInterceptorContextMut<'_>,
1120 modify_before_attempt_completion,
1121 &mut FinalizerInterceptorContextMut<'_>,
1122 expected
1123 );
1124 }
1125
1126 #[tokio::test]
1127 #[traced_test]
1128 async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1129 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1130 interceptor_error_redirection_test!(
1131 read_before_signing,
1132 &BeforeTransmitInterceptorContextRef<'_>,
1133 modify_before_attempt_completion,
1134 &mut FinalizerInterceptorContextMut<'_>,
1135 expected
1136 );
1137 }
1138
1139 #[tokio::test]
1140 #[traced_test]
1141 async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1142 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1143 interceptor_error_redirection_test!(
1144 read_after_signing,
1145 &BeforeTransmitInterceptorContextRef<'_>,
1146 modify_before_attempt_completion,
1147 &mut FinalizerInterceptorContextMut<'_>,
1148 expected
1149 );
1150 }
1151
1152 #[tokio::test]
1153 #[traced_test]
1154 async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1155 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1156 interceptor_error_redirection_test!(
1157 modify_before_transmit,
1158 &mut BeforeTransmitInterceptorContextMut<'_>,
1159 modify_before_attempt_completion,
1160 &mut FinalizerInterceptorContextMut<'_>,
1161 expected
1162 );
1163 }
1164
1165 #[tokio::test]
1166 #[traced_test]
1167 async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1168 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1169 interceptor_error_redirection_test!(
1170 read_before_transmit,
1171 &BeforeTransmitInterceptorContextRef<'_>,
1172 modify_before_attempt_completion,
1173 &mut FinalizerInterceptorContextMut<'_>,
1174 expected
1175 );
1176 }
1177
1178 #[tokio::test]
1179 #[traced_test]
1180 async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1181 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1182 interceptor_error_redirection_test!(
1183 read_after_transmit,
1184 &BeforeDeserializationInterceptorContextRef<'_>,
1185 modify_before_attempt_completion,
1186 &mut FinalizerInterceptorContextMut<'_>,
1187 expected
1188 );
1189 }
1190
1191 #[tokio::test]
1192 #[traced_test]
1193 async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1194 ) {
1195 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1196 interceptor_error_redirection_test!(
1197 modify_before_deserialization,
1198 &mut BeforeDeserializationInterceptorContextMut<'_>,
1199 modify_before_attempt_completion,
1200 &mut FinalizerInterceptorContextMut<'_>,
1201 expected
1202 );
1203 }
1204
1205 #[tokio::test]
1206 #[traced_test]
1207 async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1208 ) {
1209 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1210 interceptor_error_redirection_test!(
1211 read_before_deserialization,
1212 &BeforeDeserializationInterceptorContextRef<'_>,
1213 modify_before_attempt_completion,
1214 &mut FinalizerInterceptorContextMut<'_>,
1215 expected
1216 );
1217 }
1218
1219 #[tokio::test]
1220 #[traced_test]
1221 async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1222 {
1223 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1224 interceptor_error_redirection_test!(
1225 read_after_deserialization,
1226 &AfterDeserializationInterceptorContextRef<'_>,
1227 modify_before_attempt_completion,
1228 &mut FinalizerInterceptorContextMut<'_>,
1229 expected
1230 );
1231 }
1232
1233 #[tokio::test]
1234 #[traced_test]
1235 async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1236 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1237 interceptor_error_redirection_test!(
1238 modify_before_attempt_completion,
1239 &mut FinalizerInterceptorContextMut<'_>,
1240 read_after_attempt,
1241 &FinalizerInterceptorContextRef<'_>,
1242 expected
1243 );
1244 }
1245
1246 #[tokio::test]
1247 #[traced_test]
1248 async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1249 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1250 interceptor_error_redirection_test!(
1251 modify_before_completion,
1252 &mut FinalizerInterceptorContextMut<'_>,
1253 read_after_execution,
1254 &FinalizerInterceptorContextRef<'_>,
1255 expected
1256 );
1257 }
1258
1259 #[tokio::test]
1260 async fn test_stop_points() {
1261 let runtime_plugins = || {
1262 RuntimePlugins::new()
1263 .with_operation_plugin(TestOperationRuntimePlugin::new())
1264 .with_operation_plugin(NoAuthRuntimePluginV2::new())
1265 };
1266
1267 let context = invoke_with_stop_point(
1269 "test",
1270 "test",
1271 Input::doesnt_matter(),
1272 &runtime_plugins(),
1273 StopPoint::None,
1274 )
1275 .await
1276 .expect("success");
1277 assert!(context.response().is_some());
1278
1279 let context = invoke_with_stop_point(
1281 "test",
1282 "test",
1283 Input::doesnt_matter(),
1284 &runtime_plugins(),
1285 StopPoint::BeforeTransmit,
1286 )
1287 .await
1288 .expect("success");
1289 assert!(context.response().is_none());
1290 }
1291
1292 #[tokio::test]
1294 async fn test_stop_points_error_handling() {
1295 #[derive(Debug, Default)]
1296 struct Inner {
1297 modify_before_retry_loop_called: AtomicBool,
1298 modify_before_completion_called: AtomicBool,
1299 read_after_execution_called: AtomicBool,
1300 }
1301 #[derive(Clone, Debug, Default)]
1302 struct TestInterceptor {
1303 inner: Arc<Inner>,
1304 }
1305
1306 impl Intercept for TestInterceptor {
1307 fn name(&self) -> &'static str {
1308 "TestInterceptor"
1309 }
1310
1311 fn modify_before_retry_loop(
1312 &self,
1313 _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1314 _rc: &RuntimeComponents,
1315 _cfg: &mut ConfigBag,
1316 ) -> Result<(), BoxError> {
1317 self.inner
1318 .modify_before_retry_loop_called
1319 .store(true, Ordering::Relaxed);
1320 Err("test error".into())
1321 }
1322
1323 fn modify_before_completion(
1324 &self,
1325 _context: &mut FinalizerInterceptorContextMut<'_>,
1326 _rc: &RuntimeComponents,
1327 _cfg: &mut ConfigBag,
1328 ) -> Result<(), BoxError> {
1329 self.inner
1330 .modify_before_completion_called
1331 .store(true, Ordering::Relaxed);
1332 Ok(())
1333 }
1334
1335 fn read_after_execution(
1336 &self,
1337 _context: &FinalizerInterceptorContextRef<'_>,
1338 _rc: &RuntimeComponents,
1339 _cfg: &mut ConfigBag,
1340 ) -> Result<(), BoxError> {
1341 self.inner
1342 .read_after_execution_called
1343 .store(true, Ordering::Relaxed);
1344 Ok(())
1345 }
1346 }
1347
1348 #[derive(Debug)]
1349 struct TestInterceptorRuntimePlugin {
1350 builder: RuntimeComponentsBuilder,
1351 }
1352
1353 impl RuntimePlugin for TestInterceptorRuntimePlugin {
1354 fn runtime_components(
1355 &self,
1356 _: &RuntimeComponentsBuilder,
1357 ) -> Cow<'_, RuntimeComponentsBuilder> {
1358 Cow::Borrowed(&self.builder)
1359 }
1360 }
1361
1362 let interceptor = TestInterceptor::default();
1363 let client = NeverClient::new();
1364 let runtime_plugins = || {
1365 RuntimePlugins::new()
1366 .with_operation_plugin(TestOperationRuntimePlugin::new())
1367 .with_operation_plugin(NoAuthRuntimePluginV2::new())
1368 .with_operation_plugin(TestInterceptorRuntimePlugin {
1369 builder: RuntimeComponentsBuilder::new("test")
1370 .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1371 .with_http_client(Some(client.clone())),
1372 })
1373 };
1374
1375 let _err = invoke_with_stop_point(
1377 "test",
1378 "test",
1379 Input::doesnt_matter(),
1380 &runtime_plugins(),
1381 StopPoint::BeforeTransmit,
1382 )
1383 .await
1384 .expect_err("an error was returned");
1385 assert_eq!(client.num_calls(), 0);
1386
1387 assert!(interceptor
1388 .inner
1389 .modify_before_retry_loop_called
1390 .load(Ordering::Relaxed));
1391 assert!(interceptor
1392 .inner
1393 .modify_before_completion_called
1394 .load(Ordering::Relaxed));
1395 assert!(interceptor
1396 .inner
1397 .read_after_execution_called
1398 .load(Ordering::Relaxed));
1399 }
1400}