Compare commits

...

2 Commits

Author SHA1 Message Date
wangshaoqing
3a2a6a69e0 Add XHS browser feed downloader 2026-05-27 14:06:47 +08:00
wangshaoqing
ec5d174bdc Add XHS downloader design and plan 2026-05-27 13:59:00 +08:00
8 changed files with 986 additions and 15 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.venv/
__pycache__/
*.pyc
.xhs-chrome-profile/
video/

View File

@ -2,6 +2,14 @@
用于探索和研究小红书视频公开内容抓取流程的项目。
## 当前能力
第一版采用和抖音参考项目类似的两步式工作流:
1. `login_xhs.py` 启动一个可见 Chrome并打开小红书发现页。
2. 你在 Chrome 中手动登录和处理验证码。
3. `XHS.py` 附着到这个 Chrome监听页面已经收到的 `feed` 响应,提取其中的 mp4 视频地址并下载。
## 项目目的
本项目用于学习和验证视频信息采集相关技术,包括请求分析、页面解析、数据结构整理、下载流程设计和后续自动化处理。
@ -15,7 +23,67 @@
- 不绕过平台访问控制、验证码、登录风控或反爬限制。
- 不将本项目用于批量侵权下载、商业化搬运或其他违规用途。
## 计划功能
## 安装环境
```bash
cd /Users/wangshaoqing/Desktop/MiaoSi/Study/xhs_video_crawler
python3 -m venv .venv
source .venv/bin/activate
pip install requests DrissionPage
```
## 使用方法
### 步骤 1启动 Chrome 并手动登录
```bash
./.venv/bin/python login_xhs.py
```
脚本会打开 `https://www.xiaohongshu.com/explore`。请在打开的浏览器里完成登录;如果出现验证码,也需要手动处理。
### 步骤 2下载发现页视频
登录完成后,保持 Chrome 不要关闭,执行:
```bash
./.venv/bin/python XHS.py --max-videos 10
```
常用参数:
```bash
# 指定下载数量
./.venv/bin/python XHS.py --max-videos 20
# 指定保存目录
./.venv/bin/python XHS.py --max-videos 10 --output-dir video
# 如果启动 Chrome 时换了端口,下载脚本也要使用同一个端口
./.venv/bin/python login_xhs.py --browser-port 9334
./.venv/bin/python XHS.py --browser-port 9334 --max-videos 10
```
下载文件默认保存到 `video/` 目录,文件名格式大致为:
```text
[作者]标题-note_id.mp4
```
## 工作原理
- 浏览器负责加载小红书页面和保留登录态。
- 脚本只监听浏览器里已经产生的网络响应。
- 解析器会递归查找响应 JSON 中的 `master_url``backup_urls` 等视频地址字段。
- 下载过程会去重,并在单个视频失败时继续处理后续视频。
## 测试
```bash
python3 -m unittest test_xhs.py test_login_xhs.py -v
```
## 后续计划
- 分析公开视频页面的数据结构。
- 提取视频标题、作者、发布时间、封面和视频地址等元数据。
@ -23,20 +91,6 @@
- 保存抓取结果到本地文件或结构化数据表。
- 为后续下载、去重和任务队列处理预留接口。
## 项目状态
当前处于初始化阶段README 先作为项目说明和开发规划入口。
## 开发建议
后续可以按以下方向逐步推进:
1. 初始化运行环境和依赖管理。
2. 增加基础配置文件,例如 `.gitignore`、依赖清单和示例配置。
3. 实现单个公开视频链接的数据解析。
4. 增加错误处理、日志和请求频率控制。
5. 编写基础测试,确保解析逻辑稳定。
## 免责声明
本项目仅用于技术学习与研究。使用者应自行承担使用本项目产生的全部责任。

397
XHS.py Normal file
View File

@ -0,0 +1,397 @@
from __future__ import annotations
import argparse
import json
import re
import socket
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore"
DEFAULT_BROWSER_PORT = 9224
DEFAULT_OUTPUT_DIR = Path("video")
LISTEN_TARGET = "feed"
MAX_FILENAME_BYTES = 240
INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]')
VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls", "url"}
TITLE_KEYS = ("display_title", "title", "desc", "description")
ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id")
AUTHOR_KEYS = ("nickname", "name", "user_name", "userName")
@dataclass(frozen=True)
class VideoCandidate:
video_id: str
title: str
video_url: str
author_name: str
source_key: str
def sanitize_filename(value: str, fallback: str = "untitled") -> str:
cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._")
return cleaned or fallback
def truncate_utf8_bytes(value: str, max_bytes: int) -> str:
if len(value.encode("utf-8")) <= max_bytes:
return value
result = ""
used = 0
for character in value:
character_bytes = len(character.encode("utf-8"))
if used + character_bytes > max_bytes:
break
result += character
used += character_bytes
return result.rstrip(" ._")
def looks_like_video_url(value: str) -> bool:
normalized = value.strip()
return normalized.startswith(("http://", "https://")) and (
".mp4" in normalized or "sns-video" in normalized or "xhscdn.com" in normalized
)
def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None:
if isinstance(value, dict):
for key in keys:
candidate = value.get(key)
if isinstance(candidate, str) and candidate.strip():
return candidate.strip()
for child in value.values():
found = first_string_by_keys(child, keys)
if found:
return found
elif isinstance(value, list):
for child in value:
found = first_string_by_keys(child, keys)
if found:
return found
return None
def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]:
video_id = ""
title = ""
author_name = ""
for node in reversed(path):
if not isinstance(node, dict):
continue
if not video_id:
video_id = first_string_by_keys(node, ID_KEYS) or ""
if not title:
title = first_string_by_keys(node, TITLE_KEYS) or ""
if not author_name:
user = node.get("user") or node.get("author")
if isinstance(user, dict):
author_name = first_string_by_keys(user, AUTHOR_KEYS) or ""
return {
"video_id": video_id or "unknown",
"title": title or "untitled",
"author_name": author_name or "unknown",
}
def append_candidate(
candidates: list[VideoCandidate],
url: str,
source_key: str,
path: tuple[Any, ...],
) -> None:
if not looks_like_video_url(url):
return
context = find_nearest_note_context(path)
candidates.append(
VideoCandidate(
video_id=context["video_id"],
title=context["title"],
video_url=url.strip(),
author_name=context["author_name"],
source_key=source_key,
)
)
def walk_for_video_candidates(value: Any, path: tuple[Any, ...], candidates: list[VideoCandidate]) -> None:
if isinstance(value, dict):
current_path = (*path, value)
for key, child in value.items():
if key in VIDEO_URL_KEYS:
if isinstance(child, str):
append_candidate(candidates, child, key, current_path)
elif isinstance(child, list):
for item in child:
if isinstance(item, str):
append_candidate(candidates, item, key, current_path)
walk_for_video_candidates(child, current_path, candidates)
elif isinstance(value, list):
for child in value:
walk_for_video_candidates(child, path, candidates)
def extract_video_candidates(payload: Any) -> list[VideoCandidate]:
candidates: list[VideoCandidate] = []
walk_for_video_candidates(payload, (), candidates)
return candidates
def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate:
if not candidates:
raise ValueError("没有可用的视频候选地址。")
source_priority = {"master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3}
return sorted(candidates, key=lambda item: source_priority.get(item.source_key, 99))[0]
def group_video_candidates(candidates: list[VideoCandidate]) -> list[VideoCandidate]:
grouped: dict[str, list[VideoCandidate]] = {}
order: list[str] = []
for candidate in candidates:
key = candidate.video_id or candidate.video_url
if key not in grouped:
grouped[key] = []
order.append(key)
grouped[key].append(candidate)
return [choose_video_candidate(grouped[key]) for key in order]
def build_output_path(candidate: VideoCandidate, output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path:
safe_author = sanitize_filename(candidate.author_name, fallback="unknown")
safe_title = sanitize_filename(candidate.title, fallback="untitled")
safe_video_id = sanitize_filename(candidate.video_id, fallback="unknown")
prefix = f"[{safe_author}]"
suffix = f"-{safe_video_id}.mp4"
title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8"))
if title_budget < 1:
prefix_budget = MAX_FILENAME_BYTES - len(suffix.encode("utf-8")) - 1
prefix = truncate_utf8_bytes(prefix, max(1, prefix_budget))
title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8"))
filename = f"{prefix}{truncate_utf8_bytes(safe_title, max(1, title_budget))}{suffix}"
return output_dir / filename
def build_browser_address(browser_port: int | None) -> str | None:
if browser_port is None:
return None
return f"127.0.0.1:{browser_port}"
def ensure_browser_debug_port_ready(browser_port: int) -> None:
try:
with socket.create_connection(("127.0.0.1", browser_port), timeout=2):
return
except OSError as exc:
raise RuntimeError(
"无法连接到已启动的 Chrome 调试端口。"
f"请先运行 `./.venv/bin/python login_xhs.py --browser-port {browser_port}`"
"并确认 Chrome 仍在运行且端口一致。"
) from exc
def build_headers(referer: str) -> dict[str, str]:
return {
"referer": referer,
"user-agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
),
}
def import_runtime_dependencies() -> tuple[Any, Any, Any]:
try:
import requests
except ModuleNotFoundError as exc:
raise SystemExit("缺少 requests请先执行: python3 -m pip install requests") from exc
try:
from DrissionPage import ChromiumOptions
from DrissionPage import ChromiumPage
except ModuleNotFoundError as exc:
raise SystemExit("缺少 DrissionPage请先执行: python3 -m pip install DrissionPage") from exc
return requests, ChromiumPage, ChromiumOptions
def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any:
browser_address = build_browser_address(browser_port)
if browser_address is None:
return chromium_page_cls()
options = chromium_options_cls().set_address(browser_address).existing_only(True)
return chromium_page_cls(options)
def extract_feed_payload(response: Any) -> dict[str, Any]:
body = getattr(response, "body", None)
if isinstance(body, dict):
return body
raw_body = getattr(response, "raw_body", None)
if isinstance(raw_body, str) and raw_body.strip():
payload = json.loads(raw_body)
if isinstance(payload, dict):
return payload
raise ValueError("响应体不是可解析的 JSON 字典。")
def download_video(
requests_module: Any,
headers: dict[str, str],
video_url: str,
output_path: Path,
) -> None:
response = requests_module.get(video_url, headers=headers, timeout=60)
response.raise_for_status()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_bytes(response.content)
def wait_for_feed_packet(page: Any, timeout: int) -> Any | None:
try:
packet = page.listen.wait(timeout=timeout)
return packet if packet else None
except Exception as exc:
print(f"[WARN] 等待 feed 数据超时或失败: {exc}")
return None
def scroll_feed(page: Any, distance: int = 900) -> None:
script = f"""
const distance = {distance};
const candidates = Array.from(document.querySelectorAll('*'))
.filter((el) => {{
const rect = el.getBoundingClientRect();
return rect.width > 300
&& rect.height > 200
&& el.scrollHeight > el.clientHeight + 20;
}})
.sort((a, b) => {{
const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height;
const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height;
return areaB - areaA;
}});
const target = candidates[0] || document.scrollingElement || document.documentElement;
target.scrollBy(0, distance);
"""
page.run_js(script)
time.sleep(2)
def collect_videos(
max_videos: int,
timeout: int,
output_dir: Path,
browser_port: int | None,
start_url: str = DEFAULT_EXPLORE_URL,
) -> int:
requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
if browser_port is not None:
ensure_browser_debug_port_ready(browser_port)
page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
page.listen.start(LISTEN_TARGET)
print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。")
page.get(start_url)
time.sleep(3)
downloaded = 0
seen_urls: set[str] = set()
seen_files: set[Path] = set()
consecutive_empty = 0
max_consecutive_empty = 6
while downloaded < max_videos and consecutive_empty < max_consecutive_empty:
packet = wait_for_feed_packet(page, timeout=timeout)
if packet is None:
consecutive_empty += 1
scroll_feed(page)
continue
try:
payload = extract_feed_payload(packet.response)
candidates = group_video_candidates(extract_video_candidates(payload))
except Exception as exc:
print(f"[WARN] 解析 feed 数据失败: {exc}")
consecutive_empty += 1
scroll_feed(page)
continue
fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls]
if not fresh_candidates:
consecutive_empty += 1
scroll_feed(page)
continue
consecutive_empty = 0
for candidate in fresh_candidates:
if downloaded >= max_videos:
break
seen_urls.add(candidate.video_url)
output_path = build_output_path(candidate, output_dir=output_dir)
if output_path in seen_files or output_path.exists():
continue
headers = build_headers(getattr(page, "url", start_url) or start_url)
try:
download_video(
requests_module=requests_module,
headers=headers,
video_url=candidate.video_url,
output_path=output_path,
)
except Exception as exc:
print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
continue
downloaded += 1
seen_files.add(output_path)
print(f"[OK] 已保存: {output_path}")
if downloaded < max_videos:
scroll_feed(page)
if downloaded == 0:
print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed并在浏览器中滚动后重试。")
return downloaded
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome监听 feed 响应并下载视频")
parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10")
parser.add_argument("--output-dir", default=str(DEFAULT_OUTPUT_DIR), help="视频保存目录,默认 video")
parser.add_argument("--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="Chrome 调试端口,默认 9224")
parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20")
parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面")
return parser
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if args.max_videos <= 0:
parser.error("--max-videos 必须大于 0")
if args.browser_port <= 0:
parser.error("--browser-port 必须大于 0")
downloaded = collect_videos(
max_videos=args.max_videos,
timeout=args.timeout,
output_dir=Path(args.output_dir),
browser_port=args.browser_port,
start_url=args.start_url,
)
print(f"[INFO] 本次共下载 {downloaded} 个视频。")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,63 @@
# Xiaohongshu Browser Feed Download Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Build a first usable Xiaohongshu downloader that attaches to a manually logged-in Chrome session, listens for feed responses, extracts mp4 URLs, and downloads a limited number of videos.
**Architecture:** Add `login_xhs.py` for visible Chrome startup and `XHS.py` for browser attachment, feed listening, parsing, scrolling, and downloading. Keep browser/runtime imports lazy so unit tests run without Chrome automation dependencies.
**Tech Stack:** Python 3, unittest, requests, DrissionPage, macOS Chrome remote debugging.
---
## File Structure
- Create `login_xhs.py`: Chrome launch CLI, debug-port readiness wait, user-facing next command.
- Create `XHS.py`: parsing helpers, browser attachment helpers, downloader CLI, feed collection loop.
- Create `test_login_xhs.py`: command construction and CLI behavior tests.
- Create `test_xhs.py`: parser, filename, output path, browser address, and port-check tests.
- Modify `README.md`: install and usage instructions.
## Task 1: Login Browser Entrypoint
**Files:**
- Create: `test_login_xhs.py`
- Create: `login_xhs.py`
- [ ] Write failing tests for Chrome command construction, parser defaults, profile creation, and missing Chrome path handling.
- [ ] Run `python3 -m unittest test_login_xhs.py -v` and verify the tests fail because `login_xhs` does not exist.
- [ ] Implement `login_xhs.py` with lazy, testable functions matching the Douyin project pattern.
- [ ] Run `python3 -m unittest test_login_xhs.py -v` and verify it passes.
## Task 2: Pure XHS Parsing Helpers
**Files:**
- Create: `test_xhs.py`
- Create: `XHS.py`
- [ ] Write failing tests for filename sanitization, UTF-8 truncation, choosing video URLs, recursive extraction from a nested feed payload, output path generation, browser address construction, and debug-port validation.
- [ ] Run `python3 -m unittest test_xhs.py -v` and verify the tests fail because `XHS` does not exist.
- [ ] Implement pure helpers and lazy dependency imports in `XHS.py`.
- [ ] Run `python3 -m unittest test_xhs.py -v` and verify it passes.
## Task 3: Browser Feed Collection CLI
**Files:**
- Modify: `XHS.py`
- Modify: `test_xhs.py`
- [ ] Write failing tests for feed payload extraction from packet response objects and the argument parser defaults.
- [ ] Run `python3 -m unittest test_xhs.py -v` and verify the new tests fail.
- [ ] Implement DrissionPage attachment, feed listening, gentle scrolling, download loop, and CLI `main`.
- [ ] Run `python3 -m unittest test_xhs.py test_login_xhs.py -v` and verify it passes.
## Task 4: README and Final Verification
**Files:**
- Modify: `README.md`
- [ ] Update README with setup, login, download, output, and compliance notes.
- [ ] Run `python3 -m unittest test_xhs.py test_login_xhs.py -v`.
- [ ] Run `git status --short --branch`.
- [ ] Commit all implementation changes.
- [ ] Push to `origin/main`.

View File

@ -0,0 +1,58 @@
# Xiaohongshu Browser Feed Download Design
## Goal
Build a first usable Xiaohongshu video downloader that attaches to a manually logged-in Chrome session, listens for official site feed responses, extracts video URLs that the page already received, and downloads a limited number of videos.
## Scope
The first version supports `https://www.xiaohongshu.com/explore` and videos surfaced through feed responses while the visible browser is open. It does not automate login, bypass captcha, generate signatures, replay private APIs directly, or attempt to defeat platform protections.
## Architecture
The tool mirrors the existing Douyin project pattern:
- `login_xhs.py` starts a visible Chrome instance with a fixed profile directory and a remote debugging port.
- `XHS.py` connects to that existing Chrome through DrissionPage, listens for responses whose URL contains `feed`, recursively extracts mp4 URLs such as `master_url` and `backup_urls`, deduplicates them, and downloads videos through `requests`.
- Unit tests cover pure parsing, filename, URL choice, and login command construction.
## Data Flow
1. The user runs `python3 login_xhs.py`.
2. Chrome opens Xiaohongshu Explore with a persistent local profile.
3. The user logs in manually and handles any verification.
4. The user runs `python3 XHS.py --max-videos 10`.
5. `XHS.py` attaches to the Chrome debugging port and starts network listening.
6. The script opens or refreshes Explore, waits for feed packets, extracts video metadata and downloadable mp4 URLs, and writes files to `video/`.
7. The script scrolls gently between waits to trigger more page-loaded feed responses until it downloads the requested limit or reaches empty-response limits.
## CLI
```bash
python3 login_xhs.py
python3 XHS.py --max-videos 10
python3 XHS.py --browser-port 9224 --max-videos 20 --output-dir video
```
## Error Handling
- If the browser debugging port is closed, print an actionable message pointing to `login_xhs.py`.
- If optional dependencies are missing, print install commands.
- If no feed data is observed, explain that the user should confirm login, page loading, and scrolling.
- If one video download fails, continue with later videos.
## Testing
Use Python `unittest` without requiring browser dependencies at import time. Tests should not launch Chrome or make network requests.
Coverage targets:
- Safe filename generation and byte truncation.
- Recursive extraction of video candidates from nested Xiaohongshu-like JSON.
- URL selection preference for `master_url` and fallback URLs.
- Output path generation.
- Browser launch command construction and default CLI values.
## Open Constraints
The exact Xiaohongshu response shape may vary. The parser should be tolerant and recursive instead of hard-coding one complete schema from a screenshot.

121
login_xhs.py Normal file
View File

@ -0,0 +1,121 @@
from __future__ import annotations
import argparse
import socket
import subprocess
import sys
import time
from pathlib import Path
DEFAULT_START_URL = "https://www.xiaohongshu.com/explore"
DEFAULT_CHROME_PATH = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
DEFAULT_BROWSER_PORT = 9224
DEFAULT_PROFILE_DIR = Path(".xhs-chrome-profile")
def derive_chrome_app_path(chrome_path: str) -> str:
marker = ".app/"
if marker not in chrome_path:
return chrome_path
prefix, _ = chrome_path.split(marker, 1)
return f"{prefix}.app"
def build_login_command(
chrome_path: str,
profile_dir: Path,
browser_port: int,
start_url: str,
) -> list[str]:
return [
"open",
"-na",
derive_chrome_app_path(chrome_path),
"--args",
f"--user-data-dir={profile_dir}",
f"--remote-debugging-port={browser_port}",
start_url,
]
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="启动可见 Chrome供小红书手动登录后附着抓取")
parser.add_argument("--chrome-path", default=DEFAULT_CHROME_PATH, help="Chrome 可执行文件路径")
parser.add_argument(
"--profile-dir",
default=str(DEFAULT_PROFILE_DIR),
help="Chrome 用户数据目录,默认复用项目内固定目录",
)
parser.add_argument(
"--browser-port",
type=int,
default=DEFAULT_BROWSER_PORT,
help="Chrome 调试端口,默认 9224",
)
parser.add_argument("--start-url", default=DEFAULT_START_URL, help="启动后打开的小红书页面 URL")
return parser
def launch_browser(command: list[str]) -> subprocess.Popen[str]:
return subprocess.Popen(command)
def wait_for_browser_debug_port(
browser_port: int,
timeout_seconds: float = 15.0,
interval_seconds: float = 0.25,
) -> None:
deadline = time.monotonic() + timeout_seconds
while time.monotonic() < deadline:
try:
with socket.create_connection(("127.0.0.1", browser_port), timeout=1):
return
except OSError:
time.sleep(interval_seconds)
raise RuntimeError(f"Chrome 已启动命令,但调试端口 {browser_port} 在限定时间内未就绪。")
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if args.browser_port <= 0:
parser.error("--browser-port 必须大于 0")
chrome_path = Path(args.chrome_path)
if not chrome_path.exists():
print(f"[ERROR] Chrome 可执行文件不存在: {chrome_path}")
return 1
profile_dir = Path(args.profile_dir).resolve()
profile_dir.mkdir(parents=True, exist_ok=True)
command = build_login_command(
chrome_path=str(chrome_path),
profile_dir=profile_dir,
browser_port=args.browser_port,
start_url=args.start_url,
)
try:
launch_browser(command)
except OSError as exc:
print(f"[ERROR] 启动 Chrome 失败: {exc}")
return 1
try:
wait_for_browser_debug_port(args.browser_port)
except RuntimeError as exc:
print(f"[ERROR] {exc}")
return 1
print("[INFO] Chrome 已启动。请在打开的浏览器中完成小红书登录和验证码。")
next_command = "./.venv/bin/python XHS.py"
if args.browser_port != DEFAULT_BROWSER_PORT:
next_command = f"{next_command} --browser-port {args.browser_port}"
print(f"[INFO] 登录完成后执行: {next_command}")
return 0
if __name__ == "__main__":
sys.exit(main())

95
test_login_xhs.py Normal file
View File

@ -0,0 +1,95 @@
import importlib
import io
import tempfile
import unittest
from contextlib import redirect_stdout
from pathlib import Path
from unittest import mock
class LoginXhsModuleTests(unittest.TestCase):
def test_build_login_command_uses_expected_chrome_arguments(self) -> None:
module = importlib.import_module("login_xhs")
command = module.build_login_command(
chrome_path="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
profile_dir=Path("/tmp/xhs-profile"),
browser_port=9224,
start_url="https://www.xiaohongshu.com/explore",
)
self.assertEqual(
command,
[
"open",
"-na",
"/Applications/Google Chrome.app",
"--args",
"--user-data-dir=/tmp/xhs-profile",
"--remote-debugging-port=9224",
"https://www.xiaohongshu.com/explore",
],
)
def test_build_parser_uses_expected_defaults(self) -> None:
module = importlib.import_module("login_xhs")
args = module.build_parser().parse_args([])
self.assertEqual(args.browser_port, 9224)
self.assertEqual(args.chrome_path, module.DEFAULT_CHROME_PATH)
self.assertEqual(args.start_url, module.DEFAULT_START_URL)
def test_main_creates_profile_dir_and_prints_next_step(self) -> None:
module = importlib.import_module("login_xhs")
with tempfile.TemporaryDirectory() as temp_dir:
profile_dir = Path(temp_dir) / "profile"
stdout = io.StringIO()
with redirect_stdout(stdout):
with mock.patch.object(module, "launch_browser") as mocked_launch:
with mock.patch.object(module, "wait_for_browser_debug_port") as mocked_wait:
exit_code = module.main(
[
"--chrome-path",
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"--profile-dir",
str(profile_dir),
"--browser-port",
"9334",
]
)
self.assertEqual(exit_code, 0)
self.assertTrue(profile_dir.exists())
mocked_launch.assert_called_once()
mocked_wait.assert_called_once_with(9334)
self.assertIn("9334", stdout.getvalue())
self.assertIn("./.venv/bin/python XHS.py --browser-port 9334", stdout.getvalue())
def test_main_uses_zero_argument_next_step_for_default_browser_port(self) -> None:
module = importlib.import_module("login_xhs")
with tempfile.TemporaryDirectory() as temp_dir:
profile_dir = Path(temp_dir) / "profile"
stdout = io.StringIO()
with redirect_stdout(stdout):
with mock.patch.object(module, "launch_browser"):
with mock.patch.object(module, "wait_for_browser_debug_port"):
exit_code = module.main(
[
"--chrome-path",
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"--profile-dir",
str(profile_dir),
]
)
self.assertEqual(exit_code, 0)
self.assertIn("./.venv/bin/python XHS.py", stdout.getvalue())
self.assertNotIn("--browser-port 9224", stdout.getvalue())
def test_main_returns_error_when_chrome_path_missing(self) -> None:
module = importlib.import_module("login_xhs")
stdout = io.StringIO()
with redirect_stdout(stdout):
exit_code = module.main(["--chrome-path", "/tmp/does-not-exist-chrome"])
self.assertEqual(exit_code, 1)
self.assertIn("Chrome", stdout.getvalue())
self.assertIn("不存在", stdout.getvalue())
if __name__ == "__main__":
unittest.main()

178
test_xhs.py Normal file
View File

@ -0,0 +1,178 @@
import importlib
import unittest
from unittest import mock
class FakeResponse:
def __init__(self, body, raw_body):
self.body = body
self.raw_body = raw_body
class XhsModuleTests(unittest.TestCase):
def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
module = importlib.import_module("XHS")
self.assertIsNotNone(module)
def test_sanitize_filename_removes_invalid_characters(self) -> None:
module = importlib.import_module("XHS")
self.assertEqual(
module.sanitize_filename('a/b:c*?d"e<f>g|h\n'),
"a_b_c__d_e_f_g_h",
)
def test_truncate_utf8_bytes_keeps_valid_utf8(self) -> None:
module = importlib.import_module("XHS")
result = module.truncate_utf8_bytes("测试标题" * 20, 20)
self.assertLessEqual(len(result.encode("utf-8")), 20)
result.encode("utf-8")
def test_choose_video_url_prefers_master_url(self) -> None:
module = importlib.import_module("XHS")
candidates = [
module.VideoCandidate(
video_id="note-1",
title="标题",
video_url="https://example.com/backup.mp4",
author_name="作者",
source_key="backup_urls",
),
module.VideoCandidate(
video_id="note-1",
title="标题",
video_url="https://sns-video.xhscdn.com/master.mp4",
author_name="作者",
source_key="master_url",
),
]
self.assertEqual(module.choose_video_candidate(candidates).video_url, "https://sns-video.xhscdn.com/master.mp4")
def test_group_video_candidates_keeps_one_preferred_candidate_per_video_id(self) -> None:
module = importlib.import_module("XHS")
candidates = [
module.VideoCandidate("note-1", "标题", "https://example.com/backup.mp4", "作者", "backup_urls"),
module.VideoCandidate("note-1", "标题", "https://example.com/master.mp4", "作者", "master_url"),
module.VideoCandidate("note-2", "标题2", "https://example.com/two.mp4", "作者", "master_url"),
]
grouped = module.group_video_candidates(candidates)
self.assertEqual([item.video_id for item in grouped], ["note-1", "note-2"])
self.assertEqual(grouped[0].video_url, "https://example.com/master.mp4")
def test_extract_video_candidates_from_nested_feed_payload(self) -> None:
module = importlib.import_module("XHS")
payload = {
"data": {
"items": [
{
"id": "note123",
"note_card": {
"display_title": "海边日落",
"user": {"nickname": "摄影师"},
"video": {
"media": {
"stream": {
"h264": [
{
"master_url": "https://sns-video.xhscdn.com/stream/a.mp4?sign=1",
"backup_urls": [
"https://sns-video.xhscdn.com/stream/a-backup.mp4?sign=1"
],
}
]
}
}
},
},
}
]
}
}
candidates = module.extract_video_candidates(payload)
self.assertEqual(len(candidates), 2)
self.assertEqual(candidates[0].video_id, "note123")
self.assertEqual(candidates[0].title, "海边日落")
self.assertEqual(candidates[0].author_name, "摄影师")
self.assertEqual(candidates[0].source_key, "master_url")
def test_build_output_path_uses_author_title_and_video_id(self) -> None:
module = importlib.import_module("XHS")
candidate = module.VideoCandidate(
video_id="note123",
title="海边日落",
video_url="https://sns-video.xhscdn.com/a.mp4",
author_name="摄影师",
source_key="master_url",
)
output_path = module.build_output_path(candidate)
self.assertEqual(output_path.as_posix(), "video/[摄影师]海边日落-note123.mp4")
def test_build_browser_address_from_port(self) -> None:
module = importlib.import_module("XHS")
self.assertEqual(module.build_browser_address(9224), "127.0.0.1:9224")
self.assertIsNone(module.build_browser_address(None))
def test_ensure_browser_debug_port_ready_accepts_open_port(self) -> None:
module = importlib.import_module("XHS")
connection = mock.MagicMock()
connection.__enter__.return_value = connection
connection.__exit__.return_value = False
with mock.patch.object(module.socket, "create_connection", return_value=connection) as mocked_connect:
module.ensure_browser_debug_port_ready(9224)
mocked_connect.assert_called_once()
def test_ensure_browser_debug_port_ready_rejects_closed_port(self) -> None:
module = importlib.import_module("XHS")
with mock.patch.object(module.socket, "create_connection", side_effect=OSError("boom")):
with self.assertRaisesRegex(RuntimeError, "login_xhs.py"):
module.ensure_browser_debug_port_ready(9224)
def test_extract_feed_payload_uses_dict_body(self) -> None:
module = importlib.import_module("XHS")
response = FakeResponse({"data": {"items": []}}, "")
self.assertEqual(module.extract_feed_payload(response), {"data": {"items": []}})
def test_extract_feed_payload_falls_back_to_raw_json(self) -> None:
module = importlib.import_module("XHS")
response = FakeResponse("", '{"data": {"items": [{"id": "1"}]}}')
self.assertEqual(
module.extract_feed_payload(response),
{"data": {"items": [{"id": "1"}]}},
)
def test_build_parser_uses_expected_defaults(self) -> None:
module = importlib.import_module("XHS")
args = module.build_parser().parse_args([])
self.assertEqual(args.max_videos, 10)
self.assertEqual(args.output_dir, "video")
self.assertEqual(args.browser_port, 9224)
self.assertEqual(args.timeout, 20)
self.assertEqual(args.start_url, module.DEFAULT_EXPLORE_URL)
def test_main_invokes_collect_videos_with_cli_values(self) -> None:
module = importlib.import_module("XHS")
with mock.patch.object(module, "collect_videos", return_value=3) as mocked_collect:
exit_code = module.main(
[
"--max-videos",
"3",
"--output-dir",
"downloads",
"--browser-port",
"9334",
"--timeout",
"7",
"--start-url",
"https://www.xiaohongshu.com/explore",
]
)
self.assertEqual(exit_code, 0)
mocked_collect.assert_called_once()
_, kwargs = mocked_collect.call_args
self.assertEqual(kwargs["max_videos"], 3)
self.assertEqual(kwargs["output_dir"].as_posix(), "downloads")
self.assertEqual(kwargs["browser_port"], 9334)
self.assertEqual(kwargs["timeout"], 7)
if __name__ == "__main__":
unittest.main()