Fix XHS video URL extraction

This commit is contained in:
wangshaoqing 2026-05-27 15:11:42 +08:00
parent 3a2a6a69e0
commit 16f262ada1
3 changed files with 164 additions and 17 deletions

2
.gitignore vendored
View File

@ -1,5 +1,7 @@
.venv/ .venv/
__pycache__/ __pycache__/
*.pyc *.pyc
.DS_Store
.xhs-chrome-profile/ .xhs-chrome-profile/
video/ video/
video_bad_*/

107
XHS.py
View File

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import html
import json import json
import re import re
import socket import socket
@ -13,10 +14,10 @@ from typing import Any
DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore" DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore"
DEFAULT_BROWSER_PORT = 9224 DEFAULT_BROWSER_PORT = 9224
DEFAULT_OUTPUT_DIR = Path("video") DEFAULT_OUTPUT_DIR = Path("video")
LISTEN_TARGET = "feed" LISTEN_TARGET = "/api/sns/web/v1/feed"
MAX_FILENAME_BYTES = 240 MAX_FILENAME_BYTES = 240
INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]')
VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls", "url"} VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls"}
TITLE_KEYS = ("display_title", "title", "desc", "description") TITLE_KEYS = ("display_title", "title", "desc", "description")
ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id") ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id")
AUTHOR_KEYS = ("nickname", "name", "user_name", "userName") AUTHOR_KEYS = ("nickname", "name", "user_name", "userName")
@ -143,11 +144,41 @@ def extract_video_candidates(payload: Any) -> list[VideoCandidate]:
return candidates return candidates
def decode_html_video_url(value: str) -> str:
decoded = html.unescape(value)
return decoded.replace("\\u002F", "/").replace("\\/", "/")
def extract_video_candidates_from_html(source: str, video_id: str = "current-page") -> list[VideoCandidate]:
patterns = [
r'\\"master_url\\"\s*:\s*\\"(.*?)\\"',
r'"master_url"\s*:\s*"(.*?)"',
]
candidates: list[VideoCandidate] = []
seen_urls: set[str] = set()
for pattern in patterns:
for match in re.findall(pattern, source):
video_url = decode_html_video_url(match)
if video_url in seen_urls or not looks_like_video_url(video_url):
continue
seen_urls.add(video_url)
candidates.append(
VideoCandidate(
video_id=video_id,
title="current-page-video",
video_url=video_url,
author_name="unknown",
source_key="html_master_url",
)
)
return candidates
def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate: def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate:
if not candidates: if not candidates:
raise ValueError("没有可用的视频候选地址。") raise ValueError("没有可用的视频候选地址。")
source_priority = {"master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3} source_priority = {"master_url": 0, "html_master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3}
return sorted(candidates, key=lambda item: source_priority.get(item.source_key, 99))[0] return sorted(candidates, key=lambda item: source_priority.get(item.source_key, 99))[0]
@ -253,10 +284,31 @@ def download_video(
) -> None: ) -> None:
response = requests_module.get(video_url, headers=headers, timeout=60) response = requests_module.get(video_url, headers=headers, timeout=60)
response.raise_for_status() response.raise_for_status()
validate_video_response(response, video_url)
output_path.parent.mkdir(parents=True, exist_ok=True) output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_bytes(response.content) output_path.write_bytes(response.content)
def validate_video_response(response: Any, video_url: str) -> None:
content = getattr(response, "content", b"")
content_type = str(getattr(response, "headers", {}).get("content-type", "")).lower()
if content_type.startswith("image/"):
raise ValueError(f"非视频响应: {content_type} {video_url}")
if content.startswith(b"RIFF") and b"WEBP" in content[:16]:
raise ValueError(f"非视频响应: image/webp {video_url}")
if content.lstrip().startswith((b"<html", b"<!doctype", b"{")):
raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}")
has_video_type = content_type.startswith("video/")
has_mp4_signature = len(content) >= 12 and content[4:8] == b"ftyp"
has_webm_signature = content.startswith(b"\x1a\x45\xdf\xa3")
if has_video_type or has_mp4_signature or has_webm_signature:
return
raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}")
def wait_for_feed_packet(page: Any, timeout: int) -> Any | None: def wait_for_feed_packet(page: Any, timeout: int) -> Any | None:
try: try:
packet = page.listen.wait(timeout=timeout) packet = page.listen.wait(timeout=timeout)
@ -294,6 +346,7 @@ def collect_videos(
output_dir: Path, output_dir: Path,
browser_port: int | None, browser_port: int | None,
start_url: str = DEFAULT_EXPLORE_URL, start_url: str = DEFAULT_EXPLORE_URL,
use_current_page: bool = False,
) -> int: ) -> int:
requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
if browser_port is not None: if browser_port is not None:
@ -301,8 +354,12 @@ def collect_videos(
page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
page.listen.start(LISTEN_TARGET) page.listen.start(LISTEN_TARGET)
print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。") if use_current_page:
page.get(start_url) print(f"[INFO] 使用当前页面: {getattr(page, 'url', '')}")
page.refresh()
else:
print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。")
page.get(start_url)
time.sleep(3) time.sleep(3)
downloaded = 0 downloaded = 0
@ -314,18 +371,25 @@ def collect_videos(
while downloaded < max_videos and consecutive_empty < max_consecutive_empty: while downloaded < max_videos and consecutive_empty < max_consecutive_empty:
packet = wait_for_feed_packet(page, timeout=timeout) packet = wait_for_feed_packet(page, timeout=timeout)
if packet is None: if packet is None:
consecutive_empty += 1 candidates = group_video_candidates(
scroll_feed(page) extract_video_candidates_from_html(
continue page.run_js("return document.documentElement.outerHTML"),
video_id=extract_note_id_from_url(getattr(page, "url", "")),
try: )
payload = extract_feed_payload(packet.response) )
candidates = group_video_candidates(extract_video_candidates(payload)) if not candidates:
except Exception as exc: consecutive_empty += 1
print(f"[WARN] 解析 feed 数据失败: {exc}") scroll_feed(page)
consecutive_empty += 1 continue
scroll_feed(page) else:
continue try:
payload = extract_feed_payload(packet.response)
candidates = group_video_candidates(extract_video_candidates(payload))
except Exception as exc:
print(f"[WARN] 解析 feed 数据失败: {exc}")
consecutive_empty += 1
scroll_feed(page)
continue
fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls] fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls]
if not fresh_candidates: if not fresh_candidates:
@ -365,6 +429,13 @@ def collect_videos(
return downloaded return downloaded
def extract_note_id_from_url(url: str) -> str:
match = re.search(r"/explore/([^/?#]+)", url)
if match:
return match.group(1)
return "current-page"
def build_parser() -> argparse.ArgumentParser: def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome监听 feed 响应并下载视频") parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome监听 feed 响应并下载视频")
parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10") parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10")
@ -372,6 +443,7 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="Chrome 调试端口,默认 9224") parser.add_argument("--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="Chrome 调试端口,默认 9224")
parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20") parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20")
parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面") parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面")
parser.add_argument("--use-current-page", action="store_true", help="使用浏览器当前页面,不强制打开发现页")
return parser return parser
@ -388,6 +460,7 @@ def main(argv: list[str] | None = None) -> int:
output_dir=Path(args.output_dir), output_dir=Path(args.output_dir),
browser_port=args.browser_port, browser_port=args.browser_port,
start_url=args.start_url, start_url=args.start_url,
use_current_page=args.use_current_page,
) )
print(f"[INFO] 本次共下载 {downloaded} 个视频。") print(f"[INFO] 本次共下载 {downloaded} 个视频。")
return 0 return 0

View File

@ -9,6 +9,25 @@ class FakeResponse:
self.raw_body = raw_body self.raw_body = raw_body
class FakeDownloadResponse:
def __init__(self, content: bytes, content_type: str = "video/mp4", status_code: int = 200):
self.content = content
self.headers = {"content-type": content_type}
self.status_code = status_code
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
class FakeRequests:
def __init__(self, response: FakeDownloadResponse):
self.response = response
def get(self, video_url, headers, timeout):
return self.response
class XhsModuleTests(unittest.TestCase): class XhsModuleTests(unittest.TestCase):
def test_module_can_import_without_optional_runtime_dependencies(self) -> None: def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
module = importlib.import_module("XHS") module = importlib.import_module("XHS")
@ -94,6 +113,32 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(candidates[0].author_name, "摄影师") self.assertEqual(candidates[0].author_name, "摄影师")
self.assertEqual(candidates[0].source_key, "master_url") self.assertEqual(candidates[0].source_key, "master_url")
def test_extract_video_candidates_ignores_plain_image_url_fields(self) -> None:
module = importlib.import_module("XHS")
payload = {
"id": "note-image",
"display_title": "图片笔记",
"user": {"nickname": "作者"},
"image_list": [
{"url": "https://sns-img.xhscdn.com/example.webp"},
{"url": "https://sns-img.xhscdn.com/example.jpg"},
],
}
self.assertEqual(module.extract_video_candidates(payload), [])
def test_extract_video_candidates_from_escaped_html_state(self) -> None:
module = importlib.import_module("XHS")
html = (
'<script>{"display_title":"视频标题","nickname":"作者",'
r'\"master_url\":\"http:\/\/sns-video-qc.xhscdn.com\/stream\/a.mp4?sign=1&t=2\"'
'}</script>'
)
candidates = module.extract_video_candidates_from_html(html, video_id="note123")
self.assertEqual(len(candidates), 1)
self.assertEqual(candidates[0].video_id, "note123")
self.assertEqual(candidates[0].video_url, "http://sns-video-qc.xhscdn.com/stream/a.mp4?sign=1&t=2")
self.assertEqual(candidates[0].source_key, "html_master_url")
def test_build_output_path_uses_author_title_and_video_id(self) -> None: def test_build_output_path_uses_author_title_and_video_id(self) -> None:
module = importlib.import_module("XHS") module = importlib.import_module("XHS")
candidate = module.VideoCandidate( candidate = module.VideoCandidate(
@ -147,6 +192,7 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(args.browser_port, 9224) self.assertEqual(args.browser_port, 9224)
self.assertEqual(args.timeout, 20) self.assertEqual(args.timeout, 20)
self.assertEqual(args.start_url, module.DEFAULT_EXPLORE_URL) self.assertEqual(args.start_url, module.DEFAULT_EXPLORE_URL)
self.assertFalse(args.use_current_page)
def test_main_invokes_collect_videos_with_cli_values(self) -> None: def test_main_invokes_collect_videos_with_cli_values(self) -> None:
module = importlib.import_module("XHS") module = importlib.import_module("XHS")
@ -172,6 +218,32 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(kwargs["output_dir"].as_posix(), "downloads") self.assertEqual(kwargs["output_dir"].as_posix(), "downloads")
self.assertEqual(kwargs["browser_port"], 9334) self.assertEqual(kwargs["browser_port"], 9334)
self.assertEqual(kwargs["timeout"], 7) self.assertEqual(kwargs["timeout"], 7)
self.assertFalse(kwargs["use_current_page"])
def test_download_video_rejects_webp_response_before_writing_file(self) -> None:
module = importlib.import_module("XHS")
response = FakeDownloadResponse(b"RIFF....WEBP", content_type="image/webp")
with self.assertRaisesRegex(ValueError, "非视频响应"):
module.download_video(
requests_module=FakeRequests(response),
headers={},
video_url="https://sns-img.xhscdn.com/example.webp",
output_path=mock.MagicMock(),
)
def test_download_video_accepts_mp4_signature(self) -> None:
module = importlib.import_module("XHS")
output_path = mock.MagicMock()
output_path.parent.mkdir = mock.MagicMock()
output_path.write_bytes = mock.MagicMock()
response = FakeDownloadResponse(b"\x00\x00\x00\x18ftypmp42payload", content_type="application/octet-stream")
module.download_video(
requests_module=FakeRequests(response),
headers={},
video_url="https://sns-video.xhscdn.com/example.mp4",
output_path=output_path,
)
output_path.write_bytes.assert_called_once_with(b"\x00\x00\x00\x18ftypmp42payload")
if __name__ == "__main__": if __name__ == "__main__":