zfc d838a9bea2 feat(deploy): 完成 Phase 3 优化与测试
- T-013 错误处理: 增强 API 错误处理,添加日志记录
- T-015 视频链接: 已在 ResultTable 中实现点击跳转
- T-016 部署配置: 添加前后端 Dockerfile 和 docker-compose.yml
- 新增 11 个错误处理测试用例,共 55 个测试全部通过
- 测试覆盖率达到 93%

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 14:42:40 +08:00

88 lines
3.2 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from app.database import get_db
from app.schemas.query import QueryRequest, QueryResponse, VideoData
from app.services.query_service import query_videos
from app.services.calculator import calculate_metrics
from app.services.brand_api import get_brand_names
from app.api.v1.export import set_export_data
from app.core.logging import get_logger
router = APIRouter()
logger = get_logger(__name__)
@router.post("/query", response_model=QueryResponse)
async def query(
request: QueryRequest,
db: AsyncSession = Depends(get_db),
) -> QueryResponse:
"""
批量查询 KOL 视频数据.
支持三种查询方式:
- star_id: 按星图ID精准匹配
- unique_id: 按达人unique_id精准匹配
- nickname: 按达人昵称模糊匹配
"""
try:
# 1. 查询数据库
logger.info(f"Querying videos: type={request.type}, count={len(request.values)}")
videos = await query_videos(db, request.type, request.values)
if not videos:
logger.info("No videos found for query")
return QueryResponse(success=True, data=[], total=0)
# 2. 提取品牌ID并批量获取品牌名称
brand_ids = [v.brand_id for v in videos if v.brand_id]
brand_map = {}
if brand_ids:
try:
brand_map = await get_brand_names(brand_ids)
except Exception as brand_err:
logger.warning(f"Failed to fetch brand names, using fallback: {brand_err}")
# 降级处理:使用 brand_id 作为名称
# 3. 转换为响应模型并计算指标
data = []
for video in videos:
video_data = VideoData.model_validate(video)
# 填充品牌名称
if video.brand_id:
video_data.brand_name = brand_map.get(video.brand_id, video.brand_id)
# 计算预估指标
metrics = calculate_metrics(
estimated_video_cost=video.estimated_video_cost,
natural_play_cnt=video.natural_play_cnt,
total_play_cnt=video.total_play_cnt,
after_view_search_uv=video.after_view_search_uv,
)
video_data.estimated_natural_cpm = metrics["estimated_natural_cpm"]
video_data.estimated_natural_search_uv = metrics["estimated_natural_search_uv"]
video_data.estimated_natural_search_cost = metrics["estimated_natural_search_cost"]
data.append(video_data)
# 缓存数据供导出使用
set_export_data([d.model_dump() for d in data])
logger.info(f"Query successful: {len(data)} videos found")
return QueryResponse(success=True, data=data, total=len(data))
except SQLAlchemyError as db_err:
logger.error(f"Database error: {db_err}")
return QueryResponse(
success=False,
data=[],
total=0,
error="数据库连接失败,请稍后重试"
)
except Exception as e:
logger.error(f"Query error: {e}")
return QueryResponse(success=False, data=[], total=0, error=str(e))