"""Unit tests for the ``XHS`` module.

The tests never touch a real browser or the network: small fake objects
defined below stand in for captured responses, HTTP clients and browser
pages, and ``unittest.mock`` patches out sockets and sleeping.
"""

import importlib
import tempfile
import unittest
from pathlib import Path
from unittest import mock


class FakeResponse:
    """Captured-response stand-in exposing ``body`` and ``raw_body``."""

    def __init__(self, body, raw_body):
        self.body = body
        self.raw_body = raw_body


class FakeDownloadResponse:
    """HTTP response stand-in with ``content``, ``headers`` and ``status_code``."""

    def __init__(self, content: bytes, content_type: str = "video/mp4", status_code: int = 200):
        self.content = content
        self.headers = {"content-type": content_type}
        self.status_code = status_code

    def raise_for_status(self) -> None:
        """Raise ``RuntimeError`` for any status code of 400 or above."""
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")


class FakeRequests:
    """``requests``-like object whose ``get`` always returns one canned response."""

    def __init__(self, response: FakeDownloadResponse):
        self.response = response

    def get(self, video_url, headers, timeout):
        # Arguments are accepted only to match the call signature; they are ignored.
        return self.response


class FakeScrollPage:
    """Page stand-in that records every script passed to ``run_js``."""

    def __init__(self):
        self.scripts = []

    def run_js(self, script):
        self.scripts.append(script)


class FakeLinkPage:
    """Page stand-in that records scripts and always returns a fixed link list."""

    def __init__(self, links):
        self.links = links
        self.scripts = []

    def run_js(self, script):
        self.scripts.append(script)
        return self.links


class FakeDelayedLinkPage:
    """Page stand-in that returns no links on the first call and one link afterwards."""

    def __init__(self):
        self.calls = 0

    def run_js(self, script):
        self.calls += 1
        if self.calls == 1:
            return []
        return ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token"]


class FakeGrowingLinkPage:
    """Page stand-in whose link list grows from one to two entries between collections."""

    def __init__(self):
        self.collect_calls = 0

    def run_js(self, script):
        # Only scripts that query the DOM count as link collections; anything
        # else (for example a scroll script) yields nothing.
        if "querySelectorAll" not in script:
            return None
        self.collect_calls += 1
        if self.collect_calls == 1:
            return ["https://www.xiaohongshu.com/search_result/one?xsec_token=token1"]
        return [
            "https://www.xiaohongshu.com/search_result/one?xsec_token=token1",
            "https://www.xiaohongshu.com/search_result/two?xsec_token=token2",
        ]


class FakeVideoOnlyLinkPage:
    """Page stand-in that returns only the video link when the script mentions ``play-icon``."""

    def __init__(self):
        self.scripts = []

    def run_js(self, script):
        self.scripts.append(script)
        if "play-icon" in script:
            return ["https://www.xiaohongshu.com/search_result/video?xsec_token=video-token"]
        return [
            "https://www.xiaohongshu.com/search_result/image?xsec_token=image-token",
            "https://www.xiaohongshu.com/search_result/video?xsec_token=video-token",
        ]


class XhsModuleTests(unittest.TestCase):
    """Behavioural tests for the public helpers of the ``XHS`` module.

    Each test resolves the module through ``importlib.import_module("XHS")``
    so that a failure to import surfaces inside the individual test.
    """

    def test_module_can_import_without_optional_runtime_dependencies(self) -> None:
        module = importlib.import_module("XHS")
        self.assertIsNotNone(module)

    def test_sanitize_filename_removes_invalid_characters(self) -> None:
        module = importlib.import_module("XHS")
        # The input contains "<f>": the expected "_e_f_g_" segment can only
        # come from "<" and ">" each being replaced by an underscore.
        self.assertEqual(
            module.sanitize_filename('a/b:c*?d"e<f>g|h\n'),
            "a_b_c__d_e_f_g_h",
        )

    def test_truncate_utf8_bytes_keeps_valid_utf8(self) -> None:
        module = importlib.import_module("XHS")
        result = module.truncate_utf8_bytes("测试标题" * 20, 20)
        self.assertLessEqual(len(result.encode("utf-8")), 20)
        # Encoding again must not raise (it would for lone surrogates).
        result.encode("utf-8")

    def test_choose_video_url_prefers_master_url(self) -> None:
        module = importlib.import_module("XHS")
        candidates = [
            module.VideoCandidate(
                video_id="note-1",
                title="标题",
                video_url="https://example.com/backup.mp4",
                author_name="作者",
                source_key="backup_urls",
            ),
            module.VideoCandidate(
                video_id="note-1",
                title="标题",
                video_url="https://sns-video.xhscdn.com/master.mp4",
                author_name="作者",
                source_key="master_url",
            ),
        ]
        self.assertEqual(
            module.choose_video_candidate(candidates).video_url,
            "https://sns-video.xhscdn.com/master.mp4",
        )

    def test_group_video_candidates_keeps_one_preferred_candidate_per_video_id(self) -> None:
        module = importlib.import_module("XHS")
        candidates = [
            module.VideoCandidate("note-1", "标题", "https://example.com/backup.mp4", "作者", "backup_urls"),
            module.VideoCandidate("note-1", "标题", "https://example.com/master.mp4", "作者", "master_url"),
            module.VideoCandidate("note-2", "标题2", "https://example.com/two.mp4", "作者", "master_url"),
        ]
        grouped = module.group_video_candidates(candidates)
        self.assertEqual([item.video_id for item in grouped], ["note-1", "note-2"])
        self.assertEqual(grouped[0].video_url, "https://example.com/master.mp4")

    def test_extract_video_candidates_from_nested_feed_payload(self) -> None:
        module = importlib.import_module("XHS")
        payload = {
            "data": {
                "items": [
                    {
                        "id": "note123",
                        "note_card": {
                            "display_title": "海边日落",
                            "user": {"nickname": "摄影师"},
                            "video": {
                                "media": {
                                    "stream": {
                                        "h264": [
                                            {
                                                "master_url": "https://sns-video.xhscdn.com/stream/a.mp4?sign=1",
                                                "backup_urls": [
                                                    "https://sns-video.xhscdn.com/stream/a-backup.mp4?sign=1"
                                                ],
                                            }
                                        ]
                                    }
                                }
                            },
                        },
                    }
                ]
            }
        }
        candidates = module.extract_video_candidates(payload)
        # One candidate for the master URL plus one for the single backup URL.
        self.assertEqual(len(candidates), 2)
        self.assertEqual(candidates[0].video_id, "note123")
        self.assertEqual(candidates[0].title, "海边日落")
        self.assertEqual(candidates[0].author_name, "摄影师")
        self.assertEqual(candidates[0].source_key, "master_url")

    def test_extract_video_candidates_ignores_plain_image_url_fields(self) -> None:
        module = importlib.import_module("XHS")
        payload = {
            "id": "note-image",
            "display_title": "图片笔记",
            "user": {"nickname": "作者"},
            "image_list": [
                {"url": "https://sns-img.xhscdn.com/example.webp"},
                {"url": "https://sns-img.xhscdn.com/example.jpg"},
            ],
        }
        self.assertEqual(module.extract_video_candidates(payload), [])

    def test_extract_video_candidates_from_escaped_html_state(self) -> None:
        module = importlib.import_module("XHS")
        # NOTE(review): this fixture is an empty string, yet the assertions
        # below expect exactly one candidate with a concrete URL. The original
        # HTML snippet (presumably an escaped page-state blob) appears to have
        # been lost; restore it from version control before relying on this test.
        html = (
            ''
        )
        candidates = module.extract_video_candidates_from_html(html, video_id="note123")
        self.assertEqual(len(candidates), 1)
        self.assertEqual(candidates[0].video_id, "note123")
        self.assertEqual(
            candidates[0].video_url,
            "http://sns-video-qc.xhscdn.com/stream/a.mp4?sign=1&t=2",
        )
        self.assertEqual(candidates[0].source_key, "html_master_url")

    def test_build_output_path_uses_author_title_and_video_id(self) -> None:
        module = importlib.import_module("XHS")
        candidate = module.VideoCandidate(
            video_id="note123",
            title="海边日落",
            video_url="https://sns-video.xhscdn.com/a.mp4",
            author_name="摄影师",
            source_key="master_url",
        )
        output_path = module.build_output_path(candidate)
        self.assertEqual(output_path.as_posix(), "video/[摄影师]海边日落-note123.mp4")

    def test_build_browser_address_from_port(self) -> None:
        module = importlib.import_module("XHS")
        self.assertEqual(module.build_browser_address(9223), "127.0.0.1:9223")
        self.assertIsNone(module.build_browser_address(None))

    def test_ensure_browser_debug_port_ready_accepts_open_port(self) -> None:
        module = importlib.import_module("XHS")
        # The mocked connection doubles as its own context manager so that
        # ``with socket.create_connection(...)`` works inside the module.
        connection = mock.MagicMock()
        connection.__enter__.return_value = connection
        connection.__exit__.return_value = False
        with mock.patch.object(module.socket, "create_connection", return_value=connection) as mocked_connect:
            module.ensure_browser_debug_port_ready(9223)
        mocked_connect.assert_called_once()

    def test_ensure_browser_debug_port_ready_rejects_closed_port(self) -> None:
        module = importlib.import_module("XHS")
        with mock.patch.object(module.socket, "create_connection", side_effect=OSError("boom")):
            with self.assertRaisesRegex(RuntimeError, "login_xhs.py"):
                module.ensure_browser_debug_port_ready(9223)

    def test_extract_feed_payload_uses_dict_body(self) -> None:
        module = importlib.import_module("XHS")
        response = FakeResponse({"data": {"items": []}}, "")
        self.assertEqual(module.extract_feed_payload(response), {"data": {"items": []}})

    def test_extract_feed_payload_falls_back_to_raw_json(self) -> None:
        module = importlib.import_module("XHS")
        response = FakeResponse("", '{"data": {"items": [{"id": "1"}]}}')
        self.assertEqual(
            module.extract_feed_payload(response),
            {"data": {"items": [{"id": "1"}]}},
        )

    def test_build_parser_uses_expected_defaults(self) -> None:
        module = importlib.import_module("XHS")
        args = module.build_parser().parse_args([])
        self.assertEqual(args.max_videos, 10)
        self.assertEqual(args.output_dir, "video")
        self.assertEqual(args.browser_port, 9223)
        self.assertEqual(args.timeout, 20)
        self.assertEqual(args.start_url, module.DEFAULT_EXPLORE_URL)
        self.assertFalse(args.use_current_page)
        self.assertTrue(args.human_mode)
        self.assertEqual(args.min_wait, 2.0)
        self.assertEqual(args.max_wait, 6.0)
        self.assertEqual(args.long_break_every, 4)
        self.assertEqual(args.max_runtime, 0.0)
        self.assertEqual(args.source, "explore")
        self.assertIsNone(args.queue_file)
        self.assertEqual(args.target_videos, 0)
        self.assertEqual(args.retry_limit, 1)

    def test_main_invokes_collect_videos_with_cli_values(self) -> None:
        module = importlib.import_module("XHS")
        with mock.patch.object(module, "collect_videos", return_value=3) as mocked_collect:
            exit_code = module.main(
                [
                    "--max-videos",
                    "3",
                    "--output-dir",
                    "downloads",
                    "--browser-port",
                    "9334",
                    "--timeout",
                    "7",
                    "--start-url",
                    "https://www.xiaohongshu.com/explore",
                ]
            )
        self.assertEqual(exit_code, 0)
        mocked_collect.assert_called_once()
        _, kwargs = mocked_collect.call_args
        self.assertEqual(kwargs["max_videos"], 3)
        self.assertEqual(kwargs["output_dir"].as_posix(), "downloads")
        self.assertEqual(kwargs["browser_port"], 9334)
        self.assertEqual(kwargs["timeout"], 7)
        self.assertFalse(kwargs["use_current_page"])
        self.assertTrue(kwargs["human_mode"])

    def test_build_source_url_supports_video_channel_and_explore(self) -> None:
        module = importlib.import_module("XHS")
        self.assertEqual(module.build_source_url("explore"), module.DEFAULT_EXPLORE_URL)
        self.assertEqual(
            module.build_source_url("video-channel"),
            "https://www.xiaohongshu.com/explore?channel_id=video",
        )

    def test_build_source_url_supports_encoded_search_keyword(self) -> None:
        module = importlib.import_module("XHS")
        self.assertEqual(
            module.build_source_url("search", keyword="猫咪 搞笑"),
            "https://www.xiaohongshu.com/search_result?keyword=%E7%8C%AB%E5%92%AA%20%E6%90%9E%E7%AC%91&source=web_search_result_notes&type=51",
        )

    def test_main_invokes_queue_mode_when_queue_file_is_provided(self) -> None:
        module = importlib.import_module("XHS")
        with mock.patch.object(module, "run_queue_download", return_value=5) as mocked_run:
            exit_code = module.main(
                [
                    "--source",
                    "video-channel",
                    "--target-videos",
                    "1000",
                    "--queue-file",
                    "data/q.jsonl",
                    "--retry-limit",
                    "2",
                    "--keyword",
                    "猫咪",
                ]
            )
        self.assertEqual(exit_code, 0)
        mocked_run.assert_called_once()
        _, kwargs = mocked_run.call_args
        self.assertEqual(kwargs["source"], "video-channel")
        self.assertEqual(kwargs["target_videos"], 1000)
        self.assertEqual(kwargs["queue_file"].as_posix(), "data/q.jsonl")
        self.assertEqual(kwargs["retry_limit"], 2)
        self.assertEqual(kwargs["keyword"], "猫咪")

    def test_download_video_rejects_webp_response_before_writing_file(self) -> None:
        module = importlib.import_module("XHS")
        response = FakeDownloadResponse(b"RIFF....WEBP", content_type="image/webp")
        output_path = mock.MagicMock()
        with self.assertRaisesRegex(ValueError, "非视频响应"):
            module.download_video(
                requests_module=FakeRequests(response),
                headers={},
                video_url="https://sns-img.xhscdn.com/example.webp",
                output_path=output_path,
            )
        # The rejection must happen before anything is written to disk.
        output_path.write_bytes.assert_not_called()

    def test_download_video_accepts_mp4_signature(self) -> None:
        module = importlib.import_module("XHS")
        output_path = mock.MagicMock()
        output_path.parent.mkdir = mock.MagicMock()
        output_path.write_bytes = mock.MagicMock()
        # The content type is generic on purpose: the payload carries an
        # "ftyp" box near its start, which is what should identify it as MP4.
        response = FakeDownloadResponse(
            b"\x00\x00\x00\x18ftypmp42payload",
            content_type="application/octet-stream",
        )
        module.download_video(
            requests_module=FakeRequests(response),
            headers={},
            video_url="https://sns-video.xhscdn.com/example.mp4",
            output_path=output_path,
        )
        output_path.write_bytes.assert_called_once_with(b"\x00\x00\x00\x18ftypmp42payload")

    def test_normalize_note_urls_deduplicates_explore_links(self) -> None:
        module = importlib.import_module("XHS")
        urls = module.normalize_note_urls(
            [
                "https://www.xiaohongshu.com/explore/abc",
                "https://www.xiaohongshu.com/explore/abc?xsec_token=token",
                "/explore/def?xsec_token=token",
                "https://www.xiaohongshu.com/user/profile/123",
            ]
        )
        self.assertEqual(
            urls,
            [
                "https://www.xiaohongshu.com/explore/abc?xsec_token=token",
                "https://www.xiaohongshu.com/explore/def?xsec_token=token",
            ],
        )

    def test_normalize_note_urls_prefers_xsec_token_url_for_same_note(self) -> None:
        module = importlib.import_module("XHS")
        urls = module.normalize_note_urls(
            [
                "https://www.xiaohongshu.com/explore/abc",
                "https://www.xiaohongshu.com/explore/abc?xsec_token=token&xsec_source=",
            ],
        )
        self.assertEqual(urls, ["https://www.xiaohongshu.com/explore/abc?xsec_token=token&xsec_source="])

    def test_extract_note_id_from_url_supports_search_result_detail(self) -> None:
        module = importlib.import_module("XHS")
        self.assertEqual(
            module.extract_note_id_from_url("https://www.xiaohongshu.com/search_result/abc?xsec_token=token"),
            "abc",
        )

    def test_normalize_note_urls_preserves_tokenized_search_result_url(self) -> None:
        module = importlib.import_module("XHS")
        urls = module.normalize_note_urls(
            [
                "https://www.xiaohongshu.com/explore/abc",
                "https://www.xiaohongshu.com/search_result/abc?xsec_token=token&xsec_source=",
            ],
        )
        self.assertEqual(urls, ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token&xsec_source="])

    def test_collect_note_urls_from_page_includes_search_result_links(self) -> None:
        module = importlib.import_module("XHS")
        page = FakeLinkPage(
            [
                "https://www.xiaohongshu.com/search_result/abc?xsec_token=token",
                "https://www.xiaohongshu.com/explore/def?xsec_token=token2",
            ]
        )
        urls = module.collect_note_urls_from_page(page, limit=10)
        self.assertEqual(
            urls,
            [
                "https://www.xiaohongshu.com/search_result/abc?xsec_token=token",
                "https://www.xiaohongshu.com/explore/def?xsec_token=token2",
            ],
        )
        self.assertIn('/search_result/', page.scripts[0])

    def test_collect_note_urls_from_page_can_filter_video_cards(self) -> None:
        module = importlib.import_module("XHS")
        page = FakeVideoOnlyLinkPage()
        urls = module.collect_note_urls_from_page(page, limit=10, video_only=True)
        self.assertEqual(urls, ["https://www.xiaohongshu.com/search_result/video?xsec_token=video-token"])
        self.assertIn("play-icon", page.scripts[0])

    def test_wait_for_note_urls_from_page_polls_until_links_are_rendered(self) -> None:
        module = importlib.import_module("XHS")
        page = FakeDelayedLinkPage()
        # Sleeping is patched out so the poll loop runs instantly.
        with mock.patch.object(module.time, "sleep") as mocked_sleep:
            urls = module.wait_for_note_urls_from_page(page, limit=10, timeout=2, interval=0.1)
        self.assertEqual(urls, ["https://www.xiaohongshu.com/search_result/abc?xsec_token=token"])
        mocked_sleep.assert_called_once_with(0.1)

    def test_collect_note_urls_with_browse_accumulates_after_scroll(self) -> None:
        module = importlib.import_module("XHS")
        page = FakeGrowingLinkPage()
        settings = module.HumanBrowseSettings(enabled=False)
        with mock.patch.object(module, "run_human_browse_sequence") as mocked_browse:
            urls = module.collect_note_urls_with_browse(page, limit=10, human_settings=settings, rounds=2)
        self.assertEqual(
            urls,
            [
                "https://www.xiaohongshu.com/search_result/one?xsec_token=token1",
                "https://www.xiaohongshu.com/search_result/two?xsec_token=token2",
            ],
        )
        mocked_browse.assert_called_once()

    def test_filter_unvisited_note_urls_skips_seen_note_ids(self) -> None:
        module = importlib.import_module("XHS")
        urls = [
            "https://www.xiaohongshu.com/explore/abc?xsec_token=token",
            "https://www.xiaohongshu.com/explore/def?xsec_token=token",
        ]
        self.assertEqual(
            module.filter_unvisited_note_urls(urls, {"abc"}),
            ["https://www.xiaohongshu.com/explore/def?xsec_token=token"],
        )

    def test_create_human_browse_plan_uses_wait_and_scroll_ranges(self) -> None:
        module = importlib.import_module("XHS")
        settings = module.HumanBrowseSettings(
            min_wait=2.0,
            max_wait=6.0,
            reverse_scroll_probability=1.0,
            min_scroll=500,
            max_scroll=1200,
        )
        # A seeded generator keeps the plan reproducible between runs.
        plan = module.create_human_browse_plan(settings, random_module=module.random.Random(7))
        self.assertGreaterEqual(plan.primary_wait, 2.0)
        self.assertLessEqual(plan.primary_wait, 6.0)
        self.assertGreaterEqual(plan.down_distance, 500)
        self.assertLessEqual(plan.down_distance, 1200)
        self.assertGreater(plan.reverse_distance, 0)

    def test_run_human_browse_sequence_scrolls_and_waits(self) -> None:
        module = importlib.import_module("XHS")
        page = FakeScrollPage()
        plan = module.HumanBrowsePlan(
            down_distance=800,
            primary_wait=2.5,
            reverse_distance=200,
            reverse_wait=1.5,
            settle_wait=3.0,
        )
        with mock.patch.object(module.time, "sleep") as mocked_sleep:
            module.run_human_browse_sequence(page, plan)
        self.assertIn("const distance = 800;", page.scripts[0])
        self.assertIn("const distance = -200;", page.scripts[1])
        self.assertIn("const distance = 400;", page.scripts[2])
        self.assertIn("scrollBy(0, distance)", page.scripts[0])
        mocked_sleep.assert_has_calls([mock.call(2.5), mock.call(1.5), mock.call(3.0)])

    def test_should_take_long_break_uses_every_n_downloads(self) -> None:
        module = importlib.import_module("XHS")
        settings = module.HumanBrowseSettings(long_break_every=4)
        self.assertFalse(module.should_take_long_break(0, settings))
        self.assertFalse(module.should_take_long_break(3, settings))
        self.assertTrue(module.should_take_long_break(4, settings))
        self.assertTrue(module.should_take_long_break(8, settings))

    def test_queue_round_trip_jsonl(self) -> None:
        module = importlib.import_module("XHS")
        with tempfile.TemporaryDirectory() as temp_dir:
            queue_path = Path(temp_dir) / "queue.jsonl"
            records = [
                module.QueueRecord(
                    note_id="note1",
                    url="https://www.xiaohongshu.com/explore/note1?xsec_token=a",
                    source="video-channel",
                )
            ]
            module.save_queue(queue_path, records)
            loaded = module.load_queue(queue_path)
        self.assertEqual(loaded, records)

    def test_merge_note_urls_into_queue_deduplicates_existing_notes(self) -> None:
        module = importlib.import_module("XHS")
        records = [
            module.QueueRecord(
                note_id="note1",
                url="https://www.xiaohongshu.com/explore/note1?xsec_token=a",
                source="explore",
                status="downloaded",
            )
        ]
        merged = module.merge_note_urls_into_queue(
            records,
            [
                "https://www.xiaohongshu.com/explore/note1?xsec_token=a",
                "https://www.xiaohongshu.com/explore/note2?xsec_token=b",
            ],
            source="video-channel",
        )
        self.assertEqual([record.note_id for record in merged], ["note1", "note2"])
        self.assertEqual(merged[0].status, "downloaded")
        self.assertEqual(merged[1].status, "pending")

    def test_count_queue_status_counts_records_by_status(self) -> None:
        module = importlib.import_module("XHS")
        records = [
            module.QueueRecord("one", "url1", "source", status="downloaded"),
            module.QueueRecord("two", "url2", "source", status="failed"),
            module.QueueRecord("three", "url3", "source", status="downloaded"),
        ]
        self.assertEqual(
            module.count_queue_status(records),
            {"downloaded": 2, "failed": 1},
        )

    def test_mark_queue_record_downloaded_updates_status_and_path(self) -> None:
        module = importlib.import_module("XHS")
        record = module.QueueRecord("note1", "url", "source")
        updated = module.mark_queue_record_downloaded(record, Path("video/a.mp4"))
        self.assertEqual(updated.status, "downloaded")
        self.assertEqual(updated.downloaded_path, "video/a.mp4")
        self.assertEqual(updated.last_error, "")

    def test_mark_queue_record_skipped_records_reason(self) -> None:
        module = importlib.import_module("XHS")
        record = module.QueueRecord("note1", "url", "source")
        updated = module.mark_queue_record_skipped(record, "no video")
        self.assertEqual(updated.status, "skipped_image")
        self.assertEqual(updated.last_error, "no video")

    def test_mark_queue_record_failed_respects_retry_limit(self) -> None:
        module = importlib.import_module("XHS")
        record = module.QueueRecord("note1", "url", "source", attempts=0)
        # First failure stays retryable; the second reaches the limit of 2.
        retry = module.mark_queue_record_failed(record, "timeout", retry_limit=2)
        self.assertEqual(retry.status, "pending")
        self.assertEqual(retry.attempts, 1)
        failed = module.mark_queue_record_failed(retry, "timeout", retry_limit=2)
        self.assertEqual(failed.status, "failed")
        self.assertEqual(failed.attempts, 2)


if __name__ == "__main__":
    unittest.main()