- SessionPool 新增 get_distinct_configs 方法,支持获取不同配置用于并发调用 - video_analysis 重构为缓存优先策略:数据库有 A3/Cost 数据时直接使用 - 并发 API 调用预分配不同 cookie,避免 session 冲突 - API 数据写回数据库,实现下次查询缓存命中 - 新增 heated_cost 字段追踪 - 测试全面重写,覆盖缓存/API/混合/降级场景
761 lines
27 KiB
Python
761 lines
27 KiB
Python
"""
|
||
Tests for Video Analysis Service (T-024)
|
||
|
||
覆盖:
|
||
- calculate_cost_metrics 计算
|
||
- _needs_api_call 缓存判断
|
||
- get_video_analysis_data 详情页(缓存命中 / API 调用 / API 失败降级)
|
||
- update_video_a3_metrics(含 heated_cost)
|
||
- get_video_list_with_a3 列表页(混合缓存 + 并发 API)
|
||
"""
|
||
|
||
import pytest
|
||
from datetime import datetime
|
||
from unittest.mock import AsyncMock, patch, MagicMock
|
||
|
||
from app.services.video_analysis import (
|
||
_build_video_list_item,
|
||
_needs_api_call,
|
||
calculate_cost_metrics,
|
||
get_video_analysis_data,
|
||
get_video_list_with_a3,
|
||
update_video_a3_metrics,
|
||
)
|
||
from app.services.yuntu_api import YuntuAPIError
|
||
|
||
|
||
def _make_mock_video(**overrides):
|
||
"""创建标准 mock video 对象,带合理默认值。"""
|
||
defaults = {
|
||
"item_id": "video_123",
|
||
"title": "测试视频",
|
||
"video_url": "https://example.com/video",
|
||
"vid": "vid_123",
|
||
"star_id": "star_001",
|
||
"star_unique_id": "unique_001",
|
||
"star_nickname": "测试达人",
|
||
"star_uid": "uid_001",
|
||
"star_fans_cnt": 100000,
|
||
"star_mcn": "MCN1",
|
||
"publish_time": datetime(2025, 1, 15),
|
||
"create_date": datetime(2025, 1, 15),
|
||
"industry_name": "母婴",
|
||
"industry_id": "20",
|
||
"brand_id": "brand_001",
|
||
"hot_type": "爆款",
|
||
"viral_type": "爆款",
|
||
"is_hot": True,
|
||
"has_cart": False,
|
||
"total_play_cnt": 50000,
|
||
"natural_play_cnt": 40000,
|
||
"heated_play_cnt": 10000,
|
||
"total_interaction_cnt": 5000,
|
||
"total_interact": 5000,
|
||
"natural_interaction_cnt": 3000,
|
||
"heated_interaction_cnt": 2000,
|
||
"digg_cnt": 3000,
|
||
"like_cnt": 3000,
|
||
"share_cnt": 1000,
|
||
"comment_cnt": 1000,
|
||
"play_over_cnt": 20000,
|
||
"play_over_rate": 0.4,
|
||
"after_view_search_uv": 1000,
|
||
"after_view_search_cnt": 1200,
|
||
"after_view_search_rate": 0.02,
|
||
"back_search_cnt": 50,
|
||
"back_search_uv": 50,
|
||
"return_search_cnt": 50,
|
||
"new_a3_rate": 0.05,
|
||
"total_new_a3_cnt": 0,
|
||
"heated_new_a3_cnt": 0,
|
||
"natural_new_a3_cnt": 0,
|
||
"total_cost": 0.0,
|
||
"heated_cost": 0.0,
|
||
"star_task_cost": 0.0,
|
||
"search_cost": 0.0,
|
||
"ad_hot_roi": 0.0,
|
||
"estimated_video_cost": 10000.0,
|
||
"order_id": None,
|
||
"content_type": None,
|
||
"industry_tags": None,
|
||
"ad_hot_type": None,
|
||
"trend": None,
|
||
"trend_daily": None,
|
||
"trend_total": None,
|
||
"component_metric_list": None,
|
||
"key_word_after_search_infos": None,
|
||
"index_map": None,
|
||
"search_keywords": None,
|
||
"keywords": None,
|
||
"price_under_20s": None,
|
||
"price_20_60s": None,
|
||
"price_over_60s": None,
|
||
"video_duration": None,
|
||
"data_date": None,
|
||
"created_at": None,
|
||
"updated_at": None,
|
||
}
|
||
defaults.update(overrides)
|
||
mock = MagicMock()
|
||
for k, v in defaults.items():
|
||
setattr(mock, k, v)
|
||
return mock
|
||
|
||
|
||
class TestCalculateCostMetrics:
    """Tests for calculate_cost_metrics function."""

    def test_all_metrics_calculated(self):
        """All inputs non-zero: every derived metric has a concrete value."""
        result = calculate_cost_metrics(
            cost=10000,
            natural_play_cnt=40000,
            a3_increase_cnt=500,
            natural_a3_increase_cnt=400,
            after_view_search_uv=1000,
            total_play_cnt=50000,
        )

        assert result["cpm"] == 200.0
        assert result["natural_cpm"] == 250.0
        assert result["cpa3"] == 20.0
        assert result["natural_cpa3"] == 25.0
        assert result["cp_search"] == 10.0
        assert result["estimated_natural_search_uv"] == 800.0
        assert result["natural_cp_search"] == 12.5

    def test_zero_total_play_cnt(self):
        """Zero play counts: play-based metrics are reported as None."""
        result = calculate_cost_metrics(
            cost=10000,
            natural_play_cnt=0,
            a3_increase_cnt=500,
            natural_a3_increase_cnt=400,
            after_view_search_uv=1000,
            total_play_cnt=0,
        )

        assert result["cpm"] is None
        assert result["natural_cpm"] is None
        assert result["estimated_natural_search_uv"] is None
        assert result["natural_cp_search"] is None

    def test_zero_a3_counts(self):
        """Zero A3 counts: A3-based metrics are None, CPM is unaffected."""
        result = calculate_cost_metrics(
            cost=10000,
            natural_play_cnt=40000,
            a3_increase_cnt=0,
            natural_a3_increase_cnt=0,
            after_view_search_uv=1000,
            total_play_cnt=50000,
        )

        assert result["cpa3"] is None
        assert result["natural_cpa3"] is None
        assert result["cpm"] == 200.0

    def test_zero_search_uv(self):
        """Zero search UV: search-based metrics are reported as None."""
        result = calculate_cost_metrics(
            cost=10000,
            natural_play_cnt=40000,
            a3_increase_cnt=500,
            natural_a3_increase_cnt=400,
            after_view_search_uv=0,
            total_play_cnt=50000,
        )

        assert result["cp_search"] is None
        assert result["estimated_natural_search_uv"] is None
        assert result["natural_cp_search"] is None

    def test_all_zeros(self):
        """Every input zero: every derived metric is None."""
        result = calculate_cost_metrics(
            cost=0,
            natural_play_cnt=0,
            a3_increase_cnt=0,
            natural_a3_increase_cnt=0,
            after_view_search_uv=0,
            total_play_cnt=0,
        )

        assert result["cpm"] is None
        assert result["natural_cpm"] is None
        assert result["cpa3"] is None
        assert result["natural_cpa3"] is None
        assert result["cp_search"] is None
        assert result["estimated_natural_search_uv"] is None
        assert result["natural_cp_search"] is None

    def test_decimal_precision(self):
        """CPM is a float carrying at most two decimal places."""
        result = calculate_cost_metrics(
            cost=10000,
            natural_play_cnt=30000,
            a3_increase_cnt=333,
            natural_a3_increase_cnt=111,
            after_view_search_uv=777,
            total_play_cnt=70000,
        )

        cpm = result["cpm"]
        assert isinstance(cpm, float)
        # Compare against the rounded value instead of inspecting str(cpm):
        # the string form breaks on scientific notation (e.g. "1e-05").
        assert cpm == round(cpm, 2)
|
||
|
||
|
||
class TestNeedsApiCall:
    """Tests for _needs_api_call helper."""

    def test_needs_call_when_no_data(self):
        """A3 == 0 and cost == 0 -> the API must be called."""
        uncached = _make_mock_video(total_new_a3_cnt=0, total_cost=0.0)
        needs_call = _needs_api_call(uncached)
        assert needs_call is True

    def test_needs_call_when_none_values(self):
        """A3 is None and cost is None -> the API must be called."""
        uncached = _make_mock_video(total_new_a3_cnt=None, total_cost=None)
        needs_call = _needs_api_call(uncached)
        assert needs_call is True

    def test_no_call_when_a3_exists(self):
        """A3 data present -> no API call needed."""
        cached = _make_mock_video(total_new_a3_cnt=500, total_cost=0.0)
        needs_call = _needs_api_call(cached)
        assert needs_call is False

    def test_no_call_when_cost_exists(self):
        """Cost data present -> no API call needed."""
        cached = _make_mock_video(total_new_a3_cnt=0, total_cost=10000.0)
        needs_call = _needs_api_call(cached)
        assert needs_call is False

    def test_no_call_when_both_exist(self):
        """Both A3 and cost present -> no API call needed."""
        cached = _make_mock_video(total_new_a3_cnt=500, total_cost=10000.0)
        needs_call = _needs_api_call(cached)
        assert needs_call is False
|
||
|
||
|
||
class TestGetVideoAnalysisData:
    """Tests for get_video_analysis_data function."""

    @staticmethod
    def _session_returning(video):
        """Build an AsyncMock session whose execute() result yields *video*."""
        lookup = MagicMock()
        lookup.scalar_one_or_none.return_value = video
        session = AsyncMock()
        session.execute.return_value = lookup
        return session

    @pytest.mark.asyncio
    async def test_uses_db_when_cached(self):
        """A3/cost already in the database -> used directly, API not called."""
        video = _make_mock_video(
            total_new_a3_cnt=500,
            heated_new_a3_cnt=100,
            natural_new_a3_cnt=400,
            total_cost=10000.0,
            heated_cost=5000.0,
        )
        session = self._session_returning(video)

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.fetch_yuntu_analysis"
        ) as mock_api:
            mock_brands.return_value = {"brand_001": "品牌A"}
            result = await get_video_analysis_data(session, "video_123")

        # The API must not be called.
        mock_api.assert_not_called()

        # The values come from the database row.
        a3 = result["a3_metrics"]
        assert a3["total_new_a3_cnt"] == 500
        assert a3["heated_new_a3_cnt"] == 100
        assert a3["natural_new_a3_cnt"] == 400
        cost = result["cost_metrics"]
        assert cost["total_cost"] == 10000.0
        assert cost["heated_cost"] == 5000.0

    @pytest.mark.asyncio
    async def test_calls_api_and_saves_to_db(self):
        """No data in the database -> call the API -> write back to the DB."""
        video = _make_mock_video(
            total_new_a3_cnt=0,
            total_cost=0.0,
            heated_cost=0.0,
        )

        select_result = MagicMock()
        select_result.scalar_one_or_none.return_value = video
        update_result = MagicMock(rowcount=1)

        select_calls = 0

        async def fake_execute(stmt):
            # The first call and any SELECT statement get the lookup result;
            # everything else gets the update result.
            nonlocal select_calls
            is_select = "SELECT" in str(stmt).upper()
            if is_select or select_calls == 0:
                select_calls += 1
                return select_result
            return update_result

        session = AsyncMock()
        session.execute.side_effect = fake_execute

        api_response = {
            "code": 0,
            "data": {
                "a3_increase_cnt": "500",
                "ad_a3_increase_cnt": "100",
                "natural_a3_increase_cnt": "400",
                "cost": 15000,
                "ad_cost": 8000,
                "natural_cost": 0,
            },
        }

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.fetch_yuntu_analysis"
        ) as mock_api:
            mock_brands.return_value = {"brand_001": "品牌A"}
            mock_api.return_value = api_response
            result = await get_video_analysis_data(session, "video_123")

        # The API must be called exactly once with the video's identifiers.
        mock_api.assert_called_once_with(
            item_id="video_123",
            publish_time=datetime(2025, 1, 15),
            industry_id="20",
        )

        # A3 data.
        a3 = result["a3_metrics"]
        assert a3["total_new_a3_cnt"] == 500
        assert a3["heated_new_a3_cnt"] == 100
        assert a3["natural_new_a3_cnt"] == 400

        # Cost data.
        cost = result["cost_metrics"]
        assert cost["total_cost"] == 15000
        assert cost["heated_cost"] == 8000

        # Calculated metrics are present.
        calculated = result["calculated_metrics"]
        assert "estimated_cpm" in calculated
        assert "estimated_natural_cpm" in calculated

    @pytest.mark.asyncio
    async def test_video_not_found(self):
        """A missing video raises ValueError mentioning 'not found'."""
        session = self._session_returning(None)

        with pytest.raises(ValueError) as exc_info:
            await get_video_analysis_data(session, "nonexistent")

        message = str(exc_info.value).lower()
        assert "not found" in message

    @pytest.mark.asyncio
    async def test_fallback_on_api_failure(self):
        """API failure -> fall back to the database values."""
        video = _make_mock_video(
            total_new_a3_cnt=0,
            heated_new_a3_cnt=0,
            natural_new_a3_cnt=0,
            total_cost=0.0,
            heated_cost=0.0,
        )
        session = self._session_returning(video)

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.fetch_yuntu_analysis"
        ) as mock_api:
            mock_brands.return_value = {}
            mock_api.side_effect = YuntuAPIError("API Error")
            result = await get_video_analysis_data(session, "video_123")

        # Fallback uses the DB values (all zero).
        assert result["a3_metrics"]["total_new_a3_cnt"] == 0
        assert result["cost_metrics"]["total_cost"] == 0.0

        # Basic information is still returned normally.
        assert result["base_info"]["vid"] == "video_123"
        assert result["reach_metrics"]["total_play_cnt"] == 50000

    @pytest.mark.asyncio
    async def test_null_publish_time(self):
        """A video without publish_time/create_date is still handled."""
        video = _make_mock_video(
            publish_time=None,
            create_date=None,
            total_new_a3_cnt=0,
            total_cost=0.0,
            total_play_cnt=0,
            natural_play_cnt=0,
            heated_play_cnt=0,
            after_view_search_uv=0,
        )
        session = self._session_returning(video)

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.fetch_yuntu_analysis"
        ) as mock_api:
            mock_brands.return_value = {}
            mock_api.return_value = {"code": 0, "data": {}}
            result = await get_video_analysis_data(session, "video_123")

        assert result["base_info"]["create_date"] is None

    @pytest.mark.asyncio
    async def test_response_structure(self):
        """The response contains all six top-level categories."""
        video = _make_mock_video(total_new_a3_cnt=500, total_cost=10000.0)
        session = self._session_returning(video)

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands:
            mock_brands.return_value = {}
            result = await get_video_analysis_data(session, "video_123")

        for section in (
            "base_info",
            "reach_metrics",
            "a3_metrics",
            "search_metrics",
            "cost_metrics",
            "calculated_metrics",
        ):
            assert section in result

        # Key fields of base_info.
        base_info = result["base_info"]
        assert "star_nickname" in base_info
        assert "vid" in base_info
        assert "brand_name" in base_info

        # Key fields of reach_metrics.
        reach = result["reach_metrics"]
        assert "total_play_cnt" in reach
        assert "natural_play_cnt" in reach
|
||
|
||
|
||
class TestUpdateVideoA3Metrics:
    """Tests for update_video_a3_metrics function (T-025)."""

    @pytest.mark.asyncio
    async def test_update_success(self):
        """One affected row -> returns True and commits once."""
        session = AsyncMock()
        session.execute.return_value = MagicMock(rowcount=1)

        updated = await update_video_a3_metrics(
            session=session,
            item_id="video_123",
            total_new_a3_cnt=500,
            heated_new_a3_cnt=100,
            natural_new_a3_cnt=400,
            total_cost=10000.0,
        )

        assert updated is True
        session.commit.assert_called_once()

    @pytest.mark.asyncio
    async def test_update_with_heated_cost(self):
        """The heated_cost argument is accepted and the update succeeds."""
        session = AsyncMock()
        session.execute.return_value = MagicMock(rowcount=1)

        updated = await update_video_a3_metrics(
            session=session,
            item_id="video_123",
            total_new_a3_cnt=500,
            heated_new_a3_cnt=100,
            natural_new_a3_cnt=400,
            total_cost=15000.0,
            heated_cost=8000.0,
        )

        assert updated is True
        session.commit.assert_called_once()

    @pytest.mark.asyncio
    async def test_update_video_not_found(self):
        """Zero affected rows -> returns False."""
        session = AsyncMock()
        session.execute.return_value = MagicMock(rowcount=0)

        updated = await update_video_a3_metrics(
            session=session,
            item_id="nonexistent",
            total_new_a3_cnt=500,
            heated_new_a3_cnt=100,
            natural_new_a3_cnt=400,
            total_cost=10000.0,
        )

        assert updated is False

    @pytest.mark.asyncio
    async def test_update_database_error(self):
        """execute() raising -> returns False and rolls back once."""
        session = AsyncMock()
        session.execute.side_effect = Exception("Database error")

        updated = await update_video_a3_metrics(
            session=session,
            item_id="video_123",
            total_new_a3_cnt=500,
            heated_new_a3_cnt=100,
            natural_new_a3_cnt=400,
            total_cost=10000.0,
        )

        assert updated is False
        session.rollback.assert_called_once()
|
||
|
||
|
||
class TestBuildVideoListItem:
    """Tests for _build_video_list_item helper."""

    def test_build_item_with_full_data(self):
        """Non-zero inputs produce identifiers, A3 count and cost metrics."""
        full_video = _make_mock_video(
            total_play_cnt=50000,
            natural_play_cnt=40000,
            after_view_search_uv=1000,
            estimated_video_cost=10000.0,
        )

        item = _build_video_list_item(
            video=full_video,
            a3_increase_cnt=500,
            ad_a3_increase_cnt=100,
            natural_a3_increase_cnt=400,
            api_cost=15000.0,
            brand_name="品牌A",
        )

        assert item["item_id"] == "video_123"
        assert item["brand_name"] == "品牌A"
        assert item["total_new_a3_cnt"] == 500
        assert item["estimated_natural_cpm"] is not None
        assert item["estimated_cp_a3"] == 30.0  # 15000 / 500

    def test_build_item_zero_division(self):
        """A zero denominator must yield None rather than raising."""
        empty_video = _make_mock_video(
            total_play_cnt=0,
            natural_play_cnt=0,
            after_view_search_uv=0,
            estimated_video_cost=0.0,
        )

        item = _build_video_list_item(
            video=empty_video,
            a3_increase_cnt=0,
            ad_a3_increase_cnt=0,
            natural_a3_increase_cnt=0,
            api_cost=0.0,
            brand_name="",
        )

        for metric in (
            "estimated_natural_cpm",
            "estimated_cp_a3",
            "estimated_natural_cp_a3",
            "estimated_cp_search",
            "estimated_natural_cp_search",
        ):
            assert item[metric] is None
|
||
|
||
|
||
class TestGetVideoListWithA3:
    """Tests for get_video_list_with_a3 function."""

    @staticmethod
    def _api_payload():
        """Return a fresh API response dict shared by the API-path tests."""
        return {
            "data": {
                "a3_increase_cnt": "200",
                "ad_a3_increase_cnt": "50",
                "natural_a3_increase_cnt": "150",
                "cost": 5000,
                "ad_cost": 3000,
            }
        }

    @staticmethod
    def _configs(count):
        """Return *count* distinct session configs (aad1/tok1, aad2/tok2, ...)."""
        return [
            {"aadvid": f"aad{i}", "auth_token": f"tok{i}"}
            for i in range(1, count + 1)
        ]

    @pytest.mark.asyncio
    async def test_all_cached(self):
        """Every video is cached -> the API is never called."""
        videos = [
            _make_mock_video(
                item_id="v1", total_new_a3_cnt=500, total_cost=10000.0, brand_id="b1"
            ),
            _make_mock_video(
                item_id="v2", total_new_a3_cnt=300, total_cost=8000.0, brand_id="b2"
            ),
        ]
        session = AsyncMock()

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.call_yuntu_api"
        ) as mock_api:
            mock_brands.return_value = {"b1": "品牌1", "b2": "品牌2"}
            result = await get_video_list_with_a3(session, videos)

        mock_api.assert_not_called()
        assert len(result) == 2
        first, second = result
        assert first["item_id"] == "v1"
        assert first["total_new_a3_cnt"] == 500
        assert second["item_id"] == "v2"
        assert second["total_new_a3_cnt"] == 300

    @pytest.mark.asyncio
    async def test_all_need_api(self):
        """All videos need the API -> concurrent calls -> correct data on the
        first query -> DB writes happen sequentially after gather."""
        videos = [
            _make_mock_video(
                item_id="v1", total_new_a3_cnt=0, total_cost=0.0, brand_id="b1"
            ),
            _make_mock_video(
                item_id="v2", total_new_a3_cnt=0, total_cost=0.0, brand_id="b2"
            ),
        ]

        session = AsyncMock()
        session.execute.return_value = MagicMock(rowcount=1)

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock
        ) as mock_api, patch(
            "app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock
        ) as mock_configs, patch(
            "app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock
        ) as mock_update:
            mock_brands.return_value = {}
            mock_api.return_value = self._api_payload()
            mock_configs.return_value = self._configs(2)
            mock_update.return_value = True
            result = await get_video_list_with_a3(session, videos)

        assert len(result) == 2
        assert mock_api.call_count == 2

        # The first query already returns the API data (key point: this does
        # not depend on the DB write succeeding).
        assert result[0]["total_new_a3_cnt"] == 200
        assert result[1]["total_new_a3_cnt"] == 200

        # The two videos were fetched with different configs.
        used_tokens = {call.kwargs["auth_token"] for call in mock_api.call_args_list}
        assert len(used_tokens) == 2

        # DB writes are executed after gather, one per video.
        assert mock_update.call_count == 2
        written_ids = [call.kwargs["item_id"] for call in mock_update.call_args_list]
        assert "v1" in written_ids
        assert "v2" in written_ids

    @pytest.mark.asyncio
    async def test_mixed_cached_and_api(self):
        """Mixed case: some cached, some need the API -> only API successes
        are written back to the DB."""
        videos = [
            _make_mock_video(
                item_id="v1", total_new_a3_cnt=500, total_cost=10000.0, brand_id="b1"
            ),
            _make_mock_video(
                item_id="v2", total_new_a3_cnt=0, total_cost=0.0, brand_id="b2"
            ),
            _make_mock_video(
                item_id="v3", total_new_a3_cnt=300, total_cost=5000.0, brand_id="b3"
            ),
        ]
        session = AsyncMock()

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock
        ) as mock_api, patch(
            "app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock
        ) as mock_configs, patch(
            "app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock
        ) as mock_update:
            mock_brands.return_value = {}
            mock_api.return_value = self._api_payload()
            mock_configs.return_value = self._configs(1)
            mock_update.return_value = True
            result = await get_video_list_with_a3(session, videos)

        # The original ordering is preserved.
        assert len(result) == 3
        assert result[0]["item_id"] == "v1"
        assert result[0]["total_new_a3_cnt"] == 500  # from DB
        assert result[1]["item_id"] == "v2"
        assert result[1]["total_new_a3_cnt"] == 200  # from API
        assert result[2]["item_id"] == "v3"
        assert result[2]["total_new_a3_cnt"] == 300  # from DB

        # Only v2 triggered an API call.
        assert mock_api.call_count == 1

        # Only v2 was written back to the DB.
        assert mock_update.call_count == 1
        written = mock_update.call_args.kwargs
        assert written["item_id"] == "v2"
        assert written["total_new_a3_cnt"] == 200

    @pytest.mark.asyncio
    async def test_empty_list(self):
        """An empty input list returns an empty result."""
        session = AsyncMock()

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands:
            mock_brands.return_value = {}
            result = await get_video_list_with_a3(session, [])

        assert result == []

    @pytest.mark.asyncio
    async def test_api_failure_fallback(self):
        """API call fails -> fall back to DB values -> nothing written back."""
        videos = [
            _make_mock_video(
                item_id="v1", total_new_a3_cnt=0, total_cost=0.0, brand_id="b1"
            ),
        ]
        session = AsyncMock()

        with patch(
            "app.services.brand_api.get_brand_names", new_callable=AsyncMock
        ) as mock_brands, patch(
            "app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock
        ) as mock_api, patch(
            "app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock
        ) as mock_configs, patch(
            "app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock
        ) as mock_update:
            mock_brands.return_value = {}
            mock_api.side_effect = YuntuAPIError("API Error")
            mock_configs.return_value = self._configs(1)
            result = await get_video_list_with_a3(session, videos)

        # Falls back to the DB values.
        assert len(result) == 1
        assert result[0]["total_new_a3_cnt"] == 0

        # A failed API call must not be written back to the DB.
        mock_update.assert_not_called()
|