"""Unit tests for the ``Douyin`` module.

Every test imports the module under test by name via ``importlib`` and uses
the small in-memory fakes defined below (or ``unittest.mock``) in place of a
real browser page, network listener, or download.
"""

import importlib
import io
import unittest
from contextlib import redirect_stdout
from unittest import mock


class FakeResponse:
    """Response stand-in that only carries ``body`` and ``raw_body``."""

    def __init__(self, body, raw_body) -> None:
        self.body = body
        self.raw_body = raw_body


class FakePage:
    """Page stand-in that only exposes the current ``url``."""

    def __init__(self, url: str) -> None:
        self.url = url


class FakePacketResponse:
    """Packet response with the given ``body`` and an empty ``raw_body``."""

    def __init__(self, body) -> None:
        self.body = body
        self.raw_body = ""


class FakePacket:
    """Captured-packet stand-in wrapping ``body`` in a ``FakePacketResponse``."""

    def __init__(self, body) -> None:
        self.response = FakePacketResponse(body)


class FakeListener:
    """Listener stand-in that yields its packet exactly once.

    ``start`` records the targets it was asked to listen for; ``wait`` returns
    the configured packet on the first call and ``None`` on every later call.
    """

    def __init__(self, packet) -> None:
        self.packet = packet
        self.started_targets = []
        self.call_count = 0

    def start(self, target) -> None:
        self.started_targets.append(target)

    def wait(self, timeout):
        # ``timeout`` is accepted for signature compatibility and ignored.
        self.call_count += 1
        if self.call_count == 1:
            return self.packet
        return None


class FakeRuntimePage:
    """Page stand-in with a ``listen`` attribute, navigation and ``run_js``.

    ``get`` records every visited URL and updates ``url``. ``run_js`` accepts
    only scroll scripts and fails the test on anything else.
    """

    def __init__(self, url: str, packet) -> None:
        self.url = url
        self.listen = FakeListener(packet)
        self.visited_urls = []

    def get(self, url) -> None:
        self.visited_urls.append(url)
        self.url = url

    def run_js(self, script) -> None:
        # Allow both old scroll_to_next_page and new human_like_scroll
        if "window.scrollTo" in script or "window.scrollBy" in script:
            return
        raise AssertionError(f"unexpected scroll script: {script}")


class FakeScrollPage:
    """Page stand-in that records every script passed to ``run_js``."""

    def __init__(self) -> None:
        self.scripts = []

    def run_js(self, script) -> None:
        self.scripts.append(script)


class DouyinModuleTests(unittest.TestCase):
    """Behavioural tests for the public helpers of the ``Douyin`` module."""

    def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
        module = importlib.import_module("Douyin")
        self.assertIsNotNone(module)

    def test_sanitize_filename_removes_invalid_characters(self) -> None:
        module = importlib.import_module("Douyin")
        # The less-than / greater-than signs are spelled as \x3c / \x3e escapes
        # so the fixture survives tooling that strips markup-like text; without
        # them the expected "_e_f_g_" segment cannot be produced.
        self.assertEqual(
            module.sanitize_filename('a/b:c*?d"e\x3cf\x3eg|h\n'),
            "a_b_c__d_e_f_g_h",
        )

    def test_choose_video_url_prefers_douyinvod_link(self) -> None:
        module = importlib.import_module("Douyin")
        urls = [
            "https://www.douyin.com/aweme/v1/play/?video_id=123",
            "https://v11-weba.douyinvod.com/example/video.mp4",
            "https://v26-web.douyinvod.com/example/video.mp4",
        ]
        self.assertEqual(
            module.choose_video_url(urls),
            "https://v11-weba.douyinvod.com/example/video.mp4",
        )

    def test_build_output_path_uses_video_directory(self) -> None:
        module = importlib.import_module("Douyin")
        output_path = module.build_output_path("测试标题", "123456")
        self.assertEqual(output_path.as_posix(), "video/测试标题-123456.mp4")

    def test_build_output_path_with_author_uses_bracket_format(self) -> None:
        module = importlib.import_module("Douyin")
        output_path = module.build_output_path(
            title="测试标题", video_id="123456", author_name="测试博主"
        )
        self.assertEqual(output_path.as_posix(), "video/[测试博主]测试标题-123456.mp4")

    def test_build_output_path_limits_long_filename(self) -> None:
        module = importlib.import_module("Douyin")
        output_path = module.build_output_path(
            title="超长标题" * 100,
            video_id="7619989983668240802",
            author_name="超长博主名" * 20,
        )
        # The limit is checked in UTF-8 bytes, not characters.
        self.assertLessEqual(len(output_path.name.encode("utf-8")), 240)
        self.assertTrue(output_path.name.endswith("-7619989983668240802.mp4"))

    def test_extract_aweme_payload_uses_dict_body(self) -> None:
        module = importlib.import_module("Douyin")
        response = FakeResponse({"aweme_list": []}, "")
        self.assertEqual(module.extract_aweme_payload(response), {"aweme_list": []})

    def test_extract_aweme_payload_falls_back_to_raw_json(self) -> None:
        module = importlib.import_module("Douyin")
        response = FakeResponse("", '{"aweme_list": [{"aweme_id": "1"}]}')
        self.assertEqual(
            module.extract_aweme_payload(response),
            {"aweme_list": [{"aweme_id": "1"}]},
        )

    def test_wait_for_aweme_packet_treats_false_listener_result_as_missing(self) -> None:
        module = importlib.import_module("Douyin")
        page = mock.MagicMock()
        page.listen.wait.return_value = False
        self.assertIsNone(module.wait_for_aweme_packet(page, timeout=10))

    def test_build_browser_address_from_port(self) -> None:
        module = importlib.import_module("Douyin")
        self.assertEqual(module.build_browser_address(9223), "127.0.0.1:9223")
        self.assertIsNone(module.build_browser_address(None))

    def test_default_scroll_settings_uses_human_mode(self) -> None:
        module = importlib.import_module("Douyin")
        settings = module.ScrollSettings()
        self.assertEqual(settings.mode, "human")
        self.assertEqual(settings.min_wait, 2.0)
        self.assertEqual(settings.max_wait, 8.0)
        self.assertEqual(settings.reverse_scroll_probability, 0.2)

    def test_create_human_scroll_plan_uses_configured_ranges(self) -> None:
        module = importlib.import_module("Douyin")
        settings = module.ScrollSettings(
            min_wait=2.0,
            max_wait=4.0,
            min_scroll=300,
            max_scroll=900,
            reverse_scroll_probability=0.0,
        )
        # A seeded generator keeps the plan reproducible between runs.
        plan = module.create_human_scroll_plan(settings, random_module=module.random.Random(7))
        self.assertGreaterEqual(plan.down_distance, 300)
        self.assertLessEqual(plan.down_distance, 900)
        self.assertGreaterEqual(plan.down_wait, 2.0)
        self.assertLessEqual(plan.down_wait, 4.0)
        self.assertEqual(plan.reverse_distance, 0)

    def test_create_human_scroll_plan_can_include_reverse_scroll(self) -> None:
        module = importlib.import_module("Douyin")
        settings = module.ScrollSettings(reverse_scroll_probability=1.0)
        plan = module.create_human_scroll_plan(settings, random_module=module.random.Random(3))
        self.assertGreaterEqual(plan.reverse_distance, 80)
        self.assertLessEqual(plan.reverse_distance, 250)
        self.assertGreater(plan.reverse_wait, 0)

    def test_run_human_scroll_sequence_scrolls_down_and_optionally_back_up(self) -> None:
        module = importlib.import_module("Douyin")
        page = FakeScrollPage()
        plan = module.HumanScrollPlan(
            down_distance=500,
            down_wait=2.5,
            reverse_distance=120,
            reverse_wait=1.0,
            settle_wait=3.0,
        )
        # Patch sleep so the sequence runs instantly and the waits can be inspected.
        with mock.patch.object(module.time, "sleep") as mocked_sleep:
            module.run_human_scroll_sequence(page, plan)
        self.assertEqual(
            page.scripts,
            [
                "window.scrollBy(0, 500);",
                "window.scrollBy(0, -120);",
                "window.scrollBy(0, 240);",
            ],
        )
        mocked_sleep.assert_has_calls([mock.call(2.5), mock.call(1.0), mock.call(3.0)])

    def test_ensure_browser_debug_port_ready_accepts_open_port(self) -> None:
        module = importlib.import_module("Douyin")
        # The mock doubles as its own context manager so it can be used in a
        # ``with`` statement by the code under test.
        connection = mock.MagicMock()
        connection.__enter__.return_value = connection
        connection.__exit__.return_value = False
        with mock.patch.object(
            module.socket, "create_connection", return_value=connection
        ) as mocked_connect:
            module.ensure_browser_debug_port_ready(9223)
        mocked_connect.assert_called_once()

    def test_ensure_browser_debug_port_ready_rejects_closed_port(self) -> None:
        module = importlib.import_module("Douyin")
        with mock.patch.object(module.socket, "create_connection", side_effect=OSError("boom")):
            with self.assertRaisesRegex(RuntimeError, "login_douyin.py"):
                module.ensure_browser_debug_port_ready(9223)

    def test_is_recommendation_url_accepts_douyin_homepage(self) -> None:
        module = importlib.import_module("Douyin")
        self.assertTrue(module.is_recommendation_url("https://www.douyin.com/"))
        self.assertTrue(module.is_recommendation_url("https://www.douyin.com"))
        self.assertTrue(module.is_recommendation_url("https://www.douyin.com/?from=web"))
        self.assertFalse(module.is_recommendation_url("https://www.douyin.com/user/xxx"))
        self.assertFalse(module.is_recommendation_url("https://www.douyin.com/video/123"))

    def test_is_creator_url_accepts_supported_douyin_creator_url(self) -> None:
        module = importlib.import_module("Douyin")
        self.assertTrue(
            module.is_creator_url(
                "https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main"
            )
        )
        self.assertFalse(module.is_creator_url("https://www.douyin.com/video/7619989983668240802"))

    def test_is_video_url_accepts_supported_douyin_video_url(self) -> None:
        module = importlib.import_module("Douyin")
        self.assertTrue(module.is_video_url("https://www.douyin.com/video/7619989983668240802"))
        self.assertFalse(
            module.is_video_url("https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main")
        )

    def test_is_aweme_id_accepts_numeric_identifier(self) -> None:
        module = importlib.import_module("Douyin")
        self.assertTrue(module.is_aweme_id("7619989983668240802"))
        self.assertFalse(module.is_aweme_id("not-an-aweme-id"))

    def test_parse_target_input_classifies_recommendation_url(self) -> None:
        module = importlib.import_module("Douyin")
        target = module.parse_target_input("https://www.douyin.com/", source="manual")
        self.assertEqual(target.kind, "recommendation")
        self.assertEqual(target.value, "https://www.douyin.com/")
        self.assertEqual(target.source, "manual")

    def test_parse_target_input_classifies_creator_url(self) -> None:
        module = importlib.import_module("Douyin")
        target = module.parse_target_input(
            "https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
            source="manual",
        )
        self.assertEqual(target.kind, "creator")
        self.assertEqual(
            target.value,
            "https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
        )
        self.assertEqual(target.source, "manual")

    def test_parse_target_input_classifies_video_url(self) -> None:
        module = importlib.import_module("Douyin")
        target = module.parse_target_input(
            "https://www.douyin.com/video/7619989983668240802",
            source="manual",
        )
        self.assertEqual(target.kind, "single-video")
        self.assertEqual(target.aweme_id, "7619989983668240802")
        self.assertEqual(target.source, "manual")

    def test_parse_target_input_classifies_aweme_id(self) -> None:
        module = importlib.import_module("Douyin")
        target = module.parse_target_input("7619989983668240802", source="manual")
        self.assertEqual(target.kind, "single-video")
        self.assertEqual(target.value, "7619989983668240802")
        self.assertEqual(target.aweme_id, "7619989983668240802")

    def test_resolve_target_uses_current_page_when_cli_target_is_absent(self) -> None:
        module = importlib.import_module("Douyin")
        target = module.resolve_target(
            page=FakePage("https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main"),
            cli_target=None,
        )
        self.assertEqual(target.kind, "creator")
        self.assertEqual(target.source, "current-page")

    def test_resolve_target_raises_readable_error_when_current_page_is_unsupported(self) -> None:
        module = importlib.import_module("Douyin")
        with self.assertRaisesRegex(RuntimeError, "手动传入链接或 `aweme_id`"):
            module.resolve_target(page=FakePage("https://www.example.com/"), cli_target=None)

    def test_resolve_target_raises_readable_error_when_manual_input_is_unsupported(self) -> None:
        module = importlib.import_module("Douyin")
        with self.assertRaisesRegex(RuntimeError, "不支持的目标"):
            module.resolve_target(
                page=FakePage("https://www.douyin.com/video/7619989983668240802"),
                cli_target="abc",
            )

    def test_collect_videos_does_not_auto_scroll_when_processing_current_page_only(self) -> None:
        module = importlib.import_module("Douyin")
        packet = FakePacket(
            {
                "aweme_list": [
                    {
                        "aweme_id": "7619989983668240802",
                        "desc": "当前页视频",
                        "video": {
                            "play_addr": {
                                "url_list": ["https://v26-web.douyinvod.com/example/video.mp4"]
                            }
                        },
                    }
                ]
            }
        )
        page = FakeRuntimePage(
            "https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
            packet,
        )
        with mock.patch.object(
            module, "import_runtime_dependencies", return_value=(object(), object(), object())
        ):
            with mock.patch.object(module, "create_page", return_value=page):
                with mock.patch.object(module, "download_video"):
                    with mock.patch.object(module, "scroll_to_next_page") as mocked_scroll:
                        downloaded = module.collect_videos(
                            user_url="https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
                            max_pages=1,
                            timeout=10,
                            output_dir=module.Path("video"),
                            browser_port=None,
                        )
        self.assertEqual(downloaded, 1)
        mocked_scroll.assert_not_called()

    def test_collect_videos_raises_readable_error_when_no_aweme_items_are_available(self) -> None:
        module = importlib.import_module("Douyin")
        packet = FakePacket({"aweme_list": []})
        page = FakeRuntimePage(
            "https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
            packet,
        )
        with mock.patch.object(
            module, "import_runtime_dependencies", return_value=(object(), object(), object())
        ):
            with mock.patch.object(module, "create_page", return_value=page):
                with mock.patch.object(module, "download_video"):
                    with self.assertRaisesRegex(RuntimeError, "当前页面未加载出可用作品数据"):
                        module.collect_videos(
                            user_url="https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
                            max_pages=1,
                            timeout=10,
                            output_dir=module.Path("video"),
                            browser_port=None,
                        )

    def test_parse_aweme_items_extracts_author_info(self) -> None:
        module = importlib.import_module("Douyin")
        payload = {
            "aweme_list": [
                {
                    "aweme_id": "7619989983668240802",
                    "desc": "测试视频",
                    "author": {
                        "nickname": "测试博主",
                        "uid": "123456789"
                    },
                    "video": {
                        "play_addr": {
                            "url_list": ["https://v26-web.douyinvod.com/example/video.mp4"]
                        }
                    },
                }
            ]
        }
        items = module.parse_aweme_items(payload)
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]["author_name"], "测试博主")
        self.assertEqual(items[0]["author_id"], "123456789")

    def test_parse_aweme_items_uses_play_addr_h264_when_play_addr_is_missing(self) -> None:
        module = importlib.import_module("Douyin")
        payload = {
            "aweme_list": [
                {
                    "aweme_id": "7619989983668240802",
                    "desc": "推荐视频",
                    "video": {
                        "play_addr_h264": {
                            "url_list": ["https://v26-web.douyinvod.com/example/h264.mp4"]
                        }
                    },
                }
            ]
        }
        items = module.parse_aweme_items(payload)
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]["video_url"], "https://v26-web.douyinvod.com/example/h264.mp4")

    def test_parse_aweme_items_uses_bit_rate_play_addr_when_top_level_addresses_are_missing(self) -> None:
        module = importlib.import_module("Douyin")
        payload = {
            "aweme_list": [
                {
                    "aweme_id": "7619989983668240802",
                    "desc": "推荐视频",
                    "video": {
                        "bit_rate": [
                            {
                                "format": "mp4",
                                "play_addr": {
                                    "url_list": ["https://v11-weba.douyinvod.com/example/bitrate.mp4"]
                                },
                            }
                        ]
                    },
                }
            ]
        }
        items = module.parse_aweme_items(payload)
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]["video_url"], "https://v11-weba.douyinvod.com/example/bitrate.mp4")

    def test_build_video_page_url_uses_aweme_id(self) -> None:
        module = importlib.import_module("Douyin")
        self.assertEqual(
            module.build_video_page_url("7619989983668240802"),
            "https://www.douyin.com/video/7619989983668240802",
        )

    def test_collect_recommendations_downloads_videos_with_author_prefix(self) -> None:
        module = importlib.import_module("Douyin")
        packet = FakePacket(
            {
                "aweme_list": [
                    {
                        "aweme_id": "7619989983668240802",
                        "desc": "推荐视频1",
                        "author": {"nickname": "博主A", "uid": "111"},
                        "video": {
                            "play_addr": {
                                "url_list": ["https://v26-web.douyinvod.com/example/video1.mp4"]
                            }
                        },
                    }
                ]
            }
        )
        page = FakeRuntimePage("https://www.douyin.com/", packet)
        with mock.patch.object(
            module, "import_runtime_dependencies", return_value=(object(), object(), object())
        ):
            with mock.patch.object(module, "create_page", return_value=page):
                with mock.patch.object(module, "download_video") as mocked_download:
                    with mock.patch.object(module, "human_like_scroll"):
                        downloaded = module.collect_recommendations(
                            max_videos=50,
                            timeout=10,
                            output_dir=module.Path("video"),
                            browser_port=None,
                        )
        self.assertEqual(downloaded, 1)
        # Verify that the output filename contains the author prefix.
        call_kwargs = mocked_download.call_args[1]
        self.assertIn("[博主A]", str(call_kwargs["output_path"]))

    def test_collect_single_video_downloads_exactly_one_file_for_video_url_target(self) -> None:
        module = importlib.import_module("Douyin")
        packet = FakePacket(
            {
                "aweme_detail": {
                    "aweme_id": "7619989983668240802",
                    "desc": "单视频页面",
                    "video": {
                        "play_addr": {
                            "url_list": ["https://v26-web.douyinvod.com/example/single.mp4"]
                        }
                    },
                }
            }
        )
        page = FakeRuntimePage("https://www.douyin.com/video/7619989983668240802", packet)
        target = module.ResolvedTarget(
            kind="single-video",
            value="https://www.douyin.com/video/7619989983668240802",
            source="manual",
            aweme_id="7619989983668240802",
        )
        with mock.patch.object(
            module, "import_runtime_dependencies", return_value=(object(), object(), object())
        ):
            with mock.patch.object(module, "create_page", return_value=page):
                with mock.patch.object(module, "download_video") as mocked_download:
                    downloaded = module.collect_single_video(
                        target=target,
                        timeout=10,
                        output_dir=module.Path("video"),
                        browser_port=None,
                    )
        self.assertEqual(downloaded, 1)
        self.assertEqual(page.visited_urls, ["https://www.douyin.com/video/7619989983668240802"])
        mocked_download.assert_called_once()

    def test_collect_single_video_downloads_exactly_one_file_for_aweme_id_target(self) -> None:
        module = importlib.import_module("Douyin")
        packet = FakePacket(
            {
                "aweme_detail": {
                    "aweme_id": "7619989983668240802",
                    "desc": "单视频页面",
                    "video": {
                        "play_addr": {
                            "url_list": ["https://v26-web.douyinvod.com/example/single.mp4"]
                        }
                    },
                }
            }
        )
        # The page starts elsewhere; the test checks it is navigated to the
        # video page derived from the bare aweme id.
        page = FakeRuntimePage("about:blank", packet)
        target = module.ResolvedTarget(
            kind="single-video",
            value="7619989983668240802",
            source="manual",
            aweme_id="7619989983668240802",
        )
        with mock.patch.object(
            module, "import_runtime_dependencies", return_value=(object(), object(), object())
        ):
            with mock.patch.object(module, "create_page", return_value=page):
                with mock.patch.object(module, "download_video") as mocked_download:
                    downloaded = module.collect_single_video(
                        target=target,
                        timeout=10,
                        output_dir=module.Path("video"),
                        browser_port=None,
                    )
        self.assertEqual(downloaded, 1)
        self.assertEqual(page.visited_urls, ["https://www.douyin.com/video/7619989983668240802"])
        mocked_download.assert_called_once()

    def test_build_parser_has_max_videos_argument(self) -> None:
        module = importlib.import_module("Douyin")
        args = module.build_parser().parse_args(["--max-videos", "30"])
        self.assertEqual(args.max_videos, 30)

    def test_build_parser_has_human_scroll_arguments(self) -> None:
        module = importlib.import_module("Douyin")
        args = module.build_parser().parse_args(
            [
                "--scroll-mode",
                "human",
                "--min-wait",
                "3",
                "--max-wait",
                "9",
                "--reverse-scroll-probability",
                "0.4",
                "--max-runtime",
                "600",
            ]
        )
        self.assertEqual(args.scroll_mode, "human")
        self.assertEqual(args.min_wait, 3)
        self.assertEqual(args.max_wait, 9)
        self.assertEqual(args.reverse_scroll_probability, 0.4)
        self.assertEqual(args.max_runtime, 600)

    def test_build_parser_defaults_to_zero_argument_current_page_flow(self) -> None:
        module = importlib.import_module("Douyin")
        args = module.build_parser().parse_args([])
        self.assertIsNone(args.target)
        self.assertEqual(args.browser_port, 9223)
        self.assertEqual(args.pages, 1)

    def test_resolve_cli_target_prefers_manual_target_without_attaching_browser(self) -> None:
        module = importlib.import_module("Douyin")
        with mock.patch.object(module, "import_runtime_dependencies") as mocked_imports:
            target = module.resolve_cli_target("7619989983668240802", browser_port=9223)
        self.assertEqual(target.kind, "single-video")
        self.assertEqual(target.aweme_id, "7619989983668240802")
        mocked_imports.assert_not_called()

    def test_main_dispatches_recommendation_flow_for_recommendation_url(self) -> None:
        module = importlib.import_module("Douyin")
        stdout = io.StringIO()
        recommendation_target = module.ResolvedTarget(
            kind="recommendation",
            value="https://www.douyin.com/",
            source="current-page",
        )
        with redirect_stdout(stdout):
            with mock.patch.object(module, "resolve_cli_target", return_value=recommendation_target):
                with mock.patch.object(
                    module, "collect_recommendations", return_value=5
                ) as mocked_collect:
                    exit_code = module.main([])
        self.assertEqual(exit_code, 0)
        mocked_collect.assert_called_once_with(
            max_videos=50,
            timeout=10,
            output_dir=module.Path("video"),
            browser_port=9223,
            scroll_settings=module.ScrollSettings(),
        )

    def test_main_without_target_dispatches_current_page_creator_flow(self) -> None:
        module = importlib.import_module("Douyin")
        stdout = io.StringIO()
        creator_target = module.ResolvedTarget(
            kind="creator",
            value="https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
            source="current-page",
        )
        with redirect_stdout(stdout):
            with mock.patch.object(module, "resolve_cli_target", return_value=creator_target):
                with mock.patch.object(module, "collect_videos", return_value=2) as mocked_collect:
                    exit_code = module.main([])
        self.assertEqual(exit_code, 0)
        mocked_collect.assert_called_once_with(
            user_url="https://www.douyin.com/user/MS4wLjABAAAAexample?from_tab_name=main",
            max_pages=1,
            timeout=10,
            output_dir=module.Path("video"),
            browser_port=9223,
            auto_scroll=False,
        )
        self.assertIn("处理结束,共下载 2 个视频", stdout.getvalue())

    def test_main_returns_fallback_hint_when_current_page_is_unsupported(self) -> None:
        module = importlib.import_module("Douyin")
        stdout = io.StringIO()
        with redirect_stdout(stdout):
            with mock.patch.object(
                module,
                "resolve_cli_target",
                side_effect=RuntimeError("请切到目标页面后重试,或手动传入链接或 `aweme_id`。"),
            ):
                exit_code = module.main([])
        self.assertEqual(exit_code, 1)
        self.assertIn("手动传入链接或 `aweme_id`", stdout.getvalue())


if __name__ == "__main__":
    unittest.main()