Coverage for src/lilbee/server/routes/search.py: 100%
58 statements
1"""Search, ask, ask_stream, chat, and chat_stream route handlers."""
3from __future__ import annotations
5import asyncio
6from collections.abc import AsyncGenerator
8from litestar import get, post
9from litestar.exceptions import HTTPException, ValidationException
10from litestar.params import Parameter
11from litestar.response import Stream
13from lilbee.core.results import DocumentResult
14from lilbee.data.store import scope_to_chunk_type
15from lilbee.retrieval.query import ChatMessage as ChatMessageDict
16from lilbee.server import handlers
17from lilbee.server.auth import read_only
18from lilbee.server.models import (
19 AskRequest,
20 AskResponse,
21 ChatRequest,
22)

# Process-wide lock that gates the two streaming chat endpoints to one
# in-flight request at a time. The llama-cpp provider already serializes
# concurrent chat() calls under a thread lock, so a second concurrent
# stream would block the client for many seconds with no feedback. Failing
# fast with 429 + Retry-After lets clients surface a real error and decide
# how to respond. The lock binds to the worker's running event loop on
# first acquire.
_chat_inflight_lock = asyncio.Lock()
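

# Illustrative sketch only, not part of the route module: how a client might
# honor the 429 + Retry-After contract above. The `httpx` dependency, the
# localhost URL, and the helper name are assumptions for the example, and the
# non-streaming `post()` call is a simplification.
def _example_stream_with_retry(question: str, retries: int = 3):
    import time

    import httpx

    resp = None
    for _ in range(retries):
        resp = httpx.post(
            "http://localhost:8000/api/ask/stream",
            json={"question": question},
        )
        if resp.status_code != 429:
            break
        # Back off for the server-suggested interval, then try again.
        time.sleep(float(resp.headers.get("Retry-After", "1")))
    return resp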


def _acquire_chat_lock_or_raise() -> None:
    """Raise 429 on contention; the caller then acquires the free lock.

    The check-then-acquire pattern in the stream routes below is race-free
    because route handlers run on a single event loop thread and
    ``Lock.acquire()`` on a free lock returns synchronously without
    yielding: no ``await`` can intervene between this check and the
    caller's acquire.
    """
    if _chat_inflight_lock.locked():
        raise HTTPException(status_code=429, headers={"Retry-After": "1"})
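

# Hypothetical in-module demo, not a route: the first caller passes the guard
# and acquires without suspending; a second caller gets an immediate 429.
async def _demo_chat_lock_contention() -> None:
    _acquire_chat_lock_or_raise()  # lock free: guard passes
    await _chat_inflight_lock.acquire()  # free lock: returns without yielding
    try:
        _acquire_chat_lock_or_raise()  # lock held: raises HTTPException
    except HTTPException as exc:
        assert exc.status_code == 429
    finally:
        _chat_inflight_lock.release()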


async def _gated_stream(
    generator: AsyncGenerator[str, None],
) -> AsyncGenerator[str, None]:
    """Wrap *generator* so the chat lock is released when the stream ends.

    The lock must already be held when this is called. Release happens on
    natural completion, exception, and client disconnect (GeneratorExit
    fires the ``finally`` block).
    """
    try:
        async for chunk in generator:
            yield chunk
    finally:
        _chat_inflight_lock.release()
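

# Hypothetical demo of the release guarantee: once the wrapped generator is
# exhausted, the finally block has already returned the lock.
async def _demo_gated_stream_release() -> None:
    async def _chunks() -> AsyncGenerator[str, None]:
        for token in ("hello", " ", "world"):
            yield token

    await _chat_inflight_lock.acquire()
    received = [chunk async for chunk in _gated_stream(_chunks())]
    assert received == ["hello", " ", "world"]
    assert not _chat_inflight_lock.locked()  # released by the finally block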
61@get("/api/search")
62@read_only
63async def search_route(
64 q: str = Parameter(query="q"),
65 top_k: int = Parameter(query="top_k", default=5, le=100),
66 chunk_type: str | None = Parameter(query="chunk_type", default=None),
67) -> list[DocumentResult]:
68 """Search indexed documents by semantic similarity. No LLM call required."""
69 try:
70 chunk_type = scope_to_chunk_type(chunk_type)
71 except ValueError as exc:
72 raise ValidationException(str(exc)) from exc
73 try:
74 return await handlers.search(q, top_k=top_k, chunk_type=chunk_type)
75 except ValueError as exc:
76 raise ValidationException(str(exc)) from exc
77 except Exception as exc:
78 raise HTTPException(status_code=503, detail=str(exc)) from exc
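

# Hypothetical usage sketch for the search route; the `httpx` dependency and
# base URL are assumptions. `q` is required, `top_k` is capped at 100 by the
# Parameter constraint, and an unrecognized `chunk_type` comes back as a 400
# validation error.
def _example_search(query: str):
    import httpx

    return httpx.get(
        "http://localhost:8000/api/search",
        params={"q": query, "top_k": 10},
    )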


@post("/api/ask")
async def ask_route(data: AskRequest) -> AskResponse:
    """One-shot RAG question returning an answer with source chunks."""
    try:
        return await handlers.ask(
            question=data.question,
            top_k=data.top_k,
            options=data.options,
            chunk_type=data.chunk_type,
        )
    except ValueError as exc:
        raise ValidationException(str(exc)) from exc
    except Exception as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc


@post("/api/ask/stream")
async def ask_stream_route(data: AskRequest) -> Stream:
    """Streaming SSE version of ask, emitting token-by-token answer chunks."""
    _acquire_chat_lock_or_raise()
    await _chat_inflight_lock.acquire()
    return Stream(
        _gated_stream(
            handlers.ask_stream(
                question=data.question,
                top_k=data.top_k,
                options=data.options,
                chunk_type=data.chunk_type,
            ),
        ),
        media_type="text/event-stream",
    )
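

# Sketch of consuming the SSE stream client-side (hypothetical; assumes
# `httpx` and a local server). A 429 here means another stream is already in
# flight and the Retry-After header says when to try again.
def _example_consume_ask_stream(question: str) -> None:
    import httpx

    with httpx.stream(
        "POST",
        "http://localhost:8000/api/ask/stream",
        json={"question": question},
        timeout=None,
    ) as resp:
        if resp.status_code == 429:
            return  # back off for the Retry-After interval instead
        for line in resp.iter_lines():
            if line:
                print(line)  # each SSE event carries an answer chunk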


@post("/api/chat")
async def chat_route(data: ChatRequest) -> AskResponse:
    """RAG chat with conversation history, returning an answer with sources."""
    history: list[ChatMessageDict] = [
        ChatMessageDict(role=m.role, content=m.content) for m in data.history
    ]
    return await handlers.chat(
        question=data.question,
        history=history,
        top_k=data.top_k,
        options=data.options,
        chunk_type=data.chunk_type,
    )
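

# Hypothetical example payload for the chat routes: `history` mirrors the
# ChatRequest model, each message carrying a `role` and `content` pair. The
# question and history text here are illustrative only.
_EXAMPLE_CHAT_PAYLOAD = {
    "question": "And how do I scope that to one chunk type?",
    "history": [
        {"role": "user", "content": "How do I search the index?"},
        {"role": "assistant", "content": "POST a question to /api/chat."},
    ],
    "top_k": 5,
}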


@post("/api/chat/stream")
async def chat_stream_route(data: ChatRequest) -> Stream:
    """Streaming SSE version of chat with conversation history."""
    _acquire_chat_lock_or_raise()
    await _chat_inflight_lock.acquire()
    history: list[ChatMessageDict] = [
        ChatMessageDict(role=m.role, content=m.content) for m in data.history
    ]
    return Stream(
        _gated_stream(
            handlers.chat_stream(
                question=data.question,
                history=history,
                top_k=data.top_k,
                options=data.options,
                chunk_type=data.chunk_type,
            ),
        ),
        media_type="text/event-stream",
    )