- 实现查询API (query.py): 支持star_id/unique_id/nickname三种查询方式 - 实现计算模块 (calculator.py): CPM/自然搜索UV/搜索成本计算 - 实现品牌API集成 (brand_api.py): 批量并发调用,10并发限制 - 实现导出服务 (export_service.py): Excel/CSV导出 - 前端组件: QueryForm/ResultTable/ExportButton - 主页面集成: 支持6种页面状态 - 测试: 44个测试全部通过,覆盖率88% Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
84 lines
2.5 KiB
Python
84 lines
2.5 KiB
Python
import asyncio
|
|
from typing import Dict, List, Tuple
|
|
import httpx
|
|
import logging
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def fetch_brand_name(
|
|
brand_id: str,
|
|
semaphore: asyncio.Semaphore,
|
|
) -> Tuple[str, str]:
|
|
"""
|
|
获取单个品牌名称.
|
|
|
|
Args:
|
|
brand_id: 品牌ID
|
|
semaphore: 并发控制信号量
|
|
|
|
Returns:
|
|
(brand_id, brand_name) 元组, 失败时 brand_name 为 brand_id
|
|
"""
|
|
async with semaphore:
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=settings.BRAND_API_TIMEOUT
|
|
) as client:
|
|
response = await client.get(
|
|
f"{settings.BRAND_API_BASE_URL}/v1/yuntu/brands/{brand_id}"
|
|
)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
# 尝试从响应中获取品牌名称
|
|
if isinstance(data, dict):
|
|
name = data.get("data", {}).get("name") or data.get("name")
|
|
if name:
|
|
return brand_id, name
|
|
except httpx.TimeoutException:
|
|
logger.warning(f"Brand API timeout for brand_id: {brand_id}")
|
|
except httpx.RequestError as e:
|
|
logger.warning(f"Brand API request error for brand_id: {brand_id}, error: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error fetching brand {brand_id}: {e}")
|
|
|
|
# 失败时降级返回 brand_id
|
|
return brand_id, brand_id
|
|
|
|
|
|
async def get_brand_names(brand_ids: List[str]) -> Dict[str, str]:
|
|
"""
|
|
批量获取品牌名称.
|
|
|
|
Args:
|
|
brand_ids: 品牌ID列表
|
|
|
|
Returns:
|
|
brand_id -> brand_name 映射字典
|
|
"""
|
|
# 过滤空值并去重
|
|
unique_ids = list(set(filter(None, brand_ids)))
|
|
|
|
if not unique_ids:
|
|
return {}
|
|
|
|
# 创建并发控制信号量
|
|
semaphore = asyncio.Semaphore(settings.BRAND_API_CONCURRENCY)
|
|
|
|
# 批量并发请求
|
|
tasks = [fetch_brand_name(brand_id, semaphore) for brand_id in unique_ids]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# 构建映射表
|
|
brand_map: Dict[str, str] = {}
|
|
for result in results:
|
|
if isinstance(result, tuple):
|
|
brand_id, brand_name = result
|
|
brand_map[brand_id] = brand_name
|
|
elif isinstance(result, Exception):
|
|
logger.error(f"Error in batch brand fetch: {result}")
|
|
|
|
return brand_map
|