1use crate::client::interceptors::Interceptors;
7use crate::client::orchestrator::http::{log_response_body, read_body};
8use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
9use crate::client::{
10 http::body::minimum_throughput::MaybeUploadThroughputCheckFuture,
11 orchestrator::endpoints::orchestrate_endpoint,
12};
13use auth::{resolve_identity, sign_request};
14use aws_smithy_async::rt::sleep::AsyncSleep;
15use aws_smithy_runtime_api::box_error::BoxError;
16use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnector, HttpConnectorSettings};
17use aws_smithy_runtime_api::client::interceptors::context::{
18 Error, Input, InterceptorContext, Output, RewindResult,
19};
20use aws_smithy_runtime_api::client::orchestrator::{
21 HttpResponse, LoadedRequestBody, OrchestratorError,
22};
23use aws_smithy_runtime_api::client::result::SdkError;
24use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
25use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
26use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
27use aws_smithy_runtime_api::client::ser_de::{
28 DeserializeResponse, SerializeRequest, SharedRequestSerializer, SharedResponseDeserializer,
29};
30use aws_smithy_types::body::SdkBody;
31use aws_smithy_types::byte_stream::ByteStream;
32use aws_smithy_types::config_bag::ConfigBag;
33use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig};
34use endpoints::apply_endpoint;
35use std::mem;
36use tracing::{debug, debug_span, instrument, trace, Instrument};
37
38mod auth;
39pub use auth::AuthSchemeAndEndpointOrchestrationV2;
40
41pub mod endpoints;
43
44mod http;
46
47pub mod operation;
49
50macro_rules! halt {
51 ([$ctx:ident] => $err:expr) => {{
52 debug!("encountered orchestrator error; halting");
53 $ctx.fail($err.into());
54 return;
55 }};
56}
57
58macro_rules! halt_on_err {
59 ([$ctx:ident] => $expr:expr) => {
60 match $expr {
61 Ok(ok) => ok,
62 Err(err) => halt!([$ctx] => err),
63 }
64 };
65}
66
67macro_rules! continue_on_err {
68 ([$ctx:ident] => $expr:expr) => {
69 if let Err(err) = $expr {
70 debug!(err = ?err, "encountered orchestrator error; continuing");
71 $ctx.fail(err.into());
72 }
73 };
74}
75
76macro_rules! run_interceptors {
77 (continue_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
78 $(run_interceptors!(continue_on_err: $interceptor($ctx, $rc, $cfg));)+
79 };
80 (continue_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
81 continue_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
82 };
83 (halt_on_err: { $($interceptor:ident($ctx:ident, $rc:ident, $cfg:ident);)+ }) => {
84 $(run_interceptors!(halt_on_err: $interceptor($ctx, $rc, $cfg));)+
85 };
86 (halt_on_err: $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
87 halt_on_err!([$ctx] => run_interceptors!(__private $interceptor($ctx, $rc, $cfg)))
88 };
89 (__private $interceptor:ident($ctx:ident, $rc:ident, $cfg:ident)) => {
90 Interceptors::new($rc.interceptors()).$interceptor($ctx, $rc, $cfg)
91 };
92}
93
94pub async fn invoke(
104 service_name: &str,
105 operation_name: &str,
106 input: Input,
107 runtime_plugins: &RuntimePlugins,
108) -> Result<Output, SdkError<Error, HttpResponse>> {
109 invoke_with_stop_point(
110 service_name,
111 operation_name,
112 input,
113 runtime_plugins,
114 StopPoint::None,
115 )
116 .await?
117 .finalize()
118}
119
120#[non_exhaustive]
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub enum StopPoint {
124 None,
126
127 BeforeTransmit,
129}
130
131pub async fn invoke_with_stop_point(
138 _service_name: &str,
142 _operation_name: &str,
143 input: Input,
144 runtime_plugins: &RuntimePlugins,
145 stop_point: StopPoint,
146) -> Result<InterceptorContext, SdkError<Error, HttpResponse>> {
147 async move {
148 let mut cfg = ConfigBag::base();
149 let cfg = &mut cfg;
150
151 let mut ctx = InterceptorContext::new(input);
152
153 let runtime_components = apply_configuration(&mut ctx, cfg, runtime_plugins)
154 .map_err(SdkError::construction_failure)?;
155 trace!(runtime_components = ?runtime_components);
156
157 let operation_timeout_config =
158 MaybeTimeoutConfig::new(&runtime_components, cfg, TimeoutKind::Operation);
159 trace!(operation_timeout_config = ?operation_timeout_config);
160 async {
161 if !ctx.is_failed() {
164 try_op(&mut ctx, cfg, &runtime_components, stop_point).await;
165 }
166 finally_op(&mut ctx, cfg, &runtime_components).await;
167 if ctx.is_failed() {
168 Err(ctx.finalize().expect_err("it is failed"))
169 } else {
170 Ok(ctx)
171 }
172 }
173 .maybe_timeout(operation_timeout_config)
174 .await
175 }
176 .await
177}
178
179#[instrument(skip_all, level = "debug")]
183fn apply_configuration(
184 ctx: &mut InterceptorContext,
185 cfg: &mut ConfigBag,
186 runtime_plugins: &RuntimePlugins,
187) -> Result<RuntimeComponents, BoxError> {
188 let client_rc_builder = runtime_plugins.apply_client_configuration(cfg)?;
189 continue_on_err!([ctx] => Interceptors::new(client_rc_builder.interceptors()).read_before_execution(false, ctx, cfg));
190
191 let operation_rc_builder = runtime_plugins.apply_operation_configuration(cfg)?;
192 continue_on_err!([ctx] => Interceptors::new(operation_rc_builder.interceptors()).read_before_execution(true, ctx, cfg));
193
194 let components = client_rc_builder
196 .merge_from(&operation_rc_builder)
197 .build()?;
198
199 let resolved_timeout_config = cfg.load::<MergeTimeoutConfig>();
203 debug!(
204 "timeout settings for this operation: {:?}",
205 resolved_timeout_config
206 );
207 cfg.interceptor_state().store_put(resolved_timeout_config);
208
209 components.validate_final_config(cfg)?;
210 Ok(components)
211}
212
213#[instrument(skip_all, level = "debug")]
214async fn try_op(
215 ctx: &mut InterceptorContext,
216 cfg: &mut ConfigBag,
217 runtime_components: &RuntimeComponents,
218 stop_point: StopPoint,
219) {
220 run_interceptors!(halt_on_err: {
222 modify_before_serialization(ctx, runtime_components, cfg);
223 read_before_serialization(ctx, runtime_components, cfg);
224 });
225
226 ctx.enter_serialization_phase();
228 {
229 let _span = debug_span!("serialization").entered();
230 let request_serializer = cfg
231 .load::<SharedRequestSerializer>()
232 .expect("request serializer must be in the config bag")
233 .clone();
234 let input = ctx.take_input().expect("input set at this point");
235 let request = halt_on_err!([ctx] => request_serializer.serialize_input(input, cfg).map_err(OrchestratorError::other));
236 ctx.set_request(request);
237 }
238
239 if let Some(&LoadedRequestBody::Requested) = cfg.load::<LoadedRequestBody>() {
241 debug!("loading request body into memory");
242 let mut body = SdkBody::taken();
243 mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
244 let loaded_body = halt_on_err!([ctx] =>
245 ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
246 )
247 .into_bytes();
248 *ctx.request_mut().as_mut().expect("set above").body_mut() =
249 SdkBody::from(loaded_body.clone());
250 cfg.interceptor_state()
251 .store_put(LoadedRequestBody::Loaded(loaded_body));
252 }
253
254 ctx.enter_before_transmit_phase();
256 run_interceptors!(halt_on_err: {
257 read_after_serialization(ctx, runtime_components, cfg);
258 modify_before_retry_loop(ctx, runtime_components, cfg);
259 });
260
261 let should_attempt = runtime_components
264 .retry_strategy()
265 .should_attempt_initial_request(runtime_components, cfg);
266 match should_attempt {
267 Ok(ShouldAttempt::Yes) => debug!("retry strategy has OKed initial request"),
269 Ok(ShouldAttempt::No) => {
271 let err: BoxError = "the retry strategy indicates that an initial request shouldn't be made, but it didn't specify why".into();
272 halt!([ctx] => OrchestratorError::other(err));
273 }
274 Err(err) => halt!([ctx] => OrchestratorError::other(err)),
276 Ok(ShouldAttempt::YesAfterDelay(delay)) => {
277 let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
278 "the retry strategy requested a delay before sending the initial request, but no 'async sleep' implementation was set"
279 )));
280 debug!("retry strategy has OKed initial request after a {delay:?} delay");
281 sleep_impl.sleep(delay).await;
282 }
283 }
284
285 ctx.save_checkpoint();
288 let mut retry_delay = None;
289 for i in 1u32.. {
290 trace!("checking if context can be rewound for attempt #{i}");
293 if let RewindResult::Impossible = ctx.rewind(cfg) {
294 debug!("request cannot be retried since the request body cannot be cloned");
295 break;
296 }
297 cfg.interceptor_state()
299 .store_put::<RequestAttempts>(i.into());
300 if let Some((delay, sleep)) = retry_delay.take() {
302 debug!("delaying for {delay:?}");
303 sleep.await;
304 }
305 let attempt_timeout_config =
306 MaybeTimeoutConfig::new(runtime_components, cfg, TimeoutKind::OperationAttempt);
307 trace!(attempt_timeout_config = ?attempt_timeout_config);
308 let maybe_timeout = async {
309 debug!("beginning attempt #{i}");
310 try_attempt(ctx, cfg, runtime_components, stop_point)
311 .instrument(debug_span!("try_attempt", "attempt" = i))
312 .await;
313 finally_attempt(ctx, cfg, runtime_components)
314 .instrument(debug_span!("finally_attempt", "attempt" = i))
315 .await;
316 Result::<_, SdkError<Error, HttpResponse>>::Ok(())
317 }
318 .maybe_timeout(attempt_timeout_config)
319 .await
320 .map_err(|err| OrchestratorError::timeout(err.into_source().unwrap()));
321
322 continue_on_err!([ctx] => maybe_timeout);
324
325 let should_attempt = halt_on_err!([ctx] => runtime_components
328 .retry_strategy()
329 .should_attempt_retry(ctx, runtime_components, cfg)
330 .map_err(OrchestratorError::other));
331 match should_attempt {
332 ShouldAttempt::Yes => continue,
334 ShouldAttempt::No => {
336 debug!("a retry is either unnecessary or not possible, exiting attempt loop");
337 break;
338 }
339 ShouldAttempt::YesAfterDelay(delay) => {
340 let sleep_impl = halt_on_err!([ctx] => runtime_components.sleep_impl().ok_or_else(|| OrchestratorError::other(
341 "the retry strategy requested a delay before sending the retry request, but no 'async sleep' implementation was set"
342 )));
343 retry_delay = Some((delay, sleep_impl.sleep(delay)));
344 continue;
345 }
346 }
347 }
348}
349
350async fn try_attempt(
351 ctx: &mut InterceptorContext,
352 cfg: &mut ConfigBag,
353 runtime_components: &RuntimeComponents,
354 stop_point: StopPoint,
355) {
356 run_interceptors!(halt_on_err: read_before_attempt(ctx, runtime_components, cfg));
357
358 let (scheme_id, identity, endpoint) = halt_on_err!([ctx] => resolve_identity(runtime_components, cfg).await.map_err(OrchestratorError::other));
359
360 match endpoint {
361 Some(endpoint) => {
362 halt_on_err!([ctx] => apply_endpoint(&endpoint, ctx, cfg).map_err(OrchestratorError::other));
365 cfg.interceptor_state().store_put(endpoint);
367 }
368 None => {
369 halt_on_err!([ctx] => orchestrate_endpoint(identity.clone(), ctx, runtime_components, cfg)
370 .instrument(debug_span!("orchestrate_endpoint"))
371 .await
372 .map_err(OrchestratorError::other));
373 }
374 }
375
376 run_interceptors!(halt_on_err: {
377 modify_before_signing(ctx, runtime_components, cfg);
378 read_before_signing(ctx, runtime_components, cfg);
379 });
380
381 halt_on_err!([ctx] => sign_request(&scheme_id, &identity, ctx, runtime_components, cfg).map_err(OrchestratorError::other));
382
383 run_interceptors!(halt_on_err: {
384 read_after_signing(ctx, runtime_components, cfg);
385 modify_before_transmit(ctx, runtime_components, cfg);
386 read_before_transmit(ctx, runtime_components, cfg);
387 });
388
389 if let StopPoint::BeforeTransmit = stop_point {
391 debug!("ending orchestration early because the stop point is `BeforeTransmit`");
392 return;
393 }
394
395 ctx.enter_transmit_phase();
398 let response = halt_on_err!([ctx] => {
399 let request = ctx.take_request().expect("set during serialization");
400 trace!(request = ?request, "transmitting request");
401 let http_client = halt_on_err!([ctx] => runtime_components.http_client().ok_or_else(||
402 OrchestratorError::other("No HTTP client was available to send this request. \
403 Enable the `default-https-client` crate feature or configure an HTTP client to fix this.")
404 ));
405 let timeout_config = cfg.load::<TimeoutConfig>().expect("timeout config must be set");
406 let settings = {
407 let mut builder = HttpConnectorSettings::builder();
408 builder.set_connect_timeout(timeout_config.connect_timeout());
409 builder.set_read_timeout(timeout_config.read_timeout());
410 builder.build()
411 };
412 let connector = http_client.http_connector(&settings, runtime_components);
413 let response_future = MaybeUploadThroughputCheckFuture::new(
414 cfg,
415 runtime_components,
416 connector.call(request),
417 );
418 response_future.await.map_err(OrchestratorError::connector)
419 });
420 trace!(response = ?response, "received response from service");
421 ctx.set_response(response);
422 ctx.enter_before_deserialization_phase();
423
424 run_interceptors!(halt_on_err: {
425 read_after_transmit(ctx, runtime_components, cfg);
426 modify_before_deserialization(ctx, runtime_components, cfg);
427 read_before_deserialization(ctx, runtime_components, cfg);
428 });
429
430 ctx.enter_deserialization_phase();
431 let output_or_error = async {
432 let response = ctx.response_mut().expect("set during transmit");
433 let response_deserializer = cfg
434 .load::<SharedResponseDeserializer>()
435 .expect("a request deserializer must be in the config bag");
436 let maybe_deserialized = {
437 let _span = debug_span!("deserialize_streaming").entered();
438 response_deserializer.deserialize_streaming(response)
439 };
440 match maybe_deserialized {
441 Some(output_or_error) => output_or_error,
442 None => read_body(response)
443 .instrument(debug_span!("read_body"))
444 .await
445 .map_err(OrchestratorError::response)
446 .and_then(|_| {
447 let _span = debug_span!("deserialize_nonstreaming").entered();
448 log_response_body(response, cfg);
449 response_deserializer.deserialize_nonstreaming(response)
450 }),
451 }
452 }
453 .instrument(debug_span!("deserialization"))
454 .await;
455 trace!(output_or_error = ?output_or_error);
456 ctx.set_output_or_error(output_or_error);
457
458 ctx.enter_after_deserialization_phase();
459 run_interceptors!(halt_on_err: read_after_deserialization(ctx, runtime_components, cfg));
460}
461
462async fn finally_attempt(
463 ctx: &mut InterceptorContext,
464 cfg: &mut ConfigBag,
465 runtime_components: &RuntimeComponents,
466) {
467 run_interceptors!(continue_on_err: {
468 modify_before_attempt_completion(ctx, runtime_components, cfg);
469 read_after_attempt(ctx, runtime_components, cfg);
470 });
471}
472
473#[instrument(skip_all, level = "debug")]
474async fn finally_op(
475 ctx: &mut InterceptorContext,
476 cfg: &mut ConfigBag,
477 runtime_components: &RuntimeComponents,
478) {
479 run_interceptors!(continue_on_err: {
480 modify_before_completion(ctx, runtime_components, cfg);
481 read_after_execution(ctx, runtime_components, cfg);
482 });
483}
484
485#[cfg(all(test, any(feature = "test-util", feature = "legacy-test-util")))]
486mod tests {
487 use crate::client::auth::no_auth::{NoAuthRuntimePluginV2, NO_AUTH_SCHEME_ID};
488 use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
489 use crate::client::orchestrator::{invoke, invoke_with_stop_point, StopPoint};
490 use crate::client::retries::strategy::NeverRetryStrategy;
491 use crate::client::test_util::{
492 deserializer::CannedResponseDeserializer, serializer::CannedRequestSerializer,
493 };
494 use aws_smithy_http_client::test_util::NeverClient;
495 use aws_smithy_runtime_api::box_error::BoxError;
496 use aws_smithy_runtime_api::client::auth::static_resolver::StaticAuthSchemeOptionResolver;
497 use aws_smithy_runtime_api::client::auth::{
498 AuthSchemeOptionResolverParams, SharedAuthSchemeOptionResolver,
499 };
500 use aws_smithy_runtime_api::client::endpoint::{
501 EndpointResolverParams, SharedEndpointResolver,
502 };
503 use aws_smithy_runtime_api::client::http::{
504 http_client_fn, HttpConnector, HttpConnectorFuture,
505 };
506 use aws_smithy_runtime_api::client::interceptors::context::{
507 AfterDeserializationInterceptorContextRef, BeforeDeserializationInterceptorContextMut,
508 BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
509 BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
510 BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
511 FinalizerInterceptorContextRef, Input, Output,
512 };
513 use aws_smithy_runtime_api::client::interceptors::{Intercept, SharedInterceptor};
514 use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError};
515 use aws_smithy_runtime_api::client::retries::SharedRetryStrategy;
516 use aws_smithy_runtime_api::client::runtime_components::{
517 RuntimeComponents, RuntimeComponentsBuilder,
518 };
519 use aws_smithy_runtime_api::client::runtime_plugin::{RuntimePlugin, RuntimePlugins};
520 use aws_smithy_runtime_api::client::ser_de::{
521 SharedRequestSerializer, SharedResponseDeserializer,
522 };
523 use aws_smithy_runtime_api::shared::IntoShared;
524 use aws_smithy_types::body::SdkBody;
525 use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
526 use aws_smithy_types::timeout::TimeoutConfig;
527 use http_1x::{Response, StatusCode};
528 use std::borrow::Cow;
529 use std::sync::atomic::{AtomicBool, Ordering};
530 use std::sync::Arc;
531 use tracing_test::traced_test;
532
533 fn new_request_serializer() -> CannedRequestSerializer {
534 CannedRequestSerializer::success(HttpRequest::empty())
535 }
536
537 fn new_response_deserializer() -> CannedResponseDeserializer {
538 CannedResponseDeserializer::new(
539 Response::builder()
540 .status(StatusCode::OK)
541 .body(SdkBody::empty())
542 .map_err(|err| OrchestratorError::other(Box::new(err)))
543 .map(Output::erase),
544 )
545 }
546
547 #[derive(Debug, Default)]
548 struct OkConnector {}
549
550 impl OkConnector {
551 fn new() -> Self {
552 Self::default()
553 }
554 }
555
556 impl HttpConnector for OkConnector {
557 fn call(&self, _request: HttpRequest) -> HttpConnectorFuture {
558 HttpConnectorFuture::ready(Ok(http_1x::Response::builder()
559 .status(200)
560 .body(SdkBody::empty())
561 .expect("OK response is valid")
562 .try_into()
563 .unwrap()))
564 }
565 }
566
567 #[derive(Debug)]
568 struct TestOperationRuntimePlugin {
569 builder: RuntimeComponentsBuilder,
570 }
571
572 impl TestOperationRuntimePlugin {
573 fn new() -> Self {
574 Self {
575 builder: RuntimeComponentsBuilder::for_tests()
576 .with_retry_strategy(Some(SharedRetryStrategy::new(NeverRetryStrategy::new())))
577 .with_endpoint_resolver(Some(SharedEndpointResolver::new(
578 StaticUriEndpointResolver::http_localhost(8080),
579 )))
580 .with_http_client(Some(http_client_fn(|_, _| {
581 OkConnector::new().into_shared()
582 })))
583 .with_auth_scheme_option_resolver(Some(SharedAuthSchemeOptionResolver::new(
584 StaticAuthSchemeOptionResolver::new(vec![NO_AUTH_SCHEME_ID]),
585 ))),
586 }
587 }
588 }
589
590 impl RuntimePlugin for TestOperationRuntimePlugin {
591 fn config(&self) -> Option<FrozenLayer> {
592 let mut layer = Layer::new("TestOperationRuntimePlugin");
593 layer.store_put(AuthSchemeOptionResolverParams::new("idontcare"));
594 layer.store_put(EndpointResolverParams::new("dontcare"));
595 layer.store_put(SharedRequestSerializer::new(new_request_serializer()));
596 layer.store_put(SharedResponseDeserializer::new(new_response_deserializer()));
597 layer.store_put(TimeoutConfig::builder().build());
598 Some(layer.freeze())
599 }
600
601 fn runtime_components(
602 &self,
603 _: &RuntimeComponentsBuilder,
604 ) -> Cow<'_, RuntimeComponentsBuilder> {
605 Cow::Borrowed(&self.builder)
606 }
607 }
608
609 macro_rules! interceptor_error_handling_test {
610 (read_before_execution, $ctx:ty, $expected:expr,) => {
611 interceptor_error_handling_test!(__private read_before_execution, $ctx, $expected,);
612 };
613 ($interceptor:ident, $ctx:ty, $expected:expr) => {
614 interceptor_error_handling_test!(__private $interceptor, $ctx, $expected, _rc: &RuntimeComponents,);
615 };
616 (__private $interceptor:ident, $ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
617 #[derive(Debug)]
618 struct FailingInterceptorA;
619 impl Intercept for FailingInterceptorA {
620 fn name(&self) -> &'static str { "FailingInterceptorA" }
621
622 fn $interceptor(
623 &self,
624 _ctx: $ctx,
625 $($rc_arg)*
626 _cfg: &mut ConfigBag,
627 ) -> Result<(), BoxError> {
628 tracing::debug!("FailingInterceptorA called!");
629 Err("FailingInterceptorA".into())
630 }
631 }
632
633 #[derive(Debug)]
634 struct FailingInterceptorB;
635 impl Intercept for FailingInterceptorB {
636 fn name(&self) -> &'static str { "FailingInterceptorB" }
637
638 fn $interceptor(
639 &self,
640 _ctx: $ctx,
641 $($rc_arg)*
642 _cfg: &mut ConfigBag,
643 ) -> Result<(), BoxError> {
644 tracing::debug!("FailingInterceptorB called!");
645 Err("FailingInterceptorB".into())
646 }
647 }
648
649 #[derive(Debug)]
650 struct FailingInterceptorC;
651 impl Intercept for FailingInterceptorC {
652 fn name(&self) -> &'static str { "FailingInterceptorC" }
653
654 fn $interceptor(
655 &self,
656 _ctx: $ctx,
657 $($rc_arg)*
658 _cfg: &mut ConfigBag,
659 ) -> Result<(), BoxError> {
660 tracing::debug!("FailingInterceptorC called!");
661 Err("FailingInterceptorC".into())
662 }
663 }
664
665 #[derive(Debug)]
666 struct FailingInterceptorsClientRuntimePlugin(RuntimeComponentsBuilder);
667 impl FailingInterceptorsClientRuntimePlugin {
668 fn new() -> Self {
669 Self(RuntimeComponentsBuilder::new("test").with_interceptor(SharedInterceptor::new(FailingInterceptorA)))
670 }
671 }
672 impl RuntimePlugin for FailingInterceptorsClientRuntimePlugin {
673 fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
674 Cow::Borrowed(&self.0)
675 }
676 }
677
678 #[derive(Debug)]
679 struct FailingInterceptorsOperationRuntimePlugin(RuntimeComponentsBuilder);
680 impl FailingInterceptorsOperationRuntimePlugin {
681 fn new() -> Self {
682 Self(
683 RuntimeComponentsBuilder::new("test")
684 .with_interceptor(SharedInterceptor::new(FailingInterceptorB))
685 .with_interceptor(SharedInterceptor::new(FailingInterceptorC))
686 )
687 }
688 }
689 impl RuntimePlugin for FailingInterceptorsOperationRuntimePlugin {
690 fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
691 Cow::Borrowed(&self.0)
692 }
693 }
694
695 let input = Input::doesnt_matter();
696 let runtime_plugins = RuntimePlugins::new()
697 .with_client_plugin(FailingInterceptorsClientRuntimePlugin::new())
698 .with_operation_plugin(TestOperationRuntimePlugin::new())
699 .with_operation_plugin(NoAuthRuntimePluginV2::new())
700 .with_operation_plugin(FailingInterceptorsOperationRuntimePlugin::new());
701 let actual = invoke("test", "test", input, &runtime_plugins)
702 .await
703 .expect_err("should error");
704 let actual = format!("{:?}", actual);
705 assert!(
706 actual.starts_with(&$expected),
707 "\nActual error: {actual}\nShould start with: {}\n",
708 $expected
709 );
710
711 assert!(logs_contain("FailingInterceptorA called!"));
712 assert!(logs_contain("FailingInterceptorB called!"));
713 assert!(logs_contain("FailingInterceptorC called!"));
714 };
715 }
716
717 #[tokio::test]
718 #[traced_test]
719 async fn test_read_before_execution_error_handling() {
720 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeExecution, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
721 interceptor_error_handling_test!(
722 read_before_execution,
723 &BeforeSerializationInterceptorContextRef<'_>,
724 expected,
725 );
726 }
727
728 #[tokio::test]
729 #[traced_test]
730 async fn test_modify_before_serialization_error_handling() {
731 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
732 interceptor_error_handling_test!(
733 modify_before_serialization,
734 &mut BeforeSerializationInterceptorContextMut<'_>,
735 expected
736 );
737 }
738
739 #[tokio::test]
740 #[traced_test]
741 async fn test_read_before_serialization_error_handling() {
742 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ReadBeforeSerialization, interceptor_name: Some("FailingInterceptorC"), source: Some("FailingInterceptorC") } })"#.to_string();
743 interceptor_error_handling_test!(
744 read_before_serialization,
745 &BeforeSerializationInterceptorContextRef<'_>,
746 expected
747 );
748 }
749
750 #[tokio::test]
751 #[traced_test]
752 async fn test_read_after_serialization_error_handling() {
753 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSerialization, interceptor_name: Some("FailingInterceptorC")"#.to_string();
754 interceptor_error_handling_test!(
755 read_after_serialization,
756 &BeforeTransmitInterceptorContextRef<'_>,
757 expected
758 );
759 }
760
761 #[tokio::test]
762 #[traced_test]
763 async fn test_modify_before_retry_loop_error_handling() {
764 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeRetryLoop, interceptor_name: Some("FailingInterceptorC")"#.to_string();
765 interceptor_error_handling_test!(
766 modify_before_retry_loop,
767 &mut BeforeTransmitInterceptorContextMut<'_>,
768 expected
769 );
770 }
771
772 #[tokio::test]
773 #[traced_test]
774 async fn test_read_before_attempt_error_handling() {
775 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeAttempt, interceptor_name: Some("FailingInterceptorC")"#;
776 interceptor_error_handling_test!(
777 read_before_attempt,
778 &BeforeTransmitInterceptorContextRef<'_>,
779 expected
780 );
781 }
782
783 #[tokio::test]
784 #[traced_test]
785 async fn test_modify_before_signing_error_handling() {
786 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
787 interceptor_error_handling_test!(
788 modify_before_signing,
789 &mut BeforeTransmitInterceptorContextMut<'_>,
790 expected
791 );
792 }
793
794 #[tokio::test]
795 #[traced_test]
796 async fn test_read_before_signing_error_handling() {
797 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeSigning, interceptor_name: Some("FailingInterceptorC")"#;
798 interceptor_error_handling_test!(
799 read_before_signing,
800 &BeforeTransmitInterceptorContextRef<'_>,
801 expected
802 );
803 }
804
805 #[tokio::test]
806 #[traced_test]
807 async fn test_read_after_signing_error_handling() {
808 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadAfterSigning, interceptor_name: Some("FailingInterceptorC")"#;
809 interceptor_error_handling_test!(
810 read_after_signing,
811 &BeforeTransmitInterceptorContextRef<'_>,
812 expected
813 );
814 }
815
816 #[tokio::test]
817 #[traced_test]
818 async fn test_modify_before_transmit_error_handling() {
819 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
820 interceptor_error_handling_test!(
821 modify_before_transmit,
822 &mut BeforeTransmitInterceptorContextMut<'_>,
823 expected
824 );
825 }
826
827 #[tokio::test]
828 #[traced_test]
829 async fn test_read_before_transmit_error_handling() {
830 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ReadBeforeTransmit, interceptor_name: Some("FailingInterceptorC")"#;
831 interceptor_error_handling_test!(
832 read_before_transmit,
833 &BeforeTransmitInterceptorContextRef<'_>,
834 expected
835 );
836 }
837
838 #[tokio::test]
839 #[traced_test]
840 async fn test_read_after_transmit_error_handling() {
841 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterTransmit, interceptor_name: Some("FailingInterceptorC")"#;
842 interceptor_error_handling_test!(
843 read_after_transmit,
844 &BeforeDeserializationInterceptorContextRef<'_>,
845 expected
846 );
847 }
848
849 #[tokio::test]
850 #[traced_test]
851 async fn test_modify_before_deserialization_error_handling() {
852 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
853 interceptor_error_handling_test!(
854 modify_before_deserialization,
855 &mut BeforeDeserializationInterceptorContextMut<'_>,
856 expected
857 );
858 }
859
860 #[tokio::test]
861 #[traced_test]
862 async fn test_read_before_deserialization_error_handling() {
863 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadBeforeDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
864 interceptor_error_handling_test!(
865 read_before_deserialization,
866 &BeforeDeserializationInterceptorContextRef<'_>,
867 expected
868 );
869 }
870
871 #[tokio::test]
872 #[traced_test]
873 async fn test_read_after_deserialization_error_handling() {
874 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterDeserialization, interceptor_name: Some("FailingInterceptorC")"#;
875 interceptor_error_handling_test!(
876 read_after_deserialization,
877 &AfterDeserializationInterceptorContextRef<'_>,
878 expected
879 );
880 }
881
882 #[tokio::test]
883 #[traced_test]
884 async fn test_modify_before_attempt_completion_error_handling() {
885 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("FailingInterceptorC")"#;
886 interceptor_error_handling_test!(
887 modify_before_attempt_completion,
888 &mut FinalizerInterceptorContextMut<'_>,
889 expected
890 );
891 }
892
893 #[tokio::test]
894 #[traced_test]
895 async fn test_read_after_attempt_error_handling() {
896 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("FailingInterceptorC")"#;
897 interceptor_error_handling_test!(
898 read_after_attempt,
899 &FinalizerInterceptorContextRef<'_>,
900 expected
901 );
902 }
903
904 #[tokio::test]
905 #[traced_test]
906 async fn test_modify_before_completion_error_handling() {
907 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("FailingInterceptorC")"#;
908 interceptor_error_handling_test!(
909 modify_before_completion,
910 &mut FinalizerInterceptorContextMut<'_>,
911 expected
912 );
913 }
914
915 #[tokio::test]
916 #[traced_test]
917 async fn test_read_after_execution_error_handling() {
918 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("FailingInterceptorC")"#;
919 interceptor_error_handling_test!(
920 read_after_execution,
921 &FinalizerInterceptorContextRef<'_>,
922 expected
923 );
924 }
925
926 macro_rules! interceptor_error_redirection_test {
927 (read_before_execution, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
928 interceptor_error_redirection_test!(__private read_before_execution, $origin_ctx, $destination_interceptor, $destination_ctx, $expected,);
929 };
930 ($origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr) => {
931 interceptor_error_redirection_test!(__private $origin_interceptor, $origin_ctx, $destination_interceptor, $destination_ctx, $expected, _rc: &RuntimeComponents,);
932 };
933 (__private $origin_interceptor:ident, $origin_ctx:ty, $destination_interceptor:ident, $destination_ctx:ty, $expected:expr, $($rc_arg:tt)*) => {
934 #[derive(Debug)]
935 struct OriginInterceptor;
936 impl Intercept for OriginInterceptor {
937 fn name(&self) -> &'static str { "OriginInterceptor" }
938
939 fn $origin_interceptor(
940 &self,
941 _ctx: $origin_ctx,
942 $($rc_arg)*
943 _cfg: &mut ConfigBag,
944 ) -> Result<(), BoxError> {
945 tracing::debug!("OriginInterceptor called!");
946 Err("OriginInterceptor".into())
947 }
948 }
949
950 #[derive(Debug)]
951 struct DestinationInterceptor;
952 impl Intercept for DestinationInterceptor {
953 fn name(&self) -> &'static str { "DestinationInterceptor" }
954
955 fn $destination_interceptor(
956 &self,
957 _ctx: $destination_ctx,
958 _runtime_components: &RuntimeComponents,
959 _cfg: &mut ConfigBag,
960 ) -> Result<(), BoxError> {
961 tracing::debug!("DestinationInterceptor called!");
962 Err("DestinationInterceptor".into())
963 }
964 }
965
966 #[derive(Debug)]
967 struct InterceptorsTestOperationRuntimePlugin(RuntimeComponentsBuilder);
968 impl InterceptorsTestOperationRuntimePlugin {
969 fn new() -> Self {
970 Self(
971 RuntimeComponentsBuilder::new("test")
972 .with_interceptor(SharedInterceptor::new(OriginInterceptor))
973 .with_interceptor(SharedInterceptor::new(DestinationInterceptor))
974 )
975 }
976 }
977 impl RuntimePlugin for InterceptorsTestOperationRuntimePlugin {
978 fn runtime_components(&self, _: &RuntimeComponentsBuilder) -> Cow<'_, RuntimeComponentsBuilder> {
979 Cow::Borrowed(&self.0)
980 }
981 }
982
983 let input = Input::doesnt_matter();
984 let runtime_plugins = RuntimePlugins::new()
985 .with_operation_plugin(TestOperationRuntimePlugin::new())
986 .with_operation_plugin(NoAuthRuntimePluginV2::new())
987 .with_operation_plugin(InterceptorsTestOperationRuntimePlugin::new());
988 let actual = invoke("test", "test", input, &runtime_plugins)
989 .await
990 .expect_err("should error");
991 let actual = format!("{:?}", actual);
992 assert!(
993 actual.starts_with(&$expected),
994 "\nActual error: {actual}\nShould start with: {}\n",
995 $expected
996 );
997
998 assert!(logs_contain("OriginInterceptor called!"));
999 assert!(logs_contain("DestinationInterceptor called!"));
1000 };
1001 }
1002
1003 #[tokio::test]
1004 #[traced_test]
1005 async fn test_read_before_execution_error_causes_jump_to_modify_before_completion() {
1006 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1007 interceptor_error_redirection_test!(
1008 read_before_execution,
1009 &BeforeSerializationInterceptorContextRef<'_>,
1010 modify_before_completion,
1011 &mut FinalizerInterceptorContextMut<'_>,
1012 expected
1013 );
1014 }
1015
1016 #[tokio::test]
1017 #[traced_test]
1018 async fn test_modify_before_serialization_error_causes_jump_to_modify_before_completion() {
1019 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1020 interceptor_error_redirection_test!(
1021 modify_before_serialization,
1022 &mut BeforeSerializationInterceptorContextMut<'_>,
1023 modify_before_completion,
1024 &mut FinalizerInterceptorContextMut<'_>,
1025 expected
1026 );
1027 }
1028
1029 #[tokio::test]
1030 #[traced_test]
1031 async fn test_read_before_serialization_error_causes_jump_to_modify_before_completion() {
1032 let expected = r#"ConstructionFailure(ConstructionFailure { source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1033 interceptor_error_redirection_test!(
1034 read_before_serialization,
1035 &BeforeSerializationInterceptorContextRef<'_>,
1036 modify_before_completion,
1037 &mut FinalizerInterceptorContextMut<'_>,
1038 expected
1039 );
1040 }
1041
1042 #[tokio::test]
1043 #[traced_test]
1044 async fn test_read_after_serialization_error_causes_jump_to_modify_before_completion() {
1045 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1046 interceptor_error_redirection_test!(
1047 read_after_serialization,
1048 &BeforeTransmitInterceptorContextRef<'_>,
1049 modify_before_completion,
1050 &mut FinalizerInterceptorContextMut<'_>,
1051 expected
1052 );
1053 }
1054
1055 #[tokio::test]
1056 #[traced_test]
1057 async fn test_modify_before_retry_loop_error_causes_jump_to_modify_before_completion() {
1058 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1059 interceptor_error_redirection_test!(
1060 modify_before_retry_loop,
1061 &mut BeforeTransmitInterceptorContextMut<'_>,
1062 modify_before_completion,
1063 &mut FinalizerInterceptorContextMut<'_>,
1064 expected
1065 );
1066 }
1067
1068 #[tokio::test]
1069 #[traced_test]
1070 async fn test_read_before_attempt_error_causes_jump_to_modify_before_attempt_completion() {
1071 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1072 interceptor_error_redirection_test!(
1073 read_before_attempt,
1074 &BeforeTransmitInterceptorContextRef<'_>,
1075 modify_before_attempt_completion,
1076 &mut FinalizerInterceptorContextMut<'_>,
1077 expected
1078 );
1079 }
1080
1081 #[tokio::test]
1082 #[traced_test]
1083 async fn test_modify_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1084 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1085 interceptor_error_redirection_test!(
1086 modify_before_signing,
1087 &mut BeforeTransmitInterceptorContextMut<'_>,
1088 modify_before_attempt_completion,
1089 &mut FinalizerInterceptorContextMut<'_>,
1090 expected
1091 );
1092 }
1093
1094 #[tokio::test]
1095 #[traced_test]
1096 async fn test_read_before_signing_error_causes_jump_to_modify_before_attempt_completion() {
1097 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1098 interceptor_error_redirection_test!(
1099 read_before_signing,
1100 &BeforeTransmitInterceptorContextRef<'_>,
1101 modify_before_attempt_completion,
1102 &mut FinalizerInterceptorContextMut<'_>,
1103 expected
1104 );
1105 }
1106
1107 #[tokio::test]
1108 #[traced_test]
1109 async fn test_read_after_signing_error_causes_jump_to_modify_before_attempt_completion() {
1110 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1111 interceptor_error_redirection_test!(
1112 read_after_signing,
1113 &BeforeTransmitInterceptorContextRef<'_>,
1114 modify_before_attempt_completion,
1115 &mut FinalizerInterceptorContextMut<'_>,
1116 expected
1117 );
1118 }
1119
1120 #[tokio::test]
1121 #[traced_test]
1122 async fn test_modify_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1123 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1124 interceptor_error_redirection_test!(
1125 modify_before_transmit,
1126 &mut BeforeTransmitInterceptorContextMut<'_>,
1127 modify_before_attempt_completion,
1128 &mut FinalizerInterceptorContextMut<'_>,
1129 expected
1130 );
1131 }
1132
1133 #[tokio::test]
1134 #[traced_test]
1135 async fn test_read_before_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1136 let expected = r#"DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1137 interceptor_error_redirection_test!(
1138 read_before_transmit,
1139 &BeforeTransmitInterceptorContextRef<'_>,
1140 modify_before_attempt_completion,
1141 &mut FinalizerInterceptorContextMut<'_>,
1142 expected
1143 );
1144 }
1145
1146 #[tokio::test]
1147 #[traced_test]
1148 async fn test_read_after_transmit_error_causes_jump_to_modify_before_attempt_completion() {
1149 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1150 interceptor_error_redirection_test!(
1151 read_after_transmit,
1152 &BeforeDeserializationInterceptorContextRef<'_>,
1153 modify_before_attempt_completion,
1154 &mut FinalizerInterceptorContextMut<'_>,
1155 expected
1156 );
1157 }
1158
1159 #[tokio::test]
1160 #[traced_test]
1161 async fn test_modify_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1162 ) {
1163 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1164 interceptor_error_redirection_test!(
1165 modify_before_deserialization,
1166 &mut BeforeDeserializationInterceptorContextMut<'_>,
1167 modify_before_attempt_completion,
1168 &mut FinalizerInterceptorContextMut<'_>,
1169 expected
1170 );
1171 }
1172
1173 #[tokio::test]
1174 #[traced_test]
1175 async fn test_read_before_deserialization_error_causes_jump_to_modify_before_attempt_completion(
1176 ) {
1177 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1178 interceptor_error_redirection_test!(
1179 read_before_deserialization,
1180 &BeforeDeserializationInterceptorContextRef<'_>,
1181 modify_before_attempt_completion,
1182 &mut FinalizerInterceptorContextMut<'_>,
1183 expected
1184 );
1185 }
1186
1187 #[tokio::test]
1188 #[traced_test]
1189 async fn test_read_after_deserialization_error_causes_jump_to_modify_before_attempt_completion()
1190 {
1191 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ModifyBeforeAttemptCompletion, interceptor_name: Some("DestinationInterceptor")"#;
1192 interceptor_error_redirection_test!(
1193 read_after_deserialization,
1194 &AfterDeserializationInterceptorContextRef<'_>,
1195 modify_before_attempt_completion,
1196 &mut FinalizerInterceptorContextMut<'_>,
1197 expected
1198 );
1199 }
1200
1201 #[tokio::test]
1202 #[traced_test]
1203 async fn test_modify_before_attempt_completion_error_causes_jump_to_read_after_attempt() {
1204 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterAttempt, interceptor_name: Some("DestinationInterceptor")"#;
1205 interceptor_error_redirection_test!(
1206 modify_before_attempt_completion,
1207 &mut FinalizerInterceptorContextMut<'_>,
1208 read_after_attempt,
1209 &FinalizerInterceptorContextRef<'_>,
1210 expected
1211 );
1212 }
1213
1214 #[tokio::test]
1215 #[traced_test]
1216 async fn test_modify_before_completion_error_causes_jump_to_read_after_execution() {
1217 let expected = r#"ResponseError(ResponseError { source: InterceptorError { kind: ReadAfterExecution, interceptor_name: Some("DestinationInterceptor")"#;
1218 interceptor_error_redirection_test!(
1219 modify_before_completion,
1220 &mut FinalizerInterceptorContextMut<'_>,
1221 read_after_execution,
1222 &FinalizerInterceptorContextRef<'_>,
1223 expected
1224 );
1225 }
1226
1227 #[tokio::test]
1228 async fn test_stop_points() {
1229 let runtime_plugins = || {
1230 RuntimePlugins::new()
1231 .with_operation_plugin(TestOperationRuntimePlugin::new())
1232 .with_operation_plugin(NoAuthRuntimePluginV2::new())
1233 };
1234
1235 let context = invoke_with_stop_point(
1237 "test",
1238 "test",
1239 Input::doesnt_matter(),
1240 &runtime_plugins(),
1241 StopPoint::None,
1242 )
1243 .await
1244 .expect("success");
1245 assert!(context.response().is_some());
1246
1247 let context = invoke_with_stop_point(
1249 "test",
1250 "test",
1251 Input::doesnt_matter(),
1252 &runtime_plugins(),
1253 StopPoint::BeforeTransmit,
1254 )
1255 .await
1256 .expect("success");
1257 assert!(context.response().is_none());
1258 }
1259
1260 #[tokio::test]
1262 async fn test_stop_points_error_handling() {
1263 #[derive(Debug, Default)]
1264 struct Inner {
1265 modify_before_retry_loop_called: AtomicBool,
1266 modify_before_completion_called: AtomicBool,
1267 read_after_execution_called: AtomicBool,
1268 }
1269 #[derive(Clone, Debug, Default)]
1270 struct TestInterceptor {
1271 inner: Arc<Inner>,
1272 }
1273
1274 impl Intercept for TestInterceptor {
1275 fn name(&self) -> &'static str {
1276 "TestInterceptor"
1277 }
1278
1279 fn modify_before_retry_loop(
1280 &self,
1281 _context: &mut BeforeTransmitInterceptorContextMut<'_>,
1282 _rc: &RuntimeComponents,
1283 _cfg: &mut ConfigBag,
1284 ) -> Result<(), BoxError> {
1285 self.inner
1286 .modify_before_retry_loop_called
1287 .store(true, Ordering::Relaxed);
1288 Err("test error".into())
1289 }
1290
1291 fn modify_before_completion(
1292 &self,
1293 _context: &mut FinalizerInterceptorContextMut<'_>,
1294 _rc: &RuntimeComponents,
1295 _cfg: &mut ConfigBag,
1296 ) -> Result<(), BoxError> {
1297 self.inner
1298 .modify_before_completion_called
1299 .store(true, Ordering::Relaxed);
1300 Ok(())
1301 }
1302
1303 fn read_after_execution(
1304 &self,
1305 _context: &FinalizerInterceptorContextRef<'_>,
1306 _rc: &RuntimeComponents,
1307 _cfg: &mut ConfigBag,
1308 ) -> Result<(), BoxError> {
1309 self.inner
1310 .read_after_execution_called
1311 .store(true, Ordering::Relaxed);
1312 Ok(())
1313 }
1314 }
1315
1316 #[derive(Debug)]
1317 struct TestInterceptorRuntimePlugin {
1318 builder: RuntimeComponentsBuilder,
1319 }
1320
1321 impl RuntimePlugin for TestInterceptorRuntimePlugin {
1322 fn runtime_components(
1323 &self,
1324 _: &RuntimeComponentsBuilder,
1325 ) -> Cow<'_, RuntimeComponentsBuilder> {
1326 Cow::Borrowed(&self.builder)
1327 }
1328 }
1329
1330 let interceptor = TestInterceptor::default();
1331 let client = NeverClient::new();
1332 let runtime_plugins = || {
1333 RuntimePlugins::new()
1334 .with_operation_plugin(TestOperationRuntimePlugin::new())
1335 .with_operation_plugin(NoAuthRuntimePluginV2::new())
1336 .with_operation_plugin(TestInterceptorRuntimePlugin {
1337 builder: RuntimeComponentsBuilder::new("test")
1338 .with_interceptor(SharedInterceptor::new(interceptor.clone()))
1339 .with_http_client(Some(client.clone())),
1340 })
1341 };
1342
1343 let _err = invoke_with_stop_point(
1345 "test",
1346 "test",
1347 Input::doesnt_matter(),
1348 &runtime_plugins(),
1349 StopPoint::BeforeTransmit,
1350 )
1351 .await
1352 .expect_err("an error was returned");
1353 assert_eq!(client.num_calls(), 0);
1354
1355 assert!(interceptor
1356 .inner
1357 .modify_before_retry_loop_called
1358 .load(Ordering::Relaxed));
1359 assert!(interceptor
1360 .inner
1361 .modify_before_completion_called
1362 .load(Ordering::Relaxed));
1363 assert!(interceptor
1364 .inner
1365 .read_after_execution_called
1366 .load(Ordering::Relaxed));
1367 }
1368}