Save visible XHS comments by default

This commit is contained in:
wangshaoqing 2026-05-27 19:02:00 +08:00
parent dea08527eb
commit 7944ac1f0c
2 changed files with 139 additions and 1 deletions

83
XHS.py
View File

@ -252,7 +252,7 @@ def build_download_metadata_record(
record["video_source_key"] = candidate.source_key record["video_source_key"] = candidate.source_key
record["downloaded_path"] = output_path.as_posix() record["downloaded_path"] = output_path.as_posix()
record["downloaded_at"] = current_timestamp() record["downloaded_at"] = current_timestamp()
record["comments"] = comments or [] record["comments"] = comments if comments is not None else record.get("comments") or []
if comments_error: if comments_error:
record["comments_error"] = comments_error record["comments_error"] = comments_error
return record return record
@ -741,6 +741,78 @@ def run_human_browse_sequence(page: Any, plan: HumanBrowsePlan) -> None:
time.sleep(plan.settle_wait) time.sleep(plan.settle_wait)
def count_visible_comments(page: Any) -> int:
    """Return how many comment items the page's DOM currently contains.

    The injected script counts elements matching ``.comment-item`` that do
    not carry the ``comment-item-sub`` class. It is a pure selector count:
    no visibility or viewport check is performed.

    Args:
        page: Object exposing ``run_js(script)``.

    Returns:
        The count as an ``int``; ``0`` when the script returns a falsy
        value, a value ``int()`` cannot convert, or when ``run_js`` raises.
    """
    # The script text is reproduced verbatim; whitespace inside it is not
    # significant to the browser.
    js_source = """
const xhsVisibleCommentCount = () => {
return document.querySelectorAll(
'.comments-container .comment-item:not(.comment-item-sub), .comment-item:not(.comment-item-sub)'
).length;
};
return xhsVisibleCommentCount();
"""
    try:
        raw_total = page.run_js(js_source)
        total = int(raw_total) if raw_total else 0
    except Exception:
        # Best-effort probe: any failure is reported as "no comments".
        total = 0
    return total
def scroll_comment_container(page: Any, distance: int = 720) -> bool:
    """Scroll the comment area of the page down by ``distance`` pixels.

    The injected script picks a scroll target in this order:

    1. the first element (document order) matching one of the known
       container selectors whose content overflows by more than 20px;
    2. otherwise the first element anywhere in the document that overflows
       by more than 20px, is larger than 240x160, and whose class name
       contains ``comment``, ``interaction``, ``note``, ``detail`` or
       ``right``;
    3. otherwise ``document.scrollingElement`` / ``document.documentElement``.

    It then calls ``scrollBy`` on the target and dispatches a bubbling
    ``wheel`` event on it.

    The target chosen is the same as a full "collect, dedupe, filter, take
    first" pass would choose. The whole-document scan only runs when no
    known container is scrollable, and the class-name test runs before any
    layout read, so most elements never trigger ``getBoundingClientRect``.

    Args:
        page: Object exposing ``run_js(script)``.
        distance: Vertical scroll distance; coerced with ``int()`` before
            being embedded in the script.

    Returns:
        ``bool`` of the script's result; ``False`` when ``run_js`` raises.
    """
    script = f"""
    const xhsScrollCommentContainer = () => {{
        const distance = {int(distance)};
        const canScroll = (el) => el.scrollHeight > el.clientHeight + 20;
        const preferred = Array.from(document.querySelectorAll(
            '.comments-container, .comments-el, .comment-list, .interaction-container, .note-scroller, .note-detail-mask'
        )).find(canScroll);
        // Whole-document fallback: cheapest test first, layout reads last.
        const findFallback = () => Array.from(document.querySelectorAll('*')).find((el) => {{
            const className = String(el.className || '').toLowerCase();
            if (!/comment|interaction|note|detail|right/.test(className)) return false;
            if (!canScroll(el)) return false;
            const rect = el.getBoundingClientRect();
            return rect.width > 240 && rect.height > 160;
        }});
        const target = preferred || findFallback() || document.scrollingElement || document.documentElement;
        if (!target) return false;
        target.scrollBy(0, distance);
        target.dispatchEvent(new WheelEvent('wheel', {{deltaY: distance, bubbles: true}}));
        return true;
    }};
    return xhsScrollCommentContainer();
    """
    try:
        return bool(page.run_js(script))
    except Exception:
        # Best-effort: a failed script run is reported as "did not scroll".
        return False
def load_visible_comments(
    page: Any,
    human_settings: HumanBrowseSettings,
    max_comments: int = 20,
    timeout: float = 8.0,
) -> bool:
    """Scroll the comment area until at least one comment item is in the DOM.

    Each iteration probes the comment count, and if none is found and the
    deadline has not passed, scrolls the comment container and waits before
    probing again.

    Args:
        page: Object exposing ``run_js(script)``.
        human_settings: When ``enabled`` is true, the wait between probes is
            ``max_wait`` clamped to the range 0.1-1.5 seconds; otherwise it
            is 0.3 seconds.
        max_comments: Only used as a switch here: a value ``<= 0`` returns
            ``False`` immediately without touching the page.
        timeout: Time budget in seconds; negative values are treated as 0.
            The comment count is always probed at least once.

    Returns:
        ``True`` as soon as a probe sees one or more comment items,
        ``False`` when the deadline passes first (or comments are disabled).
    """
    if max_comments <= 0:
        return False

    deadline = time.monotonic() + max(0.0, timeout)
    wait_seconds = 0.3
    if human_settings.enabled:
        wait_seconds = max(0.1, min(1.5, human_settings.max_wait))

    while True:
        if count_visible_comments(page) > 0:
            return True
        if time.monotonic() >= deadline:
            return False
        scroll_comment_container(page)
        # Never sleep past the deadline: an unclamped wait could overshoot
        # `timeout` by up to a full `wait_seconds`. A final probe still
        # follows this sleep before giving up.
        remaining = deadline - time.monotonic()
        time.sleep(max(0.0, min(wait_seconds, remaining)))
def human_pause(settings: HumanBrowseSettings, random_module: Any = random) -> None: def human_pause(settings: HumanBrowseSettings, random_module: Any = random) -> None:
if settings.enabled: if settings.enabled:
time.sleep(random_module.uniform(settings.min_wait, settings.max_wait)) time.sleep(random_module.uniform(settings.min_wait, settings.max_wait))
@ -1236,6 +1308,15 @@ def run_queue_download(
human_pause(human_settings) human_pause(human_settings)
if human_settings.enabled: if human_settings.enabled:
run_human_browse_sequence(page, create_human_browse_plan(human_settings)) run_human_browse_sequence(page, create_human_browse_plan(human_settings))
if max_comments > 0:
loaded_comments = load_visible_comments(
page,
human_settings=human_settings,
max_comments=max_comments,
timeout=min(8.0, max(2.0, float(timeout))),
)
if loaded_comments:
print("[INFO] 已检测到可见评论,开始保存评论元数据。")
page_html = page.run_js("return document.documentElement.outerHTML") page_html = page.run_js("return document.documentElement.outerHTML")
metadata = merge_metadata( metadata = merge_metadata(
extract_note_metadata_from_html(page_html, note_id=record.note_id), extract_note_metadata_from_html(page_html, note_id=record.note_id),

View File

@ -38,6 +38,21 @@ class FakeScrollPage:
self.scripts.append(script) self.scripts.append(script)
class FakeDelayedCommentPage:
    """Fake page whose comment count is 0 on the first probe and 1 afterwards.

    Scripts are recognised by the function name they contain: comment-count
    probes are tallied in ``comment_checks``, scroll scripts are recorded in
    ``scroll_scripts``.
    """

    def __init__(self):
        self.scroll_scripts = []
        self.comment_checks = 0

    def run_js(self, script):
        """Answer a script: 0/1 for count probes, True for scrolls, else None."""
        is_count_probe = "xhsVisibleCommentCount" in script
        if not is_count_probe:
            if "xhsScrollCommentContainer" not in script:
                return None
            self.scroll_scripts.append(script)
            return True
        self.comment_checks += 1
        if self.comment_checks < 2:
            return 0
        return 1
class FakeLinkPage: class FakeLinkPage:
def __init__(self, links): def __init__(self, links):
self.links = links self.links = links
@ -285,6 +300,35 @@ class XhsModuleTests(unittest.TestCase):
self.assertEqual(record["downloaded_at"], "2026-05-27T17:00:00+0800") self.assertEqual(record["downloaded_at"], "2026-05-27T17:00:00+0800")
self.assertEqual(record["comments"], []) self.assertEqual(record["comments"], [])
def test_build_download_metadata_record_preserves_metadata_comments(self) -> None:
    """Comments already in the base metadata are kept when none are passed in."""
    xhs = importlib.import_module("XHS")
    candidate = xhs.VideoCandidate(
        video_id="note123",
        title="视频标题",
        video_url="https://sns-video.xhscdn.com/a.mp4",
        author_name="作者",
        source_key="master_url",
    )
    existing_comment = {
        "author": "评论用户",
        "content": "评论内容",
        "liked_count": "9",
        "time": "1小时前",
    }
    base_metadata = {"comments": [existing_comment]}
    queue_record = xhs.QueueRecord(
        "note123",
        "https://www.xiaohongshu.com/explore/note123",
        "video-channel",
    )

    # No `comments` argument is supplied, so the record must fall back to
    # the comments carried by `base_metadata`.
    record = xhs.build_download_metadata_record(
        base_metadata=base_metadata,
        candidate=candidate,
        queue_record=queue_record,
        output_path=Path("video/a.mp4"),
    )

    self.assertEqual(record["comments"], base_metadata["comments"])
def test_extract_note_metadata_from_page_uses_visible_dom(self) -> None: def test_extract_note_metadata_from_page_uses_visible_dom(self) -> None:
module = importlib.import_module("XHS") module = importlib.import_module("XHS")
metadata = module.extract_note_metadata_from_page(FakeMetadataPage(), note_id="note123", max_comments=20) metadata = module.extract_note_metadata_from_page(FakeMetadataPage(), note_id="note123", max_comments=20)
@ -618,6 +662,19 @@ class XhsModuleTests(unittest.TestCase):
self.assertIn("scrollBy(0, distance)", page.scripts[0]) self.assertIn("scrollBy(0, distance)", page.scripts[0])
mocked_sleep.assert_has_calls([mock.call(2.5), mock.call(1.5), mock.call(3.0)]) mocked_sleep.assert_has_calls([mock.call(2.5), mock.call(1.5), mock.call(3.0)])
def test_load_visible_comments_scrolls_until_comment_dom_exists(self) -> None:
    """One scroll and one 0.1s wait are enough when the second probe finds a comment."""
    xhs = importlib.import_module("XHS")
    fake_page = FakeDelayedCommentPage()
    browse_settings = xhs.HumanBrowseSettings(enabled=True, min_wait=0.1, max_wait=0.1)

    # Patch sleep so the test does not actually wait between probes.
    with mock.patch.object(xhs.time, "sleep") as mocked_sleep:
        loaded = xhs.load_visible_comments(
            fake_page,
            human_settings=browse_settings,
            max_comments=20,
            timeout=1.0,
        )

    self.assertTrue(loaded)
    self.assertGreaterEqual(fake_page.comment_checks, 2)
    self.assertEqual(len(fake_page.scroll_scripts), 1)
    mocked_sleep.assert_called_once_with(0.1)
def test_should_take_long_break_uses_every_n_downloads(self) -> None: def test_should_take_long_break_uses_every_n_downloads(self) -> None:
module = importlib.import_module("XHS") module = importlib.import_module("XHS")
settings = module.HumanBrowseSettings(long_break_every=4) settings = module.HumanBrowseSettings(long_break_every=4)