""" 审核任务 API """ import uuid from datetime import datetime, timezone from fastapi import APIRouter, Depends, Header, HTTPException, Query, status from sqlalchemy import select, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.models.tenant import Tenant from app.models.review import ManualTask, TaskStatus as DBTaskStatus, Platform as DBPlatform from app.schemas.review import ( TaskCreateRequest, TaskResponse, TaskListResponse, TaskScriptUploadRequest, TaskVideoUploadRequest, TaskApproveRequest, TaskRejectRequest, TaskStatus, Platform, ) router = APIRouter(prefix="/tasks", tags=["tasks"]) async def _ensure_tenant_exists(tenant_id: str, db: AsyncSession) -> Tenant: """确保租户存在,不存在则自动创建""" result = await db.execute( select(Tenant).where(Tenant.id == tenant_id) ) tenant = result.scalar_one_or_none() if not tenant: tenant = Tenant(id=tenant_id, name=f"租户-{tenant_id}") db.add(tenant) await db.flush() return tenant def _task_to_response(task: ManualTask) -> TaskResponse: """将数据库模型转换为响应模型""" return TaskResponse( task_id=task.id, video_url=task.video_url, script_content=task.script_content, script_file_url=task.script_file_url, has_script=bool(task.script_content or task.script_file_url), has_video=bool(task.video_url), platform=Platform(task.platform.value), creator_id=task.creator_id, status=TaskStatus(task.status.value), created_at=task.created_at.isoformat() if task.created_at else "", ) @router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) async def create_task( request: TaskCreateRequest, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> TaskResponse: """ 创建审核任务 """ # 确保租户存在 await _ensure_tenant_exists(x_tenant_id, db) task_id = f"task-{uuid.uuid4().hex[:12]}" task = ManualTask( id=task_id, tenant_id=x_tenant_id, video_url=str(request.video_url) if request.video_url else None, video_uploaded_at=datetime.now(timezone.utc) if request.video_url else None, platform=DBPlatform(request.platform.value), creator_id=request.creator_id, status=DBTaskStatus.PENDING, script_content=request.script_content, script_file_url=str(request.script_file_url) if request.script_file_url else None, script_uploaded_at=datetime.now(timezone.utc) if request.script_content or request.script_file_url else None, ) db.add(task) await db.flush() await db.refresh(task) return _task_to_response(task) @router.post("/{task_id}/script", response_model=TaskResponse) async def upload_task_script( task_id: str, request: TaskScriptUploadRequest, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> TaskResponse: """ 上传/更新任务脚本 """ if not request.script_content and not request.script_file_url: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="script_content 或 script_file_url 至少提供一个", ) result = await db.execute( select(ManualTask).where( and_( ManualTask.id == task_id, ManualTask.tenant_id == x_tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"任务不存在: {task_id}", ) task.script_content = request.script_content task.script_file_url = ( str(request.script_file_url) if request.script_file_url else None ) task.script_uploaded_at = datetime.now(timezone.utc) await db.flush() await db.refresh(task) return _task_to_response(task) @router.post("/{task_id}/video", response_model=TaskResponse) async def upload_task_video( task_id: str, request: TaskVideoUploadRequest, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> TaskResponse: """ 上传/更新任务视频 """ result = await db.execute( select(ManualTask).where( and_( ManualTask.id == task_id, ManualTask.tenant_id == x_tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"任务不存在: {task_id}", ) task.video_url = str(request.video_url) task.video_uploaded_at = datetime.now(timezone.utc) await db.flush() await db.refresh(task) return _task_to_response(task) @router.get("/{task_id}", response_model=TaskResponse) async def get_task( task_id: str, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> TaskResponse: """ 查询单个任务 """ result = await db.execute( select(ManualTask).where( and_( ManualTask.id == task_id, ManualTask.tenant_id == x_tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"任务不存在: {task_id}", ) return _task_to_response(task) @router.get("", response_model=TaskListResponse) async def list_tasks( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), task_status: TaskStatus = Query(None, alias="status"), platform: Platform = None, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> TaskListResponse: """ 查询任务列表 支持分页和筛选 """ # 构建查询 query = select(ManualTask).where(ManualTask.tenant_id == x_tenant_id) if task_status: query = query.where(ManualTask.status == DBTaskStatus(task_status.value)) if platform: query = query.where(ManualTask.platform == DBPlatform(platform.value)) # 按创建时间倒序排列 query = query.order_by(ManualTask.created_at.desc()) # 执行查询获取总数 count_result = await db.execute( select(ManualTask.id).where(ManualTask.tenant_id == x_tenant_id) ) total = len(count_result.all()) # 分页 offset = (page - 1) * page_size query = query.offset(offset).limit(page_size) result = await db.execute(query) tasks = result.scalars().all() return TaskListResponse( items=[_task_to_response(t) for t in tasks], total=total, page=page, page_size=page_size, ) @router.post("/{task_id}/approve", response_model=TaskResponse) async def approve_task( task_id: str, request: TaskApproveRequest, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> TaskResponse: """ 通过任务 """ result = await db.execute( select(ManualTask).where( and_( ManualTask.id == task_id, ManualTask.tenant_id == x_tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"任务不存在: {task_id}", ) task.status = DBTaskStatus.APPROVED task.approve_comment = request.comment task.reviewed_at = datetime.now(timezone.utc) await db.flush() await db.refresh(task) return _task_to_response(task) @router.post("/{task_id}/reject", response_model=TaskResponse) async def reject_task( task_id: str, request: TaskRejectRequest, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> TaskResponse: """ 驳回任务 """ result = await db.execute( select(ManualTask).where( and_( ManualTask.id == task_id, ManualTask.tenant_id == x_tenant_id, ) ) ) task = result.scalar_one_or_none() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"任务不存在: {task_id}", ) task.status = DBTaskStatus.REJECTED task.reject_reason = request.reason task.reject_violations = request.violations task.reviewed_at = datetime.now(timezone.utc) await db.flush() await db.refresh(task) return _task_to_response(task)