aws_smithy_http_server/operation/
upgrade.rs1use std::{
7 convert::Infallible,
8 future::{Future, Ready},
9 marker::PhantomData,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use futures_util::ready;
15use pin_project_lite::pin_project;
16use tower::{util::Oneshot, Service, ServiceExt};
17use tracing::error;
18
19use crate::{
20 body::BoxBody, plugin::Plugin, request::FromRequest, response::IntoResponse,
21 runtime_error::InternalFailureException, service::ServiceShape,
22};
23
24use super::OperationShape;
25
26#[derive(Debug, Clone)]
31pub struct UpgradePlugin<Extractors> {
32 _extractors: PhantomData<Extractors>,
33}
34
35impl<Extractors> Default for UpgradePlugin<Extractors> {
36 fn default() -> Self {
37 Self {
38 _extractors: PhantomData,
39 }
40 }
41}
42
43impl<Extractors> UpgradePlugin<Extractors> {
44 pub fn new() -> Self {
46 Self::default()
47 }
48}
49
50impl<Ser, Op, T, Extractors> Plugin<Ser, Op, T> for UpgradePlugin<Extractors>
51where
52 Ser: ServiceShape,
53 Op: OperationShape,
54{
55 type Output = Upgrade<Ser::Protocol, (Op::Input, Extractors), T>;
56
57 fn apply(&self, inner: T) -> Self::Output {
58 Upgrade {
59 _protocol: PhantomData,
60 _input: PhantomData,
61 inner,
62 }
63 }
64}
65
66pub struct Upgrade<Protocol, Input, S> {
69 _protocol: PhantomData<Protocol>,
70 _input: PhantomData<Input>,
71 inner: S,
72}
73
74impl<P, Input, S> Clone for Upgrade<P, Input, S>
75where
76 S: Clone,
77{
78 fn clone(&self) -> Self {
79 Self {
80 _protocol: PhantomData,
81 _input: PhantomData,
82 inner: self.inner.clone(),
83 }
84 }
85}
86
87pin_project! {
88 #[project = InnerProj]
89 #[project_replace = InnerProjReplace]
90 enum Inner<FromFut, HandlerFut> {
91 FromRequest {
92 #[pin]
93 inner: FromFut
94 },
95 Inner {
96 #[pin]
97 call: HandlerFut
98 }
99 }
100}
101
102type InnerAlias<Input, Protocol, B, S> = Inner<<Input as FromRequest<Protocol, B>>::Future, Oneshot<S, Input>>;
103
104pin_project! {
105 pub struct UpgradeFuture<Protocol, Input, B, S>
107 where
108 Input: FromRequest<Protocol, B>,
109 S: Service<Input>,
110 {
111 service: Option<S>,
112 #[pin]
113 inner: InnerAlias<Input, Protocol, B, S>
114 }
115}
116
117impl<P, Input, B, S> Future for UpgradeFuture<P, Input, B, S>
118where
119 Input: FromRequest<P, B>,
120 <Input as FromRequest<P, B>>::Rejection: std::fmt::Display,
121 S: Service<Input>,
122 S::Response: IntoResponse<P>,
123 S::Error: IntoResponse<P>,
124{
125 type Output = Result<http::Response<crate::body::BoxBody>, Infallible>;
126
127 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 loop {
129 let mut this = self.as_mut().project();
130 let this2 = this.inner.as_mut().project();
131
132 let call = match this2 {
133 InnerProj::FromRequest { inner } => {
134 let result = ready!(inner.poll(cx));
135 match result {
136 Ok(ok) => this
137 .service
138 .take()
139 .expect("futures cannot be polled after completion")
140 .oneshot(ok),
141 Err(err) => {
142 tracing::trace!(error = %err, "parameter for the handler cannot be constructed");
146 return Poll::Ready(Ok(err.into_response()));
147 }
148 }
149 }
150 InnerProj::Inner { call } => {
151 let result = ready!(call.poll(cx));
152 let output = match result {
153 Ok(ok) => ok.into_response(),
154 Err(err) => err.into_response(),
155 };
156 return Poll::Ready(Ok(output));
157 }
158 };
159
160 this.inner.as_mut().project_replace(Inner::Inner { call });
161 }
162 }
163}
164
165impl<P, Input, B, S> Service<http::Request<B>> for Upgrade<P, Input, S>
166where
167 Input: FromRequest<P, B>,
168 <Input as FromRequest<P, B>>::Rejection: std::fmt::Display,
169 S: Service<Input> + Clone,
170 S::Response: IntoResponse<P>,
171 S::Error: IntoResponse<P>,
172{
173 type Response = http::Response<crate::body::BoxBody>;
174 type Error = Infallible;
175 type Future = UpgradeFuture<P, Input, B, S>;
176
177 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178 Poll::Ready(Ok(()))
181 }
182
183 fn call(&mut self, req: http::Request<B>) -> Self::Future {
184 let clone = self.inner.clone();
185 let service = std::mem::replace(&mut self.inner, clone);
186 UpgradeFuture {
187 service: Some(service),
188 inner: Inner::FromRequest {
189 inner: <Input as FromRequest<P, B>>::from_request(req),
190 },
191 }
192 }
193}
194
195#[derive(Copy)]
197pub struct MissingFailure<P> {
198 _protocol: PhantomData<fn(P)>,
199}
200
201impl<P> Default for MissingFailure<P> {
202 fn default() -> Self {
203 Self { _protocol: PhantomData }
204 }
205}
206
207impl<P> Clone for MissingFailure<P> {
208 fn clone(&self) -> Self {
209 MissingFailure { _protocol: PhantomData }
210 }
211}
212
213impl<R, P> Service<R> for MissingFailure<P>
214where
215 InternalFailureException: IntoResponse<P>,
216{
217 type Response = http::Response<BoxBody>;
218 type Error = Infallible;
219 type Future = Ready<Result<Self::Response, Self::Error>>;
220
221 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222 Poll::Ready(Ok(()))
223 }
224
225 fn call(&mut self, _request: R) -> Self::Future {
226 error!("the operation has not been set");
227 std::future::ready(Ok(InternalFailureException.into_response()))
228 }
229}