Save metadata for downloaded XHS videos

This commit is contained in:
wangshaoqing 2026-05-27 18:29:54 +08:00
parent b13f170ce8
commit dea08527eb
3 changed files with 477 additions and 2 deletions

View File

@ -79,6 +79,7 @@ pip install requests DrissionPage
--source video-channel \
--target-videos 1000 \
--queue-file data/xhs_queue.jsonl \
--metadata-file data/xhs_metadata.jsonl \
--max-runtime 7200
# 搜索关键词结果下载:例如猫咪相关视频
@ -110,6 +111,7 @@ pip install requests DrissionPage
- 默认会在发现页和详情页之间随机停留、上下滚动,并在阶段下载后长停留。
- 下载过程会去重,并在单个视频失败时继续处理后续视频。
- 队列模式会把笔记链接和处理状态保存到 JSONL 文件,支持长任务恢复。
- 队列模式下载成功后会追加写入元数据 JSONL包含 note id、标题、描述、封面、作者、点赞/收藏/评论/分享数、视频地址、保存路径,以及页面可见评论(默认最多 20 条,评论不可见时为空数组)。
## 测试

354
XHS.py
View File

@ -27,6 +27,10 @@ VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls"}
TITLE_KEYS = ("display_title", "title", "desc", "description")
ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id")
AUTHOR_KEYS = ("nickname", "name", "user_name", "userName")
AUTHOR_ID_KEYS = ("user_id", "userId", "id", "uid")
AVATAR_KEYS = ("avatar", "image", "image_url", "avatar_url")
COVER_KEYS = ("cover", "cover_url", "image", "image_url", "url", "thumbnail")
STAT_KEYS = ("liked_count", "collected_count", "comment_count", "share_count")
@dataclass(frozen=True)
@ -120,6 +124,168 @@ def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None:
return None
def first_value_by_keys(value: Any, keys: tuple[str, ...]) -> Any:
if isinstance(value, dict):
for key in keys:
candidate = value.get(key)
if candidate not in (None, ""):
return candidate
for child in value.values():
found = first_value_by_keys(child, keys)
if found not in (None, ""):
return found
elif isinstance(value, list):
for child in value:
found = first_value_by_keys(child, keys)
if found not in (None, ""):
return found
return None
def stringify_metadata_value(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value.strip()
if isinstance(value, (int, float)):
return str(value)
return ""
def looks_like_image_url(value: str) -> bool:
normalized = value.strip()
return normalized.startswith(("http://", "https://")) and (
"sns-img" in normalized
or "xhscdn.com" in normalized
or any(ext in normalized.lower() for ext in (".jpg", ".jpeg", ".png", ".webp"))
)
def first_image_url(value: Any) -> str:
if isinstance(value, str):
return value.strip() if looks_like_image_url(value) else ""
if isinstance(value, dict):
for key in COVER_KEYS:
candidate = value.get(key)
found = first_image_url(candidate)
if found:
return found
for child in value.values():
found = first_image_url(child)
if found:
return found
elif isinstance(value, list):
for child in value:
found = first_image_url(child)
if found:
return found
return ""
def find_note_card(value: Any, note_id: str) -> dict[str, Any]:
if isinstance(value, dict):
note_card = value.get("note_card") or value.get("noteCard")
if isinstance(note_card, dict):
return note_card
for child in value.values():
found = find_note_card(child, note_id)
if found:
return found
candidate_id = first_string_by_keys(value, ID_KEYS)
if not note_id or candidate_id == note_id:
return value
elif isinstance(value, list):
for child in value:
found = find_note_card(child, note_id)
if found:
return found
return {}
def extract_note_metadata(payload: Any, note_id: str = "") -> dict[str, Any]:
card = find_note_card(payload, note_id)
user = card.get("user") or card.get("author") if isinstance(card, dict) else {}
if not isinstance(user, dict):
user = {}
interact_info = card.get("interact_info") or card.get("interactInfo") if isinstance(card, dict) else {}
if not isinstance(interact_info, dict):
interact_info = {}
resolved_note_id = note_id or first_string_by_keys(card, ID_KEYS) or ""
stats = {
key: stringify_metadata_value(first_value_by_keys(interact_info, (key,)))
for key in STAT_KEYS
}
return {
"note_id": resolved_note_id,
"title": first_string_by_keys(card, TITLE_KEYS) or "",
"description": first_string_by_keys(card, ("desc", "description")) or "",
"cover_url": first_image_url(card),
"author": {
"id": first_string_by_keys(user, AUTHOR_ID_KEYS) or "",
"nickname": first_string_by_keys(user, AUTHOR_KEYS) or "",
"avatar_url": first_image_url(user),
},
"stats": stats,
}
def build_download_metadata_record(
base_metadata: dict[str, Any],
candidate: VideoCandidate,
queue_record: QueueRecord,
output_path: Path,
comments: list[dict[str, Any]] | None = None,
comments_error: str = "",
) -> dict[str, Any]:
record = dict(base_metadata)
record["note_id"] = record.get("note_id") or queue_record.note_id or candidate.video_id
record["title"] = record.get("title") or candidate.title
author = record.get("author")
if not isinstance(author, dict):
author = {}
author["nickname"] = author.get("nickname") or candidate.author_name
record["author"] = author
record["source"] = queue_record.source
record["note_url"] = queue_record.url
record["video_url"] = candidate.video_url
record["video_source_key"] = candidate.source_key
record["downloaded_path"] = output_path.as_posix()
record["downloaded_at"] = current_timestamp()
record["comments"] = comments or []
if comments_error:
record["comments_error"] = comments_error
return record
def merge_metadata(primary: dict[str, Any], secondary: dict[str, Any]) -> dict[str, Any]:
merged = dict(primary)
for key, value in secondary.items():
if key == "author" and isinstance(value, dict):
author = dict(merged.get("author") if isinstance(merged.get("author"), dict) else {})
for author_key, author_value in value.items():
if author_value:
author[author_key] = author_value
merged["author"] = author
elif key == "stats" and isinstance(value, dict):
stats = dict(merged.get("stats") if isinstance(merged.get("stats"), dict) else {})
for stat_key, stat_value in value.items():
if stat_value:
stats[stat_key] = stat_value
merged["stats"] = stats
elif key == "comments" and value:
merged[key] = value
elif value:
merged[key] = value
return merged
def append_jsonl_record(path: Path, record: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as file:
file.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")
def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]:
video_id = ""
title = ""
@ -217,6 +383,167 @@ def extract_video_candidates_from_html(source: str, video_id: str = "current-pag
return candidates
def extract_note_metadata_from_html(source: str, note_id: str = "") -> dict[str, Any]:
def first_pattern(patterns: list[str]) -> str:
for pattern in patterns:
match = re.search(pattern, source, flags=re.DOTALL)
if match:
return decode_html_video_url(match.group(1)).strip()
return ""
metadata = {
"note_id": note_id,
"title": first_pattern(
[
r'"display_title"\s*:\s*"([^"]+)"',
r'\\"display_title\\"\s*:\s*\\"(.*?)\\"',
r'"title"\s*:\s*"([^"]+)"',
]
),
"description": first_pattern(
[
r'"desc"\s*:\s*"([^"]+)"',
r'\\"desc\\"\s*:\s*\\"(.*?)\\"',
r'"description"\s*:\s*"([^"]+)"',
]
),
"cover_url": first_pattern(
[
r'"cover_url"\s*:\s*"([^"]+)"',
r'\\"cover_url\\"\s*:\s*\\"(.*?)\\"',
r'"url"\s*:\s*"(https?://sns-img[^"]+)"',
r'\\"url\\"\s*:\s*\\"(https?:\\?/\\?/sns-img.*?)(?<!\\)\\"',
]
),
"author": {
"id": first_pattern(
[
r'"user_id"\s*:\s*"([^"]+)"',
r'\\"user_id\\"\s*:\s*\\"(.*?)\\"',
r'"userId"\s*:\s*"([^"]+)"',
]
),
"nickname": first_pattern(
[
r'"nickname"\s*:\s*"([^"]+)"',
r'\\"nickname\\"\s*:\s*\\"(.*?)\\"',
]
),
"avatar_url": first_pattern(
[
r'"avatar"\s*:\s*"(https?://[^"]+)"',
r'\\"avatar\\"\s*:\s*\\"(https?:\\?/\\?/.*?)(?<!\\)\\"',
]
),
},
"stats": {
key: first_pattern(
[
rf'"{key}"\s*:\s*"([^"]*)"',
rf'\\"{key}\\"\s*:\s*\\"(.*?)\\"',
rf'"{key}"\s*:\s*(\d+)',
]
)
for key in STAT_KEYS
},
}
return metadata
def extract_note_metadata_from_page(page: Any, note_id: str = "", max_comments: int = 20) -> dict[str, Any]:
script = f"""
const pickText = (...selectors) => {{
for (const selector of selectors) {{
const element = document.querySelector(selector);
const text = element ? (element.innerText || element.textContent || '').trim() : '';
if (text) return text;
}}
return '';
}};
const pickAttr = (selector, attr) => {{
const element = document.querySelector(selector);
return element ? (element.getAttribute(attr) || element[attr] || '') : '';
}};
const clean = (value) => (value || '').trim();
const cleanCount = (value) => {{
const text = clean(value);
return ['', '回复', '分享', '评论', '收藏'].includes(text) ? '' : text;
}};
const metaContent = (...selectors) => {{
for (const selector of selectors) {{
const element = document.querySelector(selector);
const content = element ? clean(element.getAttribute('content') || '') : '';
if (content) return content;
}}
return '';
}};
const authorRoot = document.querySelector('.author-container')
|| document.querySelector('.note-content .author')
|| document.querySelector('.interaction-container .author')
|| document.querySelector('.author');
const profile = authorRoot ? authorRoot.querySelector('a[href*="/user/profile/"]') : null;
const profileUrl = profile ? (profile.href || profile.getAttribute('href') || '') : '';
const profileMatch = profileUrl.match(/\\/user\\/profile\\/([^/?#]+)/);
const actionRoot = document.querySelector('.interact-container .buttons .left')
|| document.querySelector('.interact-container .left')
|| document.querySelector('.buttons .left')
|| document.querySelector('.interact-container');
const comments = Array.from(document.querySelectorAll('.comments-container .comment-item:not(.comment-item-sub), .comment-item:not(.comment-item-sub)'))
.slice(0, {int(max_comments)})
.map((item) => {{
const commentId = item.id || '';
const author = clean(item.querySelector('.author .name, .name, .user-name')?.innerText || '');
const content = clean(item.querySelector('.content, .note-text, .comment-content')?.innerText || '');
const liked = cleanCount(item.querySelector('.like-wrapper .count, .like .count, .like-wrapper')?.innerText || '');
const time = clean(item.querySelector('.date, .time, .location')?.innerText || '');
return {{comment_id: commentId, author, content, liked_count: liked, time}};
}})
.filter((comment) => comment.author || comment.content);
const dedupedComments = [];
const seenComments = new Set();
for (const comment of comments) {{
const key = comment.comment_id || `${{comment.author}}\\n${{comment.content}}`;
if (seenComments.has(key)) continue;
seenComments.add(key);
dedupedComments.push(comment);
}}
return {{
note_id: '',
title: pickText('#detail-title', '.note-content .title', '.interaction-container .title', '.title')
|| metaContent('meta[name="og:title"]', 'meta[property="og:title"]'),
description: pickText('.note-content .desc', '.desc', '.note-text')
|| metaContent('meta[name="description"]', 'meta[property="og:description"]'),
cover_url: metaContent('meta[name="og:image"]', 'meta[property="og:image"]')
|| pickAttr('.swiper-slide-active img, .media-container img, .note-slider-img, .cover img, video', 'poster')
|| pickAttr('.swiper-slide-active img, .media-container img, .note-slider-img, .cover img', 'src'),
author: {{
id: profileMatch ? profileMatch[1] : '',
nickname: authorRoot ? clean(authorRoot.querySelector('.name, .user-name, .nickname')?.innerText || authorRoot.innerText || '') : '',
avatar_url: authorRoot ? clean(authorRoot.querySelector('img')?.src || '') : '',
profile_url: profileUrl,
}},
stats: {{
liked_count: cleanCount(actionRoot?.querySelector('.like-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_like"]')),
collected_count: cleanCount(actionRoot?.querySelector('.collect-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_collect"]')),
comment_count: cleanCount(actionRoot?.querySelector('.chat-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_comment"]')),
share_count: cleanCount(document.querySelector('.interact-container .share-wrapper .count, .buttons .share-wrapper .count')?.innerText || ''),
}},
comments: dedupedComments.slice(0, {int(max_comments)}),
}};
"""
try:
metadata = page.run_js(script)
except Exception:
metadata = None
if not isinstance(metadata, dict):
metadata = {}
metadata["note_id"] = metadata.get("note_id") or note_id
metadata.setdefault("author", {})
metadata.setdefault("stats", {})
metadata.setdefault("comments", [])
return metadata
def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate:
if not candidates:
raise ValueError("没有可用的视频候选地址。")
@ -837,6 +1164,8 @@ def run_queue_download(
) -> int:
timeout = int(kwargs.get("timeout", 20))
output_dir = Path(kwargs.get("output_dir", DEFAULT_OUTPUT_DIR))
metadata_file = Path(kwargs.get("metadata_file") or output_dir / "metadata.jsonl")
max_comments = int(kwargs.get("max_comments", 20))
browser_port = kwargs.get("browser_port", DEFAULT_BROWSER_PORT)
human_settings = HumanBrowseSettings(
enabled=bool(kwargs.get("human_mode", True)),
@ -907,16 +1236,24 @@ def run_queue_download(
human_pause(human_settings)
if human_settings.enabled:
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
page_html = page.run_js("return document.documentElement.outerHTML")
metadata = merge_metadata(
extract_note_metadata_from_html(page_html, note_id=record.note_id),
extract_note_metadata_from_page(page, note_id=record.note_id, max_comments=max_comments),
)
candidates = group_video_candidates(
extract_video_candidates_from_html(
page.run_js("return document.documentElement.outerHTML"),
page_html,
video_id=record.note_id,
)
)
if not candidates:
packet = wait_for_feed_packet(page, timeout=timeout)
if packet is not None:
candidates = group_video_candidates(extract_video_candidates(extract_feed_payload(packet.response)))
payload = extract_feed_payload(packet.response)
payload_metadata = extract_note_metadata(payload, note_id=record.note_id)
metadata = merge_metadata(metadata, payload_metadata)
candidates = group_video_candidates(extract_video_candidates(payload))
if not candidates:
records[index] = mark_queue_record_skipped(record, "no video candidate")
save_queue(queue_file, records)
@ -939,6 +1276,15 @@ def run_queue_download(
video_url=candidate.video_url,
output_path=output_path,
)
append_jsonl_record(
metadata_file,
build_download_metadata_record(
base_metadata=metadata,
candidate=candidate,
queue_record=record,
output_path=output_path,
),
)
seen_files.add(output_path)
records[index] = mark_queue_record_downloaded(record, output_path)
save_queue(queue_file, records)
@ -981,6 +1327,8 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("--keyword", default=None, help="搜索来源关键词,例如 猫咪")
parser.add_argument("--target-videos", type=int, default=0, help="队列模式目标下载数量0 表示不启用")
parser.add_argument("--queue-file", default=None, help="JSONL 队列文件路径,提供后启用可恢复队列模式")
parser.add_argument("--metadata-file", default=None, help="下载成功后追加写入的元数据 JSONL 路径,默认 output-dir/metadata.jsonl")
parser.add_argument("--max-comments", type=int, default=20, help="随元数据保存的可见热门评论数量,默认 20")
parser.add_argument("--retry-limit", type=int, default=1, help="队列项下载失败重试次数,默认 1")
return parser
@ -1004,6 +1352,8 @@ def main(argv: list[str] | None = None) -> int:
keyword=args.keyword,
timeout=args.timeout,
output_dir=Path(args.output_dir),
metadata_file=Path(args.metadata_file) if args.metadata_file else None,
max_comments=args.max_comments,
browser_port=args.browser_port,
human_mode=args.human_mode,
min_wait=args.min_wait,

View File

@ -89,6 +89,38 @@ class FakeVideoOnlyLinkPage:
]
class FakeMetadataPage:
def run_js(self, script):
if "detail-title" not in script:
return None
return {
"note_id": "",
"title": "这一碗能把面食脑袋香得七荤八素",
"description": "#豆角焖面 #面条",
"cover_url": "https://sns-img.xhscdn.com/cover.jpg",
"author": {
"id": "author123",
"nickname": "日食记",
"avatar_url": "https://sns-avatar.xhscdn.com/a.jpg",
"profile_url": "https://www.xiaohongshu.com/user/profile/author123",
},
"stats": {
"liked_count": "3.5万",
"collected_count": "2.5万",
"comment_count": "1220",
"share_count": "",
},
"comments": [
{
"author": "莫多西卡多西",
"content": "不相信面能熟",
"liked_count": "290",
"time": "5天前重庆",
}
],
}
class XhsModuleTests(unittest.TestCase):
def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
module = importlib.import_module("XHS")
@ -187,6 +219,97 @@ class XhsModuleTests(unittest.TestCase):
}
self.assertEqual(module.extract_video_candidates(payload), [])
def test_extract_metadata_from_nested_note_payload(self) -> None:
module = importlib.import_module("XHS")
payload = {
"data": {
"items": [
{
"id": "note123",
"note_card": {
"display_title": "海边日落",
"desc": "一段描述",
"cover": {"url": "https://sns-img.xhscdn.com/cover.jpg"},
"user": {
"user_id": "user123",
"nickname": "摄影师",
"avatar": "https://sns-avatar.xhscdn.com/a.jpg",
},
"interact_info": {
"liked_count": "12",
"collected_count": "3",
"comment_count": "4",
"share_count": "5",
},
},
}
]
}
}
metadata = module.extract_note_metadata(payload, note_id="note123")
self.assertEqual(metadata["note_id"], "note123")
self.assertEqual(metadata["title"], "海边日落")
self.assertEqual(metadata["description"], "一段描述")
self.assertEqual(metadata["cover_url"], "https://sns-img.xhscdn.com/cover.jpg")
self.assertEqual(metadata["author"]["id"], "user123")
self.assertEqual(metadata["author"]["nickname"], "摄影师")
self.assertEqual(metadata["stats"]["liked_count"], "12")
self.assertEqual(metadata["stats"]["collected_count"], "3")
self.assertEqual(metadata["stats"]["comment_count"], "4")
self.assertEqual(metadata["stats"]["share_count"], "5")
def test_build_download_metadata_record_includes_download_context(self) -> None:
module = importlib.import_module("XHS")
candidate = module.VideoCandidate(
video_id="note123",
title="视频标题",
video_url="https://sns-video.xhscdn.com/a.mp4",
author_name="作者",
source_key="master_url",
)
base_metadata = {"title": "真实标题", "author": {"nickname": "真实作者"}}
with mock.patch.object(module, "current_timestamp", return_value="2026-05-27T17:00:00+0800"):
record = module.build_download_metadata_record(
base_metadata=base_metadata,
candidate=candidate,
queue_record=module.QueueRecord("note123", "https://www.xiaohongshu.com/explore/note123", "video-channel"),
output_path=Path("video/a.mp4"),
)
self.assertEqual(record["note_id"], "note123")
self.assertEqual(record["title"], "真实标题")
self.assertEqual(record["author"]["nickname"], "真实作者")
self.assertEqual(record["source"], "video-channel")
self.assertEqual(record["note_url"], "https://www.xiaohongshu.com/explore/note123")
self.assertEqual(record["video_url"], "https://sns-video.xhscdn.com/a.mp4")
self.assertEqual(record["downloaded_path"], "video/a.mp4")
self.assertEqual(record["downloaded_at"], "2026-05-27T17:00:00+0800")
self.assertEqual(record["comments"], [])
def test_extract_note_metadata_from_page_uses_visible_dom(self) -> None:
module = importlib.import_module("XHS")
metadata = module.extract_note_metadata_from_page(FakeMetadataPage(), note_id="note123", max_comments=20)
self.assertEqual(metadata["note_id"], "note123")
self.assertEqual(metadata["title"], "这一碗能把面食脑袋香得七荤八素")
self.assertEqual(metadata["description"], "#豆角焖面 #面条")
self.assertEqual(metadata["cover_url"], "https://sns-img.xhscdn.com/cover.jpg")
self.assertEqual(metadata["author"]["id"], "author123")
self.assertEqual(metadata["author"]["nickname"], "日食记")
self.assertEqual(metadata["stats"]["liked_count"], "3.5万")
self.assertEqual(metadata["stats"]["collected_count"], "2.5万")
self.assertEqual(metadata["stats"]["comment_count"], "1220")
self.assertEqual(metadata["comments"][0]["content"], "不相信面能熟")
def test_append_jsonl_record_writes_utf8_json_line(self) -> None:
module = importlib.import_module("XHS")
with tempfile.TemporaryDirectory() as temp_dir:
path = Path(temp_dir) / "meta" / "records.jsonl"
module.append_jsonl_record(path, {"title": "海边日落", "count": 1})
module.append_jsonl_record(path, {"title": "猫咪", "count": 2})
lines = path.read_text(encoding="utf-8").splitlines()
self.assertEqual(len(lines), 2)
self.assertIn("海边日落", lines[0])
self.assertIn("猫咪", lines[1])
def test_extract_video_candidates_from_escaped_html_state(self) -> None:
module = importlib.import_module("XHS")
html = (