kol-insight/backend/app/services/video_analysis.py
zfc 376f0be6b4 feat(backend): 视频分析模块增加缓存优先策略和并发API调用
- SessionPool 新增 get_distinct_configs 方法,支持获取不同配置用于并发调用
- video_analysis 重构为缓存优先策略:数据库有 A3/Cost 数据时直接使用
- 并发 API 调用预分配不同 cookie,避免 session 冲突
- API 数据写回数据库,实现下次查询缓存命中
- 新增 heated_cost 字段追踪
- 测试全面重写,覆盖缓存/API/混合/降级场景
2026-01-29 18:21:50 +08:00

595 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频分析服务 (T-024)
实现视频分析数据获取和成本指标计算。
"""
import asyncio
import logging
from datetime import datetime
from typing import Any, Dict, Optional
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.kol_video import KolVideo
from app.services.session_pool import (
get_distinct_configs,
get_random_config,
session_pool,
)
from app.services.yuntu_api import (
SessionInvalidError,
call_yuntu_api,
parse_analysis_response,
)
from app.services.yuntu_api import (
get_video_analysis as fetch_yuntu_analysis,
)
logger = logging.getLogger(__name__)
def _needs_api_call(video: KolVideo) -> bool:
    """
    Decide whether the Yuntu API must be queried for A3/cost data.

    A video counts as cached when the database row already carries a
    positive total A3 count or a positive total cost; in that case the
    stored values are used and no API call is made.

    Args:
        video: Video row to inspect.

    Returns:
        bool: True when neither value is present (API call required).
    """
    # NULL columns are treated the same as zero.
    cached_a3 = video.total_new_a3_cnt or 0
    cached_cost = video.total_cost or 0
    if cached_a3 > 0 or cached_cost > 0:
        return False
    return True
def calculate_cost_metrics(
    cost: float,
    natural_play_cnt: int,
    a3_increase_cnt: int,
    natural_a3_increase_cnt: int,
    after_view_search_uv: int,
    total_play_cnt: int,
) -> Dict[str, Optional[float]]:
    """
    Compute cost-efficiency metrics for a video.

    Every metric is rounded to two decimals. A metric whose denominator is
    missing, zero or negative is reported as None instead of raising.

    Args:
        cost: Total spend.
        natural_play_cnt: Natural (organic) play count.
        a3_increase_cnt: Newly added A3 count.
        natural_a3_increase_cnt: Newly added natural A3 count.
        after_view_search_uv: Number of users who searched after viewing.
        total_play_cnt: Total play count.

    Returns:
        Dict: keys ``cpm``, ``natural_cpm``, ``cpa3``, ``natural_cpa3``,
        ``cp_search``, ``estimated_natural_search_uv`` and
        ``natural_cp_search``.
    """

    def _cost_per(denominator: Optional[float], scale: Optional[int] = None) -> Optional[float]:
        # cost / denominator (optionally scaled), or None when the
        # denominator is missing or not positive.
        if denominator and denominator > 0:
            value = cost / denominator
            if scale is not None:
                value = value * scale
            return round(value, 2)
        return None

    metrics: Dict[str, Optional[float]] = {
        # CPM = cost / total_play_cnt * 1000
        "cpm": _cost_per(total_play_cnt, 1000),
        # Natural CPM = cost / natural_play_cnt * 1000
        "natural_cpm": _cost_per(natural_play_cnt, 1000),
        # CPA3 = cost / a3_increase_cnt
        "cpa3": _cost_per(a3_increase_cnt),
        # Natural CPA3 = cost / natural_a3_increase_cnt
        "natural_cpa3": _cost_per(natural_a3_increase_cnt),
        # CPsearch = cost / after_view_search_uv
        "cp_search": _cost_per(after_view_search_uv),
    }

    # Estimated natural after-view search UV
    #   = natural_play_cnt / total_play_cnt * after_view_search_uv
    if total_play_cnt and total_play_cnt > 0 and after_view_search_uv:
        # A missing natural play count is treated as zero so this branch
        # tolerates None the same way the guarded branches above do.
        estimated_natural_search_uv = (
            (natural_play_cnt or 0) / total_play_cnt * after_view_search_uv
        )
        metrics["estimated_natural_search_uv"] = round(estimated_natural_search_uv, 2)
        # Natural CPsearch = cost / estimated_natural_search_uv
        metrics["natural_cp_search"] = _cost_per(estimated_natural_search_uv)
    else:
        metrics["estimated_natural_search_uv"] = None
        metrics["natural_cp_search"] = None

    return metrics
async def get_video_base_info(
    session: AsyncSession, item_id: str
) -> Optional[KolVideo]:
    """
    Load the stored row for a single video.

    Args:
        session: Database session.
        item_id: Video ID to look up.

    Returns:
        The matching KolVideo, or None when no row has that item_id.
    """
    lookup = await session.execute(
        select(KolVideo).where(KolVideo.item_id == item_id)
    )
    return lookup.scalar_one_or_none()
async def get_video_analysis_data(
    session: AsyncSession, item_id: str
) -> Dict[str, Any]:
    """
    Fetch the full analysis payload for one video (T-024 main entry point).

    The result has six groups, matching the frontend ``VideoAnalysisData``
    type:
    - base_info: basic information
    - reach_metrics: reach metrics
    - a3_metrics: A3 metrics
    - search_metrics: search metrics
    - cost_metrics: spend metrics
    - calculated_metrics: cost-efficiency metrics (computed on the fly)

    A3/cost data follows a cache-first strategy: stored values are used when
    present; otherwise the Yuntu API is called and the result is written
    back to the database. If the API path fails for any reason, the stored
    values are used instead.

    Args:
        session: Database session.
        item_id: Video ID.

    Returns:
        Dict: Complete video analysis data.

    Raises:
        ValueError: If the video does not exist.
    """
    from app.services.brand_api import get_brand_names

    # 1. Load the stored row.
    video = await get_video_base_info(session, item_id)
    if video is None:
        raise ValueError(f"Video not found: {item_id}")

    # 2. Resolve the brand name (falls back to the raw brand id).
    brand_name = ""
    if video.brand_id:
        brand_map = await get_brand_names([video.brand_id])
        brand_name = brand_map.get(video.brand_id, video.brand_id)

    # 3. Snapshot every column this function needs BEFORE any write-back.
    #    update_video_a3_metrics() commits (or rolls back) the session, which
    #    may expire loaded ORM attributes depending on the session's
    #    expire_on_commit setting; reading them afterwards on an AsyncSession
    #    could then fail. Plain local values are immune to that.
    needs_api = _needs_api_call(video)
    publish_time = video.publish_time
    industry_id = video.industry_id or ""
    estimated_video_cost = video.estimated_video_cost or 0.0
    natural_play_cnt = video.natural_play_cnt or 0
    heated_play_cnt = video.heated_play_cnt or 0
    total_play_cnt = video.total_play_cnt or 0
    after_view_search_uv = video.after_view_search_uv or 0
    return_search_cnt = video.return_search_cnt or 0

    base_info = {
        "star_nickname": video.star_nickname or "",
        "star_unique_id": video.star_unique_id or "",
        "vid": video.item_id,
        "title": video.title or "",
        "create_date": publish_time.isoformat() if publish_time else None,
        "hot_type": video.viral_type or "",
        "industry_id": industry_id,
        "brand_id": video.brand_id or "",
        "brand_name": brand_name,
        "video_url": video.video_url or "",
    }
    reach_metrics = {
        "natural_play_cnt": natural_play_cnt,
        "heated_play_cnt": heated_play_cnt,
        "total_play_cnt": total_play_cnt,
        "total_interaction_cnt": video.total_interact or 0,
        "digg_cnt": video.like_cnt or 0,
        "share_cnt": video.share_cnt or 0,
        "comment_cnt": video.comment_cnt or 0,
    }

    # Stored A3/cost values: used directly on a cache hit and as the
    # fallback when the API path fails.
    a3_increase_cnt = video.total_new_a3_cnt or 0
    ad_a3_increase_cnt = video.heated_new_a3_cnt or 0
    natural_a3_increase_cnt = video.natural_new_a3_cnt or 0
    api_cost = video.total_cost or 0.0
    ad_cost = video.heated_cost or 0.0

    # 4. A3 data and cost (cache-first strategy).
    if not needs_api:
        logger.info(f"Using DB data for {item_id} (A3/Cost already cached)")
    else:
        try:
            api_response = await fetch_yuntu_analysis(
                item_id=item_id,
                publish_time=publish_time or datetime.now(),
                industry_id=industry_id,
            )
            analysis_data = parse_analysis_response(api_response)
            fetched_a3 = analysis_data.get("a3_increase_cnt", 0)
            fetched_ad_a3 = analysis_data.get("ad_a3_increase_cnt", 0)
            fetched_natural_a3 = analysis_data.get("natural_a3_increase_cnt", 0)
            fetched_cost = analysis_data.get("cost", 0)
            fetched_ad_cost = analysis_data.get("ad_cost", 0)
            # Write back so the next request is served from the database.
            saved = await update_video_a3_metrics(
                session=session,
                item_id=item_id,
                total_new_a3_cnt=int(fetched_a3),
                heated_new_a3_cnt=int(fetched_ad_a3),
                natural_new_a3_cnt=int(fetched_natural_a3),
                total_cost=float(fetched_cost),
                heated_cost=float(fetched_ad_cost),
            )
        except Exception as e:
            # Best effort: keep the stored values assigned above.
            logger.warning(f"API failed for {item_id}: {e}, using DB data")
        else:
            # Only adopt the API values once the whole path succeeded, so a
            # failure never leaves a mix of API and stored values.
            a3_increase_cnt = fetched_a3
            ad_a3_increase_cnt = fetched_ad_a3
            natural_a3_increase_cnt = fetched_natural_a3
            api_cost = fetched_cost
            ad_cost = fetched_ad_cost
            if saved:
                logger.info(f"API data fetched and saved to DB for {item_id}")
            else:
                # The helper reports failure via its return value; do not
                # claim the data was cached when it was not.
                logger.warning(
                    f"API data fetched for {item_id} but DB write-back failed"
                )

    # 5. Cost-efficiency metrics.
    heated_cost = ad_cost

    # Estimated natural after-view search UV
    estimated_natural_search_uv = None
    if total_play_cnt > 0 and after_view_search_uv > 0:
        estimated_natural_search_uv = round(
            (natural_play_cnt / total_play_cnt) * after_view_search_uv, 2
        )

    # Estimated CPM = (total_cost / total_play_cnt) * 1000
    estimated_cpm = (
        round((api_cost / total_play_cnt) * 1000, 2) if total_play_cnt > 0 else None
    )
    # Estimated natural CPM = (estimated_video_cost / natural_play_cnt) * 1000
    estimated_natural_cpm = (
        round((estimated_video_cost / natural_play_cnt) * 1000, 2)
        if natural_play_cnt > 0
        else None
    )
    # Estimated CPA3 = total_cost / a3_increase_cnt
    estimated_cp_a3 = (
        round(api_cost / a3_increase_cnt, 2) if a3_increase_cnt > 0 else None
    )
    # Estimated natural CPA3 = estimated_video_cost / natural_a3_increase_cnt
    estimated_natural_cp_a3 = (
        round(estimated_video_cost / natural_a3_increase_cnt, 2)
        if natural_a3_increase_cnt > 0
        else None
    )
    # Estimated CPsearch = total_cost / after_view_search_uv
    estimated_cp_search = (
        round(api_cost / after_view_search_uv, 2) if after_view_search_uv > 0 else None
    )
    # Natural CPsearch = estimated_video_cost / estimated_natural_search_uv
    estimated_natural_cp_search = (
        round(estimated_video_cost / estimated_natural_search_uv, 2)
        if estimated_natural_search_uv and estimated_natural_search_uv > 0
        else None
    )

    # 6. Assemble the payload (matches the frontend VideoAnalysisData type).
    return {
        "base_info": base_info,
        "reach_metrics": reach_metrics,
        "a3_metrics": {
            "total_new_a3_cnt": a3_increase_cnt,
            "heated_new_a3_cnt": ad_a3_increase_cnt,
            "natural_new_a3_cnt": natural_a3_increase_cnt,
        },
        "search_metrics": {
            # NOTE(review): the UV and count fields are fed from the same
            # column in each pair below -- confirm this is intended.
            "back_search_uv": return_search_cnt,
            "back_search_cnt": return_search_cnt,
            "after_view_search_uv": after_view_search_uv,
            "after_view_search_cnt": after_view_search_uv,
            "estimated_natural_search_uv": estimated_natural_search_uv,
        },
        "cost_metrics": {
            "total_cost": api_cost,
            "heated_cost": heated_cost,
            "estimated_video_cost": estimated_video_cost,
        },
        "calculated_metrics": {
            "estimated_cpm": estimated_cpm,
            "estimated_natural_cpm": estimated_natural_cpm,
            "estimated_cp_a3": estimated_cp_a3,
            "estimated_natural_cp_a3": estimated_natural_cp_a3,
            "estimated_cp_search": estimated_cp_search,
            "estimated_natural_cp_search": estimated_natural_cp_search,
        },
    }
async def update_video_a3_metrics(
    session: AsyncSession,
    item_id: str,
    total_new_a3_cnt: int,
    heated_new_a3_cnt: int,
    natural_new_a3_cnt: int,
    total_cost: float,
    heated_cost: float = 0.0,
) -> bool:
    """
    Persist A3 metrics and cost data for one video (T-025).

    Issues a single UPDATE filtered by item_id and commits it. Any exception
    is logged, the session is rolled back, and False is returned rather than
    propagating the error.

    Args:
        session: Database session.
        item_id: Video ID.
        total_new_a3_cnt: Total newly added A3.
        heated_new_a3_cnt: Newly added A3 from heating.
        natural_new_a3_cnt: Newly added natural A3.
        total_cost: Estimated total cost.
        heated_cost: Estimated heating cost.

    Returns:
        bool: True when at least one row was updated, False otherwise.
    """
    try:
        new_values = {
            "total_new_a3_cnt": total_new_a3_cnt,
            "heated_new_a3_cnt": heated_new_a3_cnt,
            "natural_new_a3_cnt": natural_new_a3_cnt,
            "total_cost": total_cost,
            "heated_cost": heated_cost,
        }
        outcome = await session.execute(
            update(KolVideo)
            .where(KolVideo.item_id == item_id)
            .values(**new_values)
        )
        await session.commit()

        row_was_updated = outcome.rowcount > 0
        if not row_was_updated:
            logger.warning(f"No video found to update: {item_id}")
            return False
        logger.info(f"Updated A3 metrics for video {item_id}")
        return True
    except Exception as e:
        logger.error(f"Failed to update A3 metrics for {item_id}: {e}")
        await session.rollback()
        return False
async def search_videos_by_star_id(
    session: AsyncSession, star_id: str
) -> list[KolVideo]:
    """Return all videos whose star_id equals the given value exactly."""
    query = select(KolVideo).where(KolVideo.star_id == star_id)
    rows = await session.execute(query)
    matches = rows.scalars().all()
    return list(matches)
async def search_videos_by_unique_id(
    session: AsyncSession, unique_id: str
) -> list[KolVideo]:
    """Return all videos whose star_unique_id equals the given value exactly."""
    query = select(KolVideo).where(KolVideo.star_unique_id == unique_id)
    rows = await session.execute(query)
    matches = rows.scalars().all()
    return list(matches)
async def search_videos_by_nickname(
    session: AsyncSession, nickname: str
) -> list[KolVideo]:
    """
    Return videos whose creator nickname contains the given text.

    The match is a case-insensitive substring search. LIKE wildcards in the
    input ("%" and "_") are escaped so they match literally; without this a
    nickname such as "a_b" would also match "axb".

    Args:
        session: Database session.
        nickname: Text to look for inside star_nickname.

    Returns:
        list[KolVideo]: All matching video rows.
    """
    # "/" is the escape character; escape it first so the escapes added for
    # "%" and "_" are not themselves doubled.
    escape_char = "/"
    literal = (
        nickname.replace(escape_char, escape_char * 2)
        .replace("%", f"{escape_char}%")
        .replace("_", f"{escape_char}_")
    )
    stmt = select(KolVideo).where(
        KolVideo.star_nickname.ilike(f"%{literal}%", escape=escape_char)
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())
def _build_video_list_item(
    video: KolVideo,
    a3_increase_cnt: int,
    ad_a3_increase_cnt: int,
    natural_a3_increase_cnt: int,
    api_cost: float,
    brand_name: str,
) -> Dict[str, Any]:
    """
    Build the summary dictionary for one entry of the video list.

    Args:
        video: Stored video row supplying descriptive fields and play counts.
        a3_increase_cnt: Total newly added A3.
        ad_a3_increase_cnt: Newly added A3 from heating.
        natural_a3_increase_cnt: Newly added natural A3.
        api_cost: Total cost used for the non-natural cost metrics.
        brand_name: Resolved brand display name.

    Returns:
        Dict: Descriptive fields, the three A3 counts and five cost metrics.
        A metric is None when its denominator is not positive.
    """
    # NULL columns are treated as zero.
    video_cost = video.estimated_video_cost or 0.0
    natural_plays = video.natural_play_cnt or 0
    total_plays = video.total_play_cnt or 0
    search_uv = video.after_view_search_uv or 0

    # Share of after-view searchers attributed to natural plays.
    natural_search_uv = None
    if total_plays > 0 and search_uv > 0:
        natural_search_uv = (natural_plays / total_plays) * search_uv

    natural_cpm = None
    if natural_plays > 0:
        natural_cpm = round((video_cost / natural_plays) * 1000, 2)

    cp_a3 = None
    if a3_increase_cnt > 0:
        cp_a3 = round(api_cost / a3_increase_cnt, 2)

    natural_cp_a3 = None
    if natural_a3_increase_cnt > 0:
        natural_cp_a3 = round(video_cost / natural_a3_increase_cnt, 2)

    cp_search = None
    if search_uv > 0:
        cp_search = round(api_cost / search_uv, 2)

    natural_cp_search = None
    if natural_search_uv and natural_search_uv > 0:
        natural_cp_search = round(video_cost / natural_search_uv, 2)

    published = video.publish_time
    item: Dict[str, Any] = {
        "item_id": video.item_id,
        "star_nickname": video.star_nickname or "",
        "title": video.title or "",
        "video_url": video.video_url or "",
        "create_date": published.isoformat() if published else None,
        "hot_type": video.viral_type or "",
        "industry_id": video.industry_id or "",
        "brand_id": video.brand_id or "",
        "brand_name": brand_name,
        "total_new_a3_cnt": a3_increase_cnt,
        "heated_new_a3_cnt": ad_a3_increase_cnt,
        "natural_new_a3_cnt": natural_a3_increase_cnt,
        "estimated_natural_cpm": natural_cpm,
        "estimated_cp_a3": cp_a3,
        "estimated_natural_cp_a3": natural_cp_a3,
        "estimated_cp_search": cp_search,
        "estimated_natural_cp_search": natural_cp_search,
    }
    return item
async def get_video_list_with_a3(
    session: AsyncSession, videos: list[KolVideo]
) -> list[Dict[str, Any]]:
    """
    Build summary data for a list of videos.

    Cache-first strategy:
    - Rows that already hold A3/cost data are served from the database.
    - Rows without it are fetched from the Yuntu API concurrently (at most
      five requests in flight, each pre-assigned a config), and successful
      results are written back to the database afterwards.

    Any failure on the API path for a video (missing config, request error,
    invalid session with a failed retry, or values that cannot be converted
    for storage) is logged and that video falls back to its stored values;
    it does not abort the rest of the batch.

    Args:
        session: Database session, used only for the sequential write-back.
        videos: Video rows to summarise.

    Returns:
        list[Dict]: One summary per input video, in input order.
    """
    from app.services.brand_api import get_brand_names

    # Resolve all brand names in one call.
    brand_ids = [video.brand_id for video in videos if video.brand_id]
    brand_map = await get_brand_names(brand_ids) if brand_ids else {}

    def _brand_name_for(video: KolVideo) -> str:
        # Display name for the video's brand; falls back to the raw id.
        if not video.brand_id:
            return ""
        return brand_map.get(video.brand_id, video.brand_id)

    def _stored_metrics(video: KolVideo) -> Dict[str, Any]:
        # A3/cost values as currently stored on the row (NULL -> zero).
        return {
            "a3_increase_cnt": video.total_new_a3_cnt or 0,
            "ad_a3_increase_cnt": video.heated_new_a3_cnt or 0,
            "natural_a3_increase_cnt": video.natural_new_a3_cnt or 0,
            "api_cost": video.total_cost or 0.0,
        }

    async def _fetch_metrics(
        video: KolVideo, config: Dict[str, Any]
    ) -> tuple[Dict[str, Any], Dict[str, Any]]:
        # One API round trip: returns (metrics for the list item,
        # converted row for the DB write-back). The int()/float()
        # conversions happen here so a malformed value raises inside the
        # caller's try block instead of escaping the whole batch.
        api_response = await call_yuntu_api(
            item_id=video.item_id,
            publish_time=video.publish_time or datetime.now(),
            industry_id=video.industry_id or "",
            aadvid=config["aadvid"],
            auth_token=config["auth_token"],
        )
        api_data = parse_analysis_response(api_response)
        metrics = {
            "a3_increase_cnt": api_data.get("a3_increase_cnt", 0),
            "ad_a3_increase_cnt": api_data.get("ad_a3_increase_cnt", 0),
            "natural_a3_increase_cnt": api_data.get("natural_a3_increase_cnt", 0),
            "api_cost": api_data.get("cost", 0),
        }
        update_row = {
            "item_id": video.item_id,
            "total_new_a3_cnt": int(metrics["a3_increase_cnt"]),
            "heated_new_a3_cnt": int(metrics["ad_a3_increase_cnt"]),
            "natural_new_a3_cnt": int(metrics["natural_a3_increase_cnt"]),
            "total_cost": float(metrics["api_cost"]),
            "heated_cost": float(api_data.get("ad_cost", 0)),
        }
        return metrics, update_row

    # Partition: rows with cached data vs rows that need an API call,
    # remembering each row's original position.
    cached_videos: list[tuple[int, KolVideo]] = []
    api_videos: list[tuple[int, KolVideo]] = []
    for idx, video in enumerate(videos):
        target = api_videos if _needs_api_call(video) else cached_videos
        target.append((idx, video))
    logger.info(
        f"Video list: {len(cached_videos)} cached, {len(api_videos)} need API"
    )

    # Results are filled by original index so output order matches input.
    results: list[Optional[Dict[str, Any]]] = [None] * len(videos)

    # Group A: serve straight from the database.
    for idx, video in cached_videos:
        results[idx] = _build_video_list_item(
            video=video,
            brand_name=_brand_name_for(video),
            **_stored_metrics(video),
        )

    # Group B: concurrent API calls, each with a pre-assigned config.
    if api_videos:
        configs = await get_distinct_configs(len(api_videos))
        semaphore = asyncio.Semaphore(5)
        # Rows to write back; collected here and written sequentially after
        # the gather so the shared session is never used concurrently.
        pending_updates: list[Dict[str, Any]] = []

        async def _fetch_single(
            idx: int, video: KolVideo, config: Optional[Dict[str, Any]]
        ) -> None:
            metrics: Optional[Dict[str, Any]] = None
            async with semaphore:
                if config is None:
                    logger.warning(
                        f"No API config available for {video.item_id}, using DB data"
                    )
                else:
                    try:
                        metrics, update_row = await _fetch_metrics(video, config)
                        pending_updates.append(update_row)
                    except SessionInvalidError:
                        # Session is no longer valid: drop it from the pool
                        # and retry once with a randomly chosen config. The
                        # whole recovery is guarded so that a failure while
                        # obtaining the retry config cannot abort the batch.
                        try:
                            session_pool.remove_by_auth_token(config["auth_token"])
                            logger.warning(
                                f"Session invalid for {video.item_id}, retrying"
                            )
                            retry_config = await get_random_config()
                            if retry_config:
                                metrics, update_row = await _fetch_metrics(
                                    video, retry_config
                                )
                                pending_updates.append(update_row)
                            else:
                                logger.warning(
                                    f"No retry config available for {video.item_id}"
                                )
                        except Exception as e2:
                            logger.warning(f"Retry failed for {video.item_id}: {e2}")
                    except Exception as e:
                        logger.warning(f"API failed for {video.item_id}: {e}")

            if metrics is None:
                # Every failure path uses the stored values.
                metrics = _stored_metrics(video)
            results[idx] = _build_video_list_item(
                video=video,
                brand_name=_brand_name_for(video),
                **metrics,
            )

        # Give each video its own config; reuse configs round-robin when
        # fewer configs than videos were returned.
        tasks = []
        for i, (idx, video) in enumerate(api_videos):
            config = configs[i % len(configs)] if configs else None
            tasks.append(_fetch_single(idx, video, config))
        await asyncio.gather(*tasks)

        # Sequential write-back (concurrent use of one session is unsafe).
        for upd in pending_updates:
            await update_video_a3_metrics(
                session=session,
                item_id=upd["item_id"],
                total_new_a3_cnt=upd["total_new_a3_cnt"],
                heated_new_a3_cnt=upd["heated_new_a3_cnt"],
                natural_new_a3_cnt=upd["natural_new_a3_cnt"],
                total_cost=upd["total_cost"],
                heated_cost=upd["heated_cost"],
            )

    # Defensive: every slot should be filled at this point.
    return [r for r in results if r is not None]