kol-insight/backend/app/services/video_analysis.py
zfc 7cd29c5980 feat(frontend): 重构视频分析页面,支持多种搜索方式
主要更新:
- 前端改用 Ant Design 组件(Table、Modal、Select 等)
- 支持三种搜索方式:星图ID、达人unique_id、达人昵称模糊匹配
- 列表页实时调用云图 API 获取 A3 数据和成本指标
- 详情弹窗显示完整 6 大类指标,支持文字复制
- 品牌 API URL 格式修复为查询参数形式
- 优化云图 API 参数格式和会话池管理

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 22:01:55 +08:00

432 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频分析服务 (T-024)
实现视频分析数据获取和成本指标计算。
"""
import logging
from datetime import datetime
from typing import Dict, Optional, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy import update
from app.models.kol_video import KolVideo
from app.services.yuntu_api import (
get_video_analysis as fetch_yuntu_analysis,
parse_analysis_response,
YuntuAPIError,
)
logger = logging.getLogger(__name__)
def calculate_cost_metrics(
cost: float,
natural_play_cnt: int,
a3_increase_cnt: int,
natural_a3_increase_cnt: int,
after_view_search_uv: int,
total_play_cnt: int,
) -> Dict[str, Optional[float]]:
"""
计算成本指标。
Args:
cost: 总花费
natural_play_cnt: 自然播放数
a3_increase_cnt: 新增A3
natural_a3_increase_cnt: 自然新增A3
after_view_search_uv: 看后搜人数
total_play_cnt: 总播放数
Returns:
Dict: 成本指标字典
"""
metrics = {}
# CPM = cost / total_play_cnt * 1000
if total_play_cnt and total_play_cnt > 0:
metrics["cpm"] = round(cost / total_play_cnt * 1000, 2)
else:
metrics["cpm"] = None
# 自然CPM = cost / natural_play_cnt * 1000
if natural_play_cnt and natural_play_cnt > 0:
metrics["natural_cpm"] = round(cost / natural_play_cnt * 1000, 2)
else:
metrics["natural_cpm"] = None
# CPA3 = cost / a3_increase_cnt
if a3_increase_cnt and a3_increase_cnt > 0:
metrics["cpa3"] = round(cost / a3_increase_cnt, 2)
else:
metrics["cpa3"] = None
# 自然CPA3 = cost / natural_a3_increase_cnt
if natural_a3_increase_cnt and natural_a3_increase_cnt > 0:
metrics["natural_cpa3"] = round(cost / natural_a3_increase_cnt, 2)
else:
metrics["natural_cpa3"] = None
# CPsearch = cost / after_view_search_uv
if after_view_search_uv and after_view_search_uv > 0:
metrics["cp_search"] = round(cost / after_view_search_uv, 2)
else:
metrics["cp_search"] = None
# 预估自然看后搜人数 = natural_play_cnt / total_play_cnt * after_view_search_uv
if total_play_cnt and total_play_cnt > 0 and after_view_search_uv:
estimated_natural_search_uv = (
natural_play_cnt / total_play_cnt * after_view_search_uv
)
metrics["estimated_natural_search_uv"] = round(estimated_natural_search_uv, 2)
# 自然CPsearch = cost / estimated_natural_search_uv
if estimated_natural_search_uv > 0:
metrics["natural_cp_search"] = round(cost / estimated_natural_search_uv, 2)
else:
metrics["natural_cp_search"] = None
else:
metrics["estimated_natural_search_uv"] = None
metrics["natural_cp_search"] = None
return metrics
async def get_video_base_info(
session: AsyncSession, item_id: str
) -> Optional[KolVideo]:
"""
从数据库获取视频基础信息。
Args:
session: 数据库会话
item_id: 视频ID
Returns:
KolVideo or None
"""
stmt = select(KolVideo).where(KolVideo.item_id == item_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def get_video_analysis_data(
session: AsyncSession, item_id: str
) -> Dict[str, Any]:
"""
获取视频分析数据T-024主接口
包含:
- 基础信息(从数据库)
- 触达指标从巨量云图API
- A3指标
- 搜索指标
- 费用指标
- 成本指标(计算得出)
Args:
session: 数据库会话
item_id: 视频ID
Returns:
Dict: 完整的视频分析数据
Raises:
ValueError: 视频不存在时抛出
YuntuAPIError: API调用失败时抛出
"""
# 1. 从数据库获取基础信息
video = await get_video_base_info(session, item_id)
if video is None:
raise ValueError(f"Video not found: {item_id}")
# 2. 构建基础信息
base_info = {
"item_id": video.item_id,
"title": video.title,
"video_url": video.video_url,
"star_id": video.star_id,
"star_unique_id": video.star_unique_id,
"star_nickname": video.star_nickname,
"publish_time": video.publish_time.isoformat() if video.publish_time else None,
"industry_name": video.industry_name,
}
# 3. 调用巨量云图API获取实时数据
try:
publish_time = video.publish_time or datetime.now()
industry_id = video.industry_id or ""
api_response = await fetch_yuntu_analysis(
item_id=item_id,
publish_time=publish_time,
industry_id=industry_id,
)
# 4. 解析API响应
analysis_data = parse_analysis_response(api_response)
except YuntuAPIError as e:
logger.error(f"Failed to get yuntu analysis for {item_id}: {e.message}")
# API失败时使用数据库中的数据
analysis_data = {
"total_show_cnt": video.total_play_cnt or 0,
"natural_show_cnt": video.natural_play_cnt or 0,
"ad_show_cnt": video.heated_play_cnt or 0,
"total_play_cnt": video.total_play_cnt or 0,
"natural_play_cnt": video.natural_play_cnt or 0,
"ad_play_cnt": video.heated_play_cnt or 0,
"effective_play_cnt": 0,
"a3_increase_cnt": 0,
"ad_a3_increase_cnt": 0,
"natural_a3_increase_cnt": 0,
"after_view_search_uv": video.after_view_search_uv or 0,
"after_view_search_pv": 0,
"brand_search_uv": 0,
"product_search_uv": 0,
"return_search_cnt": video.return_search_cnt or 0,
"cost": video.estimated_video_cost or 0,
"natural_cost": 0,
"ad_cost": 0,
}
# 5. 计算成本指标
cost = analysis_data.get("cost", 0) or (video.estimated_video_cost or 0)
cost_metrics = calculate_cost_metrics(
cost=cost,
natural_play_cnt=analysis_data.get("natural_play_cnt", 0),
a3_increase_cnt=analysis_data.get("a3_increase_cnt", 0),
natural_a3_increase_cnt=analysis_data.get("natural_a3_increase_cnt", 0),
after_view_search_uv=analysis_data.get("after_view_search_uv", 0),
total_play_cnt=analysis_data.get("total_play_cnt", 0),
)
# 6. 组装返回数据
return {
"base_info": base_info,
"reach_metrics": {
"total_show_cnt": analysis_data.get("total_show_cnt", 0),
"natural_show_cnt": analysis_data.get("natural_show_cnt", 0),
"ad_show_cnt": analysis_data.get("ad_show_cnt", 0),
"total_play_cnt": analysis_data.get("total_play_cnt", 0),
"natural_play_cnt": analysis_data.get("natural_play_cnt", 0),
"ad_play_cnt": analysis_data.get("ad_play_cnt", 0),
"effective_play_cnt": analysis_data.get("effective_play_cnt", 0),
},
"a3_metrics": {
"a3_increase_cnt": analysis_data.get("a3_increase_cnt", 0),
"ad_a3_increase_cnt": analysis_data.get("ad_a3_increase_cnt", 0),
"natural_a3_increase_cnt": analysis_data.get("natural_a3_increase_cnt", 0),
},
"search_metrics": {
"after_view_search_uv": analysis_data.get("after_view_search_uv", 0),
"after_view_search_pv": analysis_data.get("after_view_search_pv", 0),
"brand_search_uv": analysis_data.get("brand_search_uv", 0),
"product_search_uv": analysis_data.get("product_search_uv", 0),
"return_search_cnt": analysis_data.get("return_search_cnt", 0),
},
"cost_metrics_raw": {
"cost": analysis_data.get("cost", 0),
"natural_cost": analysis_data.get("natural_cost", 0),
"ad_cost": analysis_data.get("ad_cost", 0),
},
"cost_metrics_calculated": cost_metrics,
}
async def update_video_a3_metrics(
session: AsyncSession,
item_id: str,
total_new_a3_cnt: int,
heated_new_a3_cnt: int,
natural_new_a3_cnt: int,
total_cost: float,
) -> bool:
"""
更新数据库中的A3指标 (T-025)。
Args:
session: 数据库会话
item_id: 视频ID
total_new_a3_cnt: 总新增A3
heated_new_a3_cnt: 加热新增A3
natural_new_a3_cnt: 自然新增A3
total_cost: 总花费
Returns:
bool: 更新是否成功
"""
try:
stmt = (
update(KolVideo)
.where(KolVideo.item_id == item_id)
.values(
total_new_a3_cnt=total_new_a3_cnt,
heated_new_a3_cnt=heated_new_a3_cnt,
natural_new_a3_cnt=natural_new_a3_cnt,
total_cost=total_cost,
)
)
result = await session.execute(stmt)
await session.commit()
if result.rowcount > 0:
logger.info(f"Updated A3 metrics for video {item_id}")
return True
else:
logger.warning(f"No video found to update: {item_id}")
return False
except Exception as e:
logger.error(f"Failed to update A3 metrics for {item_id}: {e}")
await session.rollback()
return False
async def get_and_update_video_analysis(
session: AsyncSession, item_id: str
) -> Dict[str, Any]:
"""
获取视频分析数据并更新数据库中的A3指标 (T-024 + T-025 组合)。
Args:
session: 数据库会话
item_id: 视频ID
Returns:
Dict: 完整的视频分析数据
"""
# 获取分析数据
result = await get_video_analysis_data(session, item_id)
# 提取A3指标
a3_metrics = result.get("a3_metrics", {})
cost_raw = result.get("cost_metrics_raw", {})
# 更新数据库
await update_video_a3_metrics(
session=session,
item_id=item_id,
total_new_a3_cnt=a3_metrics.get("a3_increase_cnt", 0),
heated_new_a3_cnt=a3_metrics.get("ad_a3_increase_cnt", 0),
natural_new_a3_cnt=a3_metrics.get("natural_a3_increase_cnt", 0),
total_cost=cost_raw.get("cost", 0),
)
return result
async def search_videos_by_star_id(
session: AsyncSession, star_id: str
) -> list[KolVideo]:
"""根据星图ID精准匹配搜索视频列表。"""
stmt = select(KolVideo).where(KolVideo.star_id == star_id)
result = await session.execute(stmt)
return list(result.scalars().all())
async def search_videos_by_unique_id(
session: AsyncSession, unique_id: str
) -> list[KolVideo]:
"""根据达人unique_id精准匹配搜索视频列表。"""
stmt = select(KolVideo).where(KolVideo.star_unique_id == unique_id)
result = await session.execute(stmt)
return list(result.scalars().all())
async def search_videos_by_nickname(
session: AsyncSession, nickname: str
) -> list[KolVideo]:
"""根据达人昵称模糊匹配搜索视频列表。"""
stmt = select(KolVideo).where(KolVideo.star_nickname.ilike(f"%{nickname}%"))
result = await session.execute(stmt)
return list(result.scalars().all())
async def get_video_list_with_a3(
session: AsyncSession, videos: list[KolVideo]
) -> list[Dict[str, Any]]:
"""
获取视频列表的摘要数据实时调用云图API获取A3数据
"""
from app.services.brand_api import get_brand_names
# 批量获取品牌名称
brand_ids = [video.brand_id for video in videos if video.brand_id]
brand_map = await get_brand_names(brand_ids) if brand_ids else {}
result = []
for video in videos:
# 实时调用云图 API 获取 A3 数据和 cost
a3_increase_cnt = 0
ad_a3_increase_cnt = 0
natural_a3_increase_cnt = 0
api_cost = 0.0
try:
publish_time = video.publish_time or datetime.now()
industry_id = video.industry_id or ""
api_response = await fetch_yuntu_analysis(
item_id=video.item_id,
publish_time=publish_time,
industry_id=industry_id,
)
api_data = parse_analysis_response(api_response)
a3_increase_cnt = api_data.get("a3_increase_cnt", 0)
ad_a3_increase_cnt = api_data.get("ad_a3_increase_cnt", 0)
natural_a3_increase_cnt = api_data.get("natural_a3_increase_cnt", 0)
api_cost = api_data.get("cost", 0)
except Exception as e:
logger.warning(f"API failed for {video.item_id}: {e}")
a3_increase_cnt = video.total_new_a3_cnt or 0
ad_a3_increase_cnt = video.heated_new_a3_cnt or 0
natural_a3_increase_cnt = video.natural_new_a3_cnt or 0
api_cost = video.total_cost or 0.0
# 数据库字段
estimated_video_cost = video.estimated_video_cost or 0.0
natural_play_cnt = video.natural_play_cnt or 0
total_play_cnt = video.total_play_cnt or 0
after_view_search_uv = video.after_view_search_uv or 0
# 计算成本指标
estimated_natural_search_uv = None
if total_play_cnt > 0 and after_view_search_uv > 0:
estimated_natural_search_uv = (natural_play_cnt / total_play_cnt) * after_view_search_uv
estimated_natural_cpm = round((estimated_video_cost / natural_play_cnt) * 1000, 2) if natural_play_cnt > 0 else None
estimated_cp_a3 = round(api_cost / a3_increase_cnt, 2) if a3_increase_cnt > 0 else None
estimated_natural_cp_a3 = round(estimated_video_cost / natural_a3_increase_cnt, 2) if natural_a3_increase_cnt > 0 else None
estimated_cp_search = round(api_cost / after_view_search_uv, 2) if after_view_search_uv > 0 else None
estimated_natural_cp_search = round(estimated_video_cost / estimated_natural_search_uv, 2) if estimated_natural_search_uv and estimated_natural_search_uv > 0 else None
brand_name = brand_map.get(video.brand_id, video.brand_id) if video.brand_id else ""
result.append({
"item_id": video.item_id,
"star_nickname": video.star_nickname or "",
"title": video.title or "",
"video_url": video.video_url or "",
"create_date": video.publish_time.isoformat() if video.publish_time else None,
"hot_type": video.viral_type or "",
"industry_id": video.industry_id or "",
"brand_id": video.brand_id or "",
"brand_name": brand_name,
"total_new_a3_cnt": a3_increase_cnt,
"heated_new_a3_cnt": ad_a3_increase_cnt,
"natural_new_a3_cnt": natural_a3_increase_cnt,
"estimated_natural_cpm": estimated_natural_cpm,
"estimated_cp_a3": estimated_cp_a3,
"estimated_natural_cp_a3": estimated_natural_cp_a3,
"estimated_cp_search": estimated_cp_search,
"estimated_natural_cp_search": estimated_natural_cp_search,
})
return result