From dea08527eb71c9df19e79183450adad41a8873e7 Mon Sep 17 00:00:00 2001 From: wangshaoqing Date: Wed, 27 May 2026 18:29:54 +0800 Subject: [PATCH] Save metadata for downloaded XHS videos --- README.md | 2 + XHS.py | 354 +++++++++++++++++++++++++++++++++++++++++++++++++++- test_xhs.py | 123 ++++++++++++++++++ 3 files changed, 477 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index eeb1ae8..4f154b4 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ pip install requests DrissionPage --source video-channel \ --target-videos 1000 \ --queue-file data/xhs_queue.jsonl \ + --metadata-file data/xhs_metadata.jsonl \ --max-runtime 7200 # 搜索关键词结果下载:例如猫咪相关视频 @@ -110,6 +111,7 @@ pip install requests DrissionPage - 默认会在发现页和详情页之间随机停留、上下滚动,并在阶段下载后长停留。 - 下载过程会去重,并在单个视频失败时继续处理后续视频。 - 队列模式会把笔记链接和处理状态保存到 JSONL 文件,支持长任务恢复。 +- 队列模式下载成功后会追加写入元数据 JSONL,包含 note id、标题、描述、封面、作者、点赞/收藏/评论/分享数、视频地址、保存路径,以及页面可见评论(默认最多 20 条,评论不可见时为空数组)。 ## 测试 diff --git a/XHS.py b/XHS.py index 3dc68ac..d2432ec 100644 --- a/XHS.py +++ b/XHS.py @@ -27,6 +27,10 @@ VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls"} TITLE_KEYS = ("display_title", "title", "desc", "description") ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id") AUTHOR_KEYS = ("nickname", "name", "user_name", "userName") +AUTHOR_ID_KEYS = ("user_id", "userId", "id", "uid") +AVATAR_KEYS = ("avatar", "image", "image_url", "avatar_url") +COVER_KEYS = ("cover", "cover_url", "image", "image_url", "url", "thumbnail") +STAT_KEYS = ("liked_count", "collected_count", "comment_count", "share_count") @dataclass(frozen=True) @@ -120,6 +124,168 @@ def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None: return None +def first_value_by_keys(value: Any, keys: tuple[str, ...]) -> Any: + if isinstance(value, dict): + for key in keys: + candidate = value.get(key) + if candidate not in (None, ""): + return candidate + for child in value.values(): + found = first_value_by_keys(child, keys) + if found not in (None, 
""): + return found + elif isinstance(value, list): + for child in value: + found = first_value_by_keys(child, keys) + if found not in (None, ""): + return found + return None + + +def stringify_metadata_value(value: Any) -> str: + if value is None: + return "" + if isinstance(value, str): + return value.strip() + if isinstance(value, (int, float)): + return str(value) + return "" + + +def looks_like_image_url(value: str) -> bool: + normalized = value.strip() + return normalized.startswith(("http://", "https://")) and ( + "sns-img" in normalized + or "xhscdn.com" in normalized + or any(ext in normalized.lower() for ext in (".jpg", ".jpeg", ".png", ".webp")) + ) + + +def first_image_url(value: Any) -> str: + if isinstance(value, str): + return value.strip() if looks_like_image_url(value) else "" + if isinstance(value, dict): + for key in COVER_KEYS: + candidate = value.get(key) + found = first_image_url(candidate) + if found: + return found + for child in value.values(): + found = first_image_url(child) + if found: + return found + elif isinstance(value, list): + for child in value: + found = first_image_url(child) + if found: + return found + return "" + + +def find_note_card(value: Any, note_id: str) -> dict[str, Any]: + if isinstance(value, dict): + note_card = value.get("note_card") or value.get("noteCard") + if isinstance(note_card, dict): + return note_card + for child in value.values(): + found = find_note_card(child, note_id) + if found: + return found + candidate_id = first_string_by_keys(value, ID_KEYS) + if not note_id or candidate_id == note_id: + return value + elif isinstance(value, list): + for child in value: + found = find_note_card(child, note_id) + if found: + return found + return {} + + +def extract_note_metadata(payload: Any, note_id: str = "") -> dict[str, Any]: + card = find_note_card(payload, note_id) + user = card.get("user") or card.get("author") if isinstance(card, dict) else {} + if not isinstance(user, dict): + user = {} + interact_info 
= card.get("interact_info") or card.get("interactInfo") if isinstance(card, dict) else {} + if not isinstance(interact_info, dict): + interact_info = {} + + resolved_note_id = note_id or first_string_by_keys(card, ID_KEYS) or "" + stats = { + key: stringify_metadata_value(first_value_by_keys(interact_info, (key,))) + for key in STAT_KEYS + } + return { + "note_id": resolved_note_id, + "title": first_string_by_keys(card, TITLE_KEYS) or "", + "description": first_string_by_keys(card, ("desc", "description")) or "", + "cover_url": first_image_url(card), + "author": { + "id": first_string_by_keys(user, AUTHOR_ID_KEYS) or "", + "nickname": first_string_by_keys(user, AUTHOR_KEYS) or "", + "avatar_url": first_image_url(user), + }, + "stats": stats, + } + + +def build_download_metadata_record( + base_metadata: dict[str, Any], + candidate: VideoCandidate, + queue_record: QueueRecord, + output_path: Path, + comments: list[dict[str, Any]] | None = None, + comments_error: str = "", +) -> dict[str, Any]: + """Return base_metadata combined with download context; an explicit comments argument wins, otherwise comments already present in base_metadata are kept, and base_metadata (including its nested author dict) is left unmodified.""" + record = dict(base_metadata) + record["note_id"] = record.get("note_id") or queue_record.note_id or candidate.video_id + record["title"] = record.get("title") or candidate.title + author = dict(record["author"]) if isinstance(record.get("author"), dict) else {} + author["nickname"] = author.get("nickname") or candidate.author_name + record["author"] = author + record["source"] = queue_record.source + record["note_url"] = queue_record.url + record["video_url"] = candidate.video_url + record["video_source_key"] = candidate.source_key + record["downloaded_path"] = output_path.as_posix() + record["downloaded_at"] = current_timestamp() + page_comments = record.get("comments") if comments is None else comments + record["comments"] = list(page_comments) if isinstance(page_comments, (list, tuple)) else [] + if comments_error: + record["comments_error"] = comments_error + return record + + +def merge_metadata(primary: dict[str, Any], secondary: dict[str, Any]) -> dict[str, Any]: + merged = dict(primary) + for key, value in secondary.items(): + if key == "author" and isinstance(value, dict): + author = 
dict(merged.get("author") if isinstance(merged.get("author"), dict) else {}) + for author_key, author_value in value.items(): + if author_value: + author[author_key] = author_value + merged["author"] = author + elif key == "stats" and isinstance(value, dict): + stats = dict(merged.get("stats") if isinstance(merged.get("stats"), dict) else {}) + for stat_key, stat_value in value.items(): + if stat_value: + stats[stat_key] = stat_value + merged["stats"] = stats + elif key == "comments" and value: + merged[key] = value + elif value: + merged[key] = value + return merged + + +def append_jsonl_record(path: Path, record: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as file: + file.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n") + + def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]: video_id = "" title = "" @@ -217,6 +383,167 @@ def extract_video_candidates_from_html(source: str, video_id: str = "current-pag return candidates +def extract_note_metadata_from_html(source: str, note_id: str = "") -> dict[str, Any]: + def first_pattern(patterns: list[str]) -> str: + for pattern in patterns: + match = re.search(pattern, source, flags=re.DOTALL) + if match: + return decode_html_video_url(match.group(1)).strip() + return "" + + metadata = { + "note_id": note_id, + "title": first_pattern( + [ + r'"display_title"\s*:\s*"([^"]+)"', + r'\\"display_title\\"\s*:\s*\\"(.*?)\\"', + r'"title"\s*:\s*"([^"]+)"', + ] + ), + "description": first_pattern( + [ + r'"desc"\s*:\s*"([^"]+)"', + r'\\"desc\\"\s*:\s*\\"(.*?)\\"', + r'"description"\s*:\s*"([^"]+)"', + ] + ), + "cover_url": first_pattern( + [ + r'"cover_url"\s*:\s*"([^"]+)"', + r'\\"cover_url\\"\s*:\s*\\"(.*?)\\"', + r'"url"\s*:\s*"(https?://sns-img[^"]+)"', + r'\\"url\\"\s*:\s*\\"(https?:\\?/\\?/sns-img.*?)(? 
dict[str, Any]: + script = f""" +const pickText = (...selectors) => {{ + for (const selector of selectors) {{ + const element = document.querySelector(selector); + const text = element ? (element.innerText || element.textContent || '').trim() : ''; + if (text) return text; + }} + return ''; +}}; +const pickAttr = (selector, attr) => {{ + const element = document.querySelector(selector); + return element ? (element.getAttribute(attr) || element[attr] || '') : ''; +}}; +const clean = (value) => (value || '').trim(); +const cleanCount = (value) => {{ + const text = clean(value); + return ['赞', '回复', '分享', '评论', '收藏'].includes(text) ? '' : text; +}}; +const metaContent = (...selectors) => {{ + for (const selector of selectors) {{ + const element = document.querySelector(selector); + const content = element ? clean(element.getAttribute('content') || '') : ''; + if (content) return content; + }} + return ''; +}}; +const authorRoot = document.querySelector('.author-container') + || document.querySelector('.note-content .author') + || document.querySelector('.interaction-container .author') + || document.querySelector('.author'); +const profile = authorRoot ? authorRoot.querySelector('a[href*="/user/profile/"]') : null; +const profileUrl = profile ? 
(profile.href || profile.getAttribute('href') || '') : ''; +const profileMatch = profileUrl.match(/\\/user\\/profile\\/([^/?#]+)/); +const actionRoot = document.querySelector('.interact-container .buttons .left') + || document.querySelector('.interact-container .left') + || document.querySelector('.buttons .left') + || document.querySelector('.interact-container'); +const comments = Array.from(document.querySelectorAll('.comments-container .comment-item:not(.comment-item-sub), .comment-item:not(.comment-item-sub)')) + .slice(0, {int(max_comments)}) + .map((item) => {{ + const commentId = item.id || ''; + const author = clean(item.querySelector('.author .name, .name, .user-name')?.innerText || ''); + const content = clean(item.querySelector('.content, .note-text, .comment-content')?.innerText || ''); + const liked = cleanCount(item.querySelector('.like-wrapper .count, .like .count, .like-wrapper')?.innerText || ''); + const time = clean(item.querySelector('.date, .time, .location')?.innerText || ''); + return {{comment_id: commentId, author, content, liked_count: liked, time}}; + }}) + .filter((comment) => comment.author || comment.content); +const dedupedComments = []; +const seenComments = new Set(); +for (const comment of comments) {{ + const key = comment.comment_id || `${{comment.author}}\\n${{comment.content}}`; + if (seenComments.has(key)) continue; + seenComments.add(key); + dedupedComments.push(comment); +}} +return {{ + note_id: '', + title: pickText('#detail-title', '.note-content .title', '.interaction-container .title', '.title') + || metaContent('meta[name="og:title"]', 'meta[property="og:title"]'), + description: pickText('.note-content .desc', '.desc', '.note-text') + || metaContent('meta[name="description"]', 'meta[property="og:description"]'), + cover_url: metaContent('meta[name="og:image"]', 'meta[property="og:image"]') + || pickAttr('.swiper-slide-active img, .media-container img, .note-slider-img, .cover img, video', 'poster') + || 
pickAttr('.swiper-slide-active img, .media-container img, .note-slider-img, .cover img', 'src'), + author: {{ + id: profileMatch ? profileMatch[1] : '', + nickname: authorRoot ? clean(authorRoot.querySelector('.name, .user-name, .nickname')?.innerText || authorRoot.innerText || '') : '', + avatar_url: authorRoot ? clean(authorRoot.querySelector('img')?.src || '') : '', + profile_url: profileUrl, + }}, + stats: {{ + liked_count: cleanCount(actionRoot?.querySelector('.like-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_like"]')), + collected_count: cleanCount(actionRoot?.querySelector('.collect-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_collect"]')), + comment_count: cleanCount(actionRoot?.querySelector('.chat-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_comment"]')), + share_count: cleanCount(document.querySelector('.interact-container .share-wrapper .count, .buttons .share-wrapper .count')?.innerText || ''), + }}, + comments: dedupedComments.slice(0, {int(max_comments)}), +}}; +""" + try: + metadata = page.run_js(script) + except Exception: + metadata = None + if not isinstance(metadata, dict): + metadata = {} + metadata["note_id"] = metadata.get("note_id") or note_id + metadata.setdefault("author", {}) + metadata.setdefault("stats", {}) + metadata.setdefault("comments", []) + return metadata + + def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate: if not candidates: raise ValueError("没有可用的视频候选地址。") @@ -837,6 +1164,8 @@ def run_queue_download( ) -> int: timeout = int(kwargs.get("timeout", 20)) output_dir = Path(kwargs.get("output_dir", DEFAULT_OUTPUT_DIR)) + metadata_file = Path(kwargs.get("metadata_file") or output_dir / "metadata.jsonl") + max_comments = int(kwargs.get("max_comments", 20)) browser_port = kwargs.get("browser_port", DEFAULT_BROWSER_PORT) human_settings = HumanBrowseSettings( enabled=bool(kwargs.get("human_mode", True)), @@ -907,16 +1236,24 @@ def 
run_queue_download( human_pause(human_settings) if human_settings.enabled: run_human_browse_sequence(page, create_human_browse_plan(human_settings)) + page_html = page.run_js("return document.documentElement.outerHTML") + metadata = merge_metadata( + extract_note_metadata_from_html(page_html, note_id=record.note_id), + extract_note_metadata_from_page(page, note_id=record.note_id, max_comments=max_comments), + ) candidates = group_video_candidates( extract_video_candidates_from_html( - page.run_js("return document.documentElement.outerHTML"), + page_html, video_id=record.note_id, ) ) if not candidates: packet = wait_for_feed_packet(page, timeout=timeout) if packet is not None: - candidates = group_video_candidates(extract_video_candidates(extract_feed_payload(packet.response))) + payload = extract_feed_payload(packet.response) + payload_metadata = extract_note_metadata(payload, note_id=record.note_id) + metadata = merge_metadata(metadata, payload_metadata) + candidates = group_video_candidates(extract_video_candidates(payload)) if not candidates: records[index] = mark_queue_record_skipped(record, "no video candidate") save_queue(queue_file, records) @@ -939,6 +1276,15 @@ def run_queue_download( video_url=candidate.video_url, output_path=output_path, ) + append_jsonl_record( + metadata_file, + build_download_metadata_record( + base_metadata=metadata, + candidate=candidate, + queue_record=record, + output_path=output_path, + ), + ) seen_files.add(output_path) records[index] = mark_queue_record_downloaded(record, output_path) save_queue(queue_file, records) @@ -981,6 +1327,8 @@ def build_parser() -> argparse.ArgumentParser: parser.add_argument("--keyword", default=None, help="搜索来源关键词,例如 猫咪") parser.add_argument("--target-videos", type=int, default=0, help="队列模式目标下载数量,0 表示不启用") parser.add_argument("--queue-file", default=None, help="JSONL 队列文件路径,提供后启用可恢复队列模式") + parser.add_argument("--metadata-file", default=None, help="下载成功后追加写入的元数据 JSONL 路径,默认 
output-dir/metadata.jsonl") + parser.add_argument("--max-comments", type=int, default=20, help="随元数据保存的可见热门评论数量,默认 20") parser.add_argument("--retry-limit", type=int, default=1, help="队列项下载失败重试次数,默认 1") return parser @@ -1004,6 +1352,8 @@ def main(argv: list[str] | None = None) -> int: keyword=args.keyword, timeout=args.timeout, output_dir=Path(args.output_dir), + metadata_file=Path(args.metadata_file) if args.metadata_file else None, + max_comments=args.max_comments, browser_port=args.browser_port, human_mode=args.human_mode, min_wait=args.min_wait, diff --git a/test_xhs.py b/test_xhs.py index a1f251b..9af60a3 100644 --- a/test_xhs.py +++ b/test_xhs.py @@ -89,6 +89,38 @@ class FakeVideoOnlyLinkPage: ] +class FakeMetadataPage: + def run_js(self, script): + if "detail-title" not in script: + return None + return { + "note_id": "", + "title": "这一碗能把面食脑袋香得七荤八素", + "description": "#豆角焖面 #面条", + "cover_url": "https://sns-img.xhscdn.com/cover.jpg", + "author": { + "id": "author123", + "nickname": "日食记", + "avatar_url": "https://sns-avatar.xhscdn.com/a.jpg", + "profile_url": "https://www.xiaohongshu.com/user/profile/author123", + }, + "stats": { + "liked_count": "3.5万", + "collected_count": "2.5万", + "comment_count": "1220", + "share_count": "", + }, + "comments": [ + { + "author": "莫多西卡多西", + "content": "不相信面能熟", + "liked_count": "290", + "time": "5天前重庆", + } + ], + } + + class XhsModuleTests(unittest.TestCase): def test_module_can_import_without_optional_runtime_dependencies(self) -> None: module = importlib.import_module("XHS") @@ -187,6 +219,97 @@ class XhsModuleTests(unittest.TestCase): } self.assertEqual(module.extract_video_candidates(payload), []) + def test_extract_metadata_from_nested_note_payload(self) -> None: + module = importlib.import_module("XHS") + payload = { + "data": { + "items": [ + { + "id": "note123", + "note_card": { + "display_title": "海边日落", + "desc": "一段描述", + "cover": {"url": "https://sns-img.xhscdn.com/cover.jpg"}, + "user": { + "user_id": 
"user123", + "nickname": "摄影师", + "avatar": "https://sns-avatar.xhscdn.com/a.jpg", + }, + "interact_info": { + "liked_count": "12", + "collected_count": "3", + "comment_count": "4", + "share_count": "5", + }, + }, + } + ] + } + } + metadata = module.extract_note_metadata(payload, note_id="note123") + self.assertEqual(metadata["note_id"], "note123") + self.assertEqual(metadata["title"], "海边日落") + self.assertEqual(metadata["description"], "一段描述") + self.assertEqual(metadata["cover_url"], "https://sns-img.xhscdn.com/cover.jpg") + self.assertEqual(metadata["author"]["id"], "user123") + self.assertEqual(metadata["author"]["nickname"], "摄影师") + self.assertEqual(metadata["stats"]["liked_count"], "12") + self.assertEqual(metadata["stats"]["collected_count"], "3") + self.assertEqual(metadata["stats"]["comment_count"], "4") + self.assertEqual(metadata["stats"]["share_count"], "5") + + def test_build_download_metadata_record_includes_download_context(self) -> None: + module = importlib.import_module("XHS") + candidate = module.VideoCandidate( + video_id="note123", + title="视频标题", + video_url="https://sns-video.xhscdn.com/a.mp4", + author_name="作者", + source_key="master_url", + ) + base_metadata = {"title": "真实标题", "author": {"nickname": "真实作者"}} + with mock.patch.object(module, "current_timestamp", return_value="2026-05-27T17:00:00+0800"): + record = module.build_download_metadata_record( + base_metadata=base_metadata, + candidate=candidate, + queue_record=module.QueueRecord("note123", "https://www.xiaohongshu.com/explore/note123", "video-channel"), + output_path=Path("video/a.mp4"), + ) + self.assertEqual(record["note_id"], "note123") + self.assertEqual(record["title"], "真实标题") + self.assertEqual(record["author"]["nickname"], "真实作者") + self.assertEqual(record["source"], "video-channel") + self.assertEqual(record["note_url"], "https://www.xiaohongshu.com/explore/note123") + self.assertEqual(record["video_url"], "https://sns-video.xhscdn.com/a.mp4") + 
self.assertEqual(record["downloaded_path"], "video/a.mp4") + self.assertEqual(record["downloaded_at"], "2026-05-27T17:00:00+0800") + self.assertEqual(record["comments"], []) + + def test_extract_note_metadata_from_page_uses_visible_dom(self) -> None: + module = importlib.import_module("XHS") + metadata = module.extract_note_metadata_from_page(FakeMetadataPage(), note_id="note123", max_comments=20) + self.assertEqual(metadata["note_id"], "note123") + self.assertEqual(metadata["title"], "这一碗能把面食脑袋香得七荤八素") + self.assertEqual(metadata["description"], "#豆角焖面 #面条") + self.assertEqual(metadata["cover_url"], "https://sns-img.xhscdn.com/cover.jpg") + self.assertEqual(metadata["author"]["id"], "author123") + self.assertEqual(metadata["author"]["nickname"], "日食记") + self.assertEqual(metadata["stats"]["liked_count"], "3.5万") + self.assertEqual(metadata["stats"]["collected_count"], "2.5万") + self.assertEqual(metadata["stats"]["comment_count"], "1220") + self.assertEqual(metadata["comments"][0]["content"], "不相信面能熟") + + def test_append_jsonl_record_writes_utf8_json_line(self) -> None: + module = importlib.import_module("XHS") + with tempfile.TemporaryDirectory() as temp_dir: + path = Path(temp_dir) / "meta" / "records.jsonl" + module.append_jsonl_record(path, {"title": "海边日落", "count": 1}) + module.append_jsonl_record(path, {"title": "猫咪", "count": 2}) + lines = path.read_text(encoding="utf-8").splitlines() + self.assertEqual(len(lines), 2) + self.assertIn("海边日落", lines[0]) + self.assertIn("猫咪", lines[1]) + def test_extract_video_candidates_from_escaped_html_state(self) -> None: module = importlib.import_module("XHS") html = (