Coverage for src / lilbee / crawler / save.py: 100%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-15 20:55 +0000

1"""URL-to-filename mapping, metadata I/O, and per-page save-to-disk.""" 

2 

3from __future__ import annotations 

4 

5import hashlib 

6import json 

7import logging 

8import re 

9import tempfile 

10from dataclasses import dataclass 

11from pathlib import Path 

12from urllib.parse import urlparse 

13 

14from lilbee.core.config import cfg 

15from lilbee.core.security import validate_path_within 

16from lilbee.crawler.models import CrawlResult 

17 

18log = logging.getLogger(__name__) 

19 

# Upper bound on the length of a generated relative file path. Longer results
# are truncated (most filesystems cap a single name at 255 bytes).
_MAX_FILENAME_LEN: int = 200

# File name used for index pages (URL path that is empty or ends in a slash).
_INDEX_FILENAME: str = "index.md"

# Number of pages between rewrites of the crawl metadata JSON during a
# streaming crawl. Markdown files are durable per-page, whereas metadata is
# batched to keep write volume bounded. Worst-case loss on crash is N-1
# entries, which are recoverable from the files.
METADATA_FLUSH_INTERVAL: int = 10

30 

31 

def url_to_filename(url: str) -> str:
    """Convert a URL to a safe, relative filesystem path ending in ``.md``.

    The result has the form ``<host>/<path>``. Only the hostname and the path
    of the URL are used; the query string, fragment and port are ignored.
    Runs of two or more dots and characters that are unsafe on common
    filesystems are replaced with ``_`` in both the host and the path.

    A last path segment without an extension is treated as a directory and
    gets an ``index.md`` inside it; otherwise its extension is replaced by
    ``.md``. A segment ending in a bare dot (``file.``) is treated as having
    an empty extension, so the result always ends in ``.md``.

    Examples:
        https://docs.python.org/3/tutorial/ → docs.python.org/3/tutorial/index.md
        https://example.com/page.html?q=1#frag → example.com/page.md
        https://example.com/page → example.com/page/index.md
        https://example.com/ → example.com/index.md

    Args:
        url: The URL to map.

    Returns:
        A relative path of at most ``_MAX_FILENAME_LEN`` characters.
    """

    def _neutralize(part: str) -> str:
        # Collapse traversal-style dot runs first, then replace characters
        # that are not allowed in file names on common filesystems.
        part = re.sub(r"\.\.+", "_", part)
        return re.sub(r'[<>:"|?*]', "_", part)

    parsed = urlparse(url)
    # The host becomes a directory name too, so it needs the same treatment
    # as the path (IPv6 literals contain ':', and a host may be '..').
    host = _neutralize(parsed.hostname or "unknown")

    path = parsed.path.strip("/")
    if not path:
        return f"{host}/{_INDEX_FILENAME}"

    path = _neutralize(path)

    parent, slash, leaf = path.rpartition("/")
    stem, dot, extension = leaf.rpartition(".")
    if not dot:
        # No extension at all: treat the segment as a directory.
        path = f"{path}/{_INDEX_FILENAME}"
    elif extension:
        # Replace the existing extension with .md.
        path = f"{parent}{slash}{stem}.md"
    elif stem:
        # Dangling dot ("file."): drop it and add the .md extension.
        path = f"{parent}{slash}{stem}.md"
    else:
        # The segment is just ".": it names the enclosing directory.
        path = f"{parent}{slash}{_INDEX_FILENAME}"

    full = f"{host}/{path}"

    # Truncate if too long, preserving the .md extension. The hash of the
    # original URL keeps distinct long URLs from colliding after the cut:
    # 1 ("_") + 12 (hash) + 3 (".md") == 16 characters are reserved.
    if len(full) > _MAX_FILENAME_LEN:
        url_hash = hashlib.sha256(url.encode()).hexdigest()[:12]
        full = full[: _MAX_FILENAME_LEN - 16] + f"_{url_hash}.md"

    return full

72 

73 

def _web_dir() -> Path:
    """Locate the ``_web/`` folder inside the configured documents directory."""
    documents_root = cfg.documents_dir
    return documents_root / "_web"

77 

78 

def _crawl_meta_path() -> Path:
    """Locate the crawl metadata sidecar JSON inside the configured data directory."""
    data_root = cfg.data_dir
    return data_root / "crawl_meta.json"

82 

83 

@dataclass
class CrawlMeta:
    """Bookkeeping record kept for one crawled URL.

    Instances are the values of the URL-keyed mapping stored in the crawl
    metadata JSON sidecar.
    """

    # File name the page was saved under (joined onto the _web/ directory
    # when the page is written).
    file: str
    # Hash of the saved markdown, used to detect unchanged content.
    content_hash: str
    # Timestamp string supplied by the caller; its format is not fixed here.
    crawled_at: str

91 

92 

def load_crawl_metadata() -> dict[str, CrawlMeta]:
    """Load the URL→metadata mapping from the JSON sidecar.

    Loading is best-effort: a missing file yields an empty mapping, and a
    file that cannot be read, decoded or parsed, or whose top level is not
    a JSON object, is ignored with a warning. Individual entries that do not
    match the ``CrawlMeta`` fields are skipped with a warning.

    Returns:
        Mapping of URL to its ``CrawlMeta``; empty if nothing usable exists.
    """
    path = _crawl_meta_path()
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No crawl has been recorded yet; this is the normal first-run case.
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError
        # (a file containing bytes that are not valid UTF-8).
        log.warning("Ignoring unreadable crawl metadata %s: %s", path, exc)
        return {}

    if not isinstance(raw, dict):
        # Valid JSON, but not the expected {url: {...}} object.
        log.warning("Ignoring crawl metadata %s: top-level value is not an object", path)
        return {}

    result: dict[str, CrawlMeta] = {}
    for url, data in raw.items():
        try:
            result[url] = CrawlMeta(**data)
        except (TypeError, KeyError):
            # Raised when the entry is not a mapping or its keys do not
            # match the CrawlMeta fields.
            log.warning("Skipping malformed crawl metadata entry: %s", url)
    return result

109 

110 

def save_crawl_metadata(meta: dict[str, CrawlMeta]) -> None:
    """Write the URL→metadata mapping to the JSON sidecar.

    The JSON is first written to a temporary file in the sidecar's own
    directory and then moved over the sidecar, so a reader never sees a
    half-written file. If anything fails, the temporary file is removed and
    the error is re-raised.
    """
    target = _crawl_meta_path()
    target.parent.mkdir(parents=True, exist_ok=True)

    payload = {}
    for url, entry in meta.items():
        payload[url] = {
            "file": entry.file,
            "content_hash": entry.content_hash,
            "crawled_at": entry.crawled_at,
        }

    staged: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            dir=target.parent, suffix=".tmp", delete=False
        ) as handle:
            staged = Path(handle.name)
            encoded = json.dumps(payload, indent=2).encode("utf-8")
            handle.write(encoded)
        # The handle is closed here; swap the finished file into place.
        staged.replace(target)
    except BaseException:
        # Also covers KeyboardInterrupt, so no stray .tmp file is left behind.
        if staged is not None:
            staged.unlink(missing_ok=True)
        raise

129 

130 

def content_hash(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` (encoded as UTF-8)."""
    hasher = hashlib.sha256()
    hasher.update(text.encode())
    return hasher.hexdigest()

134 

135 

@dataclass(frozen=True)
class SaveOutcome:
    """Immutable result of ``_save_single_result`` for a page that was written.

    Carries the written path together with the file name and hash that were
    computed for it, so callers can reuse them instead of recomputing.
    """

    # Location of the file that was written.
    path: Path
    # File name derived from the URL (the part joined onto the _web/ directory).
    filename: str
    # Hash of the markdown that was written.
    content_hash: str

143 

144 

def _save_single_result(result: CrawlResult, meta: dict[str, CrawlMeta]) -> SaveOutcome | None:
    """Persist a single crawl result as a markdown file when it is new or changed.

    Args:
        result: The crawl result to save.
        meta: Previously recorded metadata keyed by URL; only read here.

    Returns:
        A ``SaveOutcome`` with the written path and the file name and hash
        used, or None when nothing was written: the crawl failed, the
        markdown is blank, the target path was rejected as path traversal,
        or the hash matches the recorded one and the file is still on disk.
    """
    has_content = result.success and result.markdown.strip()
    if not has_content:
        return None

    filename = url_to_filename(result.url)
    root = _web_dir()
    target = root / filename
    resolved_root = root.resolve()
    try:
        validate_path_within(target, resolved_root)
    except ValueError:
        log.warning("Path traversal blocked: %s -> %s", result.url, target)
        return None

    digest = content_hash(result.markdown)
    known = meta.get(result.url)
    same_as_recorded = known is not None and known.content_hash == digest
    # A matching hash only counts when the file still exists; a deleted file
    # is written again.
    if same_as_recorded and target.exists():
        log.info("Content unchanged, skipping save: %s", result.url)
        return None

    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(result.markdown, encoding="utf-8")
    return SaveOutcome(path=target, filename=filename, content_hash=digest)

171 

172 

def _update_single_metadata(
    meta: dict[str, CrawlMeta],
    url: str,
    outcome: SaveOutcome,
    now: str,
) -> None:
    """Record ``outcome`` for ``url`` in ``meta``, replacing any existing entry.

    The file name and hash are taken from the already-computed outcome, and
    ``now`` is stored as the crawl timestamp. ``meta`` is mutated in place.
    """
    entry = CrawlMeta(
        file=outcome.filename,
        content_hash=outcome.content_hash,
        crawled_at=now,
    )
    meta[url] = entry