diff --git a/backend/.env.example b/backend/.env.example
index 98541c9..a053734 100644
--- a/backend/.env.example
+++ b/backend/.env.example
@@ -6,3 +6,7 @@ CORS_ORIGINS=["http://localhost:3000"]
# 品牌 API 配置
BRAND_API_BASE_URL=https://api.internal.intelligrow.cn
+BRAND_API_TOKEN=your_brand_api_token_here
+
+# 云图 API 配置 (SessionID池服务)
+YUNTU_API_TOKEN=your_yuntu_api_token_here
diff --git a/backend/app/api/v1/video_analysis.py b/backend/app/api/v1/video_analysis.py
new file mode 100644
index 0000000..4d4fdd7
--- /dev/null
+++ b/backend/app/api/v1/video_analysis.py
@@ -0,0 +1,55 @@
+"""
+视频分析API路由 (T-024)
+
+GET /api/v1/videos/{item_id}/analysis
+"""
+
+from fastapi import APIRouter, Depends, HTTPException
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.database import get_db
+from app.services.video_analysis import get_video_analysis_data
+from app.services.yuntu_api import YuntuAPIError
+
+router = APIRouter(prefix="/videos", tags=["视频分析"])
+
+
+@router.get("/{item_id}/analysis")
+async def get_video_analysis(
+ item_id: str,
+ db: AsyncSession = Depends(get_db),
+):
+ """
+ 获取视频分析数据。
+
+ 返回6大类指标:
+ - 基础信息 (8字段)
+ - 触达指标 (7字段)
+ - A3指标 (3字段)
+ - 搜索指标 (5字段)
+ - 费用指标 (3字段)
+ - 成本指标 (6字段,计算得出)
+
+ Args:
+ item_id: 视频ID
+
+ Returns:
+ 视频分析数据
+
+ Raises:
+ 404: 视频不存在
+ 500: API调用失败
+ """
+ try:
+ result = await get_video_analysis_data(db, item_id)
+ return {
+ "success": True,
+ "data": result,
+ }
+ except ValueError as e:
+ raise HTTPException(status_code=404, detail=str(e))
+ except YuntuAPIError as e:
+ # API失败但有降级数据时不抛错
+ raise HTTPException(status_code=500, detail=f"API Error: {e.message}")
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}")
diff --git a/backend/app/config.py b/backend/app/config.py
index de23107..f4dc154 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -8,6 +8,7 @@ class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
+ extra="ignore", # 忽略额外的环境变量
)
# Database
@@ -18,11 +19,16 @@ class Settings(BaseSettings):
# Brand API
BRAND_API_BASE_URL: str = "https://api.internal.intelligrow.cn"
+ BRAND_API_TOKEN: str = "" # Bearer Token for Brand API authentication
+
+ # Yuntu API (for SessionID pool)
+ YUNTU_API_TOKEN: str = "" # Bearer Token for Yuntu Cookie API
# API Settings
MAX_QUERY_LIMIT: int = 1000
BRAND_API_TIMEOUT: float = 3.0
BRAND_API_CONCURRENCY: int = 10
+ YUNTU_API_TIMEOUT: float = 10.0 # 巨量云图API超时
settings = Settings()
diff --git a/backend/app/main.py b/backend/app/main.py
index 28379a1..f224720 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -2,7 +2,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
-from app.api.v1 import query, export
+from app.api.v1 import query, export, video_analysis
app = FastAPI(
title="KOL Insight API",
@@ -22,6 +22,7 @@ app.add_middleware(
# 注册 API 路由
app.include_router(query.router, prefix="/api/v1", tags=["Query"])
app.include_router(export.router, prefix="/api/v1", tags=["Export"])
+app.include_router(video_analysis.router, prefix="/api/v1", tags=["VideoAnalysis"])
@app.get("/")
diff --git a/backend/app/services/brand_api.py b/backend/app/services/brand_api.py
index c92d730..53fca0d 100644
--- a/backend/app/services/brand_api.py
+++ b/backend/app/services/brand_api.py
@@ -24,19 +24,30 @@ async def fetch_brand_name(
"""
async with semaphore:
try:
+ # 构建请求头,包含 Bearer Token 认证 (T-020)
+ headers = {}
+ if settings.BRAND_API_TOKEN:
+ headers["Authorization"] = f"Bearer {settings.BRAND_API_TOKEN}"
+
async with httpx.AsyncClient(
timeout=settings.BRAND_API_TIMEOUT
) as client:
response = await client.get(
- f"{settings.BRAND_API_BASE_URL}/v1/yuntu/brands/{brand_id}"
+ f"{settings.BRAND_API_BASE_URL}/v1/yuntu/brands/{brand_id}",
+ headers=headers,
)
if response.status_code == 200:
data = response.json()
- # 尝试从响应中获取品牌名称
+ # T-019: 正确解析品牌API响应
+ # 响应格式: {"total": 1, "data": [{"brand_id": xxx, "brand_name": "xxx"}]}
if isinstance(data, dict):
- name = data.get("data", {}).get("name") or data.get("name")
- if name:
- return brand_id, name
+ data_list = data.get("data", [])
+ if isinstance(data_list, list) and len(data_list) > 0:
+ first_item = data_list[0]
+ if isinstance(first_item, dict):
+ name = first_item.get("brand_name")
+ if name:
+ return brand_id, name
except httpx.TimeoutException:
logger.warning(f"Brand API timeout for brand_id: {brand_id}")
except httpx.RequestError as e:
diff --git a/backend/app/services/session_pool.py b/backend/app/services/session_pool.py
new file mode 100644
index 0000000..72ebdac
--- /dev/null
+++ b/backend/app/services/session_pool.py
@@ -0,0 +1,141 @@
+"""
+SessionID池服务 (T-021)
+
+从内部API获取Cookie列表,随机选取sessionid用于巨量云图API调用。
+"""
+
+import asyncio
+import random
+import logging
+from typing import List, Optional
+
+import httpx
+
+from app.config import settings
+
+logger = logging.getLogger(__name__)
+
+
+class SessionPool:
+ """SessionID池管理器"""
+
+ def __init__(self):
+ self._sessions: List[str] = []
+ self._lock = asyncio.Lock()
+
+ async def refresh(self) -> bool:
+ """
+ 从内部API刷新SessionID列表。
+
+ Returns:
+ bool: 刷新是否成功
+ """
+ async with self._lock:
+ try:
+ headers = {}
+ if settings.YUNTU_API_TOKEN:
+ headers["Authorization"] = f"Bearer {settings.YUNTU_API_TOKEN}"
+
+ async with httpx.AsyncClient(
+ timeout=settings.YUNTU_API_TIMEOUT
+ ) as client:
+ response = await client.get(
+ f"{settings.BRAND_API_BASE_URL}/v1/yuntu/get_cookie",
+ params={"page": 1, "page_size": 100},
+ headers=headers,
+ )
+
+ if response.status_code == 200:
+ data = response.json()
+ # 响应格式: {"data": [{"sessionid": "xxx", ...}, ...]}
+ if isinstance(data, dict):
+ cookie_list = data.get("data", [])
+ if isinstance(cookie_list, list):
+ self._sessions = [
+ item.get("sessionid")
+ for item in cookie_list
+ if isinstance(item, dict) and item.get("sessionid")
+ ]
+ logger.info(
+ f"SessionPool refreshed: {len(self._sessions)} sessions"
+ )
+ return len(self._sessions) > 0
+
+ logger.warning(
+ f"Failed to refresh session pool: status={response.status_code}"
+ )
+ return False
+
+ except httpx.TimeoutException:
+ logger.error("SessionPool refresh timeout")
+ return False
+ except httpx.RequestError as e:
+ logger.error(f"SessionPool refresh request error: {e}")
+ return False
+ except Exception as e:
+ logger.error(f"SessionPool refresh unexpected error: {e}")
+ return False
+
+ def get_random(self) -> Optional[str]:
+ """
+ 随机获取一个SessionID。
+
+ Returns:
+ Optional[str]: SessionID,池为空时返回None
+ """
+ if not self._sessions:
+ return None
+ return random.choice(self._sessions)
+
+ def remove(self, session_id: str) -> None:
+ """
+ 从池中移除失效的SessionID。
+
+ Args:
+ session_id: 要移除的SessionID
+ """
+ try:
+ self._sessions.remove(session_id)
+ logger.info(f"Removed invalid session: {session_id[:8]}...")
+ except ValueError:
+ pass # 已经被移除
+
+ @property
+ def size(self) -> int:
+ """返回池中SessionID数量"""
+ return len(self._sessions)
+
+ @property
+ def is_empty(self) -> bool:
+ """检查池是否为空"""
+ return len(self._sessions) == 0
+
+
+# 全局单例
+session_pool = SessionPool()
+
+
+async def get_session_with_retry(max_retries: int = 3) -> Optional[str]:
+ """
+ 获取SessionID,必要时刷新池 (T-022 支持)。
+
+ Args:
+ max_retries: 最大重试次数
+
+ Returns:
+ Optional[str]: SessionID,获取失败返回None
+ """
+ for attempt in range(max_retries):
+ # 如果池为空,尝试刷新
+ if session_pool.is_empty:
+ success = await session_pool.refresh()
+ if not success:
+ logger.warning(f"Session pool refresh failed, attempt {attempt + 1}")
+ continue
+
+ session_id = session_pool.get_random()
+ if session_id:
+ return session_id
+
+ logger.error("Failed to get session after all retries")
+ return None
diff --git a/backend/app/services/video_analysis.py b/backend/app/services/video_analysis.py
new file mode 100644
index 0000000..820bbe6
--- /dev/null
+++ b/backend/app/services/video_analysis.py
@@ -0,0 +1,320 @@
+"""
+视频分析服务 (T-024)
+
+实现视频分析数据获取和成本指标计算。
+"""
+
+import logging
+from datetime import datetime
+from typing import Dict, Optional, Any
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+
+from sqlalchemy import update
+
+from app.models.kol_video import KolVideo
+from app.services.yuntu_api import (
+ get_video_analysis as fetch_yuntu_analysis,
+ parse_analysis_response,
+ YuntuAPIError,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def calculate_cost_metrics(
+ cost: float,
+ natural_play_cnt: int,
+ a3_increase_cnt: int,
+ natural_a3_increase_cnt: int,
+ after_view_search_uv: int,
+ total_play_cnt: int,
+) -> Dict[str, Optional[float]]:
+ """
+ 计算成本指标。
+
+ Args:
+ cost: 总花费
+ natural_play_cnt: 自然播放数
+ a3_increase_cnt: 新增A3
+ natural_a3_increase_cnt: 自然新增A3
+ after_view_search_uv: 看后搜人数
+ total_play_cnt: 总播放数
+
+ Returns:
+ Dict: 成本指标字典
+ """
+ metrics = {}
+
+ # CPM = cost / total_play_cnt * 1000
+ if total_play_cnt and total_play_cnt > 0:
+ metrics["cpm"] = round(cost / total_play_cnt * 1000, 2)
+ else:
+ metrics["cpm"] = None
+
+ # 自然CPM = cost / natural_play_cnt * 1000
+ if natural_play_cnt and natural_play_cnt > 0:
+ metrics["natural_cpm"] = round(cost / natural_play_cnt * 1000, 2)
+ else:
+ metrics["natural_cpm"] = None
+
+ # CPA3 = cost / a3_increase_cnt
+ if a3_increase_cnt and a3_increase_cnt > 0:
+ metrics["cpa3"] = round(cost / a3_increase_cnt, 2)
+ else:
+ metrics["cpa3"] = None
+
+ # 自然CPA3 = cost / natural_a3_increase_cnt
+ if natural_a3_increase_cnt and natural_a3_increase_cnt > 0:
+ metrics["natural_cpa3"] = round(cost / natural_a3_increase_cnt, 2)
+ else:
+ metrics["natural_cpa3"] = None
+
+ # CPsearch = cost / after_view_search_uv
+ if after_view_search_uv and after_view_search_uv > 0:
+ metrics["cp_search"] = round(cost / after_view_search_uv, 2)
+ else:
+ metrics["cp_search"] = None
+
+ # 预估自然看后搜人数 = natural_play_cnt / total_play_cnt * after_view_search_uv
+ if total_play_cnt and total_play_cnt > 0 and after_view_search_uv:
+ estimated_natural_search_uv = (
+ natural_play_cnt / total_play_cnt * after_view_search_uv
+ )
+ metrics["estimated_natural_search_uv"] = round(estimated_natural_search_uv, 2)
+
+ # 自然CPsearch = cost / estimated_natural_search_uv
+ if estimated_natural_search_uv > 0:
+ metrics["natural_cp_search"] = round(cost / estimated_natural_search_uv, 2)
+ else:
+ metrics["natural_cp_search"] = None
+ else:
+ metrics["estimated_natural_search_uv"] = None
+ metrics["natural_cp_search"] = None
+
+ return metrics
+
+
+async def get_video_base_info(
+ session: AsyncSession, item_id: str
+) -> Optional[KolVideo]:
+ """
+ 从数据库获取视频基础信息。
+
+ Args:
+ session: 数据库会话
+ item_id: 视频ID
+
+ Returns:
+ KolVideo or None
+ """
+ stmt = select(KolVideo).where(KolVideo.item_id == item_id)
+ result = await session.execute(stmt)
+ return result.scalar_one_or_none()
+
+
+async def get_video_analysis_data(
+ session: AsyncSession, item_id: str
+) -> Dict[str, Any]:
+ """
+ 获取视频分析数据(T-024主接口)。
+
+ 包含:
+ - 基础信息(从数据库)
+ - 触达指标(从巨量云图API)
+ - A3指标
+ - 搜索指标
+ - 费用指标
+ - 成本指标(计算得出)
+
+ Args:
+ session: 数据库会话
+ item_id: 视频ID
+
+ Returns:
+ Dict: 完整的视频分析数据
+
+ Raises:
+ ValueError: 视频不存在时抛出
+ YuntuAPIError: API调用失败时抛出
+ """
+ # 1. 从数据库获取基础信息
+ video = await get_video_base_info(session, item_id)
+ if video is None:
+ raise ValueError(f"Video not found: {item_id}")
+
+ # 2. 构建基础信息
+ base_info = {
+ "item_id": video.item_id,
+ "title": video.title,
+ "video_url": video.video_url,
+ "star_id": video.star_id,
+ "star_unique_id": video.star_unique_id,
+ "star_nickname": video.star_nickname,
+ "publish_time": video.publish_time.isoformat() if video.publish_time else None,
+ "industry_name": video.industry_name,
+ }
+
+ # 3. 调用巨量云图API获取实时数据
+ try:
+ publish_time = video.publish_time or datetime.now()
+ industry_id = video.industry_id or ""
+
+ api_response = await fetch_yuntu_analysis(
+ item_id=item_id,
+ publish_time=publish_time,
+ industry_id=industry_id,
+ )
+
+ # 4. 解析API响应
+ analysis_data = parse_analysis_response(api_response)
+
+ except YuntuAPIError as e:
+ logger.error(f"Failed to get yuntu analysis for {item_id}: {e.message}")
+ # API失败时,使用数据库中的数据
+ analysis_data = {
+ "total_show_cnt": video.total_play_cnt or 0,
+ "natural_show_cnt": video.natural_play_cnt or 0,
+ "ad_show_cnt": video.heated_play_cnt or 0,
+ "total_play_cnt": video.total_play_cnt or 0,
+ "natural_play_cnt": video.natural_play_cnt or 0,
+ "ad_play_cnt": video.heated_play_cnt or 0,
+ "effective_play_cnt": 0,
+ "a3_increase_cnt": 0,
+ "ad_a3_increase_cnt": 0,
+ "natural_a3_increase_cnt": 0,
+ "after_view_search_uv": video.after_view_search_uv or 0,
+ "after_view_search_pv": 0,
+ "brand_search_uv": 0,
+ "product_search_uv": 0,
+ "return_search_cnt": video.return_search_cnt or 0,
+ "cost": video.estimated_video_cost or 0,
+ "natural_cost": 0,
+ "ad_cost": 0,
+ }
+
+ # 5. 计算成本指标
+ cost = analysis_data.get("cost", 0) or (video.estimated_video_cost or 0)
+ cost_metrics = calculate_cost_metrics(
+ cost=cost,
+ natural_play_cnt=analysis_data.get("natural_play_cnt", 0),
+ a3_increase_cnt=analysis_data.get("a3_increase_cnt", 0),
+ natural_a3_increase_cnt=analysis_data.get("natural_a3_increase_cnt", 0),
+ after_view_search_uv=analysis_data.get("after_view_search_uv", 0),
+ total_play_cnt=analysis_data.get("total_play_cnt", 0),
+ )
+
+ # 6. 组装返回数据
+ return {
+ "base_info": base_info,
+ "reach_metrics": {
+ "total_show_cnt": analysis_data.get("total_show_cnt", 0),
+ "natural_show_cnt": analysis_data.get("natural_show_cnt", 0),
+ "ad_show_cnt": analysis_data.get("ad_show_cnt", 0),
+ "total_play_cnt": analysis_data.get("total_play_cnt", 0),
+ "natural_play_cnt": analysis_data.get("natural_play_cnt", 0),
+ "ad_play_cnt": analysis_data.get("ad_play_cnt", 0),
+ "effective_play_cnt": analysis_data.get("effective_play_cnt", 0),
+ },
+ "a3_metrics": {
+ "a3_increase_cnt": analysis_data.get("a3_increase_cnt", 0),
+ "ad_a3_increase_cnt": analysis_data.get("ad_a3_increase_cnt", 0),
+ "natural_a3_increase_cnt": analysis_data.get("natural_a3_increase_cnt", 0),
+ },
+ "search_metrics": {
+ "after_view_search_uv": analysis_data.get("after_view_search_uv", 0),
+ "after_view_search_pv": analysis_data.get("after_view_search_pv", 0),
+ "brand_search_uv": analysis_data.get("brand_search_uv", 0),
+ "product_search_uv": analysis_data.get("product_search_uv", 0),
+ "return_search_cnt": analysis_data.get("return_search_cnt", 0),
+ },
+ "cost_metrics_raw": {
+ "cost": analysis_data.get("cost", 0),
+ "natural_cost": analysis_data.get("natural_cost", 0),
+ "ad_cost": analysis_data.get("ad_cost", 0),
+ },
+ "cost_metrics_calculated": cost_metrics,
+ }
+
+
+async def update_video_a3_metrics(
+ session: AsyncSession,
+ item_id: str,
+ total_new_a3_cnt: int,
+ heated_new_a3_cnt: int,
+ natural_new_a3_cnt: int,
+ total_cost: float,
+) -> bool:
+ """
+ 更新数据库中的A3指标 (T-025)。
+
+ Args:
+ session: 数据库会话
+ item_id: 视频ID
+ total_new_a3_cnt: 总新增A3
+ heated_new_a3_cnt: 加热新增A3
+ natural_new_a3_cnt: 自然新增A3
+ total_cost: 总花费
+
+ Returns:
+ bool: 更新是否成功
+ """
+ try:
+ stmt = (
+ update(KolVideo)
+ .where(KolVideo.item_id == item_id)
+ .values(
+ total_new_a3_cnt=total_new_a3_cnt,
+ heated_new_a3_cnt=heated_new_a3_cnt,
+ natural_new_a3_cnt=natural_new_a3_cnt,
+ total_cost=total_cost,
+ )
+ )
+ result = await session.execute(stmt)
+ await session.commit()
+
+ if result.rowcount > 0:
+ logger.info(f"Updated A3 metrics for video {item_id}")
+ return True
+ else:
+ logger.warning(f"No video found to update: {item_id}")
+ return False
+
+ except Exception as e:
+ logger.error(f"Failed to update A3 metrics for {item_id}: {e}")
+ await session.rollback()
+ return False
+
+
+async def get_and_update_video_analysis(
+ session: AsyncSession, item_id: str
+) -> Dict[str, Any]:
+ """
+ 获取视频分析数据并更新数据库中的A3指标 (T-024 + T-025 组合)。
+
+ Args:
+ session: 数据库会话
+ item_id: 视频ID
+
+ Returns:
+ Dict: 完整的视频分析数据
+ """
+ # 获取分析数据
+ result = await get_video_analysis_data(session, item_id)
+
+ # 提取A3指标
+ a3_metrics = result.get("a3_metrics", {})
+ cost_raw = result.get("cost_metrics_raw", {})
+
+ # 更新数据库
+ await update_video_a3_metrics(
+ session=session,
+ item_id=item_id,
+ total_new_a3_cnt=a3_metrics.get("a3_increase_cnt", 0),
+ heated_new_a3_cnt=a3_metrics.get("ad_a3_increase_cnt", 0),
+ natural_new_a3_cnt=a3_metrics.get("natural_a3_increase_cnt", 0),
+ total_cost=cost_raw.get("cost", 0),
+ )
+
+ return result
diff --git a/backend/app/services/yuntu_api.py b/backend/app/services/yuntu_api.py
new file mode 100644
index 0000000..624dd6f
--- /dev/null
+++ b/backend/app/services/yuntu_api.py
@@ -0,0 +1,228 @@
+"""
+巨量云图API封装 (T-023)
+
+封装GetContentMaterialAnalysisInfo接口调用,获取视频分析数据。
+"""
+
+import logging
+from datetime import datetime, timedelta
+from typing import Dict, List, Optional, Any
+
+import httpx
+
+from app.config import settings
+from app.services.session_pool import session_pool, get_session_with_retry
+
+logger = logging.getLogger(__name__)
+
+# 巨量云图API基础URL
+YUNTU_BASE_URL = "https://yuntu.oceanengine.com"
+
+# 触发点ID列表(固定值)
+TRIGGER_POINT_IDS = ["610000", "610300", "610301"]
+
+
+class YuntuAPIError(Exception):
+ """巨量云图API错误"""
+
+ def __init__(self, message: str, status_code: int = 0, response_data: Any = None):
+ self.message = message
+ self.status_code = status_code
+ self.response_data = response_data
+ super().__init__(self.message)
+
+
+class SessionInvalidError(YuntuAPIError):
+ """SessionID失效错误"""
+
+ pass
+
+
+async def call_yuntu_api(
+ item_id: str,
+ publish_time: datetime,
+ industry_id: str,
+ session_id: Optional[str] = None,
+) -> Dict[str, Any]:
+ """
+ 调用巨量云图GetContentMaterialAnalysisInfo接口。
+
+ Args:
+ item_id: 视频ID
+ publish_time: 发布时间
+ industry_id: 行业ID
+ session_id: 可选的sessionid,不提供则从池中获取
+
+ Returns:
+ Dict: API响应数据
+
+ Raises:
+ SessionInvalidError: SessionID失效时抛出
+ YuntuAPIError: API调用失败时抛出
+ """
+ # 获取sessionid
+ if session_id is None:
+ session_id = await get_session_with_retry()
+ if session_id is None:
+ raise YuntuAPIError("Failed to get valid session")
+
+ # 构造请求参数
+ # end_date = start_date + 30天
+ start_date = publish_time.strftime("%Y-%m-%d")
+ end_date = (publish_time + timedelta(days=30)).strftime("%Y-%m-%d")
+
+ request_data = {
+ "is_my_video": "0",
+ "object_id": item_id,
+ "object_type": 2,
+ "start_date": start_date,
+ "end_date": end_date,
+ "assist_type": 3,
+ "assist_video_type": 3,
+ "industry_id_list": [industry_id] if industry_id else [],
+ "trigger_point_id_list": TRIGGER_POINT_IDS,
+ }
+
+ # 构造请求头
+ headers = {
+ "Content-Type": "application/json",
+ "Cookie": f"sessionid={session_id}",
+ }
+
+ try:
+ async with httpx.AsyncClient(timeout=settings.YUNTU_API_TIMEOUT) as client:
+ response = await client.post(
+ f"{YUNTU_BASE_URL}/yuntu_common/api/content/trigger_analysis/GetContentMaterialAnalysisInfo",
+ json=request_data,
+ headers=headers,
+ )
+
+ # 检查SessionID是否失效
+ if response.status_code in (401, 403):
+ logger.warning(f"Session invalid: {session_id[:8]}...")
+ raise SessionInvalidError(
+ f"Session invalid: {response.status_code}",
+ status_code=response.status_code,
+ )
+
+ if response.status_code != 200:
+ raise YuntuAPIError(
+ f"API returned {response.status_code}",
+ status_code=response.status_code,
+ response_data=response.text,
+ )
+
+ data = response.json()
+
+ # 检查业务错误码
+ if data.get("code") != 0:
+ error_msg = data.get("message", "Unknown error")
+ raise YuntuAPIError(
+ f"API business error: {error_msg}",
+ status_code=response.status_code,
+ response_data=data,
+ )
+
+ return data
+
+ except httpx.TimeoutException:
+ logger.error(f"Yuntu API timeout for item_id: {item_id}")
+ raise YuntuAPIError("API request timeout")
+ except httpx.RequestError as e:
+ logger.error(f"Yuntu API request error: {e}")
+ raise YuntuAPIError(f"API request error: {e}")
+
+
+async def get_video_analysis(
+ item_id: str,
+ publish_time: datetime,
+ industry_id: str,
+ max_retries: int = 3,
+) -> Dict[str, Any]:
+ """
+ 获取视频分析数据,支持SessionID失效自动重试 (T-022)。
+
+ Args:
+ item_id: 视频ID
+ publish_time: 发布时间
+ industry_id: 行业ID
+ max_retries: 最大重试次数
+
+ Returns:
+ Dict: 视频分析数据
+
+ Raises:
+ YuntuAPIError: 所有重试失败后抛出
+ """
+ last_error = None
+
+ for attempt in range(max_retries):
+ # 从池中获取sessionid
+ session_id = await get_session_with_retry()
+ if session_id is None:
+ last_error = YuntuAPIError("Failed to get valid session")
+ continue
+
+ try:
+ result = await call_yuntu_api(
+ item_id=item_id,
+ publish_time=publish_time,
+ industry_id=industry_id,
+ session_id=session_id,
+ )
+ return result
+
+ except SessionInvalidError:
+ # SessionID失效,从池中移除并重试
+ session_pool.remove(session_id)
+ logger.info(
+ f"Session invalid, retrying... attempt {attempt + 1}/{max_retries}"
+ )
+ last_error = SessionInvalidError("All sessions invalid")
+ continue
+
+ except YuntuAPIError as e:
+ last_error = e
+ logger.error(f"Yuntu API error on attempt {attempt + 1}: {e.message}")
+ # 非SessionID问题,不再重试
+ break
+
+ raise last_error or YuntuAPIError("Unknown error after retries")
+
+
+def parse_analysis_response(data: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ 解析巨量云图API响应,提取关键指标。
+
+ Args:
+ data: API原始响应数据
+
+ Returns:
+ Dict: 结构化的分析数据
+ """
+ result_data = data.get("data", {})
+
+ return {
+ # 触达指标
+ "total_show_cnt": result_data.get("total_show_cnt", 0), # 总曝光数
+ "natural_show_cnt": result_data.get("natural_show_cnt", 0), # 自然曝光数
+ "ad_show_cnt": result_data.get("ad_show_cnt", 0), # 加热曝光数
+ "total_play_cnt": result_data.get("total_play_cnt", 0), # 总播放数
+ "natural_play_cnt": result_data.get("natural_play_cnt", 0), # 自然播放数
+ "ad_play_cnt": result_data.get("ad_play_cnt", 0), # 加热播放数
+ "effective_play_cnt": result_data.get("effective_play_cnt", 0), # 有效播放数
+ # A3指标
+ "a3_increase_cnt": result_data.get("a3_increase_cnt", 0), # 新增A3
+ "ad_a3_increase_cnt": result_data.get("ad_a3_increase_cnt", 0), # 加热新增A3
+ "natural_a3_increase_cnt": result_data.get("natural_a3_increase_cnt", 0), # 自然新增A3
+ # 搜索指标
+ "after_view_search_uv": result_data.get("after_view_search_uv", 0), # 看后搜人数
+ "after_view_search_pv": result_data.get("after_view_search_pv", 0), # 看后搜次数
+ "brand_search_uv": result_data.get("brand_search_uv", 0), # 品牌搜索人数
+ "product_search_uv": result_data.get("product_search_uv", 0), # 商品搜索人数
+ "return_search_cnt": result_data.get("return_search_cnt", 0), # 回搜次数
+ # 费用指标
+ "cost": result_data.get("cost", 0), # 总花费
+ "natural_cost": result_data.get("natural_cost", 0), # 自然花费
+ "ad_cost": result_data.get("ad_cost", 0), # 加热花费
+ }
diff --git a/backend/tests/test_brand_api.py b/backend/tests/test_brand_api.py
index 8d02276..273d227 100644
--- a/backend/tests/test_brand_api.py
+++ b/backend/tests/test_brand_api.py
@@ -116,11 +116,24 @@ class TestBrandAPI:
# 验证所有调用都完成了
assert mock_fetch.call_count == 15
- async def test_fetch_brand_name_200_with_nested_data(self):
- """Test successful brand fetch with nested data structure."""
+ async def test_fetch_brand_name_200_with_array_data(self):
+ """Test successful brand fetch with array data structure (T-019 fix)."""
+ # 正确的API响应格式: data是数组,从data[0].brand_name获取品牌名称
mock_response = MagicMock()
mock_response.status_code = 200
- mock_response.json.return_value = {"data": {"name": "嵌套品牌名"}}
+ mock_response.json.return_value = {
+ "total": 1,
+ "last_updated": "2025-12-30T11:28:40.738185",
+ "has_more": 0,
+ "data": [
+ {
+ "industry_id": 20,
+ "industry_name": "母婴",
+ "brand_id": 533661,
+ "brand_name": "Giving/启初"
+ }
+ ]
+ }
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
@@ -129,16 +142,19 @@ class TestBrandAPI:
with patch("httpx.AsyncClient", return_value=mock_client):
semaphore = asyncio.Semaphore(10)
- brand_id, brand_name = await fetch_brand_name("brand_nested", semaphore)
+ brand_id, brand_name = await fetch_brand_name("533661", semaphore)
- assert brand_id == "brand_nested"
- assert brand_name == "嵌套品牌名"
+ assert brand_id == "533661"
+ assert brand_name == "Giving/启初"
- async def test_fetch_brand_name_200_with_flat_data(self):
- """Test successful brand fetch with flat data structure."""
+ async def test_fetch_brand_name_200_with_empty_data_array(self):
+ """Test brand fetch with 200 but empty data array (T-019 edge case)."""
mock_response = MagicMock()
mock_response.status_code = 200
- mock_response.json.return_value = {"name": "扁平品牌名"}
+ mock_response.json.return_value = {
+ "total": 0,
+ "data": []
+ }
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
@@ -147,16 +163,19 @@ class TestBrandAPI:
with patch("httpx.AsyncClient", return_value=mock_client):
semaphore = asyncio.Semaphore(10)
- brand_id, brand_name = await fetch_brand_name("brand_flat", semaphore)
+ brand_id, brand_name = await fetch_brand_name("unknown_brand", semaphore)
- assert brand_id == "brand_flat"
- assert brand_name == "扁平品牌名"
+ assert brand_id == "unknown_brand"
+ assert brand_name == "unknown_brand" # Fallback
- async def test_fetch_brand_name_200_no_name(self):
- """Test brand fetch with 200 but no name in response."""
+ async def test_fetch_brand_name_200_no_brand_name_field(self):
+ """Test brand fetch with 200 but no brand_name in data item."""
mock_response = MagicMock()
mock_response.status_code = 200
- mock_response.json.return_value = {"data": {"id": "123"}} # No name field
+ mock_response.json.return_value = {
+ "total": 1,
+ "data": [{"brand_id": 123}] # No brand_name field
+ }
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
@@ -170,6 +189,35 @@ class TestBrandAPI:
assert brand_id == "brand_no_name"
assert brand_name == "brand_no_name" # Fallback
+ async def test_fetch_brand_name_with_auth_header(self):
+ """Test that Authorization header is sent (T-020)."""
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {
+ "total": 1,
+ "data": [{"brand_id": 123, "brand_name": "测试品牌"}]
+ }
+
+ mock_client = AsyncMock()
+ mock_client.get.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with patch("app.services.brand_api.settings") as mock_settings:
+ mock_settings.BRAND_API_TIMEOUT = 3.0
+ mock_settings.BRAND_API_BASE_URL = "https://api.test.com"
+ mock_settings.BRAND_API_TOKEN = "test_token_123"
+
+ semaphore = asyncio.Semaphore(10)
+ await fetch_brand_name("123", semaphore)
+
+ # 验证请求包含 Authorization header
+ mock_client.get.assert_called_once()
+ call_args = mock_client.get.call_args
+ assert "headers" in call_args.kwargs
+ assert call_args.kwargs["headers"]["Authorization"] == "Bearer test_token_123"
+
async def test_fetch_brand_name_request_error(self):
"""Test brand fetch with request error."""
mock_client = AsyncMock()
diff --git a/backend/tests/test_session_pool.py b/backend/tests/test_session_pool.py
new file mode 100644
index 0000000..eb719dc
--- /dev/null
+++ b/backend/tests/test_session_pool.py
@@ -0,0 +1,314 @@
+"""
+Tests for SessionID Pool Service (T-021, T-022)
+"""
+
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+import httpx
+
+from app.services.session_pool import (
+ SessionPool,
+ session_pool,
+ get_session_with_retry,
+)
+
+
+class TestSessionPool:
+ """Tests for SessionPool class."""
+
+ async def test_refresh_success(self):
+ """Test successful session pool refresh."""
+ pool = SessionPool()
+
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {
+ "data": [
+ {"sessionid": "session_001", "user": "test1"},
+ {"sessionid": "session_002", "user": "test2"},
+ {"sessionid": "session_003", "user": "test3"},
+ ]
+ }
+
+ mock_client = AsyncMock()
+ mock_client.get.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is True
+ assert pool.size == 3
+ assert not pool.is_empty
+
+ async def test_refresh_empty_data(self):
+ """Test refresh with empty data array."""
+ pool = SessionPool()
+
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"data": []}
+
+ mock_client = AsyncMock()
+ mock_client.get.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is False
+ assert pool.size == 0
+
+ async def test_refresh_api_error(self):
+ """Test refresh with API error."""
+ pool = SessionPool()
+
+ mock_response = MagicMock()
+ mock_response.status_code = 500
+
+ mock_client = AsyncMock()
+ mock_client.get.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is False
+
+ async def test_refresh_timeout(self):
+ """Test refresh with timeout."""
+ pool = SessionPool()
+
+ mock_client = AsyncMock()
+ mock_client.get.side_effect = httpx.TimeoutException("Timeout")
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is False
+
+ async def test_refresh_request_error(self):
+ """Test refresh with request error."""
+ pool = SessionPool()
+
+ mock_client = AsyncMock()
+ mock_client.get.side_effect = httpx.RequestError("Connection failed")
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is False
+
+ async def test_refresh_unexpected_error(self):
+ """Test refresh with unexpected error."""
+ pool = SessionPool()
+
+ mock_client = AsyncMock()
+ mock_client.get.side_effect = ValueError("Unexpected")
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is False
+
+ async def test_refresh_with_auth_header(self):
+ """Test that refresh includes Authorization header."""
+ pool = SessionPool()
+
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"data": [{"sessionid": "test"}]}
+
+ mock_client = AsyncMock()
+ mock_client.get.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with patch("app.services.session_pool.settings") as mock_settings:
+ mock_settings.YUNTU_API_TOKEN = "test_token"
+ mock_settings.YUNTU_API_TIMEOUT = 10.0
+ mock_settings.BRAND_API_BASE_URL = "https://api.test.com"
+
+ await pool.refresh()
+
+ mock_client.get.assert_called_once()
+ call_args = mock_client.get.call_args
+ assert "headers" in call_args.kwargs
+ assert call_args.kwargs["headers"]["Authorization"] == "Bearer test_token"
+
+ def test_get_random_from_pool(self):
+ """Test getting random session from pool."""
+ pool = SessionPool()
+ pool._sessions = ["session_1", "session_2", "session_3"]
+
+ session = pool.get_random()
+
+ assert session in pool._sessions
+
+ def test_get_random_from_empty_pool(self):
+ """Test getting random session from empty pool."""
+ pool = SessionPool()
+
+ session = pool.get_random()
+
+ assert session is None
+
+ def test_remove_session(self):
+ """Test removing a session from pool."""
+ pool = SessionPool()
+ pool._sessions = ["session_1", "session_2", "session_3"]
+
+ pool.remove("session_2")
+
+ assert pool.size == 2
+ assert "session_2" not in pool._sessions
+
+ def test_remove_nonexistent_session(self):
+ """Test removing a session that doesn't exist."""
+ pool = SessionPool()
+ pool._sessions = ["session_1"]
+
+ # Should not raise
+ pool.remove("nonexistent")
+
+ assert pool.size == 1
+
+ def test_size_property(self):
+ """Test size property."""
+ pool = SessionPool()
+ assert pool.size == 0
+
+ pool._sessions = ["a", "b"]
+ assert pool.size == 2
+
+ def test_is_empty_property(self):
+ """Test is_empty property."""
+ pool = SessionPool()
+ assert pool.is_empty is True
+
+ pool._sessions = ["a"]
+ assert pool.is_empty is False
+
+
+class TestGetSessionWithRetry:
+ """Tests for get_session_with_retry function (T-022)."""
+
+ async def test_get_session_success(self):
+ """Test successful session retrieval."""
+ with patch.object(session_pool, "_sessions", ["session_1", "session_2"]):
+ result = await get_session_with_retry()
+
+ assert result in ["session_1", "session_2"]
+
+ async def test_get_session_refresh_on_empty(self):
+ """Test that pool is refreshed when empty."""
+ with patch.object(session_pool, "_sessions", []):
+ with patch.object(session_pool, "refresh") as mock_refresh:
+ mock_refresh.return_value = True
+
+ # After refresh, pool should have sessions
+ async def refresh_side_effect():
+ session_pool._sessions.append("new_session")
+ return True
+
+ mock_refresh.side_effect = refresh_side_effect
+
+ result = await get_session_with_retry()
+
+ assert mock_refresh.called
+ assert result == "new_session"
+
+ async def test_get_session_retry_on_refresh_failure(self):
+ """Test retry behavior when refresh fails."""
+ original_sessions = session_pool._sessions.copy()
+
+ try:
+ session_pool._sessions = []
+
+ with patch.object(session_pool, "refresh") as mock_refresh:
+ mock_refresh.return_value = False
+
+ result = await get_session_with_retry(max_retries=3)
+
+ assert result is None
+ assert mock_refresh.call_count == 3
+ finally:
+ session_pool._sessions = original_sessions
+
+ async def test_get_session_max_retries(self):
+ """Test max retries limit."""
+ original_sessions = session_pool._sessions.copy()
+
+ try:
+ session_pool._sessions = []
+
+ with patch.object(session_pool, "refresh") as mock_refresh:
+ mock_refresh.return_value = False
+
+ result = await get_session_with_retry(max_retries=5)
+
+ assert result is None
+ assert mock_refresh.call_count == 5
+ finally:
+ session_pool._sessions = original_sessions
+
+
+class TestSessionPoolIntegration:
+ """Integration tests for session pool."""
+
+ async def test_refresh_filters_invalid_items(self):
+ """Test that refresh filters out invalid items."""
+ pool = SessionPool()
+
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {
+ "data": [
+ {"sessionid": "valid_session"},
+ {"no_sessionid": "missing"},
+ None,
+ {"sessionid": ""}, # Empty string should be filtered
+ {"sessionid": "another_valid"},
+ ]
+ }
+
+ mock_client = AsyncMock()
+ mock_client.get.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is True
+ assert pool.size == 2
+ assert "valid_session" in pool._sessions
+ assert "another_valid" in pool._sessions
+
+ async def test_refresh_handles_non_dict_data(self):
+ """Test refresh with non-dict response."""
+ pool = SessionPool()
+
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = ["not", "a", "dict"]
+
+ mock_client = AsyncMock()
+ mock_client.get.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await pool.refresh()
+
+ assert result is False
diff --git a/backend/tests/test_video_analysis.py b/backend/tests/test_video_analysis.py
new file mode 100644
index 0000000..f8f5628
--- /dev/null
+++ b/backend/tests/test_video_analysis.py
@@ -0,0 +1,423 @@
+"""
+Tests for Video Analysis Service (T-024)
+"""
+
+import pytest
+from datetime import datetime
+from unittest.mock import AsyncMock, patch, MagicMock
+
+from app.services.video_analysis import (
+ calculate_cost_metrics,
+ get_video_base_info,
+ get_video_analysis_data,
+ update_video_a3_metrics,
+ get_and_update_video_analysis,
+)
+from app.services.yuntu_api import YuntuAPIError
+
+
+class TestCalculateCostMetrics:
+ """Tests for calculate_cost_metrics function."""
+
+ def test_all_metrics_calculated(self):
+ """Test calculation of all cost metrics."""
+ result = calculate_cost_metrics(
+ cost=10000,
+ natural_play_cnt=40000,
+ a3_increase_cnt=500,
+ natural_a3_increase_cnt=400,
+ after_view_search_uv=1000,
+ total_play_cnt=50000,
+ )
+
+ # CPM = 10000 / 50000 * 1000 = 200
+ assert result["cpm"] == 200.0
+
+ # 自然CPM = 10000 / 40000 * 1000 = 250
+ assert result["natural_cpm"] == 250.0
+
+ # CPA3 = 10000 / 500 = 20
+ assert result["cpa3"] == 20.0
+
+ # 自然CPA3 = 10000 / 400 = 25
+ assert result["natural_cpa3"] == 25.0
+
+ # CPsearch = 10000 / 1000 = 10
+ assert result["cp_search"] == 10.0
+
+ # 预估自然看后搜人数 = 40000 / 50000 * 1000 = 800
+ assert result["estimated_natural_search_uv"] == 800.0
+
+ # 自然CPsearch = 10000 / 800 = 12.5
+ assert result["natural_cp_search"] == 12.5
+
+ def test_zero_total_play_cnt(self):
+ """Test with zero total_play_cnt (division by zero)."""
+ result = calculate_cost_metrics(
+ cost=10000,
+ natural_play_cnt=0,
+ a3_increase_cnt=500,
+ natural_a3_increase_cnt=400,
+ after_view_search_uv=1000,
+ total_play_cnt=0,
+ )
+
+ assert result["cpm"] is None
+ assert result["natural_cpm"] is None
+ assert result["estimated_natural_search_uv"] is None
+ assert result["natural_cp_search"] is None
+
+ def test_zero_a3_counts(self):
+ """Test with zero A3 counts."""
+ result = calculate_cost_metrics(
+ cost=10000,
+ natural_play_cnt=40000,
+ a3_increase_cnt=0,
+ natural_a3_increase_cnt=0,
+ after_view_search_uv=1000,
+ total_play_cnt=50000,
+ )
+
+ assert result["cpa3"] is None
+ assert result["natural_cpa3"] is None
+ # 其他指标应该正常计算
+ assert result["cpm"] == 200.0
+
+ def test_zero_search_uv(self):
+ """Test with zero after_view_search_uv."""
+ result = calculate_cost_metrics(
+ cost=10000,
+ natural_play_cnt=40000,
+ a3_increase_cnt=500,
+ natural_a3_increase_cnt=400,
+ after_view_search_uv=0,
+ total_play_cnt=50000,
+ )
+
+ assert result["cp_search"] is None
+ # 当 after_view_search_uv=0 时,预估自然看后搜人数也应为 None(无意义)
+ assert result["estimated_natural_search_uv"] is None
+ assert result["natural_cp_search"] is None
+
+ def test_all_zeros(self):
+ """Test with all zero values."""
+ result = calculate_cost_metrics(
+ cost=0,
+ natural_play_cnt=0,
+ a3_increase_cnt=0,
+ natural_a3_increase_cnt=0,
+ after_view_search_uv=0,
+ total_play_cnt=0,
+ )
+
+ assert result["cpm"] is None
+ assert result["natural_cpm"] is None
+ assert result["cpa3"] is None
+ assert result["natural_cpa3"] is None
+ assert result["cp_search"] is None
+ assert result["estimated_natural_search_uv"] is None
+ assert result["natural_cp_search"] is None
+
+ def test_decimal_precision(self):
+ """Test that results are rounded to 2 decimal places."""
+ result = calculate_cost_metrics(
+ cost=10000,
+ natural_play_cnt=30000,
+ a3_increase_cnt=333,
+ natural_a3_increase_cnt=111,
+ after_view_search_uv=777,
+ total_play_cnt=70000,
+ )
+
+ # 验证都是2位小数
+ assert isinstance(result["cpm"], float)
+ assert len(str(result["cpm"]).split(".")[-1]) <= 2
+
+
+class TestGetVideoAnalysisData:
+ """Tests for get_video_analysis_data function."""
+
+ async def test_success_with_api_data(self):
+ """Test successful data retrieval with API data."""
+ # Mock database video
+ mock_video = MagicMock()
+ mock_video.item_id = "video_123"
+ mock_video.title = "测试视频"
+ mock_video.video_url = "https://example.com/video"
+ mock_video.star_id = "star_001"
+ mock_video.star_unique_id = "unique_001"
+ mock_video.star_nickname = "测试达人"
+ mock_video.publish_time = datetime(2025, 1, 15)
+ mock_video.industry_name = "母婴"
+ mock_video.industry_id = "20"
+ mock_video.total_play_cnt = 50000
+ mock_video.natural_play_cnt = 40000
+ mock_video.heated_play_cnt = 10000
+ mock_video.after_view_search_uv = 1000
+ mock_video.return_search_cnt = 50
+ mock_video.estimated_video_cost = 10000
+
+ # Mock session
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = mock_video
+ mock_session.execute.return_value = mock_result
+
+ # Mock API response
+ api_response = {
+ "code": 0,
+ "data": {
+ "total_show_cnt": 100000,
+ "natural_show_cnt": 80000,
+ "ad_show_cnt": 20000,
+ "total_play_cnt": 50000,
+ "natural_play_cnt": 40000,
+ "ad_play_cnt": 10000,
+ "effective_play_cnt": 30000,
+ "a3_increase_cnt": 500,
+ "ad_a3_increase_cnt": 100,
+ "natural_a3_increase_cnt": 400,
+ "after_view_search_uv": 1000,
+ "after_view_search_pv": 1500,
+ "brand_search_uv": 200,
+ "product_search_uv": 300,
+ "return_search_cnt": 50,
+ "cost": 10000,
+ "natural_cost": 0,
+ "ad_cost": 10000,
+ },
+ }
+
+ with patch(
+ "app.services.video_analysis.fetch_yuntu_analysis"
+ ) as mock_api:
+ mock_api.return_value = api_response
+
+ result = await get_video_analysis_data(mock_session, "video_123")
+
+ # 验证基础信息
+ assert result["base_info"]["item_id"] == "video_123"
+ assert result["base_info"]["title"] == "测试视频"
+ assert result["base_info"]["star_nickname"] == "测试达人"
+
+ # 验证触达指标
+ assert result["reach_metrics"]["total_show_cnt"] == 100000
+ assert result["reach_metrics"]["natural_play_cnt"] == 40000
+
+ # 验证A3指标
+ assert result["a3_metrics"]["a3_increase_cnt"] == 500
+ assert result["a3_metrics"]["natural_a3_increase_cnt"] == 400
+
+ # 验证搜索指标
+ assert result["search_metrics"]["after_view_search_uv"] == 1000
+
+ # 验证费用指标
+ assert result["cost_metrics_raw"]["cost"] == 10000
+
+ # 验证计算指标
+ assert result["cost_metrics_calculated"]["cpm"] is not None
+ assert result["cost_metrics_calculated"]["cpa3"] is not None
+
+ async def test_video_not_found(self):
+ """Test error when video is not found."""
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = None
+ mock_session.execute.return_value = mock_result
+
+ with pytest.raises(ValueError) as exc_info:
+ await get_video_analysis_data(mock_session, "nonexistent")
+
+ assert "not found" in str(exc_info.value).lower()
+
+ async def test_fallback_on_api_failure(self):
+ """Test fallback to database data when API fails."""
+ # Mock database video
+ mock_video = MagicMock()
+ mock_video.item_id = "video_123"
+ mock_video.title = "测试视频"
+ mock_video.video_url = None
+ mock_video.star_id = "star_001"
+ mock_video.star_unique_id = "unique_001"
+ mock_video.star_nickname = "测试达人"
+ mock_video.publish_time = datetime(2025, 1, 15)
+ mock_video.industry_name = "母婴"
+ mock_video.industry_id = "20"
+ mock_video.total_play_cnt = 50000
+ mock_video.natural_play_cnt = 40000
+ mock_video.heated_play_cnt = 10000
+ mock_video.after_view_search_uv = 1000
+ mock_video.return_search_cnt = 50
+ mock_video.estimated_video_cost = 10000
+
+ # Mock session
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = mock_video
+ mock_session.execute.return_value = mock_result
+
+ with patch(
+ "app.services.video_analysis.fetch_yuntu_analysis"
+ ) as mock_api:
+ mock_api.side_effect = YuntuAPIError("API Error")
+
+ result = await get_video_analysis_data(mock_session, "video_123")
+
+ # 应该使用数据库数据
+ assert result["reach_metrics"]["total_play_cnt"] == 50000
+ assert result["reach_metrics"]["natural_play_cnt"] == 40000
+ assert result["search_metrics"]["after_view_search_uv"] == 1000
+
+ async def test_null_publish_time(self):
+ """Test handling of null publish_time."""
+ mock_video = MagicMock()
+ mock_video.item_id = "video_123"
+ mock_video.title = "测试视频"
+ mock_video.video_url = None
+ mock_video.star_id = "star_001"
+ mock_video.star_unique_id = "unique_001"
+ mock_video.star_nickname = "测试达人"
+ mock_video.publish_time = None # NULL
+ mock_video.industry_name = None
+ mock_video.industry_id = None
+ mock_video.total_play_cnt = 0
+ mock_video.natural_play_cnt = 0
+ mock_video.heated_play_cnt = 0
+ mock_video.after_view_search_uv = 0
+ mock_video.return_search_cnt = 0
+ mock_video.estimated_video_cost = 0
+
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = mock_video
+ mock_session.execute.return_value = mock_result
+
+ with patch(
+ "app.services.video_analysis.fetch_yuntu_analysis"
+ ) as mock_api:
+ mock_api.return_value = {"code": 0, "data": {}}
+
+ result = await get_video_analysis_data(mock_session, "video_123")
+
+ assert result["base_info"]["publish_time"] is None
+
+
+class TestUpdateVideoA3Metrics:
+ """Tests for update_video_a3_metrics function (T-025)."""
+
+ async def test_update_success(self):
+ """Test successful A3 metrics update."""
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.rowcount = 1
+ mock_session.execute.return_value = mock_result
+
+ result = await update_video_a3_metrics(
+ session=mock_session,
+ item_id="video_123",
+ total_new_a3_cnt=500,
+ heated_new_a3_cnt=100,
+ natural_new_a3_cnt=400,
+ total_cost=10000.0,
+ )
+
+ assert result is True
+ mock_session.commit.assert_called_once()
+
+ async def test_update_video_not_found(self):
+ """Test update when video not found."""
+ mock_session = AsyncMock()
+ mock_result = MagicMock()
+ mock_result.rowcount = 0
+ mock_session.execute.return_value = mock_result
+
+ result = await update_video_a3_metrics(
+ session=mock_session,
+ item_id="nonexistent",
+ total_new_a3_cnt=500,
+ heated_new_a3_cnt=100,
+ natural_new_a3_cnt=400,
+ total_cost=10000.0,
+ )
+
+ assert result is False
+
+ async def test_update_database_error(self):
+ """Test update with database error."""
+ mock_session = AsyncMock()
+ mock_session.execute.side_effect = Exception("Database error")
+
+ result = await update_video_a3_metrics(
+ session=mock_session,
+ item_id="video_123",
+ total_new_a3_cnt=500,
+ heated_new_a3_cnt=100,
+ natural_new_a3_cnt=400,
+ total_cost=10000.0,
+ )
+
+ assert result is False
+ mock_session.rollback.assert_called_once()
+
+
+class TestGetAndUpdateVideoAnalysis:
+ """Tests for get_and_update_video_analysis function (T-024 + T-025)."""
+
+ async def test_get_and_update_success(self):
+ """Test successful get and update."""
+ # Mock database video
+ mock_video = MagicMock()
+ mock_video.item_id = "video_123"
+ mock_video.title = "测试视频"
+ mock_video.video_url = None
+ mock_video.star_id = "star_001"
+ mock_video.star_unique_id = "unique_001"
+ mock_video.star_nickname = "测试达人"
+ mock_video.publish_time = datetime(2025, 1, 15)
+ mock_video.industry_name = "母婴"
+ mock_video.industry_id = "20"
+ mock_video.total_play_cnt = 50000
+ mock_video.natural_play_cnt = 40000
+ mock_video.heated_play_cnt = 10000
+ mock_video.after_view_search_uv = 1000
+ mock_video.return_search_cnt = 50
+ mock_video.estimated_video_cost = 10000
+
+ # Mock session
+ mock_session = AsyncMock()
+ mock_select_result = MagicMock()
+ mock_select_result.scalar_one_or_none.return_value = mock_video
+
+ mock_update_result = MagicMock()
+ mock_update_result.rowcount = 1
+
+ # 根据不同的SQL语句返回不同的结果
+ async def mock_execute(stmt):
+ # 简单判断:如果是 SELECT 返回视频,如果是 UPDATE 返回更新结果
+ stmt_str = str(stmt)
+ if "SELECT" in stmt_str.upper():
+ return mock_select_result
+ return mock_update_result
+
+ mock_session.execute.side_effect = mock_execute
+
+ with patch(
+ "app.services.video_analysis.fetch_yuntu_analysis"
+ ) as mock_api:
+ mock_api.return_value = {
+ "code": 0,
+ "data": {
+ "a3_increase_cnt": 500,
+ "ad_a3_increase_cnt": 100,
+ "natural_a3_increase_cnt": 400,
+ "cost": 10000,
+ },
+ }
+
+ result = await get_and_update_video_analysis(mock_session, "video_123")
+
+ # 验证返回数据
+ assert result["a3_metrics"]["a3_increase_cnt"] == 500
+
+ # 验证数据库更新被调用
+ mock_session.commit.assert_called()
diff --git a/backend/tests/test_yuntu_api.py b/backend/tests/test_yuntu_api.py
new file mode 100644
index 0000000..91a24d6
--- /dev/null
+++ b/backend/tests/test_yuntu_api.py
@@ -0,0 +1,416 @@
+"""
+Tests for Yuntu API Service (T-023)
+"""
+
+import pytest
+from datetime import datetime
+from unittest.mock import AsyncMock, patch, MagicMock
+import httpx
+
+from app.services.yuntu_api import (
+ call_yuntu_api,
+ get_video_analysis,
+ parse_analysis_response,
+ YuntuAPIError,
+ SessionInvalidError,
+)
+
+
+class TestCallYuntuAPI:
+ """Tests for call_yuntu_api function."""
+
+ async def test_call_success(self):
+ """Test successful API call."""
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {
+ "code": 0,
+ "message": "success",
+ "data": {
+ "total_show_cnt": 100000,
+ "a3_increase_cnt": 500,
+ },
+ }
+
+ mock_client = AsyncMock()
+ mock_client.post.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ result = await call_yuntu_api(
+ item_id="test_item_123",
+ publish_time=datetime(2025, 1, 1),
+ industry_id="20",
+ session_id="test_session",
+ )
+
+ assert result["code"] == 0
+ assert result["data"]["total_show_cnt"] == 100000
+
+ async def test_call_with_correct_parameters(self):
+ """Test that API is called with correct parameters."""
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"code": 0, "data": {}}
+
+ mock_client = AsyncMock()
+ mock_client.post.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ await call_yuntu_api(
+ item_id="video_001",
+ publish_time=datetime(2025, 1, 15),
+ industry_id="30",
+ session_id="session_abc",
+ )
+
+ mock_client.post.assert_called_once()
+ call_args = mock_client.post.call_args
+
+ # 验证URL
+ assert "GetContentMaterialAnalysisInfo" in call_args.args[0]
+
+ # 验证请求体
+ json_data = call_args.kwargs["json"]
+ assert json_data["object_id"] == "video_001"
+ assert json_data["start_date"] == "2025-01-15"
+ assert json_data["end_date"] == "2025-02-14" # +30天
+ assert json_data["industry_id_list"] == ["30"]
+
+ # 验证headers包含sessionid
+ headers = call_args.kwargs["headers"]
+ assert "Cookie" in headers
+ assert "sessionid=session_abc" in headers["Cookie"]
+
+ async def test_call_session_invalid_401(self):
+ """Test handling of 401 response (session invalid)."""
+ mock_response = MagicMock()
+ mock_response.status_code = 401
+
+ mock_client = AsyncMock()
+ mock_client.post.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with pytest.raises(SessionInvalidError) as exc_info:
+ await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ session_id="invalid_session",
+ )
+
+ assert exc_info.value.status_code == 401
+
+ async def test_call_session_invalid_403(self):
+ """Test handling of 403 response (session invalid)."""
+ mock_response = MagicMock()
+ mock_response.status_code = 403
+
+ mock_client = AsyncMock()
+ mock_client.post.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with pytest.raises(SessionInvalidError):
+ await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ session_id="invalid_session",
+ )
+
+ async def test_call_api_error_500(self):
+ """Test handling of 500 response."""
+ mock_response = MagicMock()
+ mock_response.status_code = 500
+ mock_response.text = "Internal Server Error"
+
+ mock_client = AsyncMock()
+ mock_client.post.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with pytest.raises(YuntuAPIError) as exc_info:
+ await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ session_id="session",
+ )
+
+ assert exc_info.value.status_code == 500
+
+ async def test_call_business_error(self):
+ """Test handling of business error (code != 0)."""
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {
+ "code": 1001,
+ "message": "Invalid parameter",
+ }
+
+ mock_client = AsyncMock()
+ mock_client.post.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with pytest.raises(YuntuAPIError) as exc_info:
+ await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ session_id="session",
+ )
+
+ assert "Invalid parameter" in exc_info.value.message
+
+ async def test_call_timeout(self):
+ """Test handling of timeout."""
+ mock_client = AsyncMock()
+ mock_client.post.side_effect = httpx.TimeoutException("Timeout")
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with pytest.raises(YuntuAPIError) as exc_info:
+ await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ session_id="session",
+ )
+
+ assert "timeout" in exc_info.value.message.lower()
+
+ async def test_call_request_error(self):
+ """Test handling of request error."""
+ mock_client = AsyncMock()
+ mock_client.post.side_effect = httpx.RequestError("Connection failed")
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with pytest.raises(YuntuAPIError):
+ await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ session_id="session",
+ )
+
+ async def test_call_without_session_id(self):
+ """Test API call without providing session_id (gets from pool)."""
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {"code": 0, "data": {}}
+
+ mock_client = AsyncMock()
+ mock_client.post.return_value = mock_response
+ mock_client.__aenter__.return_value = mock_client
+ mock_client.__aexit__.return_value = None
+
+ with patch("httpx.AsyncClient", return_value=mock_client):
+ with patch(
+ "app.services.yuntu_api.get_session_with_retry"
+ ) as mock_get_session:
+ mock_get_session.return_value = "pool_session"
+
+ result = await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ )
+
+ assert result["code"] == 0
+ mock_get_session.assert_called_once()
+
+ async def test_call_no_session_available(self):
+ """Test API call when no session is available."""
+ with patch(
+ "app.services.yuntu_api.get_session_with_retry"
+ ) as mock_get_session:
+ mock_get_session.return_value = None
+
+ with pytest.raises(YuntuAPIError) as exc_info:
+ await call_yuntu_api(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ )
+
+ assert "session" in exc_info.value.message.lower()
+
+
+class TestGetVideoAnalysis:
+ """Tests for get_video_analysis function with retry logic (T-022)."""
+
+ async def test_success_first_try(self):
+ """Test successful call on first attempt."""
+ with patch("app.services.yuntu_api.get_session_with_retry") as mock_session:
+ mock_session.return_value = "valid_session"
+
+ with patch("app.services.yuntu_api.call_yuntu_api") as mock_call:
+ mock_call.return_value = {"code": 0, "data": {"a3_increase_cnt": 100}}
+
+ result = await get_video_analysis(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ )
+
+ assert result["data"]["a3_increase_cnt"] == 100
+ assert mock_call.call_count == 1
+
+ async def test_retry_on_session_invalid(self):
+ """Test retry when session is invalid."""
+ with patch("app.services.yuntu_api.get_session_with_retry") as mock_session:
+ mock_session.side_effect = ["session_1", "session_2", "session_3"]
+
+ with patch("app.services.yuntu_api.call_yuntu_api") as mock_call:
+ # 前两次失败,第三次成功
+ mock_call.side_effect = [
+ SessionInvalidError("Invalid"),
+ SessionInvalidError("Invalid"),
+ {"code": 0, "data": {}},
+ ]
+
+ with patch("app.services.yuntu_api.session_pool") as mock_pool:
+ result = await get_video_analysis(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ max_retries=3,
+ )
+
+ assert result["code"] == 0
+ assert mock_call.call_count == 3
+ # 验证失效的session被移除
+ assert mock_pool.remove.call_count == 2
+
+ async def test_max_retries_exceeded(self):
+ """Test that error is raised after max retries."""
+ with patch("app.services.yuntu_api.get_session_with_retry") as mock_session:
+ mock_session.return_value = "session"
+
+ with patch("app.services.yuntu_api.call_yuntu_api") as mock_call:
+ mock_call.side_effect = SessionInvalidError("Invalid")
+
+ with patch("app.services.yuntu_api.session_pool"):
+ with pytest.raises(SessionInvalidError):
+ await get_video_analysis(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ max_retries=3,
+ )
+
+ assert mock_call.call_count == 3
+
+ async def test_no_retry_on_api_error(self):
+ """Test that non-session errors don't trigger retry."""
+ with patch("app.services.yuntu_api.get_session_with_retry") as mock_session:
+ mock_session.return_value = "session"
+
+ with patch("app.services.yuntu_api.call_yuntu_api") as mock_call:
+ mock_call.side_effect = YuntuAPIError("Server error", status_code=500)
+
+ with pytest.raises(YuntuAPIError) as exc_info:
+ await get_video_analysis(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ )
+
+ assert mock_call.call_count == 1
+ assert exc_info.value.status_code == 500
+
+ async def test_no_session_available(self):
+ """Test error when no session is available."""
+ with patch("app.services.yuntu_api.get_session_with_retry") as mock_session:
+ mock_session.return_value = None
+
+ with pytest.raises(YuntuAPIError):
+ await get_video_analysis(
+ item_id="test",
+ publish_time=datetime.now(),
+ industry_id="20",
+ )
+
+
+class TestParseAnalysisResponse:
+ """Tests for parse_analysis_response function."""
+
+ def test_parse_complete_response(self):
+ """Test parsing complete response data."""
+ response = {
+ "data": {
+ "total_show_cnt": 100000,
+ "natural_show_cnt": 80000,
+ "ad_show_cnt": 20000,
+ "total_play_cnt": 50000,
+ "natural_play_cnt": 40000,
+ "ad_play_cnt": 10000,
+ "effective_play_cnt": 30000,
+ "a3_increase_cnt": 500,
+ "ad_a3_increase_cnt": 100,
+ "natural_a3_increase_cnt": 400,
+ "after_view_search_uv": 1000,
+ "after_view_search_pv": 1500,
+ "brand_search_uv": 200,
+ "product_search_uv": 300,
+ "return_search_cnt": 50,
+ "cost": 10000.5,
+ "natural_cost": 0,
+ "ad_cost": 10000.5,
+ }
+ }
+
+ result = parse_analysis_response(response)
+
+ assert result["total_show_cnt"] == 100000
+ assert result["natural_show_cnt"] == 80000
+ assert result["a3_increase_cnt"] == 500
+ assert result["after_view_search_uv"] == 1000
+ assert result["cost"] == 10000.5
+
+ def test_parse_empty_response(self):
+ """Test parsing empty response."""
+ response = {"data": {}}
+
+ result = parse_analysis_response(response)
+
+ assert result["total_show_cnt"] == 0
+ assert result["a3_increase_cnt"] == 0
+ assert result["cost"] == 0
+
+ def test_parse_missing_data_key(self):
+ """Test parsing response without data key."""
+ response = {}
+
+ result = parse_analysis_response(response)
+
+ assert result["total_show_cnt"] == 0
+
+ def test_parse_partial_response(self):
+ """Test parsing partial response."""
+ response = {
+ "data": {
+ "total_show_cnt": 50000,
+ "a3_increase_cnt": 100,
+ }
+ }
+
+ result = parse_analysis_response(response)
+
+ assert result["total_show_cnt"] == 50000
+ assert result["a3_increase_cnt"] == 100
+ assert result["natural_show_cnt"] == 0 # Default value
+ assert result["cost"] == 0 # Default value
diff --git a/frontend/src/app/analysis/page.tsx b/frontend/src/app/analysis/page.tsx
new file mode 100644
index 0000000..f3fdb52
--- /dev/null
+++ b/frontend/src/app/analysis/page.tsx
@@ -0,0 +1,16 @@
+import VideoAnalysis from '@/components/VideoAnalysis';
+
+export default function AnalysisPage() {
+ return (
+