1 + | /*
|
2 + | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 + | * SPDX-License-Identifier: Apache-2.0
|
4 + | */
|
5 + |
|
6 + | use aws_smithy_eventstream::frame::{
|
7 + | DecodedFrame, MessageFrameDecoder, UnmarshallMessage, UnmarshalledMessage,
|
8 + | };
|
9 + | use aws_smithy_runtime_api::client::result::{ConnectorError, SdkError};
|
10 + | use aws_smithy_types::body::SdkBody;
|
11 + | use aws_smithy_types::event_stream::{Message, RawMessage};
|
12 + | use bytes::Buf;
|
13 + | use bytes::Bytes;
|
14 + | use bytes_utils::SegmentedBuf;
|
15 + | use std::error::Error as StdError;
|
16 + | use std::fmt;
|
17 + | use std::marker::PhantomData;
|
18 + | use std::mem;
|
19 + | use tracing::trace;
|
20 + |
|
21 + | /// Wrapper around SegmentedBuf that tracks the state of the stream.
|
22 + | #[derive(Debug)]
|
23 + | enum RecvBuf {
|
24 + | /// Nothing has been buffered yet.
|
25 + | Empty,
|
26 + | /// Some data has been buffered.
|
27 + | /// The SegmentedBuf will automatically purge when it reads off the end of a chunk boundary.
|
28 + | Partial(SegmentedBuf<Bytes>),
|
29 + | /// The end of the stream has been reached, but there may still be some buffered data.
|
30 + | EosPartial(SegmentedBuf<Bytes>),
|
31 + | /// An exception terminated this stream.
|
32 + | Terminated,
|
33 + | }
|
34 + |
|
35 + | impl RecvBuf {
|
36 + | /// Returns true if there's more buffered data.
|
37 + | fn has_data(&self) -> bool {
|
38 + | match self {
|
39 + | RecvBuf::Empty | RecvBuf::Terminated => false,
|
40 + | RecvBuf::Partial(segments) | RecvBuf::EosPartial(segments) => segments.remaining() > 0,
|
41 + | }
|
42 + | }
|
43 + |
|
44 + | /// Returns true if the stream has ended.
|
45 + | fn is_eos(&self) -> bool {
|
46 + | matches!(self, RecvBuf::EosPartial(_) | RecvBuf::Terminated)
|
47 + | }
|
48 + |
|
49 + | /// Returns a mutable reference to the underlying buffered data.
|
50 + | fn buffered(&mut self) -> &mut SegmentedBuf<Bytes> {
|
51 + | match self {
|
52 + | RecvBuf::Empty => panic!("buffer must be populated before reading; this is a bug"),
|
53 + | RecvBuf::Partial(segmented) => segmented,
|
54 + | RecvBuf::EosPartial(segmented) => segmented,
|
55 + | RecvBuf::Terminated => panic!("buffer has been terminated; this is a bug"),
|
56 + | }
|
57 + | }
|
58 + |
|
59 + | /// Returns a new `RecvBuf` with additional data buffered. This will only allocate
|
60 + | /// if the `RecvBuf` was previously empty.
|
61 + | fn with_partial(self, partial: Bytes) -> Self {
|
62 + | match self {
|
63 + | RecvBuf::Empty => {
|
64 + | let mut segmented = SegmentedBuf::new();
|
65 + | segmented.push(partial);
|
66 + | RecvBuf::Partial(segmented)
|
67 + | }
|
68 + | RecvBuf::Partial(mut segmented) => {
|
69 + | segmented.push(partial);
|
70 + | RecvBuf::Partial(segmented)
|
71 + | }
|
72 + | RecvBuf::EosPartial(_) | RecvBuf::Terminated => {
|
73 + | panic!("cannot buffer more data after the stream has ended or been terminated; this is a bug")
|
74 + | }
|
75 + | }
|
76 + | }
|
77 + |
|
78 + | /// Returns a `RecvBuf` that has reached end of stream.
|
79 + | fn ended(self) -> Self {
|
80 + | match self {
|
81 + | RecvBuf::Empty => RecvBuf::EosPartial(SegmentedBuf::new()),
|
82 + | RecvBuf::Partial(segmented) => RecvBuf::EosPartial(segmented),
|
83 + | RecvBuf::EosPartial(_) => panic!("already end of stream; this is a bug"),
|
84 + | RecvBuf::Terminated => panic!("stream terminated; this is a bug"),
|
85 + | }
|
86 + | }
|
87 + | }
|
88 + |
|
/// The ways in which the receiver itself can fail.
#[derive(Debug)]
enum ReceiverErrorKind {
    /// The stream ended before a complete message frame was received.
    UnexpectedEndOfStream,
}

/// An error that occurs within an event stream receiver.
#[derive(Debug)]
pub struct ReceiverError {
    /// What went wrong; kept private so the set of failures can change.
    kind: ReceiverErrorKind,
}

impl fmt::Display for ReceiverError {
    /// Writes a short, fixed description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self.kind {
            ReceiverErrorKind::UnexpectedEndOfStream => "unexpected end of stream",
        };
        f.write_str(description)
    }
}

impl StdError for ReceiverError {}
|
110 + |
|
/// Receives Smithy-modeled messages out of an Event Stream.
#[derive(Debug)]
pub struct Receiver<T, E> {
    /// Turns decoded messages into modeled events (`T`) or modeled errors (`E`).
    unmarshaller: Box<dyn UnmarshallMessage<Output = T, Error = E> + Send + Sync>,
    /// Decodes message frames out of the buffered bytes.
    decoder: MessageFrameDecoder,
    /// Bytes read from `body` that have not been decoded yet, plus the stream state.
    buffer: RecvBuf,
    /// The body the raw event stream bytes are read from.
    body: SdkBody,
    /// An event stream may begin with an optional initial message, identified by an
    /// `:event-type` header of `initial-request` or `initial-response`. If
    /// `try_recv_initial()` is called and the next message isn't the requested initial
    /// message, then the message will be stored in `buffered_message` so that it can
    /// be returned with the next call of `recv()`.
    buffered_message: Option<Message>,
    _phantom: PhantomData<E>,
}
|
125 + |
|
// Used by `Receiver::try_recv_initial`, hence this enum is also doc hidden
/// Selects which kind of initial message `Receiver::try_recv_initial` looks for.
#[doc(hidden)]
#[non_exhaustive]
#[derive(Debug)]
pub enum InitialMessageType {
    /// Matches messages whose event type is `initial-request`.
    Request,
    /// Matches messages whose event type is `initial-response`.
    Response,
}

impl InitialMessageType {
    /// Returns the event type string that identifies this kind of initial message.
    fn as_str(&self) -> &'static str {
        match self {
            InitialMessageType::Request => "initial-request",
            InitialMessageType::Response => "initial-response",
        }
    }
}
|
142 + |
|
143 + | impl<T, E> Receiver<T, E> {
|
144 + | /// Creates a new `Receiver` with the given message unmarshaller and SDK body.
|
145 + | pub fn new(
|
146 + | unmarshaller: impl UnmarshallMessage<Output = T, Error = E> + Send + Sync + 'static,
|
147 + | body: SdkBody,
|
148 + | ) -> Self {
|
149 + | Receiver {
|
150 + | unmarshaller: Box::new(unmarshaller),
|
151 + | decoder: MessageFrameDecoder::new(),
|
152 + | buffer: RecvBuf::Empty,
|
153 + | body,
|
154 + | buffered_message: None,
|
155 + | _phantom: Default::default(),
|
156 + | }
|
157 + | }
|
158 + |
|
159 + | fn unmarshall(&self, message: Message) -> Result<Option<T>, SdkError<E, RawMessage>> {
|
160 + | match self.unmarshaller.unmarshall(&message) {
|
161 + | Ok(unmarshalled) => match unmarshalled {
|
162 + | UnmarshalledMessage::Event(event) => Ok(Some(event)),
|
163 + | UnmarshalledMessage::Error(err) => {
|
164 + | Err(SdkError::service_error(err, RawMessage::Decoded(message)))
|
165 + | }
|
166 + | },
|
167 + | Err(err) => Err(SdkError::response_error(err, RawMessage::Decoded(message))),
|
168 + | }
|
169 + | }
|
170 + |
|
171 + | async fn buffer_next_chunk(&mut self) -> Result<(), SdkError<E, RawMessage>> {
|
172 + | use http_body_04x::Body;
|
173 + |
|
174 + | if !self.buffer.is_eos() {
|
175 + | let next_chunk = self
|
176 + | .body
|
177 + | .data()
|
178 + | .await
|
179 + | .transpose()
|
180 + | .map_err(|err| SdkError::dispatch_failure(ConnectorError::io(err)))?;
|
181 + | let buffer = mem::replace(&mut self.buffer, RecvBuf::Empty);
|
182 + | if let Some(chunk) = next_chunk {
|
183 + | self.buffer = buffer.with_partial(chunk);
|
184 + | } else {
|
185 + | self.buffer = buffer.ended();
|
186 + | }
|
187 + | }
|
188 + | Ok(())
|
189 + | }
|
190 + |
|
191 + | async fn next_message(&mut self) -> Result<Option<Message>, SdkError<E, RawMessage>> {
|
192 + | while !self.buffer.is_eos() {
|
193 + | if self.buffer.has_data() {
|
194 + | if let DecodedFrame::Complete(message) = self
|
195 + | .decoder
|
196 + | .decode_frame(self.buffer.buffered())
|
197 + | .map_err(|err| {
|
198 + | SdkError::response_error(
|
199 + | err,
|
200 + | // the buffer has been consumed
|
201 + | RawMessage::Invalid(None),
|
202 + | )
|
203 + | })?
|
204 + | {
|
205 + | trace!(message = ?message, "received complete event stream message");
|
206 + | return Ok(Some(message));
|
207 + | }
|
208 + | }
|
209 + |
|
210 + | self.buffer_next_chunk().await?;
|
211 + | }
|
212 + | if self.buffer.has_data() {
|
213 + | trace!(remaining_data = ?self.buffer, "data left over in the event stream response stream");
|
214 + | let buf = self.buffer.buffered();
|
215 + | return Err(SdkError::response_error(
|
216 + | ReceiverError {
|
217 + | kind: ReceiverErrorKind::UnexpectedEndOfStream,
|
218 + | },
|
219 + | RawMessage::invalid(Some(buf.copy_to_bytes(buf.remaining()))),
|
220 + | ));
|
221 + | }
|
222 + | Ok(None)
|
223 + | }
|
224 + |
|
225 + | /// Tries to receive the initial response message that has `:event-type` of a given `message_type`.
|
226 + | /// If a different event type is received, then it is buffered and `Ok(None)` is returned.
|
227 + | #[doc(hidden)]
|
228 + | pub async fn try_recv_initial(
|
229 + | &mut self,
|
230 + | message_type: InitialMessageType,
|
231 + | ) -> Result<Option<Message>, SdkError<E, RawMessage>> {
|
232 + | if let Some(message) = self.next_message().await? {
|
233 + | if let Some(event_type) = message
|
234 + | .headers()
|
235 + | .iter()
|
236 + | .find(|h| h.name().as_str() == ":event-type")
|
237 + | {
|
238 + | if event_type
|
239 + | .value()
|
240 + | .as_string()
|
241 + | .map(|s| s.as_str() == message_type.as_str())
|
242 + | .unwrap_or(false)
|
243 + | {
|
244 + | return Ok(Some(message));
|
245 + | }
|
246 + | }
|
247 + | // Buffer the message so that it can be returned by the next call to `recv()`
|
248 + | self.buffered_message = Some(message);
|
249 + | }
|
250 + | Ok(None)
|
251 + | }
|
252 + |
|
253 + | /// Asynchronously tries to receive a message from the stream. If the stream has ended,
|
254 + | /// it returns an `Ok(None)`. If there is a transport layer error, it will return
|
255 + | /// `Err(SdkError::DispatchFailure)`. Service-modeled errors will be a part of the returned
|
256 + | /// messages.
|
257 + | pub async fn recv(&mut self) -> Result<Option<T>, SdkError<E, RawMessage>> {
|
258 + | if let Some(buffered) = self.buffered_message.take() {
|
259 + | return match self.unmarshall(buffered) {
|
260 + | Ok(message) => Ok(message),
|
261 + | Err(error) => {
|
262 + | self.buffer = RecvBuf::Terminated;
|
263 + | Err(error)
|
264 + | }
|
265 + | };
|
266 + | }
|
267 + | if let Some(message) = self.next_message().await? {
|
268 + | match self.unmarshall(message) {
|
269 + | Ok(message) => Ok(message),
|
270 + | Err(error) => {
|
271 + | self.buffer = RecvBuf::Terminated;
|
272 + | Err(error)
|
273 + | }
|
274 + | }
|
275 + | } else {
|
276 + | Ok(None)
|
277 + | }
|
278 + | }
|
279 + | }
|
280 + |
|
#[cfg(test)]
mod tests {
    use super::{InitialMessageType, Receiver, UnmarshallMessage};
    use aws_smithy_eventstream::error::Error as EventStreamError;
    use aws_smithy_eventstream::frame::{write_message_to, UnmarshalledMessage};
    use aws_smithy_runtime_api::client::result::SdkError;
    use aws_smithy_types::body::SdkBody;
    use aws_smithy_types::event_stream::{Header, HeaderValue, Message};
    use bytes::Bytes;
    use hyper::body::Body;
    use std::error::Error as StdError;
    use std::io::{Error as IOError, ErrorKind};

    /// The receiver type exercised by every test in this module.
    type TestReceiver = Receiver<TestMessage, EventStreamError>;

    /// Serializes a message into its wire representation.
    fn encode(message: &Message) -> Bytes {
        let mut encoded: Vec<u8> = Vec::new();
        write_message_to(message, &mut encoded).unwrap();
        encoded.into()
    }

    /// Encodes an empty-payload message whose `:event-type` is `initial-response`.
    fn encode_initial_response() -> Bytes {
        let message_type = Header::new(":message-type", HeaderValue::String("event".into()));
        let event_type = Header::new(
            ":event-type",
            HeaderValue::String("initial-response".into()),
        );
        encode(
            &Message::new(Bytes::new())
                .add_header(message_type)
                .add_header(event_type),
        )
    }

    /// Encodes a header-less message whose payload is the given text.
    fn encode_message(message: &str) -> Bytes {
        encode(&Message::new(Bytes::copy_from_slice(message.as_bytes())))
    }

    /// Builds a receiver whose body yields the given chunks (or I/O errors) in order.
    fn receiver_over(chunks: Vec<Result<Bytes, IOError>>) -> TestReceiver {
        let chunk_stream = futures_util::stream::iter(chunks);
        let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
        Receiver::new(Unmarshaller, body)
    }

    /// Asserts that the next received event carries `expected` as its payload.
    async fn assert_next(receiver: &mut TestReceiver, expected: &str) {
        assert_eq!(
            TestMessage(expected.into()),
            receiver.recv().await.unwrap().unwrap()
        );
    }

    /// Error used as the source of the simulated connection reset.
    #[derive(Debug)]
    struct FakeError;
    impl std::fmt::Display for FakeError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "FakeError")
        }
    }
    impl StdError for FakeError {}

    /// Event type produced by the test unmarshaller: the payload as text.
    #[derive(Debug, Eq, PartialEq)]
    struct TestMessage(String);

    /// Unmarshaller that turns every message into a `TestMessage` of its UTF-8 payload.
    #[derive(Debug)]
    struct Unmarshaller;
    impl UnmarshallMessage for Unmarshaller {
        type Output = TestMessage;
        type Error = EventStreamError;

        fn unmarshall(
            &self,
            message: &Message,
        ) -> Result<UnmarshalledMessage<Self::Output, Self::Error>, EventStreamError> {
            let text = std::str::from_utf8(&message.payload()[..]).unwrap();
            Ok(UnmarshalledMessage::Event(TestMessage(text.into())))
        }
    }

    #[tokio::test]
    async fn receive_success() {
        let mut receiver =
            receiver_over(vec![Ok(encode_message("one")), Ok(encode_message("two"))]);
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
        assert_eq!(None, receiver.recv().await.unwrap());
    }

    #[tokio::test]
    async fn receive_last_chunk_empty() {
        let mut receiver = receiver_over(vec![
            Ok(encode_message("one")),
            Ok(encode_message("two")),
            Ok(Bytes::from_static(&[])),
        ]);
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
        assert_eq!(None, receiver.recv().await.unwrap());
    }

    #[tokio::test]
    async fn receive_last_chunk_not_full_message() {
        let mut receiver = receiver_over(vec![
            Ok(encode_message("one")),
            Ok(encode_message("two")),
            Ok(encode_message("three").split_to(10)),
        ]);
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
        assert!(matches!(
            receiver.recv().await,
            Err(SdkError::ResponseError { .. }),
        ));
    }

    #[tokio::test]
    async fn receive_last_chunk_has_multiple_messages() {
        let mut receiver = receiver_over(vec![
            Ok(encode_message("one")),
            Ok(encode_message("two")),
            Ok(Bytes::from(
                [encode_message("three"), encode_message("four")].concat(),
            )),
        ]);
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
        assert_next(&mut receiver, "three").await;
        assert_next(&mut receiver, "four").await;
        assert_eq!(None, receiver.recv().await.unwrap());
    }

    proptest::proptest! {
        #[test]
        fn receive_multiple_messages_split_unevenly_across_chunks(b1: usize, b2: usize) {
            let combined = Bytes::from([
                encode_message("one"),
                encode_message("two"),
                encode_message("three"),
                encode_message("four"),
                encode_message("five"),
                encode_message("six"),
                encode_message("seven"),
                encode_message("eight"),
            ].concat());

            // One split point in each half, so the three chunks cut frames at arbitrary offsets.
            let midpoint = combined.len() / 2;
            let start = 0;
            let boundary1 = b1 % midpoint;
            let boundary2 = midpoint + b2 % midpoint;
            let end = combined.len();
            println!("[{}, {}], [{}, {}], [{}, {}]", start, boundary1, boundary1, boundary2, boundary2, end);

            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async move {
                let mut receiver = receiver_over(vec![
                    Ok(Bytes::copy_from_slice(&combined[start..boundary1])),
                    Ok(Bytes::copy_from_slice(&combined[boundary1..boundary2])),
                    Ok(Bytes::copy_from_slice(&combined[boundary2..end])),
                ]);
                let payloads = ["one", "two", "three", "four", "five", "six", "seven", "eight"];
                for payload in payloads.iter() {
                    assert_next(&mut receiver, payload).await;
                }
                assert_eq!(None, receiver.recv().await.unwrap());
            });
        }
    }

    #[tokio::test]
    async fn receive_network_failure() {
        let mut receiver = receiver_over(vec![
            Ok(encode_message("one")),
            Err(IOError::new(ErrorKind::ConnectionReset, FakeError)),
        ]);
        assert_next(&mut receiver, "one").await;
        assert!(matches!(
            receiver.recv().await,
            Err(SdkError::DispatchFailure(_))
        ));
    }

    #[tokio::test]
    async fn receive_message_parse_failure() {
        let mut receiver = receiver_over(vec![
            Ok(encode_message("one")),
            // A zero length message will be invalid. We need to provide a minimum of 12 bytes
            // for the MessageFrameDecoder to actually start parsing it.
            Ok(Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),
        ]);
        assert_next(&mut receiver, "one").await;
        assert!(matches!(
            receiver.recv().await,
            Err(SdkError::ResponseError { .. })
        ));
    }

    #[tokio::test]
    async fn receive_initial_response() {
        let mut receiver = receiver_over(vec![
            Ok(encode_initial_response()),
            Ok(encode_message("one")),
        ]);
        let initial = receiver
            .try_recv_initial(InitialMessageType::Response)
            .await
            .unwrap();
        assert!(initial.is_some());
        assert_next(&mut receiver, "one").await;
    }

    #[tokio::test]
    async fn receive_no_initial_response() {
        let mut receiver =
            receiver_over(vec![Ok(encode_message("one")), Ok(encode_message("two"))]);
        let initial = receiver
            .try_recv_initial(InitialMessageType::Response)
            .await
            .unwrap();
        assert!(initial.is_none());
        // The message that was not an initial response must still be delivered by `recv()`.
        assert_next(&mut receiver, "one").await;
        assert_next(&mut receiver, "two").await;
    }

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_and_sync<T: Send + Sync>() {}

    #[tokio::test]
    async fn receiver_is_send_and_sync() {
        assert_send_and_sync::<Receiver<(), ()>>();
    }
}
|