aws_smithy_http_server/operation/
upgrade.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use std::{
7    convert::Infallible,
8    future::{Future, Ready},
9    marker::PhantomData,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use futures_util::ready;
15use pin_project_lite::pin_project;
16use tower::{util::Oneshot, Service, ServiceExt};
17use tracing::error;
18
19use crate::{
20    body::BoxBody, plugin::Plugin, request::FromRequest, response::IntoResponse,
21    runtime_error::InternalFailureException, service::ServiceShape,
22};
23
24use super::OperationShape;
25
26/// A [`Plugin`] responsible for taking an operation [`Service`], accepting and returning Smithy
27/// types and converting it into a [`Service`] taking and returning [`http`] types.
28///
29/// See [`Upgrade`].
30#[derive(Debug, Clone)]
31pub struct UpgradePlugin<Extractors> {
32    _extractors: PhantomData<Extractors>,
33}
34
35impl<Extractors> Default for UpgradePlugin<Extractors> {
36    fn default() -> Self {
37        Self {
38            _extractors: PhantomData,
39        }
40    }
41}
42
43impl<Extractors> UpgradePlugin<Extractors> {
44    /// Creates a new [`UpgradePlugin`].
45    pub fn new() -> Self {
46        Self::default()
47    }
48}
49
50impl<Ser, Op, T, Extractors> Plugin<Ser, Op, T> for UpgradePlugin<Extractors>
51where
52    Ser: ServiceShape,
53    Op: OperationShape,
54{
55    type Output = Upgrade<Ser::Protocol, (Op::Input, Extractors), T>;
56
57    fn apply(&self, inner: T) -> Self::Output {
58        Upgrade {
59            _protocol: PhantomData,
60            _input: PhantomData,
61            inner,
62        }
63    }
64}
65
66/// A [`Service`] responsible for wrapping an operation [`Service`] accepting and returning Smithy
67/// types, and converting it into a [`Service`] accepting and returning [`http`] types.
68pub struct Upgrade<Protocol, Input, S> {
69    _protocol: PhantomData<Protocol>,
70    _input: PhantomData<Input>,
71    inner: S,
72}
73
74impl<P, Input, S> Clone for Upgrade<P, Input, S>
75where
76    S: Clone,
77{
78    fn clone(&self) -> Self {
79        Self {
80            _protocol: PhantomData,
81            _input: PhantomData,
82            inner: self.inner.clone(),
83        }
84    }
85}
86
87pin_project! {
88    #[project = InnerProj]
89    #[project_replace = InnerProjReplace]
90    enum Inner<FromFut, HandlerFut> {
91        FromRequest {
92            #[pin]
93            inner: FromFut
94        },
95        Inner {
96            #[pin]
97            call: HandlerFut
98        }
99    }
100}
101
102type InnerAlias<Input, Protocol, B, S> = Inner<<Input as FromRequest<Protocol, B>>::Future, Oneshot<S, Input>>;
103
104pin_project! {
105    /// The [`Service::Future`] of [`Upgrade`].
106    pub struct UpgradeFuture<Protocol, Input, B, S>
107    where
108        Input: FromRequest<Protocol, B>,
109        S: Service<Input>,
110    {
111        service: Option<S>,
112        #[pin]
113        inner: InnerAlias<Input, Protocol, B, S>
114    }
115}
116
117impl<P, Input, B, S> Future for UpgradeFuture<P, Input, B, S>
118where
119    Input: FromRequest<P, B>,
120    <Input as FromRequest<P, B>>::Rejection: std::fmt::Display,
121    S: Service<Input>,
122    S::Response: IntoResponse<P>,
123    S::Error: IntoResponse<P>,
124{
125    type Output = Result<http::Response<crate::body::BoxBody>, Infallible>;
126
127    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128        loop {
129            let mut this = self.as_mut().project();
130            let this2 = this.inner.as_mut().project();
131
132            let call = match this2 {
133                InnerProj::FromRequest { inner } => {
134                    let result = ready!(inner.poll(cx));
135                    match result {
136                        Ok(ok) => this
137                            .service
138                            .take()
139                            .expect("futures cannot be polled after completion")
140                            .oneshot(ok),
141                        Err(err) => {
142                            // The error may arise either from a `FromRequest` failure for any user-defined
143                            // handler's additional input parameters, or from a de-serialization failure
144                            // of an input parameter specific to the operation.
145                            tracing::trace!(error = %err, "parameter for the handler cannot be constructed");
146                            return Poll::Ready(Ok(err.into_response()));
147                        }
148                    }
149                }
150                InnerProj::Inner { call } => {
151                    let result = ready!(call.poll(cx));
152                    let output = match result {
153                        Ok(ok) => ok.into_response(),
154                        Err(err) => err.into_response(),
155                    };
156                    return Poll::Ready(Ok(output));
157                }
158            };
159
160            this.inner.as_mut().project_replace(Inner::Inner { call });
161        }
162    }
163}
164
165impl<P, Input, B, S> Service<http::Request<B>> for Upgrade<P, Input, S>
166where
167    Input: FromRequest<P, B>,
168    <Input as FromRequest<P, B>>::Rejection: std::fmt::Display,
169    S: Service<Input> + Clone,
170    S::Response: IntoResponse<P>,
171    S::Error: IntoResponse<P>,
172{
173    type Response = http::Response<crate::body::BoxBody>;
174    type Error = Infallible;
175    type Future = UpgradeFuture<P, Input, B, S>;
176
177    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
178        // The check that the inner service is ready is done by `Oneshot` in `UpgradeFuture`'s
179        // implementation.
180        Poll::Ready(Ok(()))
181    }
182
183    fn call(&mut self, req: http::Request<B>) -> Self::Future {
184        let clone = self.inner.clone();
185        let service = std::mem::replace(&mut self.inner, clone);
186        UpgradeFuture {
187            service: Some(service),
188            inner: Inner::FromRequest {
189                inner: <Input as FromRequest<P, B>>::from_request(req),
190            },
191        }
192    }
193}
194
195/// A [`Service`] which always returns an internal failure message and logs an error.
196#[derive(Copy)]
197pub struct MissingFailure<P> {
198    _protocol: PhantomData<fn(P)>,
199}
200
201impl<P> Default for MissingFailure<P> {
202    fn default() -> Self {
203        Self { _protocol: PhantomData }
204    }
205}
206
207impl<P> Clone for MissingFailure<P> {
208    fn clone(&self) -> Self {
209        MissingFailure { _protocol: PhantomData }
210    }
211}
212
213impl<R, P> Service<R> for MissingFailure<P>
214where
215    InternalFailureException: IntoResponse<P>,
216{
217    type Response = http::Response<BoxBody>;
218    type Error = Infallible;
219    type Future = Ready<Result<Self::Response, Self::Error>>;
220
221    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222        Poll::Ready(Ok(()))
223    }
224
225    fn call(&mut self, _request: R) -> Self::Future {
226        error!("the operation has not been set");
227        std::future::ready(Ok(InternalFailureException.into_response()))
228    }
229}