Coverage for src / lilbee / runtime / lock.py: 100%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Write locking for LanceDB access. 

2 

3Combines an in-process mutex with a cross-process file lock (filelock) 

4so separate processes also coordinate writes. Read consistency is handled 

5by LanceDB's built-in MVCC via ``read_consistency_interval`` in 

6``lilbee.data.store``. 

7""" 

8 

9import logging 

10import threading 

11from collections.abc import Generator 

12from contextlib import contextmanager 

13from pathlib import Path 

14 

15from filelock import FileLock 

16from filelock import Timeout as FileLockTimeout 

17 

18from lilbee.core.config import cfg 

19 

20log = logging.getLogger(__name__) 

21 

22# Default timeout (seconds) for acquiring the write lock 

23LOCK_TIMEOUT = 30.0 

24 

25 

26class LockTimeoutError(TimeoutError): 

27 """Raised when a lock cannot be acquired within the timeout.""" 

28 

29 

30# In-process write mutex: serializes writers within the same process 

31_write_mutex = threading.Lock() 

32 

33 

34def _lock_path() -> Path: 

35 return cfg.lancedb_dir / ".lock" 

36 

37 

38@contextmanager 

39def write_lock(timeout: float = LOCK_TIMEOUT) -> Generator[None, None, None]: 

40 """Context manager: acquire exclusive file lock then in-process mutex.""" 

41 flock = FileLock(_lock_path()) 

42 try: 

43 flock.acquire(timeout=timeout) 

44 except FileLockTimeout: 

45 raise LockTimeoutError("Timed out waiting for exclusive file lock") from None 

46 try: 

47 acquired = _write_mutex.acquire(timeout=timeout) 

48 if not acquired: 

49 raise LockTimeoutError("Timed out waiting for write lock") 

50 try: 

51 yield 

52 finally: 

53 _write_mutex.release() 

54 finally: 

55 flock.release()