@@ -147,52 +147,17 @@ where
147147 forward_ready ! ( service) ;
148148
149149 fn call ( & self , mut req : ServiceRequest ) -> Self :: Future {
150- /*Below section is added to extract the Authorization and X-P-Stream headers from x-amz-firehose-common-attributes custom header
151- when request is made from Kinesis Firehose.
152- For requests made from other clients, no change.
153-
154- ## Section start */
155- if self . action . eq ( & Action :: Ingest )
156- && let Some ( kinesis_common_attributes) =
157- req. request ( ) . headers ( ) . get ( KINESIS_COMMON_ATTRIBUTES_KEY )
158- && let Ok ( attribute_value) = kinesis_common_attributes. to_str ( )
159- && let Ok ( message) = serde_json:: from_str :: < Message > ( attribute_value)
160- && let Ok ( auth_value) =
161- header:: HeaderValue :: from_str ( & message. common_attributes . authorization )
162- && let Ok ( stream_name_key) =
163- header:: HeaderValue :: from_str ( & message. common_attributes . x_p_stream )
164- {
165- req. headers_mut ( )
166- . insert ( HeaderName :: from_static ( AUTHORIZATION_KEY ) , auth_value) ;
167- req. headers_mut ( ) . insert (
168- HeaderName :: from_static ( STREAM_NAME_HEADER_KEY ) ,
169- stream_name_key,
170- ) ;
171- req. headers_mut ( ) . insert (
172- HeaderName :: from_static ( LOG_SOURCE_KEY ) ,
173- header:: HeaderValue :: from_static ( LOG_SOURCE_KINESIS ) ,
174- ) ;
150+ // Extract Kinesis Firehose headers if applicable
151+ if self . action . eq ( & Action :: Ingest ) {
152+ extract_kinesis_headers ( & mut req) ;
175153 }
176- /* ## Section end */
177- // if action is Ingest and multi-tenancy is on, then request MUST have tenant id
178- // else check for the presence of tenant id using other details
179154
180155 // an optional error to track the presence of CORRECT tenant header in case of ingestion
181156 let mut header_error = None ;
182157 let user_and_tenant_id = get_user_and_tenant ( & self . action , & mut req, & mut header_error) ;
183158
184- // Check for X-API-KEY header for ingestion
185- let api_key_value = if self . action . eq ( & Action :: Ingest ) {
186- req. headers ( )
187- . get ( "x-api-key" )
188- . and_then ( |v| v. to_str ( ) . ok ( ) )
189- . map ( String :: from)
190- } else {
191- None
192- } ;
193-
194- // If API key auth is being used, short-circuit the normal auth flow
195- if let Some ( api_key) = api_key_value {
159+ // If X-API-KEY header is present for ingestion, short-circuit normal auth
160+ if let Some ( api_key) = extract_api_key ( & req, & self . action ) {
196161 let suspension = check_suspension ( req. request ( ) , self . action ) ;
197162 let tenant_id = req
198163 . headers ( )
@@ -273,6 +238,43 @@ where
273238 }
274239}
275240
241+ /// Extract Kinesis Firehose headers (Authorization, X-P-Stream) from
242+ /// the x-amz-firehose-common-attributes custom header when present.
243+ fn extract_kinesis_headers ( req : & mut ServiceRequest ) {
244+ if let Some ( kinesis_common_attributes) =
245+ req. request ( ) . headers ( ) . get ( KINESIS_COMMON_ATTRIBUTES_KEY )
246+ && let Ok ( attribute_value) = kinesis_common_attributes. to_str ( )
247+ && let Ok ( message) = serde_json:: from_str :: < Message > ( attribute_value)
248+ && let Ok ( auth_value) =
249+ header:: HeaderValue :: from_str ( & message. common_attributes . authorization )
250+ && let Ok ( stream_name_key) =
251+ header:: HeaderValue :: from_str ( & message. common_attributes . x_p_stream )
252+ {
253+ req. headers_mut ( )
254+ . insert ( HeaderName :: from_static ( AUTHORIZATION_KEY ) , auth_value) ;
255+ req. headers_mut ( ) . insert (
256+ HeaderName :: from_static ( STREAM_NAME_HEADER_KEY ) ,
257+ stream_name_key,
258+ ) ;
259+ req. headers_mut ( ) . insert (
260+ HeaderName :: from_static ( LOG_SOURCE_KEY ) ,
261+ header:: HeaderValue :: from_static ( LOG_SOURCE_KINESIS ) ,
262+ ) ;
263+ }
264+ }
265+
266+ /// Extract X-API-KEY header value if present and action is Ingest.
267+ fn extract_api_key ( req : & ServiceRequest , action : & Action ) -> Option < String > {
268+ if action. eq ( & Action :: Ingest ) {
269+ req. headers ( )
270+ . get ( "x-api-key" )
271+ . and_then ( |v| v. to_str ( ) . ok ( ) )
272+ . map ( String :: from)
273+ } else {
274+ None
275+ }
276+ }
277+
276278#[ inline]
277279fn get_user_and_tenant (
278280 action : & Action ,
0 commit comments