def build_search_page_url(keyword: str) -> str:
    """Return the Douyin general-search page URL for *keyword*.

    The keyword is percent-encoded as a single path segment. ``safe=""`` is
    passed explicitly because ``quote`` leaves ``/`` unescaped by default,
    which would split a keyword such as ``"a/b"`` into two path segments.
    """
    return f"https://www.douyin.com/search/{quote(keyword, safe='')}?type=general"


def parse_search_items(body: Any) -> list[dict[str, str]]:
    """Extract video items from a search API response body.

    Each dict entry of ``body["data"]`` that holds a dict under
    ``"aweme_info"`` contributes that dict; every other entry is skipped.
    The collected dicts are handed to ``parse_aweme_items`` wrapped as
    ``{"aweme_list": [...]}`` and its result is returned unchanged.

    Raises:
        ValueError: If *body* is not a dict, or ``body["data"]`` is not a list.
    """
    if not isinstance(body, dict):
        raise ValueError("接口响应不是字典,无法解析。")

    data = body.get("data")
    if not isinstance(data, list):
        raise ValueError("搜索接口响应中缺少 data。")

    aweme_list = [
        entry["aweme_info"]
        for entry in data
        if isinstance(entry, dict) and isinstance(entry.get("aweme_info"), dict)
    ]
    return parse_aweme_items({"aweme_list": aweme_list})


def collect_search_results(
    keyword: str,
    max_videos: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
    scroll_settings: ScrollSettings | None = None,
) -> int:
    """Open the search page for *keyword* and download the videos it lists.

    The function listens for search API packets, parses each one with
    ``parse_search_items``, downloads every not-yet-seen video, and scrolls
    the page to trigger the next packet.

    The loop stops when any of these holds:
      * ``max_videos`` videos have been saved;
      * ``settings.max_runtime`` is positive and that many seconds elapsed;
      * six consecutive rounds produced no newly saved video (no packet,
        unparsable packet, empty batch, only already-seen items, or every
        download in the batch failed).

    Args:
        keyword: Search keyword used to build the search page URL.
        max_videos: Upper bound on the number of videos to save.
        timeout: Passed to ``wait_for_aweme_packet`` for each wait.
        output_dir: Directory passed to ``build_output_path``.
        browser_port: When not ``None``, ``ensure_browser_debug_port_ready``
            is called with it first; it is always passed to ``create_page``.
        scroll_settings: Scroll configuration; a default ``ScrollSettings()``
            is used when omitted.

    Returns:
        The number of videos saved successfully.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    search_url = build_search_page_url(keyword)
    headers = build_headers(search_url)
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
    page.listen.start(SEARCH_LISTEN_TARGET)

    print(f"[INFO] 正在打开抖音搜索页:{keyword}。若出现登录或验证码,请先在浏览器窗口里完成。")
    page.get(search_url)
    time.sleep(3)

    downloaded = 0
    seen_ids: set[str] = set()
    consecutive_empty = 0
    max_consecutive_empty = 6
    settings = scroll_settings or ScrollSettings()
    started_at = time.monotonic()

    while downloaded < max_videos:
        if settings.max_runtime > 0 and time.monotonic() - started_at >= settings.max_runtime:
            print("[INFO] 已达到最大运行时间,结束抓取。")
            break

        packet = wait_for_aweme_packet(page, timeout=timeout)
        if packet is None:
            consecutive_empty += 1
            if consecutive_empty >= max_consecutive_empty:
                print("[INFO] 连续多次未获取到新搜索数据,结束抓取。")
                break
            human_like_scroll(page, settings=settings)
            continue

        try:
            items = parse_search_items(extract_aweme_payload(packet.response))
        except Exception as exc:
            # Best effort: an unparsable packet is treated like an empty batch.
            print(f"[WARN] 解析搜索接口数据失败: {exc}")
            items = []

        new_items_in_batch = 0
        for item in items:
            if downloaded >= max_videos:
                break

            video_id = item["video_id"]
            if video_id in seen_ids:
                continue
            # Mark as seen before downloading so a failing video is not retried.
            seen_ids.add(video_id)

            output_path = build_output_path(
                title=item["title"],
                video_id=video_id,
                output_dir=output_dir,
                author_name=item.get("author_name"),
            )

            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=item["video_url"],
                    output_path=output_path,
                )
            except Exception as exc:
                print(f"[WARN] 下载失败 {video_id}: {exc}")
                continue

            downloaded += 1
            new_items_in_batch += 1
            print(f"[OK] 已保存: {output_path}")

        if new_items_in_batch:
            # Only real progress clears the stall counter. Resetting it merely
            # because the packet had items would let a feed of duplicates keep
            # the loop alive forever when no max_runtime is configured.
            consecutive_empty = 0
        else:
            consecutive_empty += 1
            if consecutive_empty >= max_consecutive_empty:
                break

        human_like_scroll(page, settings=settings)

    return downloaded
def test_build_search_page_url_encodes_keyword(self) -> None:
    """A non-ASCII keyword is percent-encoded in the search page URL."""
    douyin = importlib.import_module("Douyin")
    expected_url = "https://www.douyin.com/search/%E7%8C%AB%E5%92%AA?type=general"
    self.assertEqual(douyin.build_search_page_url("猫咪"), expected_url)


def test_parse_search_items_extracts_aweme_info(self) -> None:
    """A search entry carrying ``aweme_info`` is parsed into one item."""
    douyin = importlib.import_module("Douyin")
    aweme_info = {
        "aweme_id": "7319795133048769829",
        "desc": "猫咪视频",
        "author": {"nickname": "奶芙芙", "uid": "75478174642"},
        "video": {
            "play_addr_lowbr": {
                "url_list": ["https://v26-web.douyinvod.com/example/search.mp4"]
            }
        },
    }
    response_body = {"data": [{"type": 1, "aweme_info": aweme_info}]}

    parsed = douyin.parse_search_items(response_body)

    self.assertEqual(len(parsed), 1)
    first = parsed[0]
    self.assertEqual(first["video_id"], "7319795133048769829")
    self.assertEqual(first["author_name"], "奶芙芙")
    self.assertEqual(first["video_url"], "https://v26-web.douyinvod.com/example/search.mp4")
def test_build_parser_has_search_keyword_argument(self) -> None:
    """``--search-keyword`` is parsed into ``args.search_keyword``."""
    douyin = importlib.import_module("Douyin")
    parser = douyin.build_parser()
    parsed_args = parser.parse_args(["--search-keyword", "猫咪"])
    self.assertEqual(parsed_args.search_keyword, "猫咪")


def test_main_dispatches_search_flow_for_search_keyword(self) -> None:
    """``main`` routes to ``collect_search_results`` when a keyword is given."""
    douyin = importlib.import_module("Douyin")
    captured_stdout = io.StringIO()
    patched_collect = mock.patch.object(douyin, "collect_search_results", return_value=7)

    # Silence the CLI's prints while the patched collector stands in.
    with redirect_stdout(captured_stdout), patched_collect as collect_mock:
        status = douyin.main(["--search-keyword", "猫咪"])

    self.assertEqual(status, 0)
    expected_kwargs = {
        "keyword": "猫咪",
        "max_videos": 50,
        "timeout": 10,
        "output_dir": douyin.Path("video"),
        "browser_port": 9223,
        "scroll_settings": douyin.ScrollSettings(),
    }
    collect_mock.assert_called_once_with(**expected_kwargs)