use aws_smithy_http::header::ParseError;
use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
use aws_smithy_runtime_api::{
client::{
http::{
HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings,
SharedHttpClient, SharedHttpConnector,
},
orchestrator::HttpRequest,
result::ConnectorError,
runtime_components::RuntimeComponents,
},
http::Response,
shared::IntoShared,
};
use aws_smithy_types::body::SdkBody;
use bytes::{Bytes, BytesMut};
use wasi::http::{
outgoing_handler,
types::{self as wasi_http, OutgoingBody, RequestOptions},
};
/// Builder for [`WasiHttpClient`].
///
/// The builder currently has no fields, so every built client is identical.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct WasiHttpClientBuilder {}
impl WasiHttpClientBuilder {
    /// Creates a new builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Consumes the builder and returns a [`WasiHttpClient`] wrapped as a
    /// [`SharedHttpClient`].
    pub fn build(self) -> SharedHttpClient {
        WasiHttpClient {}.into_shared()
    }
}
/// HTTP client whose connectors send requests through the WASI outgoing
/// HTTP handler.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct WasiHttpClient {}
impl HttpClient for WasiHttpClient {
    /// Builds a connector whose WASI request options are derived from
    /// `settings`. The runtime components are not consulted.
    fn http_connector(
        &self,
        settings: &HttpConnectorSettings,
        _components: &RuntimeComponents,
    ) -> SharedHttpConnector {
        WasiHttpConnector {
            options: settings.into(),
        }
        .into_shared()
    }

    /// Reports this client as `wasi-http-client` with no version.
    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
        let metadata = ConnectorMetadata::new("wasi-http-client", None);
        Some(metadata)
    }
}
/// Connector that dispatches each request through a fresh
/// `WasiDefaultClient` configured with a copy of `options`.
#[derive(Debug, Clone)]
struct WasiHttpConnector {
/// Request options (timeouts) applied to every outgoing request.
options: WasiRequestOptions,
}
impl HttpConnector for WasiHttpConnector {
    /// Sends `request` through the WASI outgoing HTTP handler.
    ///
    /// The exchange is performed synchronously (the client blocks on the
    /// response) before the future is returned; the returned future only
    /// adapts the already-obtained result into an SDK response.
    ///
    /// A request that cannot be converted into an `http` 1.x request yields a
    /// user [`ConnectorError`] from the returned future instead of panicking.
    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
        tracing::trace!("WasiHttpConnector: sending request {request:?}");

        let client = WasiDefaultClient::new(self.options.clone());
        let http_req = match request.try_into_http1x() {
            Ok(http_req) => http_req,
            Err(err) => {
                return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into())));
            }
        };

        // NOTE(review): a body that is not fully buffered in memory
        // (`bytes()` returns `None`) is sent as an empty body — confirm this
        // is acceptable for streaming payloads.
        let converted_req = http_req.map(|body| match body.bytes() {
            Some(value) => Bytes::copy_from_slice(value),
            None => Bytes::new(),
        });

        let fut_result = client.handle(converted_req);

        HttpConnectorFuture::new(async move {
            let fut = fut_result?;
            let response = fut.map(|body| {
                if body.is_empty() {
                    SdkBody::empty()
                } else {
                    SdkBody::from(body)
                }
            });
            tracing::trace!("WasiHttpConnector: response received {response:?}");

            let sdk_res = Response::try_from(response)
                .map_err(|err| ConnectorError::other(err.into(), None))?;

            Ok(sdk_res)
        })
    }
}
/// Blocking client that performs a single request/response exchange via
/// `outgoing_handler::handle`.
struct WasiDefaultClient {
/// Request options passed to the outgoing handler on each call.
options: WasiRequestOptions,
}
impl WasiDefaultClient {
fn new(options: WasiRequestOptions) -> Self {
Self { options }
}
fn handle(&self, req: http::Request<Bytes>) -> Result<http::Response<Bytes>, ConnectorError> {
let req =
WasiRequest::try_from(req).map_err(|err| ConnectorError::other(err.into(), None))?;
let res = outgoing_handler::handle(req.0, self.options.clone().0)
.map_err(|err| ConnectorError::other(err.into(), None))?;
let subscription = res.subscribe();
subscription.block();
let incoming_res = res
.get()
.expect("Http response not ready")
.expect("Http response accessed more than once")
.map_err(|err| ConnectorError::other(err.into(), None))?;
let response = http::Response::try_from(WasiResponse(incoming_res))
.map_err(|err| ConnectorError::other(err.into(), None))?;
Ok(response)
}
}
/// Newtype around optional WASI request options.
///
/// `Clone` is implemented by hand in this file rather than derived.
#[derive(Debug)]
struct WasiRequestOptions(Option<outgoing_handler::RequestOptions>);
impl From<&HttpConnectorSettings> for WasiRequestOptions {
fn from(value: &HttpConnectorSettings) -> Self {
let connect_timeout = value
.connect_timeout()
.map(|dur| u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX));
let read_timeout = value
.read_timeout()
.map(|dur| u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX));
let wasi_http_opts = wasi_http::RequestOptions::new();
wasi_http_opts
.set_connect_timeout(connect_timeout)
.expect("Connect timeout not supported");
wasi_http_opts
.set_first_byte_timeout(read_timeout)
.expect("Read timeout not supported");
WasiRequestOptions(Some(wasi_http_opts))
}
}
impl Clone for WasiRequestOptions {
    /// Builds a fresh `RequestOptions` and copies the three timeouts
    /// (between-bytes, connect, first-byte) from the wrapped options, if any.
    fn clone(&self) -> Self {
        let copied = self.0.as_ref().map(|source| {
            let target = RequestOptions::new();
            target
                .set_between_bytes_timeout(source.between_bytes_timeout())
                .expect("Between bytes timeout");
            target
                .set_connect_timeout(source.connect_timeout())
                .expect("Connect timeout");
            target
                .set_first_byte_timeout(source.first_byte_timeout())
                .expect("First byte timeout");
            target
        });

        Self(copied)
    }
}
/// Newtype around a WASI outgoing request, built from an `http::Request<Bytes>`
/// via `TryFrom`.
#[derive(Debug)]
struct WasiRequest(outgoing_handler::OutgoingRequest);
impl TryFrom<http::Request<Bytes>> for WasiRequest {
    type Error = ParseError;

    /// Converts an `http` request with a buffered body into a WASI outgoing
    /// request, writing and finishing the body.
    ///
    /// # Errors
    ///
    /// Returns a [`ParseError`] if the method or headers cannot be converted,
    /// if any request component is rejected by the WASI request, or if the
    /// body cannot be written or finalized.
    fn try_from(value: http::Request<Bytes>) -> Result<Self, Self::Error> {
        // `blocking-write-and-flush` accepts at most 4096 bytes per call, so
        // larger bodies have to be written in chunks.
        const MAX_BLOCKING_WRITE_LEN: usize = 4096;

        let (parts, body) = value.into_parts();
        let method = WasiMethod::try_from(parts.method)?;
        let path_with_query = parts.uri.path_and_query().map(|path| path.as_str());
        let headers = WasiHeaders::try_from(parts.headers)?;
        // Any scheme other than http/https is left unset.
        let scheme = match parts.uri.scheme_str().unwrap_or("") {
            "http" => Some(&wasi_http::Scheme::Http),
            "https" => Some(&wasi_http::Scheme::Https),
            _ => None,
        };
        let authority = parts.uri.authority().map(|auth| auth.as_str());

        let request = wasi_http::OutgoingRequest::new(headers.0);
        request
            .set_scheme(scheme)
            .map_err(|_| ParseError::new("Failed to set HTTP scheme"))?;
        request
            .set_method(&method.0)
            .map_err(|_| ParseError::new("Failed to set HTTP method"))?;
        request
            .set_path_with_query(path_with_query)
            .map_err(|_| ParseError::new("Failed to set HTTP path"))?;
        request
            .set_authority(authority)
            .map_err(|_| ParseError::new("Failed to set HTTP authority"))?;

        let request_body = request.body().expect("Body accessed more than once");
        let request_stream = request_body
            .write()
            .expect("Output stream accessed more than once");

        // An empty body produces no chunks and therefore no write calls.
        for chunk in body.chunks(MAX_BLOCKING_WRITE_LEN) {
            request_stream
                .blocking_write_and_flush(chunk)
                .map_err(|_| ParseError::new("Failed to write HTTP body"))?;
        }

        // The output stream must be released before the body is finished.
        drop(request_stream);

        OutgoingBody::finish(request_body, None)
            .map_err(|_| ParseError::new("Failed to finalize HTTP body"))?;

        Ok(WasiRequest(request))
    }
}
/// Newtype around a WASI HTTP method, converted from `http::Method` via `TryFrom`.
struct WasiMethod(wasi_http::Method);
impl TryFrom<http::Method> for WasiMethod {
type Error = ParseError;
fn try_from(method: http::Method) -> Result<Self, Self::Error> {
Ok(Self(match method {
http::Method::GET => wasi_http::Method::Get,
http::Method::POST => wasi_http::Method::Post,
http::Method::PUT => wasi_http::Method::Put,
http::Method::DELETE => wasi_http::Method::Delete,
http::Method::PATCH => wasi_http::Method::Patch,
http::Method::CONNECT => wasi_http::Method::Connect,
http::Method::TRACE => wasi_http::Method::Trace,
http::Method::HEAD => wasi_http::Method::Head,
http::Method::OPTIONS => wasi_http::Method::Options,
_ => return Err(ParseError::new("failed due to unsupported method, currently supported methods are: GET, POST, PUT, DELETE, PATCH, CONNECT, TRACE, HEAD, and OPTIONS")),
}))
}
}
/// Newtype around a WASI incoming response, convertible into an
/// `http::Response<Bytes>` via `TryFrom`.
struct WasiResponse(wasi_http::IncomingResponse);
impl TryFrom<WasiResponse> for http::Response<Bytes> {
type Error = ParseError;
fn try_from(value: WasiResponse) -> Result<Self, Self::Error> {
let response = value.0;
let status = response.status();
let headers = response.headers().entries();
let res_build = headers
.into_iter()
.fold(http::Response::builder().status(status), |rb, header| {
rb.header(header.0, header.1)
});
let body_incoming = response.consume().expect("Consume called more than once");
let body_stream = body_incoming
.stream()
.expect("Stream accessed more than once");
let mut body = BytesMut::new();
while let Ok(stream_bytes) = body_stream.blocking_read(u64::MAX) {
body.extend_from_slice(stream_bytes.as_slice())
}
drop(body_stream);
let res = res_build
.body(body.freeze())
.map_err(|err| ParseError::new(err.to_string()))?;
Ok(res)
}
}
/// Newtype around WASI HTTP `Fields`, built from an `http::HeaderMap` via `TryFrom`.
struct WasiHeaders(wasi_http::Fields);
impl TryFrom<http::HeaderMap> for WasiHeaders {
    type Error = ParseError;

    /// Converts an `http` header map into WASI `Fields`.
    ///
    /// Header values are copied as raw bytes, so values that are not visible
    /// ASCII are passed through instead of causing a panic.
    ///
    /// # Errors
    ///
    /// Returns a [`ParseError`] if WASI rejects the header list.
    fn try_from(headers: http::HeaderMap) -> Result<Self, Self::Error> {
        let entries = headers
            .iter()
            .map(|(name, value)| (name.as_str().to_owned(), value.as_bytes().to_vec()))
            .collect::<Vec<_>>();

        let fields = wasi_http::Fields::from_list(&entries)
            .map_err(|err| ParseError::new(err.to_string()))?;

        Ok(Self(fields))
    }
}