1use aws_smithy_http::header::ParseError;
8use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
9use aws_smithy_runtime_api::{
10 client::{
11 http::{
12 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings,
13 SharedHttpClient, SharedHttpConnector,
14 },
15 orchestrator::HttpRequest,
16 result::ConnectorError,
17 runtime_components::RuntimeComponents,
18 },
19 http::Response,
20 shared::IntoShared,
21};
22use aws_smithy_types::body::SdkBody;
23use bytes::{Bytes, BytesMut};
24use wasi::http::{
25 outgoing_handler,
26 types::{self as wasi_http, OutgoingBody, RequestOptions},
27};
28
29#[derive(Default, Debug)]
32#[non_exhaustive]
33pub struct WasiHttpClientBuilder {}
34
35impl WasiHttpClientBuilder {
36 pub fn new() -> Self {
38 Default::default()
39 }
40
41 pub fn build(self) -> SharedHttpClient {
43 let client = WasiHttpClient {};
44 client.into_shared()
45 }
46}
47
48#[derive(Debug, Clone)]
52#[non_exhaustive]
53pub struct WasiHttpClient {}
54
55impl HttpClient for WasiHttpClient {
56 fn http_connector(
57 &self,
58 settings: &HttpConnectorSettings,
59 _components: &RuntimeComponents,
60 ) -> SharedHttpConnector {
61 let options = WasiRequestOptions::from(settings);
62 let connector = WasiHttpConnector { options };
63
64 connector.into_shared()
65 }
66
67 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
68 Some(ConnectorMetadata::new("wasi-http-client", None))
69 }
70}
71
72#[derive(Debug, Clone)]
74struct WasiHttpConnector {
75 options: WasiRequestOptions,
76}
77
78impl HttpConnector for WasiHttpConnector {
79 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
80 tracing::trace!("WasiHttpConnector: sending request {request:?}");
81
82 let client = WasiDefaultClient::new(self.options.clone());
83 let http_req = request.try_into_http1x().expect("Http request invalid");
84 let converted_req = http_req.map(|body| match body.bytes() {
85 Some(value) => Bytes::copy_from_slice(value),
86 None => Bytes::new(),
87 });
88
89 let fut_result = client.handle(converted_req);
90
91 HttpConnectorFuture::new(async move {
92 let fut = fut_result?;
93 let response = fut.map(|body| {
94 if body.is_empty() {
95 SdkBody::empty()
96 } else {
97 SdkBody::from(body)
98 }
99 });
100 tracing::trace!("WasiHttpConnector: response received {response:?}");
101
102 let sdk_res = Response::try_from(response)
103 .map_err(|err| ConnectorError::other(err.into(), None))?;
104
105 Ok(sdk_res)
106 })
107 }
108}
109
110struct WasiDefaultClient {
112 options: WasiRequestOptions,
113}
114
115impl WasiDefaultClient {
116 fn new(options: WasiRequestOptions) -> Self {
118 Self { options }
119 }
120
121 fn handle(&self, req: http::Request<Bytes>) -> Result<http::Response<Bytes>, ConnectorError> {
123 let req =
124 WasiRequest::try_from(req).map_err(|err| ConnectorError::other(err.into(), None))?;
125
126 let res = outgoing_handler::handle(req.0, self.options.clone().0)
127 .map_err(|err| ConnectorError::other(err.into(), None))?;
128
129 let subscription = res.subscribe();
132 subscription.block();
133
134 let incoming_res = res
140 .get()
141 .expect("Http response not ready")
142 .expect("Http response accessed more than once")
143 .map_err(|err| ConnectorError::other(err.into(), None))?;
144
145 let response = http::Response::try_from(WasiResponse(incoming_res))
146 .map_err(|err| ConnectorError::other(err.into(), None))?;
147
148 Ok(response)
149 }
150}
151
152#[derive(Debug)]
154struct WasiRequestOptions(Option<outgoing_handler::RequestOptions>);
155
156impl From<&HttpConnectorSettings> for WasiRequestOptions {
157 fn from(value: &HttpConnectorSettings) -> Self {
158 let connect_timeout = value
162 .connect_timeout()
163 .map(|dur| u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX));
164 let read_timeout = value
165 .read_timeout()
166 .map(|dur| u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX));
167
168 let wasi_http_opts = wasi_http::RequestOptions::new();
172 wasi_http_opts
173 .set_connect_timeout(connect_timeout)
174 .expect("Connect timeout not supported");
175 wasi_http_opts
176 .set_first_byte_timeout(read_timeout)
177 .expect("Read timeout not supported");
178
179 WasiRequestOptions(Some(wasi_http_opts))
180 }
181}
182impl Clone for WasiRequestOptions {
185 fn clone(&self) -> Self {
186 let new_opts = if let Some(opts) = &self.0 {
189 let new_opts = RequestOptions::new();
190 new_opts
191 .set_between_bytes_timeout(opts.between_bytes_timeout())
192 .expect("Between bytes timeout");
193 new_opts
194 .set_connect_timeout(opts.connect_timeout())
195 .expect("Connect timeout");
196 new_opts
197 .set_first_byte_timeout(opts.first_byte_timeout())
198 .expect("First byte timeout");
199
200 Some(new_opts)
201 } else {
202 None
203 };
204
205 Self(new_opts)
206 }
207}
208
209#[derive(Debug)]
211struct WasiRequest(outgoing_handler::OutgoingRequest);
212
213impl TryFrom<http::Request<Bytes>> for WasiRequest {
214 type Error = ParseError;
215
216 fn try_from(value: http::Request<Bytes>) -> Result<Self, Self::Error> {
217 let (parts, body) = value.into_parts();
218 let method = WasiMethod::try_from(parts.method)?;
219 let path_with_query = parts.uri.path_and_query().map(|path| path.as_str());
220 let headers = WasiHeaders::try_from(parts.headers)?;
221 let scheme = match parts.uri.scheme_str().unwrap_or("") {
222 "http" => Some(&wasi_http::Scheme::Http),
223 "https" => Some(&wasi_http::Scheme::Https),
224 _ => None,
225 };
226 let authority = parts.uri.authority().map(|auth| auth.as_str());
227
228 let request = wasi_http::OutgoingRequest::new(headers.0);
229 request
230 .set_scheme(scheme)
231 .map_err(|_| ParseError::new("Failed to set HTTP scheme"))?;
232 request
233 .set_method(&method.0)
234 .map_err(|_| ParseError::new("Failed to set HTTP method"))?;
235 request
236 .set_path_with_query(path_with_query)
237 .map_err(|_| ParseError::new("Failed to set HTTP path"))?;
238 request
239 .set_authority(authority)
240 .map_err(|_| ParseError::new("Failed to set HTTP authority"))?;
241
242 let request_body = request.body().expect("Body accessed more than once");
243
244 let request_stream = request_body
245 .write()
246 .expect("Output stream accessed more than once");
247
248 request_stream
249 .blocking_write_and_flush(&body)
250 .map_err(|_| ParseError::new("Failed to write HTTP body"))?;
251
252 drop(request_stream);
256
257 OutgoingBody::finish(request_body, None)
258 .map_err(|_| ParseError::new("Failed to finalize HTTP body"))?;
259
260 Ok(WasiRequest(request))
261 }
262}
263
264struct WasiMethod(wasi_http::Method);
266
267impl TryFrom<http::Method> for WasiMethod {
268 type Error = ParseError;
269
270 fn try_from(method: http::Method) -> Result<Self, Self::Error> {
271 Ok(Self(match method {
272 http::Method::GET => wasi_http::Method::Get,
273 http::Method::POST => wasi_http::Method::Post,
274 http::Method::PUT => wasi_http::Method::Put,
275 http::Method::DELETE => wasi_http::Method::Delete,
276 http::Method::PATCH => wasi_http::Method::Patch,
277 http::Method::CONNECT => wasi_http::Method::Connect,
278 http::Method::TRACE => wasi_http::Method::Trace,
279 http::Method::HEAD => wasi_http::Method::Head,
280 http::Method::OPTIONS => wasi_http::Method::Options,
281 _ => return Err(ParseError::new("failed due to unsupported method, currently supported methods are: GET, POST, PUT, DELETE, PATCH, CONNECT, TRACE, HEAD, and OPTIONS")),
282 }))
283 }
284}
285
286struct WasiResponse(wasi_http::IncomingResponse);
288
289impl TryFrom<WasiResponse> for http::Response<Bytes> {
290 type Error = ParseError;
291
292 fn try_from(value: WasiResponse) -> Result<Self, Self::Error> {
293 let response = value.0;
294
295 let status = response.status();
296
297 let headers = response.headers().entries();
300
301 let res_build = headers
302 .into_iter()
303 .fold(http::Response::builder().status(status), |rb, header| {
304 rb.header(header.0, header.1)
305 });
306
307 let body_incoming = response.consume().expect("Consume called more than once");
308
309 let body_stream = body_incoming
313 .stream()
314 .expect("Stream accessed more than once");
315
316 let mut body = BytesMut::new();
317
318 while let Ok(stream_bytes) = body_stream.blocking_read(u64::MAX) {
320 body.extend_from_slice(stream_bytes.as_slice())
321 }
322
323 drop(body_stream);
324
325 let res = res_build
326 .body(body.freeze())
327 .map_err(|err| ParseError::new(err.to_string()))?;
328
329 Ok(res)
330 }
331}
332
333struct WasiHeaders(wasi_http::Fields);
335
336impl TryFrom<http::HeaderMap> for WasiHeaders {
337 type Error = ParseError;
338
339 fn try_from(headers: http::HeaderMap) -> Result<Self, Self::Error> {
340 let entries = headers
341 .iter()
342 .map(|(name, value)| {
343 (
344 name.to_string(),
345 value.to_str().unwrap().as_bytes().to_vec(),
346 )
347 })
348 .collect::<Vec<_>>();
349
350 let fields = wasi_http::Fields::from_list(&entries)
351 .map_err(|err| ParseError::new(err.to_string()))?;
352
353 Ok(Self(fields))
354 }
355}