kol-insight/backend/tests/test_video_analysis.py
zfc 376f0be6b4 feat(backend): 视频分析模块增加缓存优先策略和并发API调用
- SessionPool 新增 get_distinct_configs 方法,支持获取不同配置用于并发调用
- video_analysis 重构为缓存优先策略:数据库有 A3/Cost 数据时直接使用
- 并发 API 调用预分配不同 cookie,避免 session 冲突
- API 数据写回数据库,实现下次查询缓存命中
- 新增 heated_cost 字段追踪
- 测试全面重写,覆盖缓存/API/混合/降级场景
2026-01-29 18:21:50 +08:00

761 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Tests for Video Analysis Service (T-024)
覆盖:
- calculate_cost_metrics 计算
- _needs_api_call 缓存判断
- get_video_analysis_data 详情页(缓存命中 / API 调用 / API 失败降级)
- update_video_a3_metrics含 heated_cost
- get_video_list_with_a3 列表页(混合缓存 + 并发 API
"""
import pytest
from datetime import datetime
from unittest.mock import AsyncMock, patch, MagicMock
from app.services.video_analysis import (
_build_video_list_item,
_needs_api_call,
calculate_cost_metrics,
get_video_analysis_data,
get_video_list_with_a3,
update_video_a3_metrics,
)
from app.services.yuntu_api import YuntuAPIError
def _make_mock_video(**overrides):
"""创建标准 mock video 对象,带合理默认值。"""
defaults = {
"item_id": "video_123",
"title": "测试视频",
"video_url": "https://example.com/video",
"vid": "vid_123",
"star_id": "star_001",
"star_unique_id": "unique_001",
"star_nickname": "测试达人",
"star_uid": "uid_001",
"star_fans_cnt": 100000,
"star_mcn": "MCN1",
"publish_time": datetime(2025, 1, 15),
"create_date": datetime(2025, 1, 15),
"industry_name": "母婴",
"industry_id": "20",
"brand_id": "brand_001",
"hot_type": "爆款",
"viral_type": "爆款",
"is_hot": True,
"has_cart": False,
"total_play_cnt": 50000,
"natural_play_cnt": 40000,
"heated_play_cnt": 10000,
"total_interaction_cnt": 5000,
"total_interact": 5000,
"natural_interaction_cnt": 3000,
"heated_interaction_cnt": 2000,
"digg_cnt": 3000,
"like_cnt": 3000,
"share_cnt": 1000,
"comment_cnt": 1000,
"play_over_cnt": 20000,
"play_over_rate": 0.4,
"after_view_search_uv": 1000,
"after_view_search_cnt": 1200,
"after_view_search_rate": 0.02,
"back_search_cnt": 50,
"back_search_uv": 50,
"return_search_cnt": 50,
"new_a3_rate": 0.05,
"total_new_a3_cnt": 0,
"heated_new_a3_cnt": 0,
"natural_new_a3_cnt": 0,
"total_cost": 0.0,
"heated_cost": 0.0,
"star_task_cost": 0.0,
"search_cost": 0.0,
"ad_hot_roi": 0.0,
"estimated_video_cost": 10000.0,
"order_id": None,
"content_type": None,
"industry_tags": None,
"ad_hot_type": None,
"trend": None,
"trend_daily": None,
"trend_total": None,
"component_metric_list": None,
"key_word_after_search_infos": None,
"index_map": None,
"search_keywords": None,
"keywords": None,
"price_under_20s": None,
"price_20_60s": None,
"price_over_60s": None,
"video_duration": None,
"data_date": None,
"created_at": None,
"updated_at": None,
}
defaults.update(overrides)
mock = MagicMock()
for k, v in defaults.items():
setattr(mock, k, v)
return mock
class TestCalculateCostMetrics:
"""Tests for calculate_cost_metrics function."""
def test_all_metrics_calculated(self):
result = calculate_cost_metrics(
cost=10000,
natural_play_cnt=40000,
a3_increase_cnt=500,
natural_a3_increase_cnt=400,
after_view_search_uv=1000,
total_play_cnt=50000,
)
assert result["cpm"] == 200.0
assert result["natural_cpm"] == 250.0
assert result["cpa3"] == 20.0
assert result["natural_cpa3"] == 25.0
assert result["cp_search"] == 10.0
assert result["estimated_natural_search_uv"] == 800.0
assert result["natural_cp_search"] == 12.5
def test_zero_total_play_cnt(self):
result = calculate_cost_metrics(
cost=10000,
natural_play_cnt=0,
a3_increase_cnt=500,
natural_a3_increase_cnt=400,
after_view_search_uv=1000,
total_play_cnt=0,
)
assert result["cpm"] is None
assert result["natural_cpm"] is None
assert result["estimated_natural_search_uv"] is None
assert result["natural_cp_search"] is None
def test_zero_a3_counts(self):
result = calculate_cost_metrics(
cost=10000,
natural_play_cnt=40000,
a3_increase_cnt=0,
natural_a3_increase_cnt=0,
after_view_search_uv=1000,
total_play_cnt=50000,
)
assert result["cpa3"] is None
assert result["natural_cpa3"] is None
assert result["cpm"] == 200.0
def test_zero_search_uv(self):
result = calculate_cost_metrics(
cost=10000,
natural_play_cnt=40000,
a3_increase_cnt=500,
natural_a3_increase_cnt=400,
after_view_search_uv=0,
total_play_cnt=50000,
)
assert result["cp_search"] is None
assert result["estimated_natural_search_uv"] is None
assert result["natural_cp_search"] is None
def test_all_zeros(self):
result = calculate_cost_metrics(
cost=0,
natural_play_cnt=0,
a3_increase_cnt=0,
natural_a3_increase_cnt=0,
after_view_search_uv=0,
total_play_cnt=0,
)
assert result["cpm"] is None
assert result["natural_cpm"] is None
assert result["cpa3"] is None
assert result["natural_cpa3"] is None
assert result["cp_search"] is None
assert result["estimated_natural_search_uv"] is None
assert result["natural_cp_search"] is None
def test_decimal_precision(self):
result = calculate_cost_metrics(
cost=10000,
natural_play_cnt=30000,
a3_increase_cnt=333,
natural_a3_increase_cnt=111,
after_view_search_uv=777,
total_play_cnt=70000,
)
assert isinstance(result["cpm"], float)
assert len(str(result["cpm"]).split(".")[-1]) <= 2
class TestNeedsApiCall:
"""Tests for _needs_api_call helper."""
def test_needs_call_when_no_data(self):
"""A3=0 且 cost=0 → 需要调 API"""
video = _make_mock_video(total_new_a3_cnt=0, total_cost=0.0)
assert _needs_api_call(video) is True
def test_needs_call_when_none_values(self):
"""A3=None 且 cost=None → 需要调 API"""
video = _make_mock_video(total_new_a3_cnt=None, total_cost=None)
assert _needs_api_call(video) is True
def test_no_call_when_a3_exists(self):
"""有 A3 数据 → 不需要调 API"""
video = _make_mock_video(total_new_a3_cnt=500, total_cost=0.0)
assert _needs_api_call(video) is False
def test_no_call_when_cost_exists(self):
"""有 cost 数据 → 不需要调 API"""
video = _make_mock_video(total_new_a3_cnt=0, total_cost=10000.0)
assert _needs_api_call(video) is False
def test_no_call_when_both_exist(self):
"""A3 和 cost 都有 → 不需要调 API"""
video = _make_mock_video(total_new_a3_cnt=500, total_cost=10000.0)
assert _needs_api_call(video) is False
class TestGetVideoAnalysisData:
"""Tests for get_video_analysis_data function."""
@pytest.mark.asyncio
async def test_uses_db_when_cached(self):
"""数据库已有 A3/Cost → 直接使用,不调 API"""
mock_video = _make_mock_video(
total_new_a3_cnt=500,
heated_new_a3_cnt=100,
natural_new_a3_cnt=400,
total_cost=10000.0,
heated_cost=5000.0,
)
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.scalar_one_or_none.return_value = mock_video
mock_session.execute.return_value = mock_result
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {"brand_001": "品牌A"}
with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api:
result = await get_video_analysis_data(mock_session, "video_123")
# API 不应被调用
mock_api.assert_not_called()
# 验证使用了数据库数据
assert result["a3_metrics"]["total_new_a3_cnt"] == 500
assert result["a3_metrics"]["heated_new_a3_cnt"] == 100
assert result["a3_metrics"]["natural_new_a3_cnt"] == 400
assert result["cost_metrics"]["total_cost"] == 10000.0
assert result["cost_metrics"]["heated_cost"] == 5000.0
@pytest.mark.asyncio
async def test_calls_api_and_saves_to_db(self):
"""数据库无数据 → 调 API → 写回 DB"""
mock_video = _make_mock_video(
total_new_a3_cnt=0,
total_cost=0.0,
heated_cost=0.0,
)
mock_session = AsyncMock()
mock_select_result = MagicMock()
mock_select_result.scalar_one_or_none.return_value = mock_video
mock_update_result = MagicMock()
mock_update_result.rowcount = 1
call_count = [0]
async def mock_execute(stmt):
stmt_str = str(stmt)
if "SELECT" in stmt_str.upper() or call_count[0] == 0:
call_count[0] += 1
return mock_select_result
return mock_update_result
mock_session.execute.side_effect = mock_execute
api_response = {
"code": 0,
"data": {
"a3_increase_cnt": "500",
"ad_a3_increase_cnt": "100",
"natural_a3_increase_cnt": "400",
"cost": 15000,
"ad_cost": 8000,
"natural_cost": 0,
},
}
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {"brand_001": "品牌A"}
with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api:
mock_api.return_value = api_response
result = await get_video_analysis_data(mock_session, "video_123")
# API 应被调用
mock_api.assert_called_once_with(
item_id="video_123",
publish_time=datetime(2025, 1, 15),
industry_id="20",
)
# 验证 A3 数据
assert result["a3_metrics"]["total_new_a3_cnt"] == 500
assert result["a3_metrics"]["heated_new_a3_cnt"] == 100
assert result["a3_metrics"]["natural_new_a3_cnt"] == 400
# 验证 cost
assert result["cost_metrics"]["total_cost"] == 15000
assert result["cost_metrics"]["heated_cost"] == 8000
# 验证计算指标存在
assert "estimated_cpm" in result["calculated_metrics"]
assert "estimated_natural_cpm" in result["calculated_metrics"]
@pytest.mark.asyncio
async def test_video_not_found(self):
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.scalar_one_or_none.return_value = None
mock_session.execute.return_value = mock_result
with pytest.raises(ValueError) as exc_info:
await get_video_analysis_data(mock_session, "nonexistent")
assert "not found" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_fallback_on_api_failure(self):
"""API 失败 → 降级使用数据库数据"""
mock_video = _make_mock_video(
total_new_a3_cnt=0,
heated_new_a3_cnt=0,
natural_new_a3_cnt=0,
total_cost=0.0,
heated_cost=0.0,
)
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.scalar_one_or_none.return_value = mock_video
mock_session.execute.return_value = mock_result
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {}
with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api:
mock_api.side_effect = YuntuAPIError("API Error")
result = await get_video_analysis_data(mock_session, "video_123")
# 降级使用 DB 数据(都是 0
assert result["a3_metrics"]["total_new_a3_cnt"] == 0
assert result["cost_metrics"]["total_cost"] == 0.0
# 基础信息仍然正常
assert result["base_info"]["vid"] == "video_123"
assert result["reach_metrics"]["total_play_cnt"] == 50000
@pytest.mark.asyncio
async def test_null_publish_time(self):
mock_video = _make_mock_video(
publish_time=None,
create_date=None,
total_new_a3_cnt=0,
total_cost=0.0,
total_play_cnt=0,
natural_play_cnt=0,
heated_play_cnt=0,
after_view_search_uv=0,
)
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.scalar_one_or_none.return_value = mock_video
mock_session.execute.return_value = mock_result
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {}
with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api:
mock_api.return_value = {"code": 0, "data": {}}
result = await get_video_analysis_data(mock_session, "video_123")
assert result["base_info"]["create_date"] is None
@pytest.mark.asyncio
async def test_response_structure(self):
"""验证返回数据包含所有 6 大类"""
mock_video = _make_mock_video(total_new_a3_cnt=500, total_cost=10000.0)
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.scalar_one_or_none.return_value = mock_video
mock_session.execute.return_value = mock_result
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {}
result = await get_video_analysis_data(mock_session, "video_123")
assert "base_info" in result
assert "reach_metrics" in result
assert "a3_metrics" in result
assert "search_metrics" in result
assert "cost_metrics" in result
assert "calculated_metrics" in result
# base_info 关键字段
assert "star_nickname" in result["base_info"]
assert "vid" in result["base_info"]
assert "brand_name" in result["base_info"]
# reach_metrics 关键字段
assert "total_play_cnt" in result["reach_metrics"]
assert "natural_play_cnt" in result["reach_metrics"]
class TestUpdateVideoA3Metrics:
"""Tests for update_video_a3_metrics function (T-025)."""
@pytest.mark.asyncio
async def test_update_success(self):
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.rowcount = 1
mock_session.execute.return_value = mock_result
result = await update_video_a3_metrics(
session=mock_session,
item_id="video_123",
total_new_a3_cnt=500,
heated_new_a3_cnt=100,
natural_new_a3_cnt=400,
total_cost=10000.0,
)
assert result is True
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
async def test_update_with_heated_cost(self):
"""验证 heated_cost 参数正常传递"""
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.rowcount = 1
mock_session.execute.return_value = mock_result
result = await update_video_a3_metrics(
session=mock_session,
item_id="video_123",
total_new_a3_cnt=500,
heated_new_a3_cnt=100,
natural_new_a3_cnt=400,
total_cost=15000.0,
heated_cost=8000.0,
)
assert result is True
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
async def test_update_video_not_found(self):
mock_session = AsyncMock()
mock_result = MagicMock()
mock_result.rowcount = 0
mock_session.execute.return_value = mock_result
result = await update_video_a3_metrics(
session=mock_session,
item_id="nonexistent",
total_new_a3_cnt=500,
heated_new_a3_cnt=100,
natural_new_a3_cnt=400,
total_cost=10000.0,
)
assert result is False
@pytest.mark.asyncio
async def test_update_database_error(self):
mock_session = AsyncMock()
mock_session.execute.side_effect = Exception("Database error")
result = await update_video_a3_metrics(
session=mock_session,
item_id="video_123",
total_new_a3_cnt=500,
heated_new_a3_cnt=100,
natural_new_a3_cnt=400,
total_cost=10000.0,
)
assert result is False
mock_session.rollback.assert_called_once()
class TestBuildVideoListItem:
"""Tests for _build_video_list_item helper."""
def test_build_item_with_full_data(self):
video = _make_mock_video(
total_play_cnt=50000,
natural_play_cnt=40000,
after_view_search_uv=1000,
estimated_video_cost=10000.0,
)
result = _build_video_list_item(
video=video,
a3_increase_cnt=500,
ad_a3_increase_cnt=100,
natural_a3_increase_cnt=400,
api_cost=15000.0,
brand_name="品牌A",
)
assert result["item_id"] == "video_123"
assert result["brand_name"] == "品牌A"
assert result["total_new_a3_cnt"] == 500
assert result["estimated_natural_cpm"] is not None
assert result["estimated_cp_a3"] == 30.0 # 15000/500
def test_build_item_zero_division(self):
"""分母为 0 时应返回 None"""
video = _make_mock_video(
total_play_cnt=0,
natural_play_cnt=0,
after_view_search_uv=0,
estimated_video_cost=0.0,
)
result = _build_video_list_item(
video=video,
a3_increase_cnt=0,
ad_a3_increase_cnt=0,
natural_a3_increase_cnt=0,
api_cost=0.0,
brand_name="",
)
assert result["estimated_natural_cpm"] is None
assert result["estimated_cp_a3"] is None
assert result["estimated_natural_cp_a3"] is None
assert result["estimated_cp_search"] is None
assert result["estimated_natural_cp_search"] is None
class TestGetVideoListWithA3:
"""Tests for get_video_list_with_a3 function."""
@pytest.mark.asyncio
async def test_all_cached(self):
"""所有视频都有缓存 → 不调 API"""
videos = [
_make_mock_video(
item_id="v1", total_new_a3_cnt=500, total_cost=10000.0, brand_id="b1"
),
_make_mock_video(
item_id="v2", total_new_a3_cnt=300, total_cost=8000.0, brand_id="b2"
),
]
mock_session = AsyncMock()
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {"b1": "品牌1", "b2": "品牌2"}
with patch("app.services.video_analysis.call_yuntu_api") as mock_api:
result = await get_video_list_with_a3(mock_session, videos)
mock_api.assert_not_called()
assert len(result) == 2
assert result[0]["item_id"] == "v1"
assert result[0]["total_new_a3_cnt"] == 500
assert result[1]["item_id"] == "v2"
assert result[1]["total_new_a3_cnt"] == 300
@pytest.mark.asyncio
async def test_all_need_api(self):
"""所有视频都需要 API → 并发调用 → 首次即返回正确数据 → gather 后顺序写 DB"""
videos = [
_make_mock_video(
item_id="v1", total_new_a3_cnt=0, total_cost=0.0, brand_id="b1"
),
_make_mock_video(
item_id="v2", total_new_a3_cnt=0, total_cost=0.0, brand_id="b2"
),
]
mock_session = AsyncMock()
mock_update_result = MagicMock()
mock_update_result.rowcount = 1
mock_session.execute.return_value = mock_update_result
api_response = {
"data": {
"a3_increase_cnt": "200",
"ad_a3_increase_cnt": "50",
"natural_a3_increase_cnt": "150",
"cost": 5000,
"ad_cost": 3000,
}
}
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {}
with patch("app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock) as mock_api:
mock_api.return_value = api_response
with patch("app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock) as mock_configs:
mock_configs.return_value = [
{"aadvid": "aad1", "auth_token": "tok1"},
{"aadvid": "aad2", "auth_token": "tok2"},
]
with patch("app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock) as mock_update:
mock_update.return_value = True
result = await get_video_list_with_a3(mock_session, videos)
assert len(result) == 2
assert mock_api.call_count == 2
# 首次查询即返回正确 API 数据(核心:不依赖 DB 写入成功)
assert result[0]["total_new_a3_cnt"] == 200
assert result[1]["total_new_a3_cnt"] == 200
# 验证两个视频用了不同 config
api_calls = mock_api.call_args_list
tokens = {c.kwargs["auth_token"] for c in api_calls}
assert len(tokens) == 2
# DB 写入在 gather 之后顺序执行
assert mock_update.call_count == 2
update_item_ids = [c.kwargs["item_id"] for c in mock_update.call_args_list]
assert "v1" in update_item_ids
assert "v2" in update_item_ids
@pytest.mark.asyncio
async def test_mixed_cached_and_api(self):
"""混合场景:部分缓存,部分需 API → 只对 API 成功的写 DB"""
videos = [
_make_mock_video(
item_id="v1", total_new_a3_cnt=500, total_cost=10000.0, brand_id="b1"
),
_make_mock_video(
item_id="v2", total_new_a3_cnt=0, total_cost=0.0, brand_id="b2"
),
_make_mock_video(
item_id="v3", total_new_a3_cnt=300, total_cost=5000.0, brand_id="b3"
),
]
mock_session = AsyncMock()
api_response = {
"data": {
"a3_increase_cnt": "200",
"ad_a3_increase_cnt": "50",
"natural_a3_increase_cnt": "150",
"cost": 5000,
"ad_cost": 3000,
}
}
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {}
with patch("app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock) as mock_api:
mock_api.return_value = api_response
with patch("app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock) as mock_configs:
mock_configs.return_value = [
{"aadvid": "aad1", "auth_token": "tok1"},
]
with patch("app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock) as mock_update:
mock_update.return_value = True
result = await get_video_list_with_a3(mock_session, videos)
# 保持原始排序
assert len(result) == 3
assert result[0]["item_id"] == "v1"
assert result[0]["total_new_a3_cnt"] == 500 # from DB
assert result[1]["item_id"] == "v2"
assert result[1]["total_new_a3_cnt"] == 200 # from API
assert result[2]["item_id"] == "v3"
assert result[2]["total_new_a3_cnt"] == 300 # from DB
# 只有 v2 调了 API
assert mock_api.call_count == 1
# 只对 v2 写回 DB
assert mock_update.call_count == 1
assert mock_update.call_args.kwargs["item_id"] == "v2"
assert mock_update.call_args.kwargs["total_new_a3_cnt"] == 200
@pytest.mark.asyncio
async def test_empty_list(self):
"""空列表 → 返回空"""
mock_session = AsyncMock()
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {}
result = await get_video_list_with_a3(mock_session, [])
assert result == []
@pytest.mark.asyncio
async def test_api_failure_fallback(self):
"""API 调用失败 → 降级使用 DB 数据 → 不写回 DB"""
videos = [
_make_mock_video(
item_id="v1", total_new_a3_cnt=0, total_cost=0.0, brand_id="b1"
),
]
mock_session = AsyncMock()
with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands:
mock_brands.return_value = {}
with patch("app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock) as mock_api:
mock_api.side_effect = YuntuAPIError("API Error")
with patch("app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock) as mock_configs:
mock_configs.return_value = [
{"aadvid": "aad1", "auth_token": "tok1"},
]
with patch("app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock) as mock_update:
result = await get_video_list_with_a3(mock_session, videos)
# 降级到 DB 数据
assert len(result) == 1
assert result[0]["total_new_a3_cnt"] == 0
# API 失败不应写回 DB
mock_update.assert_not_called()