aws_smithy_compression/
body.rs1pub mod compress {
12 use aws_smithy_types::body::SdkBody;
13 use pin_project_lite::pin_project;
14
15 pin_project! {
16 pub struct CompressedBody<InnerBody, CompressionImpl> {
21 #[pin]
22 body: InnerBody,
23 compress_request: CompressionImpl,
24 is_end_stream: bool,
25 }
26 }
27
28 impl<CR> CompressedBody<SdkBody, CR> {
29 pub fn new(body: SdkBody, compress_request: CR) -> Self {
31 Self {
32 body,
33 compress_request,
34 is_end_stream: false,
35 }
36 }
37 }
38
39 pub mod http_body_1_x {
41 use crate::body::compress::CompressedBody;
42 use crate::http::CompressRequest;
43 use aws_smithy_runtime_api::box_error::BoxError;
44 use aws_smithy_types::body::SdkBody;
45 use http_body_1x::{Body, Frame, SizeHint};
46 use std::pin::Pin;
47 use std::task::{ready, Context, Poll};
48
49 impl Body for CompressedBody<SdkBody, Box<dyn CompressRequest>> {
50 type Data = bytes::Bytes;
51 type Error = aws_smithy_types::body::Error;
52
53 fn poll_frame(
54 mut self: Pin<&mut Self>,
55 cx: &mut Context<'_>,
56 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
57 let this = self.as_mut().project();
58 Poll::Ready(match ready!(this.body.poll_frame(cx)) {
59 Some(Ok(f)) => {
60 if f.is_data() {
61 let d = f.into_data().expect("we checked for data first");
62 let mut out = Vec::new();
63 this.compress_request.compress_bytes(&d, &mut out)?;
64 Some(Ok(Frame::data(out.into())))
65 } else if f.is_trailers() {
66 Some(Ok(f))
68 } else {
69 unreachable!("Frame is either data or trailers")
70 }
71 }
72 None => {
73 *this.is_end_stream = true;
74 None
75 }
76 other => other,
77 })
78 }
79
80 fn is_end_stream(&self) -> bool {
81 self.is_end_stream
82 }
83
84 fn size_hint(&self) -> SizeHint {
85 SizeHint::default()
88 }
89 }
90 impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
91 pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
96 let mut compressed_body = Vec::new();
97 let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
98
99 self.compress_request
100 .compress_bytes(bytes, &mut compressed_body)?;
101 Ok(SdkBody::from(compressed_body))
102 }
103 }
104 }
105}
106
#[cfg(test)]
mod test {
    use crate::body::compress::CompressedBody;
    use crate::{CompressionAlgorithm, CompressionOptions};
    use aws_smithy_types::body::SdkBody;
    use bytes::Buf;
    use bytes_utils::SegmentedBuf;
    use http_body_util::BodyExt;
    use std::io::Read;

    // Plain-text payload fed into the compressor.
    const UNCOMPRESSED_INPUT: &[u8] = b"hello world";
    // Byte sequence the test expects the body to emit for `UNCOMPRESSED_INPUT`.
    const COMPRESSED_OUTPUT: &[u8] = &[
        31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0,
        133, 17, 74, 13, 11, 0, 0, 0,
    ];

    /// Drains a `CompressedBody` frame by frame and checks that the
    /// concatenated data equals `COMPRESSED_OUTPUT`.
    #[tokio::test]
    async fn test_body_is_compressed() {
        // A minimum size of zero so the tiny test payload is not exempted.
        let options = CompressionOptions::default()
            .with_min_compression_size_bytes(0)
            .unwrap();
        let gzip = CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&options);
        let mut compressed_body = CompressedBody::new(SdkBody::from(UNCOMPRESSED_INPUT), gzip);

        // Collect every data frame until the body reports completion.
        let mut segments = SegmentedBuf::new();
        while let Some(polled) = compressed_body.frame().await {
            let frame = match polled {
                Ok(frame) => frame,
                Err(e) => panic!("Error: {}", e),
            };
            segments.push(frame.into_data().expect("frame is OK"));
        }

        let mut actual_output = Vec::new();
        segments
            .reader()
            .read_to_end(&mut actual_output)
            .expect("Doesn't cause IO errors");
        assert_eq!(COMPRESSED_OUTPUT, actual_output);
    }
}