From 16f262ada1bef07d373862644265a105265bb8a1 Mon Sep 17 00:00:00 2001 From: wangshaoqing Date: Wed, 27 May 2026 15:11:42 +0800 Subject: [PATCH] Fix XHS video URL extraction --- .gitignore | 2 + XHS.py | 107 +++++++++++++++++++++++++++++++++++++++++++--------- test_xhs.py | 72 +++++++++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index ad09369..b9b14ac 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ .venv/ __pycache__/ *.pyc +.DS_Store .xhs-chrome-profile/ video/ +video_bad_*/ diff --git a/XHS.py b/XHS.py index 39012c9..abbf4ca 100644 --- a/XHS.py +++ b/XHS.py @@ -1,6 +1,7 @@ from __future__ import annotations import argparse +import html import json import re import socket @@ -13,10 +14,10 @@ from typing import Any DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore" DEFAULT_BROWSER_PORT = 9224 DEFAULT_OUTPUT_DIR = Path("video") -LISTEN_TARGET = "feed" +LISTEN_TARGET = "/api/sns/web/v1/feed" MAX_FILENAME_BYTES = 240 INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') -VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls", "url"} +VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls"} TITLE_KEYS = ("display_title", "title", "desc", "description") ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id") AUTHOR_KEYS = ("nickname", "name", "user_name", "userName") @@ -143,11 +144,41 @@ def extract_video_candidates(payload: Any) -> list[VideoCandidate]: return candidates +def decode_html_video_url(value: str) -> str: + decoded = html.unescape(value) + return decoded.replace("\\u002F", "/").replace("\\/", "/") + + +def extract_video_candidates_from_html(source: str, video_id: str = "current-page") -> list[VideoCandidate]: + patterns = [ + r'\\"master_url\\"\s*:\s*\\"(.*?)\\"', + r'"master_url"\s*:\s*"(.*?)"', + ] + candidates: list[VideoCandidate] = [] + seen_urls: set[str] = set() + for pattern in patterns: + for match in re.findall(pattern, source): + video_url = decode_html_video_url(match) + if video_url in seen_urls or not looks_like_video_url(video_url): + continue + seen_urls.add(video_url) + candidates.append( + VideoCandidate( + video_id=video_id, + title="current-page-video", + video_url=video_url, + author_name="unknown", + source_key="html_master_url", + ) + ) + return candidates + + def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate: if not candidates: raise ValueError("没有可用的视频候选地址。") - source_priority = {"master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3} + source_priority = {"master_url": 0, "html_master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3} return sorted(candidates, key=lambda item: source_priority.get(item.source_key, 99))[0] @@ -253,10 +284,31 @@ def download_video( ) -> None: response = requests_module.get(video_url, headers=headers, timeout=60) response.raise_for_status() + validate_video_response(response, video_url) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_bytes(response.content) +def validate_video_response(response: Any, video_url: str) -> None: + content = getattr(response, "content", b"") + content_type = str(getattr(response, "headers", {}).get("content-type", "")).lower() + + if content_type.startswith("image/"): + raise ValueError(f"非视频响应: {content_type} {video_url}") + if content.startswith(b"RIFF") and b"WEBP" in content[:16]: + raise ValueError(f"非视频响应: image/webp {video_url}") + if content.lstrip().startswith((b"= 12 and content[4:8] == b"ftyp" + has_webm_signature = content.startswith(b"\x1a\x45\xdf\xa3") + if has_video_type or has_mp4_signature or has_webm_signature: + return + + raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}") + + def wait_for_feed_packet(page: Any, timeout: int) -> Any | None: try: packet = page.listen.wait(timeout=timeout) @@ -294,6 +346,7 @@ def collect_videos( output_dir: Path, browser_port: int | None, start_url: str = DEFAULT_EXPLORE_URL, + use_current_page: bool = False, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: @@ -301,8 +354,12 @@ def collect_videos( page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(LISTEN_TARGET) - print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。") - page.get(start_url) + if use_current_page: + print(f"[INFO] 使用当前页面: {getattr(page, 'url', '')}") + page.refresh() + else: + print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。") + page.get(start_url) time.sleep(3) downloaded = 0 @@ -314,18 +371,25 @@ def collect_videos( while downloaded < max_videos and consecutive_empty < max_consecutive_empty: packet = wait_for_feed_packet(page, timeout=timeout) if packet is None: - consecutive_empty += 1 - scroll_feed(page) - continue - - try: - payload = extract_feed_payload(packet.response) - candidates = group_video_candidates(extract_video_candidates(payload)) - except Exception as exc: - print(f"[WARN] 解析 feed 数据失败: {exc}") - consecutive_empty += 1 - scroll_feed(page) - continue + candidates = group_video_candidates( + extract_video_candidates_from_html( + page.run_js("return document.documentElement.outerHTML"), + video_id=extract_note_id_from_url(getattr(page, "url", "")), + ) + ) + if not candidates: + consecutive_empty += 1 + scroll_feed(page) + continue + else: + try: + payload = extract_feed_payload(packet.response) + candidates = group_video_candidates(extract_video_candidates(payload)) + except Exception as exc: + print(f"[WARN] 解析 feed 数据失败: {exc}") + consecutive_empty += 1 + scroll_feed(page) + continue fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls] if not fresh_candidates: @@ -365,6 +429,13 @@ def collect_videos( return downloaded +def extract_note_id_from_url(url: str) -> str: + match = re.search(r"/explore/([^/?#]+)", url) + if match: + return match.group(1) + return "current-page" + + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome,监听 feed 响应并下载视频") parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10") @@ -372,6 +443,7 @@ def build_parser() -> argparse.ArgumentParser: parser.add_argument("--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="Chrome 调试端口,默认 9224") parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20") parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面") + parser.add_argument("--use-current-page", action="store_true", help="使用浏览器当前页面,不强制打开发现页") return parser @@ -388,6 +460,7 @@ def main(argv: list[str] | None = None) -> int: output_dir=Path(args.output_dir), browser_port=args.browser_port, start_url=args.start_url, + use_current_page=args.use_current_page, ) print(f"[INFO] 本次共下载 {downloaded} 个视频。") return 0 diff --git a/test_xhs.py b/test_xhs.py index 1cf224d..4132d46 100644 --- a/test_xhs.py +++ b/test_xhs.py @@ -9,6 +9,25 @@ class FakeResponse: self.raw_body = raw_body +class FakeDownloadResponse: + def __init__(self, content: bytes, content_type: str = "video/mp4", status_code: int = 200): + self.content = content + self.headers = {"content-type": content_type} + self.status_code = status_code + + def raise_for_status(self) -> None: + if self.status_code >= 400: + raise RuntimeError(f"HTTP {self.status_code}") + + +class FakeRequests: + def __init__(self, response: FakeDownloadResponse): + self.response = response + + def get(self, video_url, headers, timeout): + return self.response + + class XhsModuleTests(unittest.TestCase): def test_module_can_import_without_optional_runtime_dependencies(self) -> None: module = importlib.import_module("XHS") @@ -94,6 +113,32 @@ class XhsModuleTests(unittest.TestCase): self.assertEqual(candidates[0].author_name, "摄影师") self.assertEqual(candidates[0].source_key, "master_url") + def test_extract_video_candidates_ignores_plain_image_url_fields(self) -> None: + module = importlib.import_module("XHS") + payload = { + "id": "note-image", + "display_title": "图片笔记", + "user": {"nickname": "作者"}, + "image_list": [ + {"url": "https://sns-img.xhscdn.com/example.webp"}, + {"url": "https://sns-img.xhscdn.com/example.jpg"}, + ], + } + self.assertEqual(module.extract_video_candidates(payload), []) + + def test_extract_video_candidates_from_escaped_html_state(self) -> None: + module = importlib.import_module("XHS") + html = ( + '' + ) + candidates = module.extract_video_candidates_from_html(html, video_id="note123") + self.assertEqual(len(candidates), 1) + self.assertEqual(candidates[0].video_id, "note123") + self.assertEqual(candidates[0].video_url, "http://sns-video-qc.xhscdn.com/stream/a.mp4?sign=1&t=2") + self.assertEqual(candidates[0].source_key, "html_master_url") + def test_build_output_path_uses_author_title_and_video_id(self) -> None: module = importlib.import_module("XHS") candidate = module.VideoCandidate( @@ -147,6 +192,7 @@ class XhsModuleTests(unittest.TestCase): self.assertEqual(args.browser_port, 9224) self.assertEqual(args.timeout, 20) self.assertEqual(args.start_url, module.DEFAULT_EXPLORE_URL) + self.assertFalse(args.use_current_page) def test_main_invokes_collect_videos_with_cli_values(self) -> None: module = importlib.import_module("XHS") @@ -172,6 +218,32 @@ class XhsModuleTests(unittest.TestCase): self.assertEqual(kwargs["output_dir"].as_posix(), "downloads") self.assertEqual(kwargs["browser_port"], 9334) self.assertEqual(kwargs["timeout"], 7) + self.assertFalse(kwargs["use_current_page"]) + + def test_download_video_rejects_webp_response_before_writing_file(self) -> None: + module = importlib.import_module("XHS") + response = FakeDownloadResponse(b"RIFF....WEBP", content_type="image/webp") + with self.assertRaisesRegex(ValueError, "非视频响应"): + module.download_video( + requests_module=FakeRequests(response), + headers={}, + video_url="https://sns-img.xhscdn.com/example.webp", + output_path=mock.MagicMock(), + ) + + def test_download_video_accepts_mp4_signature(self) -> None: + module = importlib.import_module("XHS") + output_path = mock.MagicMock() + output_path.parent.mkdir = mock.MagicMock() + output_path.write_bytes = mock.MagicMock() + response = FakeDownloadResponse(b"\x00\x00\x00\x18ftypmp42payload", content_type="application/octet-stream") + module.download_video( + requests_module=FakeRequests(response), + headers={}, + video_url="https://sns-video.xhscdn.com/example.mp4", + output_path=output_path, + ) + output_path.write_bytes.assert_called_once_with(b"\x00\x00\x00\x18ftypmp42payload") if __name__ == "__main__":