Add resumable XHS queue downloader

This commit is contained in:
wangshaoqing 2026-05-27 16:30:06 +08:00
parent ed8357f65a
commit 37b17d8ccf
6 changed files with 480 additions and 0 deletions

3
.gitignore vendored
View File

@ -3,7 +3,10 @@ __pycache__/
*.pyc
.DS_Store
.xhs-chrome-profile/
data/
data_queue_smoke/
video/
video_queue_smoke/
video_bad_*/
video_good_*/
video_human_test/

View File

@ -73,6 +73,16 @@ pip install requests DrissionPage
# 限制最长运行时间,单位秒
./.venv/bin/python XHS.py --max-videos 20 --max-runtime 600
# 长任务队列模式:适合下载大量视频,可中断后继续
./.venv/bin/python XHS.py \
--source video-channel \
--target-videos 1000 \
--queue-file data/xhs_queue.jsonl \
--max-runtime 7200
# 继续上次未完成的队列任务
./.venv/bin/python XHS.py --queue-file data/xhs_queue.jsonl --target-videos 1000
# 如果启动 Chrome 时换了端口,下载脚本也要使用同一个端口
./.venv/bin/python login_xhs.py --browser-port 9334
./.venv/bin/python XHS.py --browser-port 9334 --max-videos 10
@ -91,6 +101,7 @@ pip install requests DrissionPage
- 解析器会递归查找响应 JSON 中的 `master_url`、`backup_urls` 等视频地址字段。
- 默认会在发现页和详情页之间随机停留、上下滚动,并在阶段下载后长停留。
- 下载过程会去重,并在单个视频失败时继续处理后续视频。
- 队列模式会把笔记链接和处理状态保存到 JSONL 文件,支持长任务恢复。
## 测试

248
XHS.py
View File

@ -9,11 +9,14 @@ import socket
import sys
import time
from dataclasses import dataclass
from dataclasses import asdict
from pathlib import Path
from typing import Any
from urllib.parse import urljoin
from dataclasses import replace
# Discovery page returned by build_source_url() for the "explore" source.
DEFAULT_EXPLORE_URL = "https://www.xiaohongshu.com/explore"
# Discovery page returned by build_source_url() for the "video-channel" source.
DEFAULT_VIDEO_CHANNEL_URL = "https://www.xiaohongshu.com/explore?channel_id=video"
# Fallback browser port when the caller does not supply one
# (presumably the Chrome remote-debugging port — confirm against create_page).
DEFAULT_BROWSER_PORT = 9223
# Fallback directory for downloaded videos when no output_dir is supplied.
DEFAULT_OUTPUT_DIR = Path("video")
# Target handed to page.listen.start() so feed responses can be captured.
LISTEN_TARGET = "/api/sns/web/v1/feed"
@ -59,6 +62,18 @@ class HumanBrowsePlan:
settle_wait: float = 0.0
@dataclass(frozen=True)
class QueueRecord:
    """One note tracked by the resumable JSONL download queue.

    Instances are immutable; the mark_queue_record_* helpers produce updated
    copies instead of mutating a record in place.
    """

    # Identifier of the note; used to de-duplicate queue entries.
    note_id: str
    # Note detail URL that is opened when the record is processed.
    url: str
    # Name of the discovery source that produced this record.
    source: str
    # Processing state: "pending", "downloaded", "skipped_image" or "failed".
    status: str = "pending"
    # Number of failed processing attempts recorded so far.
    attempts: int = 0
    # POSIX-style path of the saved video once the record is downloaded.
    downloaded_path: str = ""
    # Last failure message or skip reason; cleared when a download succeeds.
    last_error: str = ""
    # Timestamp string of the most recent status change.
    updated_at: str = ""
def sanitize_filename(value: str, fallback: str = "untitled") -> str:
    """Return *value* cleaned up for use as a file name.

    Every match of ``INVALID_FILENAME_CHARS`` is replaced with an underscore,
    then leading and trailing spaces, dots and underscores are stripped.
    *fallback* is returned when nothing is left after cleaning.
    """
    substituted = INVALID_FILENAME_CHARS.sub("_", value)
    trimmed = substituted.strip(" ._")
    if trimmed:
        return trimmed
    return fallback
@ -674,6 +689,217 @@ def filter_unvisited_note_urls(urls: list[str], visited_note_ids: set[str]) -> l
return [url for url in urls if extract_note_id_from_url(url) not in visited_note_ids]
def load_queue(queue_path: Path) -> list[QueueRecord]:
    """Read queue records from a JSONL file.

    Returns an empty list when *queue_path* does not exist. Blank lines are
    ignored; every other line is parsed as a JSON object whose keys are passed
    to ``QueueRecord`` as keyword arguments.
    """
    if not queue_path.exists():
        return []
    raw_lines = queue_path.read_text(encoding="utf-8").splitlines()
    return [QueueRecord(**json.loads(raw)) for raw in raw_lines if raw.strip()]
def save_queue(queue_path: Path, records: list[QueueRecord]) -> None:
    """Write *records* to *queue_path* as JSONL, one object per line.

    The parent directory is created if needed. Content is written to a
    sibling ``<name>.tmp`` file first and then moved over the target, so the
    queue file is swapped in a single rename rather than rewritten in place.
    """
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    staging_path = queue_path.with_suffix(queue_path.suffix + ".tmp")
    payload = "".join(
        json.dumps(asdict(item), ensure_ascii=False, sort_keys=True) + "\n"
        for item in records
    )
    staging_path.write_text(payload, encoding="utf-8")
    staging_path.replace(queue_path)
def merge_note_urls_into_queue(records: list[QueueRecord], note_urls: list[str], source: str) -> list[QueueRecord]:
    """Return a new queue with unseen notes from *note_urls* appended.

    Existing records are kept as they are and in their original order. A URL
    is added as a new pending record tagged with *source* only when its note
    id is not already present; the input list is not modified.
    """
    combined = [*records]
    known_ids = {item.note_id for item in combined}
    for candidate_url in normalize_note_urls(note_urls):
        candidate_id = extract_note_id_from_url(candidate_url)
        if candidate_id not in known_ids:
            known_ids.add(candidate_id)
            combined.append(QueueRecord(note_id=candidate_id, url=candidate_url, source=source))
    return combined
def count_queue_status(records: list[QueueRecord]) -> dict[str, int]:
    """Tally how many records carry each status value.

    Only statuses that actually occur appear in the result; keys are in
    order of first appearance.
    """
    tally: dict[str, int] = {}
    for item in records:
        state = item.status
        if state in tally:
            tally[state] += 1
        else:
            tally[state] = 1
    return tally
def current_timestamp() -> str:
    """Return the current local time formatted as ``YYYY-MM-DDTHH:MM:SS`` plus the ``%z`` offset."""
    layout = "%Y-%m-%dT%H:%M:%S%z"
    return time.strftime(layout)
def mark_queue_record_downloaded(record: QueueRecord, output_path: Path) -> QueueRecord:
    """Return a copy of *record* flagged as downloaded.

    The copy stores *output_path* in POSIX form, clears any previous error
    and refreshes the update timestamp.
    """
    changes = {
        "status": "downloaded",
        "downloaded_path": output_path.as_posix(),
        "last_error": "",
        "updated_at": current_timestamp(),
    }
    return replace(record, **changes)
def mark_queue_record_skipped(record: QueueRecord, reason: str) -> QueueRecord:
    """Return a copy of *record* with status ``skipped_image`` and *reason* stored as its last error."""
    changes = {
        "status": "skipped_image",
        "last_error": reason,
        "updated_at": current_timestamp(),
    }
    return replace(record, **changes)
def mark_queue_record_failed(record: QueueRecord, error: str, retry_limit: int) -> QueueRecord:
    """Return a copy of *record* with one more failed attempt recorded.

    The record stays ``pending`` while its attempt count is below
    *retry_limit* and becomes ``failed`` once the count reaches it.
    """
    attempts_so_far = record.attempts + 1
    if attempts_so_far >= retry_limit:
        next_status = "failed"
    else:
        next_status = "pending"
    changes = {
        "status": next_status,
        "attempts": attempts_so_far,
        "last_error": error,
        "updated_at": current_timestamp(),
    }
    return replace(record, **changes)
def build_source_url(source: str) -> str:
    """Map a source name to the page URL used for discovery.

    ``current-page`` maps to an empty string; ``explore`` and
    ``video-channel`` map to their default URLs.

    Raises:
        ValueError: if *source* is not one of the supported names.
    """
    if source == "current-page":
        return ""
    if source in ("explore", "video-channel"):
        return DEFAULT_EXPLORE_URL if source == "explore" else DEFAULT_VIDEO_CHANNEL_URL
    raise ValueError(f"不支持的来源: {source}")
def run_queue_download(
    source: str,
    target_videos: int,
    queue_file: Path,
    retry_limit: int,
    **kwargs: Any,
) -> int:
    """Run the resumable queue downloader and return this run's download count.

    The loop alternates between two phases until the number of ``downloaded``
    records in the queue reaches *target_videos*:

    * discovery — when no record is pending, note URLs are collected from the
      source page (or the current page) and merged into the queue;
    * processing — the first pending record is opened, a video candidate is
      looked up in the page HTML and, failing that, in a captured feed
      response, and the video is downloaded.

    The queue file is rewritten after every status change. The loop also
    stops when ``max_runtime`` (if greater than 0) has elapsed or after five
    consecutive discovery rounds that add nothing.

    Args:
        source: Source name accepted by ``build_source_url``.
        target_videos: Total number of downloaded records to reach, counting
            records already marked downloaded in the queue file.
        queue_file: Path of the JSONL queue file.
        retry_limit: Passed to ``mark_queue_record_failed`` for failed items.
        **kwargs: Optional settings read here: ``timeout`` (default 20),
            ``output_dir``, ``browser_port``, ``human_mode`` (default True),
            ``min_wait`` (2.0), ``max_wait`` (6.0), ``long_break_every`` (4)
            and ``max_runtime`` (0.0).

    Returns:
        The number of videos downloaded during this call.

    Raises:
        ValueError: from ``build_source_url`` when *source* is unsupported.
    """
    timeout = int(kwargs.get("timeout", 20))
    output_dir = Path(kwargs.get("output_dir", DEFAULT_OUTPUT_DIR))
    browser_port = kwargs.get("browser_port", DEFAULT_BROWSER_PORT)
    human_settings = HumanBrowseSettings(
        enabled=bool(kwargs.get("human_mode", True)),
        min_wait=float(kwargs.get("min_wait", 2.0)),
        max_wait=float(kwargs.get("max_wait", 6.0)),
        long_break_every=int(kwargs.get("long_break_every", 4)),
        max_runtime=float(kwargs.get("max_runtime", 0.0)),
    )
    started_at = time.monotonic()
    # Validate the source before touching the browser.
    source_url = build_source_url(source)
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    if browser_port is not None:
        ensure_browser_debug_port_ready(int(browser_port))
    page = create_page(chromium_page_cls, chromium_options_cls, int(browser_port) if browser_port is not None else None)
    page.listen.start(LISTEN_TARGET)
    # Resume from whatever an earlier run left in the queue file.
    records = load_queue(queue_file)
    downloaded_this_run = 0
    # Per-run de-duplication of video URLs and output files.
    seen_urls: set[str] = set()
    seen_files: set[Path] = set()
    empty_rounds = 0
    max_empty_rounds = 5
    while count_queue_status(records).get("downloaded", 0) < target_videos:
        if human_settings.max_runtime > 0 and time.monotonic() - started_at >= human_settings.max_runtime:
            print("[INFO] 已达到最大运行时间,结束队列任务。")
            break
        pending_indices = [index for index, record in enumerate(records) if record.status == "pending"]
        if not pending_indices:
            # Discovery phase: nothing left to process, so refill the queue.
            before_count = len(records)
            if source == "current-page":
                note_urls = normalize_note_urls([getattr(page, "url", "")])
            else:
                page.get(source_url)
                human_pause(human_settings)
                note_urls = collect_note_urls_from_page(page, limit=max(50, target_videos * 2))
            records = merge_note_urls_into_queue(records, note_urls, source=source)
            save_queue(queue_file, records)
            added = len(records) - before_count
            print(f"[INFO] 队列新增 {added} 条,当前总数 {len(records)}")
            if added == 0:
                # Give up after several rounds in a row that found nothing new.
                empty_rounds += 1
                if empty_rounds >= max_empty_rounds:
                    print("[INFO] 连续多轮没有新增队列项,结束队列任务。")
                    break
            else:
                empty_rounds = 0
            # NOTE(review): nesting of this step was reconstructed from a
            # flattened diff view — confirm it is not meant to sit inside the
            # `else` branch above.
            if source != "current-page":
                run_human_browse_sequence(page, create_human_browse_plan(human_settings))
            continue
        # Processing phase: always take the first pending record.
        index = pending_indices[0]
        record = records[index]
        print(f"[INFO] 处理队列项 {index + 1}/{len(records)}: {record.note_id}")
        try:
            page.get(record.url)
            human_pause(human_settings)
            if human_settings.enabled:
                run_human_browse_sequence(page, create_human_browse_plan(human_settings))
            # First look for video URLs in the rendered page HTML.
            candidates = group_video_candidates(
                extract_video_candidates_from_html(
                    page.run_js("return document.documentElement.outerHTML"),
                    video_id=record.note_id,
                )
            )
            if not candidates:
                # Fall back to a captured feed response.
                packet = wait_for_feed_packet(page, timeout=timeout)
                if packet is not None:
                    candidates = group_video_candidates(extract_video_candidates(extract_feed_payload(packet.response)))
            if not candidates:
                records[index] = mark_queue_record_skipped(record, "no video candidate")
                save_queue(queue_file, records)
                continue
            candidate = candidates[0]
            if candidate.video_url in seen_urls:
                records[index] = mark_queue_record_skipped(record, "duplicate video url")
                save_queue(queue_file, records)
                continue
            seen_urls.add(candidate.video_url)
            output_path = build_output_path(candidate, output_dir=output_dir)
            if output_path in seen_files or output_path.exists():
                # The file is already on disk: mark the record downloaded
                # without fetching again or counting it for this run.
                records[index] = mark_queue_record_downloaded(record, output_path)
                save_queue(queue_file, records)
                continue
            download_video(
                requests_module=requests_module,
                headers=build_headers(record.url),
                video_url=candidate.video_url,
                output_path=output_path,
            )
            seen_files.add(output_path)
            records[index] = mark_queue_record_downloaded(record, output_path)
            save_queue(queue_file, records)
            downloaded_this_run += 1
            total_downloaded = count_queue_status(records).get("downloaded", 0)
            print(f"[OK] 已保存 ({total_downloaded}/{target_videos}): {output_path}")
            human_pause(human_settings)
            if should_take_long_break(total_downloaded, human_settings):
                take_long_break(human_settings)
        except Exception as exc:
            # A failing record must not stop the job: record the error and
            # let the loop continue with the updated queue.
            records[index] = mark_queue_record_failed(record, str(exc), retry_limit=retry_limit)
            save_queue(queue_file, records)
            print(f"[WARN] 队列项失败 {record.note_id}: {exc}")
    counts = count_queue_status(records)
    print(
        "[INFO] 队列状态 "
        f"downloaded={counts.get('downloaded', 0)} "
        f"pending={counts.get('pending', 0)} "
        f"skipped={counts.get('skipped_image', 0)} "
        f"failed={counts.get('failed', 0)}"
    )
    return downloaded_this_run
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome监听 feed 响应并下载视频")
parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10")
@ -687,6 +913,10 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("--max-wait", type=float, default=6.0, help="随机停留最长秒数,默认 6")
parser.add_argument("--long-break-every", type=int, default=4, help="每下载 N 条长停留一次,默认 4")
parser.add_argument("--max-runtime", type=float, default=0.0, help="最大运行秒数0 表示不限制")
parser.add_argument("--source", choices=["explore", "video-channel", "current-page"], default="explore", help="长任务来源,默认 explore")
parser.add_argument("--target-videos", type=int, default=0, help="队列模式目标下载数量0 表示不启用")
parser.add_argument("--queue-file", default=None, help="JSONL 队列文件路径,提供后启用可恢复队列模式")
parser.add_argument("--retry-limit", type=int, default=1, help="队列项下载失败重试次数,默认 1")
return parser
@ -699,6 +929,24 @@ def main(argv: list[str] | None = None) -> int:
parser.error("--browser-port 必须大于 0")
if args.min_wait < 0 or args.max_wait < args.min_wait:
parser.error("--min-wait 和 --max-wait 必须满足 0 <= min <= max")
if args.queue_file or args.target_videos > 0:
target_videos = args.target_videos if args.target_videos > 0 else args.max_videos
downloaded = run_queue_download(
source=args.source,
target_videos=target_videos,
queue_file=Path(args.queue_file or "data/xhs_queue.jsonl"),
retry_limit=args.retry_limit,
timeout=args.timeout,
output_dir=Path(args.output_dir),
browser_port=args.browser_port,
human_mode=args.human_mode,
min_wait=args.min_wait,
max_wait=args.max_wait,
long_break_every=args.long_break_every,
max_runtime=args.max_runtime,
)
print(f"[INFO] 本次共下载 {downloaded} 个视频。")
return 0
downloaded = collect_videos(
max_videos=args.max_videos,
timeout=args.timeout,

View File

@ -0,0 +1,45 @@
# XHS Long Queue Downloader Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add a resumable JSONL queue mode so long Xiaohongshu video download jobs can target large counts like 1000 videos.
**Architecture:** Keep `XHS.py` as the CLI entry point. Add queue record helpers, source URL helpers, discovery/processing orchestration, and CLI flags while reusing existing parsing, download validation, shared Chrome, and human browsing cadence.
**Tech Stack:** Python 3, unittest, JSONL files, DrissionPage, requests.
---
## File Structure
- Modify `XHS.py`: queue dataclass/helpers, source selection, queue orchestration, CLI flags.
- Modify `test_xhs.py`: queue unit tests and CLI plumbing tests.
- Modify `README.md`: long task command examples.
## Task 1: Queue Persistence
- [ ] Write tests for queue load/save, deduping by note_id, counting downloaded records, and status updates.
- [ ] Run `python3 -m unittest test_xhs.py -v` and verify failures.
- [ ] Implement `QueueRecord`, `load_queue`, `save_queue`, `merge_note_urls_into_queue`, `count_queue_status`.
- [ ] Run tests and verify pass.
## Task 2: Source Selection and CLI
- [ ] Write tests for `build_source_url` and parser defaults for `--source`, `--target-videos`, `--queue-file`, `--retry-limit`.
- [ ] Run tests and verify failures.
- [ ] Implement source URL selection and CLI argument plumbing.
- [ ] Run tests and verify pass.
## Task 3: Queue Processing Orchestration
- [ ] Write tests for pure queue status transitions for success, skipped image, failed retry.
- [ ] Run tests and verify failures.
- [ ] Implement queue processing helpers and wire queue mode into `main` when `--queue-file` or `--target-videos` is provided.
- [ ] Run tests and verify pass.
## Task 4: Docs and Verification
- [ ] Update README with 1000-video queue command and resume behavior.
- [ ] Run `python3 -m unittest test_xhs.py test_login_xhs.py -v`.
- [ ] Run a small smoke command with low target and short waits if browser is available.
- [ ] Commit and push.

View File

@ -0,0 +1,62 @@
# XHS Long Queue Downloader Design
## Goal
Add a resumable long-task downloader for collecting large numbers of Xiaohongshu videos, such as 1000 videos, without relying on a single recommendation page pass.
## Scope
The feature stays within the existing manually logged-in browser model. It does not automate login, bypass verification, spoof device fingerprints, or call private APIs directly outside what the loaded web pages expose. It improves task durability, source density, and progress tracking.
## Architecture
The downloader becomes two-phase while preserving the current one-command UX:
1. Queue discovery collects note detail URLs from configured sources and writes them to a JSONL queue.
2. Queue processing opens pending note URLs, extracts video URLs from page state or feed responses, downloads valid videos, and updates each queue item status.
The queue file stores one JSON object per note:
```json
{"note_id":"...","url":"...","source":"video-channel","status":"pending","attempts":0,"downloaded_path":"","last_error":"","updated_at":"..."}
```
Statuses are `pending`, `downloaded`, `skipped_image`, and `failed`.
## Sources
The first implementation supports:
- `explore`: current recommendation page.
- `video-channel`: `https://www.xiaohongshu.com/explore?channel_id=video` as a best-effort source. If Xiaohongshu redirects or changes channel routing, the collector still reads visible `/explore/` cards.
- `current-page`: process the current browser page.
Future search keyword sources can be added after the queue engine is stable.
## Runtime Behavior
A command such as:
```bash
./.venv/bin/python XHS.py --source video-channel --target-videos 1000 --queue-file data/xhs_queue.jsonl --max-runtime 7200
```
will:
1. Load existing queue records.
2. Count already downloaded items.
3. Open the selected source page and collect visible note URLs.
4. Append new pending records, preserving existing statuses.
5. Process pending records until `target_videos` is reached, `max_runtime` elapses, or the queue is exhausted.
6. If queue is exhausted before target, return to source, scroll, collect more URLs, and continue.
## Error Handling
- Non-video notes become `skipped_image`.
- Download failures increment `attempts`; the record stays `pending` for another try and becomes `failed` once its attempts reach the retry limit.
- The queue is rewritten atomically after status changes.
- Progress logs include downloaded count, skipped count, failed count, and pending count.
## Testing
Unit tests cover JSONL queue load/save, deduplication, status updates, source URL selection, target counting, and CLI argument plumbing. Existing download and parsing tests remain in place.

View File

@ -1,5 +1,7 @@
import importlib
import tempfile
import unittest
from pathlib import Path
from unittest import mock
@ -206,6 +208,10 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(args.max_wait, 6.0)
self.assertEqual(args.long_break_every, 4)
self.assertEqual(args.max_runtime, 0.0)
self.assertEqual(args.source, "explore")
self.assertIsNone(args.queue_file)
self.assertEqual(args.target_videos, 0)
self.assertEqual(args.retry_limit, 1)
def test_main_invokes_collect_videos_with_cli_values(self) -> None:
module = importlib.import_module("XHS")
@ -234,6 +240,37 @@ class XhsModuleTests(unittest.TestCase):
self.assertFalse(kwargs["use_current_page"])
self.assertTrue(kwargs["human_mode"])
def test_build_source_url_supports_video_channel_and_explore(self) -> None:
    """build_source_url maps the explore and video-channel sources to their URLs."""
    xhs = importlib.import_module("XHS")
    explore_url = xhs.build_source_url("explore")
    channel_url = xhs.build_source_url("video-channel")
    self.assertEqual(explore_url, xhs.DEFAULT_EXPLORE_URL)
    self.assertEqual(channel_url, "https://www.xiaohongshu.com/explore?channel_id=video")
def test_main_invokes_queue_mode_when_queue_file_is_provided(self) -> None:
    """main() routes to run_queue_download and forwards the queue CLI values."""
    xhs = importlib.import_module("XHS")
    argv = [
        "--source", "video-channel",
        "--target-videos", "1000",
        "--queue-file", "data/q.jsonl",
        "--retry-limit", "2",
    ]
    with mock.patch.object(xhs, "run_queue_download", return_value=5) as fake_run:
        status = xhs.main(argv)
    self.assertEqual(status, 0)
    fake_run.assert_called_once()
    passed = fake_run.call_args.kwargs
    self.assertEqual(passed["source"], "video-channel")
    self.assertEqual(passed["target_videos"], 1000)
    self.assertEqual(passed["queue_file"].as_posix(), "data/q.jsonl")
    self.assertEqual(passed["retry_limit"], 2)
def test_download_video_rejects_webp_response_before_writing_file(self) -> None:
module = importlib.import_module("XHS")
response = FakeDownloadResponse(b"RIFF....WEBP", content_type="image/webp")
@ -340,6 +377,80 @@ class XhsModuleTests(unittest.TestCase):
self.assertTrue(module.should_take_long_break(4, settings))
self.assertTrue(module.should_take_long_break(8, settings))
def test_queue_round_trip_jsonl(self) -> None:
    """Records written by save_queue are read back unchanged by load_queue."""
    xhs = importlib.import_module("XHS")
    written = [
        xhs.QueueRecord(
            note_id="note1",
            url="https://www.xiaohongshu.com/explore/note1?xsec_token=a",
            source="video-channel",
        )
    ]
    with tempfile.TemporaryDirectory() as workdir:
        queue_file = Path(workdir) / "queue.jsonl"
        xhs.save_queue(queue_file, written)
        restored = xhs.load_queue(queue_file)
    self.assertEqual(restored, written)
def test_merge_note_urls_into_queue_deduplicates_existing_notes(self) -> None:
    """Merging keeps an existing note untouched and appends only the new one."""
    xhs = importlib.import_module("XHS")
    already_downloaded = xhs.QueueRecord(
        note_id="note1",
        url="https://www.xiaohongshu.com/explore/note1?xsec_token=a",
        source="explore",
        status="downloaded",
    )
    discovered_urls = [
        "https://www.xiaohongshu.com/explore/note1?xsec_token=a",
        "https://www.xiaohongshu.com/explore/note2?xsec_token=b",
    ]
    merged = xhs.merge_note_urls_into_queue(
        [already_downloaded],
        discovered_urls,
        source="video-channel",
    )
    merged_ids = [item.note_id for item in merged]
    self.assertEqual(merged_ids, ["note1", "note2"])
    self.assertEqual(merged[0].status, "downloaded")
    self.assertEqual(merged[1].status, "pending")
def test_count_queue_status_counts_records_by_status(self) -> None:
    """count_queue_status returns one tally per status that occurs."""
    xhs = importlib.import_module("XHS")
    specs = [
        ("one", "url1", "downloaded"),
        ("two", "url2", "failed"),
        ("three", "url3", "downloaded"),
    ]
    queue = [xhs.QueueRecord(note_id, url, "source", status=status) for note_id, url, status in specs]
    tally = xhs.count_queue_status(queue)
    self.assertEqual(tally, {"downloaded": 2, "failed": 1})
def test_mark_queue_record_downloaded_updates_status_and_path(self) -> None:
    """Marking a record downloaded sets status and path and clears the error."""
    xhs = importlib.import_module("XHS")
    pending = xhs.QueueRecord("note1", "url", "source")
    result = xhs.mark_queue_record_downloaded(pending, Path("video/a.mp4"))
    observed = (result.status, result.downloaded_path, result.last_error)
    self.assertEqual(observed, ("downloaded", "video/a.mp4", ""))
def test_mark_queue_record_skipped_records_reason(self) -> None:
    """Marking a record skipped stores the skip status and the reason."""
    xhs = importlib.import_module("XHS")
    pending = xhs.QueueRecord("note1", "url", "source")
    result = xhs.mark_queue_record_skipped(pending, "no video")
    observed = (result.status, result.last_error)
    self.assertEqual(observed, ("skipped_image", "no video"))
def test_mark_queue_record_failed_respects_retry_limit(self) -> None:
    """A record stays pending until its attempts reach the retry limit."""
    xhs = importlib.import_module("XHS")
    fresh = xhs.QueueRecord("note1", "url", "source", attempts=0)
    after_first = xhs.mark_queue_record_failed(fresh, "timeout", retry_limit=2)
    self.assertEqual((after_first.status, after_first.attempts), ("pending", 1))
    after_second = xhs.mark_queue_record_failed(after_first, "timeout", retry_limit=2)
    self.assertEqual((after_second.status, after_second.attempts), ("failed", 2))
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()