Coverage for src / lilbee / runtime / lock.py: 100%
29 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-15 20:55 +0000
1"""Write locking for LanceDB access.
3Combines an in-process mutex with a cross-process file lock (filelock)
4so separate processes also coordinate writes. Read consistency is handled
5by LanceDB's built-in MVCC via ``read_consistency_interval`` in
6``lilbee.data.store``.
7"""
9import logging
10import threading
11from collections.abc import Generator
12from contextlib import contextmanager
13from pathlib import Path
15from filelock import FileLock
16from filelock import Timeout as FileLockTimeout
18from lilbee.core.config import cfg
# Logger for this module, keyed by the module's import name.
log = logging.getLogger(__name__)

# Number of seconds ``write_lock`` waits by default before giving up.
LOCK_TIMEOUT = 30.0
class LockTimeoutError(TimeoutError):
    """Signal that a lock could not be obtained before the timeout elapsed.

    Subclasses the built-in ``TimeoutError`` so callers that already handle
    generic timeouts catch this as well.
    """
# Process-wide mutex shared by every writer thread in this interpreter;
# at most one thread at a time may hold it.
_write_mutex = threading.Lock()
def _lock_path() -> Path:
    """Return the path of the ``.lock`` file inside ``cfg.lancedb_dir``.

    The directory is read from ``cfg`` on every call rather than cached.
    """
    lock_dir = cfg.lancedb_dir
    return lock_dir / ".lock"
@contextmanager
def write_lock(timeout: float = LOCK_TIMEOUT) -> Generator[None, None, None]:
    """Hold the cross-process file lock and the in-process mutex for a write.

    The file lock is acquired first, then the in-process mutex. Both are
    released in the reverse order when the ``with`` body exits, whether it
    finishes normally or raises.

    Args:
        timeout: Maximum number of seconds to wait for *both* locks combined.
            A negative value is forwarded unchanged to each acquire call.

    Raises:
        LockTimeoutError: If either lock cannot be acquired in time.
    """
    import time  # local import: only needed for the shared deadline below

    # One deadline covers both acquisitions, so the caller never waits longer
    # than ``timeout`` overall (previously each lock got the full timeout).
    deadline = time.monotonic() + timeout if timeout >= 0 else None

    flock = FileLock(_lock_path())
    try:
        flock.acquire(timeout=timeout)
    except FileLockTimeout:
        raise LockTimeoutError("Timed out waiting for exclusive file lock") from None
    try:
        if deadline is None:
            remaining = timeout
        else:
            # Only what is left of the budget after the file lock wait.
            remaining = max(0.0, deadline - time.monotonic())
        if not _write_mutex.acquire(timeout=remaining):
            raise LockTimeoutError("Timed out waiting for write lock")
        try:
            yield
        finally:
            _write_mutex.release()
    finally:
        # Runs even when the mutex wait timed out, so the file lock never leaks.
        flock.release()