""" 使用 DrissionPage 监听抖音作品列表接口,并批量下载视频。 运行示例: python3 Douyin.py "https://www.douyin.com/user/xxx?from_tab_name=main" 依赖: pip install requests DrissionPage """ from __future__ import annotations import argparse import json import random import re import socket import sys import time from dataclasses import dataclass from pathlib import Path from typing import Any from urllib.parse import quote DEFAULT_USER_URL = ( "https://www.douyin.com/user/" "MS4wLjABAAAAx7--dRYA0mPwhwvxNJ-35i6sB8d1Kv4Sj1WmugquqiHK19QYlB18Ikx6cECT1RVO" "?from_tab_name=main" ) DEFAULT_BROWSER_PORT = 9223 LISTEN_TARGET = "web/aweme/post/" RECOMMENDATION_LISTEN_TARGET = "aweme/v2/web/module/feed/" SINGLE_VIDEO_LISTEN_TARGET = "web/aweme/detail/" SEARCH_LISTEN_TARGET = "aweme/v1/web/general/search/single" MAX_FILENAME_BYTES = 240 INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') RECOMMENDATION_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/?(?:jingxuan)?(?:\?.*)?$") CREATOR_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/user/[^/?#]+(?:\?.*)?$") VIDEO_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/video/(?P\d+)(?:[/?#].*)?$") AWEME_ID_PATTERN = re.compile(r"^\d{5,}$") @dataclass(frozen=True) class ResolvedTarget: kind: str value: str source: str aweme_id: str | None = None @dataclass(frozen=True) class ScrollSettings: mode: str = "human" min_wait: float = 2.0 max_wait: float = 8.0 reverse_scroll_probability: float = 0.2 max_runtime: float = 600.0 min_scroll: int = 300 max_scroll: int = 900 min_reverse_scroll: int = 80 max_reverse_scroll: int = 250 @dataclass(frozen=True) class HumanScrollPlan: down_distance: int down_wait: float reverse_distance: int = 0 reverse_wait: float = 0.0 settle_wait: float = 0.0 def sanitize_filename(value: str, fallback: str = "untitled") -> str: cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._") return cleaned or fallback def truncate_utf8_bytes(value: str, max_bytes: int) -> str: if len(value.encode("utf-8")) <= max_bytes: return value result = "" used = 0 for character in value: character_bytes = len(character.encode("utf-8")) if used + character_bytes > max_bytes: break result += character used += character_bytes return result.rstrip(" ._") def is_recommendation_url(value: str) -> bool: return bool(RECOMMENDATION_URL_PATTERN.match(value.strip())) def is_creator_url(value: str) -> bool: return bool(CREATOR_URL_PATTERN.match(value.strip())) def is_video_url(value: str) -> bool: return bool(VIDEO_URL_PATTERN.match(value.strip())) def is_aweme_id(value: str) -> bool: return bool(AWEME_ID_PATTERN.match(value.strip())) def extract_aweme_id_from_video_url(value: str) -> str: match = VIDEO_URL_PATTERN.match(value.strip()) if match is None: raise ValueError("不是合法的抖音视频 URL。") return match.group("aweme_id") def build_video_page_url(aweme_id: str) -> str: return f"https://www.douyin.com/video/{aweme_id}" def build_search_page_url(keyword: str) -> str: return f"https://www.douyin.com/search/{quote(keyword)}?type=general" def parse_target_input(value: str, source: str) -> ResolvedTarget: normalized = value.strip() if is_recommendation_url(normalized): return ResolvedTarget(kind="recommendation", value=normalized, source=source) if is_creator_url(normalized): return ResolvedTarget(kind="creator", value=normalized, source=source) if is_video_url(normalized): return ResolvedTarget( kind="single-video", value=normalized, source=source, aweme_id=extract_aweme_id_from_video_url(normalized), ) if is_aweme_id(normalized): return ResolvedTarget( kind="single-video", value=normalized, source=source, aweme_id=normalized, ) raise ValueError(f"不支持的目标: {value}") def get_active_page_url(page: Any) -> str: return str(getattr(page, "url", "") or "").strip() def resolve_target(page: Any, cli_target: str | None) -> ResolvedTarget: if cli_target: try: return parse_target_input(cli_target, source="manual") except ValueError as exc: raise RuntimeError(str(exc)) from exc current_url = get_active_page_url(page) try: return parse_target_input(current_url, source="current-page") except ValueError as exc: raise RuntimeError( "当前页面不是受支持的抖音博主页或单视频页,请切到目标页面后重试,或手动传入链接或 `aweme_id`。" ) from exc def resolve_cli_target(cli_target: str | None, browser_port: int | None) -> ResolvedTarget: if cli_target: return parse_target_input(cli_target, source="manual") _, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) return resolve_target(page=page, cli_target=None) def choose_video_url(url_list: list[str]) -> str: for url in url_list: if "douyinvod.com" in url: return url if url_list: return url_list[0] raise ValueError("url_list 为空,无法选择视频地址。") def extract_url_list_from_play_addr(play_addr: Any) -> list[str]: if not isinstance(play_addr, dict): return [] url_list = play_addr.get("url_list") or [] if not isinstance(url_list, list): return [] return [str(url) for url in url_list if str(url).strip()] def extract_video_url_list(video: Any) -> list[str]: if not isinstance(video, dict): return [] for address_key in ("play_addr", "play_addr_h264", "play_addr_lowbr"): url_list = extract_url_list_from_play_addr(video.get(address_key)) if url_list: return url_list bit_rate_list = video.get("bit_rate") or [] if not isinstance(bit_rate_list, list): return [] for bit_rate in bit_rate_list: if not isinstance(bit_rate, dict): continue url_list = extract_url_list_from_play_addr(bit_rate.get("play_addr")) if url_list: return url_list return [] def build_output_path( title: str, video_id: str, output_dir: Path = Path("video"), author_name: str | None = None, ) -> Path: safe_title = sanitize_filename(title, fallback="untitled") suffix = f"-{video_id}.mp4" if author_name: safe_author = sanitize_filename(author_name, fallback="unknown") prefix = f"[{safe_author}]" else: prefix = "" title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) if title_budget < 1: prefix_budget = MAX_FILENAME_BYTES - len(suffix.encode("utf-8")) - 1 prefix = truncate_utf8_bytes(prefix, max(1, prefix_budget)) title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) filename = f"{prefix}{truncate_utf8_bytes(safe_title, max(1, title_budget))}{suffix}" return output_dir / filename def build_browser_address(browser_port: int | None) -> str | None: if browser_port is None: return None return f"127.0.0.1:{browser_port}" def ensure_browser_debug_port_ready(browser_port: int) -> None: try: with socket.create_connection(("127.0.0.1", browser_port), timeout=2): return except OSError as exc: raise RuntimeError( "无法连接到已启动的 Chrome 调试端口。" f"请先运行 `./.venv/bin/python login_douyin.py --browser-port {browser_port}`," "并确认 Chrome 仍在运行且端口一致。" ) from exc def extract_aweme_payload(response: Any) -> dict[str, Any]: body = getattr(response, "body", None) if isinstance(body, dict): return body raw_body = getattr(response, "raw_body", None) if isinstance(raw_body, str) and raw_body.strip(): payload = json.loads(raw_body) if isinstance(payload, dict): return payload raise ValueError("响应体不是可解析的 JSON 字典。") def parse_aweme_items(body: Any) -> list[dict[str, str]]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") aweme_list = body.get("aweme_list") if not isinstance(aweme_list, list): raise ValueError("接口响应中缺少 aweme_list。") items: list[dict[str, str]] = [] for aweme in aweme_list: if not isinstance(aweme, dict): continue video = aweme.get("video") or {} url_list = extract_video_url_list(video) if not url_list: continue video_id = str(aweme.get("aweme_id") or "").strip() if not video_id: continue title = str(aweme.get("desc") or "").strip() or "untitled" author = aweme.get("author") or {} author_name = str(author.get("nickname") or "").strip() or "unknown" author_id = str(author.get("uid") or "").strip() or "unknown" items.append( { "title": title, "video_id": video_id, "video_url": choose_video_url(url_list), "author_name": author_name, "author_id": author_id, } ) return items def parse_single_aweme_item(body: Any) -> dict[str, str]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") if isinstance(body.get("aweme_detail"), dict): items = parse_aweme_items({"aweme_list": [body["aweme_detail"]]}) if items: return items[0] items = parse_aweme_items(body) if items: return items[0] raise ValueError("接口响应中缺少可下载的单视频数据。") def parse_search_items(body: Any) -> list[dict[str, str]]: if not isinstance(body, dict): raise ValueError("接口响应不是字典,无法解析。") data = body.get("data") if not isinstance(data, list): raise ValueError("搜索接口响应中缺少 data。") aweme_list = [] for entry in data: if not isinstance(entry, dict): continue aweme_info = entry.get("aweme_info") if isinstance(aweme_info, dict): aweme_list.append(aweme_info) return parse_aweme_items({"aweme_list": aweme_list}) def build_headers(referer: str) -> dict[str, str]: return { "referer": referer, "user-agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/135.0.0.0 Safari/537.36" ), } def import_runtime_dependencies() -> tuple[Any, Any, Any]: try: import requests except ModuleNotFoundError as exc: raise SystemExit( "缺少 requests,请先执行: python3 -m pip install requests" ) from exc try: from DrissionPage import ChromiumPage from DrissionPage import ChromiumOptions except ModuleNotFoundError as exc: raise SystemExit( "缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage" ) from exc return requests, ChromiumPage, ChromiumOptions def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any: browser_address = build_browser_address(browser_port) if browser_address is None: return chromium_page_cls() options = chromium_options_cls().set_address(browser_address).existing_only(True) return chromium_page_cls(options) def wait_for_aweme_packet(page: Any, timeout: int) -> Any | None: try: packet = page.listen.wait(timeout=timeout) return packet if packet else None except Exception as exc: print(f"[WARN] 等待接口数据超时或失败: {exc}") return None def scroll_to_next_page(page: Any) -> None: page.run_js("window.scrollTo(0, document.body.scrollHeight);") time.sleep(2) def create_human_scroll_plan( settings: ScrollSettings, random_module: Any = random, ) -> HumanScrollPlan: down_distance = random_module.randint(settings.min_scroll, settings.max_scroll) down_wait = random_module.uniform(settings.min_wait, settings.max_wait) settle_wait = random_module.uniform(settings.min_wait, settings.max_wait) reverse_distance = 0 reverse_wait = 0.0 if random_module.random() < settings.reverse_scroll_probability: reverse_distance = random_module.randint( settings.min_reverse_scroll, settings.max_reverse_scroll, ) reverse_wait = random_module.uniform(1.0, min(3.0, settings.max_wait)) return HumanScrollPlan( down_distance=down_distance, down_wait=down_wait, reverse_distance=reverse_distance, reverse_wait=reverse_wait, settle_wait=settle_wait, ) def run_scroll_step(page: Any, distance: int) -> bool: script = f""" const distance = {distance}; function findMainScrollContainer() {{ const preferredSelectors = ['.tKqwmYAX', '.route-scroll-container', '.semi-tabs-content']; for (const selector of preferredSelectors) {{ const el = document.querySelector(selector); if (el && el.scrollHeight > el.clientHeight + 20) {{ return el; }} }} const candidates = Array.from(document.querySelectorAll('*')) .filter((el) => {{ const rect = el.getBoundingClientRect(); return rect.width > 300 && rect.height > 200 && el.scrollHeight > el.clientHeight + 20; }}) .sort((a, b) => {{ const areaA = a.getBoundingClientRect().width * a.getBoundingClientRect().height; const areaB = b.getBoundingClientRect().width * b.getBoundingClientRect().height; return areaB - areaA; }}); return candidates[0] || null; }} const scrollTarget = findMainScrollContainer(); if (scrollTarget) {{ scrollTarget.scrollBy(0, distance); return true; }} return false; """ scrolled_container = bool(page.run_js(script)) if not scrolled_container: page.run_js(f"window.scrollBy(0, {distance});") return scrolled_container def run_human_scroll_sequence(page: Any, plan: HumanScrollPlan) -> None: run_scroll_step(page, plan.down_distance) print(f"[INFO] 向下滚动 {plan.down_distance}px,停留 {plan.down_wait:.1f}s") time.sleep(plan.down_wait) if plan.reverse_distance > 0: run_scroll_step(page, -plan.reverse_distance) print(f"[INFO] 小幅回滚 {plan.reverse_distance}px,停留 {plan.reverse_wait:.1f}s") time.sleep(plan.reverse_wait) forward_distance = plan.reverse_distance * 2 run_scroll_step(page, forward_distance) if plan.settle_wait > 0: print(f"[INFO] 继续停留 {plan.settle_wait:.1f}s") time.sleep(plan.settle_wait) def human_like_scroll(page: Any, settings: ScrollSettings | None = None) -> None: scroll_settings = settings or ScrollSettings() run_human_scroll_sequence(page, create_human_scroll_plan(scroll_settings)) def download_video( requests_module: Any, headers: dict[str, str], video_url: str, output_path: Path, ) -> None: response = requests_module.get(video_url, headers=headers, timeout=60) response.raise_for_status() output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_bytes(response.content) def collect_videos( user_url: str, max_pages: int, timeout: int, output_dir: Path, browser_port: int | None, auto_scroll: bool = False, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() headers = build_headers(user_url) if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(LISTEN_TARGET) print("[INFO] 正在打开抖音主页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(user_url) time.sleep(3) downloaded = 0 seen_ids: set[str] = set() for page_number in range(1, max_pages + 1): print(f"[INFO] 正在处理第 {page_number} 页") packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: if auto_scroll: scroll_to_next_page(page) continue raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") try: payload = extract_aweme_payload(packet.response) items = parse_aweme_items(payload) except Exception as exc: print(f"[WARN] 解析接口数据失败: {exc}") if auto_scroll: scroll_to_next_page(page) continue raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") from exc if not items: if auto_scroll: print("[WARN] 这一页没有解析到视频。") else: raise RuntimeError("当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。") for item in items: if item["video_id"] in seen_ids: continue seen_ids.add(item["video_id"]) output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, ) try: download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {item['video_id']}: {exc}") continue downloaded += 1 print(f"[OK] 已保存: {output_path}") if auto_scroll: scroll_to_next_page(page) continue break return downloaded def collect_recommendations( max_videos: int, timeout: int, output_dir: Path, browser_port: int | None, scroll_settings: ScrollSettings | None = None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() headers = build_headers("https://www.douyin.com/") if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(RECOMMENDATION_LISTEN_TARGET) print("[INFO] 正在打开抖音推荐流。若出现登录或验证码,请先在浏览器窗口里完成。") page.get("https://www.douyin.com/") time.sleep(3) downloaded = 0 seen_ids: set[str] = set() consecutive_empty = 0 max_consecutive_empty = 6 settings = scroll_settings or ScrollSettings() started_at = time.monotonic() while downloaded < max_videos: if settings.max_runtime > 0 and time.monotonic() - started_at >= settings.max_runtime: print("[INFO] 已达到最大运行时间,结束抓取。") break packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: print("[INFO] 连续多次未获取到新数据,结束抓取。") break human_like_scroll(page, settings=settings) continue try: payload = extract_aweme_payload(packet.response) items = parse_aweme_items(payload) except Exception as exc: print(f"[WARN] 解析接口数据失败: {exc}") consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) continue if not items: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) continue consecutive_empty = 0 new_items_in_batch = 0 for item in items: if item["video_id"] in seen_ids: continue if downloaded >= max_videos: break seen_ids.add(item["video_id"]) output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, author_name=item.get("author_name"), ) try: download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {item['video_id']}: {exc}") continue downloaded += 1 new_items_in_batch += 1 print(f"[OK] 已保存: {output_path}") if new_items_in_batch == 0: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) return downloaded def collect_search_results( keyword: str, max_videos: int, timeout: int, output_dir: Path, browser_port: int | None, scroll_settings: ScrollSettings | None = None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() search_url = build_search_page_url(keyword) headers = build_headers(search_url) if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page.listen.start(SEARCH_LISTEN_TARGET) print(f"[INFO] 正在打开抖音搜索页:{keyword}。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(search_url) time.sleep(3) downloaded = 0 seen_ids: set[str] = set() consecutive_empty = 0 max_consecutive_empty = 6 settings = scroll_settings or ScrollSettings() started_at = time.monotonic() while downloaded < max_videos: if settings.max_runtime > 0 and time.monotonic() - started_at >= settings.max_runtime: print("[INFO] 已达到最大运行时间,结束抓取。") break packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: print("[INFO] 连续多次未获取到新搜索数据,结束抓取。") break human_like_scroll(page, settings=settings) continue try: payload = extract_aweme_payload(packet.response) items = parse_search_items(payload) except Exception as exc: print(f"[WARN] 解析搜索接口数据失败: {exc}") consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) continue if not items: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) continue consecutive_empty = 0 new_items_in_batch = 0 for item in items: if item["video_id"] in seen_ids: continue if downloaded >= max_videos: break seen_ids.add(item["video_id"]) output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, author_name=item.get("author_name"), ) try: download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) except Exception as exc: print(f"[WARN] 下载失败 {item['video_id']}: {exc}") continue downloaded += 1 new_items_in_batch += 1 print(f"[OK] 已保存: {output_path}") if new_items_in_batch == 0: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break human_like_scroll(page, settings=settings) return downloaded def collect_single_video( target: ResolvedTarget, timeout: int, output_dir: Path, browser_port: int | None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() if browser_port is not None: ensure_browser_debug_port_ready(browser_port) page = create_page(chromium_page_cls, chromium_options_cls, browser_port) page_url = target.value if target.aweme_id is not None and not is_video_url(page_url): page_url = build_video_page_url(target.aweme_id) headers = build_headers(page_url) page.listen.start(SINGLE_VIDEO_LISTEN_TARGET) print("[INFO] 正在打开抖音视频页。若出现登录或验证码,请先在浏览器窗口里完成。") page.get(page_url) time.sleep(3) packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。") try: payload = extract_aweme_payload(packet.response) item = parse_single_aweme_item(payload) except Exception as exc: raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。") from exc output_path = build_output_path( title=item["title"], video_id=item["video_id"], output_dir=output_dir, ) download_video( requests_module=requests_module, headers=headers, video_url=item["video_url"], output_path=output_path, ) print(f"[OK] 已保存: {output_path}") return 1 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="附着抖音登录浏览器并下载当前页面或指定目标的视频") parser.add_argument( "target", nargs="?", default=None, help="可选:博主主页 URL、单视频 URL 或 aweme_id;不传则读取当前浏览器页面", ) parser.add_argument("--pages", type=int, default=1, help="创作者抓取最多处理多少页;默认 1") parser.add_argument("--timeout", type=int, default=10, help="单次等待接口响应秒数,默认 10") parser.add_argument( "--output-dir", default="video", help="视频输出目录,默认 video", ) parser.add_argument( "--browser-port", type=int, default=DEFAULT_BROWSER_PORT, help="附着到已启动 Chrome 的调试端口,默认 9223", ) parser.add_argument( "--max-videos", type=int, default=50, help="推荐流最大抓取数量,默认 50", ) parser.add_argument( "--search-keyword", default=None, help="搜索关键词;提供后抓取搜索结果页视频", ) parser.add_argument( "--scroll-mode", choices=["human"], default="human", help="推荐流滚动模式,默认 human", ) parser.add_argument( "--min-wait", type=float, default=2.0, help="推荐流每次滚动后的最短等待秒数,默认 2", ) parser.add_argument( "--max-wait", type=float, default=8.0, help="推荐流每次滚动后的最长等待秒数,默认 8", ) parser.add_argument( "--reverse-scroll-probability", type=float, default=0.2, help="推荐流小幅回滚概率,取值 0 到 1,默认 0.2", ) parser.add_argument( "--max-runtime", type=float, default=600.0, help="推荐流最大运行秒数,默认 600;设置为 0 表示不限制", ) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if args.pages <= 0: parser.error("--pages 必须大于 0") if args.timeout <= 0: parser.error("--timeout 必须大于 0") if args.browser_port is not None and args.browser_port <= 0: parser.error("--browser-port 必须大于 0") if args.max_videos <= 0: parser.error("--max-videos 必须大于 0") if args.min_wait < 0: parser.error("--min-wait 不能小于 0") if args.max_wait < args.min_wait: parser.error("--max-wait 必须大于或等于 --min-wait") if not 0 <= args.reverse_scroll_probability <= 1: parser.error("--reverse-scroll-probability 必须在 0 到 1 之间") if args.max_runtime < 0: parser.error("--max-runtime 不能小于 0") scroll_settings = ScrollSettings( mode=args.scroll_mode, min_wait=args.min_wait, max_wait=args.max_wait, reverse_scroll_probability=args.reverse_scroll_probability, max_runtime=args.max_runtime, ) try: if args.search_keyword: total = collect_search_results( keyword=args.search_keyword, max_videos=args.max_videos, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, scroll_settings=scroll_settings, ) else: target = resolve_cli_target(args.target, browser_port=args.browser_port) if target.kind == "creator": total = collect_videos( user_url=target.value, max_pages=args.pages, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, auto_scroll=args.pages > 1, ) elif target.kind == "recommendation": total = collect_recommendations( max_videos=args.max_videos, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, scroll_settings=scroll_settings, ) elif target.kind == "single-video": total = collect_single_video( target=target, timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, ) else: raise RuntimeError(f"不支持的目标类型: {target.kind}") except RuntimeError as exc: print(f"[ERROR] {exc}") return 1 except KeyboardInterrupt: print("\n[INFO] 用户中断。") return 130 print(f"[INFO] 处理结束,共下载 {total} 个视频。") return 0 if __name__ == "__main__": sys.exit(main())