用户认证: - User 模型(支持邮箱/手机号登录) - 双 Token JWT 认证(accessToken + refreshToken) - 注册/登录/刷新 Token API 组织模型: - Brand(品牌方)、Agency(代理商)、Creator(达人) - 多对多关系:品牌方↔代理商、代理商↔达人 项目与任务: - Project 模型(品牌方发布) - Task 模型(完整审核流程追踪) - Brief 模型(解析后的结构化内容) 文件上传: - 阿里云 OSS 直传签名服务 - 支持分片上传,最大 500MB 数据库迁移: - 003_user_org_project_task.py Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
153 lines
4.0 KiB
Python
153 lines
4.0 KiB
Python
"""
|
||
阿里云 OSS 服务
|
||
"""
|
||
import time
|
||
import hmac
|
||
import base64
|
||
import hashlib
|
||
import json
|
||
from typing import Optional
|
||
from datetime import datetime
|
||
from app.config import settings
|
||
|
||
|
||
def generate_upload_policy(
|
||
max_size_mb: int = 500,
|
||
expire_seconds: int = 3600,
|
||
upload_dir: Optional[str] = None,
|
||
) -> dict:
|
||
"""
|
||
生成前端直传 OSS 所需的 Policy 和签名
|
||
|
||
Returns:
|
||
{
|
||
"accessKeyId": "...",
|
||
"policy": "base64 encoded policy",
|
||
"signature": "...",
|
||
"host": "https://bucket.oss-cn-hangzhou.aliyuncs.com",
|
||
"dir": "uploads/2026/02/",
|
||
"expire": 1234567890
|
||
}
|
||
"""
|
||
if not settings.OSS_ACCESS_KEY_ID or not settings.OSS_ACCESS_KEY_SECRET:
|
||
raise ValueError("OSS 配置未设置")
|
||
|
||
# 计算过期时间
|
||
expire_time = int(time.time()) + expire_seconds
|
||
expire_date = datetime.utcfromtimestamp(expire_time).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
# 默认上传目录:uploads/年/月/
|
||
if upload_dir is None:
|
||
now = datetime.now()
|
||
upload_dir = f"uploads/{now.year}/{now.month:02d}/"
|
||
|
||
# 构建 Policy
|
||
policy_dict = {
|
||
"expiration": expire_date,
|
||
"conditions": [
|
||
{"bucket": settings.OSS_BUCKET_NAME},
|
||
["starts-with", "$key", upload_dir],
|
||
["content-length-range", 0, max_size_mb * 1024 * 1024],
|
||
]
|
||
}
|
||
|
||
# Base64 编码 Policy
|
||
policy_json = json.dumps(policy_dict)
|
||
policy_base64 = base64.b64encode(policy_json.encode()).decode()
|
||
|
||
# 计算签名
|
||
signature = base64.b64encode(
|
||
hmac.new(
|
||
settings.OSS_ACCESS_KEY_SECRET.encode(),
|
||
policy_base64.encode(),
|
||
hashlib.sha1
|
||
).digest()
|
||
).decode()
|
||
|
||
# 构建 Host
|
||
host = settings.OSS_BUCKET_DOMAIN
|
||
if not host:
|
||
host = f"https://{settings.OSS_BUCKET_NAME}.{settings.OSS_ENDPOINT}"
|
||
|
||
return {
|
||
"accessKeyId": settings.OSS_ACCESS_KEY_ID,
|
||
"policy": policy_base64,
|
||
"signature": signature,
|
||
"host": host,
|
||
"dir": upload_dir,
|
||
"expire": expire_time,
|
||
}
|
||
|
||
|
||
def generate_sts_token(
|
||
role_arn: str,
|
||
session_name: str = "miaosi-upload",
|
||
duration_seconds: int = 3600,
|
||
) -> dict:
|
||
"""
|
||
生成 STS 临时凭证(需要配置 RAM 角色)
|
||
|
||
注意:此方法需要安装 aliyun-python-sdk-sts
|
||
如果不使用 STS,可以使用上面的 generate_upload_policy 方法
|
||
"""
|
||
# TODO: 实现 STS 临时凭证生成
|
||
# 需要安装 aliyun-python-sdk-core 和 aliyun-python-sdk-sts
|
||
raise NotImplementedError("STS 临时凭证生成暂未实现,请使用 generate_upload_policy")
|
||
|
||
|
||
def get_file_url(file_key: str) -> str:
|
||
"""
|
||
获取文件的公开访问 URL
|
||
|
||
Args:
|
||
file_key: 文件在 OSS 中的 key,如 "uploads/2026/02/video.mp4"
|
||
|
||
Returns:
|
||
完整的访问 URL
|
||
"""
|
||
host = settings.OSS_BUCKET_DOMAIN
|
||
if not host:
|
||
host = f"https://{settings.OSS_BUCKET_NAME}.{settings.OSS_ENDPOINT}"
|
||
|
||
# 确保 host 以 https:// 开头
|
||
if not host.startswith("http"):
|
||
host = f"https://{host}"
|
||
|
||
# 确保 host 不以 / 结尾
|
||
host = host.rstrip("/")
|
||
|
||
# 确保 file_key 不以 / 开头
|
||
file_key = file_key.lstrip("/")
|
||
|
||
return f"{host}/{file_key}"
|
||
|
||
|
||
def parse_file_key_from_url(url: str) -> str:
|
||
"""
|
||
从完整 URL 解析出文件 key
|
||
|
||
Args:
|
||
url: 完整的 OSS URL
|
||
|
||
Returns:
|
||
文件 key
|
||
"""
|
||
host = settings.OSS_BUCKET_DOMAIN
|
||
if not host:
|
||
host = f"https://{settings.OSS_BUCKET_NAME}.{settings.OSS_ENDPOINT}"
|
||
|
||
# 移除 host 前缀
|
||
if url.startswith(host):
|
||
return url[len(host):].lstrip("/")
|
||
|
||
# 尝试其他格式
|
||
if settings.OSS_BUCKET_NAME in url:
|
||
# 格式: https://bucket.endpoint/key
|
||
parts = url.split(settings.OSS_BUCKET_NAME + ".")
|
||
if len(parts) > 1:
|
||
key_part = parts[1].split("/", 1)
|
||
if len(key_part) > 1:
|
||
return key_part[1]
|
||
|
||
return url
|