From d0f6c5e5ab669b0da6b7ccd4780a25c60baf04d4 Mon Sep 17 00:00:00 2001 From: wangshaoqing Date: Tue, 26 May 2026 15:29:59 +0800 Subject: [PATCH] feat: add human-like recommendation scrolling --- Douyin.py | 170 +++++++++++++++++++++++++++++++++++++++++++++---- test_douyin.py | 103 +++++++++++++++++++++++++++++- 2 files changed, 258 insertions(+), 15 deletions(-) diff --git a/Douyin.py b/Douyin.py index da6a6ba..542105c 100644 --- a/Douyin.py +++ b/Douyin.py @@ -30,6 +30,7 @@ DEFAULT_BROWSER_PORT = 9223 LISTEN_TARGET = "web/aweme/post/" RECOMMENDATION_LISTEN_TARGET = "aweme/v2/web/module/feed/" SINGLE_VIDEO_LISTEN_TARGET = "web/aweme/detail/" +MAX_FILENAME_BYTES = 240 INVALID_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\r\n\t]') RECOMMENDATION_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/?(?:jingxuan)?(?:\?.*)?$") CREATOR_URL_PATTERN = re.compile(r"^https?://www\.douyin\.com/user/[^/?#]+(?:\?.*)?$") @@ -45,11 +46,48 @@ class ResolvedTarget: aweme_id: str | None = None +@dataclass(frozen=True) +class ScrollSettings: + mode: str = "human" + min_wait: float = 2.0 + max_wait: float = 8.0 + reverse_scroll_probability: float = 0.2 + max_runtime: float = 600.0 + min_scroll: int = 300 + max_scroll: int = 900 + min_reverse_scroll: int = 80 + max_reverse_scroll: int = 250 + + +@dataclass(frozen=True) +class HumanScrollPlan: + down_distance: int + down_wait: float + reverse_distance: int = 0 + reverse_wait: float = 0.0 + settle_wait: float = 0.0 + + def sanitize_filename(value: str, fallback: str = "untitled") -> str: cleaned = INVALID_FILENAME_CHARS.sub("_", value).strip(" ._") return cleaned or fallback +def truncate_utf8_bytes(value: str, max_bytes: int) -> str: + if len(value.encode("utf-8")) <= max_bytes: + return value + + result = "" + used = 0 + for character in value: + character_bytes = len(character.encode("utf-8")) + if used + character_bytes > max_bytes: + break + result += character + used += character_bytes + return result.rstrip(" ._") + + def is_recommendation_url(value: str) -> bool: return bool(RECOMMENDATION_URL_PATTERN.match(value.strip())) @@ -181,11 +219,20 @@ def build_output_path( author_name: str | None = None, ) -> Path: safe_title = sanitize_filename(title, fallback="untitled") + suffix = f"-{video_id}.mp4" if author_name: safe_author = sanitize_filename(author_name, fallback="unknown") - filename = f"[{safe_author}]{safe_title}-{video_id}.mp4" + prefix = f"[{safe_author}]" else: - filename = f"{safe_title}-{video_id}.mp4" + prefix = "" + + title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) + if title_budget < 1: + prefix_budget = MAX_FILENAME_BYTES - len(suffix.encode("utf-8")) - 1 + prefix = truncate_utf8_bytes(prefix, max(1, prefix_budget)) + title_budget = MAX_FILENAME_BYTES - len(prefix.encode("utf-8")) - len(suffix.encode("utf-8")) + + filename = f"{prefix}{truncate_utf8_bytes(safe_title, max(1, title_budget))}{suffix}" return output_dir / filename @@ -319,7 +366,8 @@ def create_page(chromium_page_cls: Any, chromium_options_cls: Any, browser_port: def wait_for_aweme_packet(page: Any, timeout: int) -> Any | None: try: - return page.listen.wait(timeout=timeout) + packet = page.listen.wait(timeout=timeout) + return packet if packet else None except Exception as exc: print(f"[WARN] 等待接口数据超时或失败: {exc}") return None @@ -330,12 +378,52 @@ def scroll_to_next_page(page: Any) -> None: time.sleep(2) -def human_like_scroll(page: Any) -> None: - """模拟人类滚动行为:随机滚动距离和随机停顿时间""" - scroll_distance = random.randint(300, 800) - page.run_js(f"window.scrollBy(0, {scroll_distance});") - sleep_time = random.uniform(1.5, 4.0) - time.sleep(sleep_time) +def create_human_scroll_plan( + settings: ScrollSettings, + random_module: Any = random, +) -> HumanScrollPlan: + down_distance = random_module.randint(settings.min_scroll, settings.max_scroll) + down_wait = random_module.uniform(settings.min_wait, settings.max_wait) + settle_wait = random_module.uniform(settings.min_wait, settings.max_wait) + + reverse_distance = 0 + reverse_wait = 0.0 + if random_module.random() < settings.reverse_scroll_probability: + reverse_distance = random_module.randint( + settings.min_reverse_scroll, + settings.max_reverse_scroll, + ) + reverse_wait = random_module.uniform(1.0, min(3.0, settings.max_wait)) + + return HumanScrollPlan( + down_distance=down_distance, + down_wait=down_wait, + reverse_distance=reverse_distance, + reverse_wait=reverse_wait, + settle_wait=settle_wait, + ) + + +def run_human_scroll_sequence(page: Any, plan: HumanScrollPlan) -> None: + page.run_js(f"window.scrollBy(0, {plan.down_distance});") + print(f"[INFO] 向下滚动 {plan.down_distance}px,停留 {plan.down_wait:.1f}s") + time.sleep(plan.down_wait) + + if plan.reverse_distance > 0: + page.run_js(f"window.scrollBy(0, -{plan.reverse_distance});") + print(f"[INFO] 小幅回滚 {plan.reverse_distance}px,停留 {plan.reverse_wait:.1f}s") + time.sleep(plan.reverse_wait) + forward_distance = plan.reverse_distance * 2 + page.run_js(f"window.scrollBy(0, {forward_distance});") + + if plan.settle_wait > 0: + print(f"[INFO] 继续停留 {plan.settle_wait:.1f}s") + time.sleep(plan.settle_wait) + + +def human_like_scroll(page: Any, settings: ScrollSettings | None = None) -> None: + scroll_settings = settings or ScrollSettings() + run_human_scroll_sequence(page, create_human_scroll_plan(scroll_settings)) def download_video( @@ -435,6 +523,7 @@ def collect_recommendations( timeout: int, output_dir: Path, browser_port: int | None, + scroll_settings: ScrollSettings | None = None, ) -> int: requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies() headers = build_headers("https://www.douyin.com/") @@ -450,16 +539,22 @@ def collect_recommendations( downloaded = 0 seen_ids: set[str] = set() consecutive_empty = 0 - max_consecutive_empty = 3 + max_consecutive_empty = 6 + settings = scroll_settings or ScrollSettings() + started_at = time.monotonic() while downloaded < max_videos: + if settings.max_runtime > 0 and time.monotonic() - started_at >= settings.max_runtime: + print("[INFO] 已达到最大运行时间,结束抓取。") + break + packet = wait_for_aweme_packet(page, timeout=timeout) if packet is None: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: print("[INFO] 连续多次未获取到新数据,结束抓取。") break - human_like_scroll(page) + human_like_scroll(page, settings=settings) continue try: @@ -470,14 +565,14 @@ def collect_recommendations( consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break - human_like_scroll(page) + human_like_scroll(page, settings=settings) continue if not items: consecutive_empty += 1 if consecutive_empty >= max_consecutive_empty: break - human_like_scroll(page) + human_like_scroll(page, settings=settings) continue consecutive_empty = 0 @@ -518,7 +613,7 @@ def collect_recommendations( if consecutive_empty >= max_consecutive_empty: break - human_like_scroll(page) + human_like_scroll(page, settings=settings) return downloaded @@ -596,6 +691,36 @@ def build_parser() -> argparse.ArgumentParser: default=50, help="推荐流最大抓取数量,默认 50", ) + parser.add_argument( + "--scroll-mode", + choices=["human"], + default="human", + help="推荐流滚动模式,默认 human", + ) + parser.add_argument( + "--min-wait", + type=float, + default=2.0, + help="推荐流每次滚动后的最短等待秒数,默认 2", + ) + parser.add_argument( + "--max-wait", + type=float, + default=8.0, + help="推荐流每次滚动后的最长等待秒数,默认 8", + ) + parser.add_argument( + "--reverse-scroll-probability", + type=float, + default=0.2, + help="推荐流小幅回滚概率,取值 0 到 1,默认 0.2", + ) + parser.add_argument( + "--max-runtime", + type=float, + default=600.0, + help="推荐流最大运行秒数,默认 600;设置为 0 表示不限制", + ) return parser @@ -611,6 +736,22 @@ def main(argv: list[str] | None = None) -> int: parser.error("--browser-port 必须大于 0") if args.max_videos <= 0: parser.error("--max-videos 必须大于 0") + if args.min_wait < 0: + parser.error("--min-wait 不能小于 0") + if args.max_wait < args.min_wait: + parser.error("--max-wait 必须大于或等于 --min-wait") + if not 0 <= args.reverse_scroll_probability <= 1: + parser.error("--reverse-scroll-probability 必须在 0 到 1 之间") + if args.max_runtime < 0: + parser.error("--max-runtime 不能小于 0") + + scroll_settings = ScrollSettings( + mode=args.scroll_mode, + min_wait=args.min_wait, + max_wait=args.max_wait, + reverse_scroll_probability=args.reverse_scroll_probability, + max_runtime=args.max_runtime, + ) try: target = resolve_cli_target(args.target, browser_port=args.browser_port) @@ -629,6 +770,7 @@ def main(argv: list[str] | None = None) -> int: timeout=args.timeout, output_dir=Path(args.output_dir), browser_port=args.browser_port, + scroll_settings=scroll_settings, ) elif target.kind == "single-video": total = collect_single_video( diff --git a/test_douyin.py b/test_douyin.py index 7841000..99ecba9 100644 --- a/test_douyin.py +++ b/test_douyin.py @@ -60,6 +60,14 @@ class FakeRuntimePage: raise AssertionError(f"unexpected scroll script: {script}") +class FakeScrollPage: + def __init__(self): + self.scripts = [] + + def run_js(self, script): + self.scripts.append(script) + + class DouyinModuleTests(unittest.TestCase): def test_module_can_import_without_optional_runtime_dependencies(self) -> None: module = importlib.import_module("Douyin") @@ -98,6 +106,16 @@ class DouyinModuleTests(unittest.TestCase): ) self.assertEqual(output_path.as_posix(), "video/[测试博主]测试标题-123456.mp4") + def test_build_output_path_limits_long_filename(self) -> None: + module = importlib.import_module("Douyin") + output_path = module.build_output_path( + title="超长标题" * 100, + video_id="7619989983668240802", + author_name="超长博主名" * 20, + ) + self.assertLessEqual(len(output_path.name.encode("utf-8")), 240) + self.assertTrue(output_path.name.endswith("-7619989983668240802.mp4")) + def test_extract_aweme_payload_uses_dict_body(self) -> None: module = importlib.import_module("Douyin") response = FakeResponse({"aweme_list": []}, "") @@ -111,11 +129,71 @@ class DouyinModuleTests(unittest.TestCase): {"aweme_list": [{"aweme_id": "1"}]}, ) + def test_wait_for_aweme_packet_treats_false_listener_result_as_missing(self) -> None: + module = importlib.import_module("Douyin") + page = mock.MagicMock() + page.listen.wait.return_value = False + self.assertIsNone(module.wait_for_aweme_packet(page, timeout=10)) + def test_build_browser_address_from_port(self) -> None: module = importlib.import_module("Douyin") self.assertEqual(module.build_browser_address(9223), "127.0.0.1:9223") self.assertIsNone(module.build_browser_address(None)) + def test_default_scroll_settings_uses_human_mode(self) -> None: + module = importlib.import_module("Douyin") + settings = module.ScrollSettings() + self.assertEqual(settings.mode, "human") + self.assertEqual(settings.min_wait, 2.0) + self.assertEqual(settings.max_wait, 8.0) + self.assertEqual(settings.reverse_scroll_probability, 0.2) + + def test_create_human_scroll_plan_uses_configured_ranges(self) -> None: + module = importlib.import_module("Douyin") + settings = module.ScrollSettings( + min_wait=2.0, + max_wait=4.0, + min_scroll=300, + max_scroll=900, + reverse_scroll_probability=0.0, + ) + plan = module.create_human_scroll_plan(settings, random_module=module.random.Random(7)) + self.assertGreaterEqual(plan.down_distance, 300) + self.assertLessEqual(plan.down_distance, 900) + self.assertGreaterEqual(plan.down_wait, 2.0) + self.assertLessEqual(plan.down_wait, 4.0) + self.assertEqual(plan.reverse_distance, 0) + + def test_create_human_scroll_plan_can_include_reverse_scroll(self) -> None: + module = importlib.import_module("Douyin") + settings = module.ScrollSettings(reverse_scroll_probability=1.0) + plan = module.create_human_scroll_plan(settings, random_module=module.random.Random(3)) + self.assertGreaterEqual(plan.reverse_distance, 80) + self.assertLessEqual(plan.reverse_distance, 250) + self.assertGreater(plan.reverse_wait, 0) + + def test_run_human_scroll_sequence_scrolls_down_and_optionally_back_up(self) -> None: + module = importlib.import_module("Douyin") + page = FakeScrollPage() + plan = module.HumanScrollPlan( + down_distance=500, + down_wait=2.5, + reverse_distance=120, + reverse_wait=1.0, + settle_wait=3.0, + ) + with mock.patch.object(module.time, "sleep") as mocked_sleep: + module.run_human_scroll_sequence(page, plan) + self.assertEqual( + page.scripts, + [ + "window.scrollBy(0, 500);", + "window.scrollBy(0, -120);", + "window.scrollBy(0, 240);", + ], + ) + mocked_sleep.assert_has_calls([mock.call(2.5), mock.call(1.0), mock.call(3.0)]) + def test_ensure_browser_debug_port_ready_accepts_open_port(self) -> None: module = importlib.import_module("Douyin") connection = mock.MagicMock() @@ -367,7 +445,7 @@ class DouyinModuleTests(unittest.TestCase): with mock.patch.object(module, "import_runtime_dependencies", return_value=(object(), object(), object())): with mock.patch.object(module, "create_page", return_value=page): with mock.patch.object(module, "download_video") as mocked_download: - with mock.patch.object(module, "scroll_to_next_page"): + with mock.patch.object(module, "human_like_scroll"): downloaded = module.collect_recommendations( max_videos=50, timeout=10, @@ -455,6 +533,28 @@ class DouyinModuleTests(unittest.TestCase): args = module.build_parser().parse_args(["--max-videos", "30"]) self.assertEqual(args.max_videos, 30) + def test_build_parser_has_human_scroll_arguments(self) -> None: + module = importlib.import_module("Douyin") + args = module.build_parser().parse_args( + [ + "--scroll-mode", + "human", + "--min-wait", + "3", + "--max-wait", + "9", + "--reverse-scroll-probability", + "0.4", + "--max-runtime", + "600", + ] + ) + self.assertEqual(args.scroll_mode, "human") + self.assertEqual(args.min_wait, 3) + self.assertEqual(args.max_wait, 9) + self.assertEqual(args.reverse_scroll_probability, 0.4) + self.assertEqual(args.max_runtime, 600) + def test_build_parser_defaults_to_zero_argument_current_page_flow(self) -> None: module = importlib.import_module("Douyin") args = module.build_parser().parse_args([]) @@ -488,6 +588,7 @@ class DouyinModuleTests(unittest.TestCase): timeout=10, output_dir=module.Path("video"), browser_port=9223, + scroll_settings=module.ScrollSettings(), ) def test_main_without_target_dispatches_current_page_creator_flow(self) -> None: