Your Name a32102f583 feat: 补全后端 API 并对齐前后端类型
- 后端新增: Project CRUD / Brief CRUD / 组织关系管理 / 工作台统计 / SSE 推送 / 认证依赖注入
- 后端完善: 任务 API 全流程(创建/审核/申诉) + Task Service + Task Schema
- 前端修复: login 页面 localStorage key 错误 (miaosi_auth -> miaosi_user)
- 前端对齐: types/task.ts 与后端 TaskStage/TaskResponse 完全对齐
- 前端新增: project/brief/organization/dashboard 类型定义
- 前端补全: api.ts 新增 30+ API 方法覆盖所有后端接口

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 14:13:08 +08:00

183 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Brief API
项目 Brief 文档的 CRUD
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.user import User, UserRole
from app.models.project import Project
from app.models.brief import Brief
from app.models.organization import Brand, Agency
from app.api.deps import get_current_user
from app.schemas.brief import (
BriefCreateRequest,
BriefUpdateRequest,
BriefResponse,
)
from app.services.auth import generate_id
# Every route registered on this router is nested under one project:
# /projects/{project_id}/brief
router = APIRouter(prefix="/projects/{project_id}/brief", tags=["Brief"])
async def _get_project_with_permission(
    project_id: str,
    current_user: User,
    db: AsyncSession,
    require_write: bool = False,
) -> Project:
    """Load a project and verify the current user may access it.

    Access rules, by role:
      * BRAND   - must own the project (``project.brand_id`` matches the
                  Brand row linked to the user); may read and write.
      * AGENCY  - read-only; the Agency row linked to the user must be in
                  ``project.agencies``.
      * CREATOR - read-only; no per-project membership check is performed.
      * any other role - always rejected.

    Args:
        project_id: Primary key of the project to load.
        current_user: The authenticated user making the request.
        db: Async database session used for the lookups.
        require_write: When true, only roles with write access pass.

    Returns:
        The loaded project, with ``brand`` and ``agencies`` eagerly loaded.

    Raises:
        HTTPException: 404 if the project does not exist; 403 if the user
            is not allowed to access (or modify) it.
    """
    project_stmt = (
        select(Project)
        .options(selectinload(Project.brand), selectinload(Project.agencies))
        .where(Project.id == project_id)
    )
    project = (await db.execute(project_stmt)).scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="项目不存在")

    role = current_user.role

    if role == UserRole.BRAND:
        # The brand profile linked to this user must be the project's owner.
        owner_rows = await db.execute(
            select(Brand).where(Brand.user_id == current_user.id)
        )
        owner = owner_rows.scalar_one_or_none()
        if not owner or project.brand_id != owner.id:
            raise HTTPException(status_code=403, detail="无权访问此项目")
        return project

    if role == UserRole.AGENCY:
        # Agencies are rejected for writes before any membership lookup.
        if require_write:
            raise HTTPException(status_code=403, detail="代理商无权修改 Brief")
        member_rows = await db.execute(
            select(Agency).where(Agency.user_id == current_user.id)
        )
        member = member_rows.scalar_one_or_none()
        if not member or member not in project.agencies:
            raise HTTPException(status_code=403, detail="无权访问此项目")
        return project

    if role == UserRole.CREATOR:
        # Creators may view the Brief (read-only).
        # NOTE(review): no check ties the creator to this project, so any
        # creator can read any project's Brief - confirm this is intended.
        if require_write:
            raise HTTPException(status_code=403, detail="达人无权修改 Brief")
        return project

    raise HTTPException(status_code=403, detail="无权访问")
def _brief_to_response(brief: Brief) -> BriefResponse:
    """Build the API response model from a Brief ORM object.

    ``project_name`` comes from ``brief.project.name`` when the related
    project is present and is ``None`` otherwise; all other fields are copied
    from the Brief unchanged.
    """
    copied_fields = (
        "id",
        "project_id",
        "file_url",
        "file_name",
        "selling_points",
        "blacklist_words",
        "competitors",
        "brand_tone",
        "min_duration",
        "max_duration",
        "other_requirements",
        "attachments",
        "created_at",
        "updated_at",
    )
    payload = {name: getattr(brief, name) for name in copied_fields}

    project = brief.project
    payload["project_name"] = project.name if project else None

    return BriefResponse(**payload)
@router.get("", response_model=BriefResponse)
async def get_brief(
    project_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the Brief attached to a project.

    Raises:
        HTTPException: 404/403 from the project permission check, or 404
            when the project has no Brief.
    """
    # Called for its access check only; the returned project is not needed.
    await _get_project_with_permission(project_id, current_user, db)

    # Eager-load the project because the response includes its name.
    brief_stmt = (
        select(Brief)
        .options(selectinload(Brief.project))
        .where(Brief.project_id == project_id)
    )
    found = (await db.execute(brief_stmt)).scalar_one_or_none()
    if not found:
        raise HTTPException(status_code=404, detail="Brief 不存在")

    return _brief_to_response(found)
@router.post("", response_model=BriefResponse, status_code=status.HTTP_201_CREATED)
async def create_brief(
    project_id: str,
    request: BriefCreateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create the Brief for a project (brand-side operation).

    A project may hold at most one Brief; a second create is rejected and
    the caller is pointed at the update endpoint.

    Raises:
        HTTPException: 404/403 from the project permission check (write
            access is required), or 400 when the project already has a Brief.
    """
    await _get_project_with_permission(project_id, current_user, db, require_write=True)

    # Reject the request if this project already has a Brief.
    duplicate_check = await db.execute(
        select(Brief).where(Brief.project_id == project_id)
    )
    if duplicate_check.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="该项目已有 Brief请使用更新接口")

    # Fields copied verbatim from the request body onto the new row.
    request_fields = (
        "file_url",
        "file_name",
        "selling_points",
        "blacklist_words",
        "competitors",
        "brand_tone",
        "min_duration",
        "max_duration",
        "other_requirements",
        "attachments",
    )
    new_brief = Brief(
        id=generate_id("BF"),
        project_id=project_id,
        **{name: getattr(request, name) for name in request_fields},
    )
    db.add(new_brief)
    # flush() only sends the INSERT; committing is presumably handled by the
    # get_db dependency - verify.
    await db.flush()

    # Reload with the project relationship eagerly loaded, since the
    # response builder reads brief.project.
    reload_stmt = (
        select(Brief)
        .options(selectinload(Brief.project))
        .where(Brief.id == new_brief.id)
    )
    persisted = (await db.execute(reload_stmt)).scalar_one()
    return _brief_to_response(persisted)
@router.put("", response_model=BriefResponse)
async def update_brief(
    project_id: str,
    request: BriefUpdateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Partially update a project's Brief (brand-side operation).

    Only fields explicitly present in the request body are written; fields
    the client omitted are left untouched.

    Raises:
        HTTPException: 404/403 from the project permission check (write
            access is required), or 404 when the project has no Brief.
    """
    await _get_project_with_permission(project_id, current_user, db, require_write=True)

    result = await db.execute(
        select(Brief)
        .options(selectinload(Brief.project))
        .where(Brief.project_id == project_id)
    )
    brief = result.scalar_one_or_none()
    if not brief:
        raise HTTPException(status_code=404, detail="Brief 不存在")

    # exclude_unset keeps omitted fields out of the dump, so an explicit
    # null still clears a column while an absent key changes nothing.
    for field, value in request.model_dump(exclude_unset=True).items():
        setattr(brief, field, value)

    await db.flush()

    # Reload via SELECT + selectinload rather than db.refresh(): refresh()
    # expires the lazily-configured `project` relationship, and the implicit
    # lazy load triggered when the response builder reads brief.project is
    # not allowed under AsyncSession (MissingGreenlet). populate_existing
    # makes the query overwrite the in-session object with the current
    # database state (e.g. database-generated timestamps), matching what
    # refresh() provided for column attributes.
    reloaded = await db.execute(
        select(Brief)
        .options(selectinload(Brief.project))
        .where(Brief.id == brief.id)
        .execution_options(populate_existing=True)
    )
    brief = reloaded.scalar_one()
    return _brief_to_response(brief)