diff --git a/HANDOVER.md b/HANDOVER.md new file mode 100644 index 0000000..43a8e6c --- /dev/null +++ b/HANDOVER.md @@ -0,0 +1,227 @@ +# xhs_video_crawler 工作移交文档 + +移交日期:2026-06-08 + +## 项目概述 + +本项目用于探索和验证小红书公开视频内容的采集和下载链路。当前实现采用人工登录浏览器 + 本地脚本附着调试端口的方式运行,不包含自动登录、验证码绕过或私有接口调用。 + +核心入口: + +- `login_xhs.py`:启动带远程调试端口的 Chrome,并打开小红书页面供人工登录。 +- `XHS.py`:附着到已登录 Chrome,发现笔记链接,解析视频地址,下载视频并写入元数据。 +- `test_xhs.py`、`test_login_xhs.py`:单元测试。 +- `docs/superpowers/`:需求和实现计划文档,记录了浏览器 feed 下载、长任务队列、搜索来源等设计背景。 + +## 当前能力 + +### 单次发现页下载 + +登录后可直接下载当前发现页或指定起始页的视频: + +```bash +./.venv/bin/python XHS.py --max-videos 10 +``` + +### 长任务队列下载 + +长任务使用 JSONL 队列文件保存状态,支持中断后继续: + +```bash +./.venv/bin/python XHS.py \ + --source video-channel \ + --target-videos 500 \ + --queue-file data/xhs_500_queue.jsonl \ + --metadata-file data/xhs_500_metadata.jsonl \ + --report-file data/xhs_500_report.json \ + --output-dir video/xhs_500 \ + --max-runtime 28800 \ + --timeout 25 \ + --retry-limit 2 \ + --min-wait 2 \ + --max-wait 6 \ + --long-break-every 20 +``` + +队列状态: + +- `pending`:待处理。 +- `downloaded`:已下载。 +- `skipped_image`:无视频候选,通常是图文笔记或详情页没有暴露视频地址。 +- `failed`:超过重试次数仍失败,例如响应过小、网络错误等。 + +### 搜索来源下载 + +支持关键词搜索来源: + +```bash +./.venv/bin/python XHS.py \ + --source search \ + --keyword 猫咪 \ + --target-videos 100 \ + --queue-file data/search_cat_queue.jsonl +``` + +## 近期重要改动 + +本次移交提交中包含长任务稳定性补强: + +- 下载改为 `requests.get(..., stream=True)` 分块写入,避免大文件一次性读入内存。 +- 下载时先写入 `.part` 临时文件,校验通过后再替换为正式 `.mp4`;失败时清理 `.part`。 +- 增加 `DEFAULT_MIN_VIDEO_BYTES = 200 * 1024`,默认拒绝过小视频响应,避免把异常响应保存为视频。 +- 队列模式支持 `--min-video-bytes` 和 `--report-file`。 +- 队列任务结束后生成运行报告 JSON,包含下载数量、队列状态、元数据行数、评论统计、目录大小、耗时等。 +- 元数据中增加 `file_size_bytes`。 +- 新增对应单元测试,覆盖流式下载、过小响应拒绝、CLI 参数透传、运行报告统计。 + +## 500 条长任务验证结果 + +2026-06-01 至 2026-06-02 跑过一次真实 500 条长任务验证。 + +命令使用 `video-channel` 来源,目标 `500` 条,输出到: + +- 视频目录:`video/xhs_500` +- 队列文件:`data/xhs_500_queue.jsonl` +- 元数据文件:`data/xhs_500_metadata.jsonl` +- 运行报告:`data/xhs_500_report.json` + +验证结果: + +- 下载成功:500 +- 本地 mp4 文件:500 +- 元数据行数:500 +- `.part` 临时文件残留:0 +- 输出目录大小:约 4.5 GB +- 总耗时:19052.52 秒,约 5 小时 17 分钟 +- 队列最终状态: + - `downloaded`: 500 + - `skipped_image`: 147 + - `failed`: 2 + - `pending`: 50 + +两个 `failed` 都是 `视频响应过小`,被最小字节数保护逻辑拦截。`skipped_image` 主要来自视频频道队列里混入无视频候选的笔记。 + +结论:当前链路可以稳定跑 500 条级别长任务。主要瓶颈不是进程稳定性,而是视频频道来源筛选精度和页面内容密度。 + +## 运行环境 + +推荐环境: + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install requests DrissionPage +``` + +启动浏览器: + +```bash +./.venv/bin/python login_xhs.py --browser-port 9223 +``` + +注意事项: + +- Chrome 必须保持运行,脚本通过 `127.0.0.1:9223` 附着。 +- 登录和验证码需要人工处理。 +- 长任务期间不要让电脑睡眠、断网或关闭 Chrome。 +- 如果任务中断,保留 `queue-file` 后重新执行相同命令即可继续。 + +## 数据和产物 + +以下目录在 `.gitignore` 中,正常不应提交: + +- `.venv/` +- `.xhs-chrome-profile/` +- `data/` +- `video/` +- `data_queue_smoke/` +- `data_search_smoke/` +- `video_queue_smoke/` +- `video_search_smoke/` +- `video_human_test/` + +原因: + +- `.xhs-chrome-profile/` 可能包含本地浏览器登录态。 +- `data/` 和 `video/` 是运行产物,体积大,且可能包含采集数据。 + +## 测试 + +提交前建议运行: + +```bash +python3 -m unittest test_xhs.py test_login_xhs.py -v +``` + +当前测试覆盖重点: + +- 可选运行依赖缺失时模块可导入。 +- Chrome 调试端口检测。 +- feed 和 HTML 中视频候选提取。 +- 元数据、评论、输出文件名构造。 +- 队列加载、保存、去重和状态流转。 +- 搜索和视频频道来源构造。 +- 流式下载和异常响应校验。 +- 队列运行报告统计。 + +## 已知问题和风险 + +1. 视频频道来源筛选不够精确 + + 队列里会混入图文或详情页无视频候选笔记,500 条验证中有 147 条 `skipped_image`。这不影响稳定性,但会降低有效下载效率。 + +2. 页面依赖小红书前端结构 + + 当前依赖页面 DOM、内嵌状态和浏览器 feed 响应。如果小红书前端结构变化,解析器可能需要更新。 + +3. 不适合无人值守处理验证码 + + 项目设计前提是人工登录和人工处理验证码,不做自动绕过。 + +4. 单进程串行下载速度有限 + + 当前设计偏稳健,默认有人类化等待、详情页打开、评论加载和长停留。500 条真实验证耗时约 5 小时 17 分钟。 + +5. 运行产物没有进入 Git + + 真实验证产物保存在本地 `data/`、`video/`,默认不提交。如需归档,应走对象存储、网盘或其他专门的数据归档流程。 + +## 建议后续优化 + +1. 提高视频卡片筛选精度 + + 优先优化 `collect_note_urls_from_page(..., video_only=True)` 的 DOM 判断,降低 `skipped_image` 比例。 + +2. 增加任务心跳 + + 可周期性写 `heartbeat` 或增量报告,避免长任务中途只能从队列文件推断状态。 + +3. 增加中断报告 + + 当前报告在正常结束时写入。后续可以在 `KeyboardInterrupt` 或异常退出时也写入当前队列摘要。 + +4. 将长任务参数固化成脚本 + + 可以新增 `scripts/run_xhs_long_task.sh` 或类似入口,减少手动拼命令出错。 + +5. 降低评论采集开销 + + 如果只关注视频下载稳定性,可设置 `--max-comments 0`,能明显提升吞吐。 + +6. 增加数据入库或对象存储 + + 目前产物是本地 mp4 + JSONL。若后续要用于生产流程,应考虑上传对象存储并将元数据同步到数据库。 + +## 接手建议 + +接手时建议按以下顺序熟悉: + +1. 阅读 `README.md`,跑通 1 到 3 条小规模下载。 +2. 阅读 `XHS.py` 中 `run_queue_download`、`download_video`、`extract_video_candidates*`、`collect_note_urls*`。 +3. 运行完整单测。 +4. 用 `--target-videos 20` 跑一次队列模式,查看 queue、metadata、report 三类产物。 +5. 再考虑改动来源筛选或任务报告能力。 + +## 合规边界 + +本项目仅用于技术学习、链路验证和授权范围内的数据处理。使用时应遵守平台服务条款、robots 协议和相关法律法规,不应绕过访问控制、验证码或平台风控,不应采集或传播未授权内容。 diff --git a/XHS.py b/XHS.py index 1b54fc0..9de1d25 100644 --- a/XHS.py +++ b/XHS.py @@ -20,6 +20,7 @@ DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore" DEFAULT_VIDEO_CHANNEL_URL = "https://www.xiaohongshu.com/explore?channel_id=homefeed.video_v3" DEFAULT_BROWSER_PORT = 9223 DEFAULT_OUTPUT_DIR = Path("video") +DEFAULT_MIN_VIDEO_BYTES = 200 * 1024 LISTEN_TARGET = "/api/sns/web/v1/feed" MAX_FILENAME_BYTES = 240 INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') @@ -651,15 +652,44 @@ def download_video( headers: dict[str, str], video_url: str, output_path: Path, + min_video_bytes: int = DEFAULT_MIN_VIDEO_BYTES, ) -> None: - response = requests_module.get(video_url, headers=headers, timeout=60) + response = requests_module.get(video_url, headers=headers, timeout=60, stream=True) response.raise_for_status() - validate_video_response(response, video_url) output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_bytes(response.content) + temp_path = output_path.with_suffix(f"{output_path.suffix}.part") + + if not hasattr(response, "iter_content"): + validate_video_response(response, video_url, min_video_bytes=min_video_bytes) + output_path.write_bytes(response.content) + return + + total_bytes = 0 + prefix = b"" + try: + with temp_path.open("wb") as file: + for chunk in response.iter_content(chunk_size=1024 * 1024): + if not chunk: + continue + total_bytes += len(chunk) + if len(prefix) < 4096: + prefix += chunk[: 4096 - len(prefix)] + file.write(chunk) + validate_downloaded_video_bytes( + content_type=str(getattr(response, "headers", {}).get("content-type", "")).lower(), + prefix=prefix, + total_bytes=total_bytes, + video_url=video_url, + min_video_bytes=min_video_bytes, + ) + temp_path.replace(output_path) + except Exception: + if temp_path.exists(): + temp_path.unlink() + raise -def validate_video_response(response: Any, video_url: str) -> None: +def validate_video_response(response: Any, video_url: str, min_video_bytes: int = DEFAULT_MIN_VIDEO_BYTES) -> None: content = getattr(response, "content", b"") content_type = str(getattr(response, "headers", {}).get("content-type", "")).lower() @@ -674,6 +704,33 @@ def validate_video_response(response: Any, video_url: str) -> None: has_mp4_signature = len(content) >= 12 and content[4:8] == b"ftyp" has_webm_signature = content.startswith(b"\x1a\x45\xdf\xa3") if has_video_type or has_mp4_signature or has_webm_signature: + if min_video_bytes > 0 and len(content) < min_video_bytes: + raise ValueError(f"视频响应过小: {len(content)} bytes < {min_video_bytes} bytes {video_url}") + return + + raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}") + + +def validate_downloaded_video_bytes( + content_type: str, + prefix: bytes, + total_bytes: int, + video_url: str, + min_video_bytes: int = DEFAULT_MIN_VIDEO_BYTES, +) -> None: + if content_type.startswith("image/"): + raise ValueError(f"非视频响应: {content_type} {video_url}") + if prefix.startswith(b"RIFF") and b"WEBP" in prefix[:16]: + raise ValueError(f"非视频响应: image/webp {video_url}") + if prefix.lstrip().startswith((b"= 12 and prefix[4:8] == b"ftyp" + has_webm_signature = prefix.startswith(b"\x1a\x45\xdf\xa3") + if has_video_type or has_mp4_signature or has_webm_signature: + if min_video_bytes > 0 and total_bytes < min_video_bytes: + raise ValueError(f"视频响应过小: {total_bytes} bytes < {min_video_bytes} bytes {video_url}") return raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}") @@ -1226,6 +1283,66 @@ def build_source_url(source: str, keyword: str | None = None) -> str: raise ValueError(f"不支持的来源: {source}") +def directory_size_bytes(path: Path) -> int: + if not path.exists(): + return 0 + if path.is_file(): + return path.stat().st_size + return sum(item.stat().st_size for item in path.rglob("*") if item.is_file()) + + +def load_metadata_records(metadata_file: Path) -> list[dict[str, Any]]: + if not metadata_file.exists(): + return [] + records: list[dict[str, Any]] = [] + for line in metadata_file.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(record, dict): + records.append(record) + return records + + +def build_run_report( + source: str, + target_videos: int, + queue_file: Path, + output_dir: Path, + metadata_file: Path, + records: list[QueueRecord], + downloaded_this_run: int, + started_at: float, + finished_at: float, +) -> dict[str, Any]: + metadata_records = load_metadata_records(metadata_file) + comment_counts = [len(record.get("comments") or []) for record in metadata_records] + return { + "source": source, + "target_videos": target_videos, + "queue_file": queue_file.as_posix(), + "output_dir": output_dir.as_posix(), + "metadata_file": metadata_file.as_posix(), + "downloaded_this_run": downloaded_this_run, + "queue_status": count_queue_status(records), + "metadata_rows": len(metadata_records), + "metadata_with_comments": sum(1 for count in comment_counts if count > 0), + "total_comments": sum(comment_counts), + "video_files": len(list(output_dir.glob("*.mp4"))) if output_dir.exists() else 0, + "output_dir_bytes": directory_size_bytes(output_dir), + "elapsed_seconds": round(max(0.0, finished_at - started_at), 3), + "generated_at": current_timestamp(), + } + + +def write_run_report(report_file: Path, report: dict[str, Any]) -> None: + report_file.parent.mkdir(parents=True, exist_ok=True) + report_file.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8") + + def run_queue_download( source: str, target_videos: int, @@ -1237,6 +1354,9 @@ def run_queue_download( timeout = int(kwargs.get("timeout", 20)) output_dir = Path(kwargs.get("output_dir", DEFAULT_OUTPUT_DIR)) metadata_file = Path(kwargs.get("metadata_file") or output_dir / "metadata.jsonl") + report_file_value = kwargs.get("report_file") + report_file = Path(report_file_value) if report_file_value else output_dir / "run_report.json" + min_video_bytes = int(kwargs.get("min_video_bytes", DEFAULT_MIN_VIDEO_BYTES)) max_comments = int(kwargs.get("max_comments", 20)) browser_port = kwargs.get("browser_port", DEFAULT_BROWSER_PORT) human_settings = HumanBrowseSettings( @@ -1356,7 +1476,10 @@ def run_queue_download( headers=build_headers(record.url), video_url=candidate.video_url, output_path=output_path, + min_video_bytes=min_video_bytes, ) + file_size_bytes = output_path.stat().st_size + metadata["file_size_bytes"] = file_size_bytes append_jsonl_record( metadata_file, build_download_metadata_record( @@ -1388,6 +1511,19 @@ def run_queue_download( f"skipped={counts.get('skipped_image', 0)} " f"failed={counts.get('failed', 0)}" ) + report = build_run_report( + source=source, + target_videos=target_videos, + queue_file=queue_file, + output_dir=output_dir, + metadata_file=metadata_file, + records=records, + downloaded_this_run=downloaded_this_run, + started_at=started_at, + finished_at=time.monotonic(), + ) + write_run_report(report_file, report) + print(f"[INFO] 运行报告已保存: {report_file}") return downloaded_this_run @@ -1411,6 +1547,8 @@ def build_parser() -> argparse.ArgumentParser: parser.add_argument("--metadata-file", default=None, help="下载成功后追加写入的元数据 JSONL 路径,默认 output-dir/metadata.jsonl") parser.add_argument("--max-comments", type=int, default=20, help="随元数据保存的可见热门评论数量,默认 20") parser.add_argument("--retry-limit", type=int, default=1, help="队列项下载失败重试次数,默认 1") + parser.add_argument("--min-video-bytes", type=int, default=DEFAULT_MIN_VIDEO_BYTES, help="视频响应最小字节数,默认 204800") + parser.add_argument("--report-file", default=None, help="队列任务运行报告 JSON 路径,默认 output-dir/run_report.json") return parser @@ -1423,6 +1561,8 @@ def main(argv: list[str] | None = None) -> int: parser.error("--browser-port 必须大于 0") if args.min_wait < 0 or args.max_wait < args.min_wait: parser.error("--min-wait 和 --max-wait 必须满足 0 <= min <= max") + if args.min_video_bytes < 0: + parser.error("--min-video-bytes 必须大于等于 0") if args.queue_file or args.target_videos > 0: target_videos = args.target_videos if args.target_videos > 0 else args.max_videos downloaded = run_queue_download( @@ -1435,6 +1575,8 @@ def main(argv: list[str] | None = None) -> int: output_dir=Path(args.output_dir), metadata_file=Path(args.metadata_file) if args.metadata_file else None, max_comments=args.max_comments, + min_video_bytes=args.min_video_bytes, + report_file=Path(args.report_file) if args.report_file else None, browser_port=args.browser_port, human_mode=args.human_mode, min_wait=args.min_wait, diff --git a/test_xhs.py b/test_xhs.py index 1d657df..3e76ee7 100644 --- a/test_xhs.py +++ b/test_xhs.py @@ -25,11 +25,39 @@ class FakeDownloadResponse: class FakeRequests: def __init__(self, response: FakeDownloadResponse): self.response = response + self.calls = [] - def get(self, video_url, headers, timeout): + def get(self, video_url, headers, timeout, **kwargs): + self.calls.append( + { + "video_url": video_url, + "headers": headers, + "timeout": timeout, + **kwargs, + } + ) return self.response +class FakeStreamingResponse: + def __init__(self, chunks: list[bytes], content_type: str = "video/mp4", status_code: int = 200): + self.chunks = chunks + self.headers = {"content-type": content_type} + self.status_code = status_code + + @property + def content(self) -> bytes: + raise AssertionError("streaming download should not read full response.content") + + def raise_for_status(self) -> None: + if self.status_code >= 400: + raise RuntimeError(f"HTTP {self.status_code}") + + def iter_content(self, chunk_size: int): + for chunk in self.chunks: + yield chunk + + class FakeScrollPage: def __init__(self): self.scripts = [] @@ -430,6 +458,8 @@ class XhsModuleTests(unittest.TestCase): self.assertIsNone(args.queue_file) self.assertEqual(args.target_videos, 0) self.assertEqual(args.retry_limit, 1) + self.assertEqual(args.min_video_bytes, 200 * 1024) + self.assertIsNone(args.report_file) def test_main_invokes_collect_videos_with_cli_values(self) -> None: module = importlib.import_module("XHS") @@ -499,6 +529,26 @@ class XhsModuleTests(unittest.TestCase): self.assertEqual(kwargs["retry_limit"], 2) self.assertEqual(kwargs["keyword"], "猫咪") + def test_main_passes_queue_report_and_min_video_bytes_options(self) -> None: + module = importlib.import_module("XHS") + with mock.patch.object(module, "run_queue_download", return_value=5) as mocked_run: + exit_code = module.main( + [ + "--target-videos", + "5", + "--queue-file", + "data/q.jsonl", + "--min-video-bytes", + "4096", + "--report-file", + "data/report.json", + ] + ) + self.assertEqual(exit_code, 0) + _, kwargs = mocked_run.call_args + self.assertEqual(kwargs["min_video_bytes"], 4096) + self.assertEqual(kwargs["report_file"].as_posix(), "data/report.json") + def test_download_video_rejects_webp_response_before_writing_file(self) -> None: module = importlib.import_module("XHS") response = FakeDownloadResponse(b"RIFF....WEBP", content_type="image/webp") @@ -512,17 +562,46 @@ class XhsModuleTests(unittest.TestCase): def test_download_video_accepts_mp4_signature(self) -> None: module = importlib.import_module("XHS") - output_path = mock.MagicMock() - output_path.parent.mkdir = mock.MagicMock() - output_path.write_bytes = mock.MagicMock() response = FakeDownloadResponse(b"\x00\x00\x00\x18ftypmp42payload", content_type="application/octet-stream") - module.download_video( - requests_module=FakeRequests(response), - headers={}, - video_url="https://sns-video.xhscdn.com/example.mp4", - output_path=output_path, - ) - output_path.write_bytes.assert_called_once_with(b"\x00\x00\x00\x18ftypmp42payload") + with tempfile.TemporaryDirectory() as temp_dir: + output_path = Path(temp_dir) / "example.mp4" + module.download_video( + requests_module=FakeRequests(response), + headers={}, + video_url="https://sns-video.xhscdn.com/example.mp4", + output_path=output_path, + min_video_bytes=0, + ) + self.assertEqual(output_path.read_bytes(), b"\x00\x00\x00\x18ftypmp42payload") + + def test_download_video_streams_chunks_without_loading_full_content(self) -> None: + module = importlib.import_module("XHS") + chunks = [b"\x00\x00\x00\x18ftypmp42", b"payload", b"more"] + requests = FakeRequests(FakeStreamingResponse(chunks, content_type="application/octet-stream")) + with tempfile.TemporaryDirectory() as temp_dir: + output_path = Path(temp_dir) / "streamed.mp4" + + module.download_video( + requests_module=requests, + headers={"referer": "note"}, + video_url="https://sns-video.xhscdn.com/example.mp4", + output_path=output_path, + min_video_bytes=0, + ) + + self.assertEqual(output_path.read_bytes(), b"".join(chunks)) + self.assertTrue(requests.calls[0]["stream"]) + + def test_validate_video_response_rejects_tiny_video_payload(self) -> None: + module = importlib.import_module("XHS") + response = FakeDownloadResponse(b"\x00\x00\x00\x18ftypmp42payload", content_type="application/octet-stream") + + with self.assertRaisesRegex(ValueError, "视频响应过小"): + module.validate_video_response( + response, + "https://sns-video.xhscdn.com/example.mp4", + min_video_bytes=1024, + ) def test_normalize_note_urls_deduplicates_explore_links(self) -> None: module = importlib.import_module("XHS") @@ -757,6 +836,53 @@ class XhsModuleTests(unittest.TestCase): self.assertEqual(failed.status, "failed") self.assertEqual(failed.attempts, 2) + def test_build_run_report_summarizes_queue_metadata_and_files(self) -> None: + module = importlib.import_module("XHS") + with tempfile.TemporaryDirectory() as temp_dir: + output_dir = Path(temp_dir) / "video" + output_dir.mkdir() + video_path = output_dir / "a.mp4" + video_path.write_bytes(b"\x00\x00\x00\x18ftypmp42payload") + metadata_file = output_dir / "metadata.jsonl" + metadata_file.write_text( + "\n".join( + [ + '{"note_id":"one","comments":[{"content":"a"},{"content":"b"}],"file_size_bytes":20}', + '{"note_id":"two","comments":[],"file_size_bytes":30}', + ] + ) + + "\n", + encoding="utf-8", + ) + records = [ + module.QueueRecord("one", "url1", "source", status="downloaded", downloaded_path=video_path.as_posix()), + module.QueueRecord("two", "url2", "source", status="failed", last_error="download_timeout"), + ] + + report = module.build_run_report( + source="video-channel", + target_videos=10, + queue_file=Path(temp_dir) / "queue.jsonl", + output_dir=output_dir, + metadata_file=metadata_file, + records=records, + downloaded_this_run=1, + started_at=100.0, + finished_at=130.0, + ) + + self.assertEqual(report["source"], "video-channel") + self.assertEqual(report["target_videos"], 10) + self.assertEqual(report["downloaded_this_run"], 1) + self.assertEqual(report["queue_status"]["downloaded"], 1) + self.assertEqual(report["queue_status"]["failed"], 1) + self.assertEqual(report["metadata_rows"], 2) + self.assertEqual(report["metadata_with_comments"], 1) + self.assertEqual(report["total_comments"], 2) + self.assertEqual(report["video_files"], 1) + self.assertGreater(report["output_dir_bytes"], 0) + self.assertEqual(report["elapsed_seconds"], 30.0) + if __name__ == "__main__": unittest.main()