def collect_videos_from_explore_cards(
    page: Any,
    requests_module: Any,
    output_dir: Path,
    max_videos: int,
    start_url: str,
    seen_urls: set[str],
    seen_files: set[Path],
) -> int:
    """Open the note pages linked from the current page and download their videos.

    Note links are read from the page as it is when this function is called
    (at most ``max_videos * 8`` of them). Each note is then opened in turn and
    video candidates are extracted from its rendered HTML.

    Args:
        page: Browser page object; ``get(url)`` and ``run_js(script)`` are
            the only members used here.
        requests_module: HTTP client module handed through to
            ``download_video``.
        output_dir: Directory handed to ``build_output_path``.
        max_videos: Upper bound on the number of downloads made by this call.
        start_url: URL handed to ``build_headers`` for each download.
        seen_urls: Video URLs already handled; updated in place.
        seen_files: Output paths already written; updated in place.

    Returns:
        The number of videos saved by this call.
    """
    # Nothing can be downloaded, so do not touch the browser at all.
    if max_videos <= 0:
        return 0

    downloaded = 0
    for note_url in collect_note_urls_from_page(page, limit=max_videos * 8):
        if downloaded >= max_videos:
            break
        page.get(note_url)
        time.sleep(2)
        # Guard against a missing result the same way the link collector does.
        html = page.run_js("return document.documentElement.outerHTML") or ""
        candidates = group_video_candidates(
            extract_video_candidates_from_html(
                html,
                video_id=extract_note_id_from_url(note_url),
            )
        )
        for candidate in candidates:
            if downloaded >= max_videos:
                break
            if candidate.video_url in seen_urls:
                continue
            # Marked as seen before the download attempt, so a URL that fails
            # below is not attempted again by this function.
            seen_urls.add(candidate.video_url)
            output_path = build_output_path(candidate, output_dir=output_dir)
            if output_path in seen_files or output_path.exists():
                continue
            try:
                download_video(
                    requests_module=requests_module,
                    headers=build_headers(start_url),
                    video_url=candidate.video_url,
                    output_path=output_path,
                )
            except Exception as exc:
                # Best effort: one failed download must not abort the run.
                print(f"[WARN] 下载失败 {candidate.video_id}: {exc}")
                continue
            downloaded += 1
            seen_files.add(output_path)
            print(f"[OK] 已保存: {output_path}")
    return downloaded


def normalize_note_urls(urls: list[str]) -> list[str]:
    """Resolve, filter and de-duplicate note links.

    Every entry is resolved against ``https://www.xiaohongshu.com``. Entries
    for which ``extract_note_id_from_url`` returns the ``"current-page"``
    placeholder are dropped. One URL is kept per note id, ordered by the
    first appearance of that id. A URL containing ``xsec_token=`` replaces
    whatever is stored for its note (the last such URL wins); a note that
    never appears with a token gets ``/explore/<id>?xsec_source=pc_feed``.

    Args:
        urls: Absolute or site-relative links.

    Returns:
        One absolute URL per distinct note id.
    """
    base_url = "https://www.xiaohongshu.com"
    # Dicts keep insertion order and overwriting a key keeps its position,
    # so the mapping itself records first-appearance order.
    by_note_id: dict[str, str] = {}
    for url in urls:
        full_url = urljoin(base_url, str(url))
        note_id = extract_note_id_from_url(full_url)
        if note_id == "current-page":
            continue
        by_note_id.setdefault(
            note_id,
            f"{base_url}/explore/{note_id}?xsec_source=pc_feed",
        )
        if "xsec_token=" in full_url:
            by_note_id[note_id] = full_url
    return list(by_note_id.values())


def collect_note_urls_from_page(page: Any, limit: int) -> list[str]:
    """Return up to ``limit`` normalized note URLs linked from the current page.

    Args:
        page: Browser page object; only ``run_js(script)`` is used.
        limit: Maximum number of URLs to return. Zero or a negative value
            yields an empty list.

    Returns:
        Normalized note URLs, or an empty list when the script result is
        empty or not a list.
    """
    # A negative limit would otherwise slice from the end of the list and
    # return URLs instead of none.
    if limit <= 0:
        return []
    script = """
return Array.from(document.querySelectorAll('a[href*="/explore/"]'))
    .map((a) => a.href || a.getAttribute('href') || '')
    .filter(Boolean);
"""
    raw_urls = page.run_js(script) or []
    if not isinstance(raw_urls, list):
        return []
    return normalize_note_urls([str(url) for url in raw_urls])[:limit]
def test_normalize_note_urls_deduplicates_explore_links(self) -> None:
    """Repeated note links collapse to one entry and non-note links are dropped."""
    xhs = importlib.import_module("XHS")
    raw_links = [
        "https://www.xiaohongshu.com/explore/abc",
        "https://www.xiaohongshu.com/explore/abc?xsec_token=token",
        "/explore/def?xsec_token=token",
        "https://www.xiaohongshu.com/user/profile/123",
    ]
    expected = [
        "https://www.xiaohongshu.com/explore/abc?xsec_token=token",
        "https://www.xiaohongshu.com/explore/def?xsec_token=token",
    ]
    self.assertEqual(xhs.normalize_note_urls(raw_links), expected)


def test_normalize_note_urls_prefers_xsec_token_url_for_same_note(self) -> None:
    """A link carrying ``xsec_token`` wins over a bare link to the same note."""
    xhs = importlib.import_module("XHS")
    bare_link = "https://www.xiaohongshu.com/explore/abc"
    token_link = "https://www.xiaohongshu.com/explore/abc?xsec_token=token&xsec_source="
    normalized = xhs.normalize_note_urls([bare_link, token_link])
    self.assertEqual(normalized, [token_link])