""" 审核任务 API 测试 (TDD - 红色阶段) 测试覆盖: 创建任务、查询任务、更新任务状态 """ import pytest from httpx import AsyncClient from app.schemas.review import TaskResponse, TaskListResponse, TaskStatus class TestCreateTask: """创建审核任务""" @pytest.mark.asyncio async def test_create_task_returns_201(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """创建任务返回 201""" response = await client.post( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, json={ "platform": "douyin", "creator_id": creator_id, "video_url": video_url, } ) assert response.status_code == 201 @pytest.mark.asyncio async def test_create_task_returns_task_id(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """创建任务返回任务 ID""" response = await client.post( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, json={ "platform": "douyin", "creator_id": creator_id, "video_url": video_url, } ) data = response.json() parsed = TaskResponse.model_validate(data) assert parsed.task_id @pytest.mark.asyncio async def test_create_task_initial_status_pending(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """创建任务初始状态为 pending""" response = await client.post( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, json={ "platform": "douyin", "creator_id": creator_id, "video_url": video_url, } ) data = response.json() parsed = TaskResponse.model_validate(data) assert parsed.status == TaskStatus.PENDING @pytest.mark.asyncio async def test_create_task_validates_platform(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """创建任务校验平台参数""" response = await client.post( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, json={ "platform": "invalid_platform", "creator_id": creator_id, "video_url": video_url, } ) assert response.status_code == 422 @pytest.mark.asyncio async def test_create_task_validates_video_url(self, client: AsyncClient, tenant_id: str, creator_id: str): """创建任务校验视频 URL""" response = await client.post( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, json={ "video_url": "not-a-url", "platform": "douyin", "creator_id": creator_id, } ) assert response.status_code == 422 @pytest.mark.asyncio async def test_create_task_allows_missing_video(self, client: AsyncClient, tenant_id: str, creator_id: str): """创建任务允许暂不上传视频""" response = await client.post( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, json={ "platform": "douyin", "creator_id": creator_id, } ) data = response.json() parsed = TaskResponse.model_validate(data) assert parsed.has_video is False @pytest.mark.asyncio async def test_create_task_with_script_content(self, client: AsyncClient, tenant_id: str, creator_id: str): """创建任务可携带脚本内容""" response = await client.post( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, json={ "platform": "douyin", "creator_id": creator_id, "script_content": "脚本内容示例", } ) data = response.json() parsed = TaskResponse.model_validate(data) assert parsed.has_script is True assert parsed.script_content == "脚本内容示例" class TestGetTask: """查询审核任务""" @pytest.mark.asyncio async def test_get_task_returns_200(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """查询存在的任务返回 200""" headers = {"X-Tenant-ID": tenant_id} # 先创建任务 create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "platform": "douyin", "creator_id": creator_id, "video_url": video_url, } ) task_id = create_resp.json()["task_id"] # 查询任务 response = await client.get(f"/api/v1/tasks/{task_id}", headers=headers) assert response.status_code == 200 @pytest.mark.asyncio async def test_get_task_returns_task_details(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """查询任务返回完整信息""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "video_url": video_url, "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.get(f"/api/v1/tasks/{task_id}", headers=headers) data = response.json() parsed = TaskResponse.model_validate(data) assert parsed.task_id == task_id assert parsed.video_url == video_url assert parsed.platform.value == "douyin" assert parsed.creator_id == creator_id assert parsed.has_video is True assert parsed.created_at @pytest.mark.asyncio async def test_get_nonexistent_task_returns_404(self, client: AsyncClient, tenant_id: str): """查询不存在的任务返回 404""" response = await client.get( "/api/v1/tasks/nonexistent-task-id", headers={"X-Tenant-ID": tenant_id}, ) assert response.status_code == 404 class TestListTasks: """任务列表查询""" @pytest.mark.asyncio async def test_list_tasks_returns_200(self, client: AsyncClient, tenant_id: str): """查询任务列表返回 200""" response = await client.get( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, ) assert response.status_code == 200 @pytest.mark.asyncio async def test_list_tasks_returns_array(self, client: AsyncClient, tenant_id: str): """查询任务列表返回数组""" response = await client.get( "/api/v1/tasks", headers={"X-Tenant-ID": tenant_id}, ) data = response.json() parsed = TaskListResponse.model_validate(data) assert isinstance(parsed.items, list) @pytest.mark.asyncio async def test_list_tasks_pagination(self, client: AsyncClient, tenant_id: str): """任务列表支持分页""" response = await client.get( "/api/v1/tasks?page=1&page_size=10", headers={"X-Tenant-ID": tenant_id}, ) data = response.json() parsed = TaskListResponse.model_validate(data) assert parsed.page == 1 assert parsed.page_size == 10 @pytest.mark.asyncio async def test_list_tasks_filter_by_status(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """任务列表支持按状态筛选""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "video_url": video_url, "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.get("/api/v1/tasks?status=pending", headers=headers) assert response.status_code == 200 data = response.json() parsed = TaskListResponse.model_validate(data) assert any(item.task_id == task_id for item in parsed.items) @pytest.mark.asyncio async def test_list_tasks_filter_by_platform(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """任务列表支持按平台筛选""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "video_url": video_url, "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.get("/api/v1/tasks?platform=douyin", headers=headers) assert response.status_code == 200 data = response.json() parsed = TaskListResponse.model_validate(data) assert any(item.task_id == task_id for item in parsed.items) class TestUploadTaskAssets: """任务脚本/视频上传""" @pytest.mark.asyncio async def test_upload_script_requires_payload(self, client: AsyncClient, tenant_id: str, creator_id: str): """上传脚本必须提供内容或文件 URL""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.post( f"/api/v1/tasks/{task_id}/script", headers=headers, json={}, ) assert response.status_code == 422 @pytest.mark.asyncio async def test_upload_script_updates_task(self, client: AsyncClient, tenant_id: str, creator_id: str): """上传脚本更新任务内容""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.post( f"/api/v1/tasks/{task_id}/script", headers=headers, json={"script_content": "更新后的脚本"}, ) data = response.json() parsed = TaskResponse.model_validate(data) assert parsed.has_script is True assert parsed.script_content == "更新后的脚本" @pytest.mark.asyncio async def test_upload_video_updates_task(self, client: AsyncClient, tenant_id: str, creator_id: str, video_url: str): """上传视频更新任务视频 URL""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.post( f"/api/v1/tasks/{task_id}/video", headers=headers, json={"video_url": video_url}, ) data = response.json() parsed = TaskResponse.model_validate(data) assert parsed.has_video is True assert parsed.video_url == video_url class TestUpdateTaskStatus: """更新任务状态""" @pytest.mark.asyncio async def test_approve_task_returns_200(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """通过任务返回 200""" headers = {"X-Tenant-ID": tenant_id} # 创建任务 create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "video_url": video_url, "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] # 通过任务 response = await client.post( f"/api/v1/tasks/{task_id}/approve", headers=headers, json={"comment": "审核通过"} ) assert response.status_code == 200 @pytest.mark.asyncio async def test_approve_task_updates_status(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """通过任务更新状态为 approved""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "video_url": video_url, "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] await client.post( f"/api/v1/tasks/{task_id}/approve", headers=headers, json={"comment": "审核通过"} ) # 验证状态 get_resp = await client.get(f"/api/v1/tasks/{task_id}", headers=headers) parsed = TaskResponse.model_validate(get_resp.json()) assert parsed.status == TaskStatus.APPROVED @pytest.mark.asyncio async def test_reject_task_returns_200(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """驳回任务返回 200""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "video_url": video_url, "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.post( f"/api/v1/tasks/{task_id}/reject", headers=headers, json={"reason": "违规内容", "violations": ["forbidden_word"]} ) assert response.status_code == 200 get_resp = await client.get(f"/api/v1/tasks/{task_id}", headers=headers) parsed = TaskResponse.model_validate(get_resp.json()) assert parsed.status == TaskStatus.REJECTED @pytest.mark.asyncio async def test_reject_task_requires_reason(self, client: AsyncClient, tenant_id: str, video_url: str, creator_id: str): """驳回任务必须提供原因""" headers = {"X-Tenant-ID": tenant_id} create_resp = await client.post( "/api/v1/tasks", headers=headers, json={ "video_url": video_url, "platform": "douyin", "creator_id": creator_id, } ) task_id = create_resp.json()["task_id"] response = await client.post( f"/api/v1/tasks/{task_id}/reject", headers=headers, json={} ) assert response.status_code == 422