Skip to content

Commit 7bb47fd

Browse files
Merge pull request #151 from ThomasWaldmann/rest-server-quota
quota
2 parents d67978e + bbfb212 commit 7bb47fd

11 files changed

Lines changed: 811 additions & 27 deletions

File tree

README.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ API can be much simpler:
5353
- move: implements renaming, soft delete/undelete, and moving to the current
5454
nesting level.
5555
- defrag: general purpose defragmentation helper (copies blocks to new items)
56+
- quota: return quota limit and usage (-1 if quotas not enabled or not supported)
5657
- stats: API call counters, time spent in API methods, data volume/throughput.
5758
- latency/bandwidth emulator: can emulate higher latency (via BORGSTORE_LATENCY
5859
[us]) and lower bandwidth (via BORGSTORE_BANDWIDTH [bit/s]) than what is
@@ -114,6 +115,14 @@ Use storage on a local POSIX filesystem:
114115
filesystem path.
115116
- Namespaces: directories
116117
- Values: in key-named files
118+
- Quota: tracks backend storage size and rejects ``store`` if quota is exceeded.
119+
120+
The current usage is persisted to a hidden file in the storage directory.
121+
122+
When quota tracking is enabled on a backend that already contains data,
123+
the server automatically scans the directories at ``open`` time (that may
124+
take a while if there are many files). That scan can be avoided by always
125+
using quotas.
117126
- Permissions: This backend can enforce a simple, test-friendly permission system
118127
and raises ``PermissionDenied`` if access is not permitted by the configuration.
119128

@@ -344,6 +353,27 @@ Hierarchical rules (list-only at root, read/write in ``data/``)::
344353
--backend file:///tmp/teststore \
345354
--permissions '{"": "l", "data": "lrw"}'
346355

356+
Quota
357+
~~~~~
358+
359+
The REST server, when used with the ``file:`` backend, optionally supports
360+
quota tracking and enforcement.
361+
362+
Use the ``--quota`` argument to set a maximum storage size in bytes (default is
363+
no quota tracking and enforcement).
364+
365+
When the quota is exceeded, ``store`` operations are rejected with HTTP 507
366+
(Insufficient Storage).
367+
368+
Example — limit storage to 1 GiB:
369+
370+
.. code-block:: bash
371+
372+
python3 -m borgstore.server.rest --host 127.0.0.1 --port 5618 \\
373+
--username user --password pass \\
374+
--backend file:///tmp/teststore \\
375+
--quota 1073741824
376+
347377
348378
Scalability
349379
-----------

src/borgstore/backends/_base.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from collections import namedtuple
1010
from typing import Iterator
1111

12-
from ..constants import MAX_NAME_LENGTH, TMP_SUFFIX
12+
from ..constants import MAX_NAME_LENGTH, TMP_SUFFIX, HID_SUFFIX
1313

1414
ItemInfo = namedtuple("ItemInfo", "name exists size directory")
1515

@@ -45,6 +45,9 @@ def validate_name(name):
4545
if name.endswith(TMP_SUFFIX):
4646
# TMP_SUFFIX is used for temporary files internally, e.g. while files are uploading.
4747
raise ValueError(f"name must not end with {TMP_SUFFIX}, but got: {name}")
48+
if name.endswith(HID_SUFFIX):
49+
# HID_SUFFIX is used for hidden internal files, not accessible by users.
50+
raise ValueError(f"name must not end with {HID_SUFFIX}, but got: {name}")
4851

4952

5053
class BackendBase(ABC):
@@ -156,6 +159,10 @@ def hash(self, name: str, algorithm: str = "sha256") -> str:
156159
h.update(self.load(name))
157160
return h.hexdigest()
158161

162+
def quota(self) -> dict:
163+
"""Return quota information: limit and usage in bytes. -1 means not set / not tracked."""
164+
return dict(limit=-1, usage=-1)
165+
159166
@abstractmethod
160167
def list(self, name: str) -> Iterator[ItemInfo]:
161168
"""list the contents of <name>, non-recursively.

src/borgstore/backends/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,7 @@ class ObjectNotFound(BackendError):
3737

3838
class PermissionDenied(BackendError):
3939
"""Permission denied for the requested operation."""
40+
41+
42+
class QuotaExceeded(BackendError):
43+
"""Quota exceeded for the requested operation."""

src/borgstore/backends/posixfs.py

Lines changed: 144 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,29 @@
66
import os
77
import re
88
import sys
9+
import time
910
from urllib.parse import unquote
1011
from pathlib import Path
1112
import shutil
1213
import stat
1314
import tempfile
15+
import types
16+
17+
is_win32 = sys.platform == "win32"
18+
19+
fcntl: types.ModuleType | None
20+
try:
21+
import fcntl
22+
except ImportError:
23+
fcntl = None # not available on Windows
1424

1525
from ._base import BackendBase, ItemInfo, validate_name
1626
from .errors import BackendError, BackendAlreadyExists, BackendDoesNotExist, BackendMustNotBeOpen, BackendMustBeOpen
17-
from .errors import ObjectNotFound, PermissionDenied
18-
from ..constants import TMP_SUFFIX
27+
from .errors import ObjectNotFound, PermissionDenied, QuotaExceeded
28+
from ..constants import TMP_SUFFIX, QUOTA_STORE_NAME, QUOTA_PERSIST_DELTA, QUOTA_PERSIST_INTERVAL
1929

2030

21-
def get_file_backend(url, permissions=None):
31+
def get_file_backend(url, permissions=None, quota=None):
2232
# file:///absolute/path
2333
# notes:
2434
# - we only support **local** fs **absolute** paths.
@@ -42,23 +52,27 @@ def get_file_backend(url, permissions=None):
4252
if sys.platform in ("win32", "msys", "cygwin"):
4353
m = re.match(windows_file_regex, url, re.VERBOSE)
4454
if m:
45-
return PosixFS(path=unquote(m["drive_and_path"]), permissions=permissions)
55+
return PosixFS(path=unquote(m["drive_and_path"]), permissions=permissions, quota=quota)
4656
m = re.match(file_regex, url, re.VERBOSE)
4757
if m:
48-
return PosixFS(path=unquote(m["path"]), permissions=permissions)
58+
return PosixFS(path=unquote(m["path"]), permissions=permissions, quota=quota)
4959

5060

5161
class PosixFS(BackendBase):
5262
# PosixFS implementation supports precreate = True as well as = False.
5363
precreate_dirs: bool = False
5464

55-
def __init__(self, path, *, do_fsync=False, permissions=None):
65+
def __init__(self, path, *, do_fsync=False, permissions=None, quota=None):
5666
self.base_path = Path(path)
5767
if not self.base_path.is_absolute():
5868
raise BackendError(f"path must be an absolute path: {path}")
5969
self.opened = False
6070
self.do_fsync = do_fsync # False = 26x faster, see #10
6171
self.permissions = permissions or {} # name [str] -> granted_permissions [str]
72+
self.quota_limit = quota # maximum allowed storage size in bytes, None means unlimited
73+
self._quota_use = 0 # current tracked storage usage in bytes
74+
self._quota_use_persisted = 0 # last persisted value
75+
self._quota_last_persist_time = 0.0 # monotonic time of last persist
6276

6377
def _check_permission(self, name, required_permissions):
6478
"""
@@ -142,11 +156,17 @@ def open(self):
142156
raise BackendDoesNotExist(
143157
f"posixfs storage base path does not exist or is not a directory: {self.base_path}"
144158
)
159+
if self.quota_limit is not None:
160+
self._quota_persist(0)
161+
else:
162+
self._quota_delete()
145163
self.opened = True
146164

147165
def close(self):
148166
if not self.opened:
149167
raise BackendMustBeOpen()
168+
if self.quota_limit is not None:
169+
self._quota_update(0, force=True)
150170
self.opened = False
151171

152172
def _validate_join(self, name):
@@ -200,49 +220,62 @@ def load(self, name, *, size=None, offset=0):
200220
except FileNotFoundError:
201221
raise ObjectNotFound(name) from None
202222

203-
def store(self, name, value):
204-
def _write_to_tmpfile():
205-
with tempfile.NamedTemporaryFile(suffix=TMP_SUFFIX, dir=tmp_dir, delete=False) as f:
206-
f.write(value)
207-
if self.do_fsync:
208-
f.flush()
209-
os.fsync(f.fileno())
210-
tmp_path = Path(f.name)
211-
return tmp_path
223+
def _write_to_tempfile(self, path, value, suffix=TMP_SUFFIX, do_fsync=False):
224+
with tempfile.NamedTemporaryFile(suffix=suffix, dir=path, delete=False) as f:
225+
f.write(value)
226+
if do_fsync:
227+
f.flush()
228+
os.fsync(f.fileno())
229+
tmp_path = Path(f.name)
230+
return tmp_path
212231

232+
def store(self, name, value):
213233
if not self.opened:
214234
raise BackendMustBeOpen()
215235
path = self._validate_join(name)
216-
self._check_permission(name, "W" if path.exists() else "wW")
236+
overwrite = path.exists()
237+
self._check_permission(name, "W" if overwrite else "wW")
238+
if self.quota_limit is not None:
239+
old_size = path.stat().st_size if overwrite else 0
240+
new_size = len(value)
241+
delta = new_size - old_size
242+
if self._quota_use + delta > self.quota_limit:
243+
raise QuotaExceeded(f"Quota exceeded: {self._quota_use + delta} > {self.quota_limit}")
217244
tmp_dir = path.parent
218245
# write to a differently named temp file in same directory first,
219246
# so the store never sees partially written data.
220247
try:
221248
# try to do it quickly, not doing the mkdir. fs ops might be slow, esp. on network fs (latency).
222249
# this will frequently succeed, because the dir is already there.
223-
tmp_path = _write_to_tmpfile()
250+
tmp_path = self._write_to_tempfile(tmp_dir, value, do_fsync=self.do_fsync)
224251
except FileNotFoundError:
225252
# retry, create potentially missing dirs first. this covers these cases:
226253
# - either the dirs were not precreated
227254
# - a previously existing directory was "lost" in the filesystem
228255
tmp_dir.mkdir(parents=True, exist_ok=True)
229-
tmp_path = _write_to_tmpfile()
256+
tmp_path = self._write_to_tempfile(tmp_dir, value, do_fsync=self.do_fsync)
230257
# all written and synced to disk, rename it to the final name:
231258
try:
232259
tmp_path.replace(path)
233260
except OSError:
234261
tmp_path.unlink()
235262
raise
263+
if self.quota_limit is not None:
264+
self._quota_update(delta)
236265

237266
def delete(self, name):
238267
if not self.opened:
239268
raise BackendMustBeOpen()
240269
path = self._validate_join(name)
241270
self._check_permission(name, "D")
242271
try:
272+
if self.quota_limit is not None:
273+
size = path.stat().st_size
243274
path.unlink()
244275
except FileNotFoundError:
245276
raise ObjectNotFound(name) from None
277+
if self.quota_limit is not None:
278+
self._quota_update(-size)
246279

247280
def move(self, curr_name, new_name):
248281
def _rename_to_new_name():
@@ -329,3 +362,96 @@ def list(self, name):
329362
else:
330363
is_dir = stat.S_ISDIR(st.st_mode)
331364
yield ItemInfo(name=p.name, exists=True, size=st.st_size, directory=is_dir)
365+
366+
def quota(self) -> dict:
367+
"""Return quota information: limit and usage in bytes. -1 means not set / not tracked."""
368+
if self.quota_limit is None:
369+
return dict(limit=-1, usage=-1)
370+
return dict(limit=self.quota_limit, usage=self._quota_use)
371+
372+
def _quota_path(self):
373+
return self.base_path / QUOTA_STORE_NAME
374+
375+
def _quota_scan(self, path, skips):
376+
"""Scan the filesystem to determine actual storage usage."""
377+
total = 0
378+
with os.scandir(path) as it:
379+
for entry in it:
380+
if entry.is_file(follow_symlinks=False):
381+
if os.path.abspath(entry.path) not in skips:
382+
total += entry.stat(follow_symlinks=False).st_size
383+
elif entry.is_dir(follow_symlinks=False):
384+
total += self._quota_scan(entry.path, skips)
385+
return total
386+
387+
def _quota_persist(self, delta):
388+
"""Persist quota usage to the on-disk quota file.
389+
390+
To support concurrent sessions, this method applies the given *delta*
391+
to the current on-disk value under an exclusive file lock. This way,
392+
updates from other sessions are preserved.
393+
394+
If the quota file does not exist or contains an invalid value, a
395+
filesystem scan is performed to determine the actual usage.
396+
397+
The quota file itself is used as the lock file (opened and locked
398+
with flock) so no separate lock file is needed.
399+
"""
400+
quota_path = self._quota_path()
401+
try:
402+
fd = os.open(str(quota_path), os.O_RDONLY)
403+
except FileNotFoundError:
404+
# quota file missing, scan filesystem to determine usage
405+
skips = {os.path.abspath(quota_path)}
406+
quota_use = self._quota_scan(self.base_path, skips)
407+
quota_path.write_text(str(quota_use))
408+
self._quota_use_persisted = quota_use
409+
self._quota_use = quota_use
410+
self._quota_last_persist_time = time.monotonic()
411+
return
412+
try:
413+
if fcntl is not None:
414+
fcntl.flock(fd, fcntl.LOCK_EX)
415+
# read current on-disk value (may have been updated by another session)
416+
try:
417+
on_disk = int(os.read(fd, 100))
418+
except ValueError:
419+
# invalid content, scan filesystem to determine usage
420+
skips = {os.path.abspath(quota_path)}
421+
on_disk = self._quota_scan(self.base_path, skips)
422+
delta = 0 # scan already gives the true value
423+
if is_win32:
424+
# Close the file before replacing to avoid AccessDenied on Windows.
425+
os.close(fd)
426+
fd = -1
427+
new_value = max(on_disk + delta, 0)
428+
quota_content = str(new_value).encode()
429+
tmp_path = self._write_to_tempfile(quota_path.parent, quota_content, do_fsync=True)
430+
try:
431+
tmp_path.replace(quota_path) # atomic update
432+
except OSError:
433+
tmp_path.unlink()
434+
raise
435+
self._quota_use_persisted = new_value
436+
self._quota_use = new_value # re-sync with on-disk truth
437+
self._quota_last_persist_time = time.monotonic()
438+
finally:
439+
if fcntl is not None:
440+
fcntl.flock(fd, fcntl.LOCK_UN)
441+
if fd >= 0:
442+
os.close(fd)
443+
444+
def _quota_update(self, delta, force=False):
445+
"""Update quota usage by delta and persist if the change is significant or enough time has elapsed."""
446+
self._quota_use += delta
447+
persist_delta = self._quota_use - self._quota_use_persisted
448+
elapsed = time.monotonic() - self._quota_last_persist_time
449+
if force or abs(persist_delta) >= QUOTA_PERSIST_DELTA or elapsed >= QUOTA_PERSIST_INTERVAL:
450+
self._quota_persist(persist_delta)
451+
452+
def _quota_delete(self):
453+
"""Delete the quota file if it exists."""
454+
try:
455+
self._quota_path().unlink()
456+
except FileNotFoundError:
457+
pass

src/borgstore/backends/rest.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
BackendAlreadyExists,
3030
BackendDoesNotExist,
3131
PermissionDenied,
32+
QuotaExceeded,
3233
BackendError,
3334
BackendMustBeOpen,
3435
BackendMustNotBeOpen,
@@ -127,6 +128,8 @@ def _handle_response(self, response, name=None):
127128
raise BackendError(response.text)
128129
if response.status_code == HTTP.FORBIDDEN:
129130
raise PermissionDenied(name or self.base_url)
131+
if response.status_code == HTTP.INSUFFICIENT_STORAGE:
132+
raise QuotaExceeded(response.text)
130133
if response.status_code == HTTP.BAD_REQUEST:
131134
raise ValueError(response.text)
132135
response.raise_for_status()
@@ -237,6 +240,12 @@ def defrag(self, sources, *, target=None, algorithm=None, namespace=None, levels
237240
self._handle_response(response, "defrag")
238241
return response.text
239242

243+
def quota(self) -> dict:
244+
self._assert_open()
245+
response = self._request("post", self._url(""), params={"cmd": "quota"})
246+
self._handle_response(response, "quota")
247+
return response.json()
248+
240249
def hash(self, name: str, algorithm: str = "sha256") -> str:
241250
self._assert_open()
242251
validate_name(name)

src/borgstore/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
# Filename suffixes used for special purposes
77
TMP_SUFFIX = ".tmp" # Temporary file while being uploaded/written
88
DEL_SUFFIX = ".del" # "Soft-deleted" item; can be undeleted
9+
HID_SUFFIX = ".hid" # Hidden internal file, not accessible by users
910

1011
# Maximum name length (not precise; suffixes might be added!)
1112
MAX_NAME_LENGTH = 100 # Being rather conservative here to improve portability between backends and platforms
13+
14+
# Quota tracking
15+
QUOTA_STORE_NAME = "quota.hid" # Hidden file storing current quota usage
16+
QUOTA_PERSIST_DELTA = 10 * 1000 * 1000 # Persist quota if usage changed by at least 10MB
17+
QUOTA_PERSIST_INTERVAL = 300 # Persist quota if at least 5 minutes have elapsed

0 commit comments

Comments
 (0)