""" Tests for Video Analysis Service (T-024) 覆盖: - calculate_cost_metrics 计算 - _needs_api_call 缓存判断 - get_video_analysis_data 详情页(缓存命中 / API 调用 / API 失败降级) - update_video_a3_metrics(含 heated_cost) - get_video_list_with_a3 列表页(混合缓存 + 并发 API) """ import pytest from datetime import datetime from unittest.mock import AsyncMock, patch, MagicMock from app.services.video_analysis import ( _build_video_list_item, _needs_api_call, calculate_cost_metrics, get_video_analysis_data, get_video_list_with_a3, update_video_a3_metrics, ) from app.services.yuntu_api import YuntuAPIError def _make_mock_video(**overrides): """创建标准 mock video 对象,带合理默认值。""" defaults = { "item_id": "video_123", "title": "测试视频", "video_url": "https://example.com/video", "vid": "vid_123", "star_id": "star_001", "star_unique_id": "unique_001", "star_nickname": "测试达人", "star_uid": "uid_001", "star_fans_cnt": 100000, "star_mcn": "MCN1", "publish_time": datetime(2025, 1, 15), "create_date": datetime(2025, 1, 15), "industry_name": "母婴", "industry_id": "20", "brand_id": "brand_001", "hot_type": "爆款", "viral_type": "爆款", "is_hot": True, "has_cart": False, "total_play_cnt": 50000, "natural_play_cnt": 40000, "heated_play_cnt": 10000, "total_interaction_cnt": 5000, "total_interact": 5000, "natural_interaction_cnt": 3000, "heated_interaction_cnt": 2000, "digg_cnt": 3000, "like_cnt": 3000, "share_cnt": 1000, "comment_cnt": 1000, "play_over_cnt": 20000, "play_over_rate": 0.4, "after_view_search_uv": 1000, "after_view_search_cnt": 1200, "after_view_search_rate": 0.02, "back_search_cnt": 50, "back_search_uv": 50, "return_search_cnt": 50, "new_a3_rate": 0.05, "total_new_a3_cnt": 0, "heated_new_a3_cnt": 0, "natural_new_a3_cnt": 0, "total_cost": 0.0, "heated_cost": 0.0, "star_task_cost": 0.0, "search_cost": 0.0, "ad_hot_roi": 0.0, "estimated_video_cost": 10000.0, "order_id": None, "content_type": None, "industry_tags": None, "ad_hot_type": None, "trend": None, "trend_daily": None, "trend_total": None, "component_metric_list": None, "key_word_after_search_infos": None, "index_map": None, "search_keywords": None, "keywords": None, "price_under_20s": None, "price_20_60s": None, "price_over_60s": None, "video_duration": None, "data_date": None, "created_at": None, "updated_at": None, } defaults.update(overrides) mock = MagicMock() for k, v in defaults.items(): setattr(mock, k, v) return mock class TestCalculateCostMetrics: """Tests for calculate_cost_metrics function.""" def test_all_metrics_calculated(self): result = calculate_cost_metrics( cost=10000, natural_play_cnt=40000, a3_increase_cnt=500, natural_a3_increase_cnt=400, after_view_search_uv=1000, total_play_cnt=50000, ) assert result["cpm"] == 200.0 assert result["natural_cpm"] == 250.0 assert result["cpa3"] == 20.0 assert result["natural_cpa3"] == 25.0 assert result["cp_search"] == 10.0 assert result["estimated_natural_search_uv"] == 800.0 assert result["natural_cp_search"] == 12.5 def test_zero_total_play_cnt(self): result = calculate_cost_metrics( cost=10000, natural_play_cnt=0, a3_increase_cnt=500, natural_a3_increase_cnt=400, after_view_search_uv=1000, total_play_cnt=0, ) assert result["cpm"] is None assert result["natural_cpm"] is None assert result["estimated_natural_search_uv"] is None assert result["natural_cp_search"] is None def test_zero_a3_counts(self): result = calculate_cost_metrics( cost=10000, natural_play_cnt=40000, a3_increase_cnt=0, natural_a3_increase_cnt=0, after_view_search_uv=1000, total_play_cnt=50000, ) assert result["cpa3"] is None assert result["natural_cpa3"] is None assert result["cpm"] == 200.0 def test_zero_search_uv(self): result = calculate_cost_metrics( cost=10000, natural_play_cnt=40000, a3_increase_cnt=500, natural_a3_increase_cnt=400, after_view_search_uv=0, total_play_cnt=50000, ) assert result["cp_search"] is None assert result["estimated_natural_search_uv"] is None assert result["natural_cp_search"] is None def test_all_zeros(self): result = calculate_cost_metrics( cost=0, natural_play_cnt=0, a3_increase_cnt=0, natural_a3_increase_cnt=0, after_view_search_uv=0, total_play_cnt=0, ) assert result["cpm"] is None assert result["natural_cpm"] is None assert result["cpa3"] is None assert result["natural_cpa3"] is None assert result["cp_search"] is None assert result["estimated_natural_search_uv"] is None assert result["natural_cp_search"] is None def test_decimal_precision(self): result = calculate_cost_metrics( cost=10000, natural_play_cnt=30000, a3_increase_cnt=333, natural_a3_increase_cnt=111, after_view_search_uv=777, total_play_cnt=70000, ) assert isinstance(result["cpm"], float) assert len(str(result["cpm"]).split(".")[-1]) <= 2 class TestNeedsApiCall: """Tests for _needs_api_call helper.""" def test_needs_call_when_no_data(self): """A3=0 且 cost=0 → 需要调 API""" video = _make_mock_video(total_new_a3_cnt=0, total_cost=0.0) assert _needs_api_call(video) is True def test_needs_call_when_none_values(self): """A3=None 且 cost=None → 需要调 API""" video = _make_mock_video(total_new_a3_cnt=None, total_cost=None) assert _needs_api_call(video) is True def test_no_call_when_a3_exists(self): """有 A3 数据 → 不需要调 API""" video = _make_mock_video(total_new_a3_cnt=500, total_cost=0.0) assert _needs_api_call(video) is False def test_no_call_when_cost_exists(self): """有 cost 数据 → 不需要调 API""" video = _make_mock_video(total_new_a3_cnt=0, total_cost=10000.0) assert _needs_api_call(video) is False def test_no_call_when_both_exist(self): """A3 和 cost 都有 → 不需要调 API""" video = _make_mock_video(total_new_a3_cnt=500, total_cost=10000.0) assert _needs_api_call(video) is False class TestGetVideoAnalysisData: """Tests for get_video_analysis_data function.""" @pytest.mark.asyncio async def test_uses_db_when_cached(self): """数据库已有 A3/Cost → 直接使用,不调 API""" mock_video = _make_mock_video( total_new_a3_cnt=500, heated_new_a3_cnt=100, natural_new_a3_cnt=400, total_cost=10000.0, heated_cost=5000.0, ) mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_video mock_session.execute.return_value = mock_result with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {"brand_001": "品牌A"} with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api: result = await get_video_analysis_data(mock_session, "video_123") # API 不应被调用 mock_api.assert_not_called() # 验证使用了数据库数据 assert result["a3_metrics"]["total_new_a3_cnt"] == 500 assert result["a3_metrics"]["heated_new_a3_cnt"] == 100 assert result["a3_metrics"]["natural_new_a3_cnt"] == 400 assert result["cost_metrics"]["total_cost"] == 10000.0 assert result["cost_metrics"]["heated_cost"] == 5000.0 @pytest.mark.asyncio async def test_calls_api_and_saves_to_db(self): """数据库无数据 → 调 API → 写回 DB""" mock_video = _make_mock_video( total_new_a3_cnt=0, total_cost=0.0, heated_cost=0.0, ) mock_session = AsyncMock() mock_select_result = MagicMock() mock_select_result.scalar_one_or_none.return_value = mock_video mock_update_result = MagicMock() mock_update_result.rowcount = 1 call_count = [0] async def mock_execute(stmt): stmt_str = str(stmt) if "SELECT" in stmt_str.upper() or call_count[0] == 0: call_count[0] += 1 return mock_select_result return mock_update_result mock_session.execute.side_effect = mock_execute api_response = { "code": 0, "data": { "a3_increase_cnt": "500", "ad_a3_increase_cnt": "100", "natural_a3_increase_cnt": "400", "cost": 15000, "ad_cost": 8000, "natural_cost": 0, }, } with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {"brand_001": "品牌A"} with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api: mock_api.return_value = api_response result = await get_video_analysis_data(mock_session, "video_123") # API 应被调用 mock_api.assert_called_once_with( item_id="video_123", publish_time=datetime(2025, 1, 15), industry_id="20", ) # 验证 A3 数据 assert result["a3_metrics"]["total_new_a3_cnt"] == 500 assert result["a3_metrics"]["heated_new_a3_cnt"] == 100 assert result["a3_metrics"]["natural_new_a3_cnt"] == 400 # 验证 cost assert result["cost_metrics"]["total_cost"] == 15000 assert result["cost_metrics"]["heated_cost"] == 8000 # 验证计算指标存在 assert "estimated_cpm" in result["calculated_metrics"] assert "estimated_natural_cpm" in result["calculated_metrics"] @pytest.mark.asyncio async def test_video_not_found(self): mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = None mock_session.execute.return_value = mock_result with pytest.raises(ValueError) as exc_info: await get_video_analysis_data(mock_session, "nonexistent") assert "not found" in str(exc_info.value).lower() @pytest.mark.asyncio async def test_fallback_on_api_failure(self): """API 失败 → 降级使用数据库数据""" mock_video = _make_mock_video( total_new_a3_cnt=0, heated_new_a3_cnt=0, natural_new_a3_cnt=0, total_cost=0.0, heated_cost=0.0, ) mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_video mock_session.execute.return_value = mock_result with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {} with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api: mock_api.side_effect = YuntuAPIError("API Error") result = await get_video_analysis_data(mock_session, "video_123") # 降级使用 DB 数据(都是 0) assert result["a3_metrics"]["total_new_a3_cnt"] == 0 assert result["cost_metrics"]["total_cost"] == 0.0 # 基础信息仍然正常 assert result["base_info"]["vid"] == "video_123" assert result["reach_metrics"]["total_play_cnt"] == 50000 @pytest.mark.asyncio async def test_null_publish_time(self): mock_video = _make_mock_video( publish_time=None, create_date=None, total_new_a3_cnt=0, total_cost=0.0, total_play_cnt=0, natural_play_cnt=0, heated_play_cnt=0, after_view_search_uv=0, ) mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_video mock_session.execute.return_value = mock_result with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {} with patch("app.services.video_analysis.fetch_yuntu_analysis") as mock_api: mock_api.return_value = {"code": 0, "data": {}} result = await get_video_analysis_data(mock_session, "video_123") assert result["base_info"]["create_date"] is None @pytest.mark.asyncio async def test_response_structure(self): """验证返回数据包含所有 6 大类""" mock_video = _make_mock_video(total_new_a3_cnt=500, total_cost=10000.0) mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = mock_video mock_session.execute.return_value = mock_result with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {} result = await get_video_analysis_data(mock_session, "video_123") assert "base_info" in result assert "reach_metrics" in result assert "a3_metrics" in result assert "search_metrics" in result assert "cost_metrics" in result assert "calculated_metrics" in result # base_info 关键字段 assert "star_nickname" in result["base_info"] assert "vid" in result["base_info"] assert "brand_name" in result["base_info"] # reach_metrics 关键字段 assert "total_play_cnt" in result["reach_metrics"] assert "natural_play_cnt" in result["reach_metrics"] class TestUpdateVideoA3Metrics: """Tests for update_video_a3_metrics function (T-025).""" @pytest.mark.asyncio async def test_update_success(self): mock_session = AsyncMock() mock_result = MagicMock() mock_result.rowcount = 1 mock_session.execute.return_value = mock_result result = await update_video_a3_metrics( session=mock_session, item_id="video_123", total_new_a3_cnt=500, heated_new_a3_cnt=100, natural_new_a3_cnt=400, total_cost=10000.0, ) assert result is True mock_session.commit.assert_called_once() @pytest.mark.asyncio async def test_update_with_heated_cost(self): """验证 heated_cost 参数正常传递""" mock_session = AsyncMock() mock_result = MagicMock() mock_result.rowcount = 1 mock_session.execute.return_value = mock_result result = await update_video_a3_metrics( session=mock_session, item_id="video_123", total_new_a3_cnt=500, heated_new_a3_cnt=100, natural_new_a3_cnt=400, total_cost=15000.0, heated_cost=8000.0, ) assert result is True mock_session.commit.assert_called_once() @pytest.mark.asyncio async def test_update_video_not_found(self): mock_session = AsyncMock() mock_result = MagicMock() mock_result.rowcount = 0 mock_session.execute.return_value = mock_result result = await update_video_a3_metrics( session=mock_session, item_id="nonexistent", total_new_a3_cnt=500, heated_new_a3_cnt=100, natural_new_a3_cnt=400, total_cost=10000.0, ) assert result is False @pytest.mark.asyncio async def test_update_database_error(self): mock_session = AsyncMock() mock_session.execute.side_effect = Exception("Database error") result = await update_video_a3_metrics( session=mock_session, item_id="video_123", total_new_a3_cnt=500, heated_new_a3_cnt=100, natural_new_a3_cnt=400, total_cost=10000.0, ) assert result is False mock_session.rollback.assert_called_once() class TestBuildVideoListItem: """Tests for _build_video_list_item helper.""" def test_build_item_with_full_data(self): video = _make_mock_video( total_play_cnt=50000, natural_play_cnt=40000, after_view_search_uv=1000, estimated_video_cost=10000.0, ) result = _build_video_list_item( video=video, a3_increase_cnt=500, ad_a3_increase_cnt=100, natural_a3_increase_cnt=400, api_cost=15000.0, brand_name="品牌A", ) assert result["item_id"] == "video_123" assert result["brand_name"] == "品牌A" assert result["total_new_a3_cnt"] == 500 assert result["estimated_natural_cpm"] is not None assert result["estimated_cp_a3"] == 30.0 # 15000/500 def test_build_item_zero_division(self): """分母为 0 时应返回 None""" video = _make_mock_video( total_play_cnt=0, natural_play_cnt=0, after_view_search_uv=0, estimated_video_cost=0.0, ) result = _build_video_list_item( video=video, a3_increase_cnt=0, ad_a3_increase_cnt=0, natural_a3_increase_cnt=0, api_cost=0.0, brand_name="", ) assert result["estimated_natural_cpm"] is None assert result["estimated_cp_a3"] is None assert result["estimated_natural_cp_a3"] is None assert result["estimated_cp_search"] is None assert result["estimated_natural_cp_search"] is None class TestGetVideoListWithA3: """Tests for get_video_list_with_a3 function.""" @pytest.mark.asyncio async def test_all_cached(self): """所有视频都有缓存 → 不调 API""" videos = [ _make_mock_video( item_id="v1", total_new_a3_cnt=500, total_cost=10000.0, brand_id="b1" ), _make_mock_video( item_id="v2", total_new_a3_cnt=300, total_cost=8000.0, brand_id="b2" ), ] mock_session = AsyncMock() with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {"b1": "品牌1", "b2": "品牌2"} with patch("app.services.video_analysis.call_yuntu_api") as mock_api: result = await get_video_list_with_a3(mock_session, videos) mock_api.assert_not_called() assert len(result) == 2 assert result[0]["item_id"] == "v1" assert result[0]["total_new_a3_cnt"] == 500 assert result[1]["item_id"] == "v2" assert result[1]["total_new_a3_cnt"] == 300 @pytest.mark.asyncio async def test_all_need_api(self): """所有视频都需要 API → 并发调用 → 首次即返回正确数据 → gather 后顺序写 DB""" videos = [ _make_mock_video( item_id="v1", total_new_a3_cnt=0, total_cost=0.0, brand_id="b1" ), _make_mock_video( item_id="v2", total_new_a3_cnt=0, total_cost=0.0, brand_id="b2" ), ] mock_session = AsyncMock() mock_update_result = MagicMock() mock_update_result.rowcount = 1 mock_session.execute.return_value = mock_update_result api_response = { "data": { "a3_increase_cnt": "200", "ad_a3_increase_cnt": "50", "natural_a3_increase_cnt": "150", "cost": 5000, "ad_cost": 3000, } } with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {} with patch("app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock) as mock_api: mock_api.return_value = api_response with patch("app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock) as mock_configs: mock_configs.return_value = [ {"aadvid": "aad1", "auth_token": "tok1"}, {"aadvid": "aad2", "auth_token": "tok2"}, ] with patch("app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock) as mock_update: mock_update.return_value = True result = await get_video_list_with_a3(mock_session, videos) assert len(result) == 2 assert mock_api.call_count == 2 # 首次查询即返回正确 API 数据(核心:不依赖 DB 写入成功) assert result[0]["total_new_a3_cnt"] == 200 assert result[1]["total_new_a3_cnt"] == 200 # 验证两个视频用了不同 config api_calls = mock_api.call_args_list tokens = {c.kwargs["auth_token"] for c in api_calls} assert len(tokens) == 2 # DB 写入在 gather 之后顺序执行 assert mock_update.call_count == 2 update_item_ids = [c.kwargs["item_id"] for c in mock_update.call_args_list] assert "v1" in update_item_ids assert "v2" in update_item_ids @pytest.mark.asyncio async def test_mixed_cached_and_api(self): """混合场景:部分缓存,部分需 API → 只对 API 成功的写 DB""" videos = [ _make_mock_video( item_id="v1", total_new_a3_cnt=500, total_cost=10000.0, brand_id="b1" ), _make_mock_video( item_id="v2", total_new_a3_cnt=0, total_cost=0.0, brand_id="b2" ), _make_mock_video( item_id="v3", total_new_a3_cnt=300, total_cost=5000.0, brand_id="b3" ), ] mock_session = AsyncMock() api_response = { "data": { "a3_increase_cnt": "200", "ad_a3_increase_cnt": "50", "natural_a3_increase_cnt": "150", "cost": 5000, "ad_cost": 3000, } } with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {} with patch("app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock) as mock_api: mock_api.return_value = api_response with patch("app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock) as mock_configs: mock_configs.return_value = [ {"aadvid": "aad1", "auth_token": "tok1"}, ] with patch("app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock) as mock_update: mock_update.return_value = True result = await get_video_list_with_a3(mock_session, videos) # 保持原始排序 assert len(result) == 3 assert result[0]["item_id"] == "v1" assert result[0]["total_new_a3_cnt"] == 500 # from DB assert result[1]["item_id"] == "v2" assert result[1]["total_new_a3_cnt"] == 200 # from API assert result[2]["item_id"] == "v3" assert result[2]["total_new_a3_cnt"] == 300 # from DB # 只有 v2 调了 API assert mock_api.call_count == 1 # 只对 v2 写回 DB assert mock_update.call_count == 1 assert mock_update.call_args.kwargs["item_id"] == "v2" assert mock_update.call_args.kwargs["total_new_a3_cnt"] == 200 @pytest.mark.asyncio async def test_empty_list(self): """空列表 → 返回空""" mock_session = AsyncMock() with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {} result = await get_video_list_with_a3(mock_session, []) assert result == [] @pytest.mark.asyncio async def test_api_failure_fallback(self): """API 调用失败 → 降级使用 DB 数据 → 不写回 DB""" videos = [ _make_mock_video( item_id="v1", total_new_a3_cnt=0, total_cost=0.0, brand_id="b1" ), ] mock_session = AsyncMock() with patch("app.services.brand_api.get_brand_names", new_callable=AsyncMock) as mock_brands: mock_brands.return_value = {} with patch("app.services.video_analysis.call_yuntu_api", new_callable=AsyncMock) as mock_api: mock_api.side_effect = YuntuAPIError("API Error") with patch("app.services.video_analysis.get_distinct_configs", new_callable=AsyncMock) as mock_configs: mock_configs.return_value = [ {"aadvid": "aad1", "auth_token": "tok1"}, ] with patch("app.services.video_analysis.update_video_a3_metrics", new_callable=AsyncMock) as mock_update: result = await get_video_list_with_a3(mock_session, videos) # 降级到 DB 数据 assert len(result) == 1 assert result[0]["total_new_a3_cnt"] == 0 # API 失败不应写回 DB mock_update.assert_not_called()