""" 文件上传 API """ from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, Form, status from pydantic import BaseModel from typing import Optional from datetime import datetime from app.services.oss import generate_upload_policy, get_file_url, generate_presigned_url from app.config import settings from app.models.user import User from app.api.deps import get_current_user router = APIRouter(prefix="/upload", tags=["文件上传"]) class UploadPolicyRequest(BaseModel): """获取上传凭证请求""" file_type: str = "general" # script, video, image, general file_name: Optional[str] = None class UploadPolicyResponse(BaseModel): """TOS 直传凭证响应""" x_tos_algorithm: str x_tos_credential: str x_tos_date: str x_tos_signature: str policy: str host: str dir: str expire: int max_size_mb: int class FileUploadedRequest(BaseModel): """文件上传完成回调""" file_key: str file_name: str file_size: int file_type: str class FileUploadedResponse(BaseModel): """文件上传完成响应""" url: str file_key: str file_name: str file_size: int file_type: str @router.post("/policy", response_model=UploadPolicyResponse) async def get_upload_policy( request: UploadPolicyRequest, current_user: User = Depends(get_current_user), ): """ 获取 TOS 直传凭证 前端使用此凭证直接上传文件到火山引擎 TOS,无需经过后端。 文件类型说明: - script: 脚本文档 (docx, pdf, xlsx, txt, pptx) - video: 视频文件 (mp4, mov, webm) - image: 图片文件 (jpg, png, gif) - general: 通用文件 """ # 根据文件类型设置上传目录 now = datetime.now() base_dir = f"uploads/{now.year}/{now.month:02d}" if request.file_type == "script": upload_dir = f"{base_dir}/scripts/" elif request.file_type == "video": upload_dir = f"{base_dir}/videos/" elif request.file_type == "image": upload_dir = f"{base_dir}/images/" else: upload_dir = f"{base_dir}/files/" try: policy = generate_upload_policy( max_size_mb=settings.MAX_FILE_SIZE_MB, expire_seconds=3600, upload_dir=upload_dir, ) except ValueError as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e), ) return UploadPolicyResponse( x_tos_algorithm=policy["x_tos_algorithm"], x_tos_credential=policy["x_tos_credential"], x_tos_date=policy["x_tos_date"], x_tos_signature=policy["x_tos_signature"], policy=policy["policy"], host=policy["host"], dir=policy["dir"], expire=policy["expire"], max_size_mb=settings.MAX_FILE_SIZE_MB, ) @router.post("/complete", response_model=FileUploadedResponse) async def file_uploaded( request: FileUploadedRequest, current_user: User = Depends(get_current_user), ): """ 文件上传完成回调 前端上传完成后调用此接口,获取文件的完整 URL。 """ url = get_file_url(request.file_key) return FileUploadedResponse( url=url, file_key=request.file_key, file_name=request.file_name, file_size=request.file_size, file_type=request.file_type, ) class SignedUrlResponse(BaseModel): """签名 URL 响应""" signed_url: str expire_seconds: int @router.get("/sign-url", response_model=SignedUrlResponse) async def get_signed_url( url: str = Query(..., description="文件的原始 URL 或 file_key"), expire: int = Query(3600, ge=60, le=43200, description="有效期(秒),默认1小时,最长12小时"), download: bool = Query(False, description="是否强制下载(添加 Content-Disposition: attachment)"), current_user: User = Depends(get_current_user), ): """ 获取私有桶文件的预签名访问 URL 前端在展示/下载文件前调用此接口,获取带签名的临时访问链接。 支持传入完整 URL 或 file_key。 """ from app.services.oss import parse_file_key_from_url # 如果传入的是完整 URL,先解析出 file_key file_key = url if url.startswith("http"): file_key = parse_file_key_from_url(url) if not file_key: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="无效的文件路径", ) try: signed_url = generate_presigned_url(file_key, expire_seconds=expire, download=download) except ValueError as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e), ) return SignedUrlResponse( signed_url=signed_url, expire_seconds=expire, ) @router.post("/proxy", response_model=FileUploadedResponse) async def proxy_upload( file: UploadFile = File(...), file_type: str = Form("general"), current_user: User = Depends(get_current_user), ): """ 后端代理上传(用于本地开发 / 浏览器无法直连 TOS 的场景) 前端把文件 POST 到此接口,后端使用 TOS SDK 上传到对象存储。 """ import io import tos as tos_sdk if not settings.TOS_ACCESS_KEY_ID or not settings.TOS_SECRET_ACCESS_KEY: raise HTTPException(status_code=500, detail="TOS 配置未设置") now = datetime.now() base_dir = f"uploads/{now.year}/{now.month:02d}" type_dirs = {"script": "scripts", "video": "videos", "image": "images"} sub_dir = type_dirs.get(file_type, "files") file_key = f"{base_dir}/{sub_dir}/{int(now.timestamp())}_{file.filename}" content = await file.read() content_type = file.content_type or "application/octet-stream" region = settings.TOS_REGION endpoint = settings.TOS_ENDPOINT or f"tos-cn-{region}.volces.com" try: client = tos_sdk.TosClientV2( ak=settings.TOS_ACCESS_KEY_ID, sk=settings.TOS_SECRET_ACCESS_KEY, endpoint=f"https://{endpoint}", region=region, ) client.put_object( bucket=settings.TOS_BUCKET_NAME, key=file_key, content=io.BytesIO(content), content_type=content_type, ) except Exception as e: raise HTTPException( status_code=502, detail=f"TOS 上传失败: {str(e)[:200]}", ) url = get_file_url(file_key) return FileUploadedResponse( url=url, file_key=file_key, file_name=file.filename or "unknown", file_size=len(content), file_type=file_type, )