""" 火山引擎 TOS (Volcengine Object Storage) 服务 — 表单直传签名 (V4) """ import time import hmac import base64 import hashlib import json from typing import Optional from datetime import datetime, timezone from app.config import settings def generate_upload_policy( max_size_mb: int = 500, expire_seconds: int = 3600, upload_dir: Optional[str] = None, ) -> dict: """ 生成前端直传 TOS 所需的 Policy 和签名 (V4 HMAC-SHA256) TOS 表单直传签名流程 (PostObject): 1. 构建 policy JSON → Base64 编码 2. 派生签名密钥: kDate → kRegion → kService → kSigning 3. signature = HMAC-SHA256(kSigning, policy_base64) Returns: { "x_tos_algorithm": "TOS4-HMAC-SHA256", "x_tos_credential": "AKIDxxxx/20260210/cn-beijing/tos/request", "x_tos_date": "20260210T120000Z", "x_tos_signature": "...", "policy": "base64 encoded policy", "host": "https://bucket.tos-cn-beijing.volces.com", "dir": "uploads/2026/02/", "expire": 1234567890, } """ if not settings.TOS_ACCESS_KEY_ID or not settings.TOS_SECRET_ACCESS_KEY: raise ValueError("TOS 配置未设置") # 计算时间 now_utc = datetime.now(timezone.utc) date_stamp = now_utc.strftime("%Y%m%d") # 20260210 tos_date = now_utc.strftime("%Y%m%dT%H%M%SZ") # 20260210T120000Z expire_time = int(time.time()) + expire_seconds expiration = datetime.fromtimestamp(expire_time, tz=timezone.utc).strftime( "%Y-%m-%dT%H:%M:%S.000Z" ) # Credential scope region = settings.TOS_REGION credential = f"{settings.TOS_ACCESS_KEY_ID}/{date_stamp}/{region}/tos/request" # 默认上传目录:uploads/年/月/ if upload_dir is None: now = datetime.now() upload_dir = f"uploads/{now.year}/{now.month:02d}/" # 1. 构建 Policy policy_dict = { "expiration": expiration, "conditions": [ {"bucket": settings.TOS_BUCKET_NAME}, ["starts-with", "$key", upload_dir], {"x-tos-algorithm": "TOS4-HMAC-SHA256"}, {"x-tos-credential": credential}, {"x-tos-date": tos_date}, ["content-length-range", 0, max_size_mb * 1024 * 1024], ], } # 2. Base64 编码 Policy policy_json = json.dumps(policy_dict) policy_base64 = base64.b64encode(policy_json.encode()).decode() # 3. 派生签名密钥 (V4 Signing Key) k_date = hmac.new( f"TOS4{settings.TOS_SECRET_ACCESS_KEY}".encode(), date_stamp.encode(), hashlib.sha256, ).digest() k_region = hmac.new(k_date, region.encode(), hashlib.sha256).digest() k_service = hmac.new(k_region, b"tos", hashlib.sha256).digest() k_signing = hmac.new(k_service, b"request", hashlib.sha256).digest() # 4. signature = HMAC-SHA256(kSigning, policy_base64) signature = hmac.new( k_signing, policy_base64.encode(), hashlib.sha256, ).hexdigest() # 构建 Host endpoint = settings.TOS_ENDPOINT or f"tos-cn-{region}.volces.com" host = f"https://{settings.TOS_BUCKET_NAME}.{endpoint}" return { "x_tos_algorithm": "TOS4-HMAC-SHA256", "x_tos_credential": credential, "x_tos_date": tos_date, "x_tos_signature": signature, "policy": policy_base64, "host": host, "dir": upload_dir, "expire": expire_time, } def get_file_url(file_key: str) -> str: """ 获取文件的访问 URL 优先使用 CDN 域名,否则用 TOS 源站域名。 Args: file_key: 文件在 TOS 中的 key,如 "uploads/2026/02/video.mp4" Returns: 完整的访问 URL """ if settings.TOS_CDN_DOMAIN: host = settings.TOS_CDN_DOMAIN else: endpoint = settings.TOS_ENDPOINT or f"tos-cn-{settings.TOS_REGION}.volces.com" host = f"https://{settings.TOS_BUCKET_NAME}.{endpoint}" # 确保 host 以 https:// 开头 if not host.startswith("http"): host = f"https://{host}" # 确保 host 不以 / 结尾 host = host.rstrip("/") # 确保 file_key 不以 / 开头 file_key = file_key.lstrip("/") return f"{host}/{file_key}" def generate_presigned_url( file_key: str, expire_seconds: int = 3600, download: bool = False, filename: str | None = None, ) -> str: """ 为私有桶中的文件生成预签名访问 URL (TOS V4 Query String Auth) 签名流程: 1. 构建 CanonicalRequest 2. 构建 StringToSign 3. 用派生密钥签名 4. 拼接查询参数 Args: file_key: 文件在 TOS 中的 key expire_seconds: URL 有效期(秒),默认 1 小时 Returns: 预签名 URL """ if not settings.TOS_ACCESS_KEY_ID or not settings.TOS_SECRET_ACCESS_KEY: raise ValueError("TOS 配置未设置") from urllib.parse import quote region = settings.TOS_REGION endpoint = settings.TOS_ENDPOINT or f"tos-cn-{region}.volces.com" bucket = settings.TOS_BUCKET_NAME host = f"{bucket}.{endpoint}" now_utc = datetime.now(timezone.utc) date_stamp = now_utc.strftime("%Y%m%d") tos_date = now_utc.strftime("%Y%m%dT%H%M%SZ") credential = f"{settings.TOS_ACCESS_KEY_ID}/{date_stamp}/{region}/tos/request" # 对 file_key 中的路径段分别编码 encoded_key = "/".join(quote(seg, safe="") for seg in file_key.split("/")) # 查询参数(按字母序排列,TOS V4 签名要求严格字母序) params_dict: dict[str, str] = { "X-Tos-Algorithm": "TOS4-HMAC-SHA256", "X-Tos-Credential": quote(credential, safe=''), "X-Tos-Date": tos_date, "X-Tos-Expires": str(expire_seconds), "X-Tos-SignedHeaders": "host", } if download: dl_name = filename or file_key.split("/")[-1] params_dict["response-content-disposition"] = quote( f'attachment; filename="{dl_name}"', safe='' ) query_params = "&".join(f"{k}={v}" for k, v in sorted(params_dict.items())) # CanonicalRequest canonical_request = ( f"GET\n" f"/{encoded_key}\n" f"{query_params}\n" f"host:{host}\n" f"\n" f"host\n" f"UNSIGNED-PAYLOAD" ) # StringToSign canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest() string_to_sign = ( f"TOS4-HMAC-SHA256\n" f"{tos_date}\n" f"{date_stamp}/{region}/tos/request\n" f"{canonical_request_hash}" ) # 派生签名密钥 k_date = hmac.new( f"TOS4{settings.TOS_SECRET_ACCESS_KEY}".encode(), date_stamp.encode(), hashlib.sha256, ).digest() k_region = hmac.new(k_date, region.encode(), hashlib.sha256).digest() k_service = hmac.new(k_region, b"tos", hashlib.sha256).digest() k_signing = hmac.new(k_service, b"request", hashlib.sha256).digest() # 计算签名 signature = hmac.new( k_signing, string_to_sign.encode(), hashlib.sha256, ).hexdigest() return ( f"https://{host}/{encoded_key}" f"?{query_params}" f"&X-Tos-Signature={signature}" ) def parse_file_key_from_url(url: str) -> str: """ 从完整 URL 解析出文件 key Args: url: 完整的 TOS URL Returns: 文件 key """ # 尝试移除 CDN 域名 if settings.TOS_CDN_DOMAIN: cdn = settings.TOS_CDN_DOMAIN.rstrip("/") if not cdn.startswith("http"): cdn = f"https://{cdn}" if url.startswith(cdn): return url[len(cdn):].lstrip("/") # 尝试移除 TOS 源站域名 endpoint = settings.TOS_ENDPOINT or f"tos-cn-{settings.TOS_REGION}.volces.com" tos_host = f"https://{settings.TOS_BUCKET_NAME}.{endpoint}" if url.startswith(tos_host): return url[len(tos_host):].lstrip("/") return url