From a43168d5a9c23c4ef094ed982e9a86a5e57c2abf Mon Sep 17 00:00:00 2001 From: wangshaoqing Date: Wed, 27 May 2026 17:35:30 +0800 Subject: [PATCH] Filter search queue to video cards --- XHS.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++----- test_xhs.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/XHS.py b/XHS.py index b991bfc..87c3c77 100644 --- a/XHS.py +++ b/XHS.py @@ -674,8 +674,17 @@ def normalize_note_urls(urls: list[str]) -> list[str]: return [by_note_id[note_id] for note_id in order] -def collect_note_urls_from_page(page: Any, limit: int) -> list[str]: - script = """ +def collect_note_urls_from_page(page: Any, limit: int, video_only: bool = False) -> list[str]: + if video_only: + script = """ +return Array.from(document.querySelectorAll('section.note-item')) + .filter((section) => section.querySelector('.play-icon, use[href="#play-s"], use[xlink\\\\:href="#play-s"]')) + .flatMap((section) => Array.from(section.querySelectorAll('a[href*="/explore/"], a[href*="/search_result/"]'))) + .map((a) => a.href || a.getAttribute('href') || '') + .filter(Boolean); +""" + else: + script = """ return Array.from(document.querySelectorAll('a[href*="/explore/"], a[href*="/search_result/"]')) .map((a) => a.href || a.getAttribute('href') || '') .filter(Boolean); @@ -686,15 +695,45 @@ return Array.from(document.querySelectorAll('a[href*="/explore/"], a[href*="/sea return normalize_note_urls([str(url) for url in raw_urls])[:limit] -def wait_for_note_urls_from_page(page: Any, limit: int, timeout: float = 8.0, interval: float = 0.5) -> list[str]: +def wait_for_note_urls_from_page( + page: Any, + limit: int, + timeout: float = 8.0, + interval: float = 0.5, + video_only: bool = False, +) -> list[str]: deadline = time.monotonic() + timeout while True: - note_urls = collect_note_urls_from_page(page, limit=limit) + note_urls = collect_note_urls_from_page(page, limit=limit, video_only=video_only) if note_urls or time.monotonic() >= deadline: return note_urls time.sleep(interval) +def collect_note_urls_with_browse( + page: Any, + limit: int, + human_settings: HumanBrowseSettings, + rounds: int = 3, + video_only: bool = False, +) -> list[str]: + collected: list[str] = [] + seen_note_ids: set[str] = set() + for round_index in range(max(1, rounds)): + note_urls = wait_for_note_urls_from_page(page, limit=limit, video_only=video_only) + for note_url in note_urls: + note_id = extract_note_id_from_url(note_url) + if note_id in seen_note_ids: + continue + seen_note_ids.add(note_id) + collected.append(note_url) + if len(collected) >= limit: + return collected + if round_index < rounds - 1: + run_human_browse_sequence(page, create_human_browse_plan(human_settings)) + return collected + + def filter_unvisited_note_urls(urls: list[str], visited_note_ids: set[str]) -> list[str]: return [url for url in urls if extract_note_id_from_url(url) not in visited_note_ids] @@ -835,7 +874,15 @@ def run_queue_download( else: page.get(source_url) human_pause(human_settings) - note_urls = wait_for_note_urls_from_page(page, limit=max(50, target_videos * 2)) + note_limit = max(50, target_videos * 3 if source == "search" else target_videos * 2) + browse_rounds = 8 if source == "search" else 2 + note_urls = collect_note_urls_with_browse( + page, + limit=note_limit, + human_settings=human_settings, + rounds=browse_rounds, + video_only=source == "search", + ) records = merge_note_urls_into_queue(records, note_urls, source=source) save_queue(queue_file, records) added = len(records) - before_count diff --git a/test_xhs.py b/test_xhs.py index c28cf64..a76be7f 100644 --- a/test_xhs.py +++ b/test_xhs.py @@ -59,6 +59,36 @@ class FakeDelayedLinkPage: return ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token"] +class FakeGrowingLinkPage: + def __init__(self): + self.collect_calls = 0 + + def run_js(self, script): + if "querySelectorAll" not in script: + return None + self.collect_calls += 1 + if self.collect_calls == 1: + return ["https://www.xiaohongshu.com/search_result/one?xsec_token=token1"] + return [ + "https://www.xiaohongshu.com/search_result/one?xsec_token=token1", + "https://www.xiaohongshu.com/search_result/two?xsec_token=token2", + ] + + +class FakeVideoOnlyLinkPage: + def __init__(self): + self.scripts = [] + + def run_js(self, script): + self.scripts.append(script) + if "play-icon" in script: + return ["https://www.xiaohongshu.com/search_result/video?xsec_token=video-token"] + return [ + "https://www.xiaohongshu.com/search_result/image?xsec_token=image-token", + "https://www.xiaohongshu.com/search_result/video?xsec_token=video-token", + ] + + class XhsModuleTests(unittest.TestCase): def test_module_can_import_without_optional_runtime_dependencies(self) -> None: module = importlib.import_module("XHS") @@ -390,6 +420,13 @@ class XhsModuleTests(unittest.TestCase): ) self.assertIn('/search_result/', page.scripts[0]) + def test_collect_note_urls_from_page_can_filter_video_cards(self) -> None: + module = importlib.import_module("XHS") + page = FakeVideoOnlyLinkPage() + urls = module.collect_note_urls_from_page(page, limit=10, video_only=True) + self.assertEqual(urls, ["https://www.xiaohongshu.com/search_result/video?xsec_token=video-token"]) + self.assertIn("play-icon", page.scripts[0]) + def test_wait_for_note_urls_from_page_polls_until_links_are_rendered(self) -> None: module = importlib.import_module("XHS") page = FakeDelayedLinkPage() @@ -398,6 +435,21 @@ class XhsModuleTests(unittest.TestCase): self.assertEqual(urls, ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token"]) mocked_sleep.assert_called_once_with(0.1) + def test_collect_note_urls_with_browse_accumulates_after_scroll(self) -> None: + module = importlib.import_module("XHS") + page = FakeGrowingLinkPage() + settings = module.HumanBrowseSettings(enabled=False) + with mock.patch.object(module, "run_human_browse_sequence") as mocked_browse: + urls = module.collect_note_urls_with_browse(page, limit=10, human_settings=settings, rounds=2) + self.assertEqual( + urls, + [ + "https://www.xiaohongshu.com/search_result/one?xsec_token=token1", + "https://www.xiaohongshu.com/search_result/two?xsec_token=token2", + ], + ) + mocked_browse.assert_called_once() + def test_filter_unvisited_note_urls_skips_seen_note_ids(self) -> None: module = importlib.import_module("XHS") urls = [