2026-05-27 15:29:23 +08:00

564 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import argparse
import html
import json
import re
import socket
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urljoin
# Page opened (and returned to) by collect_videos when --start-url is not given.
DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore"
# Default value of --browser-port (the Chrome debugging port to attach to).
DEFAULT_BROWSER_PORT = 9224
# Default value of --output-dir; relative to the current working directory.
DEFAULT_OUTPUT_DIR = Path("video")
# Target handed to page.listen.start(); presumably matches feed API responses.
LISTEN_TARGET = "/api/sns/web/v1/feed"
# Byte budget (UTF-8) that build_output_path uses when composing a file name.
MAX_FILENAME_BYTES = 240
# Characters that sanitize_filename replaces with "_".
INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]')
# Dict keys whose string / list-of-string values are treated as video URLs.
VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls"}
# Keys tried, in this order, when looking for a note title.
TITLE_KEYS = ("display_title", "title", "desc", "description")
# Keys tried, in this order, when looking for a note/video identifier.
ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id")
# Keys tried, in this order, inside a "user"/"author" dict for the author name.
AUTHOR_KEYS = ("nickname", "name", "user_name", "userName")
@dataclass(frozen=True)
class VideoCandidate:
    """One candidate video URL together with the note metadata found near it."""

    # Note/video identifier; "unknown" or "current-page" when none was found.
    video_id: str
    # Note title; "untitled" or "current-page-video" when none was found.
    title: str
    # Direct URL of the video resource (already stripped / unescaped).
    video_url: str
    # Author display name; "unknown" when none was found.
    author_name: str
    # Key the URL was found under (e.g. "master_url", "backup_url",
    # "html_master_url"); choose_video_candidate ranks candidates by it.
    source_key: str
def sanitize_filename(value: str, fallback: str = "untitled") -> str:
    """Return *value* made safe for use as a file-name component.

    Every character matched by ``INVALID_FILENAME_CHARS`` becomes ``_`` and
    leading/trailing spaces, dots and underscores are trimmed. When nothing
    is left, *fallback* is returned instead.
    """
    replaced = INVALID_FILENAME_CHARS.sub("_", value)
    trimmed = replaced.strip(" ._")
    if trimmed:
        return trimmed
    return fallback
def truncate_utf8_bytes(value: str, max_bytes: int) -> str:
    """Shorten *value* so its UTF-8 encoding fits in *max_bytes* bytes.

    A value that already fits is returned unchanged. Otherwise the longest
    prefix made of whole characters is kept and trailing spaces, dots and
    underscores are removed from it.
    """
    encoded = value.encode("utf-8")
    if len(encoded) <= max_bytes:
        return value
    # Cutting the byte string may split the last character; "ignore" drops
    # that incomplete tail so only whole characters survive.
    kept = encoded[: max(max_bytes, 0)].decode("utf-8", errors="ignore")
    return kept.rstrip(" ._")
def looks_like_video_url(value: str) -> bool:
    """Return True when *value* is an http(s) URL that looks like a video.

    After stripping whitespace the URL must contain at least one of
    ``.mp4``, ``sns-video`` or ``xhscdn.com``.
    """
    candidate = value.strip()
    if not candidate.startswith(("http://", "https://")):
        return False
    return any(marker in candidate for marker in (".mp4", "sns-video", "xhscdn.com"))
def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None:
    """Search *value* depth-first for the first non-blank string under *keys*.

    For a dict, its own entries are checked first (in the order of *keys*)
    before descending into its values; lists are searched element by
    element. The match is returned stripped, or ``None`` when nothing is
    found or *value* is neither a dict nor a list.
    """
    if isinstance(value, dict):
        for key in keys:
            direct = value.get(key)
            if isinstance(direct, str) and direct.strip():
                return direct.strip()
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return None

    for child in children:
        nested = first_string_by_keys(child, keys)
        if nested:
            return nested
    return None
def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]:
    """Collect id, title and author for a URL from its enclosing dicts.

    *path* is walked from the innermost node outwards; each field keeps the
    first value found. The author is only looked up inside a ``user`` or
    ``author`` dict of a node. Missing fields fall back to ``"unknown"``
    (id, author) or ``"untitled"`` (title).
    """
    found = {"video_id": "", "title": "", "author_name": ""}
    enclosing_dicts = (node for node in reversed(path) if isinstance(node, dict))
    for node in enclosing_dicts:
        if not found["video_id"]:
            found["video_id"] = first_string_by_keys(node, ID_KEYS) or ""
        if not found["title"]:
            found["title"] = first_string_by_keys(node, TITLE_KEYS) or ""
        if not found["author_name"]:
            owner = node.get("user") or node.get("author")
            if isinstance(owner, dict):
                found["author_name"] = first_string_by_keys(owner, AUTHOR_KEYS) or ""
    return {
        "video_id": found["video_id"] or "unknown",
        "title": found["title"] or "untitled",
        "author_name": found["author_name"] or "unknown",
    }
def append_candidate(
    candidates: list[VideoCandidate],
    url: str,
    source_key: str,
    path: tuple[Any, ...],
) -> None:
    """Append a ``VideoCandidate`` for *url* to *candidates* if it looks like a video.

    Note metadata is taken from the dicts in *path*; *source_key* records
    which key the URL was stored under. URLs that fail
    ``looks_like_video_url`` are ignored.
    """
    if looks_like_video_url(url):
        note = find_nearest_note_context(path)
        candidate = VideoCandidate(
            video_id=note["video_id"],
            title=note["title"],
            video_url=url.strip(),
            author_name=note["author_name"],
            source_key=source_key,
        )
        candidates.append(candidate)
def walk_for_video_candidates(value: Any, path: tuple[Any, ...], candidates: list[VideoCandidate]) -> None:
    """Recursively scan *value* and record every video URL into *candidates*.

    *path* holds the dicts enclosing *value* (outermost first); it is
    extended with each dict that is entered so metadata can be looked up
    later. Lists do not extend the path. Strings, and strings inside lists,
    stored under a key in ``VIDEO_URL_KEYS`` are offered to
    ``append_candidate``.
    """
    if isinstance(value, list):
        for element in value:
            walk_for_video_candidates(element, path, candidates)
        return
    if not isinstance(value, dict):
        return

    nested_path = (*path, value)
    for key, child in value.items():
        if key in VIDEO_URL_KEYS:
            if isinstance(child, str):
                urls = [child]
            elif isinstance(child, list):
                urls = [item for item in child if isinstance(item, str)]
            else:
                urls = []
            for url in urls:
                append_candidate(candidates, url, key, nested_path)
        # Always descend, whether or not the key itself was a URL key.
        walk_for_video_candidates(child, nested_path, candidates)
def extract_video_candidates(payload: Any) -> list[VideoCandidate]:
    """Return every video candidate found anywhere inside *payload*."""
    collected: list[VideoCandidate] = []
    walk_for_video_candidates(payload, (), collected)
    return collected
def decode_html_video_url(value: str) -> str:
    """Undo HTML entity escaping and JSON-style slash escaping in a URL."""
    unescaped = html.unescape(value)
    # Order matters: the unicode escape is rewritten before the plain "\/".
    for escaped_slash in ("\\u002F", "\\/"):
        unescaped = unescaped.replace(escaped_slash, "/")
    return unescaped
def extract_video_candidates_from_html(source: str, video_id: str = "current-page") -> list[VideoCandidate]:
    """Extract ``master_url`` values embedded in page HTML as candidates.

    Both the backslash-escaped and the plain JSON spelling of the key are
    recognised. Each distinct URL that passes ``looks_like_video_url`` gives
    one candidate, in order of first appearance, tagged with *video_id* and
    placeholder title/author values.
    """
    master_url_patterns = (
        r'\\"master_url\\"\s*:\s*\\"(.*?)\\"',
        r'"master_url"\s*:\s*"(.*?)"',
    )
    decoded_urls = (
        decode_html_video_url(raw_match)
        for pattern in master_url_patterns
        for raw_match in re.findall(pattern, source)
    )
    # dict.fromkeys de-duplicates while keeping first-seen order.
    unique_urls = dict.fromkeys(url for url in decoded_urls if looks_like_video_url(url))
    return [
        VideoCandidate(
            video_id=video_id,
            title="current-page-video",
            video_url=url,
            author_name="unknown",
            source_key="html_master_url",
        )
        for url in unique_urls
    ]
def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate:
    """Pick the preferred candidate, ranked by the key its URL came from.

    Master URLs win over backup URLs; unknown source keys rank last. Among
    equally ranked candidates the earliest one in *candidates* is returned.

    Raises:
        ValueError: if *candidates* is empty.
    """
    if not candidates:
        raise ValueError("没有可用的视频候选地址。")

    rank_by_source = {"master_url": 0, "html_master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3}

    def rank(item: VideoCandidate) -> int:
        return rank_by_source.get(item.source_key, 99)

    # min() returns the first minimal element, matching a stable sort's head.
    return min(candidates, key=rank)
def group_video_candidates(candidates: list[VideoCandidate]) -> list[VideoCandidate]:
    """Reduce *candidates* to one preferred candidate per video.

    Candidates are grouped by ``video_id`` (or by URL when the id is empty)
    and ``choose_video_candidate`` picks the winner of each group. Groups
    keep the order in which they were first seen.
    """
    buckets: dict[str, list[VideoCandidate]] = {}
    for candidate in candidates:
        group_key = candidate.video_id or candidate.video_url
        buckets.setdefault(group_key, []).append(candidate)
    # Dicts preserve insertion order, so values() is already first-seen order.
    return [choose_video_candidate(bucket) for bucket in buckets.values()]
def build_output_path(candidate: VideoCandidate, output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path:
    """Build the destination path ``output_dir / "[author]title-id.mp4"``.

    Author, title and id are sanitised individually and the file name is
    kept within ``MAX_FILENAME_BYTES`` UTF-8 bytes. The title is shortened
    first; if the ``[author]`` prefix alone leaves no room, the prefix is
    shortened as well; an oversized id is capped so the suffix can never
    exceed the limit on its own.

    Args:
        candidate: Video whose metadata names the file.
        output_dir: Directory the file name is joined onto.

    Returns:
        The (not yet created) target path.
    """
    safe_author = sanitize_filename(candidate.author_name, fallback="unknown")
    safe_title = sanitize_filename(candidate.title, fallback="untitled")
    safe_video_id = sanitize_filename(candidate.video_id, fallback="unknown")

    # Cap the id so that "-<id>.mp4" always leaves two spare bytes (one for
    # the prefix, one for the title). Previously a very long id produced a
    # name longer than MAX_FILENAME_BYTES despite the truncation below.
    id_budget = MAX_FILENAME_BYTES - len("-.mp4") - 2
    safe_video_id = truncate_utf8_bytes(safe_video_id, max(1, id_budget)) or "unknown"

    prefix = f"[{safe_author}]"
    suffix = f"-{safe_video_id}.mp4"
    suffix_bytes = len(suffix.encode("utf-8"))

    title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - suffix_bytes
    if title_budget < 1:
        # The author prefix is too long: shrink it, keeping one byte for the title.
        prefix_budget = MAX_FILENAME_BYTES - suffix_bytes - 1
        prefix = truncate_utf8_bytes(prefix, max(1, prefix_budget))
        title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - suffix_bytes

    short_title = truncate_utf8_bytes(safe_title, max(1, title_budget))
    return output_dir / f"{prefix}{short_title}{suffix}"
def build_browser_address(browser_port: int | None) -> str | None:
    """Return the loopback ``host:port`` address for *browser_port*, or None."""
    return None if browser_port is None else f"127.0.0.1:{browser_port}"
def ensure_browser_debug_port_ready(browser_port: int) -> None:
    """Check that something accepts TCP connections on the local debug port.

    A connection to ``127.0.0.1:browser_port`` is opened with a two-second
    timeout and closed again immediately.

    Raises:
        RuntimeError: if the connection attempt fails with ``OSError``.
    """
    endpoint = ("127.0.0.1", browser_port)
    try:
        with socket.create_connection(endpoint, timeout=2):
            pass
    except OSError as exc:
        hint = (
            "无法连接到已启动的 Chrome 调试端口。"
            f"请先运行 `./.venv/bin/python login_xhs.py --browser-port {browser_port}`"
            "并确认 Chrome 仍在运行且端口一致。"
        )
        raise RuntimeError(hint) from exc
def build_headers(referer: str) -> dict[str, str]:
    """Return the request headers (referer and a fixed user agent) for downloads."""
    user_agent = " ".join(
        [
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/135.0.0.0 Safari/537.36",
        ]
    )
    headers = {"referer": referer}
    headers["user-agent"] = user_agent
    return headers
def import_runtime_dependencies() -> tuple[Any, Any, Any]:
    """Import the third-party packages lazily and return them.

    Returns:
        ``(requests, ChromiumPage, ChromiumOptions)``.

    Raises:
        SystemExit: with an installation hint when ``requests`` or
            ``DrissionPage`` is not installed.
    """
    try:
        import requests as requests_module
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 requests请先执行: python3 -m pip install requests") from exc

    try:
        from DrissionPage import ChromiumOptions, ChromiumPage
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 DrissionPage请先执行: python3 -m pip install DrissionPage") from exc

    return requests_module, ChromiumPage, ChromiumOptions
def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any:
    """Create the page object, attaching to an existing browser when a port is given.

    With a port, options are built with ``set_address(...)`` and
    ``existing_only(True)`` and passed to the page class; without one, the
    page class is called with no arguments.
    """
    address = build_browser_address(browser_port)
    if address is not None:
        attach_options = chromium_options_cls()
        attach_options = attach_options.set_address(address)
        attach_options = attach_options.existing_only(True)
        return chromium_page_cls(attach_options)
    return chromium_page_cls()
def extract_feed_payload(response: Any) -> dict[str, Any]:
    """Return the JSON object carried by a captured response.

    ``response.body`` is used when it is already a dict; otherwise a
    non-blank string ``response.raw_body`` is parsed as JSON.

    Raises:
        ValueError: if neither attribute yields a dict (this includes
            ``json.JSONDecodeError`` for malformed text).
    """
    parsed_body = getattr(response, "body", None)
    if isinstance(parsed_body, dict):
        return parsed_body

    raw_text = getattr(response, "raw_body", None)
    has_text = isinstance(raw_text, str) and bool(raw_text.strip())
    decoded = json.loads(raw_text) if has_text else None
    if isinstance(decoded, dict):
        return decoded
    raise ValueError("响应体不是可解析的 JSON 字典。")
def download_video(
    requests_module: Any,
    headers: dict[str, str],
    video_url: str,
    output_path: Path,
) -> None:
    """Download *video_url* and store it at *output_path*.

    The response is fetched with a 60-second timeout, checked for an HTTP
    error status and validated with ``validate_video_response`` before
    anything is written. The parent directory is created on demand.

    The bytes are first written to a sibling ``<name>.part`` file and then
    renamed onto *output_path*. Callers skip targets that already exist, so
    an interrupted write must never leave a truncated file under the final
    name.

    Args:
        requests_module: Object exposing a ``requests``-compatible ``get``.
        headers: Request headers to send.
        video_url: URL to download.
        output_path: Final destination of the video file.

    Raises:
        Exception: whatever the HTTP call, validation or file I/O raises.
    """
    response = requests_module.get(video_url, headers=headers, timeout=60)
    response.raise_for_status()
    validate_video_response(response, video_url)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    partial_path = output_path.with_name(f"{output_path.name}.part")
    try:
        partial_path.write_bytes(response.content)
        partial_path.replace(output_path)
    except BaseException:
        # Best-effort cleanup (also on Ctrl-C); the original error propagates.
        try:
            partial_path.unlink()
        except OSError:
            pass
        raise
def validate_video_response(response: Any, video_url: str) -> None:
    """Reject responses that are clearly not a video.

    A response is refused when its content type is an image, when the body
    is a WebP image, or when the body starts (after leading whitespace) with
    an HTML tag, a doctype or ``{``. The HTML/doctype check ignores case, so
    ``<!DOCTYPE html>`` and ``<HTML>`` pages are caught even if the server
    labels them ``video/*``. Otherwise the response is accepted when the
    content type is ``video/*`` or the body carries an MP4 (``ftyp``) or
    WebM/Matroska signature.

    Args:
        response: Object with ``content`` (bytes) and ``headers`` attributes.
        video_url: URL the response came from; only used in error messages.

    Raises:
        ValueError: if the response does not look like a video.
    """
    content = getattr(response, "content", b"")
    headers = getattr(response, "headers", {}) or {}
    content_type = str(headers.get("content-type", "")).lower()

    if content_type.startswith("image/"):
        raise ValueError(f"非视频响应: {content_type} {video_url}")
    if content.startswith(b"RIFF") and b"WEBP" in content[:16]:
        raise ValueError(f"非视频响应: image/webp {video_url}")

    # Only the first few bytes matter; lower-casing them makes the markup
    # sniff case-insensitive without copying the whole body.
    leading = content.lstrip()[:16].lower()
    if leading.startswith((b"<html", b"<!doctype", b"{")):
        raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}")

    has_video_type = content_type.startswith("video/")
    has_mp4_signature = len(content) >= 12 and content[4:8] == b"ftyp"
    has_webm_signature = content.startswith(b"\x1a\x45\xdf\xa3")
    if has_video_type or has_mp4_signature or has_webm_signature:
        return
    raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}")
def wait_for_feed_packet(page: Any, timeout: int) -> Any | None:
    """Wait for the next captured packet and return it, or None.

    Any exception raised while waiting is reported on stdout and turned
    into ``None``; a falsy result from ``page.listen.wait`` is also mapped
    to ``None``.
    """
    try:
        return page.listen.wait(timeout=timeout) or None
    except Exception as exc:
        print(f"[WARN] 等待 feed 数据超时或失败: {exc}")
        return None
def scroll_feed(page: Any, distance: int = 900) -> None:
    """Scroll the page by *distance* pixels, then pause for two seconds.

    The injected script scrolls the largest scrollable element wider than
    300px and taller than 200px, falling back to the document's scrolling
    element.
    """
    scroll_script = f"""
const distance = {distance};
const candidates = Array.from(document.querySelectorAll('*'))
.filter((el) => {{
const rect = el.getBoundingClientRect();
return rect.width > 300
&& rect.height > 200
&& el.scrollHeight > el.clientHeight + 20;
}})
.sort((a, b) => {{
const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height;
const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height;
return areaB - areaA;
}});
const target = candidates[0] || document.scrollingElement || document.documentElement;
target.scrollBy(0, distance);
"""
    page.run_js(scroll_script)
    # Fixed pause after scrolling before the caller polls again.
    time.sleep(2)
def collect_videos(
    max_videos: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
    start_url: str = DEFAULT_EXPLORE_URL,
    use_current_page: bool = False,
) -> int:
    """Drive the browser and download up to ``max_videos`` videos.

    Unless ``use_current_page`` is set, the notes linked from ``start_url``
    are visited first (``collect_videos_from_explore_cards``). After that the
    function loops: wait for a captured feed packet, extract candidates from
    it (or from the page HTML when no packet arrived), download the ones not
    seen before, and scroll for more. The loop ends once ``max_videos`` files
    were saved or six consecutive rounds produced nothing new.

    Args:
        max_videos: Maximum number of videos to save.
        timeout: Seconds to wait for each feed packet.
        output_dir: Directory the videos are written to.
        browser_port: Debug port of the browser to attach to, or None.
        start_url: Page to open when not using the current page.
        use_current_page: Refresh the current page instead of opening
            ``start_url``.

    Returns:
        The number of videos downloaded.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
    # Start capturing before any navigation so early responses are not missed.
    page.listen.start(LISTEN_TARGET)
    if use_current_page:
        print(f"[INFO] 使用当前页面: {getattr(page, 'url', '')}")
        page.refresh()
    else:
        print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。")
        page.get(start_url)
    # NOTE(review): pause assumed to apply to both branches — confirm.
    time.sleep(3)
    downloaded = 0
    # URLs already attempted and files already written during this run.
    seen_urls: set[str] = set()
    seen_files: set[Path] = set()
    consecutive_empty = 0
    max_consecutive_empty = 6
    if not use_current_page:
        # First pass: open the notes linked from the start page one by one.
        downloaded += collect_videos_from_explore_cards(
            page=page,
            requests_module=requests_module,
            output_dir=output_dir,
            max_videos=max_videos,
            start_url=start_url,
            seen_urls=seen_urls,
            seen_files=seen_files,
        )
        if downloaded >= max_videos:
            return downloaded
        # The first pass navigated away; go back before listening for feed data.
        page.get(start_url)
        time.sleep(2)
    while downloaded < max_videos and consecutive_empty < max_consecutive_empty:
        packet = wait_for_feed_packet(page, timeout=timeout)
        if packet is None:
            # No packet captured: fall back to URLs embedded in the page HTML.
            candidates = group_video_candidates(
                extract_video_candidates_from_html(
                    page.run_js("return document.documentElement.outerHTML"),
                    video_id=extract_note_id_from_url(getattr(page, "url", "")),
                )
            )
            if not candidates:
                consecutive_empty += 1
                scroll_feed(page)
                continue
        else:
            try:
                payload = extract_feed_payload(packet.response)
                candidates = group_video_candidates(extract_video_candidates(payload))
            except Exception as exc:
                # An unparsable packet counts as an empty round.
                print(f"[WARN] 解析 feed 数据失败: {exc}")
                consecutive_empty += 1
                scroll_feed(page)
                continue
        fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls]
        if not fresh_candidates:
            consecutive_empty += 1
            scroll_feed(page)
            continue
        consecutive_empty = 0
        for candidate in fresh_candidates:
            if downloaded >= max_videos:
                break
            # Marked as seen before downloading, so a failed URL is not retried.
            seen_urls.add(candidate.video_url)
            output_path = build_output_path(candidate, output_dir=output_dir)
            if output_path in seen_files or output_path.exists():
                continue
            # The current page URL is sent as referer, falling back to start_url.
            headers = build_headers(getattr(page, "url", start_url) or start_url)
            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=candidate.video_url,
                    output_path=output_path,
                )
            except Exception as exc:
                # Best effort: report the failure and move on to the next video.
                print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
                continue
            downloaded += 1
            seen_files.add(output_path)
            print(f"[OK] 已保存: {output_path}")
        # NOTE(review): assumed to run once per batch (after the for loop) — confirm.
        if downloaded < max_videos:
            scroll_feed(page)
    if downloaded == 0:
        print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed并在浏览器中滚动后重试。")
    return downloaded
def collect_videos_from_explore_cards(
    page: Any,
    requests_module: Any,
    output_dir: Path,
    max_videos: int,
    start_url: str,
    seen_urls: set[str],
    seen_files: set[Path],
) -> int:
    """Visit the notes linked from the current page and download their videos.

    Up to ``max_videos * 8`` note links are collected. Each note is opened,
    its HTML is scanned for video URLs, and unseen ones are downloaded until
    ``max_videos`` files have been saved. ``seen_urls`` and ``seen_files``
    are updated in place. Failed downloads are reported and skipped.

    Returns:
        The number of videos downloaded by this call.
    """
    saved_count = 0
    for note_url in collect_note_urls_from_page(page, limit=max_videos * 8):
        if saved_count >= max_videos:
            break

        page.get(note_url)
        time.sleep(2)
        page_html = page.run_js("return document.documentElement.outerHTML")
        note_id = extract_note_id_from_url(note_url)
        note_candidates = group_video_candidates(
            extract_video_candidates_from_html(page_html, video_id=note_id)
        )

        for candidate in note_candidates:
            if saved_count >= max_videos:
                break
            if candidate.video_url in seen_urls:
                continue
            seen_urls.add(candidate.video_url)

            target_path = build_output_path(candidate, output_dir=output_dir)
            if target_path in seen_files or target_path.exists():
                continue

            try:
                download_video(
                    requests_module=requests_module,
                    headers=build_headers(start_url),
                    video_url=candidate.video_url,
                    output_path=target_path,
                )
            except Exception as exc:
                print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
                continue

            saved_count += 1
            seen_files.add(target_path)
            print(f"[OK] 已保存: {target_path}")
    return saved_count
def extract_note_id_from_url(url: str) -> str:
    """Return the path segment following ``/explore/`` in *url*.

    ``"current-page"`` is returned when the URL has no such segment.
    """
    found = re.search(r"/explore/([^/?#]+)", url)
    return found.group(1) if found else "current-page"
def normalize_note_urls(urls: list[str]) -> list[str]:
    """Return one absolute URL per note, in order of first appearance.

    Relative links are resolved against the site root and links without a
    note id are dropped. A link carrying ``xsec_token=`` replaces whatever
    was stored for that note; otherwise a canonical
    ``/explore/<id>?xsec_source=pc_feed`` URL is used.
    """
    # NOTE(review): a tokenised link is assumed to override on every
    # occurrence, not only on the first sighting of a note — confirm.
    resolved: dict[str, str] = {}
    for url in urls:
        absolute_url = urljoin("https://www.xiaohongshu.com", str(url))
        note_id = extract_note_id_from_url(absolute_url)
        if note_id == "current-page":
            continue
        if "xsec_token=" in absolute_url:
            # Re-assigning an existing key keeps its original position.
            resolved[note_id] = absolute_url
        else:
            resolved.setdefault(
                note_id,
                f"https://www.xiaohongshu.com/explore/{note_id}?xsec_source=pc_feed",
            )
    return list(resolved.values())
def collect_note_urls_from_page(page: Any, limit: int) -> list[str]:
    """Return at most *limit* normalised note URLs linked from the page.

    The hrefs of all anchors containing ``/explore/`` are read via
    JavaScript. A non-list result yields an empty list.
    """
    link_script = """
return Array.from(document.querySelectorAll('a[href*="/explore/"]'))
.map((a) => a.href || a.getAttribute('href') || '')
.filter(Boolean);
"""
    hrefs = page.run_js(link_script) or []
    if isinstance(hrefs, list):
        return normalize_note_urls(list(map(str, hrefs)))[:limit]
    return []
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the downloader."""
    parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome监听 feed 响应并下载视频")
    # One (flag, keyword-arguments) pair per option, registered in order.
    option_specs = (
        ("--max-videos", {"type": int, "default": 10, "help": "最多下载视频数量,默认 10"}),
        ("--output-dir", {"default": str(DEFAULT_OUTPUT_DIR), "help": "视频保存目录,默认 video"}),
        ("--browser-port", {"type": int, "default": DEFAULT_BROWSER_PORT, "help": "Chrome 调试端口,默认 9224"}),
        ("--timeout", {"type": int, "default": 20, "help": "等待单次 feed 响应的秒数,默认 20"}),
        ("--start-url", {"default": DEFAULT_EXPLORE_URL, "help": "打开或刷新使用的小红书页面"}),
        ("--use-current-page", {"action": "store_true", "help": "使用浏览器当前页面,不强制打开发现页"}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser
def main(argv: list[str] | None = None) -> int:
    """Parse the command line, run the download and return the exit status.

    ``--max-videos`` and ``--browser-port`` must be positive; otherwise the
    parser reports a usage error. The function returns 0 once collection
    has finished.
    """
    parser = build_parser()
    options = parser.parse_args(argv)

    positive_options = (
        ("--max-videos", options.max_videos),
        ("--browser-port", options.browser_port),
    )
    for flag, number in positive_options:
        if number <= 0:
            parser.error(f"{flag} 必须大于 0")

    total = collect_videos(
        max_videos=options.max_videos,
        timeout=options.timeout,
        output_dir=Path(options.output_dir),
        browser_port=options.browser_port,
        start_url=options.start_url,
        use_current_page=options.use_current_page,
    )
    print(f"[INFO] 本次共下载 {total} 个视频。")
    return 0
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())