1 + | // Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
|
2 + | #[::pyo3::pyclass]
|
3 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
4 + | pub struct InputStreamInputStreamReceiver {
|
5 + | inner: ::std::sync::Arc<
|
6 + | ::tokio::sync::Mutex<
|
7 + | ::aws_smithy_legacy_http::event_stream::Receiver<
|
8 + | crate::model::EventStream,
|
9 + | crate::error::EventStreamError,
|
10 + | >,
|
11 + | >,
|
12 + | >,
|
13 + | }
|
14 + | impl InputStreamInputStreamReceiver {
|
15 + | pub fn new(
|
16 + | unmarshaller: impl ::aws_smithy_eventstream::frame::UnmarshallMessage<
|
17 + | Output = crate::model::EventStream,
|
18 + | Error = crate::error::EventStreamError,
|
19 + | > + ::std::marker::Send
|
20 + | + ::std::marker::Sync
|
21 + | + 'static,
|
22 + | body: ::aws_smithy_types::body::SdkBody,
|
23 + | ) -> InputStreamInputStreamReceiver {
|
24 + | let inner = ::aws_smithy_legacy_http::event_stream::Receiver::new(unmarshaller, body);
|
25 + | let inner = ::std::sync::Arc::new(::tokio::sync::Mutex::new(inner));
|
26 + | InputStreamInputStreamReceiver { inner }
|
27 + | }
|
28 + |
|
29 + | pub async fn try_recv_initial(
|
30 + | &mut self,
|
31 + | message_type: ::aws_smithy_legacy_http::event_stream::InitialMessageType,
|
32 + | ) -> Result<
|
33 + | ::std::option::Option<::aws_smithy_types::event_stream::Message>,
|
34 + | ::aws_smithy_runtime_api::client::result::SdkError<
|
35 + | crate::error::EventStreamError,
|
36 + | ::aws_smithy_types::event_stream::RawMessage,
|
37 + | >,
|
38 + | > {
|
39 + | let mut inner = self.inner.lock().await;
|
40 + | inner.try_recv_initial(message_type).await
|
41 + | }
|
42 + | }
|
43 + | #[::pyo3::pymethods]
|
44 + | impl InputStreamInputStreamReceiver {
|
45 + | pub fn __aiter__(slf: ::pyo3::PyRef<Self>) -> ::pyo3::PyRef<Self> {
|
46 + | slf
|
47 + | }
|
48 + |
|
49 + | pub fn __anext__(slf: ::pyo3::PyRefMut<Self>) -> ::pyo3::PyResult<Option<::pyo3::PyObject>> {
|
50 + | let body = slf.inner.clone();
|
51 + | let fut = ::pyo3_asyncio::tokio::future_into_py(slf.py(), async move {
|
52 + | let mut inner = body.lock().await;
|
53 + | let next = inner.recv().await;
|
54 + | match next {
|
55 + | Ok(Some(data)) => Ok(::pyo3::Python::with_gil(|py| {
|
56 + | ::pyo3::IntoPy::into_py(data, py)
|
57 + | })),
|
58 + | Ok(None) => Err(::pyo3::exceptions::PyStopAsyncIteration::new_err(
|
59 + | "stream exhausted",
|
60 + | )),
|
61 + | Err(::aws_smithy_runtime_api::client::result::SdkError::ServiceError(
|
62 + | service_err,
|
63 + | )) => Err(service_err.into_err().into()),
|
64 + | Err(err) => Err(::pyo3::exceptions::PyRuntimeError::new_err(err.to_string())),
|
65 + | }
|
66 + | })?;
|
67 + | Ok(Some(fut.into()))
|
68 + | }
|
69 + | }
|
70 + |
|
71 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
72 + | pub struct OutputStreamOutputStreamEventStreamSender {
|
73 + | inner: ::std::sync::Arc<
|
74 + | ::parking_lot::Mutex<
|
75 + | ::std::option::Option<
|
76 + | ::aws_smithy_legacy_http::event_stream::EventStreamSender<
|
77 + | crate::model::EventStream,
|
78 + | crate::error::EventStreamError,
|
79 + | >,
|
80 + | >,
|
81 + | >,
|
82 + | >,
|
83 + | }
|
84 + | impl OutputStreamOutputStreamEventStreamSender {
|
85 + | pub fn into_body_stream(
|
86 + | self,
|
87 + | marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::model::EventStream>
|
88 + | + ::std::marker::Send
|
89 + | + ::std::marker::Sync
|
90 + | + 'static,
|
91 + | error_marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::error::EventStreamError>
|
92 + | + ::std::marker::Send
|
93 + | + ::std::marker::Sync
|
94 + | + 'static,
|
95 + | signer: impl ::aws_smithy_eventstream::frame::SignMessage
|
96 + | + ::std::marker::Send
|
97 + | + ::std::marker::Sync
|
98 + | + 'static,
|
99 + | ) -> ::aws_smithy_legacy_http::event_stream::MessageStreamAdapter<
|
100 + | crate::model::EventStream,
|
101 + | crate::error::EventStreamError,
|
102 + | > {
|
103 + | let mut inner = self.inner.lock();
|
104 + | let inner = inner.take().expect(
|
105 + | "attempted to reuse an event stream. \
|
106 + | that means you kept a reference to an event stream and tried to reuse it in another request, \
|
107 + | event streams are request scoped and shouldn't be used outside of their bounded request scope"
|
108 + | );
|
109 + | inner.into_body_stream(marshaller, error_marshaller, signer)
|
110 + | }
|
111 + | }
|
112 + | impl<'source> ::pyo3::FromPyObject<'source> for OutputStreamOutputStreamEventStreamSender {
|
113 + | fn extract(obj: &'source ::pyo3::PyAny) -> ::pyo3::PyResult<Self> {
|
114 + | use ::tokio_stream::StreamExt;
|
115 + | let stream = ::pyo3_asyncio::tokio::into_stream_v1(obj)?;
|
116 + | let stream = stream.filter_map(|res| {
|
117 + | ::pyo3::Python::with_gil(|py| {
|
118 + | // TODO(EventStreamImprovements): Add `InternalServerError` variant to all event streaming
|
119 + | // errors and return that variant in case of errors here?
|
120 + | match res {
|
121 + | Ok(obj) => {
|
122 + | match obj.extract::<crate::model::EventStream>(py) {
|
123 + | Ok(it) => Some(Ok(it)),
|
124 + | Err(err) => {
|
125 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
126 + | ::tracing::error!(error = ?rich_py_err, "could not extract the output type 'crate::model::EventStream' from streamed value");
|
127 + | None
|
128 + | },
|
129 + | }
|
130 + | },
|
131 + | Err(err) => {
|
132 + | match ::pyo3::IntoPy::into_py(err, py).extract::<crate::error::EventStreamError>(py) {
|
133 + | Ok(modelled_error) => Some(Err(modelled_error)),
|
134 + | Err(err) => {
|
135 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
136 + | ::tracing::error!(error = ?rich_py_err, "could not extract the error type 'crate::error::EventStreamError' from raised exception");
|
137 + | None
|
138 + | }
|
139 + | }
|
140 + | }
|
141 + | }
|
142 + | })
|
143 + | });
|
144 + |
|
145 + | Ok(OutputStreamOutputStreamEventStreamSender {
|
146 + | inner: ::std::sync::Arc::new(::parking_lot::Mutex::new(Some(stream.into()))),
|
147 + | })
|
148 + | }
|
149 + | }
|
150 + |
|
151 + | impl ::pyo3::IntoPy<::pyo3::PyObject> for OutputStreamOutputStreamEventStreamSender {
|
152 + | fn into_py(self, py: ::pyo3::Python<'_>) -> ::pyo3::PyObject {
|
153 + | ::pyo3::exceptions::PyAttributeError::new_err("this is a write-only object").into_py(py)
|
154 + | }
|
155 + | }
|
156 + |
|
157 + | #[::pyo3::pyclass]
|
158 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
159 + | pub struct DuplexStreamInputStreamReceiver {
|
160 + | inner: ::std::sync::Arc<
|
161 + | ::tokio::sync::Mutex<
|
162 + | ::aws_smithy_legacy_http::event_stream::Receiver<
|
163 + | crate::model::EventStream,
|
164 + | crate::error::EventStreamError,
|
165 + | >,
|
166 + | >,
|
167 + | >,
|
168 + | }
|
169 + | impl DuplexStreamInputStreamReceiver {
|
170 + | pub fn new(
|
171 + | unmarshaller: impl ::aws_smithy_eventstream::frame::UnmarshallMessage<
|
172 + | Output = crate::model::EventStream,
|
173 + | Error = crate::error::EventStreamError,
|
174 + | > + ::std::marker::Send
|
175 + | + ::std::marker::Sync
|
176 + | + 'static,
|
177 + | body: ::aws_smithy_types::body::SdkBody,
|
178 + | ) -> DuplexStreamInputStreamReceiver {
|
179 + | let inner = ::aws_smithy_legacy_http::event_stream::Receiver::new(unmarshaller, body);
|
180 + | let inner = ::std::sync::Arc::new(::tokio::sync::Mutex::new(inner));
|
181 + | DuplexStreamInputStreamReceiver { inner }
|
182 + | }
|
183 + |
|
184 + | pub async fn try_recv_initial(
|
185 + | &mut self,
|
186 + | message_type: ::aws_smithy_legacy_http::event_stream::InitialMessageType,
|
187 + | ) -> Result<
|
188 + | ::std::option::Option<::aws_smithy_types::event_stream::Message>,
|
189 + | ::aws_smithy_runtime_api::client::result::SdkError<
|
190 + | crate::error::EventStreamError,
|
191 + | ::aws_smithy_types::event_stream::RawMessage,
|
192 + | >,
|
193 + | > {
|
194 + | let mut inner = self.inner.lock().await;
|
195 + | inner.try_recv_initial(message_type).await
|
196 + | }
|
197 + | }
|
198 + | #[::pyo3::pymethods]
|
199 + | impl DuplexStreamInputStreamReceiver {
|
200 + | pub fn __aiter__(slf: ::pyo3::PyRef<Self>) -> ::pyo3::PyRef<Self> {
|
201 + | slf
|
202 + | }
|
203 + |
|
204 + | pub fn __anext__(slf: ::pyo3::PyRefMut<Self>) -> ::pyo3::PyResult<Option<::pyo3::PyObject>> {
|
205 + | let body = slf.inner.clone();
|
206 + | let fut = ::pyo3_asyncio::tokio::future_into_py(slf.py(), async move {
|
207 + | let mut inner = body.lock().await;
|
208 + | let next = inner.recv().await;
|
209 + | match next {
|
210 + | Ok(Some(data)) => Ok(::pyo3::Python::with_gil(|py| {
|
211 + | ::pyo3::IntoPy::into_py(data, py)
|
212 + | })),
|
213 + | Ok(None) => Err(::pyo3::exceptions::PyStopAsyncIteration::new_err(
|
214 + | "stream exhausted",
|
215 + | )),
|
216 + | Err(::aws_smithy_runtime_api::client::result::SdkError::ServiceError(
|
217 + | service_err,
|
218 + | )) => Err(service_err.into_err().into()),
|
219 + | Err(err) => Err(::pyo3::exceptions::PyRuntimeError::new_err(err.to_string())),
|
220 + | }
|
221 + | })?;
|
222 + | Ok(Some(fut.into()))
|
223 + | }
|
224 + | }
|
225 + |
|
226 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
227 + | pub struct DuplexStreamOutputStreamEventStreamSender {
|
228 + | inner: ::std::sync::Arc<
|
229 + | ::parking_lot::Mutex<
|
230 + | ::std::option::Option<
|
231 + | ::aws_smithy_legacy_http::event_stream::EventStreamSender<
|
232 + | crate::model::EventStream,
|
233 + | crate::error::EventStreamError,
|
234 + | >,
|
235 + | >,
|
236 + | >,
|
237 + | >,
|
238 + | }
|
239 + | impl DuplexStreamOutputStreamEventStreamSender {
|
240 + | pub fn into_body_stream(
|
241 + | self,
|
242 + | marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::model::EventStream>
|
243 + | + ::std::marker::Send
|
244 + | + ::std::marker::Sync
|
245 + | + 'static,
|
246 + | error_marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::error::EventStreamError>
|
247 + | + ::std::marker::Send
|
248 + | + ::std::marker::Sync
|
249 + | + 'static,
|
250 + | signer: impl ::aws_smithy_eventstream::frame::SignMessage
|
251 + | + ::std::marker::Send
|
252 + | + ::std::marker::Sync
|
253 + | + 'static,
|
254 + | ) -> ::aws_smithy_legacy_http::event_stream::MessageStreamAdapter<
|
255 + | crate::model::EventStream,
|
256 + | crate::error::EventStreamError,
|
257 + | > {
|
258 + | let mut inner = self.inner.lock();
|
259 + | let inner = inner.take().expect(
|
260 + | "attempted to reuse an event stream. \
|
261 + | that means you kept a reference to an event stream and tried to reuse it in another request, \
|
262 + | event streams are request scoped and shouldn't be used outside of their bounded request scope"
|
263 + | );
|
264 + | inner.into_body_stream(marshaller, error_marshaller, signer)
|
265 + | }
|
266 + | }
|
267 + | impl<'source> ::pyo3::FromPyObject<'source> for DuplexStreamOutputStreamEventStreamSender {
|
268 + | fn extract(obj: &'source ::pyo3::PyAny) -> ::pyo3::PyResult<Self> {
|
269 + | use ::tokio_stream::StreamExt;
|
270 + | let stream = ::pyo3_asyncio::tokio::into_stream_v1(obj)?;
|
271 + | let stream = stream.filter_map(|res| {
|
272 + | ::pyo3::Python::with_gil(|py| {
|
273 + | // TODO(EventStreamImprovements): Add `InternalServerError` variant to all event streaming
|
274 + | // errors and return that variant in case of errors here?
|
275 + | match res {
|
276 + | Ok(obj) => {
|
277 + | match obj.extract::<crate::model::EventStream>(py) {
|
278 + | Ok(it) => Some(Ok(it)),
|
279 + | Err(err) => {
|
280 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
281 + | ::tracing::error!(error = ?rich_py_err, "could not extract the output type 'crate::model::EventStream' from streamed value");
|
282 + | None
|
283 + | },
|
284 + | }
|
285 + | },
|
286 + | Err(err) => {
|
287 + | match ::pyo3::IntoPy::into_py(err, py).extract::<crate::error::EventStreamError>(py) {
|
288 + | Ok(modelled_error) => Some(Err(modelled_error)),
|
289 + | Err(err) => {
|
290 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
291 + | ::tracing::error!(error = ?rich_py_err, "could not extract the error type 'crate::error::EventStreamError' from raised exception");
|
292 + | None
|
293 + | }
|
294 + | }
|
295 + | }
|
296 + | }
|
297 + | })
|
298 + | });
|
299 + |
|
300 + | Ok(DuplexStreamOutputStreamEventStreamSender {
|
301 + | inner: ::std::sync::Arc::new(::parking_lot::Mutex::new(Some(stream.into()))),
|
302 + | })
|
303 + | }
|
304 + | }
|
305 + |
|
306 + | impl ::pyo3::IntoPy<::pyo3::PyObject> for DuplexStreamOutputStreamEventStreamSender {
|
307 + | fn into_py(self, py: ::pyo3::Python<'_>) -> ::pyo3::PyObject {
|
308 + | ::pyo3::exceptions::PyAttributeError::new_err("this is a write-only object").into_py(py)
|
309 + | }
|
310 + | }
|
311 + |
|
312 + | #[::pyo3::pyclass]
|
313 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
314 + | pub struct InputStreamWithInitialRequestInputStreamReceiver {
|
315 + | inner: ::std::sync::Arc<
|
316 + | ::tokio::sync::Mutex<
|
317 + | ::aws_smithy_legacy_http::event_stream::Receiver<
|
318 + | crate::model::EventStream,
|
319 + | crate::error::EventStreamError,
|
320 + | >,
|
321 + | >,
|
322 + | >,
|
323 + | }
|
324 + | impl InputStreamWithInitialRequestInputStreamReceiver {
|
325 + | pub fn new(
|
326 + | unmarshaller: impl ::aws_smithy_eventstream::frame::UnmarshallMessage<
|
327 + | Output = crate::model::EventStream,
|
328 + | Error = crate::error::EventStreamError,
|
329 + | > + ::std::marker::Send
|
330 + | + ::std::marker::Sync
|
331 + | + 'static,
|
332 + | body: ::aws_smithy_types::body::SdkBody,
|
333 + | ) -> InputStreamWithInitialRequestInputStreamReceiver {
|
334 + | let inner = ::aws_smithy_legacy_http::event_stream::Receiver::new(unmarshaller, body);
|
335 + | let inner = ::std::sync::Arc::new(::tokio::sync::Mutex::new(inner));
|
336 + | InputStreamWithInitialRequestInputStreamReceiver { inner }
|
337 + | }
|
338 + |
|
339 + | pub async fn try_recv_initial(
|
340 + | &mut self,
|
341 + | message_type: ::aws_smithy_legacy_http::event_stream::InitialMessageType,
|
342 + | ) -> Result<
|
343 + | ::std::option::Option<::aws_smithy_types::event_stream::Message>,
|
344 + | ::aws_smithy_runtime_api::client::result::SdkError<
|
345 + | crate::error::EventStreamError,
|
346 + | ::aws_smithy_types::event_stream::RawMessage,
|
347 + | >,
|
348 + | > {
|
349 + | let mut inner = self.inner.lock().await;
|
350 + | inner.try_recv_initial(message_type).await
|
351 + | }
|
352 + | }
|
353 + | #[::pyo3::pymethods]
|
354 + | impl InputStreamWithInitialRequestInputStreamReceiver {
|
355 + | pub fn __aiter__(slf: ::pyo3::PyRef<Self>) -> ::pyo3::PyRef<Self> {
|
356 + | slf
|
357 + | }
|
358 + |
|
359 + | pub fn __anext__(slf: ::pyo3::PyRefMut<Self>) -> ::pyo3::PyResult<Option<::pyo3::PyObject>> {
|
360 + | let body = slf.inner.clone();
|
361 + | let fut = ::pyo3_asyncio::tokio::future_into_py(slf.py(), async move {
|
362 + | let mut inner = body.lock().await;
|
363 + | let next = inner.recv().await;
|
364 + | match next {
|
365 + | Ok(Some(data)) => Ok(::pyo3::Python::with_gil(|py| {
|
366 + | ::pyo3::IntoPy::into_py(data, py)
|
367 + | })),
|
368 + | Ok(None) => Err(::pyo3::exceptions::PyStopAsyncIteration::new_err(
|
369 + | "stream exhausted",
|
370 + | )),
|
371 + | Err(::aws_smithy_runtime_api::client::result::SdkError::ServiceError(
|
372 + | service_err,
|
373 + | )) => Err(service_err.into_err().into()),
|
374 + | Err(err) => Err(::pyo3::exceptions::PyRuntimeError::new_err(err.to_string())),
|
375 + | }
|
376 + | })?;
|
377 + | Ok(Some(fut.into()))
|
378 + | }
|
379 + | }
|
380 + |
|
381 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
382 + | pub struct OutputStreamWithInitialResponseOutputStreamEventStreamSender {
|
383 + | inner: ::std::sync::Arc<
|
384 + | ::parking_lot::Mutex<
|
385 + | ::std::option::Option<
|
386 + | ::aws_smithy_legacy_http::event_stream::EventStreamSender<
|
387 + | crate::model::EventStream,
|
388 + | crate::error::EventStreamError,
|
389 + | >,
|
390 + | >,
|
391 + | >,
|
392 + | >,
|
393 + | }
|
394 + | impl OutputStreamWithInitialResponseOutputStreamEventStreamSender {
|
395 + | pub fn into_body_stream(
|
396 + | self,
|
397 + | marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::model::EventStream>
|
398 + | + ::std::marker::Send
|
399 + | + ::std::marker::Sync
|
400 + | + 'static,
|
401 + | error_marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::error::EventStreamError>
|
402 + | + ::std::marker::Send
|
403 + | + ::std::marker::Sync
|
404 + | + 'static,
|
405 + | signer: impl ::aws_smithy_eventstream::frame::SignMessage
|
406 + | + ::std::marker::Send
|
407 + | + ::std::marker::Sync
|
408 + | + 'static,
|
409 + | ) -> ::aws_smithy_legacy_http::event_stream::MessageStreamAdapter<
|
410 + | crate::model::EventStream,
|
411 + | crate::error::EventStreamError,
|
412 + | > {
|
413 + | let mut inner = self.inner.lock();
|
414 + | let inner = inner.take().expect(
|
415 + | "attempted to reuse an event stream. \
|
416 + | that means you kept a reference to an event stream and tried to reuse it in another request, \
|
417 + | event streams are request scoped and shouldn't be used outside of their bounded request scope"
|
418 + | );
|
419 + | inner.into_body_stream(marshaller, error_marshaller, signer)
|
420 + | }
|
421 + | }
|
422 + | impl<'source> ::pyo3::FromPyObject<'source>
|
423 + | for OutputStreamWithInitialResponseOutputStreamEventStreamSender
|
424 + | {
|
425 + | fn extract(obj: &'source ::pyo3::PyAny) -> ::pyo3::PyResult<Self> {
|
426 + | use ::tokio_stream::StreamExt;
|
427 + | let stream = ::pyo3_asyncio::tokio::into_stream_v1(obj)?;
|
428 + | let stream = stream.filter_map(|res| {
|
429 + | ::pyo3::Python::with_gil(|py| {
|
430 + | // TODO(EventStreamImprovements): Add `InternalServerError` variant to all event streaming
|
431 + | // errors and return that variant in case of errors here?
|
432 + | match res {
|
433 + | Ok(obj) => {
|
434 + | match obj.extract::<crate::model::EventStream>(py) {
|
435 + | Ok(it) => Some(Ok(it)),
|
436 + | Err(err) => {
|
437 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
438 + | ::tracing::error!(error = ?rich_py_err, "could not extract the output type 'crate::model::EventStream' from streamed value");
|
439 + | None
|
440 + | },
|
441 + | }
|
442 + | },
|
443 + | Err(err) => {
|
444 + | match ::pyo3::IntoPy::into_py(err, py).extract::<crate::error::EventStreamError>(py) {
|
445 + | Ok(modelled_error) => Some(Err(modelled_error)),
|
446 + | Err(err) => {
|
447 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
448 + | ::tracing::error!(error = ?rich_py_err, "could not extract the error type 'crate::error::EventStreamError' from raised exception");
|
449 + | None
|
450 + | }
|
451 + | }
|
452 + | }
|
453 + | }
|
454 + | })
|
455 + | });
|
456 + |
|
457 + | Ok(
|
458 + | OutputStreamWithInitialResponseOutputStreamEventStreamSender {
|
459 + | inner: ::std::sync::Arc::new(::parking_lot::Mutex::new(Some(stream.into()))),
|
460 + | },
|
461 + | )
|
462 + | }
|
463 + | }
|
464 + |
|
465 + | impl ::pyo3::IntoPy<::pyo3::PyObject>
|
466 + | for OutputStreamWithInitialResponseOutputStreamEventStreamSender
|
467 + | {
|
468 + | fn into_py(self, py: ::pyo3::Python<'_>) -> ::pyo3::PyObject {
|
469 + | ::pyo3::exceptions::PyAttributeError::new_err("this is a write-only object").into_py(py)
|
470 + | }
|
471 + | }
|
472 + |
|
473 + | #[::pyo3::pyclass]
|
474 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
475 + | pub struct DuplexStreamWithInitialMessagesInputStreamReceiver {
|
476 + | inner: ::std::sync::Arc<
|
477 + | ::tokio::sync::Mutex<
|
478 + | ::aws_smithy_legacy_http::event_stream::Receiver<
|
479 + | crate::model::EventStream,
|
480 + | crate::error::EventStreamError,
|
481 + | >,
|
482 + | >,
|
483 + | >,
|
484 + | }
|
485 + | impl DuplexStreamWithInitialMessagesInputStreamReceiver {
|
486 + | pub fn new(
|
487 + | unmarshaller: impl ::aws_smithy_eventstream::frame::UnmarshallMessage<
|
488 + | Output = crate::model::EventStream,
|
489 + | Error = crate::error::EventStreamError,
|
490 + | > + ::std::marker::Send
|
491 + | + ::std::marker::Sync
|
492 + | + 'static,
|
493 + | body: ::aws_smithy_types::body::SdkBody,
|
494 + | ) -> DuplexStreamWithInitialMessagesInputStreamReceiver {
|
495 + | let inner = ::aws_smithy_legacy_http::event_stream::Receiver::new(unmarshaller, body);
|
496 + | let inner = ::std::sync::Arc::new(::tokio::sync::Mutex::new(inner));
|
497 + | DuplexStreamWithInitialMessagesInputStreamReceiver { inner }
|
498 + | }
|
499 + |
|
500 + | pub async fn try_recv_initial(
|
501 + | &mut self,
|
502 + | message_type: ::aws_smithy_legacy_http::event_stream::InitialMessageType,
|
503 + | ) -> Result<
|
504 + | ::std::option::Option<::aws_smithy_types::event_stream::Message>,
|
505 + | ::aws_smithy_runtime_api::client::result::SdkError<
|
506 + | crate::error::EventStreamError,
|
507 + | ::aws_smithy_types::event_stream::RawMessage,
|
508 + | >,
|
509 + | > {
|
510 + | let mut inner = self.inner.lock().await;
|
511 + | inner.try_recv_initial(message_type).await
|
512 + | }
|
513 + | }
|
514 + | #[::pyo3::pymethods]
|
515 + | impl DuplexStreamWithInitialMessagesInputStreamReceiver {
|
516 + | pub fn __aiter__(slf: ::pyo3::PyRef<Self>) -> ::pyo3::PyRef<Self> {
|
517 + | slf
|
518 + | }
|
519 + |
|
520 + | pub fn __anext__(slf: ::pyo3::PyRefMut<Self>) -> ::pyo3::PyResult<Option<::pyo3::PyObject>> {
|
521 + | let body = slf.inner.clone();
|
522 + | let fut = ::pyo3_asyncio::tokio::future_into_py(slf.py(), async move {
|
523 + | let mut inner = body.lock().await;
|
524 + | let next = inner.recv().await;
|
525 + | match next {
|
526 + | Ok(Some(data)) => Ok(::pyo3::Python::with_gil(|py| {
|
527 + | ::pyo3::IntoPy::into_py(data, py)
|
528 + | })),
|
529 + | Ok(None) => Err(::pyo3::exceptions::PyStopAsyncIteration::new_err(
|
530 + | "stream exhausted",
|
531 + | )),
|
532 + | Err(::aws_smithy_runtime_api::client::result::SdkError::ServiceError(
|
533 + | service_err,
|
534 + | )) => Err(service_err.into_err().into()),
|
535 + | Err(err) => Err(::pyo3::exceptions::PyRuntimeError::new_err(err.to_string())),
|
536 + | }
|
537 + | })?;
|
538 + | Ok(Some(fut.into()))
|
539 + | }
|
540 + | }
|
541 + |
|
542 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
543 + | pub struct DuplexStreamWithInitialMessagesOutputStreamEventStreamSender {
|
544 + | inner: ::std::sync::Arc<
|
545 + | ::parking_lot::Mutex<
|
546 + | ::std::option::Option<
|
547 + | ::aws_smithy_legacy_http::event_stream::EventStreamSender<
|
548 + | crate::model::EventStream,
|
549 + | crate::error::EventStreamError,
|
550 + | >,
|
551 + | >,
|
552 + | >,
|
553 + | >,
|
554 + | }
|
555 + | impl DuplexStreamWithInitialMessagesOutputStreamEventStreamSender {
|
556 + | pub fn into_body_stream(
|
557 + | self,
|
558 + | marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::model::EventStream>
|
559 + | + ::std::marker::Send
|
560 + | + ::std::marker::Sync
|
561 + | + 'static,
|
562 + | error_marshaller: impl ::aws_smithy_eventstream::frame::MarshallMessage<Input = crate::error::EventStreamError>
|
563 + | + ::std::marker::Send
|
564 + | + ::std::marker::Sync
|
565 + | + 'static,
|
566 + | signer: impl ::aws_smithy_eventstream::frame::SignMessage
|
567 + | + ::std::marker::Send
|
568 + | + ::std::marker::Sync
|
569 + | + 'static,
|
570 + | ) -> ::aws_smithy_legacy_http::event_stream::MessageStreamAdapter<
|
571 + | crate::model::EventStream,
|
572 + | crate::error::EventStreamError,
|
573 + | > {
|
574 + | let mut inner = self.inner.lock();
|
575 + | let inner = inner.take().expect(
|
576 + | "attempted to reuse an event stream. \
|
577 + | that means you kept a reference to an event stream and tried to reuse it in another request, \
|
578 + | event streams are request scoped and shouldn't be used outside of their bounded request scope"
|
579 + | );
|
580 + | inner.into_body_stream(marshaller, error_marshaller, signer)
|
581 + | }
|
582 + | }
|
583 + | impl<'source> ::pyo3::FromPyObject<'source>
|
584 + | for DuplexStreamWithInitialMessagesOutputStreamEventStreamSender
|
585 + | {
|
586 + | fn extract(obj: &'source ::pyo3::PyAny) -> ::pyo3::PyResult<Self> {
|
587 + | use ::tokio_stream::StreamExt;
|
588 + | let stream = ::pyo3_asyncio::tokio::into_stream_v1(obj)?;
|
589 + | let stream = stream.filter_map(|res| {
|
590 + | ::pyo3::Python::with_gil(|py| {
|
591 + | // TODO(EventStreamImprovements): Add `InternalServerError` variant to all event streaming
|
592 + | // errors and return that variant in case of errors here?
|
593 + | match res {
|
594 + | Ok(obj) => {
|
595 + | match obj.extract::<crate::model::EventStream>(py) {
|
596 + | Ok(it) => Some(Ok(it)),
|
597 + | Err(err) => {
|
598 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
599 + | ::tracing::error!(error = ?rich_py_err, "could not extract the output type 'crate::model::EventStream' from streamed value");
|
600 + | None
|
601 + | },
|
602 + | }
|
603 + | },
|
604 + | Err(err) => {
|
605 + | match ::pyo3::IntoPy::into_py(err, py).extract::<crate::error::EventStreamError>(py) {
|
606 + | Ok(modelled_error) => Some(Err(modelled_error)),
|
607 + | Err(err) => {
|
608 + | let rich_py_err = ::aws_smithy_http_server_python::rich_py_err(err);
|
609 + | ::tracing::error!(error = ?rich_py_err, "could not extract the error type 'crate::error::EventStreamError' from raised exception");
|
610 + | None
|
611 + | }
|
612 + | }
|
613 + | }
|
614 + | }
|
615 + | })
|
616 + | });
|
617 + |
|
618 + | Ok(
|
619 + | DuplexStreamWithInitialMessagesOutputStreamEventStreamSender {
|
620 + | inner: ::std::sync::Arc::new(::parking_lot::Mutex::new(Some(stream.into()))),
|
621 + | },
|
622 + | )
|
623 + | }
|
624 + | }
|
625 + |
|
626 + | impl ::pyo3::IntoPy<::pyo3::PyObject>
|
627 + | for DuplexStreamWithInitialMessagesOutputStreamEventStreamSender
|
628 + | {
|
629 + | fn into_py(self, py: ::pyo3::Python<'_>) -> ::pyo3::PyObject {
|
630 + | ::pyo3::exceptions::PyAttributeError::new_err("this is a write-only object").into_py(py)
|
631 + | }
|
632 + | }
|
633 + |
|
634 + | #[::pyo3::pyclass]
|
635 + | #[derive(::std::clone::Clone, ::std::fmt::Debug)]
|
636 + | pub struct DuplexStreamWithDistinctStreamsInputStreamReceiver {
|
637 + | inner: ::std::sync::Arc<
|
638 + | ::tokio::sync::Mutex<
|
639 + | ::aws_smithy_legacy_http::event_stream::Receiver<
|
640 + | crate::model::EventStream,
|
641 + | crate::error::EventStreamError,
|
642 + | >,
|
643 + | >,
|
644 + | >,
|
645 + | }
|
646 + | impl DuplexStreamWithDistinctStreamsInputStreamReceiver {
|
647 + | pub fn new(
|
648 + | unmarshaller: impl ::aws_smithy_eventstream::frame::UnmarshallMessage<
|
649 + | Output = crate::model::EventStream,
|
650 + | Error = crate::error::EventStreamError,
|
651 + | > + ::std::marker::Send
|
652 + | + ::std::marker::Sync
|
653 + | + 'static,
|
654 + | body: ::aws_smithy_types::body::SdkBody,
|
655 + | ) -> DuplexStreamWithDistinctStreamsInputStreamReceiver {
|
656 + | let inner = ::aws_smithy_legacy_http::event_stream::Receiver::new(unmarshaller, body);
|
657 + | let inner = ::std::sync::Arc::new(::tokio::sync::Mutex::new(inner));
|
658 + | DuplexStreamWithDistinctStreamsInputStreamReceiver { inner }
|
659 + | }
|
660 + |
|
661 + | pub async fn try_recv_initial(
|
662 + | &mut self,
|
663 + | message_type: ::aws_smithy_legacy_http::event_stream::InitialMessageType,
|
664 + | ) -> Result<
|
665 + | ::std::option::Option<::aws_smithy_types::event_stream::Message>,
|
666 + | ::aws_smithy_runtime_api::client::result::SdkError<
|
667 + | crate::error::EventStreamError,
|
668 + | ::aws_smithy_types::event_stream::RawMessage,
|
669 + | >,
|
670 + | > {
|
671 + | let mut inner = self.inner.lock().await;
|
672 + | inner.try_recv_initial(message_type).await
|
673 + | }
|
674 + | }
|
675 + | #[::pyo3::pymethods]
|
676 + | impl DuplexStreamWithDistinctStreamsInputStreamReceiver {
|
677 + | pub fn __aiter__(slf: ::pyo3::PyRef<Self>) -> ::pyo3::PyRef<Self> {
|
678 + | slf
|
679 + | }
|
680 + |
|
681 + | pub fn __anext__(slf: ::pyo3::PyRefMut<Self>) -> ::pyo3::PyResult<Option<::pyo3::PyObject>> {
|
682 + | let body = slf.inner.clone();
|
683 + | let fut = ::pyo3_asyncio::tokio::future_into_py(slf.py(), async move {
|
684 + | let mut inner = body.lock().await;
|
685 + | let next = inner.recv().await;
|
686 + | match next {
|
687 + | Ok(Some(data)) => Ok(::pyo3::Python::with_gil(|py| {
|
688 + | ::pyo3::IntoPy::into_py(data, py)
|
689 + | })),
|
690 + | Ok(None) => Err(::pyo3::exceptions::PyStopAsyncIteration::new_err(
|
691 + | "stream exhausted",
|
692 + | )),
|
693 + | Err(::aws_smithy_runtime_api::client::result::SdkError::ServiceError(
|
694 + | service_err,
|
695 + | )) => Err(service_err.into_err().into()),
|
696 + | Err(err) => Err(::pyo3::exceptions::PyRuntimeError::new_err(err.to_string())),
|
697 + | }
|
698 + | })?;
|
699 + | Ok(Some(fut.into()))
|
700 + | }
|
701 + | }
|