596 lines
21 KiB
Python
596 lines
21 KiB
Python
from __future__ import annotations
|
||
|
||
import argparse
|
||
import html
|
||
import json
|
||
import re
|
||
import socket
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from urllib.parse import urljoin
|
||
|
||
# Page opened when the script is not asked to reuse the browser's current tab.
DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore"
# Default Chrome remote-debugging port the script attaches to.
DEFAULT_BROWSER_PORT = 9223
# Default directory downloaded videos are written to.
DEFAULT_OUTPUT_DIR = Path("video")
# Target handed to the page listener to select feed API responses.
LISTEN_TARGET = "/api/sns/web/v1/feed"
# Byte budget (UTF-8) used when assembling an output file name.
MAX_FILENAME_BYTES = 240
# Characters replaced with "_" in file names: path separators, characters
# reserved on common file systems, and every C0 control character (NUL, CR,
# LF, TAB, ...). Control characters other than CR/LF/TAB used to slip through
# and could make the later file write fail.
INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')
# Payload keys whose values are treated as video URLs (a string or list of strings).
VIDEO_URL_KEYS = {"master_url", "backup_url", "backup_urls"}
# Keys probed, in priority order, for a note title.
TITLE_KEYS = ("display_title", "title", "desc", "description")
# Keys probed, in priority order, for a note/video identifier.
ID_KEYS = ("id", "note_id", "noteId", "video_id", "file_id")
# Keys probed, in priority order, for the author's display name.
AUTHOR_KEYS = ("nickname", "name", "user_name", "userName")
|
||
|
||
|
||
@dataclass(frozen=True)
class VideoCandidate:
    """Immutable description of one video URL discovered in a payload or page."""

    # Identifier of the note/video, or a placeholder when none was found.
    video_id: str
    # Title used when building the output file name.
    title: str
    # Absolute URL the video bytes are fetched from.
    video_url: str
    # Display name of the author, or a placeholder when none was found.
    author_name: str
    # Name of the key (or HTML marker) the URL was read from; used for ranking.
    source_key: str
|
||
|
||
|
||
def sanitize_filename(value: str, fallback: str = "untitled") -> str:
    """Return ``value`` made safe for use as a file-name component.

    Every character matched by ``INVALID_FILENAME_CHARS`` becomes ``_``, then
    leading/trailing spaces, dots and underscores are trimmed. ``fallback`` is
    returned when nothing is left.
    """
    replaced = INVALID_FILENAME_CHARS.sub("_", value)
    trimmed = replaced.strip(" ._")
    if trimmed:
        return trimmed
    return fallback
|
||
|
||
|
||
def truncate_utf8_bytes(value: str, max_bytes: int) -> str:
    """Shorten ``value`` so its UTF-8 encoding fits in ``max_bytes`` bytes.

    A value that already fits is returned unchanged. Otherwise the longest
    prefix of whole characters that fits is kept, and trailing spaces, dots
    and underscores are stripped from that prefix.

    Args:
        value: Text to shorten.
        max_bytes: Byte budget; zero or a negative number yields ``""`` for
            any non-empty input.

    Returns:
        The original string, or its truncated and right-stripped prefix.
    """
    encoded = value.encode("utf-8")
    if len(encoded) <= max_bytes:
        return value

    # Clamp so a negative budget means "nothing fits" instead of slicing from
    # the end of the byte string.
    head = encoded[: max(0, max_bytes)]
    # The slice can only damage the final code point; "ignore" drops exactly
    # that partial sequence and keeps every complete character before it.
    return head.decode("utf-8", errors="ignore").rstrip(" ._")
|
||
|
||
|
||
def looks_like_video_url(value: str) -> bool:
    """Tell whether ``value`` is an http(s) URL carrying a known video marker.

    Surrounding whitespace is ignored. The markers are the substrings
    ``.mp4``, ``sns-video`` and ``xhscdn.com``.
    """
    candidate = value.strip()
    if not candidate.startswith(("http://", "https://")):
        return False
    markers = (".mp4", "sns-video", "xhscdn.com")
    return any(marker in candidate for marker in markers)
|
||
|
||
|
||
def first_string_by_keys(value: Any, keys: tuple[str, ...]) -> str | None:
    """Find the first non-blank string stored under one of ``keys``.

    For a dict, its own entries are checked first in the order of ``keys``;
    only then are its values searched depth-first. A list is searched element
    by element. Any other type yields ``None``.

    Returns:
        The stripped string, or ``None`` when nothing matches.
    """
    if isinstance(value, dict):
        for name in keys:
            direct = value.get(name)
            if isinstance(direct, str) and direct.strip():
                return direct.strip()
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return None

    for child in children:
        nested = first_string_by_keys(child, keys)
        if nested:
            return nested
    return None
|
||
|
||
|
||
def find_nearest_note_context(path: tuple[Any, ...]) -> dict[str, str]:
    """Collect id, title and author name from the ancestors in ``path``.

    Ancestors are visited from the innermost (last) to the outermost (first),
    and non-dict entries are skipped. Each field keeps the first non-empty
    value found. The author is looked up only inside a dict stored directly
    under the ancestor's ``user`` or ``author`` key.

    Returns:
        A dict with keys ``video_id``, ``title`` and ``author_name``; missing
        values become ``"unknown"``, ``"untitled"`` and ``"unknown"``.
    """
    found = {"video_id": "", "title": "", "author_name": ""}

    for ancestor in reversed(path):
        if not isinstance(ancestor, dict):
            continue
        if not found["video_id"]:
            found["video_id"] = first_string_by_keys(ancestor, ID_KEYS) or ""
        if not found["title"]:
            found["title"] = first_string_by_keys(ancestor, TITLE_KEYS) or ""
        if not found["author_name"]:
            owner = ancestor.get("user") or ancestor.get("author")
            if isinstance(owner, dict):
                found["author_name"] = first_string_by_keys(owner, AUTHOR_KEYS) or ""

    return {
        "video_id": found["video_id"] or "unknown",
        "title": found["title"] or "untitled",
        "author_name": found["author_name"] or "unknown",
    }
|
||
|
||
|
||
def append_candidate(
    candidates: list[VideoCandidate],
    url: str,
    source_key: str,
    path: tuple[Any, ...],
) -> None:
    """Append a ``VideoCandidate`` for ``url`` when it looks like a video URL.

    Id, title and author come from the ancestors in ``path``. URLs rejected
    by ``looks_like_video_url`` are ignored. ``candidates`` is mutated in
    place.
    """
    if looks_like_video_url(url):
        note = find_nearest_note_context(path)
        record = VideoCandidate(
            video_id=note["video_id"],
            title=note["title"],
            video_url=url.strip(),
            author_name=note["author_name"],
            source_key=source_key,
        )
        candidates.append(record)
|
||
|
||
|
||
def walk_for_video_candidates(value: Any, path: tuple[Any, ...], candidates: list[VideoCandidate]) -> None:
    """Recursively scan ``value`` and record every video URL it contains.

    Dicts extend ``path`` with themselves before their entries are visited.
    Lists pass ``path`` through unchanged. An entry whose key is in
    ``VIDEO_URL_KEYS`` contributes its string value, or each string in its
    list value. Every entry's value is also walked recursively.
    """
    if isinstance(value, list):
        for element in value:
            walk_for_video_candidates(element, path, candidates)
        return
    if not isinstance(value, dict):
        return

    ancestry = (*path, value)
    for name, child in value.items():
        if name in VIDEO_URL_KEYS:
            if isinstance(child, str):
                url_values = [child]
            elif isinstance(child, list):
                url_values = [entry for entry in child if isinstance(entry, str)]
            else:
                url_values = []
            for url in url_values:
                append_candidate(candidates, url, name, ancestry)
        # Keep descending even under URL keys so nested structures are not missed.
        walk_for_video_candidates(child, ancestry, candidates)
|
||
|
||
|
||
def extract_video_candidates(payload: Any) -> list[VideoCandidate]:
    """Return every video candidate found anywhere inside ``payload``."""
    found: list[VideoCandidate] = []
    # The walk starts with an empty ancestor path and fills ``found`` in place.
    walk_for_video_candidates(payload, (), found)
    return found
|
||
|
||
|
||
def decode_html_video_url(value: str) -> str:
    """Turn a URL captured from page source into a plain URL.

    Three layers of escaping are undone, in this order:

    1. HTML entities (for example ``&amp;``).
    2. JSON-style unicode escapes: a backslash, ``u`` and four hex digits in
       either case. Previously only the upper-case escape for ``/`` was
       handled, so escapes such as the one for ``&`` stayed in the URL.
    3. Backslash-escaped slashes.

    Escapes that denote a lone surrogate are left as written, because a
    surrogate half on its own is not valid text.
    """

    def _decode_escape(match: re.Match[str]) -> str:
        code_point = int(match.group(1), 16)
        if 0xD800 <= code_point <= 0xDFFF:
            return match.group(0)
        return chr(code_point)

    decoded = html.unescape(value)
    decoded = re.sub(r"\\u([0-9a-fA-F]{4})", _decode_escape, decoded)
    # Done last so a doubly escaped slash (backslash + escape) also collapses.
    return decoded.replace("\\/", "/")
|
||
|
||
|
||
def extract_video_candidates_from_html(source: str, video_id: str = "current-page") -> list[VideoCandidate]:
    """Extract ``master_url`` values embedded in page source as candidates.

    Two spellings are searched: one where the surrounding quotes are
    backslash-escaped, then one with plain quotes. Each captured value is
    decoded, de-duplicated by URL, and kept only if it looks like a video URL.
    Every candidate gets the given ``video_id``, a fixed title of
    ``current-page-video``, author ``unknown`` and source key
    ``html_master_url``.
    """
    escaped_quotes = r'\\"master_url\\"\s*:\s*\\"(.*?)\\"'
    plain_quotes = r'"master_url"\s*:\s*"(.*?)"'

    results: list[VideoCandidate] = []
    emitted: set[str] = set()
    for regex in (escaped_quotes, plain_quotes):
        for raw_url in re.findall(regex, source):
            url = decode_html_video_url(raw_url)
            if url in emitted:
                continue
            if not looks_like_video_url(url):
                continue
            emitted.add(url)
            results.append(
                VideoCandidate(
                    video_id=video_id,
                    title="current-page-video",
                    video_url=url,
                    author_name="unknown",
                    source_key="html_master_url",
                )
            )
    return results
|
||
|
||
|
||
def choose_video_candidate(candidates: list[VideoCandidate]) -> VideoCandidate:
    """Pick the preferred candidate according to where its URL came from.

    Source keys are ranked ``master_url`` / ``html_master_url`` first, then
    ``backup_url``, ``backup_urls`` and ``url``; unknown keys rank last. When
    several candidates share the best rank, the earliest one wins.

    Raises:
        ValueError: If ``candidates`` is empty.
    """
    if not candidates:
        raise ValueError("没有可用的视频候选地址。")

    rank_by_source = {"master_url": 0, "html_master_url": 0, "backup_url": 1, "backup_urls": 2, "url": 3}

    def rank(item: VideoCandidate) -> int:
        return rank_by_source.get(item.source_key, 99)

    # min() returns the first item with the smallest rank, matching a stable sort.
    return min(candidates, key=rank)
|
||
|
||
|
||
def group_video_candidates(candidates: list[VideoCandidate]) -> list[VideoCandidate]:
    """Reduce ``candidates`` to one preferred candidate per video.

    Candidates are grouped by ``video_id``. When the id is empty or is the
    ``"unknown"`` placeholder used when no identifier could be found, the
    candidate is grouped by its URL instead. Without that, unrelated videos
    lacking an id would fall into one group and all but one would be dropped.

    Returns:
        The candidate chosen by ``choose_video_candidate`` for each group, in
        the order the groups were first seen.
    """
    grouped: dict[str, list[VideoCandidate]] = {}
    for candidate in candidates:
        video_id = candidate.video_id
        has_real_id = bool(video_id) and video_id != "unknown"
        key = video_id if has_real_id else candidate.video_url
        # dict preserves insertion order, so first-seen group order is kept.
        grouped.setdefault(key, []).append(candidate)
    return [choose_video_candidate(group) for group in grouped.values()]
|
||
|
||
|
||
def build_output_path(candidate: VideoCandidate, output_dir: Path = DEFAULT_OUTPUT_DIR) -> Path:
    """Build the destination path ``[author]title-id.mp4`` inside ``output_dir``.

    All three components are sanitized. The title is truncated so the name
    fits in ``MAX_FILENAME_BYTES`` UTF-8 bytes. If the author prefix and id
    suffix alone leave no room, the prefix is truncated first so at least one
    byte remains for the title.
    """
    author = sanitize_filename(candidate.author_name, fallback="unknown")
    title = sanitize_filename(candidate.title, fallback="untitled")
    identifier = sanitize_filename(candidate.video_id, fallback="unknown")

    prefix = f"[{author}]"
    suffix = f"-{identifier}.mp4"
    suffix_size = len(suffix.encode("utf-8"))

    title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - suffix_size
    if title_budget < 1:
        # Shrink the author prefix, reserving one byte for the title.
        prefix = truncate_utf8_bytes(prefix, max(1, MAX_FILENAME_BYTES - suffix_size - 1))
        title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - suffix_size

    short_title = truncate_utf8_bytes(title, max(1, title_budget))
    return output_dir / f"{prefix}{short_title}{suffix}"
|
||
|
||
|
||
def build_browser_address(browser_port: int | None) -> str | None:
    """Return the ``127.0.0.1:<port>`` address, or ``None`` when no port is given."""
    return None if browser_port is None else f"127.0.0.1:{browser_port}"
|
||
|
||
|
||
def ensure_browser_debug_port_ready(browser_port: int) -> None:
    """Check that something accepts TCP connections on the local debug port.

    A connection to ``127.0.0.1:browser_port`` is opened with a two-second
    timeout and closed again straight away.

    Raises:
        RuntimeError: If the connection attempt fails with ``OSError``; the
            original error is chained as the cause.
    """
    try:
        probe = socket.create_connection(("127.0.0.1", browser_port), timeout=2)
        probe.close()
    except OSError as exc:
        message = (
            "无法连接到已启动的 Chrome 调试端口。"
            f"请先运行 `./.venv/bin/python login_xhs.py --browser-port {browser_port}`,"
            "并确认 Chrome 仍在运行且端口一致。"
        )
        raise RuntimeError(message) from exc
|
||
|
||
|
||
def build_headers(referer: str) -> dict[str, str]:
    """Return the request headers for a download: the referer plus a fixed desktop Chrome user agent."""
    user_agent = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/135.0.0.0 Safari/537.36"
    )
    headers = {"referer": referer}
    headers["user-agent"] = user_agent
    return headers
|
||
|
||
|
||
def import_runtime_dependencies() -> tuple[Any, Any, Any]:
    """Import the third-party packages lazily and hand them back to the caller.

    Returns:
        ``(requests, ChromiumPage, ChromiumOptions)``.

    Raises:
        SystemExit: With an installation hint when ``requests`` or
            ``DrissionPage`` cannot be imported.
    """
    try:
        import requests
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 requests,请先执行: python3 -m pip install requests") from exc

    try:
        from DrissionPage import ChromiumOptions, ChromiumPage
    except ModuleNotFoundError as exc:
        raise SystemExit("缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage") from exc

    return requests, ChromiumPage, ChromiumOptions
|
||
|
||
|
||
def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any:
    """Create a page object, attaching to an existing browser when a port is given.

    With no port, ``chromium_page_cls`` is called without arguments.
    Otherwise an options object is configured with the local debug address
    and ``existing_only(True)``, then passed to ``chromium_page_cls``.
    """
    address = build_browser_address(browser_port)
    if address is not None:
        options = chromium_options_cls()
        options = options.set_address(address)
        options = options.existing_only(True)
        return chromium_page_cls(options)
    return chromium_page_cls()
|
||
|
||
|
||
def extract_feed_payload(response: Any) -> dict[str, Any]:
    """Return the JSON object carried by a captured response.

    ``response.body`` is used when it is already a dict. Otherwise
    ``response.raw_body`` is parsed as JSON, provided it is a non-blank
    string.

    Raises:
        ValueError: If neither attribute yields a dict. This includes the
            ``json.JSONDecodeError`` raised for malformed JSON.
    """
    parsed = getattr(response, "body", None)
    if not isinstance(parsed, dict):
        text = getattr(response, "raw_body", None)
        parsed = json.loads(text) if isinstance(text, str) and text.strip() else None

    if isinstance(parsed, dict):
        return parsed
    raise ValueError("响应体不是可解析的 JSON 字典。")
|
||
|
||
|
||
def download_video(
    requests_module: Any,
    headers: dict[str, str],
    video_url: str,
    output_path: Path,
) -> None:
    """Fetch ``video_url`` and write the body to ``output_path``.

    The request uses ``requests_module.get`` with a 60-second timeout. The
    HTTP status is checked and the body is validated as a video before
    anything is written. Missing parent directories are created.

    Raises:
        ValueError: If the response does not look like a video.
        Exception: Whatever ``get`` / ``raise_for_status`` raise on failure.
    """
    reply = requests_module.get(video_url, headers=headers, timeout=60)
    reply.raise_for_status()
    validate_video_response(reply, video_url)

    destination_dir = output_path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)
    output_path.write_bytes(reply.content)
|
||
|
||
|
||
def validate_video_response(response: Any, video_url: str) -> None:
    """Raise unless ``response`` plausibly carries video data.

    Rejected outright:
      * a ``content-type`` starting with ``image/``;
      * a RIFF/WEBP image signature;
      * a body that starts, after leading whitespace and in any letter case,
        with ``<html``, ``<!doctype`` or ``{``. This covers error pages and
        JSON bodies even when they are labelled as video.

    Accepted when the ``content-type`` starts with ``video/``, or the body
    carries an MP4 ``ftyp`` box or a WebM/Matroska signature.

    Args:
        response: Object exposing ``content`` (bytes) and ``headers`` (mapping).
        video_url: URL that was requested; only used in error messages.

    Raises:
        ValueError: If the response is rejected or nothing identifies it as video.
    """
    content = getattr(response, "content", b"")
    content_type = str(getattr(response, "headers", {}).get("content-type", "")).lower()

    if content_type.startswith("image/"):
        raise ValueError(f"非视频响应: {content_type} {video_url}")
    if content.startswith(b"RIFF") and b"WEBP" in content[:16]:
        raise ValueError(f"非视频响应: image/webp {video_url}")

    # Compare a lower-cased head only: markup such as "<!DOCTYPE html>" is
    # usually upper-case and used to slip past the case-sensitive check.
    head = content.lstrip()[:16].lower()
    if head.startswith((b"<html", b"<!doctype", b"{")):
        raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}")

    has_video_type = content_type.startswith("video/")
    has_mp4_signature = len(content) >= 12 and content[4:8] == b"ftyp"
    has_webm_signature = content.startswith(b"\x1a\x45\xdf\xa3")
    if has_video_type or has_mp4_signature or has_webm_signature:
        return

    raise ValueError(f"非视频响应: {content_type or 'unknown'} {video_url}")
|
||
|
||
|
||
def wait_for_feed_packet(page: Any, timeout: int) -> Any | None:
    """Wait for the next captured packet on ``page.listen``.

    Returns:
        The packet, or ``None`` when the wait yields a falsy value or raises.
        A failure is reported as a warning on stdout and not propagated.
    """
    try:
        return page.listen.wait(timeout=timeout) or None
    except Exception as exc:
        print(f"[WARN] 等待 feed 数据超时或失败: {exc}")
        return None
|
||
|
||
|
||
def scroll_feed(page: Any, distance: int = 900) -> None:
    """Scroll the page's main scrollable area down by ``distance`` pixels.

    The injected script picks the element with the largest bounding box among
    those wider than 300px, taller than 200px and whose ``scrollHeight``
    exceeds ``clientHeight`` by more than 20. If none qualifies it falls back
    to ``document.scrollingElement`` and then ``document.documentElement``.
    After running the script the function sleeps for two seconds.
    """
    # NOTE(review): the whitespace inside this script was lost in the pasted
    # source; it is re-indented here, which does not affect the JavaScript.
    script = f"""
    const distance = {distance};
    const candidates = Array.from(document.querySelectorAll('*'))
      .filter((el) => {{
        const rect = el.getBoundingClientRect();
        return rect.width > 300
          && rect.height > 200
          && el.scrollHeight > el.clientHeight + 20;
      }})
      .sort((a, b) => {{
        const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height;
        const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height;
        return areaB - areaA;
      }});
    const target = candidates[0] || document.scrollingElement || document.documentElement;
    target.scrollBy(0, distance);
    """
    page.run_js(script)
    # Fixed pause after scrolling before the caller continues.
    time.sleep(2)
|
||
|
||
|
||
def collect_videos(
    max_videos: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
    start_url: str = DEFAULT_EXPLORE_URL,
    use_current_page: bool = False,
) -> int:
    """Download up to ``max_videos`` videos and return how many were saved.

    Steps, in order:

    1. Import the runtime dependencies, probe the debug port when one is
       given, create the page and start listening for ``LISTEN_TARGET``.
    2. Either refresh the current page or open ``start_url``.
    3. Unless ``use_current_page`` is set, first collect videos by visiting
       note links found on the start page, then re-open ``start_url``.
    4. Loop on captured feed packets. When no packet arrives, fall back to
       scanning the page HTML. Download every candidate whose URL has not
       been seen, scrolling between rounds. The loop ends once ``max_videos``
       is reached or six consecutive rounds produce nothing new.

    Args:
        max_videos: Maximum number of videos to save.
        timeout: Seconds to wait for each feed packet.
        output_dir: Directory the videos are written to.
        browser_port: Debug port of the browser to attach to, or ``None``.
        start_url: Page opened (and re-opened) when not using the current page.
        use_current_page: Refresh and use whatever page the browser shows.

    Returns:
        The number of videos saved during this call.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
    page.listen.start(LISTEN_TARGET)

    if use_current_page:
        print(f"[INFO] 使用当前页面: {getattr(page, 'url', '')}")
        page.refresh()
    else:
        print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。")
        page.get(start_url)
    # NOTE(review): indentation was lost in the pasted source; this pause is
    # assumed to follow both branches — confirm against the original file.
    time.sleep(3)

    downloaded = 0
    seen_urls: set[str] = set()
    seen_files: set[Path] = set()
    consecutive_empty = 0
    max_consecutive_empty = 6

    if not use_current_page:
        # First pass: open individual notes linked from the start page.
        downloaded += collect_videos_from_explore_cards(
            page=page,
            requests_module=requests_module,
            output_dir=output_dir,
            max_videos=max_videos,
            start_url=start_url,
            seen_urls=seen_urls,
            seen_files=seen_files,
        )
        if downloaded >= max_videos:
            return downloaded
        # Go back to the start page before listening for feed packets.
        page.get(start_url)
        time.sleep(2)

    while downloaded < max_videos and consecutive_empty < max_consecutive_empty:
        packet = wait_for_feed_packet(page, timeout=timeout)
        if packet is None:
            # No packet captured: fall back to URLs embedded in the page HTML.
            candidates = group_video_candidates(
                extract_video_candidates_from_html(
                    page.run_js("return document.documentElement.outerHTML"),
                    video_id=extract_note_id_from_url(getattr(page, "url", "")),
                )
            )
            if not candidates:
                consecutive_empty += 1
                scroll_feed(page)
                continue
        else:
            try:
                payload = extract_feed_payload(packet.response)
                candidates = group_video_candidates(extract_video_candidates(payload))
            except Exception as exc:
                print(f"[WARN] 解析 feed 数据失败: {exc}")
                consecutive_empty += 1
                scroll_feed(page)
                continue

        # Only URLs not handled earlier in this run count as progress.
        fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls]
        if not fresh_candidates:
            consecutive_empty += 1
            scroll_feed(page)
            continue

        consecutive_empty = 0
        for candidate in fresh_candidates:
            if downloaded >= max_videos:
                break
            # Marked as seen before downloading, so a failed URL is not retried.
            seen_urls.add(candidate.video_url)
            output_path = build_output_path(candidate, output_dir=output_dir)
            if output_path in seen_files or output_path.exists():
                continue
            # The current page URL is sent as the referer, falling back to start_url.
            headers = build_headers(getattr(page, "url", start_url) or start_url)
            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=candidate.video_url,
                    output_path=output_path,
                )
            except Exception as exc:
                print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
                continue

            downloaded += 1
            seen_files.add(output_path)
            print(f"[OK] 已保存: {output_path}")

        if downloaded < max_videos:
            scroll_feed(page)

    if downloaded == 0:
        print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed,并在浏览器中滚动后重试。")
    return downloaded
|
||
|
||
|
||
def collect_videos_from_explore_cards(
    page: Any,
    requests_module: Any,
    output_dir: Path,
    max_videos: int,
    start_url: str,
    seen_urls: set[str],
    seen_files: set[Path],
) -> int:
    """Download videos by opening the notes linked from ``start_url``.

    Each round reloads ``start_url``, gathers note links not visited yet,
    opens each note and extracts video URLs from its HTML. Rounds continue
    until ``max_videos`` videos are saved or four consecutive rounds save
    nothing. ``seen_urls`` and ``seen_files`` are updated in place so the
    caller can keep de-duplicating afterwards.

    Returns:
        The number of videos saved by this function.
    """
    downloaded = 0
    visited_note_ids: set[str] = set()
    empty_rounds = 0
    max_empty_rounds = 4

    while downloaded < max_videos and empty_rounds < max_empty_rounds:
        page.get(start_url)
        time.sleep(2)
        # Request up to 12 links per wanted video, then drop visited notes.
        note_urls = filter_unvisited_note_urls(
            collect_note_urls_from_page(page, limit=max_videos * 12),
            visited_note_ids,
        )
        if not note_urls:
            empty_rounds += 1
            scroll_feed(page)
            continue

        round_downloaded = 0
        for note_url in note_urls:
            if downloaded >= max_videos:
                break
            note_id = extract_note_id_from_url(note_url)
            # Recorded before the visit, so a note is opened at most once.
            visited_note_ids.add(note_id)
            page.get(note_url)
            time.sleep(2)
            candidates = group_video_candidates(
                extract_video_candidates_from_html(
                    page.run_js("return document.documentElement.outerHTML"),
                    video_id=note_id,
                )
            )
            if not candidates:
                continue
            for candidate in candidates:
                if downloaded >= max_videos:
                    break
                if candidate.video_url in seen_urls:
                    continue
                seen_urls.add(candidate.video_url)
                output_path = build_output_path(candidate, output_dir=output_dir)
                if output_path in seen_files or output_path.exists():
                    continue
                try:
                    download_video(
                        requests_module=requests_module,
                        # start_url, not the note URL, is sent as the referer here.
                        headers=build_headers(start_url),
                        video_url=candidate.video_url,
                        output_path=output_path,
                    )
                except Exception as exc:
                    print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
                    continue
                downloaded += 1
                round_downloaded += 1
                seen_files.add(output_path)
                print(f"[OK] 已保存: {output_path}")

        # NOTE(review): indentation was lost in the pasted source; the round
        # bookkeeping below is assumed to run once per round, after the loop
        # over notes — confirm against the original file.
        if round_downloaded == 0:
            empty_rounds += 1
        else:
            empty_rounds = 0
        if downloaded < max_videos:
            page.get(start_url)
            time.sleep(1)
            scroll_feed(page)
    return downloaded
|
||
|
||
|
||
def extract_note_id_from_url(url: str) -> str:
    """Return the path segment following ``/explore/`` in ``url``.

    The segment ends at the next ``/``, ``?`` or ``#``. The placeholder
    ``"current-page"`` is returned when the URL has no such segment.
    """
    found = re.search(r"/explore/([^/?#]+)", url)
    return found.group(1) if found else "current-page"
|
||
|
||
|
||
def normalize_note_urls(urls: list[str]) -> list[str]:
    """Resolve and de-duplicate note links, keeping first-seen order.

    Each link is resolved against ``https://www.xiaohongshu.com``. Links
    without a note id are dropped. A note starts with a canonical
    ``/explore/<id>?xsec_source=pc_feed`` URL. Any link for that note that
    contains ``xsec_token=`` replaces it, and the last such link wins.
    """
    chosen: dict[str, str] = {}
    for raw in urls:
        absolute = urljoin("https://www.xiaohongshu.com", str(raw))
        note_id = extract_note_id_from_url(absolute)
        if note_id == "current-page":
            continue
        if note_id not in chosen:
            chosen[note_id] = f"https://www.xiaohongshu.com/explore/{note_id}?xsec_source=pc_feed"
        if "xsec_token=" in absolute:
            # Re-assigning an existing key keeps its original position.
            chosen[note_id] = absolute
    return list(chosen.values())
|
||
|
||
|
||
def collect_note_urls_from_page(page: Any, limit: int) -> list[str]:
    """Return at most ``limit`` normalized note URLs linked from ``page``.

    The links come from anchors whose ``href`` contains ``/explore/``. An
    empty list is returned when the script result is not a list.
    """
    script = """
    return Array.from(document.querySelectorAll('a[href*="/explore/"]'))
      .map((a) => a.href || a.getAttribute('href') || '')
      .filter(Boolean);
    """
    hrefs = page.run_js(script) or []
    if isinstance(hrefs, list):
        as_text = [str(href) for href in hrefs]
        return normalize_note_urls(as_text)[:limit]
    return []
|
||
|
||
|
||
def filter_unvisited_note_urls(urls: list[str], visited_note_ids: set[str]) -> list[str]:
    """Return the URLs whose note id is not in ``visited_note_ids``, in input order."""
    remaining: list[str] = []
    for url in urls:
        if extract_note_id_from_url(url) in visited_note_ids:
            continue
        remaining.append(url)
    return remaining
|
||
|
||
|
||
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with all download options and their defaults."""
    parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome,监听 feed 响应并下载视频")
    add = parser.add_argument
    add("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10")
    add("--output-dir", default=str(DEFAULT_OUTPUT_DIR), help="视频保存目录,默认 video")
    add("--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="Chrome 调试端口,默认 9223")
    add("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20")
    add("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面")
    add("--use-current-page", action="store_true", help="使用浏览器当前页面,不强制打开发现页")
    return parser
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """Parse the arguments, run the collection and report the result.

    ``--max-videos`` and ``--browser-port`` must be positive; otherwise the
    parser reports an error and exits.

    Returns:
        Always ``0`` once the collection has finished.
    """
    parser = build_parser()
    options = parser.parse_args(argv)

    if options.max_videos <= 0:
        parser.error("--max-videos 必须大于 0")
    if options.browser_port <= 0:
        parser.error("--browser-port 必须大于 0")

    saved_count = collect_videos(
        max_videos=options.max_videos,
        timeout=options.timeout,
        output_dir=Path(options.output_dir),
        browser_port=options.browser_port,
        start_url=options.start_url,
        use_current_page=options.use_current_page,
    )
    print(f"[INFO] 本次共下载 {saved_count} 个视频。")
    return 0
|
||
|
||
|
||
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|