Skip to content

Commit ffb821b

Browse files
add tenant cleanup methods for in-memory structs on tenant deletion (#1591)
1. deregister the schema for a tenant on deletion 2. remove in-memory dashboards for a tenant 3. remove in-memory filters for a tenant 4. remove in-memory alerts for a tenant
1 parent 52da33f commit ffb821b

5 files changed

Lines changed: 45 additions & 0 deletions

File tree

src/alerts/alert_traits.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ pub trait AlertManagerTrait: Send + Sync {
119119
&self,
120120
tenant_id: &Option<String>,
121121
) -> HashMap<Ulid, Box<dyn AlertTrait>>;
122+
async fn delete_all_for_tenant(&self, tenant_id: &str);
122123
}
123124

124125
#[async_trait]

src/alerts/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,31 @@ impl AlertManagerTrait for Alerts {
14841484
// let alerts = self.alerts.read().await;
14851485
// alerts.iter().map(|(k, v)| (*k, v.clone_box())).collect()
14861486
}
1487+
1488+
async fn delete_all_for_tenant(&self, tenant_id: &str) {
1489+
let tenant = if tenant_id.is_empty() {
1490+
DEFAULT_TENANT
1491+
} else {
1492+
tenant_id
1493+
};
1494+
1495+
let alert_ids: Vec<Ulid> = {
1496+
let read_access = self.alerts.read().await;
1497+
if let Some(alerts) = read_access.get(tenant) {
1498+
alerts.keys().copied().collect()
1499+
} else {
1500+
return;
1501+
}
1502+
};
1503+
1504+
for alert_id in &alert_ids {
1505+
if let Err(e) = self.sender.send(AlertTask::Delete(*alert_id)).await {
1506+
warn!("Failed to cancel alert task {alert_id} for tenant {tenant}: {e}");
1507+
}
1508+
}
1509+
1510+
self.alerts.write().await.remove(tenant);
1511+
}
14871512
}
14881513

14891514
// TODO: add RBAC

src/query/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@ impl InMemorySessionContext {
118118
)
119119
.expect("Should be able to register new schema");
120120
}
121+
122+
pub fn remove_schema(&self, tenant_id: &str) {
123+
if let Some(catalog) = self
124+
.session_context
125+
.write()
126+
.expect("SessionContext should be writeable")
127+
.catalog("datafusion")
128+
{
129+
let _ = catalog.deregister_schema(tenant_id, true);
130+
}
131+
}
121132
}
122133

123134
/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU

src/users/dashboards.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,10 @@ impl Dashboards {
505505
}
506506
}
507507

508+
pub async fn delete_tenant(&self, tenant_id: &str) {
509+
self.0.write().await.remove(tenant_id);
510+
}
511+
508512
/// Ensure the user is the owner of the dashboard
509513
/// This function is called when updating or deleting a dashboard
510514
/// check if the user is the owner of the dashboard

src/users/filters.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ impl Filters {
164164
}
165165
}
166166

167+
pub async fn delete_tenant(&self, tenant_id: &str) {
168+
self.0.write().await.remove(tenant_id);
169+
}
170+
167171
pub async fn list_filters(&self, key: &SessionKey) -> Vec<Filter> {
168172
let read = self.0.read().await;
169173
let tenant_id = get_tenant_id_from_key(key);

0 commit comments

Comments
 (0)