Add human browsing cadence

This commit is contained in:
wangshaoqing 2026-05-27 16:03:40 +08:00
parent d6e4443d40
commit ed8357f65a
4 changed files with 206 additions and 13 deletions

1
.gitignore vendored
View File

@ -6,3 +6,4 @@ __pycache__/
video/
video_bad_*/
video_good_*/
video_human_test/

View File

@ -63,6 +63,16 @@ pip install requests DrissionPage
# 指定保存目录
./.venv/bin/python XHS.py --max-videos 10 --output-dir video
# 默认启用温和随机浏览节奏;可调整停留时间和阶段长休息
./.venv/bin/python XHS.py --max-videos 20 --min-wait 2 --max-wait 6 --long-break-every 4
# 测试时可以缩短等待;需要最快速度时可关闭 human mode
./.venv/bin/python XHS.py --max-videos 3 --min-wait 0.5 --max-wait 1
./.venv/bin/python XHS.py --max-videos 3 --no-human-mode
# 限制最长运行时间,单位秒
./.venv/bin/python XHS.py --max-videos 20 --max-runtime 600
# 如果启动 Chrome 时换了端口,下载脚本也要使用同一个端口
./.venv/bin/python login_xhs.py --browser-port 9334
./.venv/bin/python XHS.py --browser-port 9334 --max-videos 10
@ -79,6 +89,7 @@ pip install requests DrissionPage
- 浏览器负责加载小红书页面和保留登录态。
- 脚本只监听浏览器里已经产生的网络响应。
- 解析器会递归查找响应 JSON 中的 `master_url`、`backup_urls` 等视频地址字段。
- 默认会在发现页和详情页之间随机停留、上下滚动,并在阶段下载后长停留。
- 下载过程会去重,并在单个视频失败时继续处理后续视频。
## 测试

151
XHS.py
View File

@ -3,6 +3,7 @@ from __future__ import annotations
import argparse
import html
import json
import random
import re
import socket
import sys
@ -33,6 +34,31 @@ class VideoCandidate:
source_key: str
@dataclass(frozen=True)
class HumanBrowseSettings:
    """Immutable tuning knobs for the randomized browsing cadence.

    Wait values end up in ``time.sleep`` calls and scroll values are the
    distances handed to ``run_scroll_step``.
    """

    # Master switch checked by human_pause / take_long_break / should_take_long_break.
    enabled: bool = True

    # Bounds of the random dwell drawn for an ordinary pause.
    min_wait: float = 2.0
    max_wait: float = 6.0

    # A long break is due every N downloads; values <= 0 never trigger one.
    long_break_every: int = 4
    # Bounds of the random dwell drawn for a long break.
    long_break_min: float = 8.0
    long_break_max: float = 20.0

    # Overall runtime budget; the collection loops only enforce it when > 0.
    max_runtime: float = 0.0

    # Bounds of the random downward scroll distance.
    min_scroll: int = 500
    max_scroll: int = 1200

    # Chance (compared against random()) that a plan includes a scroll back up.
    reverse_scroll_probability: float = 0.45
    # Bounds of the random upward scroll distance.
    min_reverse_scroll: int = 100
    max_reverse_scroll: int = 400
@dataclass(frozen=True)
class HumanBrowsePlan:
    """One concrete scroll-and-dwell step, ready to be executed on a page."""

    # Distance of the initial downward scroll.
    down_distance: int
    # Dwell after the initial scroll.
    primary_wait: float
    # Distance to scroll back up; 0 means the plan has no reverse step.
    reverse_distance: int = 0
    # Dwell after scrolling back up.
    reverse_wait: float = 0.0
    # Final dwell at the end of the step; skipped by the runner when 0.
    settle_wait: float = 0.0
def sanitize_filename(value: str, fallback: str = "untitled") -> str:
    """Return ``value`` made safe for use as a file name.

    Every match of ``INVALID_FILENAME_CHARS`` becomes an underscore, then
    leading/trailing spaces, dots and underscores are trimmed. ``fallback`` is
    returned when nothing is left.
    """
    replaced = INVALID_FILENAME_CHARS.sub("_", value)
    trimmed = replaced.strip(" ._")
    if trimmed:
        return trimmed
    return fallback
@ -319,7 +345,28 @@ def wait_for_feed_packet(page: Any, timeout: int) -> Any | None:
return None
def scroll_feed(page: Any, distance: int = 900) -> None:
def create_human_browse_plan(
    settings: HumanBrowseSettings,
    random_module: Any = random,
) -> HumanBrowsePlan:
    """Draw one scroll-and-dwell plan from ``settings``.

    Args:
        settings: Ranges and probabilities to sample from.
        random_module: Object providing ``randint``, ``uniform`` and
            ``random`` (the ``random`` module or a ``random.Random``
            instance, e.g. a seeded one in tests).

    Returns:
        A ``HumanBrowsePlan``. When ``settings.enabled`` is false the plan is
        the fixed step ``scroll_feed`` performs by default (900 down, 2 second
        dwell, no reverse scroll, no settle wait), so disabling human mode
        does not pay the random dwell times at call sites that always run a
        plan.
    """
    if not settings.enabled:
        # Human mode off: no randomness is consumed and no extra dwell added.
        return HumanBrowsePlan(down_distance=900, primary_wait=2.0)

    down_distance = random_module.randint(settings.min_scroll, settings.max_scroll)
    primary_wait = random_module.uniform(settings.min_wait, settings.max_wait)
    settle_wait = random_module.uniform(settings.min_wait, settings.max_wait)

    reverse_distance = 0
    reverse_wait = 0.0
    if random_module.random() < settings.reverse_scroll_probability:
        reverse_distance = random_module.randint(
            settings.min_reverse_scroll,
            settings.max_reverse_scroll,
        )
        # Cap the reverse dwell at 4s and at max_wait. The lower bound is
        # clamped too: uniform(1.0, upper) with upper < 1.0 would otherwise
        # sample above the configured maximum.
        reverse_wait_max = min(4.0, settings.max_wait)
        reverse_wait_min = min(1.0, reverse_wait_max)
        reverse_wait = random_module.uniform(reverse_wait_min, reverse_wait_max)

    return HumanBrowsePlan(
        down_distance=down_distance,
        primary_wait=primary_wait,
        reverse_distance=reverse_distance,
        reverse_wait=reverse_wait,
        settle_wait=settle_wait,
    )
def run_scroll_step(page: Any, distance: int) -> None:
script = f"""
const distance = {distance};
const candidates = Array.from(document.querySelectorAll('*'))
@ -338,6 +385,37 @@ const target = candidates[0] || document.scrollingElement || document.documentEl
target.scrollBy(0, distance);
"""
page.run_js(script)
def run_human_browse_sequence(page: Any, plan: HumanBrowsePlan) -> None:
    """Execute ``plan`` on ``page``: scroll down, dwell, optionally peek back up.

    When the plan has a reverse step the page is scrolled up by
    ``reverse_distance``, dwells for ``reverse_wait``, then scrolls down by
    twice that distance so the net movement is still downward.
    """
    run_scroll_step(page, plan.down_distance)
    time.sleep(plan.primary_wait)

    back_distance = plan.reverse_distance
    if back_distance > 0:
        run_scroll_step(page, -back_distance)
        time.sleep(plan.reverse_wait)
        run_scroll_step(page, back_distance * 2)

    settle_seconds = plan.settle_wait
    if settle_seconds > 0:
        time.sleep(settle_seconds)
def human_pause(settings: HumanBrowseSettings, random_module: Any = random) -> None:
    """Sleep for a random dwell between ``min_wait`` and ``max_wait``.

    Does nothing (and draws no random number) when ``settings.enabled`` is
    false.
    """
    if not settings.enabled:
        return
    dwell_seconds = random_module.uniform(settings.min_wait, settings.max_wait)
    time.sleep(dwell_seconds)
def should_take_long_break(downloaded: int, settings: HumanBrowseSettings) -> bool:
    """Tell whether a long break is due after ``downloaded`` saved videos.

    A break is due only when human mode is enabled, the interval is positive,
    at least one video has been downloaded, and the count is an exact
    multiple of ``settings.long_break_every``.
    """
    if not settings.enabled:
        return False
    interval = settings.long_break_every
    if interval <= 0 or downloaded <= 0:
        return False
    return downloaded % interval == 0
def take_long_break(settings: HumanBrowseSettings, random_module: Any = random) -> None:
    """Announce and sleep through one long break.

    The duration is drawn uniformly between ``long_break_min`` and
    ``long_break_max``. Nothing happens when ``settings.enabled`` is false.
    """
    if not settings.enabled:
        return
    shortest, longest = settings.long_break_min, settings.long_break_max
    wait_seconds = random_module.uniform(shortest, longest)
    print(f"[INFO] 已达到阶段下载数,停留 {wait_seconds:.1f}s")
    time.sleep(wait_seconds)
def scroll_feed(page: Any, distance: int = 900) -> None:
    """Scroll ``page`` by ``distance`` once, then pause for a fixed two seconds."""
    pause_seconds = 2
    run_scroll_step(page, distance)
    time.sleep(pause_seconds)
@ -348,7 +426,20 @@ def collect_videos(
browser_port: int | None,
start_url: str = DEFAULT_EXPLORE_URL,
use_current_page: bool = False,
human_mode: bool = True,
min_wait: float = 2.0,
max_wait: float = 6.0,
long_break_every: int = 4,
max_runtime: float = 0.0,
) -> int:
human_settings = HumanBrowseSettings(
enabled=human_mode,
min_wait=min_wait,
max_wait=max_wait,
long_break_every=long_break_every,
max_runtime=max_runtime,
)
started_at = time.monotonic()
requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
if browser_port is not None:
ensure_browser_debug_port_ready(browser_port)
@ -361,7 +452,7 @@ def collect_videos(
else:
print("[INFO] 正在打开小红书发现页。若出现登录或验证码,请先在浏览器窗口里完成。")
page.get(start_url)
time.sleep(3)
human_pause(human_settings)
downloaded = 0
seen_urls: set[str] = set()
@ -378,13 +469,18 @@ def collect_videos(
start_url=start_url,
seen_urls=seen_urls,
seen_files=seen_files,
human_settings=human_settings,
started_at=started_at,
)
if downloaded >= max_videos:
return downloaded
page.get(start_url)
time.sleep(2)
human_pause(human_settings)
while downloaded < max_videos and consecutive_empty < max_consecutive_empty:
if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime:
print("[INFO] 已达到最大运行时间,结束抓取。")
break
packet = wait_for_feed_packet(page, timeout=timeout)
if packet is None:
candidates = group_video_candidates(
@ -395,7 +491,7 @@ def collect_videos(
)
if not candidates:
consecutive_empty += 1
scroll_feed(page)
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
continue
else:
try:
@ -404,13 +500,13 @@ def collect_videos(
except Exception as exc:
print(f"[WARN] 解析 feed 数据失败: {exc}")
consecutive_empty += 1
scroll_feed(page)
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
continue
fresh_candidates = [candidate for candidate in candidates if candidate.video_url not in seen_urls]
if not fresh_candidates:
consecutive_empty += 1
scroll_feed(page)
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
continue
consecutive_empty = 0
@ -436,9 +532,12 @@ def collect_videos(
downloaded += 1
seen_files.add(output_path)
print(f"[OK] 已保存: {output_path}")
human_pause(human_settings)
if should_take_long_break(downloaded, human_settings):
take_long_break(human_settings)
if downloaded < max_videos:
scroll_feed(page)
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
if downloaded == 0:
print("[WARN] 没有下载到视频。请确认已登录小红书、页面已加载 feed并在浏览器中滚动后重试。")
@ -453,6 +552,8 @@ def collect_videos_from_explore_cards(
start_url: str,
seen_urls: set[str],
seen_files: set[Path],
human_settings: HumanBrowseSettings,
started_at: float,
) -> int:
downloaded = 0
visited_note_ids: set[str] = set()
@ -460,25 +561,34 @@ def collect_videos_from_explore_cards(
max_empty_rounds = 4
while downloaded < max_videos and empty_rounds < max_empty_rounds:
if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime:
print("[INFO] 已达到最大运行时间,结束抓取。")
break
page.get(start_url)
time.sleep(2)
human_pause(human_settings)
note_urls = filter_unvisited_note_urls(
collect_note_urls_from_page(page, limit=max_videos * 12),
visited_note_ids,
)
if not note_urls:
empty_rounds += 1
scroll_feed(page)
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
continue
round_downloaded = 0
for note_url in note_urls:
if downloaded >= max_videos:
break
if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime:
print("[INFO] 已达到最大运行时间,结束抓取。")
break
note_id = extract_note_id_from_url(note_url)
visited_note_ids.add(note_id)
page.get(note_url)
time.sleep(2)
print(f"[INFO] 打开笔记 {len(visited_note_ids)}: {note_id}")
human_pause(human_settings)
if human_settings.enabled:
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
candidates = group_video_candidates(
extract_video_candidates_from_html(
page.run_js("return document.documentElement.outerHTML"),
@ -509,7 +619,10 @@ def collect_videos_from_explore_cards(
downloaded += 1
round_downloaded += 1
seen_files.add(output_path)
print(f"[OK] 已保存: {output_path}")
print(f"[OK] 已保存 ({downloaded}/{max_videos}): {output_path}")
human_pause(human_settings)
if should_take_long_break(downloaded, human_settings):
take_long_break(human_settings)
if round_downloaded == 0:
empty_rounds += 1
@ -517,8 +630,8 @@ def collect_videos_from_explore_cards(
empty_rounds = 0
if downloaded < max_videos:
page.get(start_url)
time.sleep(1)
scroll_feed(page)
human_pause(human_settings)
run_human_browse_sequence(page, create_human_browse_plan(human_settings))
return downloaded
@ -569,6 +682,11 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("--timeout", type=int, default=20, help="等待单次 feed 响应的秒数,默认 20")
parser.add_argument("--start-url", default=DEFAULT_EXPLORE_URL, help="打开或刷新使用的小红书页面")
parser.add_argument("--use-current-page", action="store_true", help="使用浏览器当前页面,不强制打开发现页")
parser.add_argument("--human-mode", action=argparse.BooleanOptionalAction, default=True, help="启用温和随机浏览节奏,默认开启")
parser.add_argument("--min-wait", type=float, default=2.0, help="随机停留最短秒数,默认 2")
parser.add_argument("--max-wait", type=float, default=6.0, help="随机停留最长秒数,默认 6")
parser.add_argument("--long-break-every", type=int, default=4, help="每下载 N 条长停留一次,默认 4")
parser.add_argument("--max-runtime", type=float, default=0.0, help="最大运行秒数0 表示不限制")
return parser
@ -579,6 +697,8 @@ def main(argv: list[str] | None = None) -> int:
parser.error("--max-videos 必须大于 0")
if args.browser_port <= 0:
parser.error("--browser-port 必须大于 0")
if args.min_wait < 0 or args.max_wait < args.min_wait:
parser.error("--min-wait 和 --max-wait 必须满足 0 <= min <= max")
downloaded = collect_videos(
max_videos=args.max_videos,
timeout=args.timeout,
@ -586,6 +706,11 @@ def main(argv: list[str] | None = None) -> int:
browser_port=args.browser_port,
start_url=args.start_url,
use_current_page=args.use_current_page,
human_mode=args.human_mode,
min_wait=args.min_wait,
max_wait=args.max_wait,
long_break_every=args.long_break_every,
max_runtime=args.max_runtime,
)
print(f"[INFO] 本次共下载 {downloaded} 个视频。")
return 0

View File

@ -28,6 +28,14 @@ class FakeRequests:
return self.response
class FakeScrollPage:
    """Minimal page stand-in that records scripts instead of running them."""

    def __init__(self):
        # Scripts in the order they were passed to run_js.
        self.scripts = []

    def run_js(self, script):
        """Remember ``script`` so a test can inspect it afterwards."""
        recorded = self.scripts
        recorded.append(script)
class XhsModuleTests(unittest.TestCase):
def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
module = importlib.import_module("XHS")
@ -193,6 +201,11 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(args.timeout, 20)
self.assertEqual(args.start_url, module.DEFAULT_EXPLORE_URL)
self.assertFalse(args.use_current_page)
self.assertTrue(args.human_mode)
self.assertEqual(args.min_wait, 2.0)
self.assertEqual(args.max_wait, 6.0)
self.assertEqual(args.long_break_every, 4)
self.assertEqual(args.max_runtime, 0.0)
def test_main_invokes_collect_videos_with_cli_values(self) -> None:
module = importlib.import_module("XHS")
@ -219,6 +232,7 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(kwargs["browser_port"], 9334)
self.assertEqual(kwargs["timeout"], 7)
self.assertFalse(kwargs["use_current_page"])
self.assertTrue(kwargs["human_mode"])
def test_download_video_rejects_webp_response_before_writing_file(self) -> None:
module = importlib.import_module("XHS")
@ -284,6 +298,48 @@ class XhsModuleTests(unittest.TestCase):
["https://www.xiaohongshu.com/explore/def?xsec_token=token"],
)
def test_create_human_browse_plan_uses_wait_and_scroll_ranges(self) -> None:
    """Every field of a generated plan stays inside its configured range."""
    module = importlib.import_module("XHS")
    settings = module.HumanBrowseSettings(
        min_wait=2.0,
        max_wait=6.0,
        # Probability 1.0 forces the reverse-scroll branch so it is always exercised.
        reverse_scroll_probability=1.0,
        min_scroll=500,
        max_scroll=1200,
    )

    # Seeded generator keeps the drawn values reproducible.
    plan = module.create_human_browse_plan(settings, random_module=module.random.Random(7))

    self.assertGreaterEqual(plan.primary_wait, 2.0)
    self.assertLessEqual(plan.primary_wait, 6.0)
    self.assertGreaterEqual(plan.down_distance, 500)
    self.assertLessEqual(plan.down_distance, 1200)
    self.assertGreater(plan.reverse_distance, 0)
    # The settle wait is drawn from the same min_wait..max_wait range.
    self.assertGreaterEqual(plan.settle_wait, 2.0)
    self.assertLessEqual(plan.settle_wait, 6.0)
    # Reverse scroll distance honours its own bounds.
    self.assertGreaterEqual(plan.reverse_distance, settings.min_reverse_scroll)
    self.assertLessEqual(plan.reverse_distance, settings.max_reverse_scroll)
    # Reverse dwell is drawn between 1s and min(4s, max_wait).
    self.assertGreaterEqual(plan.reverse_wait, 1.0)
    self.assertLessEqual(plan.reverse_wait, 4.0)
def test_run_human_browse_sequence_scrolls_and_waits(self) -> None:
    """A plan with a reverse step issues three scrolls and three sleeps in order."""
    module = importlib.import_module("XHS")
    fake_page = FakeScrollPage()
    browse_plan = module.HumanBrowsePlan(
        down_distance=800,
        primary_wait=2.5,
        reverse_distance=200,
        reverse_wait=1.5,
        settle_wait=3.0,
    )

    # Patch sleep so the test does not actually wait.
    with mock.patch.object(module.time, "sleep") as mocked_sleep:
        module.run_human_browse_sequence(fake_page, browse_plan)

    # Down 800, back up 200, then down by twice the reverse distance.
    for position, expected_distance in enumerate((800, -200, 400)):
        self.assertIn(f"const distance = {expected_distance};", fake_page.scripts[position])
    self.assertIn("scrollBy(0, distance)", fake_page.scripts[0])
    expected_sleeps = [mock.call(seconds) for seconds in (2.5, 1.5, 3.0)]
    mocked_sleep.assert_has_calls(expected_sleeps)
def test_should_take_long_break_uses_every_n_downloads(self) -> None:
    """Long breaks fire on exact multiples of the interval, and only when enabled."""
    module = importlib.import_module("XHS")
    settings = module.HumanBrowseSettings(long_break_every=4)

    self.assertFalse(module.should_take_long_break(0, settings))
    self.assertFalse(module.should_take_long_break(3, settings))
    self.assertTrue(module.should_take_long_break(4, settings))
    self.assertTrue(module.should_take_long_break(8, settings))

    # Human mode off suppresses long breaks even on a multiple of the interval.
    disabled = module.HumanBrowseSettings(enabled=False, long_break_every=4)
    self.assertFalse(module.should_take_long_break(4, disabled))

    # A non-positive interval disables long breaks instead of dividing by zero.
    for interval in (0, -1):
        never = module.HumanBrowseSettings(long_break_every=interval)
        self.assertFalse(module.should_take_long_break(4, never))
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()