Coverage for src/lilbee/server/routes/search.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""Search, ask, ask_stream, chat, and chat_stream route handlers.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6from collections.abc import AsyncGenerator 

7 

8from litestar import get, post 

9from litestar.exceptions import HTTPException, ValidationException 

10from litestar.params import Parameter 

11from litestar.response import Stream 

12 

13from lilbee.core.results import DocumentResult 

14from lilbee.data.store import scope_to_chunk_type 

15from lilbee.retrieval.query import ChatMessage as ChatMessageDict 

16from lilbee.server import handlers 

17from lilbee.server.auth import read_only 

18from lilbee.server.models import ( 

19 AskRequest, 

20 AskResponse, 

21 ChatRequest, 

22) 

23 

24# Process-wide lock that gates the two streaming chat endpoints to one 

25# in-flight request at a time. The llama-cpp provider already serializes 

26# concurrent chat() calls under a thread lock, so a second concurrent 

27# stream blocks the client for many seconds with no feedback. Returning 

28# 429 + Retry-After fast lets clients surface a real error and decide. 

29# The lock binds to the worker's running event loop on first acquire. 

30_chat_inflight_lock = asyncio.Lock() 

31 

32 

33def _acquire_chat_lock_or_raise() -> None: 

34 """Non-blocking acquire on the running loop thread; raise 429 on contention. 

35 

36 Race-free because route handlers run on a single event loop thread and 

37 ``Lock.acquire()`` on a free lock returns synchronously without yielding. 

38 The check + acquire is atomic from the loop's perspective, no ``await`` 

39 can intervene between the two calls. 

40 """ 

41 if _chat_inflight_lock.locked(): 

42 raise HTTPException(status_code=429, headers={"Retry-After": "1"}) 

43 

44 

async def _gated_stream(
    generator: AsyncGenerator[str, None],
) -> AsyncGenerator[str, None]:
    """Wrap *generator* so the chat lock is released when the stream ends.

    The lock must already be held when this is called. Release happens on
    natural completion, exception, and client-disconnect (GeneratorExit
    fires the ``finally`` block).

    The source generator is explicitly ``aclose()``d before the lock is
    released so its own cleanup (e.g. provider teardown in
    ``handlers.*_stream``) runs promptly instead of waiting for GC —
    a no-op when the generator already finished naturally.
    """
    try:
        async for chunk in generator:
            yield chunk
    finally:
        try:
            await generator.aclose()
        finally:
            # Release unconditionally, even if aclose() itself raised.
            _chat_inflight_lock.release()

59 

60 

@get("/api/search")
@read_only
async def search_route(
    q: str = Parameter(query="q"),
    top_k: int = Parameter(query="top_k", default=5, le=100),
    chunk_type: str | None = Parameter(query="chunk_type", default=None),
) -> list[DocumentResult]:
    """Semantic-similarity search over indexed documents (no LLM involved)."""
    # Resolve the user-facing scope string to an internal chunk type first;
    # a bad value is a client error, not a backend failure.
    try:
        resolved = scope_to_chunk_type(chunk_type)
    except ValueError as exc:
        raise ValidationException(str(exc)) from exc
    try:
        results = await handlers.search(q, top_k=top_k, chunk_type=resolved)
    except ValueError as exc:
        raise ValidationException(str(exc)) from exc
    except Exception as exc:
        # Anything else means the retrieval backend is unavailable.
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    return results

79 

80 

@post("/api/ask")
async def ask_route(data: AskRequest) -> AskResponse:
    """Answer a single question via RAG, returning the answer with sources."""
    try:
        response = await handlers.ask(
            question=data.question,
            top_k=data.top_k,
            options=data.options,
            chunk_type=data.chunk_type,
        )
    except ValueError as exc:
        # Invalid request parameters -> validation error (422).
        raise ValidationException(str(exc)) from exc
    except Exception as exc:
        # Backend/model failure -> service unavailable.
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    return response

95 

96 

@post("/api/ask/stream")
async def ask_stream_route(data: AskRequest) -> Stream:
    """Streaming SSE version of ask, emitting token-by-token answer chunks.

    Gated by the process-wide chat lock: a concurrent second stream gets an
    immediate 429 instead of silently queueing behind the first.
    """
    _acquire_chat_lock_or_raise()
    await _chat_inflight_lock.acquire()
    try:
        body = _gated_stream(
            handlers.ask_stream(
                question=data.question,
                top_k=data.top_k,
                options=data.options,
                chunk_type=data.chunk_type,
            ),
        )
        return Stream(body, media_type="text/event-stream")
    except BaseException:
        # If setup fails before litestar ever iterates the generator,
        # _gated_stream's finally never runs — release here so the lock
        # cannot leak and poison all streaming endpoints with 429s.
        # (An unstarted generator's close() skips its body, so this cannot
        # double-release.)
        _chat_inflight_lock.release()
        raise

113 

114 

@post("/api/chat")
async def chat_route(data: ChatRequest) -> AskResponse:
    """RAG chat with conversation history, returning an answer with sources."""
    history: list[ChatMessageDict] = [
        ChatMessageDict(role=m.role, content=m.content) for m in data.history
    ]
    try:
        return await handlers.chat(
            question=data.question,
            history=history,
            top_k=data.top_k,
            options=data.options,
            chunk_type=data.chunk_type,
        )
    except ValueError as exc:
        # Invalid input -> validation error, matching search_route/ask_route
        # (previously this leaked as a raw 500).
        raise ValidationException(str(exc)) from exc
    except Exception as exc:
        # Backend/model failure -> 503, matching the other routes.
        raise HTTPException(status_code=503, detail=str(exc)) from exc

128 

129 

@post("/api/chat/stream")
async def chat_stream_route(data: ChatRequest) -> Stream:
    """Streaming SSE version of chat with conversation history.

    Gated by the process-wide chat lock: a concurrent second stream gets an
    immediate 429 instead of silently queueing behind the first.
    """
    # Convert the history BEFORE taking the lock: if ChatMessageDict
    # construction raises, we must not be left holding the lock (previously
    # this ran after acquire() and could leak it permanently).
    history: list[ChatMessageDict] = [
        ChatMessageDict(role=m.role, content=m.content) for m in data.history
    ]
    _acquire_chat_lock_or_raise()
    await _chat_inflight_lock.acquire()
    try:
        body = _gated_stream(
            handlers.chat_stream(
                question=data.question,
                history=history,
                top_k=data.top_k,
                options=data.options,
                chunk_type=data.chunk_type,
            ),
        )
        return Stream(body, media_type="text/event-stream")
    except BaseException:
        # Setup failed before the generator was ever iterated, so
        # _gated_stream's finally never runs — release here to avoid a
        # permanent lock leak. (An unstarted generator's close() skips its
        # body, so this cannot double-release.)
        _chat_inflight_lock.release()
        raise