feat: implement collect_recommendations() for For You page

This commit is contained in:
wangshaoqing 2026-05-06 17:24:59 +08:00
parent 5ba771f882
commit 340293deba
2 changed files with 134 additions and 1 deletions

View File

@ -387,6 +387,99 @@ def collect_videos(
return downloaded
def collect_recommendations(
    max_videos: int,
    timeout: int,
    output_dir: Path,
    browser_port: int | None,
) -> int:
    """Download videos from the Douyin home-page recommendation feed.

    Opens https://www.douyin.com/ in a browser page, listens for packets
    matching ``LISTEN_TARGET`` and downloads every not-yet-seen item, scrolling
    to the next page after each round.

    Args:
        max_videos: Stop once this many videos have been saved.
        timeout: Passed to ``wait_for_aweme_packet`` for each packet wait.
        output_dir: Directory handed to ``build_output_path``.
        browser_port: When not ``None``, the debug port is checked with
            ``ensure_browser_debug_port_ready`` before the page is created.

    Returns:
        The number of videos that were downloaded successfully.

    The loop also ends after three consecutive rounds that saved nothing: no
    packet, a packet that failed to parse, an empty item list, only
    already-seen items, or only failed downloads.
    """
    requests_module, chromium_page_cls, chromium_options_cls = import_runtime_dependencies()
    headers = build_headers("https://www.douyin.com/")
    if browser_port is not None:
        ensure_browser_debug_port_ready(browser_port)
    page = create_page(chromium_page_cls, chromium_options_cls, browser_port)
    page.listen.start(LISTEN_TARGET)
    print("[INFO] 正在打开抖音推荐流。若出现登录或验证码,请先在浏览器窗口里完成。")
    page.get("https://www.douyin.com/")
    # Fixed pause after navigation; presumably lets the feed start loading
    # before the first packet wait -- TODO confirm the required delay.
    time.sleep(3)

    downloaded = 0
    seen_ids: set[str] = set()
    consecutive_empty = 0
    max_consecutive_empty = 3

    while downloaded < max_videos:
        items = []
        packet = wait_for_aweme_packet(page, timeout=timeout)
        if packet is not None:
            try:
                payload = extract_aweme_payload(packet.response)
                items = parse_aweme_items(payload) or []
            except Exception as exc:
                # Best effort: a malformed packet counts as an empty round.
                print(f"[WARN] 解析接口数据失败: {exc}")
                items = []

        new_items_in_batch = 0
        for item in items:
            if item["video_id"] in seen_ids:
                continue
            if downloaded >= max_videos:
                break
            # Mark the id as seen before downloading so that a failed download
            # is not retried when the feed repeats the item.
            seen_ids.add(item["video_id"])
            output_path = build_output_path(
                title=item["title"],
                video_id=item["video_id"],
                output_dir=output_dir,
                author_name=item.get("author_name"),
            )
            try:
                download_video(
                    requests_module=requests_module,
                    headers=headers,
                    video_url=item["video_url"],
                    output_path=output_path,
                )
            except Exception as exc:
                print(f"[WARN] 下载失败 {item['video_id']}: {exc}")
                continue
            downloaded += 1
            new_items_in_batch += 1
            print(f"[OK] 已保存: {output_path}")

        if new_items_in_batch:
            # Only real progress resets the counter. Resetting it whenever the
            # packet merely contained items would let a feed that repeats
            # already-seen videos keep this loop running forever.
            consecutive_empty = 0
        else:
            consecutive_empty += 1
            if consecutive_empty >= max_consecutive_empty:
                print("[INFO] 连续多次未获取到新数据,结束抓取。")
                break
        scroll_to_next_page(page)

    return downloaded
def collect_single_video(
target: ResolvedTarget,
timeout: int,

View File

@ -31,12 +31,16 @@ class FakeListener:
def __init__(self, packet):
self.packet = packet
self.started_targets = []
self.call_count = 0
def start(self, target):
    """Record the target that listening was started for."""
    recorded = self.started_targets
    recorded.append(target)
def wait(self, timeout):
    """Return the canned packet on the first call and ``None`` afterwards.

    ``timeout`` is accepted for interface compatibility and is not used.
    Every call increments ``call_count``.
    """
    # A leftover unconditional ``return self.packet`` used to sit here and
    # made the counting below unreachable; it has been removed.
    self.call_count += 1
    if self.call_count == 1:
        return self.packet
    return None
class FakeRuntimePage:
@ -294,6 +298,42 @@ class DouyinModuleTests(unittest.TestCase):
"https://www.douyin.com/video/7619989983668240802",
)
def test_collect_recommendations_downloads_videos_with_author_prefix(self) -> None:
    """collect_recommendations saves the single feed item with an author prefix."""
    module = importlib.import_module("Douyin")
    packet = FakePacket(
        {
            "aweme_list": [
                {
                    "aweme_id": "7619989983668240802",
                    "desc": "推荐视频1",
                    "author": {"nickname": "博主A", "uid": "111"},
                    "video": {
                        "play_addr": {
                            "url_list": ["https://v26-web.douyinvod.com/example/video1.mp4"]
                        }
                    },
                }
            ]
        }
    )
    page = FakeRuntimePage("https://www.douyin.com/", packet)

    patch_runtime = mock.patch.object(
        module,
        "import_runtime_dependencies",
        return_value=(object(), object(), object()),
    )
    patch_page = mock.patch.object(module, "create_page", return_value=page)
    patch_download = mock.patch.object(module, "download_video")
    patch_scroll = mock.patch.object(module, "scroll_to_next_page")
    # collect_recommendations calls time.sleep(3) after opening the page;
    # patch it so the test does not really wait.
    patch_sleep = mock.patch.object(module.time, "sleep")

    with patch_runtime, patch_page, patch_download as mocked_download, patch_scroll, patch_sleep:
        downloaded = module.collect_recommendations(
            max_videos=50,
            timeout=10,
            output_dir=module.Path("video"),
            browser_port=None,
        )

    self.assertEqual(downloaded, 1)
    mocked_download.assert_called_once()
    # The output filename must carry the author prefix.
    call_kwargs = mocked_download.call_args.kwargs
    self.assertIn("[博主A]", str(call_kwargs["output_path"]))
def test_collect_single_video_downloads_exactly_one_file_for_video_url_target(self) -> None:
module = importlib.import_module("Douyin")
packet = FakePacket(