"""
SessionID pool service (T-021, T-027).

Fetches the cookie list from the internal API and randomly picks an
aadvid/auth_token pair to use for API calls.

T-027 fixes:
- Pick any configuration at random instead of matching by brand_id.
- Use the complete auth_token value as-is (e.g. "sessionid=xxx").
"""
import asyncio
import logging
import random
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional

import httpx

from app.config import settings

logger = logging.getLogger(__name__)


@dataclass
class CookieConfig:
    """Cookie configuration for a single entry returned by the cookie API."""

    brand_id: str
    aadvid: str
    auth_token: str  # complete cookie value, e.g. "sessionid=xxx"
    industry_id: int
    brand_name: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the configuration as a plain dict keyed by field name."""
        return asdict(self)


def _as_text(value: Any) -> str:
    """Convert *value* to ``str``, mapping ``None`` to the empty string.

    A plain ``str(None)`` yields the truthy string ``"None"``, which would let
    entries with null identifiers slip through the required-field check.
    """
    return "" if value is None else str(value)


def _extract_session_id(auth_token: str) -> str:
    """Return the part after the first ``=`` in *auth_token*.

    If the token contains no ``=``, it is returned unchanged.
    """
    _, separator, tail = auth_token.partition("=")
    return tail if separator else auth_token


def _parse_cookie_item(item: Any) -> Optional[CookieConfig]:
    """Build a ``CookieConfig`` from one raw API entry.

    Returns:
        The parsed configuration, or ``None`` when the entry is not a dict or
        lacks any of brand_id / aadvid / auth_token.
    """
    if not isinstance(item, dict):
        return None

    brand_id = _as_text(item.get("brand_id"))
    aadvid = _as_text(item.get("aadvid"))
    # T-027: use the complete auth_token (or sessionid_cookie) value directly.
    raw_token = item.get("auth_token") or item.get("sessionid_cookie") or ""
    auth_token = raw_token if isinstance(raw_token, str) else str(raw_token)
    if not (brand_id and aadvid and auth_token):
        return None

    raw_industry_id = item.get("industry_id", 0)
    try:
        industry_id = int(raw_industry_id) if raw_industry_id else 0
    except (TypeError, ValueError):
        # A malformed optional field must not discard the entry, nor abort
        # the refresh of every other entry.
        logger.warning(
            f"Invalid industry_id {raw_industry_id!r} for brand {brand_id}, using 0"
        )
        industry_id = 0

    return CookieConfig(
        brand_id=brand_id,
        aadvid=aadvid,
        auth_token=auth_token,
        industry_id=industry_id,
        brand_name=_as_text(item.get("brand_name")),
    )


class SessionPool:
    """SessionID pool manager - T-027: configurations are picked at random."""

    def __init__(self):
        # All currently known configurations.
        self._configs: List[CookieConfig] = []
        # Serializes concurrent refresh() calls.
        self._lock = asyncio.Lock()

    async def refresh(self) -> bool:
        """
        Reload the configuration list from the internal API.

        The new list is built completely before it replaces the current one,
        so a failure while fetching or parsing leaves the existing pool
        untouched.

        Returns:
            bool: True if the refresh succeeded and the pool is non-empty.
        """
        async with self._lock:
            try:
                headers = {}
                if settings.YUNTU_API_TOKEN:
                    headers["Authorization"] = f"Bearer {settings.YUNTU_API_TOKEN}"

                async with httpx.AsyncClient(
                    timeout=settings.YUNTU_API_TIMEOUT
                ) as client:
                    response = await client.get(
                        f"{settings.BRAND_API_BASE_URL}/v1/yuntu/get_cookie",
                        params={"page": 1, "page_size": 100},
                        headers=headers,
                    )

                    if response.status_code == 200:
                        data = response.json()
                        cookie_list = (
                            data.get("data", []) if isinstance(data, dict) else None
                        )
                        if isinstance(cookie_list, list):
                            parsed = (
                                _parse_cookie_item(item) for item in cookie_list
                            )
                            # Swap in the new list in a single assignment.
                            self._configs = [c for c in parsed if c is not None]
                            logger.info(
                                f"SessionPool refreshed: {len(self._configs)} configs"
                            )
                            return len(self._configs) > 0

                    # Reached for non-200 responses and for 200 responses
                    # whose payload does not have the expected shape.
                    logger.warning(
                        f"Failed to refresh session pool: status={response.status_code}"
                    )
                    return False

            except httpx.TimeoutException:
                logger.error("SessionPool refresh timeout")
                return False
            except httpx.RequestError as e:
                logger.error(f"SessionPool refresh request error: {e}")
                return False
            except Exception as e:
                # Top-level boundary: keep the traceback for diagnosis.
                logger.exception(f"SessionPool refresh unexpected error: {e}")
                return False

    def get_random_config(self) -> Optional[Dict[str, Any]]:
        """
        T-027: pick any one configuration at random.

        Returns:
            Dict or None: dict with brand_id, aadvid, auth_token, industry_id
            and brand_name, or None when the pool is empty.
        """
        if not self._configs:
            return None
        return random.choice(self._configs).to_dict()

    def remove_by_auth_token(self, auth_token: str) -> None:
        """
        Remove an invalidated configuration from the pool.

        Args:
            auth_token: the exact auth_token of the configuration(s) to drop.
        """
        self._configs = [c for c in self._configs if c.auth_token != auth_token]
        # NOTE(review): this logs the first 20 characters of the token;
        # confirm that prefix is acceptable to expose in logs.
        logger.info(f"Removed invalid config: {auth_token[:20]}...")

    # Legacy interface
    def remove(self, session_id: str) -> None:
        """Legacy interface: remove configs whose auth_token contains session_id."""
        if not session_id:
            # "" is a substring of every token; without this guard an empty
            # session_id would wipe the entire pool.
            return
        self._configs = [c for c in self._configs if session_id not in c.auth_token]

    @property
    def size(self) -> int:
        """Number of configurations currently in the pool."""
        return len(self._configs)

    @property
    def is_empty(self) -> bool:
        """Whether the pool currently holds no configurations."""
        return not self._configs

    def get_distinct_configs(self, count: int) -> List[Dict[str, Any]]:
        """
        Return ``count`` configurations for concurrent calls.

        - Pool size >= count: random sample of ``count`` distinct entries.
        - Pool size < count: every entry once, then entries are reused
          cyclically (in shuffled order) to make up the difference.
        - Pool empty or count <= 0: empty list.

        Args:
            count: number of configurations required.

        Returns:
            List[Dict]: list of configuration dicts.
        """
        if not self._configs or count <= 0:
            return []

        if len(self._configs) >= count:
            return [c.to_dict() for c in random.sample(self._configs, count)]

        # Not enough entries: take all of them, then cycle through a shuffled
        # copy to fill the remaining slots.
        result = [c.to_dict() for c in self._configs]
        shuffled = list(self._configs)
        random.shuffle(shuffled)
        missing = count - len(result)
        result.extend(
            shuffled[i % len(shuffled)].to_dict() for i in range(missing)
        )
        return result

    # Legacy interface
    def get_random(self) -> Optional[str]:
        """Legacy interface: return a random SessionID, or None if the pool is empty."""
        config = self.get_random_config()
        if config:
            # Strip the "name=" prefix from the full cookie value.
            return _extract_session_id(config["auth_token"])
        return None

    # Legacy code compatibility
    @property
    def _brand_configs(self) -> Dict[str, Any]:
        """Legacy interface: mapping of brand_id to its CookieConfig."""
        return {c.brand_id: c for c in self._configs}


# Global singleton
session_pool = SessionPool()


async def _ensure_pool_ready(attempt: int) -> bool:
    """Refresh the global pool if it is empty.

    Args:
        attempt: zero-based retry index, used only for the warning message.

    Returns:
        True when the pool already has entries or the refresh succeeded.
    """
    if not session_pool.is_empty:
        return True
    if await session_pool.refresh():
        return True
    logger.warning(f"Session pool refresh failed, attempt {attempt + 1}")
    return False


async def get_random_config(max_retries: int = 3) -> Optional[Dict[str, Any]]:
    """
    T-027: return a random configuration, refreshing the pool when needed.

    Args:
        max_retries: maximum number of attempts.

    Returns:
        Dict or None: dict containing aadvid and auth_token (among other
        fields), or None if no configuration could be obtained.
    """
    for attempt in range(max_retries):
        if not await _ensure_pool_ready(attempt):
            continue

        config = session_pool.get_random_config()
        if config:
            return config

    logger.error("Failed to get config after all retries")
    return None


# Legacy interface
async def get_session_with_retry(max_retries: int = 3) -> Optional[str]:
    """
    Return a SessionID, refreshing the pool when needed (legacy interface).

    Args:
        max_retries: maximum number of attempts.

    Returns:
        Optional[str]: the SessionID, or None on failure.
    """
    config = await get_random_config(max_retries)
    if config:
        return _extract_session_id(config["auth_token"])
    return None


async def get_distinct_configs(count: int, max_retries: int = 3) -> List[Dict[str, Any]]:
    """
    Return ``count`` configurations, refreshing the pool when needed.

    Args:
        count: number of configurations required.
        max_retries: maximum number of attempts.

    Returns:
        List[Dict]: list of configuration dicts; empty on failure or when
        ``count`` is not positive.
    """
    if count <= 0:
        # Nothing was requested: skip the retry loop and its error log.
        return []

    for attempt in range(max_retries):
        if not await _ensure_pool_ready(attempt):
            continue

        configs = session_pool.get_distinct_configs(count)
        if configs:
            return configs

    logger.error("Failed to get distinct configs after all retries")
    return []


async def get_config_for_brand(brand_id: str, max_retries: int = 3) -> Optional[Any]:
    """
    Legacy interface: get the configuration for a brand.

    T-027: a configuration is now picked at random; ``brand_id`` is accepted
    for compatibility but no longer used for matching.
    """
    return await get_random_config(max_retries)