From 3a2a6a69e0098c4858ae7d02147a1947b67a9d5c Mon Sep 17 00:00:00 2001 From: wangshaoqing Date: Wed, 27 May 2026 14:06:47 +0800 Subject: [PATCH] Add XHS browser feed downloader --- .gitignore | 5 + README.md | 84 ++++++++-- XHS.py | 397 ++++++++++++++++++++++++++++++++++++++++++++++ login_xhs.py | 121 ++++++++++++++ test_login_xhs.py | 95 +++++++++++ test_xhs.py | 178 +++++++++++++++++++++ 6 files changed, 865 insertions(+), 15 deletions(-) create mode 100644 .gitignore create mode 100644 XHS.py create mode 100644 login_xhs.py create mode 100644 test_login_xhs.py create mode 100644 test_xhs.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ad09369 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.venv/ +__pycache__/ +*.pyc +.xhs-chrome-profile/ +video/ diff --git a/README.md b/README.md index 2b3b861..4f861d7 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,14 @@ 用于探索和研究小红书视频公开内容抓取流程的项目。 +## 当前能力 + +第一版采用和抖音参考项目类似的两步式工作流: + +1. `login_xhs.py` 启动一个可见 Chrome,并打开小红书发现页。 +2. 你在 Chrome 中手动登录和处理验证码。 +3. 
`XHS.py` 附着到这个 Chrome,监听页面已经收到的 `feed` 响应,提取其中的 mp4 视频地址并下载。 + ## 项目目的 本项目用于学习和验证视频信息采集相关技术,包括请求分析、页面解析、数据结构整理、下载流程设计和后续自动化处理。 @@ -15,7 +23,67 @@ - 不绕过平台访问控制、验证码、登录风控或反爬限制。 - 不将本项目用于批量侵权下载、商业化搬运或其他违规用途。 -## 计划功能 +## 安装环境 + +```bash +cd xhs_video_crawler +python3 -m venv .venv +source .venv/bin/activate +pip install requests DrissionPage +``` + +## 使用方法 + +### 步骤 1:启动 Chrome 并手动登录 + +```bash +./.venv/bin/python login_xhs.py +``` + +脚本会打开 `https://www.xiaohongshu.com/explore`。请在打开的浏览器里完成登录;如果出现验证码,也需要手动处理。 + +### 步骤 2:下载发现页视频 + +登录完成后,保持 Chrome 不要关闭,执行: + +```bash +./.venv/bin/python XHS.py --max-videos 10 +``` + +常用参数: + +```bash +# 指定下载数量 +./.venv/bin/python XHS.py --max-videos 20 + +# 指定保存目录 +./.venv/bin/python XHS.py --max-videos 10 --output-dir video + +# 如果启动 Chrome 时换了端口,下载脚本也要使用同一个端口 +./.venv/bin/python login_xhs.py --browser-port 9334 +./.venv/bin/python XHS.py --browser-port 9334 --max-videos 10 +``` + +下载文件默认保存到 `video/` 目录,文件名格式大致为: + +```text +[作者]标题-note_id.mp4 +``` + +## 工作原理 + +- 浏览器负责加载小红书页面和保留登录态。 +- 脚本只监听浏览器里已经产生的网络响应。 +- 解析器会递归查找响应 JSON 中的 `master_url`、`backup_urls` 等视频地址字段。 +- 下载过程会去重,并在单个视频失败时继续处理后续视频。 + +## 测试 + +```bash +python3 -m unittest test_xhs.py test_login_xhs.py -v +``` + +## 后续计划 - 分析公开视频页面的数据结构。 - 提取视频标题、作者、发布时间、封面和视频地址等元数据。 @@ -23,20 +91,6 @@ - 保存抓取结果到本地文件或结构化数据表。 - 为后续下载、去重和任务队列处理预留接口。 -## 项目状态 - -当前处于初始化阶段,README 先作为项目说明和开发规划入口。 - -## 开发建议 - -后续可以按以下方向逐步推进: - -1. 初始化运行环境和依赖管理。 -2. 增加基础配置文件,例如 `.gitignore`、依赖清单和示例配置。 -3. 实现单个公开视频链接的数据解析。 -4. 增加错误处理、日志和请求频率控制。 -5. 
DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore"
DEFAULT_BROWSER_PORT = 9224
DEFAULT_OUTPUT_DIR = Path("video")
LISTEN_TARGET = "feed"
MAX_FILENAME_BYTES = 240
# Cap for the id part of a file name so the "-<id>.mp4" suffix can never
# consume the whole MAX_FILENAME_BYTES budget on its own.
MAX_VIDEO_ID_BYTES = 96
# Placeholder used when no identifier could be found near a video URL.
UNKNOWN_VIDEO_ID = "unknown"
INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]')
VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls", "url"}
TITLE_KEYS = ("display_title", "title", "desc", "description")
ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id")
AUTHOR_KEYS = ("nickname", "name", "user_name", "userName")


@dataclass(frozen=True)
class VideoCandidate:
    """One downloadable video URL plus the note metadata found around it."""

    video_id: str  # identifier of the note/video, UNKNOWN_VIDEO_ID when absent
    title: str
    video_url: str
    author_name: str
    source_key: str  # JSON key the URL was read from, e.g. "master_url"


def sanitize_filename(value: str, fallback: str = "untitled") -> str:
    """Replace characters that are invalid in file names with ``_``.

    Leading/trailing spaces, dots and underscores are stripped; *fallback* is
    returned when nothing is left.
    """
    cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._")
    return cleaned or fallback


def truncate_utf8_bytes(value: str, max_bytes: int) -> str:
    """Return *value* cut to at most *max_bytes* UTF-8 bytes.

    The cut never splits a multi-byte character. A string that already fits is
    returned unchanged; a truncated one also loses trailing spaces, dots and
    underscores.
    """
    encoded = value.encode("utf-8")
    if len(encoded) <= max_bytes:
        return value
    if max_bytes <= 0:
        return ""
    # Slicing the bytes may leave a partial character at the end;
    # errors="ignore" drops exactly that incomplete tail.
    return encoded[:max_bytes].decode("utf-8", errors="ignore").rstrip(" ._")


def looks_like_video_url(value: str) -> bool:
    """Tell whether *value* is an http(s) URL that points at a video file.

    A URL qualifies when its host contains ``sns-video`` or its path ends in
    ``.mp4``. A bare ``xhscdn.com`` host is deliberately not enough: the same
    CDN domain also appears in cover/image URLs, and those are stored under
    the generic ``url`` key that the payload walker inspects as well.
    """
    normalized = value.strip()
    if not normalized.startswith(("http://", "https://")):
        return False
    location = normalized.split("://", 1)[1]
    for separator in ("?", "#"):  # ignore query string and fragment
        location = location.split(separator, 1)[0]
    host, _, path = location.partition("/")
    return "sns-video" in host.lower() or path.lower().endswith(".mp4")


def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None:
    """Depth-first search for the first non-blank string stored under *keys*.

    Direct keys of a dict are checked (in *keys* order) before its children
    are searched. Returns the stripped string, or ``None`` when nothing
    matches.
    """
    if isinstance(value, dict):
        for key in keys:
            candidate = value.get(key)
            if isinstance(candidate, str) and candidate.strip():
                return candidate.strip()
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return None

    for child in children:
        found = first_string_by_keys(child, keys)
        if found:
            return found
    return None


def _first_direct_string(node: dict[str, Any], keys: tuple[str, ...]) -> str:
    """Return the first non-blank string stored directly on *node* under *keys*."""
    for key in keys:
        candidate = node.get(key)
        if isinstance(candidate, str) and candidate.strip():
            return candidate.strip()
    return ""


def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]:
    """Collect id, title and author for a URL from its chain of ancestors.

    *path* holds the dicts enclosing the URL, outermost first. Ancestors are
    visited nearest-first and, in a first pass, only their own keys are
    inspected. A recursive search is used solely as a fallback for values the
    first pass could not find; searching subtrees first would let an unrelated
    nested id (for example ``user.id``) win over the id of the note itself.

    Returns a dict with the keys ``video_id``, ``title`` and ``author_name``;
    missing values are replaced by ``"unknown"`` / ``"untitled"``.
    """
    ancestors = [node for node in reversed(path) if isinstance(node, dict)]
    video_id = ""
    title = ""
    author_name = ""

    for node in ancestors:
        if not video_id:
            video_id = _first_direct_string(node, ID_KEYS)
        if not title:
            title = _first_direct_string(node, TITLE_KEYS)
        if not author_name:
            user = node.get("user") or node.get("author")
            if isinstance(user, dict):
                author_name = first_string_by_keys(user, AUTHOR_KEYS) or ""

    # Fallback: nothing on the ancestors themselves, so look inside them.
    for node in ancestors:
        if video_id and title:
            break
        if not video_id:
            video_id = first_string_by_keys(node, ID_KEYS) or ""
        if not title:
            title = first_string_by_keys(node, TITLE_KEYS) or ""

    return {
        "video_id": video_id or UNKNOWN_VIDEO_ID,
        "title": title or "untitled",
        "author_name": author_name or "unknown",
    }


def append_candidate(
    candidates: list[VideoCandidate],
    url: str,
    source_key: str,
    path: tuple[Any, ...],
) -> None:
    """Append a ``VideoCandidate`` for *url* to *candidates* if it is a video URL."""
    if not looks_like_video_url(url):
        return
    context = find_nearest_note_context(path)
    candidates.append(
        VideoCandidate(
            video_id=context["video_id"],
            title=context["title"],
            video_url=url.strip(),
            author_name=context["author_name"],
            source_key=source_key,
        )
    )


def walk_for_video_candidates(value: Any, path: tuple[Any, ...], candidates: list[VideoCandidate]) -> None:
    """Recursively scan *value* and record every video URL into *candidates*.

    *path* is the chain of enclosing dicts (lists do not extend it); it is
    what ``find_nearest_note_context`` later uses to look up metadata.
    """
    if isinstance(value, dict):
        current_path = (*path, value)
        for key, child in value.items():
            if key in VIDEO_URL_KEYS:
                # A URL field may hold a single string or a list of strings.
                urls = [child] if isinstance(child, str) else child if isinstance(child, list) else []
                for item in urls:
                    if isinstance(item, str):
                        append_candidate(candidates, item, key, current_path)
            walk_for_video_candidates(child, current_path, candidates)
    elif isinstance(value, list):
        for child in value:
            walk_for_video_candidates(child, path, candidates)


def extract_video_candidates(payload: Any) -> list[VideoCandidate]:
    """Return every video URL candidate found anywhere inside *payload*."""
    candidates: list[VideoCandidate] = []
    walk_for_video_candidates(payload, (), candidates)
    return candidates


def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate:
    """Pick the preferred candidate: master URL first, then backups, then ``url``.

    Raises:
        ValueError: if *candidates* is empty.
    """
    if not candidates:
        raise ValueError("没有可用的视频候选地址。")

    source_priority = {"master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3}
    # min() keeps the first of several equally ranked candidates.
    return min(candidates, key=lambda item: source_priority.get(item.source_key, 99))


def group_video_candidates(candidates: list[VideoCandidate]) -> list[VideoCandidate]:
    """Reduce *candidates* to one preferred entry per video, keeping first-seen order.

    Candidates are grouped by ``video_id``. Those without a real id (empty or
    the ``"unknown"`` placeholder) are grouped by URL instead, so that
    unrelated id-less videos are not merged into a single entry.
    """
    grouped: dict[str, list[VideoCandidate]] = {}
    for candidate in candidates:
        has_real_id = bool(candidate.video_id) and candidate.video_id != UNKNOWN_VIDEO_ID
        key = candidate.video_id if has_real_id else candidate.video_url
        grouped.setdefault(key, []).append(candidate)
    # dicts preserve insertion order, i.e. the order in which ids first appeared
    return [choose_video_candidate(group) for group in grouped.values()]


def build_output_path(candidate: VideoCandidate, output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path:
    """Build ``<output_dir>/[author]title-id.mp4`` within ``MAX_FILENAME_BYTES``.

    The title is shortened first. Only when author and id alone leave no room
    is the author shortened too (inside its brackets, so the closing ``]`` is
    kept). The id is capped at ``MAX_VIDEO_ID_BYTES``.
    """
    safe_author = sanitize_filename(candidate.author_name, fallback="unknown")
    safe_title = sanitize_filename(candidate.title, fallback="untitled")
    safe_video_id = sanitize_filename(candidate.video_id, fallback="unknown")
    safe_video_id = truncate_utf8_bytes(safe_video_id, MAX_VIDEO_ID_BYTES) or "unknown"

    suffix = f"-{safe_video_id}.mp4"
    suffix_size = len(suffix.encode("utf-8"))
    prefix = f"[{safe_author}]"
    title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - suffix_size
    if title_budget < 1:
        # Reserve two bytes for the brackets and one byte for the title.
        author_budget = MAX_FILENAME_BYTES - suffix_size - 3
        prefix = f"[{truncate_utf8_bytes(safe_author, author_budget) or 'unknown'}]"
        title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - suffix_size

    filename = f"{prefix}{truncate_utf8_bytes(safe_title, max(1, title_budget))}{suffix}"
    return output_dir / filename


def build_browser_address(browser_port: int | None) -> str | None:
    """Return ``127.0.0.1:<port>``, or ``None`` when no port is given."""
    if browser_port is None:
        return None
    return f"127.0.0.1:{browser_port}"


def ensure_browser_debug_port_ready(browser_port: int) -> None:
    """Verify that something accepts TCP connections on the local debug port.

    Raises:
        RuntimeError: if the connection attempt fails; the message tells the
            user to start Chrome through ``login_xhs.py`` first.
    """
    try:
        with socket.create_connection(("127.0.0.1", browser_port), timeout=2):
            return
    except OSError as exc:
        raise RuntimeError(
            "无法连接到已启动的 Chrome 调试端口。"
            f"请先运行 `./.venv/bin/python login_xhs.py --browser-port {browser_port}`,"
            "并确认 Chrome 仍在运行且端口一致。"
        ) from exc
def build_headers(referer: str) -> dict[str, str]:
    """Return the HTTP headers sent with every video download request."""
    return {
        "referer": referer,
        "user-agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/135.0.0.0 Safari/537.36"
        ),
    }


def import_runtime_dependencies() -> tuple[Any, Any, Any]:
    """Import the third-party packages lazily.

    Importing here rather than at module level keeps the module importable
    when the packages are not installed.

    Returns:
        ``(requests, ChromiumPage, ChromiumOptions)``.

    Raises:
        SystemExit: with an installation hint when a package is missing.
    """
    try:
        import requests
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 requests,请先执行: python3 -m pip install requests") from exc

    try:
        from DrissionPage import ChromiumOptions
        from DrissionPage import ChromiumPage
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage") from exc

    return requests, ChromiumPage, ChromiumOptions


def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any:
    """Create a page object, attaching to an existing browser when a port is given."""
    browser_address = build_browser_address(browser_port)
    if browser_address is None:
        return chromium_page_cls()

    options = chromium_options_cls().set_address(browser_address).existing_only(True)
    return chromium_page_cls(options)


def extract_feed_payload(response: Any) -> dict[str, Any]:
    """Return the JSON object carried by a captured response.

    ``response.body`` is tried first, then ``response.raw_body``. Each may
    already be a dict, or JSON text given as ``str`` or ``bytes``.

    Raises:
        ValueError: if neither attribute yields a JSON object.
    """
    for attribute in ("body", "raw_body"):
        value = getattr(response, attribute, None)
        if isinstance(value, dict):
            return value
        if isinstance(value, (bytes, bytearray)):
            value = bytes(value).decode("utf-8", errors="replace")
        if not (isinstance(value, str) and value.strip()):
            continue
        try:
            payload = json.loads(value)
        except json.JSONDecodeError:
            continue  # not JSON here; the other attribute may still be usable
        if isinstance(payload, dict):
            return payload

    raise ValueError("响应体不是可解析的 JSON 字典。")


def download_video(
    requests_module: Any,
    headers: dict[str, str],
    video_url: str,
    output_path: Path,
) -> None:
    """Download *video_url* to *output_path*.

    The body is streamed in chunks into ``<name>.part`` and moved to
    *output_path* only after it was received completely. A failed download
    therefore never leaves a truncated file at the final path, where a later
    run would mistake it for a finished download.

    Raises:
        ValueError: if the server returned an empty body.
        Exception: whatever ``requests_module`` raises for network or HTTP
            errors is propagated unchanged.
    """
    response = requests_module.get(video_url, headers=headers, timeout=60, stream=True)
    partial_path = output_path.with_name(output_path.name + ".part")
    completed = False
    try:
        response.raise_for_status()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        written = 0
        with partial_path.open("wb") as handle:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                if chunk:  # keep-alive chunks are empty
                    handle.write(chunk)
                    written += len(chunk)
        if written == 0:
            raise ValueError("下载内容为空。")
        partial_path.replace(output_path)
        completed = True
    finally:
        response.close()
        if not completed:
            partial_path.unlink(missing_ok=True)
def wait_for_feed_packet(page: Any, timeout: int) -> Any | None:
    """Wait up to *timeout* seconds for the next captured feed packet.

    Returns the packet, or ``None`` when the wait produced nothing or raised.
    Failures are reported as a warning instead of being propagated so that
    the caller can scroll and try again.
    """
    try:
        packet = page.listen.wait(timeout=timeout)
    except Exception as exc:  # best effort: any listener failure counts as "no packet"
        print(f"[WARN] 等待 feed 数据超时或失败: {exc}")
        return None
    return packet or None


def scroll_feed(page: Any, distance: int = 900) -> None:
    """Scroll the page by *distance* pixels, then pause for two seconds.

    The script scrolls the largest element that is both big enough and
    actually scrollable, falling back to the document's scrolling element.
    """
    script = f"""
const distance = {distance};
const candidates = Array.from(document.querySelectorAll('*'))
  .filter((el) => {{
    const rect = el.getBoundingClientRect();
    return rect.width > 300
      && rect.height > 200
      && el.scrollHeight > el.clientHeight + 20;
  }})
  .sort((a, b) => {{
    const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height;
    const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height;
    return areaB - areaA;
  }});
const target = candidates[0] || document.scrollingElement || document.documentElement;
target.scrollBy(0, distance);
"""
    page.run_js(script)
    time.sleep(2)


def _run_collection_loop(
    page: Any,
    requests_module: Any,
    max_videos: int,
    timeout: int,
    output_dir: Path,
    start_url: str,
) -> int:
    """Consume feed packets and download videos until a stop condition is met.

    The loop ends once *max_videos* files were saved or six consecutive
    rounds produced no new candidate. Returns the number of saved files.
    """
    downloaded = 0
    seen_urls: set[str] = set()
    seen_files: set[Path] = set()
    consecutive_empty = 0
    max_consecutive_empty = 6

    while downloaded < max_videos and consecutive_empty < max_consecutive_empty:
        packet = wait_for_feed_packet(page, timeout=timeout)
        if packet is None:
            consecutive_empty += 1
            scroll_feed(page)
            continue

        try:
            payload = extract_feed_payload(packet.response)
            candidates = group_video_candidates(extract_video_candidates(payload))
        except Exception as exc:  # one malformed packet must not end the whole run
            print(f"[WARN] 解析 feed 数据失败: {exc}")
            consecutive_empty += 1
            scroll_feed(page)
            continue

        fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls]
        if not fresh_candidates:
            consecutive_empty += 1
            scroll_feed(page)
            continue

        consecutive_empty = 0
        for candidate in fresh_candidates:
            if downloaded >= max_videos:
                break
            seen_urls.add(candidate.video_url)
            output_path = build_output_path(candidate, output_dir=output_dir)
            if output_path in seen_files or output_path.exists():
                continue
            headers = build_headers(getattr(page, "url", start_url) or start_url)
            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=candidate.video_url,
                    output_path=output_path,
                )
            except Exception as exc:  # keep going with the remaining videos
                print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
                continue

            downloaded += 1
            seen_files.add(output_path)
            print(f"[OK] 已保存: {output_path}")

        if downloaded < max_videos:
            scroll_feed(page)

    return downloaded


def collect_videos(
    max_videos: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
    start_url: str = DEFAULT_EXPLORE_URL,
) -> int:
    """Attach to the browser, listen for feed responses and download videos.

    Args:
        max_videos: stop after this many files were saved.
        timeout: seconds to wait for each feed packet.
        output_dir: directory the videos are written to.
        browser_port: debug port of the running Chrome, or ``None`` to let
            the page class decide which browser to use.
        start_url: page opened before listening begins.

    Returns:
        The number of videos saved.

    Raises:
        RuntimeError: if *browser_port* is given but not reachable.
        SystemExit: if a required third-party package is missing.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
    page.listen.start(LISTEN_TARGET)

    try:
        print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。")
        page.get(start_url)
        time.sleep(3)
        downloaded = _run_collection_loop(
            page=page,
            requests_module=requests_module,
            max_videos=max_videos,
            timeout=timeout,
            output_dir=output_dir,
            start_url=start_url,
        )
    finally:
        # The browser belongs to the user and stays open after this script
        # ends, so the network listener must not be left running in it.
        page.listen.stop()

    if downloaded == 0:
        print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed,并在浏览器中滚动后重试。")
    return downloaded


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser of the downloader."""
    parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome,监听 feed 响应并下载视频")
    parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10")
    parser.add_argument("--output-dir", default=str(DEFAULT_OUTPUT_DIR), help="视频保存目录,默认 video")
    parser.add_argument("--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="Chrome 调试端口,默认 9224")
    parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20")
    parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面")
    return parser


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Returns 0 after a completed run and 1 when the browser could not be
    reached. Invalid arguments end the process through ``parser.error``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.max_videos <= 0:
        parser.error("--max-videos 必须大于 0")
    if args.browser_port <= 0:
        parser.error("--browser-port 必须大于 0")
    if args.browser_port > 65535:
        parser.error("--browser-port 不能大于 65535")
    if args.timeout <= 0:
        parser.error("--timeout 必须大于 0")

    try:
        downloaded = collect_videos(
            max_videos=args.max_videos,
            timeout=args.timeout,
            output_dir=Path(args.output_dir),
            browser_port=args.browser_port,
            start_url=args.start_url,
        )
    except RuntimeError as exc:
        # Raised when the debug port is unreachable; the message already
        # explains what to do, so a traceback would only obscure it.
        print(f"[ERROR] {exc}")
        return 1

    print(f"[INFO] 本次共下载 {downloaded} 个视频。")
    return 0


if __name__ == "__main__":
    sys.exit(main())
DEFAULT_START_URL = "https://www.xiaohongshu.com/explore"
DEFAULT_CHROME_PATH = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
DEFAULT_BROWSER_PORT = 9224
DEFAULT_PROFILE_DIR = Path(".xhs-chrome-profile")


def derive_chrome_app_path(chrome_path: str) -> str:
    """Return the ``*.app`` bundle that contains *chrome_path*.

    Paths that do not lie inside a bundle are returned unchanged.
    """
    marker = ".app/"
    if marker not in chrome_path:
        return chrome_path
    prefix, _ = chrome_path.split(marker, 1)
    return f"{prefix}.app"


def build_login_command(
    chrome_path: str,
    profile_dir: Path,
    browser_port: int,
    start_url: str,
) -> list[str]:
    """Build the command that starts Chrome with remote debugging enabled.

    A path inside (or equal to) a macOS ``.app`` bundle is launched through
    ``open -na <bundle> --args ...``. Any other path is treated as a plain
    executable and run directly, because the ``open`` launcher is specific to
    macOS and expects an application bundle.
    """
    chrome_args = [
        f"--user-data-dir={profile_dir}",
        f"--remote-debugging-port={browser_port}",
        start_url,
    ]
    if ".app/" in chrome_path or chrome_path.endswith(".app"):
        return ["open", "-na", derive_chrome_app_path(chrome_path), "--args", *chrome_args]
    return [chrome_path, *chrome_args]


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser of the login helper."""
    parser = argparse.ArgumentParser(description="启动可见 Chrome,供小红书手动登录后附着抓取")
    parser.add_argument("--chrome-path", default=DEFAULT_CHROME_PATH, help="Chrome 可执行文件路径")
    parser.add_argument(
        "--profile-dir",
        default=str(DEFAULT_PROFILE_DIR),
        help="Chrome 用户数据目录,默认复用项目内固定目录",
    )
    parser.add_argument(
        "--browser-port",
        type=int,
        default=DEFAULT_BROWSER_PORT,
        help="Chrome 调试端口,默认 9224",
    )
    parser.add_argument("--start-url", default=DEFAULT_START_URL, help="启动后打开的小红书页面 URL")
    return parser


def launch_browser(command: list[str]) -> subprocess.Popen[bytes]:
    """Start *command* without waiting for it to finish."""
    # No text mode is requested, so the process pipes are byte-oriented.
    return subprocess.Popen(command)


def wait_for_browser_debug_port(
    browser_port: int,
    timeout_seconds: float = 15.0,
    interval_seconds: float = 0.25,
) -> None:
    """Poll the local debug port until it accepts a TCP connection.

    Raises:
        RuntimeError: if the port is still closed after *timeout_seconds*.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            with socket.create_connection(("127.0.0.1", browser_port), timeout=1):
                return
        except OSError:
            time.sleep(interval_seconds)

    raise RuntimeError(f"Chrome 已启动命令,但调试端口 {browser_port} 在限定时间内未就绪。")


def main(argv: list[str] | None = None) -> int:
    """Start Chrome for a manual login and print the follow-up command.

    Returns 0 on success and 1 when Chrome is missing, cannot be started, or
    does not open its debug port in time.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    if args.browser_port <= 0:
        parser.error("--browser-port 必须大于 0")
    if args.browser_port > 65535:
        parser.error("--browser-port 不能大于 65535")

    chrome_path = Path(args.chrome_path)
    if not chrome_path.exists():
        print(f"[ERROR] Chrome 可执行文件不存在: {chrome_path}")
        return 1

    # Resolve to an absolute path: the value is handed to Chrome as an argument.
    profile_dir = Path(args.profile_dir).resolve()
    profile_dir.mkdir(parents=True, exist_ok=True)
    command = build_login_command(
        chrome_path=str(chrome_path),
        profile_dir=profile_dir,
        browser_port=args.browser_port,
        start_url=args.start_url,
    )

    try:
        launch_browser(command)
    except OSError as exc:
        print(f"[ERROR] 启动 Chrome 失败: {exc}")
        return 1

    try:
        wait_for_browser_debug_port(args.browser_port)
    except RuntimeError as exc:
        print(f"[ERROR] {exc}")
        return 1

    print("[INFO] Chrome 已启动。请在打开的浏览器中完成小红书登录和验证码。")
    next_command = "./.venv/bin/python XHS.py"
    if args.browser_port != DEFAULT_BROWSER_PORT:
        next_command = f"{next_command} --browser-port {args.browser_port}"
    print(f"[INFO] 登录完成后执行: {next_command}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
class LoginXhsModuleTests(unittest.TestCase):
    """Unit tests for the ``login_xhs`` helper script."""

    @staticmethod
    def _make_fake_chrome(directory: str) -> str:
        """Create an empty stand-in for the Chrome executable and return its path.

        ``main`` only checks that the path exists, and the launch itself is
        mocked, so the tests do not need a real Chrome installation.
        """
        fake_chrome = Path(directory) / "chrome"
        fake_chrome.touch()
        return str(fake_chrome)

    def test_build_login_command_uses_expected_chrome_arguments(self) -> None:
        module = importlib.import_module("login_xhs")
        command = module.build_login_command(
            chrome_path="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
            profile_dir=Path("/tmp/xhs-profile"),
            browser_port=9224,
            start_url="https://www.xiaohongshu.com/explore",
        )
        self.assertEqual(
            command,
            [
                "open",
                "-na",
                "/Applications/Google Chrome.app",
                "--args",
                "--user-data-dir=/tmp/xhs-profile",
                "--remote-debugging-port=9224",
                "https://www.xiaohongshu.com/explore",
            ],
        )

    def test_build_parser_uses_expected_defaults(self) -> None:
        module = importlib.import_module("login_xhs")
        args = module.build_parser().parse_args([])
        self.assertEqual(args.browser_port, 9224)
        self.assertEqual(args.chrome_path, module.DEFAULT_CHROME_PATH)
        self.assertEqual(args.start_url, module.DEFAULT_START_URL)

    def test_main_creates_profile_dir_and_prints_next_step(self) -> None:
        module = importlib.import_module("login_xhs")
        with tempfile.TemporaryDirectory() as temp_dir:
            profile_dir = Path(temp_dir) / "profile"
            stdout = io.StringIO()
            with redirect_stdout(stdout):
                with mock.patch.object(module, "launch_browser") as mocked_launch:
                    with mock.patch.object(module, "wait_for_browser_debug_port") as mocked_wait:
                        exit_code = module.main(
                            [
                                "--chrome-path",
                                self._make_fake_chrome(temp_dir),
                                "--profile-dir",
                                str(profile_dir),
                                "--browser-port",
                                "9334",
                            ]
                        )
            self.assertEqual(exit_code, 0)
            self.assertTrue(profile_dir.exists())
            mocked_launch.assert_called_once()
            mocked_wait.assert_called_once_with(9334)
            self.assertIn("9334", stdout.getvalue())
            self.assertIn("./.venv/bin/python XHS.py --browser-port 9334", stdout.getvalue())

    def test_main_uses_zero_argument_next_step_for_default_browser_port(self) -> None:
        module = importlib.import_module("login_xhs")
        with tempfile.TemporaryDirectory() as temp_dir:
            profile_dir = Path(temp_dir) / "profile"
            stdout = io.StringIO()
            with redirect_stdout(stdout):
                with mock.patch.object(module, "launch_browser"):
                    with mock.patch.object(module, "wait_for_browser_debug_port"):
                        exit_code = module.main(
                            [
                                "--chrome-path",
                                self._make_fake_chrome(temp_dir),
                                "--profile-dir",
                                str(profile_dir),
                            ]
                        )
            self.assertEqual(exit_code, 0)
            self.assertIn("./.venv/bin/python XHS.py", stdout.getvalue())
            self.assertNotIn("--browser-port 9224", stdout.getvalue())

    def test_main_returns_error_when_chrome_path_missing(self) -> None:
        module = importlib.import_module("login_xhs")
        stdout = io.StringIO()
        with redirect_stdout(stdout):
            exit_code = module.main(["--chrome-path", "/tmp/does-not-exist-chrome"])
        self.assertEqual(exit_code, 1)
        self.assertIn("Chrome", stdout.getvalue())
        self.assertIn("不存在", stdout.getvalue())


if __name__ == "__main__":
    unittest.main()
class FakeResponse:
    """Minimal stand-in for a captured response: only ``body`` and ``raw_body``."""

    def __init__(self, body, raw_body):
        self.body = body
        self.raw_body = raw_body


class XhsModuleTests(unittest.TestCase):
    """Unit tests for the pure helper functions and the CLI wiring of ``XHS``."""

    def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
        module = importlib.import_module("XHS")
        self.assertIsNotNone(module)

    def test_sanitize_filename_removes_invalid_characters(self) -> None:
        module = importlib.import_module("XHS")
        # The input must contain "<" and ">" around "f": each becomes "_",
        # which is what produces "e_f_g" in the expected value.
        self.assertEqual(
            module.sanitize_filename('a/b:c*?d"e<f>g|h\n'),
            "a_b_c__d_e_f_g_h",
        )

    def test_truncate_utf8_bytes_keeps_valid_utf8(self) -> None:
        module = importlib.import_module("XHS")
        result = module.truncate_utf8_bytes("测试标题" * 20, 20)
        self.assertLessEqual(len(result.encode("utf-8")), 20)
        result.encode("utf-8")

    def test_choose_video_url_prefers_master_url(self) -> None:
        module = importlib.import_module("XHS")
        candidates = [
            module.VideoCandidate(
                video_id="note-1",
                title="标题",
                video_url="https://example.com/backup.mp4",
                author_name="作者",
                source_key="backup_urls",
            ),
            module.VideoCandidate(
                video_id="note-1",
                title="标题",
                video_url="https://sns-video.xhscdn.com/master.mp4",
                author_name="作者",
                source_key="master_url",
            ),
        ]
        self.assertEqual(module.choose_video_candidate(candidates).video_url, "https://sns-video.xhscdn.com/master.mp4")

    def test_group_video_candidates_keeps_one_preferred_candidate_per_video_id(self) -> None:
        module = importlib.import_module("XHS")
        candidates = [
            module.VideoCandidate("note-1", "标题", "https://example.com/backup.mp4", "作者", "backup_urls"),
            module.VideoCandidate("note-1", "标题", "https://example.com/master.mp4", "作者", "master_url"),
            module.VideoCandidate("note-2", "标题2", "https://example.com/two.mp4", "作者", "master_url"),
        ]
        grouped = module.group_video_candidates(candidates)
        self.assertEqual([item.video_id for item in grouped], ["note-1", "note-2"])
        self.assertEqual(grouped[0].video_url, "https://example.com/master.mp4")

    def test_extract_video_candidates_from_nested_feed_payload(self) -> None:
        module = importlib.import_module("XHS")
        payload = {
            "data": {
                "items": [
                    {
                        "id": "note123",
                        "note_card": {
                            "display_title": "海边日落",
                            "user": {"nickname": "摄影师"},
                            "video": {
                                "media": {
                                    "stream": {
                                        "h264": [
                                            {
                                                "master_url": "https://sns-video.xhscdn.com/stream/a.mp4?sign=1",
                                                "backup_urls": [
                                                    "https://sns-video.xhscdn.com/stream/a-backup.mp4?sign=1"
                                                ],
                                            }
                                        ]
                                    }
                                }
                            },
                        },
                    }
                ]
            }
        }
        candidates = module.extract_video_candidates(payload)
        self.assertEqual(len(candidates), 2)
        self.assertEqual(candidates[0].video_id, "note123")
        self.assertEqual(candidates[0].title, "海边日落")
        self.assertEqual(candidates[0].author_name, "摄影师")
        self.assertEqual(candidates[0].source_key, "master_url")

    def test_build_output_path_uses_author_title_and_video_id(self) -> None:
        module = importlib.import_module("XHS")
        candidate = module.VideoCandidate(
            video_id="note123",
            title="海边日落",
            video_url="https://sns-video.xhscdn.com/a.mp4",
            author_name="摄影师",
            source_key="master_url",
        )
        output_path = module.build_output_path(candidate)
        self.assertEqual(output_path.as_posix(), "video/[摄影师]海边日落-note123.mp4")

    def test_build_browser_address_from_port(self) -> None:
        module = importlib.import_module("XHS")
        self.assertEqual(module.build_browser_address(9224), "127.0.0.1:9224")
        self.assertIsNone(module.build_browser_address(None))

    def test_ensure_browser_debug_port_ready_accepts_open_port(self) -> None:
        module = importlib.import_module("XHS")
        connection = mock.MagicMock()
        connection.__enter__.return_value = connection
        connection.__exit__.return_value = False
        with mock.patch.object(module.socket, "create_connection", return_value=connection) as mocked_connect:
            module.ensure_browser_debug_port_ready(9224)
        mocked_connect.assert_called_once()

    def test_ensure_browser_debug_port_ready_rejects_closed_port(self) -> None:
        module = importlib.import_module("XHS")
        with mock.patch.object(module.socket, "create_connection", side_effect=OSError("boom")):
            with self.assertRaisesRegex(RuntimeError, "login_xhs.py"):
                module.ensure_browser_debug_port_ready(9224)

    def test_extract_feed_payload_uses_dict_body(self) -> None:
        module = importlib.import_module("XHS")
        response = FakeResponse({"data": {"items": []}}, "")
        self.assertEqual(module.extract_feed_payload(response), {"data": {"items": []}})

    def test_extract_feed_payload_falls_back_to_raw_json(self) -> None:
        module = importlib.import_module("XHS")
        response = FakeResponse("", '{"data": {"items": [{"id": "1"}]}}')
        self.assertEqual(
            module.extract_feed_payload(response),
            {"data": {"items": [{"id": "1"}]}},
        )

    def test_build_parser_uses_expected_defaults(self) -> None:
        module = importlib.import_module("XHS")
        args = module.build_parser().parse_args([])
        self.assertEqual(args.max_videos, 10)
        self.assertEqual(args.output_dir, "video")
        self.assertEqual(args.browser_port, 9224)
        self.assertEqual(args.timeout, 20)
        self.assertEqual(args.start_url, module.DEFAULT_EXPLORE_URL)

    def test_main_invokes_collect_videos_with_cli_values(self) -> None:
        module = importlib.import_module("XHS")
        with mock.patch.object(module, "collect_videos", return_value=3) as mocked_collect:
            exit_code = module.main(
                [
                    "--max-videos",
                    "3",
                    "--output-dir",
                    "downloads",
                    "--browser-port",
                    "9334",
                    "--timeout",
                    "7",
                    "--start-url",
                    "https://www.xiaohongshu.com/explore",
                ]
            )
        self.assertEqual(exit_code, 0)
        mocked_collect.assert_called_once()
        _, kwargs = mocked_collect.call_args
        self.assertEqual(kwargs["max_videos"], 3)
        self.assertEqual(kwargs["output_dir"].as_posix(), "downloads")
        self.assertEqual(kwargs["browser_port"], 9334)
        self.assertEqual(kwargs["timeout"], 7)


if __name__ == "__main__":
    unittest.main()