613 lines
20 KiB
Python
613 lines
20 KiB
Python
"""
|
||
使用 DrissionPage 监听抖音作品列表接口,并批量下载视频。
|
||
|
||
运行示例:
|
||
python3 Douyin.py "https://www.douyin.com/user/xxx?from_tab_name=main"
|
||
|
||
依赖:
|
||
pip install requests DrissionPage
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import socket
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
# Sample creator profile URL (not referenced by the code visible in this file).
DEFAULT_USER_URL = "".join(
    (
        "https://www.douyin.com/user/",
        "MS4wLjABAAAAx7--dRYA0mPwhwvxNJ-35i6sB8d1Kv4Sj1WmugquqiHK19QYlB18Ikx6cECT1RVO",
        "?from_tab_name=main",
    )
)

# Default value of the --browser-port command-line option.
DEFAULT_BROWSER_PORT = 9223

# Targets handed to ``page.listen.start`` for each download mode.
LISTEN_TARGET = "web/aweme/post/"  # creator post list
RECOMMENDATION_LISTEN_TARGET = "web/module/feed/"  # recommendation feed
SINGLE_VIDEO_LISTEN_TARGET = "web/aweme/detail/"  # single video detail

# Characters that sanitize_filename replaces with "_".
INVALID_FILENAME_CHARS = re.compile(
    r'[\\/:*?"<>|\r\n\t]'
)

# Douyin home page, optionally followed by a query string.
RECOMMENDATION_URL_PATTERN = re.compile(
    r"^https?://www\.douyin\.com/?(?:\?.*)?$"
)

# Creator profile page: /user/<id>, optionally followed by a query string.
CREATOR_URL_PATTERN = re.compile(
    r"^https?://www\.douyin\.com/user/[^/?#]+(?:\?.*)?$"
)

# Single video page: /video/<digits>; the digits are captured as ``aweme_id``.
VIDEO_URL_PATTERN = re.compile(
    r"^https?://www\.douyin\.com/video/(?P<aweme_id>\d+)(?:[/?#].*)?$"
)

# A bare aweme id: five or more digits and nothing else.
AWEME_ID_PATTERN = re.compile(
    r"^\d{5,}$"
)
|
||
|
||
|
||
@dataclass(frozen=True)
class ResolvedTarget:
    """Immutable description of the download target chosen for this run.

    Instances are built by ``parse_target_input``; ``main`` dispatches on
    ``kind``.
    """

    # One of "recommendation", "creator" or "single-video".
    kind: str
    # The stripped input text: a URL, or a bare aweme id.
    value: str
    # Where the input came from: "manual" or "current-page".
    source: str
    # Video id; only filled in for "single-video" targets.
    aweme_id: str | None = None
|
||
|
||
|
||
def sanitize_filename(value: str, fallback: str = "untitled") -> str:
    """Turn *value* into a string usable as a file name.

    Every character matched by ``INVALID_FILENAME_CHARS`` becomes ``_``, then
    leading/trailing spaces, dots and underscores are stripped. When nothing
    is left, *fallback* is returned instead.
    """
    replaced = INVALID_FILENAME_CHARS.sub("_", value)
    trimmed = replaced.strip(" ._")
    if trimmed:
        return trimmed
    return fallback
|
||
|
||
|
||
def is_recommendation_url(value: str) -> bool:
    """Return True when *value*, stripped of whitespace, matches ``RECOMMENDATION_URL_PATTERN``."""
    candidate = value.strip()
    return RECOMMENDATION_URL_PATTERN.match(candidate) is not None
|
||
|
||
|
||
def is_creator_url(value: str) -> bool:
    """Return True when *value*, stripped of whitespace, matches ``CREATOR_URL_PATTERN``."""
    candidate = value.strip()
    return CREATOR_URL_PATTERN.match(candidate) is not None
|
||
|
||
|
||
def is_video_url(value: str) -> bool:
    """Return True when *value*, stripped of whitespace, matches ``VIDEO_URL_PATTERN``."""
    candidate = value.strip()
    return VIDEO_URL_PATTERN.match(candidate) is not None
|
||
|
||
|
||
def is_aweme_id(value: str) -> bool:
    """Return True when *value*, stripped of whitespace, matches ``AWEME_ID_PATTERN``."""
    candidate = value.strip()
    return AWEME_ID_PATTERN.match(candidate) is not None
|
||
|
||
|
||
def extract_aweme_id_from_video_url(value: str) -> str:
    """Return the ``aweme_id`` group captured from a Douyin video URL.

    Raises:
        ValueError: if *value* does not match ``VIDEO_URL_PATTERN``.
    """
    found = VIDEO_URL_PATTERN.match(value.strip())
    if found is not None:
        return found.group("aweme_id")
    raise ValueError("不是合法的抖音视频 URL。")
|
||
|
||
|
||
def build_video_page_url(aweme_id: str) -> str:
    """Return the Douyin video page URL for *aweme_id*."""
    return "https://www.douyin.com/video/{}".format(aweme_id)
|
||
|
||
|
||
def parse_target_input(value: str, source: str) -> ResolvedTarget:
    """Classify *value* and wrap it in a ``ResolvedTarget``.

    The stripped text is tested, in this order, as a recommendation URL, a
    creator URL, a video URL and finally a bare aweme id; the first match
    decides the target kind.

    Args:
        value: URL or aweme id to classify.
        source: Stored unchanged on the result.

    Raises:
        ValueError: if *value* fits none of the supported forms.
    """
    text = value.strip()

    if is_recommendation_url(text):
        kind, aweme_id = "recommendation", None
    elif is_creator_url(text):
        kind, aweme_id = "creator", None
    elif is_video_url(text):
        kind, aweme_id = "single-video", extract_aweme_id_from_video_url(text)
    elif is_aweme_id(text):
        # A bare id doubles as both the target value and the video id.
        kind, aweme_id = "single-video", text
    else:
        raise ValueError(f"不支持的目标: {value}")

    return ResolvedTarget(kind=kind, value=text, source=source, aweme_id=aweme_id)
|
||
|
||
|
||
def get_active_page_url(page: Any) -> str:
    """Return ``page.url`` as a stripped string, or "" when it is missing or falsy."""
    raw_url = getattr(page, "url", "")
    if not raw_url:
        return ""
    return str(raw_url).strip()
|
||
|
||
|
||
def resolve_target(page: Any, cli_target: str | None) -> ResolvedTarget:
    """Resolve the download target from *cli_target* or the page's current URL.

    A truthy *cli_target* is parsed with source "manual". Otherwise the URL
    read from *page* is parsed with source "current-page".

    Raises:
        RuntimeError: if the chosen input is not a supported target. The
            original ``ValueError`` is attached as the cause.
    """
    if not cli_target:
        current_url = get_active_page_url(page)
        try:
            return parse_target_input(current_url, source="current-page")
        except ValueError as exc:
            raise RuntimeError(
                "当前页面不是受支持的抖音博主页或单视频页,请切到目标页面后重试,或手动传入链接或 `aweme_id`。"
            ) from exc

    try:
        return parse_target_input(cli_target, source="manual")
    except ValueError as exc:
        # Re-raise with the parser's own message.
        raise RuntimeError(str(exc)) from exc
|
||
|
||
|
||
def resolve_cli_target(cli_target: str | None, browser_port: int | None) -> ResolvedTarget:
    """Resolve the target for a command-line run.

    A truthy *cli_target* is parsed directly, so ``ValueError`` from
    ``parse_target_input`` propagates unchanged. Otherwise a browser page is
    created (after checking the debug port when *browser_port* is given) and
    the target is resolved from its current URL via ``resolve_target``.
    """
    if not cli_target:
        _, page_cls, options_cls = import_runtime_dependencies()
        if browser_port is not None:
            ensure_browser_debug_port_ready(browser_port)
        browser_page = create_page(page_cls, options_cls, browser_port)
        return resolve_target(page=browser_page, cli_target=None)

    return parse_target_input(cli_target, source="manual")
|
||
|
||
|
||
def choose_video_url(url_list: list[str]) -> str:
    """Pick one download URL from *url_list*.

    The first URL containing ``douyinvod.com`` is preferred; without one, the
    first URL in the list is returned.

    Raises:
        ValueError: if *url_list* is empty.
    """
    if not url_list:
        raise ValueError("url_list 为空,无法选择视频地址。")

    preferred = next((candidate for candidate in url_list if "douyinvod.com" in candidate), None)
    if preferred is not None:
        return preferred
    return url_list[0]
|
||
|
||
|
||
def build_output_path(
    title: str,
    video_id: str,
    output_dir: Path = Path("video"),
    author_name: str | None = None,
) -> Path:
    """Build the destination ``.mp4`` path for a video.

    The file name is ``<title>-<video_id>.mp4``, prefixed with ``[<author>]``
    when *author_name* is truthy. Title and author are passed through
    ``sanitize_filename`` first.
    """
    safe_title = sanitize_filename(title, fallback="untitled")
    author_prefix = ""
    if author_name:
        author_prefix = "[" + sanitize_filename(author_name, fallback="unknown") + "]"
    return output_dir / f"{author_prefix}{safe_title}-{video_id}.mp4"
|
||
|
||
|
||
def build_browser_address(browser_port: int | None) -> str | None:
    """Return ``127.0.0.1:<port>`` for *browser_port*, or None when no port is given."""
    return None if browser_port is None else f"127.0.0.1:{browser_port}"
|
||
|
||
|
||
def ensure_browser_debug_port_ready(browser_port: int) -> None:
    """Check that something accepts TCP connections on 127.0.0.1:*browser_port*.

    A connection is opened with a 2 second timeout and closed again at once.

    Raises:
        RuntimeError: if the connection attempt fails with ``OSError``.
    """
    endpoint = ("127.0.0.1", browser_port)
    try:
        probe = socket.create_connection(endpoint, timeout=2)
        probe.close()
    except OSError as exc:
        raise RuntimeError(
            "无法连接到已启动的 Chrome 调试端口。"
            f"请先运行 `./.venv/bin/python login_douyin.py --browser-port {browser_port}`,"
            "并确认 Chrome 仍在运行且端口一致。"
        ) from exc
|
||
|
||
|
||
def extract_aweme_payload(response: Any) -> dict[str, Any]:
    """Return the JSON object carried by a captured response.

    ``response.body`` is returned as-is when it is already a dict. Otherwise
    a non-blank string ``response.raw_body`` is decoded with ``json.loads``
    and returned if the result is a dict.

    Raises:
        ValueError: if neither attribute yields a dict. Invalid JSON in
            ``raw_body`` surfaces as the error raised by ``json.loads``.
    """
    parsed_body = getattr(response, "body", None)
    if isinstance(parsed_body, dict):
        return parsed_body

    raw_text = getattr(response, "raw_body", None)
    has_text = isinstance(raw_text, str) and bool(raw_text.strip())
    decoded = json.loads(raw_text) if has_text else None
    if isinstance(decoded, dict):
        return decoded

    raise ValueError("响应体不是可解析的 JSON 字典。")
|
||
|
||
|
||
def parse_aweme_items(body: Any) -> list[dict[str, str]]:
    """Extract downloadable video records from an ``aweme_list`` response.

    Each returned dict has the keys ``title``, ``video_id``, ``video_url``,
    ``author_name`` and ``author_id``. Entries that cannot be downloaded
    (no usable URL, no ``aweme_id``) or that are structurally malformed are
    skipped, so one bad entry does not discard the rest of the batch.

    Args:
        body: Decoded JSON response.

    Raises:
        ValueError: if *body* is not a dict or has no list under
            ``aweme_list``.
    """
    if not isinstance(body, dict):
        raise ValueError("接口响应不是字典,无法解析。")

    aweme_list = body.get("aweme_list")
    if not isinstance(aweme_list, list):
        raise ValueError("接口响应中缺少 aweme_list。")

    items: list[dict[str, str]] = []
    for aweme in aweme_list:
        if not isinstance(aweme, dict):
            continue

        # Walk video -> play_addr -> url_list defensively: any level may be
        # missing, null or of an unexpected type.
        video = aweme.get("video")
        play_addr = video.get("play_addr") if isinstance(video, dict) else None
        raw_urls = play_addr.get("url_list") if isinstance(play_addr, dict) else None
        if not isinstance(raw_urls, (list, tuple)):
            continue

        # Drop null/empty entries so they are never stringified into "None".
        url_list = [str(url) for url in raw_urls if url]
        if not url_list:
            continue

        video_id = str(aweme.get("aweme_id") or "").strip()
        if not video_id:
            continue

        title = str(aweme.get("desc") or "").strip() or "untitled"

        author = aweme.get("author")
        if not isinstance(author, dict):
            author = {}
        author_name = str(author.get("nickname") or "").strip() or "unknown"
        author_id = str(author.get("uid") or "").strip() or "unknown"

        items.append(
            {
                "title": title,
                "video_id": video_id,
                "video_url": choose_video_url(url_list),
                "author_name": author_name,
                "author_id": author_id,
            }
        )

    return items
|
||
|
||
|
||
def parse_single_aweme_item(body: Any) -> dict[str, str]:
    """Return the first downloadable video record found in *body*.

    A dict under ``aweme_detail`` is tried first, wrapped as a one-element
    ``aweme_list``. If that yields nothing, *body* itself is parsed as a
    list response.

    Raises:
        ValueError: if *body* is not a dict, if the fallback parse rejects
            it, or if no downloadable record is found.
    """
    if not isinstance(body, dict):
        raise ValueError("接口响应不是字典,无法解析。")

    candidates: list[Any] = []
    detail = body.get("aweme_detail")
    if isinstance(detail, dict):
        candidates.append({"aweme_list": [detail]})
    candidates.append(body)

    for candidate in candidates:
        parsed = parse_aweme_items(candidate)
        if parsed:
            return parsed[0]

    raise ValueError("接口响应中缺少可下载的单视频数据。")
|
||
|
||
|
||
def build_headers(referer: str) -> dict[str, str]:
    """Return the HTTP headers used for video downloads.

    The result holds *referer* and a fixed desktop Chrome ``user-agent``.
    """
    user_agent = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/135.0.0.0 Safari/537.36"
    )
    headers: dict[str, str] = {}
    headers["referer"] = referer
    headers["user-agent"] = user_agent
    return headers
|
||
|
||
|
||
def import_runtime_dependencies() -> tuple[Any, Any, Any]:
    """Import the third-party packages lazily and return them.

    Returns:
        ``(requests, ChromiumPage, ChromiumOptions)``.

    Raises:
        SystemExit: with an install hint when ``requests`` or
            ``DrissionPage`` is not installed.
    """
    try:
        import requests
    except ModuleNotFoundError as exc:
        hint = "缺少 requests,请先执行: python3 -m pip install requests"
        raise SystemExit(hint) from exc

    try:
        from DrissionPage import ChromiumPage, ChromiumOptions
    except ModuleNotFoundError as exc:
        hint = "缺少 DrissionPage,请先执行: python3 -m pip install DrissionPage"
        raise SystemExit(hint) from exc

    return requests, ChromiumPage, ChromiumOptions
|
||
|
||
|
||
def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: int | None) -> Any:
    """Create a browser page object.

    Without *browser_port*, ``chromium_page_cls`` is called with no
    arguments. With a port, it receives options configured with the
    ``127.0.0.1:<port>`` address and ``existing_only(True)``.
    """
    address = build_browser_address(browser_port)
    if address is not None:
        configured = chromium_options_cls().set_address(address)
        configured = configured.existing_only(True)
        return chromium_page_cls(configured)

    return chromium_page_cls()
|
||
|
||
|
||
def wait_for_aweme_packet(page: Any, timeout: int) -> Any | None:
    """Wait for the next captured packet on ``page.listen``.

    Args:
        page: Object exposing ``listen.wait(timeout=...)``.
        timeout: Seconds to wait for a packet.

    Returns:
        The captured packet, or None when the wait timed out or raised.
    """
    try:
        packet = page.listen.wait(timeout=timeout)
    except Exception as exc:
        print(f"[WARN] 等待接口数据超时或失败: {exc}")
        return None

    # DrissionPage reports a timeout by returning False rather than raising;
    # normalise that to None, which is what callers test for.
    if packet is None or packet is False:
        return None
    return packet
|
||
|
||
|
||
def scroll_to_next_page(page: Any) -> None:
    """Scroll the page to the bottom of the document, then pause for two seconds."""
    scroll_script = "window.scrollTo(0, document.body.scrollHeight);"
    page.run_js(scroll_script)
    time.sleep(2)
|
||
|
||
|
||
def download_video(
    requests_module: Any,
    headers: dict[str, str],
    video_url: str,
    output_path: Path,
) -> None:
    """Download *video_url* to *output_path*.

    The body is streamed in chunks rather than buffered whole in memory. It
    is written to a sibling ``<name>.part`` file that is renamed onto
    *output_path* only after the download completes, so a failed or
    interrupted transfer never leaves a truncated ``.mp4`` behind.

    Args:
        requests_module: The ``requests`` module, or a compatible object.
        headers: HTTP headers sent with the request.
        video_url: Direct URL of the video file.
        output_path: Final destination; parent directories are created.

    Raises:
        Exception: whatever the HTTP layer or the file system raises. The
            partial file is removed before the error propagates.
    """
    partial_path = output_path.with_name(output_path.name + ".part")
    try:
        with requests_module.get(video_url, headers=headers, timeout=60, stream=True) as response:
            response.raise_for_status()
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with partial_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:  # skip keep-alive chunks
                        handle.write(chunk)
        partial_path.replace(output_path)
    finally:
        # No-op after a successful rename; removes leftovers after a failure.
        partial_path.unlink(missing_ok=True)
|
||
|
||
|
||
def collect_videos(
    user_url: str,
    max_pages: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
    auto_scroll: bool = False,
) -> int:
    """Download the videos listed on a creator's profile page.

    Opens *user_url* while listening for ``LISTEN_TARGET`` responses, then
    handles up to *max_pages* captured responses. Without *auto_scroll* only
    the first response is handled and any problem with it is fatal. With
    *auto_scroll* the page is scrolled after each round and problems are
    logged and skipped.

    Args:
        user_url: Creator profile URL; also used as the download referer.
        max_pages: Maximum number of rounds.
        timeout: Seconds to wait for each response.
        output_dir: Directory receiving the ``.mp4`` files.
        browser_port: Debug port of an existing browser, or None.
        auto_scroll: Whether to keep scrolling for further pages.

    Returns:
        Number of videos saved.

    Raises:
        RuntimeError: without *auto_scroll*, when no usable data was
            captured; or when the debug port is unreachable.
    """
    requests_module, page_cls, options_cls = import_runtime_dependencies()
    headers = build_headers(user_url)
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(page_cls, options_cls, browser_port)
    page.listen.start(LISTEN_TARGET)

    print("[INFO] 正在打开抖音主页。若出现登录或验证码,请先在浏览器窗口里完成。")
    page.get(user_url)
    time.sleep(3)

    # Shared message for every "nothing usable was captured" failure.
    not_loaded = "当前页面未加载出可用作品数据,请先在浏览器中完成页面加载后重试。"
    saved_count = 0
    handled_ids: set[str] = set()

    for page_number in range(1, max_pages + 1):
        print(f"[INFO] 正在处理第 {page_number} 页")

        packet = wait_for_aweme_packet(page, timeout=timeout)
        if packet is None:
            if not auto_scroll:
                raise RuntimeError(not_loaded)
            scroll_to_next_page(page)
            continue

        try:
            items = parse_aweme_items(extract_aweme_payload(packet.response))
        except Exception as exc:
            print(f"[WARN] 解析接口数据失败: {exc}")
            if not auto_scroll:
                raise RuntimeError(not_loaded) from exc
            scroll_to_next_page(page)
            continue

        if not items:
            if not auto_scroll:
                raise RuntimeError(not_loaded)
            print("[WARN] 这一页没有解析到视频。")

        for item in items:
            if item["video_id"] in handled_ids:
                continue
            # Marked before downloading, so a failed video is not retried.
            handled_ids.add(item["video_id"])

            output_path = build_output_path(
                title=item["title"],
                video_id=item["video_id"],
                output_dir=output_dir,
            )
            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=item["video_url"],
                    output_path=output_path,
                )
            except Exception as exc:
                print(f"[WARN] 下载失败 {item['video_id']}: {exc}")
            else:
                saved_count += 1
                print(f"[OK] 已保存: {output_path}")

        if not auto_scroll:
            # Single-page mode: stop after the first handled response.
            break
        scroll_to_next_page(page)

    return saved_count
|
||
|
||
|
||
def collect_recommendations(
    max_videos: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
) -> int:
    """Download videos from the Douyin recommendation feed.

    Opens the Douyin home page while listening for
    ``RECOMMENDATION_LISTEN_TARGET`` responses and keeps scrolling until
    *max_videos* videos are saved or three rounds in a row produce nothing
    new.

    A round produces nothing new when no packet arrives, the packet cannot
    be parsed, it holds no items, or none of its items is saved (all already
    seen, or all downloads failed). The counter is reset only after a round
    that saved at least one video, so a feed that keeps repeating the same
    videos cannot keep the loop alive forever. The end-of-crawl notice is
    printed whichever kind of unproductive round hits the limit.

    Args:
        max_videos: Maximum number of videos to save.
        timeout: Seconds to wait for each response.
        output_dir: Directory receiving the ``.mp4`` files.
        browser_port: Debug port of an existing browser, or None.

    Returns:
        Number of videos saved.

    Raises:
        RuntimeError: if the browser debug port is unreachable.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    headers = build_headers("https://www.douyin.com/")
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
    page.listen.start(RECOMMENDATION_LISTEN_TARGET)

    print("[INFO] 正在打开抖音推荐流。若出现登录或验证码,请先在浏览器窗口里完成。")
    page.get("https://www.douyin.com/")
    time.sleep(3)

    downloaded = 0
    seen_ids: set[str] = set()
    consecutive_empty = 0
    max_consecutive_empty = 3

    while downloaded < max_videos:
        items: list[dict[str, str]] = []
        packet = wait_for_aweme_packet(page, timeout=timeout)
        if packet is not None:
            try:
                payload = extract_aweme_payload(packet.response)
                items = parse_aweme_items(payload)
            except Exception as exc:
                print(f"[WARN] 解析接口数据失败: {exc}")

        new_items_in_batch = 0
        for item in items:
            if item["video_id"] in seen_ids:
                continue
            if downloaded >= max_videos:
                break

            # Marked before downloading, so a failed video is not retried.
            seen_ids.add(item["video_id"])
            output_path = build_output_path(
                title=item["title"],
                video_id=item["video_id"],
                output_dir=output_dir,
                author_name=item.get("author_name"),
            )

            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=item["video_url"],
                    output_path=output_path,
                )
            except Exception as exc:
                print(f"[WARN] 下载失败 {item['video_id']}: {exc}")
                continue

            downloaded += 1
            new_items_in_batch += 1
            print(f"[OK] 已保存: {output_path}")

        if new_items_in_batch > 0:
            consecutive_empty = 0
        else:
            consecutive_empty += 1
            if consecutive_empty >= max_consecutive_empty:
                print("[INFO] 连续多次未获取到新数据,结束抓取。")
                break

        scroll_to_next_page(page)

    return downloaded
|
||
|
||
|
||
def collect_single_video(
    target: ResolvedTarget,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
) -> int:
    """Download the single video described by *target*.

    Opens the video page while listening for ``SINGLE_VIDEO_LISTEN_TARGET``
    responses, parses the first captured response and saves the video.

    Args:
        target: Resolved target. When its ``value`` is not a video URL, the
            page URL is built from ``aweme_id``.
        timeout: Seconds to wait for the detail response.
        output_dir: Directory receiving the ``.mp4`` file.
        browser_port: Debug port of an existing browser, or None.

    Returns:
        1 on success.

    Raises:
        RuntimeError: if the debug port is unreachable, no usable detail
            response is captured, or the download fails. The underlying
            error is attached as the cause where there is one.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)

    page_url = target.value
    if target.aweme_id is not None and not is_video_url(page_url):
        # The target was given as a bare aweme id; turn it into a page URL.
        page_url = build_video_page_url(target.aweme_id)

    headers = build_headers(page_url)
    page.listen.start(SINGLE_VIDEO_LISTEN_TARGET)
    print("[INFO] 正在打开抖音视频页。若出现登录或验证码,请先在浏览器窗口里完成。")
    page.get(page_url)
    time.sleep(3)

    packet = wait_for_aweme_packet(page, timeout=timeout)
    if packet is None:
        raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。")

    try:
        payload = extract_aweme_payload(packet.response)
        item = parse_single_aweme_item(payload)
    except Exception as exc:
        raise RuntimeError("当前视频页面未加载出可用视频数据,请先在浏览器中完成页面加载后重试。") from exc

    output_path = build_output_path(
        title=item["title"],
        video_id=item["video_id"],
        output_dir=output_dir,
    )
    try:
        download_video(
            requests_module=requests_module,
            headers=headers,
            video_url=item["video_url"],
            output_path=output_path,
        )
    except Exception as exc:
        # Report as RuntimeError so the caller's error handling prints a
        # clean message instead of a raw HTTP or file-system traceback.
        raise RuntimeError(f"下载失败 {item['video_id']}: {exc}") from exc

    print(f"[OK] 已保存: {output_path}")
    return 1
|
||
|
||
|
||
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser.

    Defines the optional positional ``target`` plus the ``--pages``,
    ``--timeout``, ``--output-dir``, ``--browser-port`` and ``--max-videos``
    options.
    """
    cli = argparse.ArgumentParser(description="附着抖音登录浏览器并下载当前页面或指定目标的视频")
    add = cli.add_argument

    add(
        "target",
        nargs="?",
        default=None,
        help="可选:博主主页 URL、单视频 URL 或 aweme_id;不传则读取当前浏览器页面",
    )
    add("--pages", type=int, default=1, help="创作者抓取最多处理多少页;默认 1")
    add("--timeout", type=int, default=10, help="单次等待接口响应秒数,默认 10")
    add("--output-dir", default="video", help="视频输出目录,默认 video")
    add(
        "--browser-port",
        type=int,
        default=DEFAULT_BROWSER_PORT,
        help="附着到已启动 Chrome 的调试端口,默认 9223",
    )
    add("--max-videos", type=int, default=50, help="推荐流最大抓取数量,默认 50")

    return cli
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """Run the command-line program.

    Parses and validates the arguments, resolves the download target and
    dispatches on its kind to the matching collector.

    Args:
        argv: Argument list; None lets argparse read ``sys.argv``.

    Returns:
        0 on success, 1 after a reported error, 130 after Ctrl-C.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    if args.pages <= 0:
        parser.error("--pages 必须大于 0")
    if args.timeout <= 0:
        parser.error("--timeout 必须大于 0")
    if args.browser_port is not None and args.browser_port <= 0:
        parser.error("--browser-port 必须大于 0")
    if args.browser_port is not None and args.browser_port > 65535:
        # A larger value makes socket.create_connection raise OverflowError,
        # which is not an OSError and would escape as a traceback.
        parser.error("--browser-port 必须不大于 65535")
    if args.max_videos <= 0:
        parser.error("--max-videos 必须大于 0")

    output_dir = Path(args.output_dir)

    try:
        target = resolve_cli_target(args.target, browser_port=args.browser_port)
        if target.kind == "creator":
            total = collect_videos(
                user_url=target.value,
                max_pages=args.pages,
                timeout=args.timeout,
                output_dir=output_dir,
                browser_port=args.browser_port,
                # Scrolling is only needed when more than one page is wanted.
                auto_scroll=args.pages > 1,
            )
        elif target.kind == "recommendation":
            total = collect_recommendations(
                max_videos=args.max_videos,
                timeout=args.timeout,
                output_dir=output_dir,
                browser_port=args.browser_port,
            )
        elif target.kind == "single-video":
            total = collect_single_video(
                target=target,
                timeout=args.timeout,
                output_dir=output_dir,
                browser_port=args.browser_port,
            )
        else:
            raise RuntimeError(f"不支持的目标类型: {target.kind}")
    except (RuntimeError, ValueError) as exc:
        # ValueError is what resolve_cli_target raises for an unsupported
        # target given on the command line.
        print(f"[ERROR] {exc}")
        return 1
    except KeyboardInterrupt:
        print("\n[INFO] 用户中断。")
        return 130

    print(f"[INFO] 处理结束,共下载 {total} 个视频。")
    return 0
|
||
|
||
|
||
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
|