""" 视频分析服务 (T-024) 实现视频分析数据获取和成本指标计算。 """ import asyncio import logging from datetime import datetime from typing import Any, Dict, Optional from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.models.kol_video import KolVideo from app.services.session_pool import ( get_distinct_configs, get_random_config, session_pool, ) from app.services.yuntu_api import ( SessionInvalidError, call_yuntu_api, parse_analysis_response, ) from app.services.yuntu_api import ( get_video_analysis as fetch_yuntu_analysis, ) logger = logging.getLogger(__name__) def _needs_api_call(video: KolVideo) -> bool: """ 判断是否需要调用 Yuntu API 获取 A3/Cost 数据。 如果数据库中已有 A3 或 Cost 数据,直接使用数据库数据,不调 API。 """ has_a3 = (video.total_new_a3_cnt or 0) > 0 has_cost = (video.total_cost or 0) > 0 return not (has_a3 or has_cost) def calculate_cost_metrics( cost: float, natural_play_cnt: int, a3_increase_cnt: int, natural_a3_increase_cnt: int, after_view_search_uv: int, total_play_cnt: int, ) -> Dict[str, Optional[float]]: """ 计算成本指标。 Args: cost: 总花费 natural_play_cnt: 自然播放数 a3_increase_cnt: 新增A3 natural_a3_increase_cnt: 自然新增A3 after_view_search_uv: 看后搜人数 total_play_cnt: 总播放数 Returns: Dict: 成本指标字典 """ metrics = {} # CPM = cost / total_play_cnt * 1000 if total_play_cnt and total_play_cnt > 0: metrics["cpm"] = round(cost / total_play_cnt * 1000, 2) else: metrics["cpm"] = None # 自然CPM = cost / natural_play_cnt * 1000 if natural_play_cnt and natural_play_cnt > 0: metrics["natural_cpm"] = round(cost / natural_play_cnt * 1000, 2) else: metrics["natural_cpm"] = None # CPA3 = cost / a3_increase_cnt if a3_increase_cnt and a3_increase_cnt > 0: metrics["cpa3"] = round(cost / a3_increase_cnt, 2) else: metrics["cpa3"] = None # 自然CPA3 = cost / natural_a3_increase_cnt if natural_a3_increase_cnt and natural_a3_increase_cnt > 0: metrics["natural_cpa3"] = round(cost / natural_a3_increase_cnt, 2) else: metrics["natural_cpa3"] = None # CPsearch = cost / after_view_search_uv if after_view_search_uv and after_view_search_uv > 0: metrics["cp_search"] = round(cost / after_view_search_uv, 2) else: metrics["cp_search"] = None # 预估自然看后搜人数 = natural_play_cnt / total_play_cnt * after_view_search_uv if total_play_cnt and total_play_cnt > 0 and after_view_search_uv: estimated_natural_search_uv = ( natural_play_cnt / total_play_cnt * after_view_search_uv ) metrics["estimated_natural_search_uv"] = round(estimated_natural_search_uv, 2) # 自然CPsearch = cost / estimated_natural_search_uv if estimated_natural_search_uv > 0: metrics["natural_cp_search"] = round(cost / estimated_natural_search_uv, 2) else: metrics["natural_cp_search"] = None else: metrics["estimated_natural_search_uv"] = None metrics["natural_cp_search"] = None return metrics async def get_video_base_info( session: AsyncSession, item_id: str ) -> Optional[KolVideo]: """ 从数据库获取视频基础信息。 Args: session: 数据库会话 item_id: 视频ID Returns: KolVideo or None """ stmt = select(KolVideo).where(KolVideo.item_id == item_id) result = await session.execute(stmt) return result.scalar_one_or_none() async def get_video_analysis_data( session: AsyncSession, item_id: str ) -> Dict[str, Any]: """ 获取视频分析数据(T-024主接口)。 返回6大类指标(匹配前端 VideoAnalysisData 类型): - base_info: 基础信息 - reach_metrics: 触达指标 - a3_metrics: A3指标 - search_metrics: 搜索指标 - cost_metrics: 费用指标 - calculated_metrics: 成本指标(实时计算) Args: session: 数据库会话 item_id: 视频ID Returns: Dict: 完整的视频分析数据 Raises: ValueError: 视频不存在时抛出 """ from app.services.brand_api import get_brand_names # 1. 从数据库获取基础信息 video = await get_video_base_info(session, item_id) if video is None: raise ValueError(f"Video not found: {item_id}") # 2. 获取品牌名称 brand_name = "" if video.brand_id: brand_map = await get_brand_names([video.brand_id]) brand_name = brand_map.get(video.brand_id, video.brand_id) # 3. 获取 A3 数据和 cost(缓存优先策略) a3_increase_cnt = 0 ad_a3_increase_cnt = 0 natural_a3_increase_cnt = 0 api_cost = 0.0 ad_cost = 0.0 if not _needs_api_call(video): # 数据库已有数据,直接使用 logger.info(f"Using DB data for {item_id} (A3/Cost already cached)") a3_increase_cnt = video.total_new_a3_cnt or 0 ad_a3_increase_cnt = video.heated_new_a3_cnt or 0 natural_a3_increase_cnt = video.natural_new_a3_cnt or 0 api_cost = video.total_cost or 0.0 ad_cost = video.heated_cost or 0.0 else: # 需要调用 API 获取数据 try: publish_time = video.publish_time or datetime.now() industry_id = video.industry_id or "" api_response = await fetch_yuntu_analysis( item_id=item_id, publish_time=publish_time, industry_id=industry_id, ) analysis_data = parse_analysis_response(api_response) a3_increase_cnt = analysis_data.get("a3_increase_cnt", 0) ad_a3_increase_cnt = analysis_data.get("ad_a3_increase_cnt", 0) natural_a3_increase_cnt = analysis_data.get("natural_a3_increase_cnt", 0) api_cost = analysis_data.get("cost", 0) ad_cost = analysis_data.get("ad_cost", 0) # 写回数据库 await update_video_a3_metrics( session=session, item_id=item_id, total_new_a3_cnt=int(a3_increase_cnt), heated_new_a3_cnt=int(ad_a3_increase_cnt), natural_new_a3_cnt=int(natural_a3_increase_cnt), total_cost=float(api_cost), heated_cost=float(ad_cost), ) logger.info(f"API data fetched and saved to DB for {item_id}") except Exception as e: logger.warning(f"API failed for {item_id}: {e}, using DB data") a3_increase_cnt = video.total_new_a3_cnt or 0 ad_a3_increase_cnt = video.heated_new_a3_cnt or 0 natural_a3_increase_cnt = video.natural_new_a3_cnt or 0 api_cost = video.total_cost or 0.0 ad_cost = video.heated_cost or 0.0 # 4. 数据库字段 estimated_video_cost = video.estimated_video_cost or 0.0 natural_play_cnt = video.natural_play_cnt or 0 heated_play_cnt = video.heated_play_cnt or 0 total_play_cnt = video.total_play_cnt or 0 after_view_search_uv = video.after_view_search_uv or 0 # 5. 计算成本指标 heated_cost = ad_cost # 预估自然看后搜人数 estimated_natural_search_uv = None if total_play_cnt > 0 and after_view_search_uv > 0: estimated_natural_search_uv = round((natural_play_cnt / total_play_cnt) * after_view_search_uv, 2) # 预估CPM = (total_cost / total_play_cnt) * 1000 estimated_cpm = round((api_cost / total_play_cnt) * 1000, 2) if total_play_cnt > 0 else None # 预估自然CPM = (estimated_video_cost / natural_play_cnt) * 1000 estimated_natural_cpm = round((estimated_video_cost / natural_play_cnt) * 1000, 2) if natural_play_cnt > 0 else None # 预估CPA3 = total_cost / a3_increase_cnt estimated_cp_a3 = round(api_cost / a3_increase_cnt, 2) if a3_increase_cnt > 0 else None # 预估自然CPA3 = estimated_video_cost / natural_a3_increase_cnt estimated_natural_cp_a3 = round(estimated_video_cost / natural_a3_increase_cnt, 2) if natural_a3_increase_cnt > 0 else None # 预估CPsearch = total_cost / after_view_search_uv estimated_cp_search = round(api_cost / after_view_search_uv, 2) if after_view_search_uv > 0 else None # 自然CPsearch = estimated_video_cost / estimated_natural_search_uv estimated_natural_cp_search = round(estimated_video_cost / estimated_natural_search_uv, 2) if estimated_natural_search_uv and estimated_natural_search_uv > 0 else None # 6. 组装返回数据(匹配前端 VideoAnalysisData 类型) return { "base_info": { "star_nickname": video.star_nickname or "", "star_unique_id": video.star_unique_id or "", "vid": video.item_id, "title": video.title or "", "create_date": video.publish_time.isoformat() if video.publish_time else None, "hot_type": video.viral_type or "", "industry_id": video.industry_id or "", "brand_id": video.brand_id or "", "brand_name": brand_name, "video_url": video.video_url or "", }, "reach_metrics": { "natural_play_cnt": natural_play_cnt, "heated_play_cnt": heated_play_cnt, "total_play_cnt": total_play_cnt, "total_interaction_cnt": video.total_interact or 0, "digg_cnt": video.like_cnt or 0, "share_cnt": video.share_cnt or 0, "comment_cnt": video.comment_cnt or 0, }, "a3_metrics": { "total_new_a3_cnt": a3_increase_cnt, "heated_new_a3_cnt": ad_a3_increase_cnt, "natural_new_a3_cnt": natural_a3_increase_cnt, }, "search_metrics": { "back_search_uv": video.return_search_cnt or 0, "back_search_cnt": video.return_search_cnt or 0, "after_view_search_uv": after_view_search_uv, "after_view_search_cnt": after_view_search_uv, "estimated_natural_search_uv": estimated_natural_search_uv, }, "cost_metrics": { "total_cost": api_cost, "heated_cost": heated_cost, "estimated_video_cost": estimated_video_cost, }, "calculated_metrics": { "estimated_cpm": estimated_cpm, "estimated_natural_cpm": estimated_natural_cpm, "estimated_cp_a3": estimated_cp_a3, "estimated_natural_cp_a3": estimated_natural_cp_a3, "estimated_cp_search": estimated_cp_search, "estimated_natural_cp_search": estimated_natural_cp_search, }, } async def update_video_a3_metrics( session: AsyncSession, item_id: str, total_new_a3_cnt: int, heated_new_a3_cnt: int, natural_new_a3_cnt: int, total_cost: float, heated_cost: float = 0.0, ) -> bool: """ 更新数据库中的A3指标和费用数据 (T-025)。 Args: session: 数据库会话 item_id: 视频ID total_new_a3_cnt: 总新增A3 heated_new_a3_cnt: 加热新增A3 natural_new_a3_cnt: 自然新增A3 total_cost: 预估总费用 heated_cost: 预估加热费用 Returns: bool: 更新是否成功 """ try: stmt = ( update(KolVideo) .where(KolVideo.item_id == item_id) .values( total_new_a3_cnt=total_new_a3_cnt, heated_new_a3_cnt=heated_new_a3_cnt, natural_new_a3_cnt=natural_new_a3_cnt, total_cost=total_cost, heated_cost=heated_cost, ) ) result = await session.execute(stmt) await session.commit() if result.rowcount > 0: logger.info(f"Updated A3 metrics for video {item_id}") return True else: logger.warning(f"No video found to update: {item_id}") return False except Exception as e: logger.error(f"Failed to update A3 metrics for {item_id}: {e}") await session.rollback() return False async def search_videos_by_star_id( session: AsyncSession, star_id: str ) -> list[KolVideo]: """根据星图ID精准匹配搜索视频列表。""" stmt = select(KolVideo).where(KolVideo.star_id == star_id) result = await session.execute(stmt) return list(result.scalars().all()) async def search_videos_by_unique_id( session: AsyncSession, unique_id: str ) -> list[KolVideo]: """根据达人unique_id精准匹配搜索视频列表。""" stmt = select(KolVideo).where(KolVideo.star_unique_id == unique_id) result = await session.execute(stmt) return list(result.scalars().all()) async def search_videos_by_nickname( session: AsyncSession, nickname: str ) -> list[KolVideo]: """根据达人昵称模糊匹配搜索视频列表。""" stmt = select(KolVideo).where(KolVideo.star_nickname.ilike(f"%{nickname}%")) result = await session.execute(stmt) return list(result.scalars().all()) def _build_video_list_item( video: KolVideo, a3_increase_cnt: int, ad_a3_increase_cnt: int, natural_a3_increase_cnt: int, api_cost: float, brand_name: str, ) -> Dict[str, Any]: """构建视频列表项的结果字典。""" estimated_video_cost = video.estimated_video_cost or 0.0 natural_play_cnt = video.natural_play_cnt or 0 total_play_cnt = video.total_play_cnt or 0 after_view_search_uv = video.after_view_search_uv or 0 estimated_natural_search_uv = None if total_play_cnt > 0 and after_view_search_uv > 0: estimated_natural_search_uv = (natural_play_cnt / total_play_cnt) * after_view_search_uv estimated_natural_cpm = round((estimated_video_cost / natural_play_cnt) * 1000, 2) if natural_play_cnt > 0 else None estimated_cp_a3 = round(api_cost / a3_increase_cnt, 2) if a3_increase_cnt > 0 else None estimated_natural_cp_a3 = round(estimated_video_cost / natural_a3_increase_cnt, 2) if natural_a3_increase_cnt > 0 else None estimated_cp_search = round(api_cost / after_view_search_uv, 2) if after_view_search_uv > 0 else None estimated_natural_cp_search = round(estimated_video_cost / estimated_natural_search_uv, 2) if estimated_natural_search_uv and estimated_natural_search_uv > 0 else None return { "item_id": video.item_id, "star_nickname": video.star_nickname or "", "title": video.title or "", "video_url": video.video_url or "", "create_date": video.publish_time.isoformat() if video.publish_time else None, "hot_type": video.viral_type or "", "industry_id": video.industry_id or "", "brand_id": video.brand_id or "", "brand_name": brand_name, "total_new_a3_cnt": a3_increase_cnt, "heated_new_a3_cnt": ad_a3_increase_cnt, "natural_new_a3_cnt": natural_a3_increase_cnt, "estimated_natural_cpm": estimated_natural_cpm, "estimated_cp_a3": estimated_cp_a3, "estimated_natural_cp_a3": estimated_natural_cp_a3, "estimated_cp_search": estimated_cp_search, "estimated_natural_cp_search": estimated_natural_cp_search, } async def get_video_list_with_a3( session: AsyncSession, videos: list[KolVideo] ) -> list[Dict[str, Any]]: """ 获取视频列表的摘要数据。 缓存优先策略: - 数据库有 A3/Cost 数据 → 直接使用 - 数据库无数据 → 并发调用云图 API(预分配不同 cookie)→ 写回数据库 """ from app.services.brand_api import get_brand_names # 批量获取品牌名称 brand_ids = [video.brand_id for video in videos if video.brand_id] brand_map = await get_brand_names(brand_ids) if brand_ids else {} # 分组:已有数据 vs 需要 API 调用 cached_videos: list[tuple[int, KolVideo]] = [] # (原始索引, video) api_videos: list[tuple[int, KolVideo]] = [] # (原始索引, video) for idx, video in enumerate(videos): if _needs_api_call(video): api_videos.append((idx, video)) else: cached_videos.append((idx, video)) logger.info( f"Video list: {len(cached_videos)} cached, {len(api_videos)} need API" ) # 结果数组(按原始索引填充) results: list[Optional[Dict[str, Any]]] = [None] * len(videos) # 组 A:直接用数据库数据 for idx, video in cached_videos: brand_name = brand_map.get(video.brand_id, video.brand_id or "") if video.brand_id else "" results[idx] = _build_video_list_item( video=video, a3_increase_cnt=video.total_new_a3_cnt or 0, ad_a3_increase_cnt=video.heated_new_a3_cnt or 0, natural_a3_increase_cnt=video.natural_new_a3_cnt or 0, api_cost=video.total_cost or 0.0, brand_name=brand_name, ) # 组 B:并发调用 API(预分配不同 cookie) if api_videos: configs = await get_distinct_configs(len(api_videos)) semaphore = asyncio.Semaphore(5) # 收集需要写回 DB 的数据(避免并发 session 操作) pending_updates: list[Dict[str, Any]] = [] async def _fetch_single( idx: int, video: KolVideo, config: Dict[str, Any] ) -> None: a3_increase_cnt = 0 ad_a3_increase_cnt = 0 natural_a3_increase_cnt = 0 api_cost = 0.0 ad_cost_val = 0.0 api_success = False async with semaphore: try: publish_time = video.publish_time or datetime.now() industry_id = video.industry_id or "" api_response = await call_yuntu_api( item_id=video.item_id, publish_time=publish_time, industry_id=industry_id, aadvid=config["aadvid"], auth_token=config["auth_token"], ) api_data = parse_analysis_response(api_response) a3_increase_cnt = api_data.get("a3_increase_cnt", 0) ad_a3_increase_cnt = api_data.get("ad_a3_increase_cnt", 0) natural_a3_increase_cnt = api_data.get("natural_a3_increase_cnt", 0) api_cost = api_data.get("cost", 0) ad_cost_val = api_data.get("ad_cost", 0) api_success = True except SessionInvalidError: # Session 失效,从池中移除,重新获取随机 config 重试 session_pool.remove_by_auth_token(config["auth_token"]) logger.warning(f"Session invalid for {video.item_id}, retrying") retry_config = await get_random_config() if retry_config: try: publish_time = video.publish_time or datetime.now() industry_id = video.industry_id or "" api_response = await call_yuntu_api( item_id=video.item_id, publish_time=publish_time, industry_id=industry_id, aadvid=retry_config["aadvid"], auth_token=retry_config["auth_token"], ) api_data = parse_analysis_response(api_response) a3_increase_cnt = api_data.get("a3_increase_cnt", 0) ad_a3_increase_cnt = api_data.get("ad_a3_increase_cnt", 0) natural_a3_increase_cnt = api_data.get("natural_a3_increase_cnt", 0) api_cost = api_data.get("cost", 0) ad_cost_val = api_data.get("ad_cost", 0) api_success = True except Exception as e2: logger.warning(f"Retry failed for {video.item_id}: {e2}") a3_increase_cnt = video.total_new_a3_cnt or 0 ad_a3_increase_cnt = video.heated_new_a3_cnt or 0 natural_a3_increase_cnt = video.natural_new_a3_cnt or 0 api_cost = video.total_cost or 0.0 except Exception as e: logger.warning(f"API failed for {video.item_id}: {e}") a3_increase_cnt = video.total_new_a3_cnt or 0 ad_a3_increase_cnt = video.heated_new_a3_cnt or 0 natural_a3_increase_cnt = video.natural_new_a3_cnt or 0 api_cost = video.total_cost or 0.0 # 收集待写回 DB 的数据(不在并发中操作 session) if api_success: pending_updates.append({ "item_id": video.item_id, "total_new_a3_cnt": int(a3_increase_cnt), "heated_new_a3_cnt": int(ad_a3_increase_cnt), "natural_new_a3_cnt": int(natural_a3_increase_cnt), "total_cost": float(api_cost), "heated_cost": float(ad_cost_val), }) brand_name = brand_map.get(video.brand_id, video.brand_id or "") if video.brand_id else "" results[idx] = _build_video_list_item( video=video, a3_increase_cnt=a3_increase_cnt, ad_a3_increase_cnt=ad_a3_increase_cnt, natural_a3_increase_cnt=natural_a3_increase_cnt, api_cost=api_cost, brand_name=brand_name, ) # 为每个视频分配一个独立的 config,并发执行 tasks = [] for i, (idx, video) in enumerate(api_videos): config = configs[i] if i < len(configs) else configs[i % len(configs)] if configs else {} tasks.append(_fetch_single(idx, video, config)) await asyncio.gather(*tasks) # 顺序写回 DB(避免并发 session 操作导致状态损坏) for upd in pending_updates: await update_video_a3_metrics( session=session, item_id=upd["item_id"], total_new_a3_cnt=upd["total_new_a3_cnt"], heated_new_a3_cnt=upd["heated_new_a3_cnt"], natural_new_a3_cnt=upd["natural_new_a3_cnt"], total_cost=upd["total_cost"], heated_cost=upd["heated_cost"], ) # 过滤 None(不应发生,防御性编程) return [r for r in results if r is not None]