""" 使用 DrissionPage 监听抖音作品列表接口,并批量下载视频。 运行示例: python3 Douyin.py "https://www.douyin.com/user/xxx?from_tab_name=main" 依赖: pip install requests DrissionPage """ from __future__ import annotations import argparse import json import random import re import socket import sys import time from dataclasses import dataclass from pathlib import Path from typing import Any DEFAULT_USER_URL = ( "https://www.douyin.com/user/" "MS4wLjABAAAAx7--dRYA0mPwhwvxNJ-35i6sB8d1Kv4Sj1WmugquqiHK19QYlB18Ikx6cECT1RVO" "?from_tab_name=main" ) DEFAULT_BROWSER_PORT = 9223 LISTEN_TARGET = "web/aweme/post/" RECOMMENDATION_LISTEN_TARGET = "aweme/v2/web/module/feed/" SINGLE_VIDEO_LISTEN_TARGET = "web/aweme/detail/" MAX_FILENAME_BYTES = 240 INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') RECOMMENDATION_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/?(?:jingxuan)?(?:\?.*)?$") CREATOR_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/user/[^/?#]+(?:\?.*)?$") VIDEO_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/video/(?P\d+)(?:[/?#].*)?$") AWEME_ID_PATTERN = re.compile(r"^\d{5,}$") @dataclass(frozen=True) class ResolvedTarget: kind: str value: str source: str aweme_id: str | None = None @dataclass(frozen=True) class ScrollSettings: mode: str = "human" min_wait: float = 2.0 max_wait: float = 8.0 reverse_scroll_probability: float = 0.2 max_runtime: float = 600.0 min_scroll: int = 300 max_scroll: int = 900 min_reverse_scroll: int = 80 max_reverse_scroll: int = 250 @dataclass(frozen=True) class HumanScrollPlan: down_distance: int down_wait: float reverse_distance: int = 0 reverse_wait: float = 0.0 settle_wait: float = 0.0 def sanitize_filename(value: str, fallback: str = "untitled") -> str: cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._") return cleaned or fallback def truncate_utf8_bytes(value: str, max_bytes: int) -> str: if len(value.encode("utf-8")) <= max_bytes: return value result = "" used = 0 for character in value: character_bytes = len(character.encode("utf-8")) if used + character_bytes > max_bytes: break result += character used += character_bytes return result.rstrip(" ._") def is_recommendation_url(value: str) -> bool: return bool(RECOMMENDATION_URL_PATTERN.match(value.strip())) def is_creator_url(value: str) -> bool: return bool(CREATOR_URL_PATTERN.match(value.strip())) def is_video_url(value: str) -> bool: return bool(VIDEO_URL_PATTERN.match(value.strip())) def is_aweme_id(value: str) -> bool: return bool(AWEME_ID_PATTERN.match(value.strip())) def extract_aweme_id_from_video_url(value: str) -> str: match = VIDEO_URL_PATTERN.match(value.strip()) if match is None: raise ValueError("不是合法的抖音视频 URL。") return match.group("aweme_id") def build_video_page_url(aweme_id: str) -> str: return f"https://www.douyin.com/video/{aweme_id}" def parse_target_input(value: str, source: str) -> ResolvedTarget: normalized = value.strip() if is_recommendation_url(normalized): return ResolvedTarget(kind="recommendation", value=normalized, source=source) if is_creator_url(normalized): return ResolvedTarget(kind="creator", value=normalized, source=source) if is_video_url(normalized): return ResolvedTarget( kind="single-video", value=normalized, source=source, aweme_id=extract_aweme_id_from_video_url(normalized), ) if is_aweme_id(normalized): return ResolvedTarget( kind="single-video", value=normalized, source=source, aweme_id=normalized, ) raise ValueError(f"不支持的目标: {value}") def get_active_page_url(page: Any) -> str: return str(getattr(page, "url", "") or "").strip() def resolve_target(page: Any, cli_target: str | None) -> ResolvedTarget: if cli_target: try: return parse_target_input(cli_target, source="manual") except ValueError as exc: raise RuntimeError(str(exc)) from exc current_url = get_active_page_url(page) try: return parse_target_input(current_url, source="current-page") except ValueError as exc: raise RuntimeError( "当前页面不是受支持的抖音博主页或单视频页,请切到目标页面后重试,或手动传入链接或 `aweme_id`。" ) from exc def resolve_cli_target(cli_target: str | None, browser_port: int | None) -> ResolvedTarget: if cli_target: return parse_target_input(cli_target, source="manual") _, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) return resolve_target(page=page, cli_target=None) def choose_video_url(url_list: list[str]) -> str: for url in url_list: if "douyinvod.com" in url: return url if url_list: return url_list[0] raise ValueError("url_list 为空,无法选择视频地址。") def extract_url_list_from_play_addr(play_addr: Any) -> list[str]: if not isinstance(play_addr, dict): return [] url_list = play_addr.get("url_list") or [] if not isinstance(url_list, list): return [] return [str(url) for url in url_list if str(url).strip()] def extract_video_url_list(video: Any) -> list[str]: if not isinstance(video, dict): return [] for address_key in ("play_addr", "play_addr_h264", "play_addr_lowbr"): url_list = extract_url_list_from_play_addr(video.get(address_key)) if url_list: return url_list bit_rate_list = video.get("bit_rate") or [] if not isinstance(bit_rate_list, list): return [] for bit_rate in bit_rate_list: if not isinstance(bit_rate, dict): continue url_list = extract_url_list_from_play_addr(bit_rate.get("play_addr")) if url_list: return url_list return [] def build_output_path( title: str, video_id: str, output_dir: Path = Path("video"), author_name: str | None = None, ) -> Path: safe_title = sanitize_filename(title, fallback="untitled") suffix = f"-{video_id}.mp4" if author_name: safe_author = sanitize_filename(author_name, fallback="unknown") prefix = f"[{safe_author}]" else: prefix = "" title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) if title_budget < 1: prefix_budget = MAX_FILENAME_BYTES - len(suffix.encode("utf-8")) - 1 prefix = truncate_utf8_bytes(prefix, max(1, prefix_budget)) title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) filename = f"{prefix}{truncate_utf8_bytes(safe_title, max(1, title_budget))}{suffix}" return output_dir / filename def build_browser_address(browser_port: int | None) -> str | None: if browser_port is None: return None return f"127.0.0.1:{browser_port}" def ensure_browser_debug_port_ready(browser_port: int) -> None: try: with socket.create_connection(("127.0.0.1", browser_port), timeout=2): return except OSError as exc: raise RuntimeError( "无法连接到已启动的 Chrome 调试端口。" f"请先运行 `./.venv/bin/python login_douyin.py --browser-port {browser_port}`," "并确认 Chrome 仍在运行且端口一致。" ) from exc def extract_aweme_payload(response: Any) -> dict[str, Any]: body = getattr(response, "body", None) if isinstance(body, dict): return body raw_body = getattr(response, "raw_body", None) if isinstance(raw_body, str) and raw_body.strip(): payload = json.loads(raw_body) if isinstance(payload, dict): return payload raise ValueError("响应体不是可解析的 JSON 字典。") def parse_aweme_items(body: Any) -> list[dict[str, str]]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") aweme_list = body.get("aweme_list") if not isinstance(aweme_list, list): raise ValueError("接口响应中缺少 aweme_list。") items: list[dict[str, str]] = [] for aweme in aweme_list: if not isinstance(aweme, dict): continue video = aweme.get("video") or {} url_list = extract_video_url_list(video) if not url_list: continue video_id = str(aweme.get("aweme_id") or "").strip() if not video_id: continue title = str(aweme.get("desc") or "").strip() or "untitled" author = aweme.get("author") or {} author_name = str(author.get("nickname") or "").strip() or "unknown" author_id = str(author.get("uid") or "").strip() or "unknown" items.append( { "title": title, "video_id": video_id, "video_url": choose_video_url(url_list), "author_name": author_name, "author_id": author_id, } ) return items def parse_single_aweme_item(body: Any) -> dict[str, str]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") if isinstance(body.get("aweme_detail"), dict): items = parse_aweme_items({"aweme_list": [body["aweme_detail"]]}) if items: return items[0] items = parse_aweme_items(body) if items: return items[0] raise ValueError("接口响应中缺少可下载的单视频数据。") def build_headers(referer: str) -> dict[str, str]: return { "referer": referer, "user-agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/135.0.0.0 Safari/537.36" ), } def import_runtime_dependencies() -> tuple[Any, Any, Any]: try: import requests except ModuleNotFoundError as exc: raise SystemExit( "缺少 requests,请先执行: python3 -m pip install requests" ) from exc try: from DrissionPage import ChromiumPage from DrissionPage import ChromiumOptions except ModuleNotFoundError as exc: raise SystemExit( "缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage" ) from exc return requests, ChromiumPage, ChromiumOptions def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any: browser_address = build_browser_address(browser_port) if browser_address is None: return chromium_page_cls() options = chromium_options_cls().set_address(browser_address).existing_only(True) return chromium_page_cls(options) def wait_for_aweme_packet(page: Any, timeout: int) -> Any | None: try: packet = page.listen.wait(timeout=timeout) return packet if packet else None except Exception as exc: print(f"[WARN] 等待接口数据超时或失败: {exc}") return None def scroll_to_next_page(page: Any) -> None: page.run_js("window.scrollTo(0, document.body.scrollHeight);") time.sleep(2) def create_human_scroll_plan( settings: ScrollSettings, random_module: Any = random, ) -> HumanScrollPlan: down_distance = random_module.randint(settings.min_scroll, settings.max_scroll) down_wait = random_module.uniform(settings.min_wait, settings.max_wait) settle_wait = random_module.uniform(settings.min_wait, settings.max_wait) reverse_distance = 0 reverse_wait = 0.0 if random_module.random() < settings.reverse_scroll_probability: reverse_distance = random_module.randint( settings.min_reverse_scroll, settings.max_reverse_scroll, ) reverse_wait = random_module.uniform(1.0, min(3.0, settings.max_wait)) return HumanScrollPlan( down_distance=down_distance, down_wait=down_wait, reverse_distance=reverse_distance, reverse_wait=reverse_wait, settle_wait=settle_wait, ) def run_scroll_step(page: Any, distance: int) -> bool: script = f""" const distance = {distance}; function findMainScrollContainer() {{ const preferredSelectors = ['.tKqwmYAX', '.route-scroll-container', '.semi-tabs-content']; for (const selector of preferredSelectors) {{ const el = document.querySelector(selector); if (el && el.scrollHeight > el.clientHeight + 20) {{ return el; }} }} const candidates = Array.from(document.querySelectorAll('*')) .filter((el) => {{ const rect = el.getBoundingClientRect(); return rect.width > 300 && rect.height > 200 && el.scrollHeight > el.clientHeight + 20; }}) .sort((a, b) => {{ const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height; const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height; return areaB - areaA; }}); return candidates[0] || null; }} const scrollTarget = findMainScrollContainer(); if (scrollTarget) {{ scrollTarget.scrollBy(0, distance); return true; }} return false; """ scrolled_container = bool(page.run_js(script)) if not scrolled_container: page.run_js(f"window.scrollBy(0, {distance});") return scrolled_container def run_human_scroll_sequence(page: Any, plan: HumanScrollPlan) -> None: run_scroll_step(page, plan.down_distance) print(f"[INFO] 向下滚动 {plan.down_distance}px,停留 {plan.down_wait:.1f}s") time.sleep(plan.down_wait) if plan.reverse_distance > 0: run_scroll_step(page, -plan.reverse_distance) print(f"[INFO] 小幅回滚 {plan.reverse_distance}px,停留 {plan.reverse_wait:.1f}s") time.sleep(plan.reverse_wait) forward_distance = plan.reverse_distance * 2 run_scroll_step(page, forward_distance) if plan.settle_wait > 0: print(f"[INFO] 继续停留 {plan.settle_wait:.1f}s") time.sleep(plan.settle_wait) def human_like_scroll(page: Any, settings: ScrollSettings | None = None) -> None: scroll_settings = settings or ScrollSettings() run_human_scroll_sequence(page, create_human_scroll_plan(scroll_settings)) def download_video( requests_module: Any, headers: dict[str, str], video_url: str, output_path: Path, ) -> None: response = requests_module.get(video_url, headers=headers, timeout=60) response.raise_for_status() output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_bytes(response.content) def collect_videos( user_url: str, max_pages: int, timeout: int, output_dir: Path, browser_port: int | None, auto_scroll: bool = False, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() headers = build_headers(user_url) if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(LISTEN_TARGET) print("[INFO] 正在打开抖音主页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(user_url) time.sleep(3) downloaded = 0 seen_ids: set[str] = set() for page_number in range(1, max_pages + 1): print(f"[INFO] 正在处理第 {page_number} 页") packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: if auto_scroll: scroll_to_next_page(page) continue raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") try: payload = extract_aweme_payload(packet.response) items = parse_aweme_items(payload) except Exception as exc: print(f"[WARN] 解析接口数据失败: {exc}") if auto_scroll: scroll_to_next_page(page) continue raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") from exc if not items: if auto_scroll: print("[WARN] 这一页没有解析到视频。") else: raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") for item in items: if item["video_id"] in seen_ids: continue seen_ids.add(item["video_id"]) output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, ) try: download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {item['video_id']}: {exc}") continue downloaded += 1 print(f"[OK] 已保存: {output_path}") if auto_scroll: scroll_to_next_page(page) continue break return downloaded def collect_recommendations( max_videos: int, timeout: int, output_dir: Path, browser_port: int | None, scroll_settings: ScrollSettings | None = None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() headers = build_headers("https://www.douyin.com/") if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(RECOMMENDATION_LISTEN_TARGET) print("[INFO] 正在打开抖音推荐流。若出现登录或验证码,请先在浏览器窗口里完成。") page.get("https://www.douyin.com/") time.sleep(3) downloaded = 0 seen_ids: set[str] = set() consecutive_empty = 0 max_consecutive_empty = 6 settings = scroll_settings or ScrollSettings() started_at = time.monotonic() while downloaded < max_videos: if settings.max_runtime > 0 and time.monotonic() - started_at >= settings.max_runtime: print("[INFO] 已达到最大运行时间,结束抓取。") break packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: print("[INFO] 连续多次未获取到新数据,结束抓取。") break human_like_scroll(page, settings=settings) continue try: payload = extract_aweme_payload(packet.response) items = parse_aweme_items(payload) except Exception as exc: print(f"[WARN] 解析接口数据失败: {exc}") consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) continue if not items: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) continue consecutive_empty = 0 new_items_in_batch = 0 for item in items: if item["video_id"] in seen_ids: continue if downloaded >= max_videos: break seen_ids.add(item["video_id"]) output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, author_name=item.get("author_name"), ) try: download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {item['video_id']}: {exc}") continue downloaded += 1 new_items_in_batch += 1 print(f"[OK] 已保存: {output_path}") if new_items_in_batch == 0: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) return downloaded def collect_single_video( target: ResolvedTarget, timeout: int, output_dir: Path, browser_port: int | None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page_url = target.value if target.aweme_id is not None and not is_video_url(page_url): page_url = build_video_page_url(target.aweme_id) headers = build_headers(page_url) page.listen.start(SINGLE_VIDEO_LISTEN_TARGET) print("[INFO] 正在打开抖音视频页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(page_url) time.sleep(3) packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。") try: payload = extract_aweme_payload(packet.response) item = parse_single_aweme_item(payload) except Exception as exc: raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。") from exc output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, ) download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) print(f"[OK] 已保存: {output_path}") return 1 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="附着抖音登录浏览器并下载当前页面或指定目标的视频") parser.add_argument( "target", nargs="?", default=None, help="可选:博主主页 URL、单视频 URL 或 aweme_id;不传则读取当前浏览器页面", ) parser.add_argument("--pages", type=int, default=1, help="创作者抓取最多处理多少页;默认 1") parser.add_argument("--timeout", type=int, default=10, help="单次等待接口响应秒数,默认 10") parser.add_argument( "--output-dir", default="video", help="视频输出目录,默认 video", ) parser.add_argument( "--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="附着到已启动 Chrome 的调试端口,默认 9223", ) parser.add_argument( "--max-videos", type=int, default=50, help="推荐流最大抓取数量,默认 50", ) parser.add_argument( "--scroll-mode", choices=["human"], default="human", help="推荐流滚动模式,默认 human", ) parser.add_argument( "--min-wait", type=float, default=2.0, help="推荐流每次滚动后的最短等待秒数,默认 2", ) parser.add_argument( "--max-wait", type=float, default=8.0, help="推荐流每次滚动后的最长等待秒数,默认 8", ) parser.add_argument( "--reverse-scroll-probability", type=float, default=0.2, help="推荐流小幅回滚概率,取值 0 到 1,默认 0.2", ) parser.add_argument( "--max-runtime", type=float, default=600.0, help="推荐流最大运行秒数,默认 600;设置为 0 表示不限制", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if args.pages <= 0: parser.error("--pages 必须大于 0") if args.timeout <= 0: parser.error("--timeout 必须大于 0") if args.browser_port is not None and args.browser_port <= 0: parser.error("--browser-port 必须大于 0") if args.max_videos <= 0: parser.error("--max-videos 必须大于 0") if args.min_wait < 0: parser.error("--min-wait 不能小于 0") if args.max_wait < args.min_wait: parser.error("--max-wait 必须大于或等于 --min-wait") if not 0 <= args.reverse_scroll_probability <= 1: parser.error("--reverse-scroll-probability 必须在 0 到 1 之间") if args.max_runtime < 0: parser.error("--max-runtime 不能小于 0") scroll_settings = ScrollSettings( mode=args.scroll_mode, min_wait=args.min_wait, max_wait=args.max_wait, reverse_scroll_probability=args.reverse_scroll_probability, max_runtime=args.max_runtime, ) try: target = resolve_cli_target(args.target, browser_port=args.browser_port) if target.kind == "creator": total = collect_videos( user_url=target.value, max_pages=args.pages, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, auto_scroll=args.pages > 1, ) elif target.kind == "recommendation": total = collect_recommendations( max_videos=args.max_videos, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, scroll_settings=scroll_settings, ) elif target.kind == "single-video": total = collect_single_video( target=target, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, ) else: raise RuntimeError(f"不支持的目标类型: {target.kind}") except RuntimeError as exc: print(f"[ERROR] {exc}") return 1 except KeyboardInterrupt: print("\n[INFO] 用户中断。") return 130 print(f"[INFO] 处理结束,共下载 {total} 个视频。") return 0 if __name__ == "__main__": sys.exit(main())