200 200 | ) -> ::std::result::Result<::http_1x::request::Builder, ::aws_smithy_types::error::operation::BuildError> {
|
201 201 | let mut uri = ::std::string::String::new();
|
202 202 | uri_base(input, &mut uri)?;
|
203 203 | ::std::result::Result::Ok(builder.method("POST").uri(uri))
|
204 204 | }
|
205 205 | let mut builder = update_http_builder(&input, ::http_1x::request::Builder::new())?;
|
206 206 | builder =
|
207 207 | _header_serialization_settings.set_default_header(builder, ::http_1x::header::CONTENT_TYPE, "application/vnd.amazon.eventstream");
|
208 208 | builder = _header_serialization_settings.set_default_header(
|
209 209 | builder,
|
210 210 | ::http_1x::header::HeaderName::from_static("smithy-protocol"),
|
211 211 | "rpc-v2-cbor",
|
212 212 | );
|
213 213 | builder = _header_serialization_settings.set_default_header(
|
214 214 | builder,
|
215 215 | ::http_1x::header::HeaderName::from_static("accept"),
|
216 216 | "application/vnd.amazon.eventstream, application/cbor",
|
217 217 | );
|
218 218 | builder
|
219 219 | };
|
220 220 | let body = ::aws_smithy_types::body::SdkBody::from({
|
221 221 | let error_marshaller = crate::event_stream_serde::EventsErrorMarshaller::new();
|
222 222 | let marshaller = crate::event_stream_serde::EventsMarshaller::new();
|
223 223 | let (signer, signer_sender) = ::aws_smithy_eventstream::frame::DeferredSigner::new();
|
224 224 | _cfg.interceptor_state().store_put(signer_sender);
|
225 225 | ::aws_smithy_types::body::SdkBody::from_body_1_x(::http_body_util::StreamBody::new({
|
226 226 | use ::futures_util::StreamExt;
|
227 227 | let body =
|
228 228 | crate::protocol_serde::shape_streaming_operation_with_optional_data::ser_streaming_operation_with_optional_data_input(&input)?;
|
229 229 | let initial_message = crate::event_stream_serde::initial_message_from_body(body);
|
230 - | let mut buffer = ::std::vec::Vec::new();
|
231 - | ::aws_smithy_eventstream::frame::write_message_to(&initial_message, &mut buffer)?;
|
232 - | let initial_message_stream = futures_util::stream::iter(vec![Ok(::http_body_1x::Frame::data(buffer.into()))]);
|
233 - | let adapter = input.events.into_body_stream(marshaller, error_marshaller, signer);
|
234 - | initial_message_stream.chain(adapter)
|
230 + |
|
231 + | // Wrap the marshaller to handle both initial and regular messages
|
232 + | let wrapped_marshaller = ::aws_smithy_http::event_stream::EventOrInitialMarshaller::new(marshaller);
|
233 + |
|
234 + | // Create stream with initial message
|
235 + | let initial_stream = ::futures_util::stream::once(async move {
|
236 + | ::std::result::Result::Ok(::aws_smithy_http::event_stream::EventOrInitial::InitialMessage(initial_message))
|
237 + | });
|
238 + |
|
239 + | // Extract inner stream and map events
|
240 + | let event_stream = input
|
241 + | .events
|
242 + | .into_inner()
|
243 + | .map(|result| result.map(::aws_smithy_http::event_stream::EventOrInitial::Event));
|
244 + |
|
245 + | // Chain streams and convert to EventStreamSender
|
246 + | let combined = initial_stream.chain(event_stream);
|
247 + | ::aws_smithy_http::event_stream::EventStreamSender::from(combined).into_body_stream(wrapped_marshaller, error_marshaller, signer)
|
235 248 | }))
|
236 249 | });
|
237 250 | if let Some(content_length) = body.content_length() {
|
238 251 | let content_length = content_length.to_string();
|
239 252 | request_builder = _header_serialization_settings.set_default_header(request_builder, ::http_1x::header::CONTENT_LENGTH, &content_length);
|
240 253 | }
|
241 254 | ::std::result::Result::Ok(request_builder.body(body).expect("valid request").try_into().unwrap())
|
242 255 | }
|
243 256 | }
|
244 257 | #[derive(Debug)]
|
245 258 | struct StreamingOperationWithOptionalDataEndpointParamsInterceptor;
|
246 259 |
|
247 260 | impl ::aws_smithy_runtime_api::client::interceptors::Intercept for StreamingOperationWithOptionalDataEndpointParamsInterceptor {
|
248 261 | fn name(&self) -> &'static str {
|
249 262 | "StreamingOperationWithOptionalDataEndpointParamsInterceptor"
|
250 263 | }
|
251 264 |
|
252 265 | fn read_before_execution(
|
253 266 | &self,
|
254 267 | context: &::aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<
|
255 268 | '_,
|
256 269 | ::aws_smithy_runtime_api::client::interceptors::context::Input,
|
257 270 | ::aws_smithy_runtime_api::client::interceptors::context::Output,
|
258 271 | ::aws_smithy_runtime_api::client::interceptors::context::Error,
|
259 272 | >,
|
260 273 | cfg: &mut ::aws_smithy_types::config_bag::ConfigBag,
|
261 274 | ) -> ::std::result::Result<(), ::aws_smithy_runtime_api::box_error::BoxError> {
|
262 275 | let _input = context
|
263 276 | .input()
|
264 277 | .downcast_ref::<StreamingOperationWithOptionalDataInput>()
|