Add XHS browser feed downloader

This commit is contained in:
wangshaoqing 2026-05-27 14:06:47 +08:00
parent ec5d174bdc
commit 3a2a6a69e0
6 changed files with 865 additions and 15 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.venv/
__pycache__/
*.pyc
.xhs-chrome-profile/
video/

View File

@ -2,6 +2,14 @@
用于探索和研究小红书视频公开内容抓取流程的项目。
## 当前能力
第一版采用和抖音参考项目类似的两步式工作流:
1. `login_xhs.py` 启动一个可见 Chrome,并打开小红书发现页。
2. 你在 Chrome 中手动登录和处理验证码。
3. `XHS.py` 附着到这个 Chrome,监听页面已经收到的 `feed` 响应,提取其中的 mp4 视频地址并下载。
## 项目目的
本项目用于学习和验证视频信息采集相关技术,包括请求分析、页面解析、数据结构整理、下载流程设计和后续自动化处理。
@ -15,7 +23,67 @@
- 不绕过平台访问控制、验证码、登录风控或反爬限制。
- 不将本项目用于批量侵权下载、商业化搬运或其他违规用途。
## 计划功能
## 安装环境
```bash
cd /Users/wangshaoqing/Desktop/MiaoSi/Study/xhs_video_crawler
python3 -m venv .venv
source .venv/bin/activate
pip install requests DrissionPage
```
## 使用方法
### 步骤 1:启动 Chrome 并手动登录
```bash
./.venv/bin/python login_xhs.py
```
脚本会打开 `https://www.xiaohongshu.com/explore`。请在打开的浏览器里完成登录;如果出现验证码,也需要手动处理。
### 步骤 2:下载发现页视频
登录完成后,保持 Chrome 不要关闭,执行:
```bash
./.venv/bin/python XHS.py --max-videos 10
```
常用参数:
```bash
# 指定下载数量
./.venv/bin/python XHS.py --max-videos 20
# 指定保存目录
./.venv/bin/python XHS.py --max-videos 10 --output-dir video
# 如果启动 Chrome 时换了端口,下载脚本也要使用同一个端口
./.venv/bin/python login_xhs.py --browser-port 9334
./.venv/bin/python XHS.py --browser-port 9334 --max-videos 10
```
下载文件默认保存到 `video/` 目录,文件名格式大致为:
```text
[作者]标题-note_id.mp4
```
## 工作原理
- 浏览器负责加载小红书页面和保留登录态。
- 脚本只监听浏览器里已经产生的网络响应。
- 解析器会递归查找响应 JSON 中的 `master_url`、`backup_urls` 等视频地址字段。
- 下载过程会去重,并在单个视频失败时继续处理后续视频。
## 测试
```bash
python3 -m unittest test_xhs.py test_login_xhs.py -v
```
## 后续计划
- 分析公开视频页面的数据结构。
- 提取视频标题、作者、发布时间、封面和视频地址等元数据。
@ -23,20 +91,6 @@
- 保存抓取结果到本地文件或结构化数据表。
- 为后续下载、去重和任务队列处理预留接口。
## 项目状态
当前处于初始化阶段,README 先作为项目说明和开发规划入口。
## 开发建议
后续可以按以下方向逐步推进:
1. 初始化运行环境和依赖管理。
2. 增加基础配置文件,例如 `.gitignore`、依赖清单和示例配置。
3. 实现单个公开视频链接的数据解析。
4. 增加错误处理、日志和请求频率控制。
5. 编写基础测试,确保解析逻辑稳定。
## 免责声明
本项目仅用于技术学习与研究。使用者应自行承担使用本项目产生的全部责任。

397
XHS.py Normal file
View File

@ -0,0 +1,397 @@
from __future__ import annotations
import argparse
import json
import re
import socket
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Page opened by the downloader; it is also the default value of --start-url.
DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore"
# Default Chrome remote-debugging port the downloader attaches to.
DEFAULT_BROWSER_PORT = 9224
# Default directory that downloaded videos are written to.
DEFAULT_OUTPUT_DIR = Path("video")
# Target handed to page.listen.start() to select which responses are captured.
LISTEN_TARGET = "feed"
# Byte budget (UTF-8) used by build_output_path when assembling a file name.
MAX_FILENAME_BYTES = 240
# Characters that sanitize_filename replaces with "_".
INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]')
# JSON keys whose string (or list-of-string) values are tested as video URLs.
VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls", "url"}
# Keys searched, in the listed order, for a note's title, id and author name.
TITLE_KEYS = ("display_title", "title", "desc", "description")
ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id")
AUTHOR_KEYS = ("nickname", "name", "user_name", "userName")
@dataclass(frozen=True)
class VideoCandidate:
    """One candidate video URL together with the note metadata found near it.

    The dataclass is frozen, so instances cannot be modified after creation.
    """

    # Identifier of the note/video this URL was found under.
    video_id: str
    # Title text found for the note.
    title: str
    # The candidate download URL.
    video_url: str
    # Display name of the note's author.
    author_name: str
    # JSON key the URL was read from (used to rank candidates).
    source_key: str
def sanitize_filename(value: str, fallback: str = "untitled") -> str:
    """Return ``value`` made safe for use as a file-name component.

    Every character matched by ``INVALID_FILENAME_CHARS`` becomes ``_``;
    spaces, dots and underscores are then trimmed from both ends.

    Args:
        value: Raw text to clean.
        fallback: Returned instead when nothing is left after cleaning.
    """
    replaced = INVALID_FILENAME_CHARS.sub("_", value)
    trimmed = replaced.strip(" ._")
    if not trimmed:
        return fallback
    return trimmed
def truncate_utf8_bytes(value: str, max_bytes: int) -> str:
    """Shorten ``value`` so its UTF-8 encoding fits in ``max_bytes`` bytes.

    Args:
        value: Text to shorten.
        max_bytes: Maximum size of the result once encoded as UTF-8.

    Returns:
        ``value`` unchanged when it already fits. Otherwise the longest prefix
        made of whole characters that fits, with trailing spaces, dots and
        underscores removed. An empty string when ``max_bytes`` is not positive.
    """
    encoded = value.encode("utf-8")
    if len(encoded) <= max_bytes:
        return value
    if max_bytes <= 0:
        # A negative slice bound would count from the end, so handle it here.
        return ""
    # Cutting the byte string may split the last character; errors="ignore"
    # drops that incomplete tail so only whole characters remain.
    clipped = encoded[:max_bytes].decode("utf-8", errors="ignore")
    return clipped.rstrip(" ._")
def looks_like_video_url(value: str) -> bool:
    """Heuristically decide whether ``value`` is a downloadable video URL.

    The value is accepted when, after stripping surrounding whitespace, it
    starts with ``http://`` or ``https://`` and contains at least one of the
    substrings ``.mp4``, ``sns-video`` or ``xhscdn.com``.
    """
    url = value.strip()
    if not url.startswith(("http://", "https://")):
        return False
    # NOTE(review): any URL containing "xhscdn.com" passes, which may also
    # cover non-video assets served from that domain - confirm against real data.
    markers = (".mp4", "sns-video", "xhscdn.com")
    return any(marker in url for marker in markers)
def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None:
    """Depth-first search for the first non-blank string stored under ``keys``.

    For a dict, its own entries are checked first (in the order of ``keys``);
    only then are its values searched recursively. Lists are searched element
    by element. Any other type yields ``None``.

    Returns:
        The stripped string that was found, or ``None`` when there is none.
    """
    if isinstance(value, dict):
        for name in keys:
            direct = value.get(name)
            if isinstance(direct, str) and direct.strip():
                return direct.strip()
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return None

    for child in children:
        nested = first_string_by_keys(child, keys)
        if nested:
            return nested
    return None
def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]:
    """Collect id, title and author name from the containers enclosing a URL.

    ``path`` is walked from its last element to its first. Each dict on the
    way fills in whichever of the three fields is still missing; the author
    name is only looked up inside a dict stored under ``user`` or ``author``.

    Returns:
        A dict with keys ``video_id``, ``title`` and ``author_name``. Fields
        that were not found are ``"unknown"``, ``"untitled"`` and
        ``"unknown"`` respectively.
    """
    found = {"video_id": "", "title": "", "author_name": ""}
    for node in reversed(path):
        if not isinstance(node, dict):
            continue
        if not found["video_id"]:
            found["video_id"] = first_string_by_keys(node, ID_KEYS) or ""
        if not found["title"]:
            found["title"] = first_string_by_keys(node, TITLE_KEYS) or ""
        if not found["author_name"]:
            owner = node.get("user") or node.get("author")
            if isinstance(owner, dict):
                found["author_name"] = first_string_by_keys(owner, AUTHOR_KEYS) or ""

    placeholders = {"video_id": "unknown", "title": "untitled", "author_name": "unknown"}
    return {field: found[field] or placeholder for field, placeholder in placeholders.items()}
def append_candidate(
    candidates: list[VideoCandidate],
    url: str,
    source_key: str,
    path: tuple[Any, ...],
) -> None:
    """Append a ``VideoCandidate`` for ``url`` when it looks like a video URL.

    Args:
        candidates: List that receives the new candidate (mutated in place).
        url: Raw URL string taken from the payload.
        source_key: JSON key ``url`` was stored under.
        path: Enclosing containers, used to look up id/title/author.
    """
    if looks_like_video_url(url):
        note = find_nearest_note_context(path)
        candidate = VideoCandidate(
            video_id=note["video_id"],
            title=note["title"],
            video_url=url.strip(),
            author_name=note["author_name"],
            source_key=source_key,
        )
        candidates.append(candidate)
def walk_for_video_candidates(value: Any, path: tuple[Any, ...], candidates: list[VideoCandidate]) -> None:
    """Recursively scan ``value`` and record every video URL found in it.

    Dicts are appended to ``path`` before their entries are visited. An entry
    whose key is in ``VIDEO_URL_KEYS`` contributes its string value, or each
    string in its list value, as a candidate; every entry is then descended
    into regardless of its key. Lists are traversed without extending ``path``.
    """
    if isinstance(value, list):
        for element in value:
            walk_for_video_candidates(element, path, candidates)
        return
    if not isinstance(value, dict):
        return

    trail = (*path, value)
    for name, child in value.items():
        if name in VIDEO_URL_KEYS:
            if isinstance(child, str):
                url_values = [child]
            elif isinstance(child, list):
                url_values = child
            else:
                url_values = []
            for url_value in url_values:
                if isinstance(url_value, str):
                    append_candidate(candidates, url_value, name, trail)
        walk_for_video_candidates(child, trail, candidates)
def extract_video_candidates(payload: Any) -> list[VideoCandidate]:
    """Return every video candidate found anywhere inside ``payload``."""
    collected: list[VideoCandidate] = []
    walk_for_video_candidates(payload, path=(), candidates=collected)
    return collected
def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate:
    """Pick the preferred candidate from ``candidates``.

    Preference follows the source key: ``master_url``, then ``backup_url``,
    ``backup_urls``, ``url``, then anything else. Among equally ranked
    candidates the earliest one in the list wins.

    Raises:
        ValueError: If ``candidates`` is empty.
    """
    if not candidates:
        raise ValueError("没有可用的视频候选地址。")

    rank_by_key = {"master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3}

    def rank(item: VideoCandidate) -> int:
        return rank_by_key.get(item.source_key, 99)

    # min() returns the first of several equally ranked items, matching a
    # stable sort followed by taking element 0.
    return min(candidates, key=rank)
def group_video_candidates(candidates: list[VideoCandidate]) -> list[VideoCandidate]:
    """Collapse candidates to one preferred entry per video.

    Candidates are grouped by ``video_id``. A candidate whose id is empty or
    the ``"unknown"`` placeholder is grouped by its URL instead, so that
    unrelated videos without an id are not merged into a single group.

    Returns:
        One candidate per group, chosen by ``choose_video_candidate``, in the
        order in which each group first appeared.
    """
    grouped: dict[str, list[VideoCandidate]] = {}
    for candidate in candidates:
        has_real_id = bool(candidate.video_id) and candidate.video_id != "unknown"
        group_key = candidate.video_id if has_real_id else candidate.video_url
        # Dicts keep insertion order, so no separate ordering list is needed.
        grouped.setdefault(group_key, []).append(candidate)
    return [choose_video_candidate(group) for group in grouped.values()]
def build_output_path(candidate: VideoCandidate, output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path:
    """Build the destination path ``output_dir/[author]title-id.mp4``.

    Author, title and id are sanitized first. The title is truncated so the
    file name stays within ``MAX_FILENAME_BYTES`` UTF-8 bytes; if the author
    prefix and the id suffix leave no room for a title, the prefix is
    truncated as well. At least one byte is always budgeted for each part.
    """

    def byte_length(text: str) -> int:
        return len(text.encode("utf-8"))

    author_part = sanitize_filename(candidate.author_name, fallback="unknown")
    title_part = sanitize_filename(candidate.title, fallback="untitled")
    id_part = sanitize_filename(candidate.video_id, fallback="unknown")

    prefix = f"[{author_part}]"
    suffix = f"-{id_part}.mp4"
    suffix_size = byte_length(suffix)

    title_budget = MAX_FILENAME_BYTES - byte_length(prefix) - suffix_size
    if title_budget < 1:
        # Leave one byte for the title and shrink the prefix to fit.
        prefix_budget = MAX_FILENAME_BYTES - suffix_size - 1
        prefix = truncate_utf8_bytes(prefix, max(1, prefix_budget))
        title_budget = MAX_FILENAME_BYTES - byte_length(prefix) - suffix_size

    short_title = truncate_utf8_bytes(title_part, max(1, title_budget))
    return output_dir / f"{prefix}{short_title}{suffix}"
def build_browser_address(browser_port: int | None) -> str | None:
    """Return ``127.0.0.1:<port>`` for ``browser_port``, or ``None`` if it is ``None``."""
    return None if browser_port is None else f"127.0.0.1:{browser_port}"
def ensure_browser_debug_port_ready(browser_port: int) -> None:
    """Verify that something is listening on the local debugging port.

    A TCP connection to ``127.0.0.1:browser_port`` is opened with a
    two-second timeout and closed again immediately.

    Raises:
        RuntimeError: If the connection attempt fails with ``OSError``; the
            original error is attached as the cause.
    """
    endpoint = ("127.0.0.1", browser_port)
    try:
        with socket.create_connection(endpoint, timeout=2):
            pass
    except OSError as exc:
        message = (
            "无法连接到已启动的 Chrome 调试端口。"
            f"请先运行 `./.venv/bin/python login_xhs.py --browser-port {browser_port}`"
            "并确认 Chrome 仍在运行且端口一致。"
        )
        raise RuntimeError(message) from exc
def build_headers(referer: str) -> dict[str, str]:
    """Return the HTTP headers sent with a video download.

    Args:
        referer: Value placed in the ``referer`` header.

    Returns:
        A dict with ``referer`` and a fixed desktop-Chrome ``user-agent``.
    """
    user_agent = " ".join(
        (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/135.0.0.0 Safari/537.36",
        )
    )
    headers: dict[str, str] = {}
    headers["referer"] = referer
    headers["user-agent"] = user_agent
    return headers
def import_runtime_dependencies() -> tuple[Any, Any, Any]:
    """Import the third-party packages needed at run time.

    The imports happen here rather than at module level so that the module
    itself can be imported when the packages are not installed.

    Returns:
        ``(requests, ChromiumPage, ChromiumOptions)``.

    Raises:
        SystemExit: With an install hint when ``requests`` or ``DrissionPage``
            cannot be found.
    """
    try:
        import requests
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 requests请先执行: python3 -m pip install requests") from exc

    try:
        from DrissionPage import ChromiumOptions, ChromiumPage
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 DrissionPage请先执行: python3 -m pip install DrissionPage") from exc

    return requests, ChromiumPage, ChromiumOptions
def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any:
    """Create the page object used to drive the browser.

    With a port, the options object is given the address
    ``127.0.0.1:<port>`` and ``existing_only(True)`` before being passed to
    the page class. Without a port, the page class is called with no
    arguments.
    """
    address = build_browser_address(browser_port)
    if address is not None:
        attach_options = chromium_options_cls()
        attach_options = attach_options.set_address(address)
        attach_options = attach_options.existing_only(True)
        return chromium_page_cls(attach_options)
    return chromium_page_cls()
def extract_feed_payload(response: Any) -> dict[str, Any]:
    """Return the JSON object carried by a captured response.

    ``response.body`` is used when it is already a dict. Otherwise
    ``response.raw_body`` is parsed as JSON, provided it is a non-blank
    string.

    Raises:
        ValueError: If neither attribute yields a dict. Malformed JSON raises
            ``json.JSONDecodeError``, which is a ``ValueError`` subclass.
    """
    parsed = getattr(response, "body", None)
    if not isinstance(parsed, dict):
        text = getattr(response, "raw_body", None)
        has_text = isinstance(text, str) and bool(text.strip())
        parsed = json.loads(text) if has_text else None
    if isinstance(parsed, dict):
        return parsed
    raise ValueError("响应体不是可解析的 JSON 字典。")
def download_video(
    requests_module: Any,
    headers: dict[str, str],
    video_url: str,
    output_path: Path,
) -> None:
    """Download ``video_url`` to ``output_path``.

    The response is streamed in chunks into ``<output_path>.part`` and that
    file is renamed to ``output_path`` only after the transfer finished. The
    whole video is therefore never held in memory, and an interrupted
    download cannot leave a truncated file under the final name.

    Args:
        requests_module: Object exposing a ``requests``-compatible ``get``.
        headers: HTTP headers sent with the request.
        video_url: Address of the video.
        output_path: Final location; parent directories are created.

    Raises:
        Whatever ``get`` or ``raise_for_status`` raises, and ``OSError`` for
        file-system failures. The partial file is removed before re-raising.
    """
    partial_path = output_path.with_name(f"{output_path.name}.part")
    try:
        with requests_module.get(video_url, headers=headers, timeout=60, stream=True) as response:
            response.raise_for_status()
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with partial_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:  # skip empty chunks
                        handle.write(chunk)
        partial_path.replace(output_path)
    except BaseException:
        # Clean up on interruption (Ctrl+C) as well; the error is re-raised.
        try:
            partial_path.unlink()
        except OSError:
            pass
        raise
def wait_for_feed_packet(page: Any, timeout: int) -> Any | None:
    """Wait for the next captured packet from ``page.listen``.

    Returns:
        The packet, or ``None`` when ``wait`` returned a falsy value or
        raised. In the latter case a warning is printed instead of
        propagating the error.
    """
    try:
        captured = page.listen.wait(timeout=timeout)
        result = captured if captured else None
    except Exception as exc:
        print(f"[WARN] 等待 feed 数据超时或失败: {exc}")
        result = None
    return result
def scroll_feed(page: Any, distance: int = 900) -> None:
    """Scroll the page down by ``distance`` pixels, then pause for two seconds.

    The injected script picks, among all elements wider than 300px and taller
    than 200px whose ``scrollHeight`` exceeds ``clientHeight`` by more than
    20, the one with the largest bounding-box area. When no element
    qualifies it falls back to ``document.scrollingElement`` or
    ``document.documentElement``.

    Args:
        page: Object exposing ``run_js(script)``.
        distance: Vertical scroll offset interpolated into the script.
    """
    # Doubled braces are literal JavaScript braces inside the f-string.
    script = f"""
const distance = {distance};
const candidates = Array.from(document.querySelectorAll('*'))
.filter((el) => {{
const rect = el.getBoundingClientRect();
return rect.width > 300
&& rect.height > 200
&& el.scrollHeight > el.clientHeight + 20;
}})
.sort((a, b) => {{
const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height;
const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height;
return areaB - areaA;
}});
const target = candidates[0] || document.scrollingElement || document.documentElement;
target.scrollBy(0, distance);
"""
    page.run_js(script)
    # Presumably gives the page time to request more feed data - TODO confirm.
    time.sleep(2)
def collect_videos(
    max_videos: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
    start_url: str = DEFAULT_EXPLORE_URL,
) -> int:
    """Listen for feed responses in the browser and download the videos found.

    Args:
        max_videos: Stop after this many successful downloads.
        timeout: Seconds passed to each wait for a feed packet.
        output_dir: Directory the video files are written to.
        browser_port: Debugging port of an already running Chrome; when not
            ``None`` the port is probed before attaching.
        start_url: Page loaded after the listener has been started.

    Returns:
        The number of videos downloaded in this run.

    Raises:
        SystemExit: If a runtime dependency is missing.
        RuntimeError: If ``browser_port`` is given but cannot be connected to.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
    # The listener is started before navigation so responses triggered by the
    # page load are captured too.
    page.listen.start(LISTEN_TARGET)
    print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。")
    page.get(start_url)
    time.sleep(3)
    downloaded = 0
    # URLs already considered and files already written during this run.
    seen_urls: set[str] = set()
    seen_files: set[Path] = set()
    # Counts rounds in a row that produced nothing new; the loop gives up
    # once it reaches max_consecutive_empty.
    consecutive_empty = 0
    max_consecutive_empty = 6
    while downloaded < max_videos and consecutive_empty < max_consecutive_empty:
        packet = wait_for_feed_packet(page, timeout=timeout)
        if packet is None:
            # Nothing captured: scroll to provoke another feed request.
            consecutive_empty += 1
            scroll_feed(page)
            continue
        try:
            payload = extract_feed_payload(packet.response)
            candidates = group_video_candidates(extract_video_candidates(payload))
        except Exception as exc:
            # An unparseable packet counts as an empty round.
            print(f"[WARN] 解析 feed 数据失败: {exc}")
            consecutive_empty += 1
            scroll_feed(page)
            continue
        fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls]
        if not fresh_candidates:
            consecutive_empty += 1
            scroll_feed(page)
            continue
        consecutive_empty = 0
        for candidate in fresh_candidates:
            if downloaded >= max_videos:
                break
            # Marked as seen before downloading, so a failed URL is not retried.
            seen_urls.add(candidate.video_url)
            output_path = build_output_path(candidate, output_dir=output_dir)
            # Skip files written earlier in this run or already on disk.
            if output_path in seen_files or output_path.exists():
                continue
            # Use the page's current URL as Referer, falling back to start_url.
            headers = build_headers(getattr(page, "url", start_url) or start_url)
            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=candidate.video_url,
                    output_path=output_path,
                )
            except Exception as exc:
                # One failed download must not stop the remaining candidates.
                print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
                continue
            downloaded += 1
            seen_files.add(output_path)
            print(f"[OK] 已保存: {output_path}")
        if downloaded < max_videos:
            scroll_feed(page)
    if downloaded == 0:
        print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed并在浏览器中滚动后重试。")
    return downloaded
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the downloader.

    Options: ``--max-videos``, ``--output-dir``, ``--browser-port``,
    ``--timeout`` and ``--start-url``, each with a default.
    """
    parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome监听 feed 响应并下载视频")
    option_specs = (
        ("--max-videos", {"type": int, "default": 10, "help": "最多下载视频数量,默认 10"}),
        ("--output-dir", {"default": str(DEFAULT_OUTPUT_DIR), "help": "视频保存目录,默认 video"}),
        ("--browser-port", {"type": int, "default": DEFAULT_BROWSER_PORT, "help": "Chrome 调试端口,默认 9224"}),
        ("--timeout", {"type": int, "default": 20, "help": "等待单次 feed 响应的秒数,默认 20"}),
        ("--start-url", {"default": DEFAULT_EXPLORE_URL, "help": "打开或刷新使用的小红书页面"}),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    return parser
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point of the downloader.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        ``0`` after a completed run, ``1`` when the run aborted with a
        ``RuntimeError`` (for example the Chrome debugging port is not
        reachable). Invalid option values end the process through
        ``parser.error``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.max_videos <= 0:
        parser.error("--max-videos 必须大于 0")
    if args.browser_port <= 0:
        parser.error("--browser-port 必须大于 0")
    try:
        downloaded = collect_videos(
            max_videos=args.max_videos,
            timeout=args.timeout,
            output_dir=Path(args.output_dir),
            browser_port=args.browser_port,
            start_url=args.start_url,
        )
    except RuntimeError as exc:
        # Report the readable message instead of a traceback, mirroring how
        # login_xhs.py reports its own start-up failures.
        print(f"[ERROR] {exc}")
        return 1
    print(f"[INFO] 本次共下载 {downloaded} 个视频。")
    return 0
# Run the CLI when executed as a script; main()'s return value is the exit status.
if __name__ == "__main__":
    sys.exit(main())

121
login_xhs.py Normal file
View File

@ -0,0 +1,121 @@
from __future__ import annotations
import argparse
import socket
import subprocess
import sys
import time
from pathlib import Path
# Default value of --start-url.
DEFAULT_START_URL = "https://www.xiaohongshu.com/explore"
# Default value of --chrome-path (Chrome's location inside a macOS app bundle).
DEFAULT_CHROME_PATH = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
# Default value of --browser-port.
DEFAULT_BROWSER_PORT = 9224
# Default value of --profile-dir; main() resolves it to an absolute path.
DEFAULT_PROFILE_DIR = Path(".xhs-chrome-profile")
def derive_chrome_app_path(chrome_path: str) -> str:
    """Return the ``.app`` bundle path that contains ``chrome_path``.

    Everything from the first ``.app/`` onwards is cut off and ``.app`` is
    put back. A path without ``.app/`` is returned unchanged.
    """
    bundle_root, separator, _inside_bundle = chrome_path.partition(".app/")
    if not separator:
        return chrome_path
    return bundle_root + ".app"
def build_login_command(
    chrome_path: str,
    profile_dir: Path,
    browser_port: int,
    start_url: str,
) -> list[str]:
    """Build the ``open -na <bundle> --args ...`` command that starts Chrome.

    Args:
        chrome_path: Path to Chrome; reduced to its ``.app`` bundle.
        profile_dir: Directory passed as ``--user-data-dir``.
        browser_port: Port passed as ``--remote-debugging-port``.
        start_url: URL appended as the final argument.
    """
    app_bundle = derive_chrome_app_path(chrome_path)
    chrome_flags = [
        f"--user-data-dir={profile_dir}",
        f"--remote-debugging-port={browser_port}",
    ]
    return ["open", "-na", app_bundle, "--args", *chrome_flags, start_url]
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the login helper.

    Options: ``--chrome-path``, ``--profile-dir``, ``--browser-port`` and
    ``--start-url``, each with a default.
    """
    parser = argparse.ArgumentParser(description="启动可见 Chrome供小红书手动登录后附着抓取")
    option_specs = (
        ("--chrome-path", {"default": DEFAULT_CHROME_PATH, "help": "Chrome 可执行文件路径"}),
        (
            "--profile-dir",
            {"default": str(DEFAULT_PROFILE_DIR), "help": "Chrome 用户数据目录,默认复用项目内固定目录"},
        ),
        (
            "--browser-port",
            {"type": int, "default": DEFAULT_BROWSER_PORT, "help": "Chrome 调试端口,默认 9224"},
        ),
        ("--start-url", {"default": DEFAULT_START_URL, "help": "启动后打开的小红书页面 URL"}),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    return parser
def launch_browser(command: list[str]) -> subprocess.Popen[str]:
    """Start ``command`` as a child process and return its ``Popen`` handle.

    The call does not wait for the process to finish.
    """
    process = subprocess.Popen(command)
    return process
def wait_for_browser_debug_port(
    browser_port: int,
    timeout_seconds: float = 15.0,
    interval_seconds: float = 0.25,
) -> None:
    """Block until ``127.0.0.1:browser_port`` accepts a TCP connection.

    Args:
        browser_port: Port to probe.
        timeout_seconds: Overall time allowed, measured on the monotonic clock.
        interval_seconds: Pause after each failed attempt.

    Raises:
        RuntimeError: If the deadline passes without a successful connection.
    """
    endpoint = ("127.0.0.1", browser_port)
    give_up_at = time.monotonic() + timeout_seconds
    while True:
        if time.monotonic() >= give_up_at:
            raise RuntimeError(f"Chrome 已启动命令,但调试端口 {browser_port} 在限定时间内未就绪。")
        try:
            with socket.create_connection(endpoint, timeout=1):
                return
        except OSError:
            time.sleep(interval_seconds)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: start Chrome and wait for its debugging port.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        ``0`` once the debugging port is reachable. ``1`` when the Chrome
        path does not exist, the launch raised ``OSError``, or the port did
        not become ready in time. Each failure prints an ``[ERROR]`` line.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    port = args.browser_port
    if port <= 0:
        parser.error("--browser-port 必须大于 0")

    chrome_path = Path(args.chrome_path)
    if not chrome_path.exists():
        print(f"[ERROR] Chrome 可执行文件不存在: {chrome_path}")
        return 1

    # Resolve to an absolute path before passing it to Chrome.
    profile_dir = Path(args.profile_dir).resolve()
    profile_dir.mkdir(parents=True, exist_ok=True)

    command = build_login_command(
        chrome_path=str(chrome_path),
        profile_dir=profile_dir,
        browser_port=port,
        start_url=args.start_url,
    )

    try:
        launch_browser(command)
    except OSError as exc:
        print(f"[ERROR] 启动 Chrome 失败: {exc}")
        return 1

    try:
        wait_for_browser_debug_port(port)
    except RuntimeError as exc:
        print(f"[ERROR] {exc}")
        return 1

    print("[INFO] Chrome 已启动。请在打开的浏览器中完成小红书登录和验证码。")
    # Only mention the port in the hint when it differs from the default.
    hint_parts = ["./.venv/bin/python XHS.py"]
    if port != DEFAULT_BROWSER_PORT:
        hint_parts.append(f"--browser-port {port}")
    print(f"[INFO] 登录完成后执行: {' '.join(hint_parts)}")
    return 0
# Run the CLI when executed as a script; main()'s return value is the exit status.
if __name__ == "__main__":
    sys.exit(main())

95
test_login_xhs.py Normal file
View File

@ -0,0 +1,95 @@
import importlib
import io
import tempfile
import unittest
from contextlib import redirect_stdout
from pathlib import Path
from unittest import mock
class LoginXhsModuleTests(unittest.TestCase):
    """Tests for ``login_xhs``: command construction, CLI defaults and ``main``.

    The ``main`` tests use a stand-in Chrome file created in a temporary
    directory, so they do not depend on Chrome being installed.
    """

    @staticmethod
    def _make_fake_chrome(root: Path) -> Path:
        """Create an empty file laid out like a Chrome binary in an app bundle."""
        fake_chrome = root / "Fake Chrome.app" / "Contents" / "MacOS" / "Fake Chrome"
        fake_chrome.parent.mkdir(parents=True)
        fake_chrome.touch()
        return fake_chrome

    def test_build_login_command_uses_expected_chrome_arguments(self) -> None:
        """The command opens the app bundle with profile, port and start URL."""
        module = importlib.import_module("login_xhs")
        command = module.build_login_command(
            chrome_path="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
            profile_dir=Path("/tmp/xhs-profile"),
            browser_port=9224,
            start_url="https://www.xiaohongshu.com/explore",
        )
        self.assertEqual(
            command,
            [
                "open",
                "-na",
                "/Applications/Google Chrome.app",
                "--args",
                "--user-data-dir=/tmp/xhs-profile",
                "--remote-debugging-port=9224",
                "https://www.xiaohongshu.com/explore",
            ],
        )

    def test_build_parser_uses_expected_defaults(self) -> None:
        """Parsing no arguments yields the module-level defaults."""
        module = importlib.import_module("login_xhs")
        args = module.build_parser().parse_args([])
        self.assertEqual(args.browser_port, 9224)
        self.assertEqual(args.chrome_path, module.DEFAULT_CHROME_PATH)
        self.assertEqual(args.start_url, module.DEFAULT_START_URL)

    def test_main_creates_profile_dir_and_prints_next_step(self) -> None:
        """With a custom port, ``main`` succeeds and prints that port in the hint."""
        module = importlib.import_module("login_xhs")
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_root = Path(temp_dir)
            chrome_path = self._make_fake_chrome(temp_root)
            profile_dir = temp_root / "profile"
            stdout = io.StringIO()
            with redirect_stdout(stdout), mock.patch.object(
                module, "launch_browser"
            ) as mocked_launch, mock.patch.object(
                module, "wait_for_browser_debug_port"
            ) as mocked_wait:
                exit_code = module.main(
                    [
                        "--chrome-path",
                        str(chrome_path),
                        "--profile-dir",
                        str(profile_dir),
                        "--browser-port",
                        "9334",
                    ]
                )
            self.assertEqual(exit_code, 0)
            self.assertTrue(profile_dir.exists())
            mocked_launch.assert_called_once()
            mocked_wait.assert_called_once_with(9334)
            self.assertIn("9334", stdout.getvalue())
            self.assertIn("./.venv/bin/python XHS.py --browser-port 9334", stdout.getvalue())

    def test_main_uses_zero_argument_next_step_for_default_browser_port(self) -> None:
        """With the default port, the printed hint carries no port option."""
        module = importlib.import_module("login_xhs")
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_root = Path(temp_dir)
            chrome_path = self._make_fake_chrome(temp_root)
            profile_dir = temp_root / "profile"
            stdout = io.StringIO()
            with redirect_stdout(stdout), mock.patch.object(
                module, "launch_browser"
            ), mock.patch.object(module, "wait_for_browser_debug_port"):
                exit_code = module.main(
                    [
                        "--chrome-path",
                        str(chrome_path),
                        "--profile-dir",
                        str(profile_dir),
                    ]
                )
            self.assertEqual(exit_code, 0)
            self.assertIn("./.venv/bin/python XHS.py", stdout.getvalue())
            self.assertNotIn("--browser-port 9224", stdout.getvalue())

    def test_main_returns_error_when_chrome_path_missing(self) -> None:
        """A non-existent Chrome path makes ``main`` print an error and return 1."""
        module = importlib.import_module("login_xhs")
        stdout = io.StringIO()
        with tempfile.TemporaryDirectory() as temp_dir:
            # A path inside a fresh temporary directory cannot already exist.
            missing_chrome = Path(temp_dir) / "does-not-exist-chrome"
            with redirect_stdout(stdout):
                exit_code = module.main(["--chrome-path", str(missing_chrome)])
        self.assertEqual(exit_code, 1)
        self.assertIn("Chrome", stdout.getvalue())
        self.assertIn("不存在", stdout.getvalue())
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

178
test_xhs.py Normal file
View File

@ -0,0 +1,178 @@
import importlib
import unittest
from unittest import mock
class FakeResponse:
    """Minimal response stand-in exposing ``body`` and ``raw_body``."""

    def __init__(self, body, raw_body):
        self.body, self.raw_body = body, raw_body
class XhsModuleTests(unittest.TestCase):
    """Tests for the helper functions and the CLI wiring of the ``XHS`` module."""

    def setUp(self) -> None:
        # Every test works on the module object, imported by name.
        self.xhs = importlib.import_module("XHS")

    def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
        """Importing the module by name yields a module object."""
        self.assertIsNotNone(self.xhs)

    def test_sanitize_filename_removes_invalid_characters(self) -> None:
        """Forbidden characters become underscores and the ends are trimmed."""
        cleaned = self.xhs.sanitize_filename('a/b:c*?d"e<f>g|h\n')
        self.assertEqual(cleaned, "a_b_c__d_e_f_g_h")

    def test_truncate_utf8_bytes_keeps_valid_utf8(self) -> None:
        """The truncated text fits the byte budget and still encodes as UTF-8."""
        long_title = "测试标题" * 20
        result = self.xhs.truncate_utf8_bytes(long_title, 20)
        self.assertLessEqual(len(result.encode("utf-8")), 20)
        result.encode("utf-8")

    def test_choose_video_url_prefers_master_url(self) -> None:
        """A ``master_url`` candidate wins over a ``backup_urls`` candidate."""
        backup = self.xhs.VideoCandidate(
            video_id="note-1",
            title="标题",
            video_url="https://example.com/backup.mp4",
            author_name="作者",
            source_key="backup_urls",
        )
        master = self.xhs.VideoCandidate(
            video_id="note-1",
            title="标题",
            video_url="https://sns-video.xhscdn.com/master.mp4",
            author_name="作者",
            source_key="master_url",
        )
        chosen = self.xhs.choose_video_candidate([backup, master])
        self.assertEqual(chosen.video_url, "https://sns-video.xhscdn.com/master.mp4")

    def test_group_video_candidates_keeps_one_preferred_candidate_per_video_id(self) -> None:
        """Grouping keeps first-seen id order and the preferred URL per id."""
        make = self.xhs.VideoCandidate
        candidates = [
            make("note-1", "标题", "https://example.com/backup.mp4", "作者", "backup_urls"),
            make("note-1", "标题", "https://example.com/master.mp4", "作者", "master_url"),
            make("note-2", "标题2", "https://example.com/two.mp4", "作者", "master_url"),
        ]
        grouped = self.xhs.group_video_candidates(candidates)
        grouped_ids = [item.video_id for item in grouped]
        self.assertEqual(grouped_ids, ["note-1", "note-2"])
        self.assertEqual(grouped[0].video_url, "https://example.com/master.mp4")

    def test_extract_video_candidates_from_nested_feed_payload(self) -> None:
        """URLs nested deep in a feed payload come back with note metadata."""
        stream_entry = {
            "master_url": "https://sns-video.xhscdn.com/stream/a.mp4?sign=1",
            "backup_urls": [
                "https://sns-video.xhscdn.com/stream/a-backup.mp4?sign=1"
            ],
        }
        note_card = {
            "display_title": "海边日落",
            "user": {"nickname": "摄影师"},
            "video": {"media": {"stream": {"h264": [stream_entry]}}},
        }
        payload = {"data": {"items": [{"id": "note123", "note_card": note_card}]}}

        candidates = self.xhs.extract_video_candidates(payload)

        self.assertEqual(len(candidates), 2)
        first = candidates[0]
        self.assertEqual(first.video_id, "note123")
        self.assertEqual(first.title, "海边日落")
        self.assertEqual(first.author_name, "摄影师")
        self.assertEqual(first.source_key, "master_url")

    def test_build_output_path_uses_author_title_and_video_id(self) -> None:
        """The default output path is ``video/[author]title-id.mp4``."""
        candidate = self.xhs.VideoCandidate(
            video_id="note123",
            title="海边日落",
            video_url="https://sns-video.xhscdn.com/a.mp4",
            author_name="摄影师",
            source_key="master_url",
        )
        output_path = self.xhs.build_output_path(candidate)
        self.assertEqual(output_path.as_posix(), "video/[摄影师]海边日落-note123.mp4")

    def test_build_browser_address_from_port(self) -> None:
        """A port maps to a loopback address; ``None`` stays ``None``."""
        self.assertEqual(self.xhs.build_browser_address(9224), "127.0.0.1:9224")
        self.assertIsNone(self.xhs.build_browser_address(None))

    def test_ensure_browser_debug_port_ready_accepts_open_port(self) -> None:
        """A successful connection attempt passes without raising."""
        connection = mock.MagicMock()
        connection.__enter__.return_value = connection
        connection.__exit__.return_value = False
        patcher = mock.patch.object(self.xhs.socket, "create_connection", return_value=connection)
        with patcher as mocked_connect:
            self.xhs.ensure_browser_debug_port_ready(9224)
        mocked_connect.assert_called_once()

    def test_ensure_browser_debug_port_ready_rejects_closed_port(self) -> None:
        """A failed connection becomes a ``RuntimeError`` naming ``login_xhs.py``."""
        patcher = mock.patch.object(self.xhs.socket, "create_connection", side_effect=OSError("boom"))
        with patcher, self.assertRaisesRegex(RuntimeError, "login_xhs.py"):
            self.xhs.ensure_browser_debug_port_ready(9224)

    def test_extract_feed_payload_uses_dict_body(self) -> None:
        """A dict ``body`` is returned as the payload."""
        expected = {"data": {"items": []}}
        response = FakeResponse({"data": {"items": []}}, "")
        self.assertEqual(self.xhs.extract_feed_payload(response), expected)

    def test_extract_feed_payload_falls_back_to_raw_json(self) -> None:
        """With a non-dict ``body``, ``raw_body`` is parsed as JSON."""
        response = FakeResponse("", '{"data": {"items": [{"id": "1"}]}}')
        payload = self.xhs.extract_feed_payload(response)
        self.assertEqual(payload, {"data": {"items": [{"id": "1"}]}})

    def test_build_parser_uses_expected_defaults(self) -> None:
        """Parsing no arguments yields the documented defaults."""
        args = self.xhs.build_parser().parse_args([])
        self.assertEqual(args.max_videos, 10)
        self.assertEqual(args.output_dir, "video")
        self.assertEqual(args.browser_port, 9224)
        self.assertEqual(args.timeout, 20)
        self.assertEqual(args.start_url, self.xhs.DEFAULT_EXPLORE_URL)

    def test_main_invokes_collect_videos_with_cli_values(self) -> None:
        """``main`` forwards the parsed options to ``collect_videos`` as keywords."""
        cli_arguments = [
            "--max-videos",
            "3",
            "--output-dir",
            "downloads",
            "--browser-port",
            "9334",
            "--timeout",
            "7",
            "--start-url",
            "https://www.xiaohongshu.com/explore",
        ]
        with mock.patch.object(self.xhs, "collect_videos", return_value=3) as mocked_collect:
            exit_code = self.xhs.main(cli_arguments)
        self.assertEqual(exit_code, 0)
        mocked_collect.assert_called_once()
        kwargs = mocked_collect.call_args[1]
        self.assertEqual(kwargs["max_videos"], 3)
        self.assertEqual(kwargs["output_dir"].as_posix(), "downloads")
        self.assertEqual(kwargs["browser_port"], 9334)
        self.assertEqual(kwargs["timeout"], 7)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()