""" 视频审核 API """ import uuid from typing import Optional from fastapi import APIRouter, Depends, Header, HTTPException, status from fastapi.responses import JSONResponse from sqlalchemy import select, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.models.tenant import Tenant from app.models.review import ReviewTask, TaskStatus as DBTaskStatus, Platform as DBPlatform from app.schemas.review import ( VideoReviewRequest, VideoReviewSubmitResponse, VideoReviewProgressResponse, VideoReviewResultResponse, TaskStatus, Violation, ViolationType, RiskLevel, ViolationSource, SoftRiskWarning, ) from app.services.ai_service import AIServiceFactory from app.services.ai_client import OpenAICompatibleClient router = APIRouter(prefix="/videos", tags=["videos"]) async def _ensure_tenant_exists(tenant_id: str, db: AsyncSession) -> Tenant: """确保租户存在,不存在则自动创建""" result = await db.execute( select(Tenant).where(Tenant.id == tenant_id) ) tenant = result.scalar_one_or_none() if not tenant: tenant = Tenant(id=tenant_id, name=f"租户-{tenant_id}") db.add(tenant) await db.flush() return tenant @router.post( "/review", response_model=VideoReviewSubmitResponse, status_code=status.HTTP_202_ACCEPTED, ) async def submit_video_review( request: VideoReviewRequest, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> VideoReviewSubmitResponse: """ 提交视频审核 返回 202 Accepted,异步处理 """ # 确保租户存在 await _ensure_tenant_exists(x_tenant_id, db) review_id = f"review-{uuid.uuid4().hex[:12]}" # 创建审核任务 task = ReviewTask( id=review_id, tenant_id=x_tenant_id, video_url=str(request.video_url), platform=DBPlatform(request.platform.value), brand_id=request.brand_id, creator_id=request.creator_id, status=DBTaskStatus.PENDING, progress=0, current_step="等待处理", competitors=request.competitors, requirements=request.requirements, ) db.add(task) await db.commit() # 触发 Celery 异步任务 try: from app.tasks.review import process_video_review_task process_video_review_task.delay( review_id=review_id, tenant_id=x_tenant_id, video_url=str(request.video_url), brand_id=request.brand_id, platform=request.platform.value, ) except Exception: # Celery 不可用时,任务保持 PENDING 状态 # 后续可通过定时任务或手动触发处理 pass return VideoReviewSubmitResponse( review_id=review_id, status=TaskStatus.PENDING, ) @router.get( "/review/{review_id}/progress", response_model=VideoReviewProgressResponse, ) async def get_review_progress( review_id: str, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> VideoReviewProgressResponse: """ 查询审核进度 """ result = await db.execute( select(ReviewTask).where( and_( ReviewTask.id == review_id, ReviewTask.tenant_id == x_tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"审核任务不存在: {review_id}", ) return VideoReviewProgressResponse( review_id=review_id, status=TaskStatus(task.status.value), progress=task.progress, current_step=task.current_step, ) @router.get("/review/{review_id}/result") async def get_review_result( review_id: str, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ): """ 查询审核结果 - 未完成:返回 202 + 进度结构 - 已完成:返回 200 + 结果结构 """ result = await db.execute( select(ReviewTask).where( and_( ReviewTask.id == review_id, ReviewTask.tenant_id == x_tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"审核任务不存在: {review_id}", ) # 未完成:返回 202 + 进度 if task.status in [DBTaskStatus.PENDING, DBTaskStatus.PROCESSING]: progress_response = VideoReviewProgressResponse( review_id=review_id, status=TaskStatus(task.status.value), progress=task.progress, current_step=task.current_step, ) return JSONResponse( status_code=status.HTTP_202_ACCEPTED, content=progress_response.model_dump(), ) # 失败:返回错误信息 if task.status == DBTaskStatus.FAILED: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=task.error_message or "审核任务失败", ) # 已完成:返回 200 + 结果 violations = [] if task.violations: for v in task.violations: violations.append(Violation(**v)) soft_warnings = [] if task.soft_warnings: for w in task.soft_warnings: soft_warnings.append(SoftRiskWarning(**w)) return VideoReviewResultResponse( review_id=review_id, status=TaskStatus.COMPLETED, score=task.score or 100, summary=task.summary or "审核完成", violations=violations, soft_warnings=soft_warnings, ) # ==================== AI 辅助审核方法 ==================== async def _perform_ai_video_review( task: ReviewTask, ai_client: OpenAICompatibleClient, text_model: str, vision_model: str, audio_model: str, db: AsyncSession, ) -> dict: """ 使用 AI 执行视频审核 流程: 1. 下载视频 2. ASR 转写 3. 提取关键帧 4. 视觉分析 (竞品 Logo) 5. OCR 字幕 6. 生成报告 """ violations = [] score = 100 try: # 更新进度: 开始处理 task.status = DBTaskStatus.PROCESSING task.progress = 10 task.current_step = "下载视频" await db.flush() # TODO: 实际实现需要集成视频处理库 # 1. 下载视频 # video_path = await download_video(task.video_url) # 2. ASR 转写 task.progress = 30 task.current_step = "语音转写" await db.flush() # asr_result = await ai_client.audio_transcription( # audio_url=task.video_url, # 需要提取音频 # model=audio_model, # ) # transcript = asr_result.content # 3. 提取关键帧 task.progress = 50 task.current_step = "提取关键帧" await db.flush() # frames = await extract_keyframes(video_path) # 4. 视觉分析 task.progress = 70 task.current_step = "视觉分析" await db.flush() # 检测竞品 Logo # if task.competitors: # vision_prompt = f""" # 分析这些视频截图,检测是否包含以下竞品品牌的 Logo 或标识: # 竞品列表: {task.competitors} # # 如果发现竞品,请返回: # 1. 竞品名称 # 2. 出现的帧编号 # 3. 置信度 (0-1) # """ # vision_result = await ai_client.vision_analysis( # image_urls=frames, # prompt=vision_prompt, # model=vision_model, # ) # 5. 文本综合分析 task.progress = 85 task.current_step = "综合分析" await db.flush() # analysis_prompt = f""" # 作为广告合规审核专家,请分析以下视频脚本内容: # # 脚本内容: # {transcript} # # 请检查: # 1. 是否包含广告法违禁词(最好、第一、最佳等极限词) # 2. 是否包含虚假功效宣称 # 3. 品牌信息是否正确 # # 返回 JSON 格式: # {{"violations": [...], "score": 0-100, "summary": "..."}} # """ # analysis_result = await ai_client.chat_completion( # messages=[{"role": "user", "content": analysis_prompt}], # model=text_model, # ) # 6. 完成审核 task.progress = 100 task.current_step = "审核完成" task.status = DBTaskStatus.COMPLETED task.score = score task.summary = "审核完成,未发现违规" if not violations else f"发现 {len(violations)} 处违规" task.violations = [v.model_dump() for v in violations] if violations else [] await db.flush() return { "score": score, "summary": task.summary, "violations": violations, } except Exception as e: task.status = DBTaskStatus.FAILED task.error_message = str(e) await db.flush() raise # ==================== 后台任务入口 ==================== async def process_video_review_task( review_id: str, tenant_id: str, db: AsyncSession, ): """ 处理视频审核任务(由 Celery 或后台任务调用) """ # 获取任务 result = await db.execute( select(ReviewTask).where( and_( ReviewTask.id == review_id, ReviewTask.tenant_id == tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: return # 获取 AI 客户端 ai_client = await AIServiceFactory.get_client(tenant_id, db) if not ai_client: # 没有配置 AI,使用规则引擎审核 task.status = DBTaskStatus.COMPLETED task.score = 100 task.summary = "审核完成(规则引擎)" task.progress = 100 task.current_step = "审核完成" await db.flush() return # 获取模型配置 config = await AIServiceFactory.get_config(tenant_id, db) models = config.models # 执行 AI 审核 await _perform_ai_video_review( task=task, ai_client=ai_client, text_model=models.get("text", "gpt-4o"), vision_model=models.get("vision", "gpt-4o"), audio_model=models.get("audio", "whisper-1"), db=db, )