Patch summary (text before the first "diff --git" line is not part of any hunk).

XHS.py, collect_videos_from_explore_cards:
  * Replaces the single pass over note URLs collected once (limit
    max_videos * 8) with a round-based loop. Each round loads start_url,
    collects up to max_videos * 12 note URLs, drops the ones whose note id
    was already visited, then opens each remaining note and downloads its
    video candidates.
  * The loop ends when max_videos files were downloaded or after 4
    consecutive rounds that produced no download (empty_rounds is reset to 0
    by any round that downloads at least one file).

XHS.py, filter_unvisited_note_urls (new):
  * Returns the URLs whose extract_note_id_from_url() result is not in the
    given set of visited ids.

test_xhs.py:
  * Adds a unit test for filter_unvisited_note_urls.

NOTE(review): at the end of a round the code runs page.get(start_url),
time.sleep(1), scroll_feed(page), and the next round immediately starts with
page.get(start_url) again. If page.get() reloads the page, the scroll (here
and in the "no unvisited URLs" branch) is presumably discarded before the
next collection - confirm against the browser driver in use.

NOTE(review): a note id is added to visited_note_ids before the note page is
opened, so a note whose download fails is not retried in later rounds -
confirm this is intended.

diff --git a/XHS.py b/XHS.py
index 63c490a..118f1fa 100644
--- a/XHS.py
+++ b/XHS.py
@@ -454,43 +454,71 @@ def collect_videos_from_explore_cards(
     seen_urls: set[str],
     seen_files: set[Path],
 ) -> int:
-    note_urls = collect_note_urls_from_page(page, limit=max_videos * 8)
     downloaded = 0
-    for note_url in note_urls:
-        if downloaded >= max_videos:
-            break
-        page.get(note_url)
+    visited_note_ids: set[str] = set()
+    empty_rounds = 0
+    max_empty_rounds = 4
+
+    while downloaded < max_videos and empty_rounds < max_empty_rounds:
+        page.get(start_url)
         time.sleep(2)
-        candidates = group_video_candidates(
-            extract_video_candidates_from_html(
-                page.run_js("return document.documentElement.outerHTML"),
-                video_id=extract_note_id_from_url(note_url),
-            )
+        note_urls = filter_unvisited_note_urls(
+            collect_note_urls_from_page(page, limit=max_videos * 12),
+            visited_note_ids,
         )
-        if not candidates:
+        if not note_urls:
+            empty_rounds += 1
+            scroll_feed(page)
             continue
-        for candidate in candidates:
+
+        round_downloaded = 0
+        for note_url in note_urls:
             if downloaded >= max_videos:
                 break
-            if candidate.video_url in seen_urls:
-                continue
-            seen_urls.add(candidate.video_url)
-            output_path = build_output_path(candidate, output_dir=output_dir)
-            if output_path in seen_files or output_path.exists():
-                continue
-            try:
-                download_video(
-                    requests_module=requests_module,
-                    headers=build_headers(start_url),
-                    video_url=candidate.video_url,
-                    output_path=output_path,
+            note_id = extract_note_id_from_url(note_url)
+            visited_note_ids.add(note_id)
+            page.get(note_url)
+            time.sleep(2)
+            candidates = group_video_candidates(
+                extract_video_candidates_from_html(
+                    page.run_js("return document.documentElement.outerHTML"),
+                    video_id=note_id,
                 )
-            except Exception as exc:
-                print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
+            )
+            if not candidates:
                 continue
-            downloaded += 1
-            seen_files.add(output_path)
-            print(f"[OK] 已保存: {output_path}")
+            for candidate in candidates:
+                if downloaded >= max_videos:
+                    break
+                if candidate.video_url in seen_urls:
+                    continue
+                seen_urls.add(candidate.video_url)
+                output_path = build_output_path(candidate, output_dir=output_dir)
+                if output_path in seen_files or output_path.exists():
+                    continue
+                try:
+                    download_video(
+                        requests_module=requests_module,
+                        headers=build_headers(start_url),
+                        video_url=candidate.video_url,
+                        output_path=output_path,
+                    )
+                except Exception as exc:
+                    print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
+                    continue
+                downloaded += 1
+                round_downloaded += 1
+                seen_files.add(output_path)
+                print(f"[OK] 已保存: {output_path}")
+
+        if round_downloaded == 0:
+            empty_rounds += 1
+        else:
+            empty_rounds = 0
+        if downloaded < max_videos:
+            page.get(start_url)
+            time.sleep(1)
+            scroll_feed(page)
     return downloaded
 
 
@@ -529,6 +557,10 @@ return Array.from(document.querySelectorAll('a[href*="/explore/"]'))
     return normalize_note_urls([str(url) for url in raw_urls])[:limit]
 
 
+def filter_unvisited_note_urls(urls: list[str], visited_note_ids: set[str]) -> list[str]:
+    return [url for url in urls if extract_note_id_from_url(url) not in visited_note_ids]
+
+
 def build_parser() -> argparse.ArgumentParser:
     parser = argparse.ArgumentParser(description="附着到已登录小红书 Chrome,监听 feed 响应并下载视频")
     parser.add_argument("--max-videos", type=int, default=10, help="最多下载视频数量,默认 10")
diff --git a/test_xhs.py b/test_xhs.py
index cd1736c..34a3275 100644
--- a/test_xhs.py
+++ b/test_xhs.py
@@ -273,6 +273,17 @@ class XhsModuleTests(unittest.TestCase):
         )
         self.assertEqual(urls, ["https://www.xiaohongshu.com/explore/abc?xsec_token=token&xsec_source="])
 
+    def test_filter_unvisited_note_urls_skips_seen_note_ids(self) -> None:
+        module = importlib.import_module("XHS")
+        urls = [
+            "https://www.xiaohongshu.com/explore/abc?xsec_token=token",
+            "https://www.xiaohongshu.com/explore/def?xsec_token=token",
+        ]
+        self.assertEqual(
+            module.filter_unvisited_note_urls(urls, {"abc"}),
+            ["https://www.xiaohongshu.com/explore/def?xsec_token=token"],
+        )
+
 
 if __name__ == "__main__":
     unittest.main()