Skip to content

Commit 584d4e6

Browse files
authored
fix: use batched DB deletes with SKIP LOCKED for sandbox cleanup (#739)
Replace the temp-table approach with SELECT ... FOR UPDATE SKIP LOCKED, allowing concurrent cleanup workers on MySQL. Each batch now: (1) selects and locks rows, (2) deletes the objects from S3, and (3) deletes the rows from the DB. Multiple workers process batches in parallel within a single call, and multiple instances of clean_sandboxes can also run concurrently. This distributes the DB load across many small batches instead of one large delete at the end.
1 parent 0440a9c commit 584d4e6

3 files changed

Lines changed: 109 additions & 81 deletions

File tree

diracx-db/src/diracx/db/sql/sandbox_metadata/db.py

Lines changed: 44 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
11
from __future__ import annotations
22

33
import logging
4-
from contextlib import asynccontextmanager
5-
from functools import partial
6-
from typing import Any, AsyncGenerator
4+
from typing import Any
75

86
from sqlalchemy import (
9-
BigInteger,
10-
Column,
117
Executable,
12-
MetaData,
13-
Table,
148
and_,
159
delete,
1610
exists,
@@ -41,14 +35,6 @@
4135
class SandboxMetadataDB(BaseSQLDB):
4236
metadata = SandboxMetadataDBBase.metadata
4337

44-
# Temporary table to store the sandboxes to delete, see `select_and_delete_expired`
45-
_temp_table = Table(
46-
"sb_to_delete",
47-
MetaData(),
48-
Column("SBId", BigInteger, primary_key=True),
49-
prefixes=["TEMPORARY"],
50-
)
51-
5238
async def get_owner_id(self, user: UserInfo) -> int | None:
5339
"""Get the id of the owner from the database."""
5440
stmt = select(SBOwners.OwnerID).where(
@@ -222,18 +208,20 @@ async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
222208
)
223209
await self.conn.execute(unassign_stmt)
224210

225-
@asynccontextmanager
226-
async def delete_unused_sandboxes(
227-
self, *, limit: int | None = None
228-
) -> AsyncGenerator[AsyncGenerator[str, None], None]:
229-
"""Get the sandbox PFNs to delete.
211+
async def select_sandboxes_for_deletion(
212+
self, *, batch_size: int = 500
213+
) -> tuple[list[int], list[str]]:
214+
"""Select and lock a batch of sandboxes for deletion.
230215
231-
The result of this function can be used as an async context manager
232-
to yield the PFNs of the sandboxes to delete. The context manager
233-
will automatically remove the sandboxes from the database upon exit.
216+
Uses FOR UPDATE SKIP LOCKED on MySQL to allow concurrent workers to
217+
process different sandboxes in parallel without conflicts.
234218
235219
Args:
236-
limit: If not None, the maximum number of sandboxes to delete.
220+
batch_size: Maximum number of sandboxes to select.
221+
222+
Returns:
223+
Tuple of (sb_ids, pfns) for the selected sandboxes.
224+
On MySQL, the rows remain locked until the transaction commits or rolls back.
237225
238226
"""
239227
conditions = [
@@ -248,32 +236,41 @@ async def delete_unused_sandboxes(
248236
# Sandboxes which are not on S3 will be handled by legacy DIRAC
249237
condition = and_(SandBoxes.SEPFN.like("/S3/%"), or_(*conditions))
250238

251-
# Copy the in-flight rows to a temporary table
252-
await self.conn.run_sync(partial(self._temp_table.create, checkfirst=True))
253-
select_stmt = select(SandBoxes.SBId).where(condition)
254-
if limit:
255-
select_stmt = select_stmt.limit(limit)
256-
insert_stmt = insert(self._temp_table).from_select(["SBId"], select_stmt)
257-
await self.conn.execute(insert_stmt)
239+
select_stmt = (
240+
select(SandBoxes.SBId, SandBoxes.SEPFN).where(condition).limit(batch_size)
241+
)
258242

259-
try:
260-
# Select the sandbox PFNs from the temporary table and yield them
261-
select_stmt = select(SandBoxes.SEPFN).join(
262-
self._temp_table, self._temp_table.c.SBId == SandBoxes.SBId
243+
# FOR UPDATE SKIP LOCKED is only supported on MySQL
244+
# SQLite is used for testing and doesn't support row locking
245+
if self.conn.dialect.name == "mysql":
246+
select_stmt = select_stmt.with_for_update(skip_locked=True)
247+
elif self.conn.dialect.name != "sqlite":
248+
raise NotImplementedError(
249+
f"Unsupported database dialect: {self.conn.dialect.name}"
263250
)
264251

265-
async def yield_pfns() -> AsyncGenerator[str, None]:
266-
async for row in await self.conn.stream(select_stmt):
267-
yield row.SEPFN
252+
result = await self.conn.execute(select_stmt)
253+
rows = result.all()
268254

269-
yield yield_pfns()
255+
sb_ids = [row.SBId for row in rows]
256+
pfns = [row.SEPFN for row in rows]
270257

271-
# Delete the sandboxes from the main table
272-
delete_stmt = delete(SandBoxes).where(
273-
SandBoxes.SBId.in_(select(self._temp_table.c.SBId))
274-
)
275-
result = await self.conn.execute(delete_stmt)
276-
logger.info("Deleted %d expired/unassigned sandboxes", result.rowcount)
258+
return sb_ids, pfns
259+
260+
async def delete_sandboxes(self, sb_ids: list[int]) -> int:
261+
"""Delete sandboxes by their IDs.
262+
263+
Args:
264+
sb_ids: List of sandbox IDs to delete.
265+
266+
Returns:
267+
Number of rows deleted.
268+
269+
"""
270+
if not sb_ids:
271+
return 0
277272

278-
finally:
279-
await self.conn.run_sync(partial(self._temp_table.drop, checkfirst=True))
273+
delete_stmt = delete(SandBoxes).where(SandBoxes.SBId.in_(sb_ids))
274+
result = await self.conn.execute(delete_stmt)
275+
logger.info("Deleted %d expired/unassigned sandboxes", result.rowcount)
276+
return result.rowcount

diracx-logic/src/diracx/logic/jobs/sandboxes.py

Lines changed: 63 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
s3_object_exists,
1919
)
2020
from diracx.core.settings import SandboxStoreSettings
21-
from diracx.core.utils import batched_async
2221
from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB
2322

2423
if TYPE_CHECKING:
@@ -202,34 +201,68 @@ async def clean_sandboxes(
202201
sandbox_metadata_db: SandboxMetadataDB,
203202
settings: SandboxStoreSettings,
204203
*,
205-
limit: int = 10_000,
206-
max_concurrent_batches: int = 10,
204+
batch_size: int = 500,
205+
max_workers: int = 10,
207206
) -> int:
208-
"""Delete sandboxes that are not assigned to any job."""
209-
semaphore = asyncio.Semaphore(max_concurrent_batches)
210-
n_deleted = 0
211-
async with (
212-
sandbox_metadata_db.delete_unused_sandboxes(limit=limit) as generator,
213-
asyncio.TaskGroup() as tg,
214-
):
215-
async for batch in batched_async(generator, 500):
216-
objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in batch]
217-
if logger.isEnabledFor(logging.DEBUG):
218-
for pfn in batch:
219-
logger.debug("Deleting sandbox %s from S3", pfn)
220-
tg.create_task(delete_batch_and_log(settings, objects, semaphore))
221-
n_deleted += len(objects)
222-
return n_deleted
223-
224-
225-
async def delete_batch_and_log(
226-
settings: SandboxStoreSettings,
227-
objects: list[S3Object],
228-
semaphore: asyncio.Semaphore,
229-
) -> None:
230-
"""Helper function to delete a batch of objects and log the result."""
231-
async with semaphore:
232-
await s3_bulk_delete_with_retry(
233-
settings.s3_client, settings.bucket_name, objects
207+
"""Delete sandboxes that are not assigned to any job.
208+
209+
Uses SELECT FOR UPDATE SKIP LOCKED to allow multiple workers to run
210+
in parallel without conflicts. Each batch:
211+
1. Selects and locks rows
212+
2. Deletes from S3
213+
3. Deletes from DB
214+
215+
Args:
216+
sandbox_metadata_db: Database connection (not yet entered).
217+
settings: Sandbox store settings with S3 client.
218+
batch_size: Number of sandboxes to process per batch.
219+
max_workers: Maximum number of concurrent workers processing batches.
220+
221+
Returns:
222+
Total number of sandboxes deleted.
223+
224+
"""
225+
# Check if parallel workers are supported
226+
async with sandbox_metadata_db:
227+
dialect = sandbox_metadata_db.conn.dialect.name
228+
if max_workers > 1 and dialect == "sqlite":
229+
raise NotImplementedError(
230+
"SQLite does not support parallel workers (no SKIP LOCKED support)"
234231
)
235-
logger.info("Deleted %d sandboxes from %s", len(objects), settings.bucket_name)
232+
233+
async def worker() -> int:
234+
"""Process batches until no more work is available."""
235+
worker_deleted = 0
236+
while True:
237+
async with sandbox_metadata_db:
238+
# Select and lock a batch of sandboxes
239+
sb_ids, pfns = await sandbox_metadata_db.select_sandboxes_for_deletion(
240+
batch_size=batch_size
241+
)
242+
243+
if not pfns:
244+
break
245+
246+
# Delete from S3 first (while rows are locked)
247+
objects: list[S3Object] = [{"Key": pfn_to_key(pfn)} for pfn in pfns]
248+
if logger.isEnabledFor(logging.DEBUG):
249+
for pfn in pfns:
250+
logger.debug("Deleting sandbox %s from S3", pfn)
251+
252+
await s3_bulk_delete_with_retry(
253+
settings.s3_client, settings.bucket_name, objects
254+
)
255+
logger.info(
256+
"Deleted %d sandboxes from %s", len(objects), settings.bucket_name
257+
)
258+
259+
# Then delete from DB
260+
await sandbox_metadata_db.delete_sandboxes(sb_ids)
261+
worker_deleted += len(sb_ids)
262+
263+
return worker_deleted
264+
265+
async with asyncio.TaskGroup() as tg:
266+
tasks = [tg.create_task(worker()) for _ in range(max_workers)]
267+
268+
return sum(task.result() for task in tasks)

diracx-logic/tests/jobs/test_sandboxes.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ async def test_upload_and_clean(
118118
assert response.content == data
119119

120120
# There should be no sandboxes to remove
121-
async with sandbox_metadata_db:
122-
await clean_sandboxes(sandbox_metadata_db, sandbox_settings)
121+
await clean_sandboxes(sandbox_metadata_db, sandbox_settings, max_workers=1)
123122

124123
# Try to download the sandbox
125124
async with sandbox_metadata_db:
@@ -140,8 +139,7 @@ async def test_upload_and_clean(
140139
)
141140

142141
# Now the sandbox should be removed
143-
async with sandbox_metadata_db:
144-
await clean_sandboxes(sandbox_metadata_db, sandbox_settings)
142+
await clean_sandboxes(sandbox_metadata_db, sandbox_settings, max_workers=1)
145143

146144
# Check that the sandbox was actually removed from the bucket
147145
with pytest.raises(botocore.exceptions.ClientError, match="Not Found"):

0 commit comments

Comments (0)