193 193 | ::std::write!(output, "/service/RpcV2CborService/operation/StreamingOperationWithOptionalData").expect("formatting should succeed");
|
194 194 | ::std::result::Result::Ok(())
|
195 195 | }
|
// Builds the request line for this operation on top of the supplied `builder`:
// the method is always "POST" and the URI is whatever `uri_base` writes into a
// fresh `String`.
//
// * `input`   - the operation input, passed by reference to `uri_base`.
// * `builder` - the `http::request::Builder` to extend; it is consumed and returned.
//
// Returns the updated builder, or propagates the `BuildError` from `uri_base`.
// NOTE(review): `clippy::unnecessary_wraps` is silenced here, presumably because
// the `Result` return type is kept uniform across generated operations — confirm.
196 196 | #[allow(clippy::unnecessary_wraps)]
|
197 197 | fn update_http_builder(
|
198 198 | input: &crate::operation::streaming_operation_with_optional_data::StreamingOperationWithOptionalDataInput,
|
199 199 | builder: ::http::request::Builder,
|
200 200 | ) -> ::std::result::Result<::http::request::Builder, ::aws_smithy_types::error::operation::BuildError> {
|
201 201 | let mut uri = ::std::string::String::new();
|
// Fill `uri` with the request path; any error is returned to the caller via `?`.
202 202 | uri_base(input, &mut uri)?;
|
// Attach the HTTP method and the URI that was just built.
203 203 | ::std::result::Result::Ok(builder.method("POST").uri(uri))
|
204 204 | }
|
205 205 | let mut builder = update_http_builder(&input, ::http::request::Builder::new())?;
|
206 206 | builder = _header_serialization_settings.set_default_header(builder, ::http::header::CONTENT_TYPE, "application/cbor");
|
207 207 | builder =
|
208 208 | _header_serialization_settings.set_default_header(builder, ::http::header::HeaderName::from_static("smithy-protocol"), "rpc-v2-cbor");
|
209 209 | builder =
|
210 210 | _header_serialization_settings.set_default_header(builder, ::http::header::HeaderName::from_static("accept"), "application/cbor");
|
211 211 | builder
|
212 212 | };
|
213 213 | let body = ::aws_smithy_types::body::SdkBody::from({
|
214 214 | let error_marshaller = crate::event_stream_serde::EventsErrorMarshaller::new();
|
215 215 | let marshaller = crate::event_stream_serde::EventsMarshaller::new();
|
216 216 | let (signer, signer_sender) = ::aws_smithy_eventstream::frame::DeferredSigner::new();
|
217 217 | _cfg.interceptor_state().store_put(signer_sender);
|
218 218 | ::aws_smithy_types::body::SdkBody::from_body_0_4(::hyper::Body::wrap_stream({
|
219 219 | use ::futures_util::StreamExt;
|
220 220 | let body =
|
221 221 | crate::protocol_serde::shape_streaming_operation_with_optional_data::ser_streaming_operation_with_optional_data_input(&input)?;
|
222 222 | let initial_message = crate::event_stream_serde::initial_message_from_body(body);
|
223 - | let mut buffer = ::std::vec::Vec::new();
|
224 - | ::aws_smithy_eventstream::frame::write_message_to(&initial_message, &mut buffer)?;
|
225 - | let initial_message_stream = futures_util::stream::iter(vec![Ok(buffer.into())]);
|
226 - | let adapter = input.events.into_body_stream(marshaller, error_marshaller, signer);
|
227 - | initial_message_stream.chain(adapter)
|
223 + |
|
224 + | // Wrap the marshaller to handle both initial and regular messages
|
225 + | let wrapped_marshaller = ::aws_smithy_http::event_stream::EventOrInitialMarshaller::new(marshaller);
|
226 + |
|
227 + | // Create stream with initial message
|
228 + | let initial_stream = ::futures_util::stream::once(async move {
|
229 + | ::std::result::Result::Ok(::aws_smithy_http::event_stream::EventOrInitial::InitialMessage(initial_message))
|
230 + | });
|
231 + |
|
232 + | // Extract inner stream and map events
|
233 + | let event_stream = input
|
234 + | .events
|
235 + | .into_inner()
|
236 + | .map(|result| result.map(::aws_smithy_http::event_stream::EventOrInitial::Event));
|
237 + |
|
238 + | // Chain streams and convert to EventStreamSender
|
239 + | let combined = initial_stream.chain(event_stream);
|
240 + | ::aws_smithy_http::event_stream::EventStreamSender::from(combined).into_body_stream(wrapped_marshaller, error_marshaller, signer)
|
228 241 | }))
|
229 242 | });
|
230 243 | if let Some(content_length) = body.content_length() {
|
231 244 | let content_length = content_length.to_string();
|
232 245 | request_builder = _header_serialization_settings.set_default_header(request_builder, ::http::header::CONTENT_LENGTH, &content_length);
|
233 246 | }
|
234 247 | ::std::result::Result::Ok(request_builder.body(body).expect("valid request").try_into().unwrap())
|
235 248 | }
|
236 249 | }
|
// Field-less (unit) struct; it carries no state of its own. The
// `Intercept` trait implementation for this type follows directly below.
237 250 | #[derive(Debug)]
|
238 251 | struct StreamingOperationWithOptionalDataEndpointParamsInterceptor;
|
239 252 |
|
240 253 | impl ::aws_smithy_runtime_api::client::interceptors::Intercept for StreamingOperationWithOptionalDataEndpointParamsInterceptor {
|
241 254 | fn name(&self) -> &'static str {
|
242 255 | "StreamingOperationWithOptionalDataEndpointParamsInterceptor"
|
243 256 | }
|
244 257 |
|
245 258 | fn read_before_execution(
|
246 259 | &self,
|
247 260 | context: &::aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<
|
248 261 | '_,
|
249 262 | ::aws_smithy_runtime_api::client::interceptors::context::Input,
|
250 263 | ::aws_smithy_runtime_api::client::interceptors::context::Output,
|
251 264 | ::aws_smithy_runtime_api::client::interceptors::context::Error,
|
252 265 | >,
|
253 266 | cfg: &mut ::aws_smithy_types::config_bag::ConfigBag,
|
254 267 | ) -> ::std::result::Result<(), ::aws_smithy_runtime_api::box_error::BoxError> {
|
255 268 | let _input = context
|
256 269 | .input()
|
257 270 | .downcast_ref::<StreamingOperationWithOptionalDataInput>()
|