Add XHS keyword search source

This commit is contained in:
wangshaoqing 2026-05-27 16:49:36 +08:00
parent 37b17d8ccf
commit f247cb1a3a
6 changed files with 159 additions and 7 deletions

2
.gitignore vendored
View File

@ -5,8 +5,10 @@ __pycache__/
.xhs-chrome-profile/
data/
data_queue_smoke/
data_search_smoke/
video/
video_queue_smoke/
video_search_smoke/
video_bad_*/
video_good_*/
video_human_test/

View File

@ -80,6 +80,13 @@ pip install requests DrissionPage
--queue-file data/xhs_queue.jsonl \
--max-runtime 7200
# 搜索关键词结果下载:例如猫咪相关视频
./.venv/bin/python XHS.py \
--source search \
--keyword 猫咪 \
--target-videos 100 \
--queue-file data/search_cat_queue.jsonl
# 继续上次未完成的队列任务
./.venv/bin/python XHS.py --queue-file data/xhs_queue.jsonl --target-videos 1000

32
XHS.py
View File

@ -12,6 +12,7 @@ from dataclasses import dataclass
from dataclasses import asdict
from pathlib import Path
from typing import Any
from urllib.parse import quote
from urllib.parse import urljoin
from dataclasses import replace
@ -582,7 +583,7 @@ def collect_videos_from_explore_cards(
page.get(start_url)
human_pause(human_settings)
note_urls = filter_unvisited_note_urls(
collect_note_urls_from_page(page, limit=max_videos * 12),
wait_for_note_urls_from_page(page, limit=max_videos * 12),
visited_note_ids,
)
if not note_urls:
@ -651,7 +652,7 @@ def collect_videos_from_explore_cards(
def extract_note_id_from_url(url: str) -> str:
match = re.search(r"/explore/([^/?#]+)", url)
match = re.search(r"/(?:explore|search_result)/([^/?#]+)", url)
if match:
return match.group(1)
return "current-page"
@ -675,7 +676,7 @@ def normalize_note_urls(urls: list[str]) -> list[str]:
def collect_note_urls_from_page(page: Any, limit: int) -> list[str]:
script = """
return Array.from(document.querySelectorAll('a[href*="/explore/"]'))
return Array.from(document.querySelectorAll('a[href*="/explore/"], a[href*="/search_result/"]'))
.map((a) => a.href || a.getAttribute('href') || '')
.filter(Boolean);
"""
@ -685,6 +686,15 @@ return Array.from(document.querySelectorAll('a[href*="/explore/"]'))
return normalize_note_urls([str(url) for url in raw_urls])[:limit]
def wait_for_note_urls_from_page(page: Any, limit: int, timeout: float = 8.0, interval: float = 0.5) -> list[str]:
deadline = time.monotonic() + timeout
while True:
note_urls = collect_note_urls_from_page(page, limit=limit)
if note_urls or time.monotonic() >= deadline:
return note_urls
time.sleep(interval)
def filter_unvisited_note_urls(urls: list[str], visited_note_ids: set[str]) -> list[str]:
return [url for url in urls if extract_note_id_from_url(url) not in visited_note_ids]
@ -763,13 +773,18 @@ def mark_queue_record_failed(record: QueueRecord, error: str, retry_limit: int)
)
def build_source_url(source: str) -> str:
def build_source_url(source: str, keyword: str | None = None) -> str:
if source == "explore":
return DEFAULT_EXPLORE_URL
if source == "video-channel":
return DEFAULT_VIDEO_CHANNEL_URL
if source == "current-page":
return ""
if source == "search":
if not keyword or not keyword.strip():
raise ValueError("--source search 需要提供 --keyword")
encoded_keyword = quote(keyword.strip())
return f"https://www.xiaohongshu.com/search_result?keyword={encoded_keyword}&source=web_search_result_notes&type=51"
raise ValueError(f"不支持的来源: {source}")
@ -778,6 +793,7 @@ def run_queue_download(
target_videos: int,
queue_file: Path,
retry_limit: int,
keyword: str | None = None,
**kwargs: Any,
) -> int:
timeout = int(kwargs.get("timeout", 20))
@ -791,7 +807,7 @@ def run_queue_download(
max_runtime=float(kwargs.get("max_runtime", 0.0)),
)
started_at = time.monotonic()
source_url = build_source_url(source)
source_url = build_source_url(source, keyword=keyword)
requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
if browser_port is not None:
@ -819,7 +835,7 @@ def run_queue_download(
else:
page.get(source_url)
human_pause(human_settings)
note_urls = collect_note_urls_from_page(page, limit=max(50, target_videos * 2))
note_urls = wait_for_note_urls_from_page(page, limit=max(50, target_videos * 2))
records = merge_note_urls_into_queue(records, note_urls, source=source)
save_queue(queue_file, records)
added = len(records) - before_count
@ -913,7 +929,8 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("--max-wait", type=float, default=6.0, help="随机停留最长秒数,默认 6")
parser.add_argument("--long-break-every", type=int, default=4, help="每下载 N 条长停留一次,默认 4")
parser.add_argument("--max-runtime", type=float, default=0.0, help="最大运行秒数0 表示不限制")
parser.add_argument("--source", choices=["explore", "video-channel", "current-page"], default="explore", help="长任务来源,默认 explore")
parser.add_argument("--source", choices=["explore", "video-channel", "current-page", "search"], default="explore", help="长任务来源,默认 explore")
parser.add_argument("--keyword", default=None, help="搜索来源关键词,例如 猫咪")
parser.add_argument("--target-videos", type=int, default=0, help="队列模式目标下载数量0 表示不启用")
parser.add_argument("--queue-file", default=None, help="JSONL 队列文件路径,提供后启用可恢复队列模式")
parser.add_argument("--retry-limit", type=int, default=1, help="队列项下载失败重试次数,默认 1")
@ -936,6 +953,7 @@ def main(argv: list[str] | None = None) -> int:
target_videos=target_videos,
queue_file=Path(args.queue_file or "data/xhs_queue.jsonl"),
retry_limit=args.retry_limit,
keyword=args.keyword,
timeout=args.timeout,
output_dir=Path(args.output_dir),
browser_port=args.browser_port,

View File

@ -0,0 +1,24 @@
# XHS Search Source Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add `--source search --keyword <term>` to the resumable queue downloader.
**Architecture:** Extend `build_source_url`, CLI parser choices, `run_queue_download` arguments, and README examples. Reuse all queue and download code.
**Tech Stack:** Python 3, unittest, DrissionPage, requests.
---
## Task 1: Search URL and CLI
- [x] Write failing tests for encoded search source URL and CLI keyword plumbing.
- [x] Implement `build_source_url("search", keyword=...)`, parser support, and queue runner forwarding.
- [x] Run tests.
## Task 2: Docs and Smoke
- [x] Update README with search examples.
- [x] Run full unit tests.
- [x] Run a small search smoke test with `--keyword 猫咪 --target-videos 2`.
- [ ] Commit and push.

View File

@ -0,0 +1,27 @@
# XHS Search Source Design
## Goal
Allow the resumable queue downloader to use Xiaohongshu search results as a source, so queries such as `猫咪` or `猫咪 搞笑` can collect and download related video notes.
## Scope
This feature reuses the existing manually logged-in Chrome, queue persistence, page card collection, detail-page video extraction, validation, and human browsing cadence. It does not automate login, bypass verification, or call hidden APIs directly.
## CLI
```bash
./.venv/bin/python XHS.py --source search --keyword 猫咪 --target-videos 100 --queue-file data/search_cat_queue.jsonl
```
## Behavior
- `--source search` requires `--keyword`.
- The source URL is `https://www.xiaohongshu.com/search_result?keyword=<encoded keyword>&source=web_search_result_notes&type=51`, which opens the video-filtered search results page.
- Search result cards are collected from both `/explore/<note_id>` and tokenized `/search_result/<note_id>` links.
- Detail links are polled briefly after navigation because Xiaohongshu search result cards are rendered asynchronously.
- Queue mode handles videos, images, failures, retries, and resume semantics exactly like other sources.
## Testing
Unit tests cover search URL encoding, parser defaults, queue-mode CLI plumbing for keyword, `/search_result/` note ID extraction, tokenized search link normalization, and async result-link polling.

View File

@ -38,6 +38,27 @@ class FakeScrollPage:
self.scripts.append(script)
class FakeLinkPage:
def __init__(self, links):
self.links = links
self.scripts = []
def run_js(self, script):
self.scripts.append(script)
return self.links
class FakeDelayedLinkPage:
def __init__(self):
self.calls = 0
def run_js(self, script):
self.calls += 1
if self.calls == 1:
return []
return ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token"]
class XhsModuleTests(unittest.TestCase):
def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
module = importlib.import_module("XHS")
@ -248,6 +269,13 @@ class XhsModuleTests(unittest.TestCase):
"https://www.xiaohongshu.com/explore?channel_id=video",
)
def test_build_source_url_supports_encoded_search_keyword(self) -> None:
module = importlib.import_module("XHS")
self.assertEqual(
module.build_source_url("search", keyword="猫咪 搞笑"),
"https://www.xiaohongshu.com/search_result?keyword=%E7%8C%AB%E5%92%AA%20%E6%90%9E%E7%AC%91&source=web_search_result_notes&type=51",
)
def test_main_invokes_queue_mode_when_queue_file_is_provided(self) -> None:
module = importlib.import_module("XHS")
with mock.patch.object(module, "run_queue_download", return_value=5) as mocked_run:
@ -261,6 +289,8 @@ class XhsModuleTests(unittest.TestCase):
"data/q.jsonl",
"--retry-limit",
"2",
"--keyword",
"猫咪",
]
)
self.assertEqual(exit_code, 0)
@ -270,6 +300,7 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(kwargs["target_videos"], 1000)
self.assertEqual(kwargs["queue_file"].as_posix(), "data/q.jsonl")
self.assertEqual(kwargs["retry_limit"], 2)
self.assertEqual(kwargs["keyword"], "猫咪")
def test_download_video_rejects_webp_response_before_writing_file(self) -> None:
module = importlib.import_module("XHS")
@ -324,6 +355,49 @@ class XhsModuleTests(unittest.TestCase):
)
self.assertEqual(urls, ["https://www.xiaohongshu.com/explore/abc?xsec_token=token&xsec_source="])
def test_extract_note_id_from_url_supports_search_result_detail(self) -> None:
module = importlib.import_module("XHS")
self.assertEqual(
module.extract_note_id_from_url("https://www.xiaohongshu.com/search_result/abc?xsec_token=token"),
"abc",
)
def test_normalize_note_urls_preserves_tokenized_search_result_url(self) -> None:
module = importlib.import_module("XHS")
urls = module.normalize_note_urls(
[
"https://www.xiaohongshu.com/explore/abc",
"https://www.xiaohongshu.com/search_result/abc?xsec_token=token&xsec_source=",
],
)
self.assertEqual(urls, ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token&xsec_source="])
def test_collect_note_urls_from_page_includes_search_result_links(self) -> None:
module = importlib.import_module("XHS")
page = FakeLinkPage(
[
"https://www.xiaohongshu.com/search_result/abc?xsec_token=token",
"https://www.xiaohongshu.com/explore/def?xsec_token=token2",
]
)
urls = module.collect_note_urls_from_page(page, limit=10)
self.assertEqual(
urls,
[
"https://www.xiaohongshu.com/search_result/abc?xsec_token=token",
"https://www.xiaohongshu.com/explore/def?xsec_token=token2",
],
)
self.assertIn('/search_result/', page.scripts[0])
def test_wait_for_note_urls_from_page_polls_until_links_are_rendered(self) -> None:
module = importlib.import_module("XHS")
page = FakeDelayedLinkPage()
with mock.patch.object(module.time, "sleep") as mocked_sleep:
urls = module.wait_for_note_urls_from_page(page, limit=10, timeout=2, interval=0.1)
self.assertEqual(urls, ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token"])
mocked_sleep.assert_called_once_with(0.1)
def test_filter_unvisited_note_urls_skips_seen_note_ids(self) -> None:
module = importlib.import_module("XHS")
urls = [