Skip to content

Commit de26abf

Browse files
feat: add api keys support for ingestion authentication
use api keys as an alternative auth mechanism for ingestion use header `X-API-KEY` in place of basic auth middleware validates the header on ingest action key store in object store at .settings/apikeys/<key-id>.json
1 parent eacb1b9 commit de26abf

7 files changed

Lines changed: 451 additions & 3 deletions

File tree

src/apikeys.rs

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::collections::HashMap;
20+
21+
use chrono::{DateTime, Utc};
22+
use once_cell::sync::Lazy;
23+
use rand::Rng;
24+
use serde::{Deserialize, Serialize};
25+
use tokio::sync::RwLock;
26+
use ulid::Ulid;
27+
28+
use crate::{
29+
metastore::metastore_traits::MetastoreObject,
30+
parseable::{DEFAULT_TENANT, PARSEABLE},
31+
storage::object_storage::apikey_json_path,
32+
};
33+
34+
pub static API_KEYS: Lazy<ApiKeyStore> = Lazy::new(|| ApiKeyStore {
35+
keys: RwLock::new(HashMap::new()),
36+
});
37+
38+
#[derive(Debug)]
39+
pub struct ApiKeyStore {
40+
pub keys: RwLock<HashMap<String, HashMap<Ulid, ApiKey>>>,
41+
}
42+
43+
#[derive(Debug, Clone, Serialize, Deserialize)]
44+
#[serde(rename_all = "camelCase")]
45+
pub struct ApiKey {
46+
pub key_id: Ulid,
47+
pub api_key: String,
48+
pub key_name: String,
49+
pub created_by: String,
50+
pub created_at: DateTime<Utc>,
51+
pub modified_at: DateTime<Utc>,
52+
#[serde(default)]
53+
pub tenant: Option<String>,
54+
}
55+
56+
/// Request body for creating a new API key
57+
#[derive(Debug, Deserialize)]
58+
#[serde(rename_all = "camelCase")]
59+
pub struct CreateApiKeyRequest {
60+
pub key_name: String,
61+
}
62+
63+
/// Response for list keys (api_key masked to last 4 chars)
64+
#[derive(Debug, Serialize)]
65+
#[serde(rename_all = "camelCase")]
66+
pub struct ApiKeyListEntry {
67+
pub key_id: Ulid,
68+
pub api_key: String,
69+
pub key_name: String,
70+
pub created_by: String,
71+
pub created_at: DateTime<Utc>,
72+
pub modified_at: DateTime<Utc>,
73+
}
74+
75+
impl ApiKey {
76+
pub fn new(key_name: String, created_by: String, tenant: Option<String>) -> Self {
77+
let now = Utc::now();
78+
Self {
79+
key_id: Ulid::new(),
80+
api_key: generate_uuid_v4(),
81+
key_name,
82+
created_by,
83+
created_at: now,
84+
modified_at: now,
85+
tenant,
86+
}
87+
}
88+
89+
pub fn to_list_entry(&self) -> ApiKeyListEntry {
90+
let masked = if self.api_key.len() >= 4 {
91+
let last4 = &self.api_key[self.api_key.len() - 4..];
92+
format!("****{last4}")
93+
} else {
94+
"****".to_string()
95+
};
96+
ApiKeyListEntry {
97+
key_id: self.key_id,
98+
api_key: masked,
99+
key_name: self.key_name.clone(),
100+
created_by: self.created_by.clone(),
101+
created_at: self.created_at,
102+
modified_at: self.modified_at,
103+
}
104+
}
105+
}
106+
107+
impl MetastoreObject for ApiKey {
108+
fn get_object_path(&self) -> String {
109+
apikey_json_path(&self.key_id, &self.tenant).to_string()
110+
}
111+
112+
fn get_object_id(&self) -> String {
113+
self.key_id.to_string()
114+
}
115+
}
116+
117+
/// Generate a UUID v4 formatted string using rand
118+
fn generate_uuid_v4() -> String {
119+
let mut rng = rand::thread_rng();
120+
let mut bytes = [0u8; 16];
121+
rng.fill(&mut bytes);
122+
// Set version 4 (bits 12-15 of time_hi_and_version)
123+
bytes[6] = (bytes[6] & 0x0f) | 0x40;
124+
// Set variant 1 (bits 6-7 of clock_seq_hi_and_reserved)
125+
bytes[8] = (bytes[8] & 0x3f) | 0x80;
126+
format!(
127+
"{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
128+
bytes[0], bytes[1], bytes[2], bytes[3],
129+
bytes[4], bytes[5],
130+
bytes[6], bytes[7],
131+
bytes[8], bytes[9],
132+
bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15]
133+
)
134+
}
135+
136+
impl ApiKeyStore {
137+
/// Load API keys from object store into memory
138+
pub async fn load(&self) -> anyhow::Result<()> {
139+
let api_keys = PARSEABLE.metastore.get_api_keys().await?;
140+
let mut map = self.keys.write().await;
141+
for (tenant_id, keys) in api_keys {
142+
let inner = keys
143+
.into_iter()
144+
.map(|mut k| {
145+
k.tenant = Some(tenant_id.clone());
146+
(k.key_id, k)
147+
})
148+
.collect();
149+
map.insert(tenant_id, inner);
150+
}
151+
Ok(())
152+
}
153+
154+
/// Create a new API key
155+
pub async fn create(&self, api_key: ApiKey) -> Result<(), ApiKeyError> {
156+
let tenant = api_key.tenant.as_deref().unwrap_or(DEFAULT_TENANT);
157+
158+
// Hold write lock for the entire operation to prevent TOCTOU race
159+
// on duplicate name check
160+
let mut map = self.keys.write().await;
161+
if let Some(tenant_keys) = map.get(tenant) {
162+
if tenant_keys
163+
.values()
164+
.any(|k| k.key_name == api_key.key_name)
165+
{
166+
return Err(ApiKeyError::DuplicateKeyName(api_key.key_name));
167+
}
168+
}
169+
170+
PARSEABLE
171+
.metastore
172+
.put_api_key(&api_key, &api_key.tenant)
173+
.await?;
174+
175+
map.entry(tenant.to_owned())
176+
.or_default()
177+
.insert(api_key.key_id, api_key);
178+
Ok(())
179+
}
180+
181+
/// Delete an API key by key_id
182+
pub async fn delete(
183+
&self,
184+
key_id: &Ulid,
185+
tenant_id: &Option<String>,
186+
) -> Result<ApiKey, ApiKeyError> {
187+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
188+
189+
// Read the key first without removing
190+
let api_key = {
191+
let map = self.keys.read().await;
192+
let tenant_keys = map
193+
.get(tenant)
194+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?;
195+
tenant_keys
196+
.get(key_id)
197+
.cloned()
198+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?
199+
};
200+
201+
// Delete from storage first
202+
PARSEABLE
203+
.metastore
204+
.delete_api_key(&api_key, tenant_id)
205+
.await?;
206+
207+
// Remove from memory only after successful storage deletion
208+
{
209+
let mut map = self.keys.write().await;
210+
if let Some(tenant_keys) = map.get_mut(tenant) {
211+
tenant_keys.remove(key_id);
212+
}
213+
}
214+
215+
Ok(api_key)
216+
}
217+
218+
/// List all API keys for a tenant (returns masked entries)
219+
pub async fn list(
220+
&self,
221+
tenant_id: &Option<String>,
222+
) -> Result<Vec<ApiKeyListEntry>, ApiKeyError> {
223+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
224+
let map = self.keys.read().await;
225+
let entries = if let Some(tenant_keys) = map.get(tenant) {
226+
tenant_keys.values().map(|k| k.to_list_entry()).collect()
227+
} else {
228+
vec![]
229+
};
230+
Ok(entries)
231+
}
232+
233+
/// Get a specific API key by key_id (returns full key)
234+
pub async fn get(
235+
&self,
236+
key_id: &Ulid,
237+
tenant_id: &Option<String>,
238+
) -> Result<ApiKey, ApiKeyError> {
239+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
240+
let map = self.keys.read().await;
241+
let tenant_keys = map
242+
.get(tenant)
243+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))?;
244+
tenant_keys
245+
.get(key_id)
246+
.cloned()
247+
.ok_or_else(|| ApiKeyError::KeyNotFound(key_id.to_string()))
248+
}
249+
250+
/// Validate an API key for ingestion. Returns true if the key is valid.
251+
/// For multi-tenant: checks the key belongs to the specified tenant.
252+
/// For single-tenant: checks the key exists globally.
253+
pub async fn validate_key(
254+
&self,
255+
api_key_value: &str,
256+
tenant_id: &Option<String>,
257+
) -> bool {
258+
let map = self.keys.read().await;
259+
if let Some(tenant_id) = tenant_id {
260+
// Multi-tenant: check keys for the specific tenant
261+
if let Some(tenant_keys) = map.get(tenant_id) {
262+
return tenant_keys
263+
.values()
264+
.any(|k| k.api_key == api_key_value);
265+
}
266+
false
267+
} else {
268+
// Single-tenant: check keys under DEFAULT_TENANT
269+
if let Some(tenant_keys) = map.get(DEFAULT_TENANT) {
270+
return tenant_keys
271+
.values()
272+
.any(|k| k.api_key == api_key_value);
273+
}
274+
false
275+
}
276+
}
277+
278+
/// Insert an API key directly into memory (used for sync from prism)
279+
pub async fn sync_put(&self, api_key: ApiKey) {
280+
let tenant = api_key.tenant.as_deref().unwrap_or(DEFAULT_TENANT).to_owned();
281+
let mut map = self.keys.write().await;
282+
map.entry(tenant).or_default().insert(api_key.key_id, api_key);
283+
}
284+
285+
/// Remove an API key from memory (used for sync from prism)
286+
pub async fn sync_delete(&self, key_id: &Ulid, tenant_id: &Option<String>) {
287+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
288+
let mut map = self.keys.write().await;
289+
if let Some(tenant_keys) = map.get_mut(tenant) {
290+
tenant_keys.remove(key_id);
291+
}
292+
}
293+
}
294+
295+
#[derive(Debug, thiserror::Error)]
296+
pub enum ApiKeyError {
297+
#[error("API key not found: {0}")]
298+
KeyNotFound(String),
299+
300+
#[error("Duplicate key name: {0}")]
301+
DuplicateKeyName(String),
302+
303+
#[error("Unauthorized: {0}")]
304+
Unauthorized(String),
305+
306+
#[error("{0}")]
307+
MetastoreError(#[from] crate::metastore::MetastoreError),
308+
309+
#[error("{0}")]
310+
AnyhowError(#[from] anyhow::Error),
311+
}
312+
313+
impl actix_web::ResponseError for ApiKeyError {
314+
fn status_code(&self) -> actix_web::http::StatusCode {
315+
match self {
316+
ApiKeyError::KeyNotFound(_) => actix_web::http::StatusCode::NOT_FOUND,
317+
ApiKeyError::DuplicateKeyName(_) => actix_web::http::StatusCode::CONFLICT,
318+
ApiKeyError::Unauthorized(_) => actix_web::http::StatusCode::FORBIDDEN,
319+
ApiKeyError::MetastoreError(_) | ApiKeyError::AnyhowError(_) => {
320+
actix_web::http::StatusCode::INTERNAL_SERVER_ERROR
321+
}
322+
}
323+
}
324+
}

src/handlers/http/middleware.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,42 @@ where
181181
let mut header_error = None;
182182
let user_and_tenant_id = get_user_and_tenant(&self.action, &mut req, &mut header_error);
183183

184+
// Check for X-API-KEY header for ingestion
185+
let api_key_value = if self.action.eq(&Action::Ingest) {
186+
req.headers()
187+
.get("x-api-key")
188+
.and_then(|v| v.to_str().ok())
189+
.map(String::from)
190+
} else {
191+
None
192+
};
193+
194+
// If API key auth is being used, short-circuit the normal auth flow
195+
if let Some(api_key) = api_key_value {
196+
let suspension = check_suspension(req.request(), self.action);
197+
let tenant_id = req
198+
.headers()
199+
.get(TENANT_ID)
200+
.and_then(|v| v.to_str().ok())
201+
.map(String::from);
202+
let fut = self.service.call(req);
203+
204+
return Box::pin(async move {
205+
if let Some(err) = header_error {
206+
return Err(err);
207+
}
208+
if let rbac::Response::Suspended(msg) = suspension {
209+
return Err(ErrorBadRequest(msg));
210+
}
211+
212+
use crate::apikeys::API_KEYS;
213+
if API_KEYS.validate_key(&api_key, &tenant_id).await {
214+
return fut.await;
215+
}
216+
Err(ErrorUnauthorized("Invalid API key"))
217+
});
218+
}
219+
184220
let key: Result<SessionKey, Error> = extract_session_key(&mut req);
185221

186222
// if action is ingestion, check if tenant is correct for basic auth user

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
pub mod about;
2020
pub mod alerts;
21+
pub mod apikeys;
2122
pub mod analytics;
2223
pub mod banner;
2324
pub mod catalog;

0 commit comments

Comments
 (0)