Skip to content

Commit 0b83b8a

Browse files
add system defined tags and free form labels to datasets (#1553)
* add system defined tags and free form labels to datasets
  - PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation
  - PUT /api/prism/v1/datasets/{name} - update tags and labels
  - GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels
  - GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag
  - include tags and labels in home api response
* add dataset handler file
* single endpoint for tags and labels
* add tenant_id params in new handlers
* add validation for otel datasets
* use middleware-normalized header for post datasets
* fix route for correlated dataset
* add log source to the correlated and tagged dataset apis
* load stream from storage
* verify dataset rbac
* dataset tags and labels in home api response
* move endpoints to enterprise
* fix counts api to support OR and AND operators
* resolve coderabbit comments
* default instead of new
* handle empty filters
1 parent d1b608f commit 0b83b8a

16 files changed

Lines changed: 279 additions & 106 deletions

File tree

src/alerts/alert_structs.rs

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -191,49 +191,52 @@ pub struct ConditionConfig {
191191
#[serde(rename_all = "camelCase")]
192192
pub struct Conditions {
193193
pub operator: Option<LogicalOperator>,
194+
#[serde(default)]
194195
pub condition_config: Vec<ConditionConfig>,
196+
/// Nested condition groups for complex logic like (A OR B) AND (C OR D)
197+
pub groups: Option<Vec<Conditions>>,
195198
}
196199

197200
impl Conditions {
201+
fn format_condition(cond: &ConditionConfig) -> String {
202+
match cond.value.as_ref().filter(|v| !v.is_empty()) {
203+
Some(val) => format!("{} {} {}", cond.column, cond.operator, val),
204+
None => format!("{} {}", cond.column, cond.operator),
205+
}
206+
}
207+
198208
pub fn generate_filter_message(&self) -> String {
199-
match &self.operator {
200-
Some(op) => match op {
201-
LogicalOperator::And | LogicalOperator::Or => {
202-
let expr1 = &self.condition_config[0];
203-
let expr2 = &self.condition_config[1];
204-
let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.is_empty()) {
205-
format!(
206-
"{} {} {}",
207-
expr1.column,
208-
expr1.operator,
209-
expr1.value.as_ref().unwrap()
210-
)
211-
} else {
212-
format!("{} {}", expr1.column, expr1.operator)
213-
};
214-
215-
let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.is_empty()) {
216-
format!(
217-
"{} {} {}",
218-
expr2.column,
219-
expr2.operator,
220-
expr2.value.as_ref().unwrap()
221-
)
222-
} else {
223-
format!("{} {}", expr2.column, expr2.operator)
224-
};
225-
226-
format!("[{expr1_msg} {op} {expr2_msg}]")
227-
}
228-
},
229-
None => {
230-
let expr = &self.condition_config[0];
231-
if let Some(val) = &expr.value {
232-
format!("{} {} {}", expr.column, expr.operator, val)
233-
} else {
234-
format!("{} {}", expr.column, expr.operator)
235-
}
236-
}
209+
let op = self.operator.as_ref().unwrap_or(&LogicalOperator::And);
210+
let separator = format!(" {op} ");
211+
212+
// Format inline condition_config entries
213+
let condition_parts: Vec<String> = self
214+
.condition_config
215+
.iter()
216+
.map(Self::format_condition)
217+
.collect();
218+
219+
// Format nested groups recursively, skipping empty ones
220+
let group_parts: Vec<String> = self
221+
.groups
222+
.as_deref()
223+
.unwrap_or_default()
224+
.iter()
225+
.map(|g| g.generate_filter_message())
226+
.filter(|msg| !msg.is_empty())
227+
.map(|msg| format!("({msg})"))
228+
.collect();
229+
230+
let all_parts: Vec<&str> = condition_parts
231+
.iter()
232+
.chain(group_parts.iter())
233+
.map(|s| s.as_str())
234+
.collect();
235+
236+
match all_parts.len() {
237+
0 => String::default(),
238+
1 => all_parts[0].to_string(),
239+
_ => format!("[{}]", all_parts.join(&separator)),
237240
}
238241
}
239242
}

src/alerts/alerts_utils.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -388,21 +388,39 @@ fn extract_group_results(records: Vec<RecordBatch>, plan: LogicalPlan) -> AlertQ
388388
}
389389

390390
pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
391-
match &where_clause.operator {
392-
Some(op) => match op {
393-
&LogicalOperator::And => {
394-
let mut exprs = vec![];
395-
for condition in &where_clause.condition_config {
396-
exprs.push(condition_to_expr(condition)?);
397-
}
398-
Ok(exprs.join(" AND "))
399-
}
400-
_ => Err(String::from("Invalid option 'or', only 'and' is supported")),
401-
},
402-
_ => Err(String::from(
403-
"Invalid option 'null', only 'and' is supported",
404-
)),
391+
let op = where_clause
392+
.operator
393+
.as_ref()
394+
.unwrap_or(&LogicalOperator::And);
395+
396+
let joiner = match op {
397+
LogicalOperator::And => " AND ",
398+
LogicalOperator::Or => " OR ",
399+
};
400+
401+
let mut exprs = vec![];
402+
403+
// Process flat condition_config entries
404+
for condition in &where_clause.condition_config {
405+
exprs.push(condition_to_expr(condition)?);
406+
}
407+
408+
// Process nested groups recursively
409+
if let Some(groups) = &where_clause.groups {
410+
for group in groups {
411+
let group_expr = get_filter_string(group)?;
412+
// Wrap each group in parentheses to preserve precedence
413+
exprs.push(format!("({group_expr})"));
414+
}
405415
}
416+
417+
if exprs.is_empty() {
418+
return Err(String::from(
419+
"conditions must have at least one condition or group",
420+
));
421+
}
422+
423+
Ok(exprs.join(joiner))
406424
}
407425

408426
fn condition_to_expr(condition: &ConditionConfig) -> Result<String, String> {

src/connectors/kafka/processor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ impl ParseableSinkProcessor {
6464
vec![log_source_entry],
6565
TelemetryType::default(),
6666
tenant_id,
67-
None,
67+
vec![],
68+
vec![],
6869
)
6970
.await?;
7071

src/handlers/http/ingest.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
3232
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
3333
use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion;
3434
use crate::handlers::{
35-
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
35+
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, DatasetTag, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
3636
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
3737
};
3838
use crate::metadata::SchemaVersion;
@@ -120,7 +120,8 @@ pub async fn ingest(
120120
vec![log_source_entry.clone()],
121121
telemetry_type,
122122
&tenant_id,
123-
None,
123+
vec![],
124+
vec![],
124125
)
125126
.await
126127
.map_err(|e| {
@@ -206,6 +207,8 @@ pub async fn setup_otel_stream(
206207
expected_log_source: LogSource,
207208
known_fields: &[&str],
208209
telemetry_type: TelemetryType,
210+
dataset_tags: Vec<DatasetTag>,
211+
dataset_labels: Vec<String>,
209212
) -> Result<(String, LogSource, LogSourceEntry, Option<String>), PostError> {
210213
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
211214
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -239,7 +242,8 @@ pub async fn setup_otel_stream(
239242
vec![log_source_entry.clone()],
240243
telemetry_type,
241244
&tenant_id,
242-
None,
245+
dataset_tags,
246+
dataset_labels,
243247
)
244248
.await?;
245249
let mut time_partition = None;
@@ -362,6 +366,8 @@ pub async fn handle_otel_logs_ingestion(
362366
LogSource::OtelLogs,
363367
&OTEL_LOG_KNOWN_FIELD_LIST,
364368
TelemetryType::Logs,
369+
vec![],
370+
vec![],
365371
)
366372
.await
367373
.map_err(|e| {
@@ -386,6 +392,8 @@ pub async fn handle_otel_metrics_ingestion(
386392
LogSource::OtelMetrics,
387393
&OTEL_METRICS_KNOWN_FIELD_LIST,
388394
TelemetryType::Metrics,
395+
vec![],
396+
vec![],
389397
)
390398
.await
391399
.map_err(|e| {
@@ -417,6 +425,8 @@ pub async fn handle_otel_traces_ingestion(
417425
LogSource::OtelTraces,
418426
&OTEL_TRACES_KNOWN_FIELD_LIST,
419427
TelemetryType::Traces,
428+
vec![],
429+
vec![],
420430
)
421431
.await
422432
.map_err(|e| {

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616
*
1717
*/
1818

19+
use actix_web::http::header::HeaderMap;
20+
1921
use crate::{
2022
event::format::LogSource,
2123
handlers::{
22-
CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
23-
STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
24-
TelemetryType, UPDATE_STREAM_KEY,
24+
CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
25+
LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
26+
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
27+
parse_dataset_labels, parse_dataset_tags,
2528
},
2629
storage::StreamType,
2730
};
28-
use actix_web::http::header::HeaderMap;
29-
use tracing::warn;
3031

3132
#[derive(Debug, Default)]
3233
pub struct PutStreamHeaders {
@@ -38,7 +39,8 @@ pub struct PutStreamHeaders {
3839
pub stream_type: StreamType,
3940
pub log_source: LogSource,
4041
pub telemetry_type: TelemetryType,
41-
pub dataset_tag: Option<DatasetTag>,
42+
pub dataset_tags: Vec<DatasetTag>,
43+
pub dataset_labels: Vec<String>,
4244
}
4345

4446
impl From<&HeaderMap> for PutStreamHeaders {
@@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders {
7274
.get(TELEMETRY_TYPE_KEY)
7375
.and_then(|v| v.to_str().ok())
7476
.map_or(TelemetryType::Logs, TelemetryType::from),
75-
dataset_tag: headers
76-
.get(DATASET_TAG_KEY)
77+
dataset_tags: headers
78+
.get(DATASET_TAGS_KEY)
79+
.or_else(|| headers.get(DATASET_TAG_KEY))
7780
.and_then(|v| v.to_str().ok())
78-
.and_then(|v| match DatasetTag::try_from(v) {
79-
Ok(tag) => Some(tag),
80-
Err(err) => {
81-
warn!("Invalid dataset tag '{v}': {err}");
82-
None
83-
}
84-
}),
81+
.map(parse_dataset_tags)
82+
.unwrap_or_default(),
83+
dataset_labels: headers
84+
.get(DATASET_LABELS_KEY)
85+
.and_then(|v| v.to_str().ok())
86+
.map(parse_dataset_labels)
87+
.unwrap_or_default(),
8588
}
8689
}
8790
}

src/handlers/http/prism_logstream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ pub async fn post_datasets(
4343
req: HttpRequest,
4444
) -> Result<impl Responder, PrismLogstreamError> {
4545
let session_key = extract_session_key_from_req(&req)?;
46+
let tenant_id = get_tenant_id_from_request(&req);
4647
let dataset = dataset_req
4748
.map(|Json(r)| r)
4849
.unwrap_or_default()
49-
.get_datasets(session_key)
50+
.get_datasets(session_key, tenant_id)
5051
.await?;
5152

5253
Ok(web::Json(dataset))

src/handlers/mod.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
*
1717
*/
1818

19+
use std::collections::HashSet;
1920
use std::fmt::Display;
2021

2122
use serde::{Deserialize, Serialize};
23+
use tracing::warn;
2224

2325
pub mod airplane;
2426
pub mod http;
@@ -36,6 +38,8 @@ pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
3638
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
3739
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
3840
pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag";
41+
pub const DATASET_TAGS_KEY: &str = "x-p-dataset-tags";
42+
pub const DATASET_LABELS_KEY: &str = "x-p-dataset-labels";
3943
pub const TENANT_ID: &str = "x-p-tenant";
4044
const COOKIE_AGE_DAYS: usize = 7;
4145
const SESSION_COOKIE_NAME: &str = "session";
@@ -85,12 +89,14 @@ impl Display for TelemetryType {
8589
}
8690

8791
/// Tag for categorizing datasets/streams by observability domain
88-
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
92+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
8993
#[serde(rename_all = "kebab-case")]
9094
pub enum DatasetTag {
9195
AgentObservability,
9296
K8sObservability,
9397
DatabaseObservability,
98+
APM,
99+
ServiceMap,
94100
}
95101

96102
impl TryFrom<&str> for DatasetTag {
@@ -101,8 +107,10 @@ impl TryFrom<&str> for DatasetTag {
101107
"agent-observability" => Ok(DatasetTag::AgentObservability),
102108
"k8s-observability" => Ok(DatasetTag::K8sObservability),
103109
"database-observability" => Ok(DatasetTag::DatabaseObservability),
110+
"apm" => Ok(DatasetTag::APM),
111+
"service-map" => Ok(DatasetTag::ServiceMap),
104112
_ => Err(
105-
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability",
113+
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability, apm, service-map",
106114
),
107115
}
108116
}
@@ -114,6 +122,40 @@ impl Display for DatasetTag {
114122
DatasetTag::AgentObservability => "agent-observability",
115123
DatasetTag::K8sObservability => "k8s-observability",
116124
DatasetTag::DatabaseObservability => "database-observability",
125+
DatasetTag::APM => "apm",
126+
DatasetTag::ServiceMap => "service-map",
117127
})
118128
}
119129
}
130+
131+
pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
132+
header_value
133+
.split(',')
134+
.filter_map(|s| {
135+
let trimmed = s.trim();
136+
if trimmed.is_empty() {
137+
None
138+
} else {
139+
match DatasetTag::try_from(trimmed) {
140+
Ok(tag) => Some(tag),
141+
Err(err) => {
142+
warn!("Invalid dataset tag '{trimmed}': {err}");
143+
None
144+
}
145+
}
146+
}
147+
})
148+
.collect::<HashSet<_>>()
149+
.into_iter()
150+
.collect()
151+
}
152+
153+
pub fn parse_dataset_labels(header_value: &str) -> Vec<String> {
154+
header_value
155+
.split(',')
156+
.map(|s| s.trim().to_string())
157+
.filter(|s| !s.is_empty())
158+
.collect::<HashSet<_>>()
159+
.into_iter()
160+
.collect()
161+
}

0 commit comments

Comments (0)