1use crate::client::interceptors::Interceptors;
7use crate::client::orchestrator::http::{log_response_body, read_body};
8use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
9use crate::client::{
10 http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
11 orchestrator::endpoints::orchestrate_endpoint,
12};
13use auth::{resolve_identity, sign_request};
14use aws_smithy_async::rt::sleep::AsyncSleep;
15use aws_smithy_runtime_api::box_error::BoxError;
16use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
17use aws_smithy_runtime_api::client::interceptors::context::{
18 Error, Input, InterceptorContext, Output, RewindResult,
19};
20use aws_smithy_runtime_api::client::orchestrator::{
21 HttpResponse, LoadedRequestBody, OrchestratorError,
22};
23use aws_smithy_runtime_api::client::result::SdkError;
24use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
25use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
26use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
27use aws_smithy_runtime_api::client::ser_de::{
28 DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
29};
30use aws_smithy_types::body::SdkBody;
31use aws_smithy_types::byte_stream::ByteStream;
32use aws_smithy_types::config_bag::ConfigBag;
33use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
34use endpoints::apply_endpoint;
35use std::mem;
36use tracing::{debug, debug_span, instrument, trace, Instrument};
37
38mod auth;
39pub use auth::AuthSchemeAndEndpointOrchestrationV2;
40
41pub mod endpoints;
43
44mod http;
46
47pub mod operation;
49
50macro_rules! halt {
51 ([$ctx:ident] => $err:expr) => {{
52 debug!("encountered orchestrator error; halting");
53 $ctx.fail($err.into());
54 return;
55 }};
56}
57
58macro_rules! halt_on_err {
59 ([$ctx:ident] => $expr:expr) => {
60 match $expr {
61 Ok(ok) => ok,
62 Err(err) => halt!([$ctx] => err),
63 }
64 };
65}
66
67macro_rules! continue_on_err {
68 ([$ctx:ident] => $expr:expr) => {
69 if let Err(err) = $expr {
70 debug!(err = ?err, "encountered orchestrator error; continuing");
71 $ctx.fail(err.into());
72 }
73 };
74}
75
76macro_rules! run_interceptors {
77 (continue_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
78 $(run_interceptors!(continue_on_err: $interceptor($ctx, $rc, $cfg));)+
79 };
80 (continue_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
81 continue_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
82 };
83 (halt_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
84 $(run_interceptors!(halt_on_err: $interceptor($ctx, $rc, $cfg));)+
85 };
86 (halt_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
87 halt_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
88 };
89 (__private $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
90 Interceptors::new($rc.interceptors()).$interceptor($ctx, $rc, $cfg)
91 };
92}
93
94pub async fn invoke(
104 service_name: &str,
105 operation_name: &str,
106 input: Input,
107 runtime_plugins: &RuntimePlugins,
108) -> Result<Output, SdkError<Error, HttpResponse>> {
109 invoke_with_stop_point(
110 service_name,
111 operation_name,
112 input,
113 runtime_plugins,
114 StopPoint::None,
115 )
116 .await?
117 .finalize()
118}
119
120#[non_exhaustive]
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub enum StopPoint {
124 None,
126
127 BeforeTransmit,
129}
130
131pub async fn invoke_with_stop_point(
138 _service_name: &str,
142 _operation_name: &str,
143 input: Input,
144 runtime_plugins: &RuntimePlugins,
145 stop_point: StopPoint,
146) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
147 async move {
148 let mut cfg = ConfigBag::base();
149 let cfg = &mut cfg;
150
151 let mut ctx = InterceptorContext::new(input);
152
153 let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
154 .map_err(SdkError::construction_failure)?;
155 trace!(runtime_components = ?runtime_components);
156
157 let operation_timeout_config =
158 MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
159 trace!(operation_timeout_config = ?operation_timeout_config);
160 async {
161 if !ctx.is_failed() {
164 try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
165 }
166 finally_op(&mut ctx, cfg, &runtime_components).await;
167 if ctx.is_failed() {
168 Err(ctx.finalize().expect_err("it is failed"))
169 } else {
170 Ok(ctx)
171 }
172 }
173 .maybe_timeout(operation_timeout_config)
174 .await
175 }
176 .await
177}
178
179#[instrument(skip_all, level = "debug")]
183fn apply_configuration(
184 ctx: &mut InterceptorContext,
185 cfg: &mut ConfigBag,
186 runtime_plugins: &RuntimePlugins,
187) -> Result<RuntimeComponents, BoxError> {
188 let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
189 continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
190
191 let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
192 continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
193
194 let components = RuntimeComponents::builder("merged orchestrator components")
196 .merge_from(&client_rc_builder)
197 .merge_from(&operation_rc_builder)
198 .build()?;
199
200 let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
204 debug!(
205 "timeout settings for this operation: {:?}",
206 resolved_timeout_config
207 );
208 cfg.interceptor_state().store_put(resolved_timeout_config);
209
210 components.validate_final_config(cfg)?;
211 Ok(components)
212}
213
214#[instrument(skip_all, level = "debug")]
215async fn try_op(
216 ctx: &mut InterceptorContext,
217 cfg: &mut ConfigBag,
218 runtime_components: &RuntimeComponents,
219 stop_point: StopPoint,
220) {
221 run_interceptors!(halt_on_err: {
223 modify_before_serialization(ctx, runtime_components, cfg);
224 read_before_serialization(ctx, runtime_components, cfg);
225 });
226
227 ctx.enter_serialization_phase();
229 {
230 let _span = debug_span!("serialization").entered();
231 let request_serializer = cfg
232 .load::<SharedRequestSerializer>()
233 .expect("request serializer must be in the config bag")
234 .clone();
235 let input = ctx.take_input().expect("input set at this point");
236 let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
237 ctx.set_request(request);
238 }
239
240 if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
242 debug!("loading request body into memory");
243 let mut body = SdkBody::taken();
244 mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
245 let loaded_body = halt_on_err!([ctx] =>
246 ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
247 )
248 .into_bytes();
249 *ctx.request_mut().as_mut().expect("set above").body_mut() =
250 SdkBody::from(loaded_body.clone());
251 cfg.interceptor_state()
252 .store_put(LoadedRequestBody::Loaded(loaded_body));
253 }
254
255 ctx.enter_before_transmit_phase();
257 run_interceptors!(halt_on_err: {
258 read_after_serialization(ctx, runtime_components, cfg);
259 modify_before_retry_loop(ctx, runtime_components, cfg);
260 });
261
262 let should_attempt = runtime_components
265 .retry_strategy()
266 .should_attempt_initial_request(runtime_components, cfg);
267 match should_attempt {
268 Ok(ShouldAttempt::Yes) => debug!("retry strategy has OKed initial request"),
270 Ok(ShouldAttempt::No) => {
272 let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
273 halt!([ctx] => OrchestratorError::other(err));
274 }
275 Err(err) => halt!([ctx] => OrchestratorError::other(err)),
277 Ok(ShouldAttempt::YesAfterDelay(delay)) => {
278 let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
279 "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
280 )));
281 debug!("retry strategy has OKed initial request after a {delay:?} delay");
282 sleep_impl.sleep(delay).await;
283 }
284 }
285
286 ctx.save_checkpoint();
289 let mut retry_delay = None;
290 for i in 1u32.. {
291 trace!("checking if context can be rewound for attempt #{i}");
294 if let RewindResult::Impossible = ctx.rewind(cfg) {
295 debug!("request cannot be retried since the request body cannot be cloned");
296 break;
297 }
298 cfg.interceptor_state()
300 .store_put::<RequestAttempts>(i.into());
301 if let Some((delay, sleep)) = retry_delay.take() {
303 debug!("delaying for {delay:?}");
304 sleep.await;
305 }
306 let attempt_timeout_config =
307 MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
308 trace!(attempt_timeout_config = ?attempt_timeout_config);
309 let maybe_timeout = async {
310 debug!("beginning attempt #{i}");
311 try_attempt(ctx, cfg, runtime_components, stop_point)
312 .instrument(debug_span!("try_attempt", "attempt" = i))
313 .await;
314 finally_attempt(ctx, cfg, runtime_components)
315 .instrument(debug_span!("finally_attempt", "attempt" = i))
316 .await;
317 Result::<_, SdkError<Error, HttpResponse>>::Ok(())
318 }
319 .maybe_timeout(attempt_timeout_config)
320 .await
321 .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));
322
323 continue_on_err!([ctx] => maybe_timeout);
325
326 let should_attempt = halt_on_err!([ctx] => runtime_components
329 .retry_strategy()
330 .should_attempt_retry(ctx, runtime_components, cfg)
331 .map_err(OrchestratorError::other));
332 match should_attempt {
333 ShouldAttempt::Yes => continue,
335 ShouldAttempt::No => {
337 debug!("a retry is either unnecessary or not possible, exiting attempt loop");
338 break;
339 }
340 ShouldAttempt::YesAfterDelay(delay) => {
341 let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
342 "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
343 )));
344 retry_delay = Some((delay, sleep_impl.sleep(delay)));
345 continue;
346 }
347 }
348 }
349}
350
351async fn try_attempt(
352 ctx: &mut InterceptorContext,
353 cfg: &mut ConfigBag,
354 runtime_components: &RuntimeComponents,
355 stop_point: StopPoint,
356) {
357 run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));
358
359 let (scheme_id, identity, endpoint) = halt_on_err!([ctx] => resolve_identity(runtime_components, cfg).await.map_err(OrchestratorError::other));
360
361 match endpoint {
362 Some(endpoint) => {
363 halt_on_err!([ctx] => apply_endpoint(&endpoint, ctx, cfg).map_err(OrchestratorError::other));
366 cfg.interceptor_state().store_put(endpoint);
368 }
369 None => {
370 halt_on_err!([ctx] => orchestrate_endpoint(identity.clone(), ctx, runtime_components, cfg)
371 .instrument(debug_span!("orchestrate_endpoint"))
372 .await
373 .map_err(OrchestratorError::other));
374 }
375 }
376
377 run_interceptors!(halt_on_err: {
378 modify_before_signing(ctx, runtime_components, cfg);
379 read_before_signing(ctx, runtime_components, cfg);
380 });
381
382 halt_on_err!([ctx] => sign_request(&scheme_id, &identity, ctx, runtime_components, cfg).map_err(OrchestratorError::other));
383
384 run_interceptors!(halt_on_err: {
385 read_after_signing(ctx, runtime_components, cfg);
386 modify_before_transmit(ctx, runtime_components, cfg);
387 read_before_transmit(ctx, runtime_components, cfg);
388 });
389
390 if let StopPoint::BeforeTransmit = stop_point {
392 debug!("ending orchestration early because the stop point is `BeforeTransmit`");
393 return;
394 }
395
396 ctx.enter_transmit_phase();
399 let response = halt_on_err!([ctx] => {
400 let request = ctx.take_request().expect("set during serialization");
401 trace!(request = ?request, "transmitting request");
402 let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
403 OrchestratorError::other("No HTTP client was available to send this request. \
404 Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
405 ));
406 let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
407 let settings = {
408 let mut builder = HttpConnectorSettings::builder();
409 builder.set_connect_timeout(timeout_config.connect_timeout());
410 builder.set_read_timeout(timeout_config.read_timeout());
411 builder.build()
412 };
413 let connector = http_client.http_connector(&settings, runtime_components);
414 let response_future = MaybeUploadThroughputCheckFuture::new(
415 cfg,
416 runtime_components,
417 connector.call(request),
418 );
419 response_future.await.map_err(OrchestratorError::connector)
420 });
421 trace!(response = ?response, "received response from service");
422 ctx.set_response(response);
423 ctx.enter_before_deserialization_phase();
424
425 run_interceptors!(halt_on_err: {
426 read_after_transmit(ctx, runtime_components, cfg);
427 modify_before_deserialization(ctx, runtime_components, cfg);
428 read_before_deserialization(ctx, runtime_components, cfg);
429 });
430
431 ctx.enter_deserialization_phase();
432 let output_or_error = async {
433 let response = ctx.response_mut().expect("set during transmit");
434 let response_deserializer = cfg
435 .load::<SharedResponseDeserializer>()
436 .expect("a request deserializer must be in the config bag");
437 let maybe_deserialized = {
438 let _span = debug_span!("deserialize_streaming").entered();
439 response_deserializer.deserialize_streaming(response)
440 };
441 match maybe_deserialized {
442 Some(output_or_error) => output_or_error,
443 None => read_body(response)
444 .instrument(debug_span!("read_body"))
445 .await
446 .map_err(OrchestratorError::response)
447 .and_then(|_| {
448 let _span = debug_span!("deserialize_nonstreaming").entered();
449 log_response_body(response, cfg);
450 response_deserializer.deserialize_nonstreaming(response)
451 }),
452 }
453 }
454 .instrument(debug_span!("deserialization"))
455 .await;
456 trace!(output_or_error = ?output_or_error);
457 ctx.set_output_or_error(output_or_error);
458
459 ctx.enter_after_deserialization_phase();
460 run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
461}
462
463async fn finally_attempt(
464 ctx: &mut InterceptorContext,
465 cfg: &mut ConfigBag,
466 runtime_components: &RuntimeComponents,
467) {
468 run_interceptors!(continue_on_err: {
469 modify_before_attempt_completion(ctx, runtime_components, cfg);
470 read_after_attempt(ctx, runtime_components, cfg);
471 });
472}
473
474#[instrument(skip_all, level = "debug")]
475async fn finally_op(
476 ctx: &mut InterceptorContext,
477 cfg: &mut ConfigBag,
478 runtime_components: &RuntimeComponents,
479) {
480 run_interceptors!(continue_on_err: {
481 modify_before_completion(ctx, runtime_components, cfg);
482 read_after_execution(ctx, runtime_components, cfg);
483 });
484}
485
486#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
487mod tests {
488 use crate::client::auth::no_auth::{NoAuthRuntimePlugin, NO_AUTH_SCHEME_ID};
489 use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
490 use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
491 use crate::client::retries::strategy::NeverRetryStrategy;
492 use crate::client::test_util::{
493 deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
494 };
495 use aws_smithy_http_client::test_util::NeverClient;
496 use aws_smithy_runtime_api::box_error::BoxError;
497 use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
498 use aws_smithy_runtime_api::client::auth::{
499 AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
500 };
501 use aws_smithy_runtime_api::client::endpoint::{
502 EndpointResolverParams, SharedEndpointResolver,
503 };
504 use aws_smithy_runtime_api::client::http::{
505 http_client_fn, HttpConnector, HttpConnectorFuture,
506 };
507 use aws_smithy_runtime_api::client::interceptors::context::{
508 AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
509 BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
510 BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
511 BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
512 FinalizerInterceptorContextRef, Input, Output,
513 };
514 use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
515 use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
516 use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
517 use aws_smithy_runtime_api::client::runtime_components::{
518 RuntimeComponents, RuntimeComponentsBuilder,
519 };
520 use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
521 use aws_smithy_runtime_api::client::ser_de::{
522 SharedRequestSerializer, SharedResponseDeserializer,
523 };
524 use aws_smithy_runtime_api::shared::IntoShared;
525 use aws_smithy_types::body::SdkBody;
526 use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
527 use aws_smithy_types::timeout::TimeoutConfig;
528 use http_02x::{Response, StatusCode};
529 use std::borrow::Cow;
530 use std::sync::atomic::{AtomicBool, Ordering};
531 use std::sync::Arc;
532 use tracing_test::traced_test;
533
534 fn new_request_serializer() -> CannedRequestSerializer {
535 CannedRequestSerializer::success(HttpRequest::empty())
536 }
537
538 fn new_response_deserializer() -> CannedResponseDeserializer {
539 CannedResponseDeserializer::new(
540 Response::builder()
541 .status(StatusCode::OK)
542 .body(SdkBody::empty())
543 .map_err(|err| OrchestratorError::other(Box::new(err)))
544 .map(Output::erase),
545 )
546 }
547
548 #[derive(Debug, Default)]
549 struct OkConnector {}
550
551 impl OkConnector {
552 fn new() -> Self {
553 Self::default()
554 }
555 }
556
557 impl HttpConnector for OkConnector {
558 fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
559 HttpConnectorFuture::ready(Ok(http_02x::Response::builder()
560 .status(200)
561 .body(SdkBody::empty())
562 .expect("OK response is valid")
563 .try_into()
564 .unwrap()))
565 }
566 }
567
568 #[derive(Debug)]
569 struct TestOperationRuntimePlugin {
570 builder: RuntimeComponentsBuilder,
571 }
572
573 impl TestOperationRuntimePlugin {
574 fn new() -> Self {
575 Self {
576 builder: RuntimeComponentsBuilder::for_tests()
577 .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
578 .with_endpoint_resolver(Some(SharedEndpointResolver::new(
579 StaticUriEndpointResolver::http_localhost(8080),
580 )))
581 .with_http_client(Some(http_client_fn(|_, _| {
582 OkConnector::new().into_shared()
583 })))
584 .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
585 StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
586 ))),
587 }
588 }
589 }
590
591 impl RuntimePlugin for TestOperationRuntimePlugin {
592 fn config(&self) -> Option<FrozenLayer> {
593 let mut layer = Layer::new("TestOperationRuntimePlugin");
594 layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
595 layer.store_put(EndpointResolverParams::new("dontcare"));
596 layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
597 layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
598 layer.store_put(TimeoutConfig::builder().build());
599 Some(layer.freeze())
600 }
601
602 fn runtime_components(
603 &self,
604 _: &RuntimeComponentsBuilder,
605 ) -> Cow<'_, RuntimeComponentsBuilder> {
606 Cow::Borrowed(&self.builder)
607 }
608 }
609
610 macro_rules! interceptor_error_handling_test {
611 (read_before_execution, $ctx:ty, $expected:expr,) => {
612 interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
613 };
614 ($interceptor:ident, $ctx:ty, $expected:expr) => {
615 interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
616 };
617 (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
618 #[derive(Debug)]
619 struct FailingInterceptorA;
620 impl Intercept for FailingInterceptorA {
621 fn name(&self) -> &'static str { "FailingInterceptorA" }
622
623 fn $interceptor(
624 &self,
625 _ctx: $ctx,
626 $($rc_arg)*
627 _cfg: &mut ConfigBag,
628 ) -> Result<(), BoxError> {
629 tracing::debug!("FailingInterceptorA called!");
630 Err("FailingInterceptorA".into())
631 }
632 }
633
634 #[derive(Debug)]
635 struct FailingInterceptorB;
636 impl Intercept for FailingInterceptorB {
637 fn name(&self) -> &'static str { "FailingInterceptorB" }
638
639 fn $interceptor(
640 &self,
641 _ctx: $ctx,
642 $($rc_arg)*
643 _cfg: &mut ConfigBag,
644 ) -> Result<(), BoxError> {
645 tracing::debug!("FailingInterceptorB called!");
646 Err("FailingInterceptorB".into())
647 }
648 }
649
650 #[derive(Debug)]
651 struct FailingInterceptorC;
652 impl Intercept for FailingInterceptorC {
653 fn name(&self) -> &'static str { "FailingInterceptorC" }
654
655 fn $interceptor(
656 &self,
657 _ctx: $ctx,
658 $($rc_arg)*
659 _cfg: &mut ConfigBag,
660 ) -> Result<(), BoxError> {
661 tracing::debug!("FailingInterceptorC called!");
662 Err("FailingInterceptorC".into())
663 }
664 }
665
666 #[derive(Debug)]
667 struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
668 impl FailingInterceptorsClientRuntimePlugin {
669 fn new() -> Self {
670 Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
671 }
672 }
673 impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
674 fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
675 Cow::Borrowed(&self.0)
676 }
677 }
678
679 #[derive(Debug)]
680 struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
681 impl FailingInterceptorsOperationRuntimePlugin {
682 fn new() -> Self {
683 Self(
684 RuntimeComponentsBuilder::new("test")
685 .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
686 .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
687 )
688 }
689 }
690 impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
691 fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
692 Cow::Borrowed(&self.0)
693 }
694 }
695
696 let input = Input::doesnt_matter();
697 let runtime_plugins = RuntimePlugins::new()
698 .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
699 .with_operation_plugin(TestOperationRuntimePlugin::new())
700 .with_operation_plugin(NoAuthRuntimePlugin::new())
701 .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
702 let actual = invoke("test", "test", input, &runtime_plugins)
703 .await
704 .expect_err("should error");
705 let actual = format!("{:?}", actual);
706 assert!(
707 actual.starts_with(&$expected),
708 "\nActual error: {actual}\nShould start with: {}\n",
709 $expected
710 );
711
712 assert!(logs_contain("FailingInterceptorA called!"));
713 assert!(logs_contain("FailingInterceptorB called!"));
714 assert!(logs_contain("FailingInterceptorC called!"));
715 };
716 }
717
718 #[tokio::test]
719 #[traced_test]
720 async fn test_read_before_execution_error_handling() {
721 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
722 interceptor_error_handling_test!(
723 read_before_execution,
724 &BeforeSerializationInterceptorContextRef<'_>,
725 expected,
726 );
727 }
728
729 #[tokio::test]
730 #[traced_test]
731 async fn test_modify_before_serialization_error_handling() {
732 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
733 interceptor_error_handling_test!(
734 modify_before_serialization,
735 &mut BeforeSerializationInterceptorContextMut<'_>,
736 expected
737 );
738 }
739
740 #[tokio::test]
741 #[traced_test]
742 async fn test_read_before_serialization_error_handling() {
743 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
744 interceptor_error_handling_test!(
745 read_before_serialization,
746 &BeforeSerializationInterceptorContextRef<'_>,
747 expected
748 );
749 }
750
751 #[tokio::test]
752 #[traced_test]
753 async fn test_read_after_serialization_error_handling() {
754 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
755 interceptor_error_handling_test!(
756 read_after_serialization,
757 &BeforeTransmitInterceptorContextRef<'_>,
758 expected
759 );
760 }
761
762 #[tokio::test]
763 #[traced_test]
764 async fn test_modify_before_retry_loop_error_handling() {
765 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
766 interceptor_error_handling_test!(
767 modify_before_retry_loop,
768 &mut BeforeTransmitInterceptorContextMut<'_>,
769 expected
770 );
771 }
772
773 #[tokio::test]
774 #[traced_test]
775 async fn test_read_before_attempt_error_handling() {
776 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
777 interceptor_error_handling_test!(
778 read_before_attempt,
779 &BeforeTransmitInterceptorContextRef<'_>,
780 expected
781 );
782 }
783
784 #[tokio::test]
785 #[traced_test]
786 async fn test_modify_before_signing_error_handling() {
787 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
788 interceptor_error_handling_test!(
789 modify_before_signing,
790 &mut BeforeTransmitInterceptorContextMut<'_>,
791 expected
792 );
793 }
794
795 #[tokio::test]
796 #[traced_test]
797 async fn test_read_before_signing_error_handling() {
798 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
799 interceptor_error_handling_test!(
800 read_before_signing,
801 &BeforeTransmitInterceptorContextRef<'_>,
802 expected
803 );
804 }
805
806 #[tokio::test]
807 #[traced_test]
808 async fn test_read_after_signing_error_handling() {
809 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
810 interceptor_error_handling_test!(
811 read_after_signing,
812 &BeforeTransmitInterceptorContextRef<'_>,
813 expected
814 );
815 }
816
817 #[tokio::test]
818 #[traced_test]
819 async fn test_modify_before_transmit_error_handling() {
820 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
821 interceptor_error_handling_test!(
822 modify_before_transmit,
823 &mut BeforeTransmitInterceptorContextMut<'_>,
824 expected
825 );
826 }
827
828 #[tokio::test]
829 #[traced_test]
830 async fn test_read_before_transmit_error_handling() {
831 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
832 interceptor_error_handling_test!(
833 read_before_transmit,
834 &BeforeTransmitInterceptorContextRef<'_>,
835 expected
836 );
837 }
838
839 #[tokio::test]
840 #[traced_test]
841 async fn test_read_after_transmit_error_handling() {
842 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
843 interceptor_error_handling_test!(
844 read_after_transmit,
845 &BeforeDeserializationInterceptorContextRef<'_>,
846 expected
847 );
848 }
849
850 #[tokio::test]
851 #[traced_test]
852 async fn test_modify_before_deserialization_error_handling() {
853 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
854 interceptor_error_handling_test!(
855 modify_before_deserialization,
856 &mut BeforeDeserializationInterceptorContextMut<'_>,
857 expected
858 );
859 }
860
861 #[tokio::test]
862 #[traced_test]
863 async fn test_read_before_deserialization_error_handling() {
864 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
865 interceptor_error_handling_test!(
866 read_before_deserialization,
867 &BeforeDeserializationInterceptorContextRef<'_>,
868 expected
869 );
870 }
871
872 #[tokio::test]
873 #[traced_test]
874 async fn test_read_after_deserialization_error_handling() {
875 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
876 interceptor_error_handling_test!(
877 read_after_deserialization,
878 &AfterDeserializationInterceptorContextRef<'_>,
879 expected
880 );
881 }
882
883 #[tokio::test]
884 #[traced_test]
885 async fn test_modify_before_attempt_completion_error_handling() {
886 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
887 interceptor_error_handling_test!(
888 modify_before_attempt_completion,
889 &mut FinalizerInterceptorContextMut<'_>,
890 expected
891 );
892 }
893
894 #[tokio::test]
895 #[traced_test]
896 async fn test_read_after_attempt_error_handling() {
897 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
898 interceptor_error_handling_test!(
899 read_after_attempt,
900 &FinalizerInterceptorContextRef<'_>,
901 expected
902 );
903 }
904
905 #[tokio::test]
906 #[traced_test]
907 async fn test_modify_before_completion_error_handling() {
908 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
909 interceptor_error_handling_test!(
910 modify_before_completion,
911 &mut FinalizerInterceptorContextMut<'_>,
912 expected
913 );
914 }
915
916 #[tokio::test]
917 #[traced_test]
918 async fn test_read_after_execution_error_handling() {
919 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
920 interceptor_error_handling_test!(
921 read_after_execution,
922 &FinalizerInterceptorContextRef<'_>,
923 expected
924 );
925 }
926
927 macro_rules! interceptor_error_redirection_test {
928 (read_before_execution, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
929 interceptor_error_redirection_test!(__private read_before_execution, $origin_ctx, $destination_interceptor, $destination_ctx, $expected,);
930 };
931 ($origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
932 interceptor_error_redirection_test!(__private $origin_interceptor, $origin_ctx, $destination_interceptor, $destination_ctx, $expected, _rc: &RuntimeComponents,);
933 };
934 (__private $origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
935 #[derive(Debug)]
936 struct OriginInterceptor;
937 impl Intercept for OriginInterceptor {
938 fn name(&self) -> &'static str { "OriginInterceptor" }
939
940 fn $origin_interceptor(
941 &self,
942 _ctx: $origin_ctx,
943 $($rc_arg)*
944 _cfg: &mut ConfigBag,
945 ) -> Result<(), BoxError> {
946 tracing::debug!("OriginInterceptor called!");
947 Err("OriginInterceptor".into())
948 }
949 }
950
951 #[derive(Debug)]
952 struct DestinationInterceptor;
953 impl Intercept for DestinationInterceptor {
954 fn name(&self) -> &'static str { "DestinationInterceptor" }
955
956 fn $destination_interceptor(
957 &self,
958 _ctx: $destination_ctx,
959 _runtime_components: &RuntimeComponents,
960 _cfg: &mut ConfigBag,
961 ) -> Result<(), BoxError> {
962 tracing::debug!("DestinationInterceptor called!");
963 Err("DestinationInterceptor".into())
964 }
965 }
966
967 #[derive(Debug)]
968 struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
969 impl InterceptorsTestOperationRuntimePlugin {
970 fn new() -> Self {
971 Self(
972 RuntimeComponentsBuilder::new("test")
973 .with_interceptor(SharedInterceptor::new(OriginInterceptor))
974 .with_interceptor(SharedInterceptor::new(DestinationInterceptor))
975 )
976 }
977 }
978 impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
979 fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
980 Cow::Borrowed(&self.0)
981 }
982 }
983
984 let input = Input::doesnt_matter();
985 let runtime_plugins = RuntimePlugins::new()
986 .with_operation_plugin(TestOperationRuntimePlugin::new())
987 .with_operation_plugin(NoAuthRuntimePlugin::new())
988 .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());
989 let actual = invoke("test", "test", input, &runtime_plugins)
990 .await
991 .expect_err("should error");
992 let actual = format!("{:?}", actual);
993 assert!(
994 actual.starts_with(&$expected),
995 "\nActual error: {actual}\nShould start with: {}\n",
996 $expected
997 );
998
999 assert!(logs_contain("OriginInterceptor called!"));
1000 assert!(logs_contain("DestinationInterceptor called!"));
1001 };
1002 }
1003
1004 #[tokio::test]
1005 #[traced_test]
1006 async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
1007 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1008 interceptor_error_redirection_test!(
1009 read_before_execution,
1010 &BeforeSerializationInterceptorContextRef<'_>,
1011 modify_before_completion,
1012 &mut FinalizerInterceptorContextMut<'_>,
1013 expected
1014 );
1015 }
1016
1017 #[tokio::test]
1018 #[traced_test]
1019 async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
1020 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1021 interceptor_error_redirection_test!(
1022 modify_before_serialization,
1023 &mut BeforeSerializationInterceptorContextMut<'_>,
1024 modify_before_completion,
1025 &mut FinalizerInterceptorContextMut<'_>,
1026 expected
1027 );
1028 }
1029
1030 #[tokio::test]
1031 #[traced_test]
1032 async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1033 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1034 interceptor_error_redirection_test!(
1035 read_before_serialization,
1036 &BeforeSerializationInterceptorContextRef<'_>,
1037 modify_before_completion,
1038 &mut FinalizerInterceptorContextMut<'_>,
1039 expected
1040 );
1041 }
1042
1043 #[tokio::test]
1044 #[traced_test]
1045 async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1046 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1047 interceptor_error_redirection_test!(
1048 read_after_serialization,
1049 &BeforeTransmitInterceptorContextRef<'_>,
1050 modify_before_completion,
1051 &mut FinalizerInterceptorContextMut<'_>,
1052 expected
1053 );
1054 }
1055
1056 #[tokio::test]
1057 #[traced_test]
1058 async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1059 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1060 interceptor_error_redirection_test!(
1061 modify_before_retry_loop,
1062 &mut BeforeTransmitInterceptorContextMut<'_>,
1063 modify_before_completion,
1064 &mut FinalizerInterceptorContextMut<'_>,
1065 expected
1066 );
1067 }
1068
1069 #[tokio::test]
1070 #[traced_test]
1071 async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1072 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1073 interceptor_error_redirection_test!(
1074 read_before_attempt,
1075 &BeforeTransmitInterceptorContextRef<'_>,
1076 modify_before_attempt_completion,
1077 &mut FinalizerInterceptorContextMut<'_>,
1078 expected
1079 );
1080 }
1081
1082 #[tokio::test]
1083 #[traced_test]
1084 async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1085 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1086 interceptor_error_redirection_test!(
1087 modify_before_signing,
1088 &mut BeforeTransmitInterceptorContextMut<'_>,
1089 modify_before_attempt_completion,
1090 &mut FinalizerInterceptorContextMut<'_>,
1091 expected
1092 );
1093 }
1094
1095 #[tokio::test]
1096 #[traced_test]
1097 async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1098 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1099 interceptor_error_redirection_test!(
1100 read_before_signing,
1101 &BeforeTransmitInterceptorContextRef<'_>,
1102 modify_before_attempt_completion,
1103 &mut FinalizerInterceptorContextMut<'_>,
1104 expected
1105 );
1106 }
1107
1108 #[tokio::test]
1109 #[traced_test]
1110 async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1111 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1112 interceptor_error_redirection_test!(
1113 read_after_signing,
1114 &BeforeTransmitInterceptorContextRef<'_>,
1115 modify_before_attempt_completion,
1116 &mut FinalizerInterceptorContextMut<'_>,
1117 expected
1118 );
1119 }
1120
1121 #[tokio::test]
1122 #[traced_test]
1123 async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1124 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1125 interceptor_error_redirection_test!(
1126 modify_before_transmit,
1127 &mut BeforeTransmitInterceptorContextMut<'_>,
1128 modify_before_attempt_completion,
1129 &mut FinalizerInterceptorContextMut<'_>,
1130 expected
1131 );
1132 }
1133
1134 #[tokio::test]
1135 #[traced_test]
1136 async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1137 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1138 interceptor_error_redirection_test!(
1139 read_before_transmit,
1140 &BeforeTransmitInterceptorContextRef<'_>,
1141 modify_before_attempt_completion,
1142 &mut FinalizerInterceptorContextMut<'_>,
1143 expected
1144 );
1145 }
1146
1147 #[tokio::test]
1148 #[traced_test]
1149 async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1150 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1151 interceptor_error_redirection_test!(
1152 read_after_transmit,
1153 &BeforeDeserializationInterceptorContextRef<'_>,
1154 modify_before_attempt_completion,
1155 &mut FinalizerInterceptorContextMut<'_>,
1156 expected
1157 );
1158 }
1159
1160 #[tokio::test]
1161 #[traced_test]
1162 async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1163 ) {
1164 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1165 interceptor_error_redirection_test!(
1166 modify_before_deserialization,
1167 &mut BeforeDeserializationInterceptorContextMut<'_>,
1168 modify_before_attempt_completion,
1169 &mut FinalizerInterceptorContextMut<'_>,
1170 expected
1171 );
1172 }
1173
1174 #[tokio::test]
1175 #[traced_test]
1176 async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1177 ) {
1178 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1179 interceptor_error_redirection_test!(
1180 read_before_deserialization,
1181 &BeforeDeserializationInterceptorContextRef<'_>,
1182 modify_before_attempt_completion,
1183 &mut FinalizerInterceptorContextMut<'_>,
1184 expected
1185 );
1186 }
1187
1188 #[tokio::test]
1189 #[traced_test]
1190 async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1191 {
1192 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1193 interceptor_error_redirection_test!(
1194 read_after_deserialization,
1195 &AfterDeserializationInterceptorContextRef<'_>,
1196 modify_before_attempt_completion,
1197 &mut FinalizerInterceptorContextMut<'_>,
1198 expected
1199 );
1200 }
1201
1202 #[tokio::test]
1203 #[traced_test]
1204 async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1205 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1206 interceptor_error_redirection_test!(
1207 modify_before_attempt_completion,
1208 &mut FinalizerInterceptorContextMut<'_>,
1209 read_after_attempt,
1210 &FinalizerInterceptorContextRef<'_>,
1211 expected
1212 );
1213 }
1214
1215 #[tokio::test]
1216 #[traced_test]
1217 async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1218 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1219 interceptor_error_redirection_test!(
1220 modify_before_completion,
1221 &mut FinalizerInterceptorContextMut<'_>,
1222 read_after_execution,
1223 &FinalizerInterceptorContextRef<'_>,
1224 expected
1225 );
1226 }
1227
1228 #[tokio::test]
1229 async fn test_stop_points() {
1230 let runtime_plugins = || {
1231 RuntimePlugins::new()
1232 .with_operation_plugin(TestOperationRuntimePlugin::new())
1233 .with_operation_plugin(NoAuthRuntimePlugin::new())
1234 };
1235
1236 let context = invoke_with_stop_point(
1238 "test",
1239 "test",
1240 Input::doesnt_matter(),
1241 &runtime_plugins(),
1242 StopPoint::None,
1243 )
1244 .await
1245 .expect("success");
1246 assert!(context.response().is_some());
1247
1248 let context = invoke_with_stop_point(
1250 "test",
1251 "test",
1252 Input::doesnt_matter(),
1253 &runtime_plugins(),
1254 StopPoint::BeforeTransmit,
1255 )
1256 .await
1257 .expect("success");
1258 assert!(context.response().is_none());
1259 }
1260
1261 #[tokio::test]
1263 async fn test_stop_points_error_handling() {
1264 #[derive(Debug, Default)]
1265 struct Inner {
1266 modify_before_retry_loop_called: AtomicBool,
1267 modify_before_completion_called: AtomicBool,
1268 read_after_execution_called: AtomicBool,
1269 }
1270 #[derive(Clone, Debug, Default)]
1271 struct TestInterceptor {
1272 inner: Arc<Inner>,
1273 }
1274
1275 impl Intercept for TestInterceptor {
1276 fn name(&self) -> &'static str {
1277 "TestInterceptor"
1278 }
1279
1280 fn modify_before_retry_loop(
1281 &self,
1282 _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1283 _rc: &RuntimeComponents,
1284 _cfg: &mut ConfigBag,
1285 ) -> Result<(), BoxError> {
1286 self.inner
1287 .modify_before_retry_loop_called
1288 .store(true, Ordering::Relaxed);
1289 Err("test error".into())
1290 }
1291
1292 fn modify_before_completion(
1293 &self,
1294 _context: &mut FinalizerInterceptorContextMut<'_>,
1295 _rc: &RuntimeComponents,
1296 _cfg: &mut ConfigBag,
1297 ) -> Result<(), BoxError> {
1298 self.inner
1299 .modify_before_completion_called
1300 .store(true, Ordering::Relaxed);
1301 Ok(())
1302 }
1303
1304 fn read_after_execution(
1305 &self,
1306 _context: &FinalizerInterceptorContextRef<'_>,
1307 _rc: &RuntimeComponents,
1308 _cfg: &mut ConfigBag,
1309 ) -> Result<(), BoxError> {
1310 self.inner
1311 .read_after_execution_called
1312 .store(true, Ordering::Relaxed);
1313 Ok(())
1314 }
1315 }
1316
1317 #[derive(Debug)]
1318 struct TestInterceptorRuntimePlugin {
1319 builder: RuntimeComponentsBuilder,
1320 }
1321
1322 impl RuntimePlugin for TestInterceptorRuntimePlugin {
1323 fn runtime_components(
1324 &self,
1325 _: &RuntimeComponentsBuilder,
1326 ) -> Cow<'_, RuntimeComponentsBuilder> {
1327 Cow::Borrowed(&self.builder)
1328 }
1329 }
1330
1331 let interceptor = TestInterceptor::default();
1332 let client = NeverClient::new();
1333 let runtime_plugins = || {
1334 RuntimePlugins::new()
1335 .with_operation_plugin(TestOperationRuntimePlugin::new())
1336 .with_operation_plugin(NoAuthRuntimePlugin::new())
1337 .with_operation_plugin(TestInterceptorRuntimePlugin {
1338 builder: RuntimeComponentsBuilder::new("test")
1339 .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1340 .with_http_client(Some(client.clone())),
1341 })
1342 };
1343
1344 let _err = invoke_with_stop_point(
1346 "test",
1347 "test",
1348 Input::doesnt_matter(),
1349 &runtime_plugins(),
1350 StopPoint::BeforeTransmit,
1351 )
1352 .await
1353 .expect_err("an error was returned");
1354 assert_eq!(client.num_calls(), 0);
1355
1356 assert!(interceptor
1357 .inner
1358 .modify_before_retry_loop_called
1359 .load(Ordering::Relaxed));
1360 assert!(interceptor
1361 .inner
1362 .modify_before_completion_called
1363 .load(Ordering::Relaxed));
1364 assert!(interceptor
1365 .inner
1366 .read_after_execution_called
1367 .load(Ordering::Relaxed));
1368 }
1369}