""" 使用 DrissionPage 监听抖音作品列表接口,并批量下载视频。 运行示例: python3 Douyin.py "https://www.douyin.com/user/xxx?from_tab_name=main" 依赖: pip install requests DrissionPage """ from __future__ import annotations import argparse import json import re import socket import sys import time from dataclasses import dataclass from pathlib import Path from typing import Any DEFAULT_USER_URL = ( "https://www.douyin.com/user/" "MS4wLjABAAAAx7--dRYA0mPwhwvxNJ-35i6sB8d1Kv4Sj1WmugquqiHK19QYlB18Ikx6cECT1RVO" "?from_tab_name=main" ) DEFAULT_BROWSER_PORT = 9223 LISTEN_TARGET = "web/aweme/post/" SINGLE_VIDEO_LISTEN_TARGET = "web/aweme/detail/" INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') RECOMMENDATION_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/?(?:\?.*)?$") CREATOR_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/user/[^/?#]+(?:\?.*)?$") VIDEO_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/video/(?P\d+)(?:[/?#].*)?$") AWEME_ID_PATTERN = re.compile(r"^\d{5,}$") @dataclass(frozen=True) class ResolvedTarget: kind: str value: str source: str aweme_id: str | None = None def sanitize_filename(value: str, fallback: str = "untitled") -> str: cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._") return cleaned or fallback def is_recommendation_url(value: str) -> bool: return bool(RECOMMENDATION_URL_PATTERN.match(value.strip())) def is_creator_url(value: str) -> bool: return bool(CREATOR_URL_PATTERN.match(value.strip())) def is_video_url(value: str) -> bool: return bool(VIDEO_URL_PATTERN.match(value.strip())) def is_aweme_id(value: str) -> bool: return bool(AWEME_ID_PATTERN.match(value.strip())) def extract_aweme_id_from_video_url(value: str) -> str: match = VIDEO_URL_PATTERN.match(value.strip()) if match is None: raise ValueError("不是合法的抖音视频 URL。") return match.group("aweme_id") def build_video_page_url(aweme_id: str) -> str: return f"https://www.douyin.com/video/{aweme_id}" def parse_target_input(value: str, source: str) -> ResolvedTarget: normalized = value.strip() if is_creator_url(normalized): return ResolvedTarget(kind="creator", value=normalized, source=source) if is_video_url(normalized): return ResolvedTarget( kind="single-video", value=normalized, source=source, aweme_id=extract_aweme_id_from_video_url(normalized), ) if is_aweme_id(normalized): return ResolvedTarget( kind="single-video", value=normalized, source=source, aweme_id=normalized, ) raise ValueError(f"不支持的目标: {value}") def get_active_page_url(page: Any) -> str: return str(getattr(page, "url", "") or "").strip() def resolve_target(page: Any, cli_target: str | None) -> ResolvedTarget: if cli_target: try: return parse_target_input(cli_target, source="manual") except ValueError as exc: raise RuntimeError(str(exc)) from exc current_url = get_active_page_url(page) try: return parse_target_input(current_url, source="current-page") except ValueError as exc: raise RuntimeError( "当前页面不是受支持的抖音博主页或单视频页,请切到目标页面后重试,或手动传入链接或 `aweme_id`。" ) from exc def resolve_cli_target(cli_target: str | None, browser_port: int | None) -> ResolvedTarget: if cli_target: return parse_target_input(cli_target, source="manual") _, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) return resolve_target(page=page, cli_target=None) def choose_video_url(url_list: list[str]) -> str: for url in url_list: if "douyinvod.com" in url: return url if url_list: return url_list[0] raise ValueError("url_list 为空,无法选择视频地址。") def build_output_path(title: str, video_id: str, output_dir: Path = Path("video")) -> Path: safe_title = sanitize_filename(title, fallback="untitled") return output_dir / f"{safe_title}-{video_id}.mp4" def build_browser_address(browser_port: int | None) -> str | None: if browser_port is None: return None return f"127.0.0.1:{browser_port}" def ensure_browser_debug_port_ready(browser_port: int) -> None: try: with socket.create_connection(("127.0.0.1", browser_port), timeout=2): return except OSError as exc: raise RuntimeError( "无法连接到已启动的 Chrome 调试端口。" f"请先运行 `./.venv/bin/python login_douyin.py --browser-port {browser_port}`," "并确认 Chrome 仍在运行且端口一致。" ) from exc def extract_aweme_payload(response: Any) -> dict[str, Any]: body = getattr(response, "body", None) if isinstance(body, dict): return body raw_body = getattr(response, "raw_body", None) if isinstance(raw_body, str) and raw_body.strip(): payload = json.loads(raw_body) if isinstance(payload, dict): return payload raise ValueError("响应体不是可解析的 JSON 字典。") def parse_aweme_items(body: Any) -> list[dict[str, str]]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") aweme_list = body.get("aweme_list") if not isinstance(aweme_list, list): raise ValueError("接口响应中缺少 aweme_list。") items: list[dict[str, str]] = [] for aweme in aweme_list: if not isinstance(aweme, dict): continue video = aweme.get("video") or {} play_addr = video.get("play_addr") or {} url_list = play_addr.get("url_list") or [] if not url_list: continue video_id = str(aweme.get("aweme_id") or "").strip() if not video_id: continue title = str(aweme.get("desc") or "").strip() or "untitled" items.append( { "title": title, "video_id": video_id, "video_url": choose_video_url([str(url) for url in url_list]), } ) return items def parse_single_aweme_item(body: Any) -> dict[str, str]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") if isinstance(body.get("aweme_detail"), dict): items = parse_aweme_items({"aweme_list": [body["aweme_detail"]]}) if items: return items[0] items = parse_aweme_items(body) if items: return items[0] raise ValueError("接口响应中缺少可下载的单视频数据。") def build_headers(referer: str) -> dict[str, str]: return { "referer": referer, "user-agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/135.0.0.0 Safari/537.36" ), } def import_runtime_dependencies() -> tuple[Any, Any, Any]: try: import requests except ModuleNotFoundError as exc: raise SystemExit( "缺少 requests,请先执行: python3 -m pip install requests" ) from exc try: from DrissionPage import ChromiumPage from DrissionPage import ChromiumOptions except ModuleNotFoundError as exc: raise SystemExit( "缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage" ) from exc return requests, ChromiumPage, ChromiumOptions def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any: browser_address = build_browser_address(browser_port) if browser_address is None: return chromium_page_cls() options = chromium_options_cls().set_address(browser_address).existing_only(True) return chromium_page_cls(options) def wait_for_aweme_packet(page: Any, timeout: int) -> Any | None: try: return page.listen.wait(timeout=timeout) except Exception as exc: print(f"[WARN] 等待接口数据超时或失败: {exc}") return None def scroll_to_next_page(page: Any) -> None: page.run_js("window.scrollTo(0, document.body.scrollHeight);") time.sleep(2) def download_video( requests_module: Any, headers: dict[str, str], video_url: str, output_path: Path, ) -> None: response = requests_module.get(video_url, headers=headers, timeout=60) response.raise_for_status() output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_bytes(response.content) def collect_videos( user_url: str, max_pages: int, timeout: int, output_dir: Path, browser_port: int | None, auto_scroll: bool = False, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() headers = build_headers(user_url) if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(LISTEN_TARGET) print("[INFO] 正在打开抖音主页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(user_url) time.sleep(3) downloaded = 0 seen_ids: set[str] = set() for page_number in range(1, max_pages + 1): print(f"[INFO] 正在处理第 {page_number} 页") packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: if auto_scroll: scroll_to_next_page(page) continue raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") try: payload = extract_aweme_payload(packet.response) items = parse_aweme_items(payload) except Exception as exc: print(f"[WARN] 解析接口数据失败: {exc}") if auto_scroll: scroll_to_next_page(page) continue raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") from exc if not items: if auto_scroll: print("[WARN] 这一页没有解析到视频。") else: raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") for item in items: if item["video_id"] in seen_ids: continue seen_ids.add(item["video_id"]) output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, ) try: download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {item['video_id']}: {exc}") continue downloaded += 1 print(f"[OK] 已保存: {output_path}") if auto_scroll: scroll_to_next_page(page) continue break return downloaded def collect_single_video( target: ResolvedTarget, timeout: int, output_dir: Path, browser_port: int | None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page_url = target.value if target.aweme_id is not None and not is_video_url(page_url): page_url = build_video_page_url(target.aweme_id) headers = build_headers(page_url) page.listen.start(SINGLE_VIDEO_LISTEN_TARGET) print("[INFO] 正在打开抖音视频页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(page_url) time.sleep(3) packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。") try: payload = extract_aweme_payload(packet.response) item = parse_single_aweme_item(payload) except Exception as exc: raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。") from exc output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, ) download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) print(f"[OK] 已保存: {output_path}") return 1 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="附着抖音登录浏览器并下载当前页面或指定目标的视频") parser.add_argument( "target", nargs="?", default=None, help="可选:博主主页 URL、单视频 URL 或 aweme_id;不传则读取当前浏览器页面", ) parser.add_argument("--pages", type=int, default=1, help="创作者抓取最多处理多少页;默认 1") parser.add_argument("--timeout", type=int, default=10, help="单次等待接口响应秒数,默认 10") parser.add_argument( "--output-dir", default="video", help="视频输出目录,默认 video", ) parser.add_argument( "--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="附着到已启动 Chrome 的调试端口,默认 9223", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if args.pages <= 0: parser.error("--pages 必须大于 0") if args.timeout <= 0: parser.error("--timeout 必须大于 0") if args.browser_port is not None and args.browser_port <= 0: parser.error("--browser-port 必须大于 0") try: target = resolve_cli_target(args.target, browser_port=args.browser_port) if target.kind == "creator": total = collect_videos( user_url=target.value, max_pages=args.pages, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, auto_scroll=args.pages > 1, ) elif target.kind == "single-video": total = collect_single_video( target=target, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, ) else: raise RuntimeError(f"不支持的目标类型: {target.kind}") except RuntimeError as exc: print(f"[ERROR] {exc}") return 1 except KeyboardInterrupt: print("\n[INFO] 用户中断。") return 130 print(f"[INFO] 处理结束,共下载 {total} 个视频。") return 0 if __name__ == "__main__": sys.exit(main())