# Fixed scroll step used when human mode is off; these mirror the values that
# scroll_feed() uses (900 px down, then a 2 s wait).
_FIXED_SCROLL_DISTANCE = 900
_FIXED_SCROLL_WAIT = 2.0
# Preferred window, in seconds, for the pause that follows a reverse scroll.
_REVERSE_WAIT_FLOOR = 1.0
_REVERSE_WAIT_CEILING = 4.0


@dataclass(frozen=True)
class HumanBrowseSettings:
    """Immutable tuning knobs for the randomized ("human-like") browsing pace."""

    # Master switch: when False the pauses, long breaks and randomized scroll
    # plans below are all turned off.
    enabled: bool = True
    # Bounds, in seconds, for ordinary random pauses.
    min_wait: float = 2.0
    max_wait: float = 6.0
    # Take a long break after every N downloads; values <= 0 disable it.
    long_break_every: int = 4
    # Bounds, in seconds, for a long break.
    long_break_min: float = 8.0
    long_break_max: float = 20.0
    # Run-time budget in seconds as checked by the collect loops; 0 = no limit.
    max_runtime: float = 0.0
    # Bounds for the main downward scroll distance handed to scrollBy().
    min_scroll: int = 500
    max_scroll: int = 1200
    # Chance (0..1) that a plan includes a short scroll back up.
    reverse_scroll_probability: float = 0.45
    # Bounds for that reverse scroll distance.
    min_reverse_scroll: int = 100
    max_reverse_scroll: int = 400


@dataclass(frozen=True)
class HumanBrowsePlan:
    """One concrete scroll-and-wait step, ready to be executed on a page."""

    # Distance of the initial downward scroll.
    down_distance: int
    # Seconds to wait right after the downward scroll.
    primary_wait: float
    # Distance to scroll back up; 0 means "no reverse scroll in this step".
    reverse_distance: int = 0
    # Seconds to wait after scrolling back up.
    reverse_wait: float = 0.0
    # Seconds to wait at the very end of the step; 0 skips the wait.
    settle_wait: float = 0.0


def create_human_browse_plan(
    settings: HumanBrowseSettings,
    random_module: Any = random,
) -> HumanBrowsePlan:
    """Build the next scroll step from *settings*.

    Args:
        settings: Pace configuration.
        random_module: Object exposing ``randint``, ``uniform`` and ``random``
            (the ``random`` module or a seeded ``random.Random``).

    Returns:
        A randomized plan when human mode is enabled. When it is disabled, a
        fixed plan (900 px down, 2 s wait, no reverse scroll, no settle wait)
        is returned and no random numbers are consumed.
    """
    if not settings.enabled:
        # Callers invoke this unconditionally in their scroll loops, so the
        # "off" switch has to be honoured here; otherwise disabling human mode
        # would still sleep two random min_wait..max_wait intervals per scroll.
        return HumanBrowsePlan(
            down_distance=_FIXED_SCROLL_DISTANCE,
            primary_wait=_FIXED_SCROLL_WAIT,
        )

    down_distance = random_module.randint(settings.min_scroll, settings.max_scroll)
    primary_wait = random_module.uniform(settings.min_wait, settings.max_wait)
    settle_wait = random_module.uniform(settings.min_wait, settings.max_wait)
    reverse_distance = 0
    reverse_wait = 0.0
    if random_module.random() < settings.reverse_scroll_probability:
        reverse_distance = random_module.randint(
            settings.min_reverse_scroll,
            settings.max_reverse_scroll,
        )
        # Keep the reverse pause inside the user's ceiling. With a fixed 1.0
        # lower bound, max_wait < 1.0 would invert the range and produce
        # waits longer than max_wait.
        wait_ceiling = min(_REVERSE_WAIT_CEILING, settings.max_wait)
        wait_floor = min(_REVERSE_WAIT_FLOOR, wait_ceiling)
        reverse_wait = random_module.uniform(wait_floor, wait_ceiling)
    return HumanBrowsePlan(
        down_distance=down_distance,
        primary_wait=primary_wait,
        reverse_distance=reverse_distance,
        reverse_wait=reverse_wait,
        settle_wait=settle_wait,
    )


def run_human_browse_sequence(page: Any, plan: HumanBrowsePlan) -> None:
    """Execute *plan* on *page*: scroll down, optionally back up, then settle.

    Scrolling is delegated to this module's ``run_scroll_step``; every wait
    is a blocking ``time.sleep``.
    """
    run_scroll_step(page, plan.down_distance)
    time.sleep(plan.primary_wait)
    if plan.reverse_distance > 0:
        run_scroll_step(page, -plan.reverse_distance)
        time.sleep(plan.reverse_wait)
        # Scroll forward by twice the reverse distance so the step ends
        # further down the page than where the reverse scroll started.
        run_scroll_step(page, plan.reverse_distance * 2)
    if plan.settle_wait > 0:
        time.sleep(plan.settle_wait)


def human_pause(settings: HumanBrowseSettings, random_module: Any = random) -> None:
    """Sleep for a random ``min_wait..max_wait`` interval; no-op when disabled."""
    if settings.enabled:
        time.sleep(random_module.uniform(settings.min_wait, settings.max_wait))


def should_take_long_break(downloaded: int, settings: HumanBrowseSettings) -> bool:
    """Return True when *downloaded* is a positive multiple of ``long_break_every``.

    Always False when human mode is disabled or ``long_break_every`` is <= 0.
    """
    return (
        settings.enabled
        and settings.long_break_every > 0
        and downloaded > 0
        and downloaded % settings.long_break_every == 0
    )


def take_long_break(settings: HumanBrowseSettings, random_module: Any = random) -> None:
    """Announce and sleep a random ``long_break_min..long_break_max`` interval.

    Does nothing when human mode is disabled.
    """
    if settings.enabled:
        wait_seconds = random_module.uniform(settings.long_break_min, settings.long_break_max)
        print(f"[INFO] 已达到阶段下载数,停留 {wait_seconds:.1f}s")
        time.sleep(wait_seconds)


def scroll_feed(page: Any, distance: int = 900) -> None:
    """Scroll the feed down by *distance*, then wait a fixed two seconds."""
    run_scroll_step(page, distance)
    time.sleep(2)
print("[INFO] 已达到最大运行时间,结束抓取。") + break packet = wait_for_feed_packet(page, timeout=timeout) if packet is None: candidates = group_video_candidates( @@ -395,7 +491,7 @@ def collect_videos( ) if not candidates: consecutive_empty += 1 - scroll_feed(page) + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue else: try: @@ -404,13 +500,13 @@ def collect_videos( except Exception as exc: print(f"[WARN] 解析 feed 数据失败: {exc}") consecutive_empty += 1 - scroll_feed(page) + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls] if not fresh_candidates: consecutive_empty += 1 - scroll_feed(page) + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue consecutive_empty = 0 @@ -436,9 +532,12 @@ def collect_videos( downloaded += 1 seen_files.add(output_path) print(f"[OK] 已保存: {output_path}") + human_pause(human_settings) + if should_take_long_break(downloaded, human_settings): + take_long_break(human_settings) if downloaded < max_videos: - scroll_feed(page) + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) if downloaded == 0: print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed,并在浏览器中滚动后重试。") @@ -453,6 +552,8 @@ def collect_videos_from_explore_cards( start_url: str, seen_urls: set[str], seen_files: set[Path], + human_settings: HumanBrowseSettings, + started_at: float, ) -> int: downloaded = 0 visited_note_ids: set[str] = set() @@ -460,25 +561,34 @@ def collect_videos_from_explore_cards( max_empty_rounds = 4 while downloaded < max_videos and empty_rounds < max_empty_rounds: + if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime: + print("[INFO] 已达到最大运行时间,结束抓取。") + break page.get(start_url) - time.sleep(2) + human_pause(human_settings) note_urls = filter_unvisited_note_urls( collect_note_urls_from_page(page, limit=max_videos * 12), 
visited_note_ids, ) if not note_urls: empty_rounds += 1 - scroll_feed(page) + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) continue round_downloaded = 0 for note_url in note_urls: if downloaded >= max_videos: break + if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime: + print("[INFO] 已达到最大运行时间,结束抓取。") + break note_id = extract_note_id_from_url(note_url) visited_note_ids.add(note_id) page.get(note_url) - time.sleep(2) + print(f"[INFO] 打开笔记 {len(visited_note_ids)}: {note_id}") + human_pause(human_settings) + if human_settings.enabled: + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) candidates = group_video_candidates( extract_video_candidates_from_html( page.run_js("return document.documentElement.outerHTML"), @@ -509,7 +619,10 @@ def collect_videos_from_explore_cards( downloaded += 1 round_downloaded += 1 seen_files.add(output_path) - print(f"[OK] 已保存: {output_path}") + print(f"[OK] 已保存 ({downloaded}/{max_videos}): {output_path}") + human_pause(human_settings) + if should_take_long_break(downloaded, human_settings): + take_long_break(human_settings) if round_downloaded == 0: empty_rounds += 1 @@ -517,8 +630,8 @@ def collect_videos_from_explore_cards( empty_rounds = 0 if downloaded < max_videos: page.get(start_url) - time.sleep(1) - scroll_feed(page) + human_pause(human_settings) + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) return downloaded @@ -569,6 +682,11 @@ def build_parser() -> argparse.ArgumentParser: parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20") parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面") parser.add_argument("--use-current-page", action="store_true", help="使用浏览器当前页面,不强制打开发现页") + parser.add_argument("--human-mode", action=argparse.BooleanOptionalAction, default=True, help="启用温和随机浏览节奏,默认开启") + parser.add_argument("--min-wait", type=float, 
class FakeScrollPage:
    """Test double for a browser page: records scripts instead of running them."""

    def __init__(self) -> None:
        # Every script handed to run_js(), in call order, for later inspection.
        self.scripts = []

    def run_js(self, script: str) -> None:
        """Store *script*; nothing is executed."""
        self.scripts.append(script)
importlib.import_module("XHS") @@ -219,6 +232,7 @@ class XhsModuleTests(unittest.TestCase): self.assertEqual(kwargs["browser_port"], 9334) self.assertEqual(kwargs["timeout"], 7) self.assertFalse(kwargs["use_current_page"]) + self.assertTrue(kwargs["human_mode"]) def test_download_video_rejects_webp_response_before_writing_file(self) -> None: module = importlib.import_module("XHS") @@ -284,6 +298,48 @@ class XhsModuleTests(unittest.TestCase): ["https://www.xiaohongshu.com/explore/def?xsec_token=token"], ) + def test_create_human_browse_plan_uses_wait_and_scroll_ranges(self) -> None: + module = importlib.import_module("XHS") + settings = module.HumanBrowseSettings( + min_wait=2.0, + max_wait=6.0, + reverse_scroll_probability=1.0, + min_scroll=500, + max_scroll=1200, + ) + plan = module.create_human_browse_plan(settings, random_module=module.random.Random(7)) + self.assertGreaterEqual(plan.primary_wait, 2.0) + self.assertLessEqual(plan.primary_wait, 6.0) + self.assertGreaterEqual(plan.down_distance, 500) + self.assertLessEqual(plan.down_distance, 1200) + self.assertGreater(plan.reverse_distance, 0) + + def test_run_human_browse_sequence_scrolls_and_waits(self) -> None: + module = importlib.import_module("XHS") + page = FakeScrollPage() + plan = module.HumanBrowsePlan( + down_distance=800, + primary_wait=2.5, + reverse_distance=200, + reverse_wait=1.5, + settle_wait=3.0, + ) + with mock.patch.object(module.time, "sleep") as mocked_sleep: + module.run_human_browse_sequence(page, plan) + self.assertIn("const distance = 800;", page.scripts[0]) + self.assertIn("const distance = -200;", page.scripts[1]) + self.assertIn("const distance = 400;", page.scripts[2]) + self.assertIn("scrollBy(0, distance)", page.scripts[0]) + mocked_sleep.assert_has_calls([mock.call(2.5), mock.call(1.5), mock.call(3.0)]) + + def test_should_take_long_break_uses_every_n_downloads(self) -> None: + module = importlib.import_module("XHS") + settings = module.HumanBrowseSettings(long_break_every=4) + 
self.assertFalse(module.should_take_long_break(0, settings)) + self.assertFalse(module.should_take_long_break(3, settings)) + self.assertTrue(module.should_take_long_break(4, settings)) + self.assertTrue(module.should_take_long_break(8, settings)) + if __name__ == "__main__": unittest.main()