Your Name a32102f583 feat: 补全后端 API 并对齐前后端类型
- 后端新增: Project CRUD / Brief CRUD / 组织关系管理 / 工作台统计 / SSE 推送 / 认证依赖注入
- 后端完善: 任务 API 全流程(创建/审核/申诉) + Task Service + Task Schema
- 前端修复: login 页面 localStorage key 错误 (miaosi_auth -> miaosi_user)
- 前端对齐: types/task.ts 与后端 TaskStage/TaskResponse 完全对齐
- 前端新增: project/brief/organization/dashboard 类型定义
- 前端补全: api.ts 新增 30+ API 方法覆盖所有后端接口

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 14:13:08 +08:00

634 lines
18 KiB
Python

"""
任务服务
处理任务的创建、状态流转、审核等业务逻辑
"""
from typing import Optional, List, Tuple
from datetime import datetime, timezone
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.task import Task, TaskStage, TaskStatus
from app.models.project import Project
from app.models.organization import Brand, Agency, Creator
from app.models.user import User, UserRole
from app.services.auth import generate_id
async def get_next_task_sequence(
    db: AsyncSession,
    project_id: str,
    creator_id: str,
) -> int:
    """
    Return the next task sequence number for a creator within a project.

    Counts the tasks that match both ``project_id`` and ``creator_id`` and
    returns that count plus one, so the first task gets sequence 1.
    """
    count_stmt = select(func.count(Task.id)).where(
        Task.project_id == project_id,
        Task.creator_id == creator_id,
    )
    existing = (await db.execute(count_stmt)).scalar()
    # A missing scalar is treated as zero existing tasks.
    return (existing or 0) + 1
async def create_task(
    db: AsyncSession,
    project_id: str,
    agency_id: str,
    creator_id: str,
    name: Optional[str] = None,
) -> Task:
    """
    Create a task (agency operation).

    - When ``name`` is empty, a default name of the form "宣传任务(N)" is
      generated, N being the next sequence number for this creator in the
      project.
    - The task starts in the ``SCRIPT_UPLOAD`` stage.

    The new task is added to the session, flushed and refreshed; no commit is
    issued here.
    """
    next_seq = await get_next_task_sequence(db, project_id, creator_id)
    task_name = name if name else f"宣传任务({next_seq})"

    new_task = Task(
        id=generate_id("TK"),
        project_id=project_id,
        agency_id=agency_id,
        creator_id=creator_id,
        name=task_name,
        sequence=next_seq,
        stage=TaskStage.SCRIPT_UPLOAD,
        appeal_count=1,  # initial number of appeals available
    )
    db.add(new_task)
    await db.flush()
    await db.refresh(new_task)
    return new_task
async def get_task_by_id(
    db: AsyncSession,
    task_id: str,
) -> Optional[Task]:
    """
    Fetch a task by ID with its project, agency and creator eagerly loaded.

    Returns ``None`` when no task has the given ID.
    """
    eager_relations = (
        selectinload(Task.project),
        selectinload(Task.agency),
        selectinload(Task.creator),
    )
    stmt = select(Task).options(*eager_relations).where(Task.id == task_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def check_task_permission(
    task: Task,
    user: User,
    db: AsyncSession,
) -> bool:
    """
    Check whether ``user`` is allowed to access ``task``.

    - Creator: only tasks whose ``creator_id`` matches the user's creator record.
    - Agency: only tasks whose ``agency_id`` matches the user's agency record.
    - Brand: any task whose project belongs to the user's brand record.

    Always returns a real ``bool``: a missing creator/agency/brand/project
    record, or any other role, yields ``False`` (never ``None``).
    """
    if user.role == UserRole.CREATOR:
        rows = await db.execute(
            select(Creator).where(Creator.user_id == user.id)
        )
        creator = rows.scalar_one_or_none()
        # Explicit None check so the function returns False, not None.
        return creator is not None and task.creator_id == creator.id

    if user.role == UserRole.AGENCY:
        rows = await db.execute(
            select(Agency).where(Agency.user_id == user.id)
        )
        agency = rows.scalar_one_or_none()
        return agency is not None and task.agency_id == agency.id

    if user.role == UserRole.BRAND:
        rows = await db.execute(
            select(Brand).where(Brand.user_id == user.id)
        )
        brand = rows.scalar_one_or_none()
        if brand is None:
            return False
        # Brand access is decided through the project that owns the task.
        rows = await db.execute(
            select(Project).where(Project.id == task.project_id)
        )
        project = rows.scalar_one_or_none()
        return project is not None and project.brand_id == brand.id

    return False
async def upload_script(
    db: AsyncSession,
    task: Task,
    file_url: str,
    file_name: str,
) -> Task:
    """
    Upload a script (creator operation).

    - Stores the script file URL/name and the upload timestamp (UTC).
    - Moves the task to the ``SCRIPT_AI_REVIEW`` stage.
    - Clears the appeal flag and reason when this upload follows an appeal.

    Allowed when the task is in ``SCRIPT_UPLOAD``, or in ``REJECTED`` provided
    the rejection was not a video rejection. Which phase was rejected is
    decided the same way ``submit_appeal`` decides it: a ``REJECTED`` video
    agency/brand status means the video was rejected.

    Raises:
        ValueError: if the current stage does not allow a script upload.
    """
    video_rejected = (
        task.video_agency_status == TaskStatus.REJECTED
        or task.video_brand_status == TaskStatus.REJECTED
    )
    # A task rejected during video review must not be sent back to script
    # review by a script upload.
    stage_allowed = task.stage == TaskStage.SCRIPT_UPLOAD or (
        task.stage == TaskStage.REJECTED and not video_rejected
    )
    if not stage_allowed:
        raise ValueError(f"当前阶段 {task.stage.value} 不允许上传脚本")

    task.script_file_url = file_url
    task.script_file_name = file_name
    task.script_uploaded_at = datetime.now(timezone.utc)
    task.stage = TaskStage.SCRIPT_AI_REVIEW

    # Re-upload after an appeal: reset the appeal state.
    if task.is_appeal:
        task.is_appeal = False
        task.appeal_reason = None

    await db.flush()
    await db.refresh(task)
    return task
async def upload_video(
    db: AsyncSession,
    task: Task,
    file_url: str,
    file_name: str,
    duration: Optional[int] = None,
    thumbnail_url: Optional[str] = None,
) -> Task:
    """
    Upload a video (creator operation).

    - Stores the video file URL/name, duration, thumbnail URL and the upload
      timestamp (UTC).
    - Moves the task to the ``VIDEO_AI_REVIEW`` stage.
    - Clears the appeal flag and reason when this upload follows an appeal.

    Allowed when the task is in ``VIDEO_UPLOAD``, or in ``REJECTED`` provided
    it was the video that got rejected (video agency/brand status is
    ``REJECTED``, the same test ``submit_appeal`` uses). Without that check a
    task whose script was rejected could jump straight to video review.

    Raises:
        ValueError: if the current stage does not allow a video upload.
    """
    video_rejected = (
        task.video_agency_status == TaskStatus.REJECTED
        or task.video_brand_status == TaskStatus.REJECTED
    )
    stage_allowed = task.stage == TaskStage.VIDEO_UPLOAD or (
        task.stage == TaskStage.REJECTED and video_rejected
    )
    if not stage_allowed:
        raise ValueError(f"当前阶段 {task.stage.value} 不允许上传视频")

    task.video_file_url = file_url
    task.video_file_name = file_name
    task.video_duration = duration
    task.video_thumbnail_url = thumbnail_url
    task.video_uploaded_at = datetime.now(timezone.utc)
    task.stage = TaskStage.VIDEO_AI_REVIEW

    # Re-upload after an appeal: reset the appeal state.
    if task.is_appeal:
        task.is_appeal = False
        task.appeal_reason = None

    await db.flush()
    await db.refresh(task)
    return task
async def complete_ai_review(
    db: AsyncSession,
    task: Task,
    review_type: str,  # "script" or "video"
    score: int,
    result: dict,
) -> Task:
    """
    Record the outcome of an AI review and hand the task to agency review.

    - ``review_type == "script"``: requires stage ``SCRIPT_AI_REVIEW``; stores
      the script AI score/result/timestamp and moves to
      ``SCRIPT_AGENCY_REVIEW``.
    - ``review_type == "video"``: requires stage ``VIDEO_AI_REVIEW``; stores
      the video AI score/result/timestamp and moves to ``VIDEO_AGENCY_REVIEW``.

    Raises:
        ValueError: for an unsupported ``review_type`` or when the task is not
            in the matching AI review stage.
    """
    reviewed_at = datetime.now(timezone.utc)

    if review_type not in ("script", "video"):
        raise ValueError(f"不支持的审核类型: {review_type}")

    if review_type == "script":
        if task.stage != TaskStage.SCRIPT_AI_REVIEW:
            raise ValueError(f"当前阶段 {task.stage.value} 不在脚本 AI 审核中")
        task.script_ai_score = score
        task.script_ai_result = result
        task.script_ai_reviewed_at = reviewed_at
        task.stage = TaskStage.SCRIPT_AGENCY_REVIEW
    else:
        if task.stage != TaskStage.VIDEO_AI_REVIEW:
            raise ValueError(f"当前阶段 {task.stage.value} 不在视频 AI 审核中")
        task.video_ai_score = score
        task.video_ai_result = result
        task.video_ai_reviewed_at = reviewed_at
        task.stage = TaskStage.VIDEO_AGENCY_REVIEW

    await db.flush()
    await db.refresh(task)
    return task
async def agency_review(
    db: AsyncSession,
    task: Task,
    reviewer_id: str,
    action: str,  # "pass" | "reject" | "force_pass"
    comment: Optional[str] = None,
) -> Task:
    """
    Agency review of a script or video.

    - ``pass``: approve; go to brand review when the project's brand has final
      review enabled, otherwise to the next stage (``VIDEO_UPLOAD`` for a
      script, ``COMPLETED`` for a video).
    - ``reject``: reject; the task moves to the ``REJECTED`` stage.
    - ``force_pass``: approve and skip brand review.

    The agency status, comment, reviewer ID and review timestamp (UTC) are
    recorded on the script or video fields depending on the current stage.

    Raises:
        ValueError: if the task is not in an agency review stage, or
            ``action`` is not one of the supported values.
    """
    reviewed_at = datetime.now(timezone.utc)

    # Load the project with its brand to see whether brand final review is on.
    project_rows = await db.execute(
        select(Project)
        .options(selectinload(Project.brand))
        .where(Project.id == task.project_id)
    )
    project = project_rows.scalar_one_or_none()
    brand_review_enabled = project and project.brand and project.brand.final_review_enabled

    # Work out which phase is under review and where each outcome leads.
    if task.stage == TaskStage.SCRIPT_AGENCY_REVIEW:
        reviewing_script = True
        brand_stage = TaskStage.SCRIPT_BRAND_REVIEW
        skip_brand_stage = TaskStage.VIDEO_UPLOAD
    elif task.stage == TaskStage.VIDEO_AGENCY_REVIEW:
        reviewing_script = False
        brand_stage = TaskStage.VIDEO_BRAND_REVIEW
        skip_brand_stage = TaskStage.COMPLETED
    else:
        raise ValueError(f"当前阶段 {task.stage.value} 不在代理商审核中")

    if action == "pass":
        status = TaskStatus.PASSED
        next_stage = brand_stage if brand_review_enabled else skip_brand_stage
    elif action == "reject":
        status = TaskStatus.REJECTED
        next_stage = TaskStage.REJECTED
    elif action == "force_pass":
        status = TaskStatus.FORCE_PASSED
        next_stage = skip_brand_stage  # skips brand review
    else:
        raise ValueError(f"不支持的操作: {action}")

    if reviewing_script:
        task.script_agency_status = status
        task.script_agency_comment = comment
        task.script_agency_reviewer_id = reviewer_id
        task.script_agency_reviewed_at = reviewed_at
    else:
        task.video_agency_status = status
        task.video_agency_comment = comment
        task.video_agency_reviewer_id = reviewer_id
        task.video_agency_reviewed_at = reviewed_at
    task.stage = next_stage

    await db.flush()
    await db.refresh(task)
    return task
async def brand_review(
    db: AsyncSession,
    task: Task,
    reviewer_id: str,
    action: str,  # "pass" | "reject"
    comment: Optional[str] = None,
) -> Task:
    """
    Brand final review of a script or video.

    - ``pass``: approve; a script moves on to ``VIDEO_UPLOAD``, a video to
      ``COMPLETED``.
    - ``reject``: reject; the task moves to the ``REJECTED`` stage.

    The brand status, comment, reviewer ID and review timestamp (UTC) are
    recorded on the script or video fields depending on the current stage.

    Raises:
        ValueError: if the task is not in a brand review stage, or ``action``
            is neither "pass" nor "reject".
    """
    reviewed_at = datetime.now(timezone.utc)

    if task.stage == TaskStage.SCRIPT_BRAND_REVIEW:
        reviewing_script = True
        stage_after_pass = TaskStage.VIDEO_UPLOAD
    elif task.stage == TaskStage.VIDEO_BRAND_REVIEW:
        reviewing_script = False
        stage_after_pass = TaskStage.COMPLETED
    else:
        raise ValueError(f"当前阶段 {task.stage.value} 不在品牌方审核中")

    if action == "pass":
        status, next_stage = TaskStatus.PASSED, stage_after_pass
    elif action == "reject":
        status, next_stage = TaskStatus.REJECTED, TaskStage.REJECTED
    else:
        raise ValueError(f"不支持的操作: {action}")

    if reviewing_script:
        task.script_brand_status = status
        task.script_brand_comment = comment
        task.script_brand_reviewer_id = reviewer_id
        task.script_brand_reviewed_at = reviewed_at
    else:
        task.video_brand_status = status
        task.video_brand_comment = comment
        task.video_brand_reviewer_id = reviewer_id
        task.video_brand_reviewed_at = reviewed_at
    task.stage = next_stage

    await db.flush()
    await db.refresh(task)
    return task
async def submit_appeal(
    db: AsyncSession,
    task: Task,
    reason: str,
) -> Task:
    """
    Submit an appeal (creator operation).

    - Consumes one appeal from ``task.appeal_count``.
    - Marks the task as being in appeal and stores ``reason``.
    - Returns the task to the upload stage of the rejected phase and clears
      that phase's agency/brand review statuses.

    Raises:
        ValueError: if the task is not in the ``REJECTED`` stage, or no
            appeals remain.
    """
    if task.stage != TaskStage.REJECTED:
        raise ValueError(f"当前阶段 {task.stage.value} 不允许申诉")
    if task.appeal_count <= 0:
        raise ValueError("申诉次数已用完,请联系代理商申请增加")

    # Spend one appeal and record why.
    task.appeal_count -= 1
    task.is_appeal = True
    task.appeal_reason = reason

    # A REJECTED video status means the video phase was rejected; otherwise
    # the rejection is attributed to the script phase.
    video_rejected = (
        task.video_agency_status == TaskStatus.REJECTED
        or task.video_brand_status == TaskStatus.REJECTED
    )
    if video_rejected:
        task.stage = TaskStage.VIDEO_UPLOAD
        task.video_agency_status = None
        task.video_brand_status = None
    else:
        task.stage = TaskStage.SCRIPT_UPLOAD
        task.script_agency_status = None
        task.script_brand_status = None

    await db.flush()
    await db.refresh(task)
    return task
async def increase_appeal_count(
    db: AsyncSession,
    task: Task,
    additional_count: int = 1,
) -> Task:
    """
    Grant additional appeals to a task (agency operation).

    Adds ``additional_count`` to ``task.appeal_count``, then flushes and
    refreshes the task.

    Raises:
        ValueError: if ``additional_count`` is less than 1. A zero or negative
            value would leave the quota unchanged or reduce it, which is not
            an "increase".
    """
    if additional_count < 1:
        raise ValueError("增加的申诉次数必须大于 0")

    task.appeal_count += additional_count
    await db.flush()
    await db.refresh(task)
    return task
async def list_tasks_for_creator(
    db: AsyncSession,
    creator_id: str,
    page: int = 1,
    page_size: int = 20,
    stage: Optional[TaskStage] = None,
) -> Tuple[List[Task], int]:
    """
    List a creator's tasks, newest first, one page at a time.

    ``page`` is 1-based. When ``stage`` is given, only tasks in that stage are
    listed and counted. Project, agency and creator relations are eagerly
    loaded.

    Returns ``(tasks_on_page, total_matching_tasks)``.
    """
    filters = [Task.creator_id == creator_id]
    if stage:
        filters.append(Task.stage == stage)

    # Total across all pages.
    count_rows = await db.execute(select(func.count(Task.id)).where(*filters))
    total = count_rows.scalar() or 0

    # Requested page.
    page_stmt = (
        select(Task)
        .options(
            selectinload(Task.project),
            selectinload(Task.agency),
            selectinload(Task.creator),
        )
        .where(*filters)
        .order_by(Task.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    page_rows = await db.execute(page_stmt)
    return list(page_rows.scalars().all()), total
async def list_tasks_for_agency(
    db: AsyncSession,
    agency_id: str,
    page: int = 1,
    page_size: int = 20,
    stage: Optional[TaskStage] = None,
) -> Tuple[List[Task], int]:
    """
    List an agency's tasks, newest first, one page at a time.

    ``page`` is 1-based. When ``stage`` is given, only tasks in that stage are
    listed and counted. Project, agency and creator relations are eagerly
    loaded.

    Returns ``(tasks_on_page, total_matching_tasks)``.
    """
    filters = [Task.agency_id == agency_id]
    if stage:
        filters.append(Task.stage == stage)

    # Total across all pages.
    count_rows = await db.execute(select(func.count(Task.id)).where(*filters))
    total = count_rows.scalar() or 0

    # Requested page.
    skip = (page - 1) * page_size
    page_stmt = (
        select(Task)
        .options(
            selectinload(Task.project),
            selectinload(Task.agency),
            selectinload(Task.creator),
        )
        .where(*filters)
        .order_by(Task.created_at.desc())
        .offset(skip)
        .limit(page_size)
    )
    page_rows = await db.execute(page_stmt)
    return list(page_rows.scalars().all()), total
async def list_tasks_for_brand(
    db: AsyncSession,
    brand_id: str,
    page: int = 1,
    page_size: int = 20,
    stage: Optional[TaskStage] = None,
) -> Tuple[List[Task], int]:
    """
    List tasks under a brand's projects, newest first, one page at a time.

    The brand's project IDs are fetched first; with no projects the result is
    ``([], 0)`` and no task query runs. ``page`` is 1-based. When ``stage`` is
    given, only tasks in that stage are listed and counted.

    Returns ``(tasks_on_page, total_matching_tasks)``.
    """
    # Tasks are linked to a brand through its projects.
    id_rows = await db.execute(
        select(Project.id).where(Project.brand_id == brand_id)
    )
    project_ids = list(id_rows.scalars().all())
    if not project_ids:
        return [], 0

    filters = [Task.project_id.in_(project_ids)]
    if stage:
        filters.append(Task.stage == stage)

    # Total across all pages.
    count_rows = await db.execute(select(func.count(Task.id)).where(*filters))
    total = count_rows.scalar() or 0

    # Requested page.
    page_stmt = (
        select(Task)
        .options(
            selectinload(Task.project),
            selectinload(Task.agency),
            selectinload(Task.creator),
        )
        .where(*filters)
        .order_by(Task.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    page_rows = await db.execute(page_stmt)
    return list(page_rows.scalars().all()), total
async def list_pending_reviews_for_agency(
    db: AsyncSession,
    agency_id: str,
    page: int = 1,
    page_size: int = 20,
) -> Tuple[List[Task], int]:
    """
    List an agency's tasks awaiting agency review, newest first.

    Matches tasks in ``SCRIPT_AGENCY_REVIEW`` or ``VIDEO_AGENCY_REVIEW``.
    ``page`` is 1-based. Project, agency and creator relations are eagerly
    loaded.

    Returns ``(tasks_on_page, total_matching_tasks)``.
    """
    pending_stages = [
        TaskStage.SCRIPT_AGENCY_REVIEW,
        TaskStage.VIDEO_AGENCY_REVIEW,
    ]
    filters = (
        Task.agency_id == agency_id,
        Task.stage.in_(pending_stages),
    )

    # Total across all pages.
    count_rows = await db.execute(select(func.count(Task.id)).where(*filters))
    total = count_rows.scalar() or 0

    # Requested page.
    page_stmt = (
        select(Task)
        .options(
            selectinload(Task.project),
            selectinload(Task.agency),
            selectinload(Task.creator),
        )
        .where(*filters)
        .order_by(Task.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    page_rows = await db.execute(page_stmt)
    return list(page_rows.scalars().all()), total
async def list_pending_reviews_for_brand(
    db: AsyncSession,
    brand_id: str,
    page: int = 1,
    page_size: int = 20,
) -> Tuple[List[Task], int]:
    """
    List tasks awaiting brand review under a brand's projects, newest first.

    The brand's project IDs are fetched first; with no projects the result is
    ``([], 0)`` and no task query runs. Matches tasks in
    ``SCRIPT_BRAND_REVIEW`` or ``VIDEO_BRAND_REVIEW``. ``page`` is 1-based.

    Returns ``(tasks_on_page, total_matching_tasks)``.
    """
    # Tasks are linked to a brand through its projects.
    id_rows = await db.execute(
        select(Project.id).where(Project.brand_id == brand_id)
    )
    project_ids = list(id_rows.scalars().all())
    if not project_ids:
        return [], 0

    pending_stages = [
        TaskStage.SCRIPT_BRAND_REVIEW,
        TaskStage.VIDEO_BRAND_REVIEW,
    ]
    filters = (
        Task.project_id.in_(project_ids),
        Task.stage.in_(pending_stages),
    )

    # Total across all pages.
    count_rows = await db.execute(select(func.count(Task.id)).where(*filters))
    total = count_rows.scalar() or 0

    # Requested page.
    page_stmt = (
        select(Task)
        .options(
            selectinload(Task.project),
            selectinload(Task.agency),
            selectinload(Task.creator),
        )
        .where(*filters)
        .order_by(Task.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    page_rows = await db.execute(page_stmt)
    return list(page_rows.scalars().all()), total