kol-insight/backend/app/services/video_analysis.py
zfc 3ae63ff27a fix(backend): 修复详情页数据结构与前端不匹配的问题
将 get_video_analysis_data 返回的字段名改为匹配前端 VideoAnalysisData 类型:
- cost_metrics_raw -> cost_metrics
- cost_metrics_calculated -> calculated_metrics
- 字段名统一使用前端期望的命名

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 22:06:17 +08:00

458 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频分析服务 (T-024)
实现视频分析数据获取和成本指标计算。
"""
import logging
from datetime import datetime
from typing import Dict, Optional, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy import update
from app.models.kol_video import KolVideo
from app.services.yuntu_api import (
get_video_analysis as fetch_yuntu_analysis,
parse_analysis_response,
YuntuAPIError,
)
logger = logging.getLogger(__name__)
def calculate_cost_metrics(
    cost: float,
    natural_play_cnt: int,
    a3_increase_cnt: int,
    natural_a3_increase_cnt: int,
    after_view_search_uv: int,
    total_play_cnt: int,
) -> Dict[str, Optional[float]]:
    """
    Compute cost metrics from a spend figure and raw counters.

    Every metric is rounded to 2 decimals. A metric whose denominator is
    missing (``None``), zero or negative is reported as ``None`` instead of
    raising.

    Args:
        cost: Total spend.
        natural_play_cnt: Natural play count.
        a3_increase_cnt: Newly added A3 count.
        natural_a3_increase_cnt: Natural newly added A3 count.
        after_view_search_uv: Number of users who searched after viewing.
        total_play_cnt: Total play count.

    Returns:
        Dict with the keys ``cpm``, ``natural_cpm``, ``cpa3``,
        ``natural_cpa3``, ``cp_search``, ``estimated_natural_search_uv`` and
        ``natural_cp_search``.
    """

    def _cost_per(count, scale=1):
        # Shared guard for all "cost / count" metrics: only a strictly
        # positive count is a usable denominator.
        if count and count > 0:
            return round(cost / count * scale, 2)
        return None

    metrics: Dict[str, Optional[float]] = {
        # CPM = cost / total_play_cnt * 1000
        "cpm": _cost_per(total_play_cnt, 1000),
        # Natural CPM = cost / natural_play_cnt * 1000
        "natural_cpm": _cost_per(natural_play_cnt, 1000),
        # CPA3 = cost / a3_increase_cnt
        "cpa3": _cost_per(a3_increase_cnt),
        # Natural CPA3 = cost / natural_a3_increase_cnt
        "natural_cpa3": _cost_per(natural_a3_increase_cnt),
        # CPsearch = cost / after_view_search_uv
        "cp_search": _cost_per(after_view_search_uv),
    }

    # Estimated natural after-view searchers
    #   = natural_play_cnt / total_play_cnt * after_view_search_uv
    estimated_natural_search_uv = None
    natural_cp_search = None
    if (
        natural_play_cnt is not None  # a missing count cannot be scaled
        and total_play_cnt
        and total_play_cnt > 0
        and after_view_search_uv
    ):
        raw_estimate = natural_play_cnt / total_play_cnt * after_view_search_uv
        estimated_natural_search_uv = round(raw_estimate, 2)
        # Natural CPsearch divides by the unrounded estimate so that tiny
        # estimates are not collapsed to zero before the division.
        if raw_estimate > 0:
            natural_cp_search = round(cost / raw_estimate, 2)

    metrics["estimated_natural_search_uv"] = estimated_natural_search_uv
    metrics["natural_cp_search"] = natural_cp_search
    return metrics
async def get_video_base_info(
    session: AsyncSession, item_id: str
) -> Optional[KolVideo]:
    """
    Load the stored row for a single video.

    Args:
        session: Database session used to run the query.
        item_id: Video ID matched against ``KolVideo.item_id``.

    Returns:
        The matching ``KolVideo``, or ``None`` when no row matches.
    """
    lookup = select(KolVideo).where(KolVideo.item_id == item_id)
    return (await session.execute(lookup)).scalar_one_or_none()
async def get_video_analysis_data(
    session: AsyncSession, item_id: str
) -> Dict[str, Any]:
    """
    Build the full analysis payload for one video (T-024 main entry point).

    The returned dict has six groups, named to match the frontend
    ``VideoAnalysisData`` type:

    - ``base_info``: basic video information
    - ``reach_metrics``: reach metrics
    - ``a3_metrics``: A3 metrics
    - ``search_metrics``: search metrics
    - ``cost_metrics``: spend figures
    - ``calculated_metrics``: cost metrics computed on the fly

    A3 counts and total cost are fetched from the Yuntu API; if that call or
    the parsing of its response raises, the values stored on the video row
    are used instead.

    Args:
        session: Database session.
        item_id: Video ID.

    Returns:
        Dict: the complete video analysis payload.

    Raises:
        ValueError: if no video with ``item_id`` exists.
    """
    # Kept as a function-scope import, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from app.services.brand_api import get_brand_names

    # 1. Basic information from the database.
    video = await get_video_base_info(session, item_id)
    if video is None:
        raise ValueError(f"Video not found: {item_id}")

    # 2. Brand name; falls back to the raw brand id when the lookup has no entry.
    brand_name = ""
    if video.brand_id:
        brand_map = await get_brand_names([video.brand_id])
        brand_name = brand_map.get(video.brand_id, video.brand_id)

    # 3. Live A3 data and cost from the Yuntu API, with DB fallback.
    try:
        api_response = await fetch_yuntu_analysis(
            item_id=item_id,
            publish_time=video.publish_time or datetime.now(),
            industry_id=video.industry_id or "",
        )
        analysis_data = parse_analysis_response(api_response)
        # "or 0" also covers keys that are present but None, which would
        # otherwise break the numeric comparisons further down.
        a3_increase_cnt = analysis_data.get("a3_increase_cnt") or 0
        ad_a3_increase_cnt = analysis_data.get("ad_a3_increase_cnt") or 0
        natural_a3_increase_cnt = analysis_data.get("natural_a3_increase_cnt") or 0
        api_cost = analysis_data.get("cost") or 0
    except Exception as e:
        # Deliberate best-effort: any API/parsing failure degrades to stored data.
        logger.warning(f"API failed for {item_id}: {e}, using DB data")
        a3_increase_cnt = video.total_new_a3_cnt or 0
        ad_a3_increase_cnt = video.heated_new_a3_cnt or 0
        natural_a3_increase_cnt = video.natural_new_a3_cnt or 0
        api_cost = video.total_cost or 0.0

    # 4. Database fields.
    estimated_video_cost = video.estimated_video_cost or 0.0
    natural_play_cnt = video.natural_play_cnt or 0
    heated_play_cnt = video.heated_play_cnt or 0
    total_play_cnt = video.total_play_cnt or 0
    after_view_search_uv = video.after_view_search_uv or 0

    # 5. Cost metrics.
    # Estimated heating cost = max(total_cost - estimated_video_cost, 0)
    heated_cost = (
        api_cost - estimated_video_cost if api_cost > estimated_video_cost else 0
    )

    # Estimated natural after-view searchers, and the natural CPsearch built
    # on it. The division uses the unrounded estimate so that small estimates
    # are not rounded down to zero before being used as a denominator.
    estimated_natural_search_uv = None
    estimated_natural_cp_search = None
    if total_play_cnt > 0 and after_view_search_uv > 0:
        raw_natural_search_uv = (
            natural_play_cnt / total_play_cnt
        ) * after_view_search_uv
        estimated_natural_search_uv = round(raw_natural_search_uv, 2)
        if raw_natural_search_uv > 0:
            # Natural CPsearch = estimated_video_cost / estimated_natural_search_uv
            estimated_natural_cp_search = round(
                estimated_video_cost / raw_natural_search_uv, 2
            )

    # Estimated CPM = (total_cost / total_play_cnt) * 1000
    estimated_cpm = (
        round((api_cost / total_play_cnt) * 1000, 2) if total_play_cnt > 0 else None
    )
    # Estimated natural CPM = (estimated_video_cost / natural_play_cnt) * 1000
    estimated_natural_cpm = (
        round((estimated_video_cost / natural_play_cnt) * 1000, 2)
        if natural_play_cnt > 0
        else None
    )
    # Estimated CPA3 = total_cost / a3_increase_cnt
    estimated_cp_a3 = (
        round(api_cost / a3_increase_cnt, 2) if a3_increase_cnt > 0 else None
    )
    # Estimated natural CPA3 = estimated_video_cost / natural_a3_increase_cnt
    estimated_natural_cp_a3 = (
        round(estimated_video_cost / natural_a3_increase_cnt, 2)
        if natural_a3_increase_cnt > 0
        else None
    )
    # Estimated CPsearch = total_cost / after_view_search_uv
    estimated_cp_search = (
        round(api_cost / after_view_search_uv, 2) if after_view_search_uv > 0 else None
    )

    # 6. Assemble the payload (matches the frontend VideoAnalysisData type).
    return {
        "base_info": {
            "star_nickname": video.star_nickname or "",
            "star_unique_id": video.star_unique_id or "",
            "vid": video.item_id,
            "title": video.title or "",
            "create_date": video.publish_time.isoformat() if video.publish_time else None,
            "hot_type": video.viral_type or "",
            "industry_id": video.industry_id or "",
            "brand_id": video.brand_id or "",
            "brand_name": brand_name,
            "video_url": video.video_url or "",
        },
        "reach_metrics": {
            "natural_play_cnt": natural_play_cnt,
            "heated_play_cnt": heated_play_cnt,
            "total_play_cnt": total_play_cnt,
            "total_interaction_cnt": video.total_interact or 0,
            "digg_cnt": video.like_cnt or 0,
            "share_cnt": video.share_cnt or 0,
            "comment_cnt": video.comment_cnt or 0,
        },
        "a3_metrics": {
            "total_new_a3_cnt": a3_increase_cnt,
            "heated_new_a3_cnt": ad_a3_increase_cnt,
            "natural_new_a3_cnt": natural_a3_increase_cnt,
        },
        "search_metrics": {
            # NOTE(review): the *_uv and *_cnt entries are filled from the
            # same source value -- confirm this is intended.
            "back_search_uv": video.return_search_cnt or 0,
            "back_search_cnt": video.return_search_cnt or 0,
            "after_view_search_uv": after_view_search_uv,
            "after_view_search_cnt": after_view_search_uv,
            "estimated_natural_search_uv": estimated_natural_search_uv,
        },
        "cost_metrics": {
            "total_cost": api_cost,
            "heated_cost": heated_cost,
            "estimated_video_cost": estimated_video_cost,
        },
        "calculated_metrics": {
            "estimated_cpm": estimated_cpm,
            "estimated_natural_cpm": estimated_natural_cpm,
            "estimated_cp_a3": estimated_cp_a3,
            "estimated_natural_cp_a3": estimated_natural_cp_a3,
            "estimated_cp_search": estimated_cp_search,
            "estimated_natural_cp_search": estimated_natural_cp_search,
        },
    }
async def update_video_a3_metrics(
    session: AsyncSession,
    item_id: str,
    total_new_a3_cnt: int,
    heated_new_a3_cnt: int,
    natural_new_a3_cnt: int,
    total_cost: float,
) -> bool:
    """
    Persist A3 metrics and total cost for one video (T-025).

    Runs a single UPDATE against ``KolVideo`` and commits it. Any exception
    is logged, the session is rolled back, and ``False`` is returned.

    Args:
        session: Database session.
        item_id: Video ID.
        total_new_a3_cnt: Total newly added A3.
        heated_new_a3_cnt: Heated newly added A3.
        natural_new_a3_cnt: Natural newly added A3.
        total_cost: Total spend.

    Returns:
        bool: ``True`` when at least one row was updated, otherwise ``False``.
    """
    new_values = {
        "total_new_a3_cnt": total_new_a3_cnt,
        "heated_new_a3_cnt": heated_new_a3_cnt,
        "natural_new_a3_cnt": natural_new_a3_cnt,
        "total_cost": total_cost,
    }
    try:
        outcome = await session.execute(
            update(KolVideo).where(KolVideo.item_id == item_id).values(**new_values)
        )
        await session.commit()
        if outcome.rowcount > 0:
            logger.info(f"Updated A3 metrics for video {item_id}")
            return True
        # The statement ran but matched no row.
        logger.warning(f"No video found to update: {item_id}")
        return False
    except Exception as e:
        logger.error(f"Failed to update A3 metrics for {item_id}: {e}")
        await session.rollback()
        return False
async def get_and_update_video_analysis(
    session: AsyncSession, item_id: str
) -> Dict[str, Any]:
    """
    Fetch the video analysis payload and write its A3 metrics back to the
    database (T-024 + T-025 combined).

    Args:
        session: Database session.
        item_id: Video ID.

    Returns:
        Dict: the complete video analysis payload, as returned by
        ``get_video_analysis_data``.
    """
    # Fetch the analysis payload.
    result = await get_video_analysis_data(session, item_id)

    # Read the values under the keys get_video_analysis_data actually emits
    # ("a3_metrics" / "cost_metrics"). The previous key names no longer exist
    # in the payload, so every lookup fell back to 0 and the stored metrics
    # were overwritten with zeros.
    a3_metrics = result.get("a3_metrics", {})
    cost_metrics = result.get("cost_metrics", {})

    # Persist to the database.
    await update_video_a3_metrics(
        session=session,
        item_id=item_id,
        total_new_a3_cnt=a3_metrics.get("total_new_a3_cnt", 0),
        heated_new_a3_cnt=a3_metrics.get("heated_new_a3_cnt", 0),
        natural_new_a3_cnt=a3_metrics.get("natural_new_a3_cnt", 0),
        total_cost=cost_metrics.get("total_cost", 0),
    )
    return result
async def search_videos_by_star_id(
    session: AsyncSession, star_id: str
) -> list[KolVideo]:
    """Return all videos whose ``star_id`` equals the given Xingtu ID (exact match)."""
    query = select(KolVideo).where(KolVideo.star_id == star_id)
    rows = (await session.execute(query)).scalars()
    return list(rows.all())
async def search_videos_by_unique_id(
    session: AsyncSession, unique_id: str
) -> list[KolVideo]:
    """Return all videos whose ``star_unique_id`` equals ``unique_id`` (exact match)."""
    query = select(KolVideo).where(KolVideo.star_unique_id == unique_id)
    rows = (await session.execute(query)).scalars()
    return list(rows.all())
async def search_videos_by_nickname(
    session: AsyncSession, nickname: str
) -> list[KolVideo]:
    """
    Return all videos whose creator nickname contains ``nickname``.

    The match is a case-insensitive substring match (``ILIKE``). The search
    text is matched literally: ``%`` and ``_`` in ``nickname`` are escaped so
    they are not interpreted as LIKE wildcards.
    """
    # "/" is the escape character. It is escaped first so that the escapes
    # added for "%" and "_" are not themselves doubled.
    literal = nickname.replace("/", "//").replace("%", "/%").replace("_", "/_")
    stmt = select(KolVideo).where(
        KolVideo.star_nickname.ilike(f"%{literal}%", escape="/")
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())
async def get_video_list_with_a3(
    session: AsyncSession, videos: list[KolVideo]
) -> list[Dict[str, Any]]:
    """
    Build summary rows for a list of videos, fetching A3 data live from the
    Yuntu API.

    For each video the A3 counts and total cost come from the Yuntu API; if
    that call or the parsing of its response raises, the values stored on the
    video row are used instead. API calls are made one video at a time.

    Args:
        session: Database session (not used by the current implementation).
        videos: Videos to summarise.

    Returns:
        One dict per input video, in input order, with basic info, A3 counts
        and the computed cost metrics.
    """
    # Kept as a function-scope import, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from app.services.brand_api import get_brand_names

    # Resolve all brand names in one call; dict.fromkeys drops duplicate ids
    # while keeping first-seen order.
    brand_ids = list(
        dict.fromkeys(video.brand_id for video in videos if video.brand_id)
    )
    brand_map = await get_brand_names(brand_ids) if brand_ids else {}

    result = []
    for video in videos:
        # Live A3 data and cost from the Yuntu API, with DB fallback.
        try:
            api_response = await fetch_yuntu_analysis(
                item_id=video.item_id,
                publish_time=video.publish_time or datetime.now(),
                industry_id=video.industry_id or "",
            )
            api_data = parse_analysis_response(api_response)
            # "or 0" also covers keys that are present but None; without it a
            # single such value would raise below and abort the whole list.
            a3_increase_cnt = api_data.get("a3_increase_cnt") or 0
            ad_a3_increase_cnt = api_data.get("ad_a3_increase_cnt") or 0
            natural_a3_increase_cnt = api_data.get("natural_a3_increase_cnt") or 0
            api_cost = api_data.get("cost") or 0
        except Exception as e:
            # Deliberate best-effort: degrade to stored data for this video.
            logger.warning(f"API failed for {video.item_id}: {e}")
            a3_increase_cnt = video.total_new_a3_cnt or 0
            ad_a3_increase_cnt = video.heated_new_a3_cnt or 0
            natural_a3_increase_cnt = video.natural_new_a3_cnt or 0
            api_cost = video.total_cost or 0.0

        # Database fields.
        estimated_video_cost = video.estimated_video_cost or 0.0
        natural_play_cnt = video.natural_play_cnt or 0
        total_play_cnt = video.total_play_cnt or 0
        after_view_search_uv = video.after_view_search_uv or 0

        # Cost metrics.
        estimated_natural_search_uv = None
        if total_play_cnt > 0 and after_view_search_uv > 0:
            estimated_natural_search_uv = (
                natural_play_cnt / total_play_cnt
            ) * after_view_search_uv

        estimated_natural_cpm = (
            round((estimated_video_cost / natural_play_cnt) * 1000, 2)
            if natural_play_cnt > 0
            else None
        )
        estimated_cp_a3 = (
            round(api_cost / a3_increase_cnt, 2) if a3_increase_cnt > 0 else None
        )
        estimated_natural_cp_a3 = (
            round(estimated_video_cost / natural_a3_increase_cnt, 2)
            if natural_a3_increase_cnt > 0
            else None
        )
        estimated_cp_search = (
            round(api_cost / after_view_search_uv, 2)
            if after_view_search_uv > 0
            else None
        )
        estimated_natural_cp_search = (
            round(estimated_video_cost / estimated_natural_search_uv, 2)
            if estimated_natural_search_uv and estimated_natural_search_uv > 0
            else None
        )

        # Fall back to the raw brand id when the lookup has no entry for it.
        brand_name = (
            brand_map.get(video.brand_id, video.brand_id) if video.brand_id else ""
        )

        result.append({
            "item_id": video.item_id,
            "star_nickname": video.star_nickname or "",
            "title": video.title or "",
            "video_url": video.video_url or "",
            "create_date": video.publish_time.isoformat() if video.publish_time else None,
            "hot_type": video.viral_type or "",
            "industry_id": video.industry_id or "",
            "brand_id": video.brand_id or "",
            "brand_name": brand_name,
            "total_new_a3_cnt": a3_increase_cnt,
            "heated_new_a3_cnt": ad_a3_increase_cnt,
            "natural_new_a3_cnt": natural_a3_increase_cnt,
            "estimated_natural_cpm": estimated_natural_cpm,
            "estimated_cp_a3": estimated_cp_a3,
            "estimated_natural_cp_a3": estimated_natural_cp_a3,
            "estimated_cp_search": estimated_cp_search,
            "estimated_natural_cp_search": estimated_natural_cp_search,
        })
    return result