Your Name 4caafdb50f feat: 添加后端核心模块
用户认证:
- User 模型(支持邮箱/手机号登录)
- 双 Token JWT 认证(accessToken + refreshToken)
- 注册/登录/刷新 Token API

组织模型:
- Brand(品牌方)、Agency(代理商)、Creator(达人)
- 多对多关系:品牌方↔代理商、代理商↔达人

项目与任务:
- Project 模型(品牌方发布)
- Task 模型(完整审核流程追踪)
- Brief 模型(解析后的结构化内容)

文件上传:
- 阿里云 OSS 直传签名服务
- 支持分片上传,最大 500MB

数据库迁移:
- 003_user_org_project_task.py

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-09 13:47:36 +08:00

153 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
阿里云 OSS 服务
"""
import time
import hmac
import base64
import hashlib
import json
from typing import Optional
from datetime import datetime
from app.config import settings
def generate_upload_policy(
max_size_mb: int = 500,
expire_seconds: int = 3600,
upload_dir: Optional[str] = None,
) -> dict:
"""
生成前端直传 OSS 所需的 Policy 和签名
Returns:
{
"accessKeyId": "...",
"policy": "base64 encoded policy",
"signature": "...",
"host": "https://bucket.oss-cn-hangzhou.aliyuncs.com",
"dir": "uploads/2026/02/",
"expire": 1234567890
}
"""
if not settings.OSS_ACCESS_KEY_ID or not settings.OSS_ACCESS_KEY_SECRET:
raise ValueError("OSS 配置未设置")
# 计算过期时间
expire_time = int(time.time()) + expire_seconds
expire_date = datetime.utcfromtimestamp(expire_time).strftime("%Y-%m-%dT%H:%M:%SZ")
# 默认上传目录uploads/年/月/
if upload_dir is None:
now = datetime.now()
upload_dir = f"uploads/{now.year}/{now.month:02d}/"
# 构建 Policy
policy_dict = {
"expiration": expire_date,
"conditions": [
{"bucket": settings.OSS_BUCKET_NAME},
["starts-with", "$key", upload_dir],
["content-length-range", 0, max_size_mb * 1024 * 1024],
]
}
# Base64 编码 Policy
policy_json = json.dumps(policy_dict)
policy_base64 = base64.b64encode(policy_json.encode()).decode()
# 计算签名
signature = base64.b64encode(
hmac.new(
settings.OSS_ACCESS_KEY_SECRET.encode(),
policy_base64.encode(),
hashlib.sha1
).digest()
).decode()
# 构建 Host
host = settings.OSS_BUCKET_DOMAIN
if not host:
host = f"https://{settings.OSS_BUCKET_NAME}.{settings.OSS_ENDPOINT}"
return {
"accessKeyId": settings.OSS_ACCESS_KEY_ID,
"policy": policy_base64,
"signature": signature,
"host": host,
"dir": upload_dir,
"expire": expire_time,
}
def generate_sts_token(
role_arn: str,
session_name: str = "miaosi-upload",
duration_seconds: int = 3600,
) -> dict:
"""
生成 STS 临时凭证(需要配置 RAM 角色)
注意:此方法需要安装 aliyun-python-sdk-sts
如果不使用 STS可以使用上面的 generate_upload_policy 方法
"""
# TODO: 实现 STS 临时凭证生成
# 需要安装 aliyun-python-sdk-core 和 aliyun-python-sdk-sts
raise NotImplementedError("STS 临时凭证生成暂未实现,请使用 generate_upload_policy")
def get_file_url(file_key: str) -> str:
"""
获取文件的公开访问 URL
Args:
file_key: 文件在 OSS 中的 key"uploads/2026/02/video.mp4"
Returns:
完整的访问 URL
"""
host = settings.OSS_BUCKET_DOMAIN
if not host:
host = f"https://{settings.OSS_BUCKET_NAME}.{settings.OSS_ENDPOINT}"
# 确保 host 以 https:// 开头
if not host.startswith("http"):
host = f"https://{host}"
# 确保 host 不以 / 结尾
host = host.rstrip("/")
# 确保 file_key 不以 / 开头
file_key = file_key.lstrip("/")
return f"{host}/{file_key}"
def parse_file_key_from_url(url: str) -> str:
"""
从完整 URL 解析出文件 key
Args:
url: 完整的 OSS URL
Returns:
文件 key
"""
host = settings.OSS_BUCKET_DOMAIN
if not host:
host = f"https://{settings.OSS_BUCKET_NAME}.{settings.OSS_ENDPOINT}"
# 移除 host 前缀
if url.startswith(host):
return url[len(host):].lstrip("/")
# 尝试其他格式
if settings.OSS_BUCKET_NAME in url:
# 格式: https://bucket.endpoint/key
parts = url.split(settings.OSS_BUCKET_NAME + ".")
if len(parts) > 1:
key_part = parts[1].split("/", 1)
if len(key_part) > 1:
return key_part[1]
return url