from __future__ import annotations import argparse import html import json import random import re import socket import sys import time from dataclasses import dataclass from dataclasses import asdict from pathlib import Path from typing import Any from urllib.parse import quote from urllib.parse import urljoin from dataclasses import replace DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore" DEFAULT_VIDEO_CHANNEL_URL = "https://www.xiaohongshu.com/explore?channel_id=homefeed.video_v3" DEFAULT_BROWSER_PORT = 9223 DEFAULT_OUTPUT_DIR = Path("video") DEFAULT_MIN_VIDEO_BYTES = 200 * 1024 LISTEN_TARGET = "/api/sns/web/v1/feed" MAX_FILENAME_BYTES = 240 INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls"} TITLE_KEYS = ("display_title", "title", "desc", "description") ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id") AUTHOR_KEYS = ("nickname", "name", "user_name", "userName") AUTHOR_ID_KEYS = ("user_id", "userId", "id", "uid") AVATAR_KEYS = ("avatar", "image", "image_url", "avatar_url") COVER_KEYS = ("cover", "cover_url", "image", "image_url", "url", "thumbnail") STAT_KEYS = ("liked_count", "collected_count", "comment_count", "share_count") @dataclass(frozen=True) class VideoCandidate: video_id: str title: str video_url: str author_name: str source_key: str @dataclass(frozen=True) class HumanBrowseSettings: enabled: bool = True min_wait: float = 2.0 max_wait: float = 6.0 long_break_every: int = 4 long_break_min: float = 8.0 long_break_max: float = 20.0 max_runtime: float = 0.0 min_scroll: int = 500 max_scroll: int = 1200 reverse_scroll_probability: float = 0.45 min_reverse_scroll: int = 100 max_reverse_scroll: int = 400 @dataclass(frozen=True) class HumanBrowsePlan: down_distance: int primary_wait: float reverse_distance: int = 0 reverse_wait: float = 0.0 settle_wait: float = 0.0 @dataclass(frozen=True) class QueueRecord: note_id: str url: str source: str status: str = "pending" attempts: int = 0 downloaded_path: str = "" last_error: str = "" updated_at: str = "" def sanitize_filename(value: str, fallback: str = "untitled") -> str: cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._") return cleaned or fallback def truncate_utf8_bytes(value: str, max_bytes: int) -> str: if len(value.encode("utf-8")) <= max_bytes: return value result = "" used = 0 for character in value: character_bytes = len(character.encode("utf-8")) if used + character_bytes > max_bytes: break result += character used += character_bytes return result.rstrip(" ._") def looks_like_video_url(value: str) -> bool: normalized = value.strip() return normalized.startswith(("http://", "https://")) and ( ".mp4" in normalized or "sns-video" in normalized or "xhscdn.com" in normalized ) def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None: if isinstance(value, dict): for key in keys: candidate = value.get(key) if isinstance(candidate, str) and candidate.strip(): return candidate.strip() for child in value.values(): found = first_string_by_keys(child, keys) if found: return found elif isinstance(value, list): for child in value: found = first_string_by_keys(child, keys) if found: return found return None def first_value_by_keys(value: Any, keys: tuple[str, ...]) -> Any: if isinstance(value, dict): for key in keys: candidate = value.get(key) if candidate not in (None, ""): return candidate for child in value.values(): found = first_value_by_keys(child, keys) if found not in (None, ""): return found elif isinstance(value, list): for child in value: found = first_value_by_keys(child, keys) if found not in (None, ""): return found return None def stringify_metadata_value(value: Any) -> str: if value is None: return "" if isinstance(value, str): return value.strip() if isinstance(value, (int, float)): return str(value) return "" def looks_like_image_url(value: str) -> bool: normalized = value.strip() return normalized.startswith(("http://", "https://")) and ( "sns-img" in normalized or "xhscdn.com" in normalized or any(ext in normalized.lower() for ext in (".jpg", ".jpeg", ".png", ".webp")) ) def first_image_url(value: Any) -> str: if isinstance(value, str): return value.strip() if looks_like_image_url(value) else "" if isinstance(value, dict): for key in COVER_KEYS: candidate = value.get(key) found = first_image_url(candidate) if found: return found for child in value.values(): found = first_image_url(child) if found: return found elif isinstance(value, list): for child in value: found = first_image_url(child) if found: return found return "" def find_note_card(value: Any, note_id: str) -> dict[str, Any]: if isinstance(value, dict): note_card = value.get("note_card") or value.get("noteCard") if isinstance(note_card, dict): return note_card for child in value.values(): found = find_note_card(child, note_id) if found: return found candidate_id = first_string_by_keys(value, ID_KEYS) if not note_id or candidate_id == note_id: return value elif isinstance(value, list): for child in value: found = find_note_card(child, note_id) if found: return found return {} def extract_note_metadata(payload: Any, note_id: str = "") -> dict[str, Any]: card = find_note_card(payload, note_id) user = card.get("user") or card.get("author") if isinstance(card, dict) else {} if not isinstance(user, dict): user = {} interact_info = card.get("interact_info") or card.get("interactInfo") if isinstance(card, dict) else {} if not isinstance(interact_info, dict): interact_info = {} resolved_note_id = note_id or first_string_by_keys(card, ID_KEYS) or "" stats = { key: stringify_metadata_value(first_value_by_keys(interact_info, (key,))) for key in STAT_KEYS } return { "note_id": resolved_note_id, "title": first_string_by_keys(card, TITLE_KEYS) or "", "description": first_string_by_keys(card, ("desc", "description")) or "", "cover_url": first_image_url(card), "author": { "id": first_string_by_keys(user, AUTHOR_ID_KEYS) or "", "nickname": first_string_by_keys(user, AUTHOR_KEYS) or "", "avatar_url": first_image_url(user), }, "stats": stats, } def build_download_metadata_record( base_metadata: dict[str, Any], candidate: VideoCandidate, queue_record: QueueRecord, output_path: Path, comments: list[dict[str, Any]] | None = None, comments_error: str = "", ) -> dict[str, Any]: record = dict(base_metadata) record["note_id"] = record.get("note_id") or queue_record.note_id or candidate.video_id record["title"] = record.get("title") or candidate.title author = record.get("author") if not isinstance(author, dict): author = {} author["nickname"] = author.get("nickname") or candidate.author_name record["author"] = author record["source"] = queue_record.source record["note_url"] = queue_record.url record["video_url"] = candidate.video_url record["video_source_key"] = candidate.source_key record["downloaded_path"] = output_path.as_posix() record["downloaded_at"] = current_timestamp() record["comments"] = comments if comments is not None else record.get("comments") or [] if comments_error: record["comments_error"] = comments_error return record def merge_metadata(primary: dict[str, Any], secondary: dict[str, Any]) -> dict[str, Any]: merged = dict(primary) for key, value in secondary.items(): if key == "author" and isinstance(value, dict): author = dict(merged.get("author") if isinstance(merged.get("author"), dict) else {}) for author_key, author_value in value.items(): if author_value: author[author_key] = author_value merged["author"] = author elif key == "stats" and isinstance(value, dict): stats = dict(merged.get("stats") if isinstance(merged.get("stats"), dict) else {}) for stat_key, stat_value in value.items(): if stat_value: stats[stat_key] = stat_value merged["stats"] = stats elif key == "comments" and value: merged[key] = value elif value: merged[key] = value return merged def append_jsonl_record(path: Path, record: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as file: file.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n") def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]: video_id = "" title = "" author_name = "" for node in reversed(path): if not isinstance(node, dict): continue if not video_id: video_id = first_string_by_keys(node, ID_KEYS) or "" if not title: title = first_string_by_keys(node, TITLE_KEYS) or "" if not author_name: user = node.get("user") or node.get("author") if isinstance(user, dict): author_name = first_string_by_keys(user, AUTHOR_KEYS) or "" return { "video_id": video_id or "unknown", "title": title or "untitled", "author_name": author_name or "unknown", } def append_candidate( candidates: list[VideoCandidate], url: str, source_key: str, path: tuple[Any, ...], ) -> None: if not looks_like_video_url(url): return context = find_nearest_note_context(path) candidates.append( VideoCandidate( video_id=context["video_id"], title=context["title"], video_url=url.strip(), author_name=context["author_name"], source_key=source_key, ) ) def walk_for_video_candidates(value: Any, path: tuple[Any, ...], candidates: list[VideoCandidate]) -> None: if isinstance(value, dict): current_path = (*path, value) for key, child in value.items(): if key in VIDEO_URL_KEYS: if isinstance(child, str): append_candidate(candidates, child, key, current_path) elif isinstance(child, list): for item in child: if isinstance(item, str): append_candidate(candidates, item, key, current_path) walk_for_video_candidates(child, current_path, candidates) elif isinstance(value, list): for child in value: walk_for_video_candidates(child, path, candidates) def extract_video_candidates(payload: Any) -> list[VideoCandidate]: candidates: list[VideoCandidate] = [] walk_for_video_candidates(payload, (), candidates) return candidates def decode_html_video_url(value: str) -> str: decoded = html.unescape(value) return decoded.replace("\\u002F", "/").replace("\\/", "/") def extract_video_candidates_from_html(source: str, video_id: str = "current-page") -> list[VideoCandidate]: patterns = [ r'\\"master_url\\"\s*:\s*\\"(.*?)\\"', r'"master_url"\s*:\s*"(.*?)"', ] candidates: list[VideoCandidate] = [] seen_urls: set[str] = set() for pattern in patterns: for match in re.findall(pattern, source): video_url = decode_html_video_url(match) if video_url in seen_urls or not looks_like_video_url(video_url): continue seen_urls.add(video_url) candidates.append( VideoCandidate( video_id=video_id, title="current-page-video", video_url=video_url, author_name="unknown", source_key="html_master_url", ) ) return candidates def extract_note_metadata_from_html(source: str, note_id: str = "") -> dict[str, Any]: def first_pattern(patterns: list[str]) -> str: for pattern in patterns: match = re.search(pattern, source, flags=re.DOTALL) if match: return decode_html_video_url(match.group(1)).strip() return "" metadata = { "note_id": note_id, "title": first_pattern( [ r'"display_title"\s*:\s*"([^"]+)"', r'\\"display_title\\"\s*:\s*\\"(.*?)\\"', r'"title"\s*:\s*"([^"]+)"', ] ), "description": first_pattern( [ r'"desc"\s*:\s*"([^"]+)"', r'\\"desc\\"\s*:\s*\\"(.*?)\\"', r'"description"\s*:\s*"([^"]+)"', ] ), "cover_url": first_pattern( [ r'"cover_url"\s*:\s*"([^"]+)"', r'\\"cover_url\\"\s*:\s*\\"(.*?)\\"', r'"url"\s*:\s*"(https?://sns-img[^"]+)"', r'\\"url\\"\s*:\s*\\"(https?:\\?/\\?/sns-img.*?)(? dict[str, Any]: script = f""" const pickText = (...selectors) => {{ for (const selector of selectors) {{ const element = document.querySelector(selector); const text = element ? (element.innerText || element.textContent || '').trim() : ''; if (text) return text; }} return ''; }}; const pickAttr = (selector, attr) => {{ const element = document.querySelector(selector); return element ? (element.getAttribute(attr) || element[attr] || '') : ''; }}; const clean = (value) => (value || '').trim(); const cleanCount = (value) => {{ const text = clean(value); return ['赞', '回复', '分享', '评论', '收藏'].includes(text) ? '' : text; }}; const metaContent = (...selectors) => {{ for (const selector of selectors) {{ const element = document.querySelector(selector); const content = element ? clean(element.getAttribute('content') || '') : ''; if (content) return content; }} return ''; }}; const authorRoot = document.querySelector('.author-container') || document.querySelector('.note-content .author') || document.querySelector('.interaction-container .author') || document.querySelector('.author'); const profile = authorRoot ? authorRoot.querySelector('a[href*="/user/profile/"]') : null; const profileUrl = profile ? (profile.href || profile.getAttribute('href') || '') : ''; const profileMatch = profileUrl.match(/\\/user\\/profile\\/([^/?#]+)/); const actionRoot = document.querySelector('.interact-container .buttons .left') || document.querySelector('.interact-container .left') || document.querySelector('.buttons .left') || document.querySelector('.interact-container'); const comments = Array.from(document.querySelectorAll('.comments-container .comment-item:not(.comment-item-sub), .comment-item:not(.comment-item-sub)')) .slice(0, {int(max_comments)}) .map((item) => {{ const commentId = item.id || ''; const author = clean(item.querySelector('.author .name, .name, .user-name')?.innerText || ''); const content = clean(item.querySelector('.content, .note-text, .comment-content')?.innerText || ''); const liked = cleanCount(item.querySelector('.like-wrapper .count, .like .count, .like-wrapper')?.innerText || ''); const time = clean(item.querySelector('.date, .time, .location')?.innerText || ''); return {{comment_id: commentId, author, content, liked_count: liked, time}}; }}) .filter((comment) => comment.author || comment.content); const dedupedComments = []; const seenComments = new Set(); for (const comment of comments) {{ const key = comment.comment_id || `${{comment.author}}\\n${{comment.content}}`; if (seenComments.has(key)) continue; seenComments.add(key); dedupedComments.push(comment); }} return {{ note_id: '', title: pickText('#detail-title', '.note-content .title', '.interaction-container .title', '.title') || metaContent('meta[name="og:title"]', 'meta[property="og:title"]'), description: pickText('.note-content .desc', '.desc', '.note-text') || metaContent('meta[name="description"]', 'meta[property="og:description"]'), cover_url: metaContent('meta[name="og:image"]', 'meta[property="og:image"]') || pickAttr('.swiper-slide-active img, .media-container img, .note-slider-img, .cover img, video', 'poster') || pickAttr('.swiper-slide-active img, .media-container img, .note-slider-img, .cover img', 'src'), author: {{ id: profileMatch ? profileMatch[1] : '', nickname: authorRoot ? clean(authorRoot.querySelector('.name, .user-name, .nickname')?.innerText || authorRoot.innerText || '') : '', avatar_url: authorRoot ? clean(authorRoot.querySelector('img')?.src || '') : '', profile_url: profileUrl, }}, stats: {{ liked_count: cleanCount(actionRoot?.querySelector('.like-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_like"]')), collected_count: cleanCount(actionRoot?.querySelector('.collect-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_collect"]')), comment_count: cleanCount(actionRoot?.querySelector('.chat-wrapper .count')?.innerText || metaContent('meta[name="og:xhs:note_comment"]')), share_count: cleanCount(document.querySelector('.interact-container .share-wrapper .count, .buttons .share-wrapper .count')?.innerText || ''), }}, comments: dedupedComments.slice(0, {int(max_comments)}), }}; """ try: metadata = page.run_js(script) except Exception: metadata = None if not isinstance(metadata, dict): metadata = {} metadata["note_id"] = metadata.get("note_id") or note_id metadata.setdefault("author", {}) metadata.setdefault("stats", {}) metadata.setdefault("comments", []) return metadata def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate: if not candidates: raise ValueError("没有可用的视频候选地址。") source_priority = {"master_url": 0, "html_master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3} return sorted(candidates, key=lambda item: source_priority.get(item.source_key, 99))[0] def group_video_candidates(candidates: list[VideoCandidate]) -> list[VideoCandidate]: grouped: dict[str, list[VideoCandidate]] = {} order: list[str] = [] for candidate in candidates: key = candidate.video_id or candidate.video_url if key not in grouped: grouped[key] = [] order.append(key) grouped[key].append(candidate) return [choose_video_candidate(grouped[key]) for key in order] def build_output_path(candidate: VideoCandidate, output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path: safe_author = sanitize_filename(candidate.author_name, fallback="unknown") safe_title = sanitize_filename(candidate.title, fallback="untitled") safe_video_id = sanitize_filename(candidate.video_id, fallback="unknown") prefix = f"[{safe_author}]" suffix = f"-{safe_video_id}.mp4" title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) if title_budget < 1: prefix_budget = MAX_FILENAME_BYTES - len(suffix.encode("utf-8")) - 1 prefix = truncate_utf8_bytes(prefix, max(1, prefix_budget)) title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) filename = f"{prefix}{truncate_utf8_bytes(safe_title, max(1, title_budget))}{suffix}" return output_dir / filename def build_browser_address(browser_port: int | None) -> str | None: if browser_port is None: return None return f"127.0.0.1:{browser_port}" def ensure_browser_debug_port_ready(browser_port: int) -> None: try: with socket.create_connection(("127.0.0.1", browser_port), timeout=2): return except OSError as exc: raise RuntimeError( "无法连接到已启动的 Chrome 调试端口。" f"请先运行 `./.venv/bin/python login_xhs.py --browser-port {browser_port}`," "并确认 Chrome 仍在运行且端口一致。" ) from exc def build_headers(referer: str) -> dict[str, str]: return { "referer": referer, "user-agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/135.0.0.0 Safari/537.36" ), } def import_runtime_dependencies() -> tuple[Any, Any, Any]: try: import requests except ModuleNotFoundError as exc: raise SystemExit("缺少 requests,请先执行: python3 -m pip install requests") from exc try: from DrissionPage import ChromiumOptions from DrissionPage import ChromiumPage except ModuleNotFoundError as exc: raise SystemExit("缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage") from exc return requests, ChromiumPage, ChromiumOptions def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any: browser_address = build_browser_address(browser_port) if browser_address is None: return chromium_page_cls() options = chromium_options_cls().set_address(browser_address).existing_only(True) return chromium_page_cls(options) def extract_feed_payload(response: Any) -> dict[str, Any]: body = getattr(response, "body", None) if isinstance(body, dict): return body raw_body = getattr(response, "raw_body", None) if isinstance(raw_body, str) and raw_body.strip(): payload = json.loads(raw_body) if isinstance(payload, dict): return payload raise ValueError("响应体不是可解析的 JSON 字典。") def download_video( requests_module: Any, headers: dict[str, str], video_url: str, output_path: Path, min_video_bytes: int = DEFAULT_MIN_VIDEO_BYTES, ) -> None: response = requests_module.get(video_url, headers=headers, timeout=60, stream=True) response.raise_for_status() output_path.parent.mkdir(parents=True, exist_ok=True) temp_path = output_path.with_suffix(f"{output_path.suffix}.part") if not hasattr(response, "iter_content"): validate_video_response(response, video_url, min_video_bytes=min_video_bytes) output_path.write_bytes(response.content) return total_bytes = 0 prefix = b"" try: with temp_path.open("wb") as file: for chunk in response.iter_content(chunk_size=1024 * 1024): if not chunk: continue total_bytes += len(chunk) if len(prefix) < 4096: prefix += chunk[: 4096 - len(prefix)] file.write(chunk) validate_downloaded_video_bytes( content_type=str(getattr(response, "headers", {}).get("content-type", "")).lower(), prefix=prefix, total_bytes=total_bytes, video_url=video_url, min_video_bytes=min_video_bytes, ) temp_path.replace(output_path) except Exception: if temp_path.exists(): temp_path.unlink() raise def validate_video_response(response: Any, video_url: str, min_video_bytes: int = DEFAULT_MIN_VIDEO_BYTES) -> None: content = getattr(response, "content", b"") content_type = str(getattr(response, "headers", {}).get("content-type", "")).lower() if content_type.startswith("image/"): raise ValueError(f"非视频响应: {content_type} {video_url}") if content.startswith(b"RIFF") and b"WEBP" in content[:16]: raise ValueError(f"非视频响应: image/webp {video_url}") if content.lstrip().startswith((b"= 12 and content[4:8] == b"ftyp" has_webm_signature = content.startswith(b"\x1a\x45\xdf\xa3") if has_video_type or has_mp4_signature or has_webm_signature: if min_video_bytes > 0 and len(content) < min_video_bytes: raise ValueError(f"视频响应过小: {len(content)} bytes < {min_video_bytes} bytes {video_url}") return raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}") def validate_downloaded_video_bytes( content_type: str, prefix: bytes, total_bytes: int, video_url: str, min_video_bytes: int = DEFAULT_MIN_VIDEO_BYTES, ) -> None: if content_type.startswith("image/"): raise ValueError(f"非视频响应: {content_type} {video_url}") if prefix.startswith(b"RIFF") and b"WEBP" in prefix[:16]: raise ValueError(f"非视频响应: image/webp {video_url}") if prefix.lstrip().startswith((b"= 12 and prefix[4:8] == b"ftyp" has_webm_signature = prefix.startswith(b"\x1a\x45\xdf\xa3") if has_video_type or has_mp4_signature or has_webm_signature: if min_video_bytes > 0 and total_bytes < min_video_bytes: raise ValueError(f"视频响应过小: {total_bytes} bytes < {min_video_bytes} bytes {video_url}") return raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}") def wait_for_feed_packet(page: Any, timeout: int) -> Any | None: try: packet = page.listen.wait(timeout=timeout) return packet if packet else None except Exception as exc: print(f"[WARN] 等待 feed 数据超时或失败: {exc}") return None def create_human_browse_plan( settings: HumanBrowseSettings, random_module: Any = random, ) -> HumanBrowsePlan: down_distance = random_module.randint(settings.min_scroll, settings.max_scroll) primary_wait = random_module.uniform(settings.min_wait, settings.max_wait) settle_wait = random_module.uniform(settings.min_wait, settings.max_wait) reverse_distance = 0 reverse_wait = 0.0 if random_module.random() < settings.reverse_scroll_probability: reverse_distance = random_module.randint(settings.min_reverse_scroll, settings.max_reverse_scroll) reverse_wait = random_module.uniform(1.0, min(4.0, settings.max_wait)) return HumanBrowsePlan( down_distance=down_distance, primary_wait=primary_wait, reverse_distance=reverse_distance, reverse_wait=reverse_wait, settle_wait=settle_wait, ) def run_scroll_step(page: Any, distance: int) -> None: script = f""" const distance = {distance}; const candidates = Array.from(document.querySelectorAll('*')) .filter((el) => {{ const rect = el.getBoundingClientRect(); return rect.width > 300 && rect.height > 200 && el.scrollHeight > el.clientHeight + 20; }}) .sort((a, b) => {{ const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height; const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height; return areaB - areaA; }}); const target = candidates[0] || document.scrollingElement || document.documentElement; target.scrollBy(0, distance); """ page.run_js(script) def run_human_browse_sequence(page: Any, plan: HumanBrowsePlan) -> None: run_scroll_step(page, plan.down_distance) time.sleep(plan.primary_wait) if plan.reverse_distance > 0: run_scroll_step(page, -plan.reverse_distance) time.sleep(plan.reverse_wait) run_scroll_step(page, plan.reverse_distance * 2) if plan.settle_wait > 0: time.sleep(plan.settle_wait) def count_visible_comments(page: Any) -> int: script = """ const xhsVisibleCommentCount = () => { return document.querySelectorAll( '.comments-container .comment-item:not(.comment-item-sub), .comment-item:not(.comment-item-sub)' ).length; }; return xhsVisibleCommentCount(); """ try: count = page.run_js(script) return int(count or 0) except Exception: return 0 def scroll_comment_container(page: Any, distance: int = 720) -> bool: script = f""" const xhsScrollCommentContainer = () => {{ const distance = {int(distance)}; const preferred = Array.from(document.querySelectorAll( '.comments-container, .comments-el, .comment-list, .interaction-container, .note-scroller, .note-detail-mask' )); const scrollables = Array.from(document.querySelectorAll('*')) .filter((el) => {{ const className = String(el.className || '').toLowerCase(); const rect = el.getBoundingClientRect(); return el.scrollHeight > el.clientHeight + 20 && rect.width > 240 && rect.height > 160 && /comment|interaction|note|detail|right/.test(className); }}); const candidates = preferred.concat(scrollables) .filter((el, index, list) => el && list.indexOf(el) === index) .filter((el) => el.scrollHeight > el.clientHeight + 20); const target = candidates[0] || document.scrollingElement || document.documentElement; if (!target) return false; target.scrollBy(0, distance); target.dispatchEvent(new WheelEvent('wheel', {{deltaY: distance, bubbles: true}})); return true; }}; return xhsScrollCommentContainer(); """ try: return bool(page.run_js(script)) except Exception: return False def load_visible_comments( page: Any, human_settings: HumanBrowseSettings, max_comments: int = 20, timeout: float = 8.0, ) -> bool: if max_comments <= 0: return False deadline = time.monotonic() + max(0.0, timeout) wait_seconds = 0.3 if human_settings.enabled: wait_seconds = max(0.1, min(1.5, human_settings.max_wait)) while True: if count_visible_comments(page) > 0: return True if time.monotonic() >= deadline: return False scroll_comment_container(page) time.sleep(wait_seconds) def human_pause(settings: HumanBrowseSettings, random_module: Any = random) -> None: if settings.enabled: time.sleep(random_module.uniform(settings.min_wait, settings.max_wait)) def should_take_long_break(downloaded: int, settings: HumanBrowseSettings) -> bool: return settings.enabled and settings.long_break_every > 0 and downloaded > 0 and downloaded % settings.long_break_every == 0 def take_long_break(settings: HumanBrowseSettings, random_module: Any = random) -> None: if settings.enabled: wait_seconds = random_module.uniform(settings.long_break_min, settings.long_break_max) print(f"[INFO] 已达到阶段下载数,停留 {wait_seconds:.1f}s") time.sleep(wait_seconds) def scroll_feed(page: Any, distance: int = 900) -> None: run_scroll_step(page, distance) time.sleep(2) def collect_videos( max_videos: int, timeout: int, output_dir: Path, browser_port: int | None, start_url: str = DEFAULT_EXPLORE_URL, use_current_page: bool = False, human_mode: bool = True, min_wait: float = 2.0, max_wait: float = 6.0, long_break_every: int = 4, max_runtime: float = 0.0, ) -> int: human_settings = HumanBrowseSettings( enabled=human_mode, min_wait=min_wait, max_wait=max_wait, long_break_every=long_break_every, max_runtime=max_runtime, ) started_at = time.monotonic() requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(LISTEN_TARGET) if use_current_page: print(f"[INFO] 使用当前页面: {getattr(page, 'url', '')}") page.refresh() else: print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(start_url) human_pause(human_settings) downloaded = 0 seen_urls: set[str] = set() seen_files: set[Path] = set() consecutive_empty = 0 max_consecutive_empty = 6 if not use_current_page: downloaded += collect_videos_from_explore_cards( page=page, requests_module=requests_module, output_dir=output_dir, max_videos=max_videos, start_url=start_url, seen_urls=seen_urls, seen_files=seen_files, human_settings=human_settings, started_at=started_at, ) if downloaded >= max_videos: return downloaded page.get(start_url) human_pause(human_settings) while downloaded < max_videos and consecutive_empty < max_consecutive_empty: if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime: print("[INFO] 已达到最大运行时间,结束抓取。") break packet = wait_for_feed_packet(page, timeout=timeout) if packet is None: candidates = group_video_candidates( extract_video_candidates_from_html( page.run_js("return document.documentElement.outerHTML"), video_id=extract_note_id_from_url(getattr(page, "url", "")), ) ) if not candidates: consecutive_empty += 1 run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue else: try: payload = extract_feed_payload(packet.response) candidates = group_video_candidates(extract_video_candidates(payload)) except Exception as exc: print(f"[WARN] 解析 feed 数据失败: {exc}") consecutive_empty += 1 run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls] if not fresh_candidates: consecutive_empty += 1 run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue consecutive_empty = 0 for candidate in fresh_candidates: if downloaded >= max_videos: break seen_urls.add(candidate.video_url) output_path = build_output_path(candidate, output_dir=output_dir) if output_path in seen_files or output_path.exists(): continue headers = build_headers(getattr(page, "url", start_url) or start_url) try: download_video( requests_module=requests_module, headers=headers, video_url=candidate.video_url, output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {candidate.video_id}: {exc}") continue downloaded += 1 seen_files.add(output_path) print(f"[OK] 已保存: {output_path}") human_pause(human_settings) if should_take_long_break(downloaded, human_settings): take_long_break(human_settings) if downloaded < max_videos: run_human_browse_sequence(page, create_human_browse_plan(human_settings)) if downloaded == 0: print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed,并在浏览器中滚动后重试。") return downloaded def collect_videos_from_explore_cards( page: Any, requests_module: Any, output_dir: Path, max_videos: int, start_url: str, seen_urls: set[str], seen_files: set[Path], human_settings: HumanBrowseSettings, started_at: float, ) -> int: downloaded = 0 visited_note_ids: set[str] = set() empty_rounds = 0 max_empty_rounds = 4 while downloaded < max_videos and empty_rounds < max_empty_rounds: if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime: print("[INFO] 已达到最大运行时间,结束抓取。") break page.get(start_url) human_pause(human_settings) note_urls = filter_unvisited_note_urls( wait_for_note_urls_from_page(page, limit=max_videos * 12), visited_note_ids, ) if not note_urls: empty_rounds += 1 run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue round_downloaded = 0 for note_url in note_urls: if downloaded >= max_videos: break if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime: print("[INFO] 已达到最大运行时间,结束抓取。") break note_id = extract_note_id_from_url(note_url) visited_note_ids.add(note_id) page.get(note_url) print(f"[INFO] 打开笔记 {len(visited_note_ids)}: {note_id}") human_pause(human_settings) if human_settings.enabled: run_human_browse_sequence(page, create_human_browse_plan(human_settings)) candidates = group_video_candidates( extract_video_candidates_from_html( page.run_js("return document.documentElement.outerHTML"), video_id=note_id, ) ) if not candidates: continue for candidate in candidates: if downloaded >= max_videos: break if candidate.video_url in seen_urls: continue seen_urls.add(candidate.video_url) output_path = build_output_path(candidate, output_dir=output_dir) if output_path in seen_files or output_path.exists(): continue try: download_video( requests_module=requests_module, headers=build_headers(start_url), video_url=candidate.video_url, output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {candidate.video_id}: {exc}") continue downloaded += 1 round_downloaded += 1 seen_files.add(output_path) print(f"[OK] 已保存 ({downloaded}/{max_videos}): {output_path}") human_pause(human_settings) if should_take_long_break(downloaded, human_settings): take_long_break(human_settings) if round_downloaded == 0: empty_rounds += 1 else: empty_rounds = 0 if downloaded < max_videos: page.get(start_url) human_pause(human_settings) run_human_browse_sequence(page, create_human_browse_plan(human_settings)) return downloaded def extract_note_id_from_url(url: str) -> str: match = re.search(r"/(?:explore|search_result)/([^/?#]+)", url) if match: return match.group(1) return "current-page" def normalize_note_urls(urls: list[str]) -> list[str]: by_note_id: dict[str, str] = {} order: list[str] = [] for url in urls: full_url = urljoin("https://www.xiaohongshu.com", str(url)) note_id = extract_note_id_from_url(full_url) if note_id == "current-page": continue if note_id not in by_note_id: order.append(note_id) by_note_id[note_id] = f"https://www.xiaohongshu.com/explore/{note_id}?xsec_source=pc_feed" if "xsec_token=" in full_url: by_note_id[note_id] = full_url return [by_note_id[note_id] for note_id in order] def collect_note_urls_from_page(page: Any, limit: int, video_only: bool = False) -> list[str]: if video_only: script = """ return Array.from(document.querySelectorAll('section.note-item')) .filter((section) => section.querySelector('.play-icon, use[href="#play-s"], use[xlink\\\\:href="#play-s"]')) .flatMap((section) => Array.from(section.querySelectorAll('a[href*="/explore/"], a[href*="/search_result/"]'))) .map((a) => a.href || a.getAttribute('href') || '') .filter(Boolean); """ else: script = """ return Array.from(document.querySelectorAll('a[href*="/explore/"], a[href*="/search_result/"]')) .map((a) => a.href || a.getAttribute('href') || '') .filter(Boolean); """ raw_urls = page.run_js(script) or [] if not isinstance(raw_urls, list): return [] return normalize_note_urls([str(url) for url in raw_urls])[:limit] def wait_for_note_urls_from_page( page: Any, limit: int, timeout: float = 8.0, interval: float = 0.5, video_only: bool = False, ) -> list[str]: deadline = time.monotonic() + timeout while True: note_urls = collect_note_urls_from_page(page, limit=limit, video_only=video_only) if note_urls or time.monotonic() >= deadline: return note_urls time.sleep(interval) def collect_note_urls_with_browse( page: Any, limit: int, human_settings: HumanBrowseSettings, rounds: int = 3, video_only: bool = False, ) -> list[str]: collected: list[str] = [] seen_note_ids: set[str] = set() for round_index in range(max(1, rounds)): note_urls = wait_for_note_urls_from_page(page, limit=limit, video_only=video_only) for note_url in note_urls: note_id = extract_note_id_from_url(note_url) if note_id in seen_note_ids: continue seen_note_ids.add(note_id) collected.append(note_url) if len(collected) >= limit: return collected if round_index < rounds - 1: run_human_browse_sequence(page, create_human_browse_plan(human_settings)) return collected def filter_unvisited_note_urls(urls: list[str], visited_note_ids: set[str]) -> list[str]: return [url for url in urls if extract_note_id_from_url(url) not in visited_note_ids] def load_queue(queue_path: Path) -> list[QueueRecord]: if not queue_path.exists(): return [] records: list[QueueRecord] = [] for line in queue_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue data = json.loads(line) records.append(QueueRecord(**data)) return records def save_queue(queue_path: Path, records: list[QueueRecord]) -> None: queue_path.parent.mkdir(parents=True, exist_ok=True) temp_path = queue_path.with_suffix(f"{queue_path.suffix}.tmp") lines = [json.dumps(asdict(record), ensure_ascii=False, sort_keys=True) for record in records] temp_path.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8") temp_path.replace(queue_path) def merge_note_urls_into_queue(records: list[QueueRecord], note_urls: list[str], source: str) -> list[QueueRecord]: merged = list(records) existing_ids = {record.note_id for record in merged} for note_url in normalize_note_urls(note_urls): note_id = extract_note_id_from_url(note_url) if note_id in existing_ids: continue existing_ids.add(note_id) merged.append(QueueRecord(note_id=note_id, url=note_url, source=source)) return merged def count_queue_status(records: list[QueueRecord]) -> dict[str, int]: counts: dict[str, int] = {} for record in records: counts[record.status] = counts.get(record.status, 0) + 1 return counts def current_timestamp() -> str: return time.strftime("%Y-%m-%dT%H:%M:%S%z") def mark_queue_record_downloaded(record: QueueRecord, output_path: Path) -> QueueRecord: return replace( record, status="downloaded", downloaded_path=output_path.as_posix(), last_error="", updated_at=current_timestamp(), ) def mark_queue_record_skipped(record: QueueRecord, reason: str) -> QueueRecord: return replace( record, status="skipped_image", last_error=reason, updated_at=current_timestamp(), ) def mark_queue_record_failed(record: QueueRecord, error: str, retry_limit: int) -> QueueRecord: attempts = record.attempts + 1 status = "failed" if attempts >= retry_limit else "pending" return replace( record, status=status, attempts=attempts, last_error=error, updated_at=current_timestamp(), ) def build_source_url(source: str, keyword: str | None = None) -> str: if source == "explore": return DEFAULT_EXPLORE_URL if source == "video-channel": return DEFAULT_VIDEO_CHANNEL_URL if source == "current-page": return "" if source == "search": if not keyword or not keyword.strip(): raise ValueError("--source search 需要提供 --keyword") encoded_keyword = quote(keyword.strip()) return f"https://www.xiaohongshu.com/search_result?keyword={encoded_keyword}&source=web_search_result_notes&type=51" raise ValueError(f"不支持的来源: {source}") def directory_size_bytes(path: Path) -> int: if not path.exists(): return 0 if path.is_file(): return path.stat().st_size return sum(item.stat().st_size for item in path.rglob("*") if item.is_file()) def load_metadata_records(metadata_file: Path) -> list[dict[str, Any]]: if not metadata_file.exists(): return [] records: list[dict[str, Any]] = [] for line in metadata_file.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: record = json.loads(line) except json.JSONDecodeError: continue if isinstance(record, dict): records.append(record) return records def build_run_report( source: str, target_videos: int, queue_file: Path, output_dir: Path, metadata_file: Path, records: list[QueueRecord], downloaded_this_run: int, started_at: float, finished_at: float, ) -> dict[str, Any]: metadata_records = load_metadata_records(metadata_file) comment_counts = [len(record.get("comments") or []) for record in metadata_records] return { "source": source, "target_videos": target_videos, "queue_file": queue_file.as_posix(), "output_dir": output_dir.as_posix(), "metadata_file": metadata_file.as_posix(), "downloaded_this_run": downloaded_this_run, "queue_status": count_queue_status(records), "metadata_rows": len(metadata_records), "metadata_with_comments": sum(1 for count in comment_counts if count > 0), "total_comments": sum(comment_counts), "video_files": len(list(output_dir.glob("*.mp4"))) if output_dir.exists() else 0, "output_dir_bytes": directory_size_bytes(output_dir), "elapsed_seconds": round(max(0.0, finished_at - started_at), 3), "generated_at": current_timestamp(), } def write_run_report(report_file: Path, report: dict[str, Any]) -> None: report_file.parent.mkdir(parents=True, exist_ok=True) report_file.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8") def run_queue_download( source: str, target_videos: int, queue_file: Path, retry_limit: int, keyword: str | None = None, **kwargs: Any, ) -> int: timeout = int(kwargs.get("timeout", 20)) output_dir = Path(kwargs.get("output_dir", DEFAULT_OUTPUT_DIR)) metadata_file = Path(kwargs.get("metadata_file") or output_dir / "metadata.jsonl") report_file_value = kwargs.get("report_file") report_file = Path(report_file_value) if report_file_value else output_dir / "run_report.json" min_video_bytes = int(kwargs.get("min_video_bytes", DEFAULT_MIN_VIDEO_BYTES)) max_comments = int(kwargs.get("max_comments", 20)) browser_port = kwargs.get("browser_port", DEFAULT_BROWSER_PORT) human_settings = HumanBrowseSettings( enabled=bool(kwargs.get("human_mode", True)), min_wait=float(kwargs.get("min_wait", 2.0)), max_wait=float(kwargs.get("max_wait", 6.0)), long_break_every=int(kwargs.get("long_break_every", 4)), max_runtime=float(kwargs.get("max_runtime", 0.0)), ) started_at = time.monotonic() source_url = build_source_url(source, keyword=keyword) requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(int(browser_port)) page = create_page(chromium_page_cls, chromium_options_cls, int(browser_port) if browser_port is not None else None) page.listen.start(LISTEN_TARGET) records = load_queue(queue_file) downloaded_this_run = 0 seen_urls: set[str] = set() seen_files: set[Path] = set() empty_rounds = 0 max_empty_rounds = 5 while count_queue_status(records).get("downloaded", 0) < target_videos: if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime: print("[INFO] 已达到最大运行时间,结束队列任务。") break pending_indices = [index for index, record in enumerate(records) if record.status == "pending"] if not pending_indices: before_count = len(records) if source == "current-page": note_urls = normalize_note_urls([getattr(page, "url", "")]) else: page.get(source_url) human_pause(human_settings) video_card_source = source in {"search", "video-channel"} note_limit = max(50, target_videos * 3 if video_card_source else target_videos * 2) browse_rounds = 8 if video_card_source else 2 note_urls = collect_note_urls_with_browse( page, limit=note_limit, human_settings=human_settings, rounds=browse_rounds, video_only=video_card_source, ) records = merge_note_urls_into_queue(records, note_urls, source=source) save_queue(queue_file, records) added = len(records) - before_count print(f"[INFO] 队列新增 {added} 条,当前总数 {len(records)}") if added == 0: empty_rounds += 1 if empty_rounds >= max_empty_rounds: print("[INFO] 连续多轮没有新增队列项,结束队列任务。") break else: empty_rounds = 0 if source != "current-page": run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue index = pending_indices[0] record = records[index] print(f"[INFO] 处理队列项 {index + 1}/{len(records)}: {record.note_id}") try: page.get(record.url) human_pause(human_settings) if human_settings.enabled: run_human_browse_sequence(page, create_human_browse_plan(human_settings)) if max_comments > 0: loaded_comments = load_visible_comments( page, human_settings=human_settings, max_comments=max_comments, timeout=min(8.0, max(2.0, float(timeout))), ) if loaded_comments: print("[INFO] 已检测到可见评论,开始保存评论元数据。") page_html = page.run_js("return document.documentElement.outerHTML") metadata = merge_metadata( extract_note_metadata_from_html(page_html, note_id=record.note_id), extract_note_metadata_from_page(page, note_id=record.note_id, max_comments=max_comments), ) candidates = group_video_candidates( extract_video_candidates_from_html( page_html, video_id=record.note_id, ) ) if not candidates: packet = wait_for_feed_packet(page, timeout=timeout) if packet is not None: payload = extract_feed_payload(packet.response) payload_metadata = extract_note_metadata(payload, note_id=record.note_id) metadata = merge_metadata(metadata, payload_metadata) candidates = group_video_candidates(extract_video_candidates(payload)) if not candidates: records[index] = mark_queue_record_skipped(record, "no video candidate") save_queue(queue_file, records) continue candidate = candidates[0] if candidate.video_url in seen_urls: records[index] = mark_queue_record_skipped(record, "duplicate video url") save_queue(queue_file, records) continue seen_urls.add(candidate.video_url) output_path = build_output_path(candidate, output_dir=output_dir) if output_path in seen_files or output_path.exists(): records[index] = mark_queue_record_downloaded(record, output_path) save_queue(queue_file, records) continue download_video( requests_module=requests_module, headers=build_headers(record.url), video_url=candidate.video_url, output_path=output_path, min_video_bytes=min_video_bytes, ) file_size_bytes = output_path.stat().st_size metadata["file_size_bytes"] = file_size_bytes append_jsonl_record( metadata_file, build_download_metadata_record( base_metadata=metadata, candidate=candidate, queue_record=record, output_path=output_path, ), ) seen_files.add(output_path) records[index] = mark_queue_record_downloaded(record, output_path) save_queue(queue_file, records) downloaded_this_run += 1 total_downloaded = count_queue_status(records).get("downloaded", 0) print(f"[OK] 已保存 ({total_downloaded}/{target_videos}): {output_path}") human_pause(human_settings) if should_take_long_break(total_downloaded, human_settings): take_long_break(human_settings) except Exception as exc: records[index] = mark_queue_record_failed(record, str(exc), retry_limit=retry_limit) save_queue(queue_file, records) print(f"[WARN] 队列项失败 {record.note_id}: {exc}") counts = count_queue_status(records) print( "[INFO] 队列状态 " f"downloaded={counts.get('downloaded', 0)} " f"pending={counts.get('pending', 0)} " f"skipped={counts.get('skipped_image', 0)} " f"failed={counts.get('failed', 0)}" ) report = build_run_report( source=source, target_videos=target_videos, queue_file=queue_file, output_dir=output_dir, metadata_file=metadata_file, records=records, downloaded_this_run=downloaded_this_run, started_at=started_at, finished_at=time.monotonic(), ) write_run_report(report_file, report) print(f"[INFO] 运行报告已保存: {report_file}") return downloaded_this_run def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome,监听 feed 响应并下载视频") parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10") parser.add_argument("--output-dir", default=str(DEFAULT_OUTPUT_DIR), help="视频保存目录,默认 video") parser.add_argument("--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="Chrome 调试端口,默认 9223") parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20") parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面") parser.add_argument("--use-current-page", action="store_true", help="使用浏览器当前页面,不强制打开发现页") parser.add_argument("--human-mode", action=argparse.BooleanOptionalAction, default=True, help="启用温和随机浏览节奏,默认开启") parser.add_argument("--min-wait", type=float, default=2.0, help="随机停留最短秒数,默认 2") parser.add_argument("--max-wait", type=float, default=6.0, help="随机停留最长秒数,默认 6") parser.add_argument("--long-break-every", type=int, default=4, help="每下载 N 条长停留一次,默认 4") parser.add_argument("--max-runtime", type=float, default=0.0, help="最大运行秒数,0 表示不限制") parser.add_argument("--source", choices=["explore", "video-channel", "current-page", "search"], default="explore", help="长任务来源,默认 explore") parser.add_argument("--keyword", default=None, help="搜索来源关键词,例如 猫咪") parser.add_argument("--target-videos", type=int, default=0, help="队列模式目标下载数量,0 表示不启用") parser.add_argument("--queue-file", default=None, help="JSONL 队列文件路径,提供后启用可恢复队列模式") parser.add_argument("--metadata-file", default=None, help="下载成功后追加写入的元数据 JSONL 路径,默认 output-dir/metadata.jsonl") parser.add_argument("--max-comments", type=int, default=20, help="随元数据保存的可见热门评论数量,默认 20") parser.add_argument("--retry-limit", type=int, default=1, help="队列项下载失败重试次数,默认 1") parser.add_argument("--min-video-bytes", type=int, default=DEFAULT_MIN_VIDEO_BYTES, help="视频响应最小字节数,默认 204800") parser.add_argument("--report-file", default=None, help="队列任务运行报告 JSON 路径,默认 output-dir/run_report.json") return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if args.max_videos <= 0: parser.error("--max-videos 必须大于 0") if args.browser_port <= 0: parser.error("--browser-port 必须大于 0") if args.min_wait < 0 or args.max_wait < args.min_wait: parser.error("--min-wait 和 --max-wait 必须满足 0 <= min <= max") if args.min_video_bytes < 0: parser.error("--min-video-bytes 必须大于等于 0") if args.queue_file or args.target_videos > 0: target_videos = args.target_videos if args.target_videos > 0 else args.max_videos downloaded = run_queue_download( source=args.source, target_videos=target_videos, queue_file=Path(args.queue_file or "data/xhs_queue.jsonl"), retry_limit=args.retry_limit, keyword=args.keyword, timeout=args.timeout, output_dir=Path(args.output_dir), metadata_file=Path(args.metadata_file) if args.metadata_file else None, max_comments=args.max_comments, min_video_bytes=args.min_video_bytes, report_file=Path(args.report_file) if args.report_file else None, browser_port=args.browser_port, human_mode=args.human_mode, min_wait=args.min_wait, max_wait=args.max_wait, long_break_every=args.long_break_every, max_runtime=args.max_runtime, ) print(f"[INFO] 本次共下载 {downloaded} 个视频。") return 0 downloaded = collect_videos( max_videos=args.max_videos, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, start_url=args.start_url, use_current_page=args.use_current_page, human_mode=args.human_mode, min_wait=args.min_wait, max_wait=args.max_wait, long_break_every=args.long_break_every, max_runtime=args.max_runtime, ) print(f"[INFO] 本次共下载 {downloaded} 个视频。") return 0 if __name__ == "__main__": sys.exit(main())