""" 文件上传 API """ from fastapi import APIRouter, Depends, HTTPException, Query, status from pydantic import BaseModel from typing import Optional from datetime import datetime from app.services.oss import generate_upload_policy, get_file_url, generate_presigned_url from app.config import settings from app.models.user import User from app.api.deps import get_current_user router = APIRouter(prefix="/upload", tags=["文件上传"]) class UploadPolicyRequest(BaseModel): """获取上传凭证请求""" file_type: str = "general" # script, video, image, general file_name: Optional[str] = None class UploadPolicyResponse(BaseModel): """TOS 直传凭证响应""" x_tos_algorithm: str x_tos_credential: str x_tos_date: str x_tos_signature: str policy: str host: str dir: str expire: int max_size_mb: int class FileUploadedRequest(BaseModel): """文件上传完成回调""" file_key: str file_name: str file_size: int file_type: str class FileUploadedResponse(BaseModel): """文件上传完成响应""" url: str file_key: str file_name: str file_size: int file_type: str @router.post("/policy", response_model=UploadPolicyResponse) async def get_upload_policy( request: UploadPolicyRequest, current_user: User = Depends(get_current_user), ): """ 获取 TOS 直传凭证 前端使用此凭证直接上传文件到火山引擎 TOS,无需经过后端。 文件类型说明: - script: 脚本文档 (docx, pdf, xlsx, txt, pptx) - video: 视频文件 (mp4, mov, webm) - image: 图片文件 (jpg, png, gif) - general: 通用文件 """ # 根据文件类型设置上传目录 now = datetime.now() base_dir = f"uploads/{now.year}/{now.month:02d}" if request.file_type == "script": upload_dir = f"{base_dir}/scripts/" elif request.file_type == "video": upload_dir = f"{base_dir}/videos/" elif request.file_type == "image": upload_dir = f"{base_dir}/images/" else: upload_dir = f"{base_dir}/files/" try: policy = generate_upload_policy( max_size_mb=settings.MAX_FILE_SIZE_MB, expire_seconds=3600, upload_dir=upload_dir, ) except ValueError as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e), ) return UploadPolicyResponse( x_tos_algorithm=policy["x_tos_algorithm"], x_tos_credential=policy["x_tos_credential"], x_tos_date=policy["x_tos_date"], x_tos_signature=policy["x_tos_signature"], policy=policy["policy"], host=policy["host"], dir=policy["dir"], expire=policy["expire"], max_size_mb=settings.MAX_FILE_SIZE_MB, ) @router.post("/complete", response_model=FileUploadedResponse) async def file_uploaded( request: FileUploadedRequest, current_user: User = Depends(get_current_user), ): """ 文件上传完成回调 前端上传完成后调用此接口,获取文件的完整 URL。 """ url = get_file_url(request.file_key) return FileUploadedResponse( url=url, file_key=request.file_key, file_name=request.file_name, file_size=request.file_size, file_type=request.file_type, ) class SignedUrlResponse(BaseModel): """签名 URL 响应""" signed_url: str expire_seconds: int @router.get("/sign-url", response_model=SignedUrlResponse) async def get_signed_url( url: str = Query(..., description="文件的原始 URL 或 file_key"), expire: int = Query(3600, ge=60, le=43200, description="有效期(秒),默认1小时,最长12小时"), current_user: User = Depends(get_current_user), ): """ 获取私有桶文件的预签名访问 URL 前端在展示/下载文件前调用此接口,获取带签名的临时访问链接。 支持传入完整 URL 或 file_key。 """ from app.services.oss import parse_file_key_from_url # 如果传入的是完整 URL,先解析出 file_key file_key = url if url.startswith("http"): file_key = parse_file_key_from_url(url) if not file_key: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="无效的文件路径", ) try: signed_url = generate_presigned_url(file_key, expire_seconds=expire) except ValueError as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e), ) return SignedUrlResponse( signed_url=signed_url, expire_seconds=expire, )