""" 使用 DrissionPage 监听抖音作品列表接口,并批量下载视频。 运行示例: python3 Douyin.py "https://www.douyin.com/user/xxx?from_tab_name=main" 依赖: pip install requests DrissionPage """ from __future__ import annotations import argparse import json import re import socket import sys import time from pathlib import Path from typing import Any DEFAULT_USER_URL = ( "https://www.douyin.com/user/" "MS4wLjABAAAAx7--dRYA0mPwhwvxNJ-35i6sB8d1Kv4Sj1WmugquqiHK19QYlB18Ikx6cECT1RVO" "?from_tab_name=main" ) LISTEN_TARGET = "web/aweme/post/" INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') def sanitize_filename(value: str, fallback: str = "untitled") -> str: cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._") return cleaned or fallback def choose_video_url(url_list: list[str]) -> str: for url in url_list: if "douyinvod.com" in url: return url if url_list: return url_list[0] raise ValueError("url_list 为空,无法选择视频地址。") def build_output_path(title: str, video_id: str, output_dir: Path = Path("video")) -> Path: safe_title = sanitize_filename(title, fallback="untitled") return output_dir / f"{safe_title}-{video_id}.mp4" def build_browser_address(browser_port: int | None) -> str | None: if browser_port is None: return None return f"127.0.0.1:{browser_port}" def ensure_browser_debug_port_ready(browser_port: int) -> None: try: with socket.create_connection(("127.0.0.1", browser_port), timeout=2): return except OSError as exc: raise RuntimeError( "无法连接到已启动的 Chrome 调试端口。" f"请先运行 `./.venv/bin/python login_douyin.py --browser-port {browser_port}`," "并确认 Chrome 仍在运行且端口一致。" ) from exc def extract_aweme_payload(response: Any) -> dict[str, Any]: body = getattr(response, "body", None) if isinstance(body, dict): return body raw_body = getattr(response, "raw_body", None) if isinstance(raw_body, str) and raw_body.strip(): payload = json.loads(raw_body) if isinstance(payload, dict): return payload raise ValueError("响应体不是可解析的 JSON 字典。") def parse_aweme_items(body: Any) -> list[dict[str, str]]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") aweme_list = body.get("aweme_list") if not isinstance(aweme_list, list): raise ValueError("接口响应中缺少 aweme_list。") items: list[dict[str, str]] = [] for aweme in aweme_list: if not isinstance(aweme, dict): continue video = aweme.get("video") or {} play_addr = video.get("play_addr") or {} url_list = play_addr.get("url_list") or [] if not url_list: continue video_id = str(aweme.get("aweme_id") or "").strip() if not video_id: continue title = str(aweme.get("desc") or "").strip() or "untitled" items.append( { "title": title, "video_id": video_id, "video_url": choose_video_url([str(url) for url in url_list]), } ) return items def build_headers(referer: str) -> dict[str, str]: return { "referer": referer, "user-agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/135.0.0.0 Safari/537.36" ), } def import_runtime_dependencies() -> tuple[Any, Any, Any]: try: import requests except ModuleNotFoundError as exc: raise SystemExit( "缺少 requests,请先执行: python3 -m pip install requests" ) from exc try: from DrissionPage import ChromiumPage from DrissionPage import ChromiumOptions except ModuleNotFoundError as exc: raise SystemExit( "缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage" ) from exc return requests, ChromiumPage, ChromiumOptions def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any: browser_address = build_browser_address(browser_port) if browser_address is None: return chromium_page_cls() options = chromium_options_cls().set_address(browser_address).existing_only(True) return chromium_page_cls(options) def wait_for_aweme_packet(page: Any, timeout: int) -> Any | None: try: return page.listen.wait(timeout=timeout) except Exception as exc: print(f"[WARN] 等待接口数据超时或失败: {exc}") return None def scroll_to_next_page(page: Any) -> None: page.run_js("window.scrollTo(0, document.body.scrollHeight);") time.sleep(2) def download_video( requests_module: Any, headers: dict[str, str], video_url: str, output_path: Path, ) -> None: response = requests_module.get(video_url, headers=headers, timeout=60) response.raise_for_status() output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_bytes(response.content) def collect_videos( user_url: str, max_pages: int, timeout: int, output_dir: Path, browser_port: int | None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() headers = build_headers(user_url) if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(LISTEN_TARGET) print("[INFO] 正在打开抖音主页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(user_url) time.sleep(3) downloaded = 0 seen_ids: set[str] = set() for page_number in range(1, max_pages + 1): print(f"[INFO] 正在处理第 {page_number} 页") packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: scroll_to_next_page(page) continue try: payload = extract_aweme_payload(packet.response) items = parse_aweme_items(payload) except Exception as exc: print(f"[WARN] 解析接口数据失败: {exc}") scroll_to_next_page(page) continue if not items: print("[WARN] 这一页没有解析到视频。") for item in items: if item["video_id"] in seen_ids: continue seen_ids.add(item["video_id"]) output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, ) try: download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {item['video_id']}: {exc}") continue downloaded += 1 print(f"[OK] 已保存: {output_path}") scroll_to_next_page(page) return downloaded def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="监听抖音作品接口并下载视频") parser.add_argument("user_url", nargs="?", default=DEFAULT_USER_URL, help="抖音博主主页 URL") parser.add_argument("--pages", type=int, default=10, help="最多抓取多少页,默认 10") parser.add_argument("--timeout", type=int, default=10, help="单次等待接口响应秒数,默认 10") parser.add_argument( "--output-dir", default="video", help="视频输出目录,默认 video", ) parser.add_argument( "--browser-port", type=int, default=None, help="附着到已启动 Chrome 的调试端口,例如 9223;不传则由 DrissionPage 新开浏览器", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if args.pages <= 0: parser.error("--pages 必须大于 0") if args.timeout <= 0: parser.error("--timeout 必须大于 0") if args.browser_port is not None and args.browser_port <= 0: parser.error("--browser-port 必须大于 0") try: total = collect_videos( user_url=args.user_url, max_pages=args.pages, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, ) except RuntimeError as exc: print(f"[ERROR] {exc}") return 1 except KeyboardInterrupt: print("\n[INFO] 用户中断。") return 130 print(f"[INFO] 处理结束,共下载 {total} 个视频。") return 0 if __name__ == "__main__": sys.exit(main())