"""Test suite for the Video Analysis Service (T-024)."""

import pytest
from datetime import datetime
from unittest.mock import AsyncMock, patch, MagicMock

from app.services.video_analysis import (
    calculate_cost_metrics,
    get_video_base_info,
    get_video_analysis_data,
    update_video_a3_metrics,
    get_and_update_video_analysis,
)
from app.services.yuntu_api import YuntuAPIError


# Dotted path of the Yuntu fetch function that the service tests replace.
_FETCH_TARGET = "app.services.video_analysis.fetch_yuntu_analysis"

# Baseline keyword arguments for calculate_cost_metrics; individual tests
# override only the fields they care about.
_BASE_COST_INPUTS = {
    "cost": 10000,
    "natural_play_cnt": 40000,
    "a3_increase_cnt": 500,
    "natural_a3_increase_cnt": 400,
    "after_view_search_uv": 1000,
    "total_play_cnt": 50000,
}

# Attribute values of the mocked database video row used by most tests.
_VIDEO_DEFAULTS = {
    "item_id": "video_123",
    "title": "测试视频",
    "video_url": None,
    "star_id": "star_001",
    "star_unique_id": "unique_001",
    "star_nickname": "测试达人",
    "publish_time": datetime(2025, 1, 15),
    "industry_name": "母婴",
    "industry_id": "20",
    "total_play_cnt": 50000,
    "natural_play_cnt": 40000,
    "heated_play_cnt": 10000,
    "after_view_search_uv": 1000,
    "return_search_cnt": 50,
    "estimated_video_cost": 10000,
}

# Metric arguments shared by every update_video_a3_metrics call below.
_A3_UPDATE_ARGS = {
    "total_new_a3_cnt": 500,
    "heated_new_a3_cnt": 100,
    "natural_new_a3_cnt": 400,
    "total_cost": 10000.0,
}


def _cost_metrics(**overrides):
    """Call calculate_cost_metrics with the baseline inputs plus overrides."""
    return calculate_cost_metrics(**{**_BASE_COST_INPUTS, **overrides})


def _build_video(**overrides):
    """Return a MagicMock video row carrying the default attributes plus overrides."""
    video = MagicMock()
    for attr, value in {**_VIDEO_DEFAULTS, **overrides}.items():
        setattr(video, attr, value)
    return video


def _session_returning(video):
    """Return an AsyncMock session whose execute() result yields ``video``."""
    session = AsyncMock()
    query_result = MagicMock()
    query_result.scalar_one_or_none.return_value = video
    session.execute.return_value = query_result
    return session


def _session_with_rowcount(rowcount):
    """Return an AsyncMock session whose execute() result reports ``rowcount``."""
    session = AsyncMock()
    write_result = MagicMock()
    write_result.rowcount = rowcount
    session.execute.return_value = write_result
    return session


class TestCalculateCostMetrics:
    """Tests for calculate_cost_metrics function."""

    def test_all_metrics_calculated(self):
        """Every cost metric is derived from fully populated inputs."""
        metrics = _cost_metrics()

        expected = {
            # CPM = 10000 / 50000 * 1000 = 200
            "cpm": 200.0,
            # natural CPM = 10000 / 40000 * 1000 = 250
            "natural_cpm": 250.0,
            # CPA3 = 10000 / 500 = 20
            "cpa3": 20.0,
            # natural CPA3 = 10000 / 400 = 25
            "natural_cpa3": 25.0,
            # CPsearch = 10000 / 1000 = 10
            "cp_search": 10.0,
            # estimated natural after-view search UV = 40000 / 50000 * 1000 = 800
            "estimated_natural_search_uv": 800.0,
            # natural CPsearch = 10000 / 800 = 12.5
            "natural_cp_search": 12.5,
        }
        for name, value in expected.items():
            assert metrics[name] == value

    def test_zero_total_play_cnt(self):
        """Zero play counts (division by zero) yield None for play-based metrics."""
        metrics = _cost_metrics(natural_play_cnt=0, total_play_cnt=0)

        for name in (
            "cpm",
            "natural_cpm",
            "estimated_natural_search_uv",
            "natural_cp_search",
        ):
            assert metrics[name] is None

    def test_zero_a3_counts(self):
        """Zero A3 counts yield None for the A3 cost metrics only."""
        metrics = _cost_metrics(a3_increase_cnt=0, natural_a3_increase_cnt=0)

        assert metrics["cpa3"] is None
        assert metrics["natural_cpa3"] is None
        # The remaining metrics should still be computed normally.
        assert metrics["cpm"] == 200.0

    def test_zero_search_uv(self):
        """Zero after_view_search_uv yields None for the search metrics."""
        metrics = _cost_metrics(after_view_search_uv=0)

        assert metrics["cp_search"] is None
        # With after_view_search_uv=0 the estimated natural after-view search
        # UV is meaningless, so it should be None as well.
        assert metrics["estimated_natural_search_uv"] is None
        assert metrics["natural_cp_search"] is None

    def test_all_zeros(self):
        """All-zero inputs yield None for every metric."""
        metrics = _cost_metrics(**dict.fromkeys(_BASE_COST_INPUTS, 0))

        for name in (
            "cpm",
            "natural_cpm",
            "cpa3",
            "natural_cpa3",
            "cp_search",
            "estimated_natural_search_uv",
            "natural_cp_search",
        ):
            assert metrics[name] is None

    def test_decimal_precision(self):
        """CPM comes back as a float with at most 2 decimal places."""
        metrics = _cost_metrics(
            natural_play_cnt=30000,
            a3_increase_cnt=333,
            natural_a3_increase_cnt=111,
            after_view_search_uv=777,
            total_play_cnt=70000,
        )

        # Check that the value carries no more than two decimal digits.
        cpm = metrics["cpm"]
        assert isinstance(cpm, float)
        fraction_digits = str(cpm).rpartition(".")[2]
        assert len(fraction_digits) <= 2


class TestGetVideoAnalysisData:
    """Tests for get_video_analysis_data function."""

    async def test_success_with_api_data(self):
        """API data is surfaced in every section of the result."""
        # Mocked database row and session.
        db_session = _session_returning(
            _build_video(video_url="https://example.com/video")
        )

        # Mocked API response.
        yuntu_payload = {
            "code": 0,
            "data": {
                "total_show_cnt": 100000,
                "natural_show_cnt": 80000,
                "ad_show_cnt": 20000,
                "total_play_cnt": 50000,
                "natural_play_cnt": 40000,
                "ad_play_cnt": 10000,
                "effective_play_cnt": 30000,
                "a3_increase_cnt": 500,
                "ad_a3_increase_cnt": 100,
                "natural_a3_increase_cnt": 400,
                "after_view_search_uv": 1000,
                "after_view_search_pv": 1500,
                "brand_search_uv": 200,
                "product_search_uv": 300,
                "return_search_cnt": 50,
                "cost": 10000,
                "natural_cost": 0,
                "ad_cost": 10000,
            },
        }

        with patch(_FETCH_TARGET) as fetch_mock:
            fetch_mock.return_value = yuntu_payload
            analysis = await get_video_analysis_data(db_session, "video_123")

        # Base info.
        base_info = analysis["base_info"]
        assert base_info["item_id"] == "video_123"
        assert base_info["title"] == "测试视频"
        assert base_info["star_nickname"] == "测试达人"

        # Reach metrics.
        reach = analysis["reach_metrics"]
        assert reach["total_show_cnt"] == 100000
        assert reach["natural_play_cnt"] == 40000

        # A3 metrics.
        a3 = analysis["a3_metrics"]
        assert a3["a3_increase_cnt"] == 500
        assert a3["natural_a3_increase_cnt"] == 400

        # Search metrics.
        assert analysis["search_metrics"]["after_view_search_uv"] == 1000

        # Raw cost metrics.
        assert analysis["cost_metrics_raw"]["cost"] == 10000

        # Calculated cost metrics.
        calculated = analysis["cost_metrics_calculated"]
        assert calculated["cpm"] is not None
        assert calculated["cpa3"] is not None

    async def test_video_not_found(self):
        """A missing video raises ValueError mentioning 'not found'."""
        db_session = _session_returning(None)

        with pytest.raises(ValueError) as exc_info:
            await get_video_analysis_data(db_session, "nonexistent")

        assert "not found" in str(exc_info.value).lower()

    async def test_fallback_on_api_failure(self):
        """Database values are used when the API call raises YuntuAPIError."""
        db_session = _session_returning(_build_video())

        with patch(_FETCH_TARGET) as fetch_mock:
            fetch_mock.side_effect = YuntuAPIError("API Error")
            analysis = await get_video_analysis_data(db_session, "video_123")

        # The result should reflect the database row.
        assert analysis["reach_metrics"]["total_play_cnt"] == 50000
        assert analysis["reach_metrics"]["natural_play_cnt"] == 40000
        assert analysis["search_metrics"]["after_view_search_uv"] == 1000

    async def test_null_publish_time(self):
        """A NULL publish_time is passed through as None."""
        sparse_video = _build_video(
            publish_time=None,  # NULL
            industry_name=None,
            industry_id=None,
            total_play_cnt=0,
            natural_play_cnt=0,
            heated_play_cnt=0,
            after_view_search_uv=0,
            return_search_cnt=0,
            estimated_video_cost=0,
        )
        db_session = _session_returning(sparse_video)

        with patch(_FETCH_TARGET) as fetch_mock:
            fetch_mock.return_value = {"code": 0, "data": {}}
            analysis = await get_video_analysis_data(db_session, "video_123")

        assert analysis["base_info"]["publish_time"] is None


class TestUpdateVideoA3Metrics:
    """Tests for update_video_a3_metrics function (T-025)."""

    async def test_update_success(self):
        """One affected row returns True and commits once."""
        db_session = _session_with_rowcount(1)

        updated = await update_video_a3_metrics(
            session=db_session,
            item_id="video_123",
            **_A3_UPDATE_ARGS,
        )

        assert updated is True
        db_session.commit.assert_called_once()

    async def test_update_video_not_found(self):
        """Zero affected rows returns False."""
        db_session = _session_with_rowcount(0)

        updated = await update_video_a3_metrics(
            session=db_session,
            item_id="nonexistent",
            **_A3_UPDATE_ARGS,
        )

        assert updated is False

    async def test_update_database_error(self):
        """An execute() failure returns False and rolls back once."""
        db_session = AsyncMock()
        db_session.execute.side_effect = Exception("Database error")

        updated = await update_video_a3_metrics(
            session=db_session,
            item_id="video_123",
            **_A3_UPDATE_ARGS,
        )

        assert updated is False
        db_session.rollback.assert_called_once()


class TestGetAndUpdateVideoAnalysis:
    """Tests for get_and_update_video_analysis function (T-024 + T-025)."""

    async def test_get_and_update_success(self):
        """Analysis data is returned and the session is committed."""
        # Mocked results for the read and the write statements.
        select_result = MagicMock()
        select_result.scalar_one_or_none.return_value = _build_video()
        update_result = MagicMock()
        update_result.rowcount = 1

        async def route_execute(stmt):
            # Crude dispatch: statements whose text contains SELECT get the
            # video row, everything else gets the update result.
            rendered = str(stmt)
            if "SELECT" in rendered.upper():
                return select_result
            return update_result

        db_session = AsyncMock()
        db_session.execute.side_effect = route_execute

        with patch(_FETCH_TARGET) as fetch_mock:
            fetch_mock.return_value = {
                "code": 0,
                "data": {
                    "a3_increase_cnt": 500,
                    "ad_a3_increase_cnt": 100,
                    "natural_a3_increase_cnt": 400,
                    "cost": 10000,
                },
            }
            analysis = await get_and_update_video_analysis(db_session, "video_123")

        # Returned data.
        assert analysis["a3_metrics"]["a3_increase_cnt"] == 500
        # The database update was committed.
        db_session.commit.assert_called()