kol-insight/backend/app/services/session_pool.py
zfc 7cd29c5980 feat(frontend): 重构视频分析页面,支持多种搜索方式
主要更新:
- 前端改用 Ant Design 组件(Table、Modal、Select 等)
- 支持三种搜索方式:星图ID、达人unique_id、达人昵称模糊匹配
- 列表页实时调用云图 API 获取 A3 数据和成本指标
- 详情弹窗显示完整 6 大类指标,支持文字复制
- 品牌 API URL 格式修复为查询参数形式
- 优化云图 API 参数格式和会话池管理

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 22:01:55 +08:00

227 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SessionID池服务 (T-021, T-027)
从内部API获取Cookie列表随机选取 aadvid/auth_token 用于 API 调用。
T-027 修复:
- 改为随机选取任意一组配置,不按 brand_id 匹配
- auth_token 直接使用完整值 (如 "sessionid=xxx")
"""
import asyncio
import logging
import random
from typing import Dict, Optional, Any, List
from dataclasses import dataclass
import httpx
from app.config import settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class CookieConfig:
    """A single cookie configuration entry kept in the session pool."""

    # Brand identifier, stored as a string.
    brand_id: str
    # aadvid value that is handed out together with the auth token.
    aadvid: str
    # Complete cookie value, e.g. "sessionid=xxx".
    auth_token: str
    # Industry identifier as an integer.
    industry_id: int
    # Brand display name.
    brand_name: str
class SessionPool:
    """Session pool manager (T-027: configurations are picked at random).

    Holds the ``CookieConfig`` entries loaded by :meth:`refresh` and hands
    one out at random on request. Entries can be evicted with
    :meth:`remove_by_auth_token` or the legacy :meth:`remove`.
    """

    def __init__(self):
        # Every configuration accepted by the most recent successful refresh.
        self._configs: List[CookieConfig] = []
        # Ensures only one refresh() body runs at a time.
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return *value* as a string, mapping ``None`` to ``""``.

        A plain ``str(None)`` would yield the truthy string ``"None"`` and
        let entries with a null field pass the validity check.
        """
        return "" if value is None else str(value)

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Convert *value* to ``int``; return *default* if falsy or invalid."""
        if not value:
            return default
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _extract_session_id(auth_token: str) -> str:
        """Return the text after the first ``=``, or the whole token if none."""
        _, separator, tail = auth_token.partition("=")
        return tail if separator else auth_token

    @classmethod
    def _parse_item(cls, item: Any) -> Optional["CookieConfig"]:
        """Build a ``CookieConfig`` from one API list item.

        Returns:
            The parsed configuration, or ``None`` when *item* is not a dict
            or lacks a usable ``brand_id``, ``aadvid`` or auth token.
        """
        if not isinstance(item, dict):
            return None

        brand_id = cls._clean_text(item.get("brand_id"))
        aadvid = cls._clean_text(item.get("aadvid"))
        # T-027: use the complete auth_token (or sessionid_cookie) value as-is.
        raw_token = item.get("auth_token") or item.get("sessionid_cookie")
        auth_token = str(raw_token) if raw_token else ""

        if not (brand_id and aadvid and auth_token):
            return None

        return CookieConfig(
            brand_id=brand_id,
            aadvid=aadvid,
            auth_token=auth_token,
            # A malformed industry_id must not abort the whole refresh.
            industry_id=cls._to_int(item.get("industry_id")),
            brand_name=cls._clean_text(item.get("brand_name")),
        )

    # ------------------------------------------------------------------
    # Pool maintenance
    # ------------------------------------------------------------------
    async def refresh(self) -> bool:
        """
        Reload the configuration list from the internal API.

        The new list is built separately and only swapped in once the whole
        response has been parsed, so a failed refresh leaves the previous
        pool contents untouched.

        Returns:
            bool: True if the refresh succeeded and produced at least one
            configuration, False otherwise (errors are logged, not raised).
        """
        async with self._lock:
            try:
                headers = {}
                if settings.YUNTU_API_TOKEN:
                    headers["Authorization"] = f"Bearer {settings.YUNTU_API_TOKEN}"

                async with httpx.AsyncClient(
                    timeout=settings.YUNTU_API_TIMEOUT
                ) as client:
                    response = await client.get(
                        f"{settings.BRAND_API_BASE_URL}/v1/yuntu/get_cookie",
                        params={"page": 1, "page_size": 100},
                        headers=headers,
                    )

                if response.status_code != 200:
                    logger.warning(
                        "Failed to refresh session pool: status=%s",
                        response.status_code,
                    )
                    return False

                payload = response.json()
                cookie_list = (
                    payload.get("data", []) if isinstance(payload, dict) else None
                )
                if not isinstance(cookie_list, list):
                    logger.warning(
                        "Failed to refresh session pool: unexpected response payload"
                    )
                    return False

                parsed = [self._parse_item(item) for item in cookie_list]
                self._configs = [cfg for cfg in parsed if cfg is not None]
                logger.info(
                    "SessionPool refreshed: %s configs", len(self._configs)
                )
                return len(self._configs) > 0
            except httpx.TimeoutException:
                logger.error("SessionPool refresh timeout")
                return False
            except httpx.RequestError as e:
                logger.error("SessionPool refresh request error: %s", e)
                return False
            except Exception as e:
                # Boundary handler: keep callers alive but record the traceback.
                logger.exception("SessionPool refresh unexpected error: %s", e)
                return False

    def get_random_config(self) -> Optional[Dict[str, Any]]:
        """
        T-027: pick any configuration at random.

        Returns:
            Dict or None: the chosen configuration as a dict with keys
            brand_id, aadvid, auth_token, industry_id and brand_name, or
            None when the pool is empty.
        """
        if not self._configs:
            return None
        config = random.choice(self._configs)
        return {
            "brand_id": config.brand_id,
            "aadvid": config.aadvid,
            "auth_token": config.auth_token,
            "industry_id": config.industry_id,
            "brand_name": config.brand_name,
        }

    def remove_by_auth_token(self, auth_token: str) -> None:
        """
        Remove every configuration whose auth_token equals *auth_token*.

        Args:
            auth_token: the auth_token to evict; an empty value is ignored.
        """
        if not auth_token:
            return
        self._configs = [c for c in self._configs if c.auth_token != auth_token]
        logger.info("Removed invalid config: %s...", auth_token[:20])

    # Legacy-compatible interface
    def remove(self, session_id: str) -> None:
        """Legacy interface: drop configs whose auth_token contains *session_id*.

        An empty *session_id* is ignored; as a substring it would match every
        token and wipe the whole pool.
        """
        if not session_id:
            return
        self._configs = [
            c for c in self._configs if session_id not in c.auth_token
        ]

    @property
    def size(self) -> int:
        """Number of configurations currently in the pool."""
        return len(self._configs)

    @property
    def is_empty(self) -> bool:
        """Whether the pool currently holds no configuration."""
        return not self._configs

    # Legacy-compatible interface
    def get_random(self) -> Optional[str]:
        """Legacy interface: return a random SessionID, or None if the pool is empty.

        The SessionID is the part of the auth_token after the first ``=``;
        a token without ``=`` is returned unchanged.
        """
        config = self.get_random_config()
        if not config:
            return None
        return self._extract_session_id(config["auth_token"])

    # Kept for legacy code
    @property
    def _brand_configs(self) -> Dict[str, Any]:
        """Legacy interface: mapping of brand_id to its configuration.

        When several entries share a brand_id, the last one in the pool wins.
        """
        return {c.brand_id: c for c in self._configs}
# Global singleton: the pool instance shared by the module-level helpers below.
session_pool = SessionPool()
async def get_random_config(max_retries: int = 3) -> Optional[Dict[str, Any]]:
    """
    T-027: return a randomly chosen configuration, refreshing the pool if it is empty.

    Args:
        max_retries: maximum number of attempts

    Returns:
        Dict or None: the configuration dict from the shared pool, or None
        when every attempt failed
    """
    for attempt in range(max_retries):
        # An empty pool must be refilled first; a failed refill uses up the attempt.
        if session_pool.is_empty and not await session_pool.refresh():
            logger.warning(f"Session pool refresh failed, attempt {attempt + 1}")
            continue

        picked = session_pool.get_random_config()
        if picked:
            return picked

    logger.error("Failed to get config after all retries")
    return None
# Legacy-compatible interface
async def get_session_with_retry(max_retries: int = 3) -> Optional[str]:
    """
    Return a SessionID, refreshing the pool when necessary (legacy interface).

    Args:
        max_retries: maximum number of attempts

    Returns:
        Optional[str]: the SessionID, or None if no configuration could be obtained
    """
    config = await get_random_config(max_retries)
    if not config:
        return None

    token = config["auth_token"]
    # Everything after the first "=" is the SessionID; without "=" the token is returned unchanged.
    _, separator, remainder = token.partition("=")
    return remainder if separator else token
async def get_config_for_brand(brand_id: str, max_retries: int = 3) -> Optional[Any]:
    """
    Legacy interface: get the configuration for a brand.

    T-027: the configuration is now chosen at random; brand_id is accepted
    for compatibility but is not used for matching.
    """
    chosen = await get_random_config(max_retries)
    return chosen