基于项目需求文档(PRD.md, FeatureSummary.md, DevelopmentPlan.md, UIDesign.md, User_Role_Interfaces.md)编写的 TDD 测试用例。

后端测试 (Python/pytest):
- 单元测试: rule_engine, brief_parser, timestamp_alignment, video_auditor, validators
- 集成测试: API Brief, Video, Review 端点
- AI 模块测试: ASR, OCR, Logo 检测服务
- 全局 fixtures 和 pytest 配置

前端测试 (TypeScript/Vitest):
- 工具函数测试: utils.test.ts
- 组件测试: Button, VideoPlayer, ViolationList
- Hooks 测试: useVideoAudit, useVideoPlayer, useAppeal
- MSW mock handlers 配置

E2E 测试 (Playwright):
- 认证流程测试
- 视频上传流程测试
- 视频审核流程测试
- 申诉流程测试

所有测试当前使用 pytest.skip() / it.skip() 作为占位符, 遵循 TDD 红灯阶段 - 等待实现代码后运行。

验收标准覆盖:
- ASR WER ≤ 10%
- OCR 准确率 ≥ 95%
- Logo F1 ≥ 0.85
- 时间戳误差 ≤ 0.5s
- 频次统计准确率 ≥ 95%

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
380 lines
14 KiB
Python
380 lines
14 KiB
Python
"""
Integration tests for the video API.

TDD test cases covering the video upload and audit related API endpoints.

API specification: DevelopmentPlan.md, chapter 7
Acceptance criteria: FeatureSummary.md, F-10~F-18
"""

from typing import Any

import pytest

# Modules under test; enable these imports once the implementation exists
# (TDD red phase).
# from httpx import AsyncClient
# from app.main import app


class TestVideoUploadAPI:
    """Integration tests for the video upload endpoints.

    Every test is currently a placeholder that ends in ``pytest.skip``; the
    commented-out code above each skip sketches the intended request and
    assertions.
    """

    # NOTE(review): only the test_upload_video_success sketch builds
    # `headers`; the other sketches pass `headers=headers` without defining
    # it, so they need a login step or a shared auth fixture before they can
    # be enabled.
    # NOTE(review): httpx 0.28 removed the AsyncClient(app=...) shortcut; the
    # sketches probably need transport=ASGITransport(app=app) instead --
    # confirm against the httpx version this project pins.

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_upload_video_success(self) -> None:
        """A valid upload returns 202 together with a video_id."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     # Log in to obtain a token
        #     login_response = await client.post("/api/v1/auth/login", json={
        #         "email": "creator@test.com",
        #         "password": "password"
        #     })
        #     token = login_response.json()["access_token"]
        #     headers = {"Authorization": f"Bearer {token}"}
        #
        #     # Upload the video
        #     with open("tests/fixtures/videos/sample_video.mp4", "rb") as f:
        #         response = await client.post(
        #             "/api/v1/videos/upload",
        #             files={"file": ("test.mp4", f, "video/mp4")},
        #             data={
        #                 "task_id": "task_001",
        #                 "title": "测试视频"
        #             },
        #             headers=headers
        #         )
        #
        #     assert response.status_code == 202
        #     data = response.json()
        #     assert "video_id" in data
        #     assert data["status"] == "processing"
        pytest.skip("待实现:视频上传 API")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_upload_oversized_video_returns_413(self) -> None:
        """An oversized upload is rejected with 413 (limit: 100MB)."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     # Build a payload larger than 100MB
        #     oversized_content = b"x" * (101 * 1024 * 1024)
        #
        #     response = await client.post(
        #         "/api/v1/videos/upload",
        #         files={"file": ("large.mp4", oversized_content, "video/mp4")},
        #         data={"task_id": "task_001"},
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 413
        #     assert "100MB" in response.json()["error"]
        pytest.skip("待实现:超大视频测试")

    @pytest.mark.integration
    @pytest.mark.asyncio
    @pytest.mark.parametrize("mime_type,expected_status", [
        ("video/mp4", 202),
        ("video/quicktime", 202),  # MOV
        ("video/x-msvideo", 400),  # AVI - not supported
        ("video/x-matroska", 400),  # MKV - not supported
        ("application/pdf", 400),
    ])
    async def test_upload_video_format_validation(
        self,
        mime_type: str,
        expected_status: int,
    ) -> None:
        """Format validation: only MP4 and MOV uploads are accepted.

        Args:
            mime_type: Content type sent with the uploaded file.
            expected_status: HTTP status the endpoint is expected to return.
        """
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.post(
        #         "/api/v1/videos/upload",
        #         files={"file": ("test.video", b"content", mime_type)},
        #         data={"task_id": "task_001"},
        #         headers=headers
        #     )
        #
        #     assert response.status_code == expected_status
        pytest.skip("待实现:视频格式验证")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_resumable_upload(self) -> None:
        """Resumable (chunked) upload: init, then send the first chunk."""
        # TODO: implement the resumable-upload test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     # Initialise the upload
        #     init_response = await client.post(
        #         "/api/v1/videos/upload/init",
        #         json={
        #             "filename": "large_video.mp4",
        #             "file_size": 50 * 1024 * 1024,
        #             "task_id": "task_001"
        #         },
        #         headers=headers
        #     )
        #     upload_id = init_response.json()["upload_id"]
        #
        #     # Upload one chunk
        #     chunk_response = await client.post(
        #         f"/api/v1/videos/upload/{upload_id}/chunk",
        #         files={"chunk": ("chunk_0", b"x" * 1024 * 1024)},
        #         data={"chunk_index": 0},
        #         headers=headers
        #     )
        #
        #     assert chunk_response.status_code == 200
        #     assert chunk_response.json()["received_chunks"] == 1
        pytest.skip("待实现:断点续传")


class TestVideoAuditAPI:
    """Integration tests for the video audit-result endpoint.

    Every test is currently a placeholder that ends in ``pytest.skip``; the
    commented-out code above each skip sketches the intended request and
    assertions.
    """

    # NOTE(review): the sketches pass `headers=headers` but never define
    # `headers`; add a login step or a shared auth fixture before enabling.
    # NOTE(review): httpx 0.28 removed the AsyncClient(app=...) shortcut; the
    # sketches probably need transport=ASGITransport(app=app) instead --
    # confirm against the httpx version this project pins.

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_audit_result_success(self) -> None:
        """Fetching a finished audit returns 200 and the full report shape."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos/video_001/audit",
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 200
        #     data = response.json()
        #
        #     # Check the structure of the audit report
        #     assert "report_id" in data
        #     assert "video_id" in data
        #     assert "status" in data
        #     assert "violations" in data
        #     assert "brief_compliance" in data
        #     assert "processing_time_ms" in data
        pytest.skip("待实现:获取审核结果 API")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_audit_result_processing(self) -> None:
        """Fetching an audit that is still processing reports its progress."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos/video_processing/audit",
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 200
        #     data = response.json()
        #     assert data["status"] == "processing"
        #     assert "progress" in data
        pytest.skip("待实现:处理中状态测试")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_nonexistent_video_returns_404(self) -> None:
        """Requesting the audit of an unknown video returns 404."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos/nonexistent_id/audit",
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 404
        pytest.skip("待实现:404 测试")


class TestViolationEvidenceAPI:
    """Integration tests for the violation-evidence endpoint.

    Every test is currently a placeholder that ends in ``pytest.skip``; the
    commented-out code above each skip sketches the intended request and
    assertions.
    """

    # NOTE(review): the sketches pass `headers=headers` but never define
    # `headers`; add a login step or a shared auth fixture before enabling.
    # NOTE(review): httpx 0.28 removed the AsyncClient(app=...) shortcut; the
    # sketches probably need transport=ASGITransport(app=app) instead --
    # confirm against the httpx version this project pins.

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_violation_evidence(self) -> None:
        """Evidence for a violation includes a screenshot and timestamps."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos/video_001/violations/vio_001/evidence",
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 200
        #     data = response.json()
        #
        #     assert "violation_id" in data
        #     assert "evidence_type" in data
        #     assert "screenshot_url" in data
        #     assert "timestamp_start" in data
        #     assert "timestamp_end" in data
        #     assert "content" in data
        pytest.skip("待实现:违规证据 API")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_evidence_screenshot_accessible(self) -> None:
        """The screenshot URL returned with the evidence can be fetched."""
        # TODO: implement the screenshot access test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     # Fetch the evidence record
        #     evidence_response = await client.get(
        #         "/api/v1/videos/video_001/violations/vio_001/evidence",
        #         headers=headers
        #     )
        #     screenshot_url = evidence_response.json()["screenshot_url"]
        #
        #     # Fetch the screenshot itself
        #     screenshot_response = await client.get(screenshot_url)
        #     assert screenshot_response.status_code == 200
        #     assert "image" in screenshot_response.headers["content-type"]
        pytest.skip("待实现:截图访问测试")


class TestVideoPreviewAPI:
    """Integration tests for the video preview and violation-seek endpoints.

    Every test is currently a placeholder that ends in ``pytest.skip``; the
    commented-out code above each skip sketches the intended request and
    assertions.
    """

    # NOTE(review): the sketches pass `headers=headers` but never define
    # `headers`; add a login step or a shared auth fixture before enabling.
    # NOTE(review): httpx 0.28 removed the AsyncClient(app=...) shortcut; the
    # sketches probably need transport=ASGITransport(app=app) instead --
    # confirm against the httpx version this project pins.

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_video_preview_with_timestamp(self) -> None:
        """Requesting a preview for a time range returns a preview URL."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos/video_001/preview",
        #         params={"start_ms": 5000, "end_ms": 10000},
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 200
        #     data = response.json()
        #
        #     assert "preview_url" in data
        #     assert "start_ms" in data
        #     assert "end_ms" in data
        pytest.skip("待实现:视频预览 API")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_video_seek_to_violation(self) -> None:
        """Each listed violation carries a timestamp the player can seek to."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     # Fetch the list of violations
        #     violations_response = await client.get(
        #         "/api/v1/videos/video_001/violations",
        #         headers=headers
        #     )
        #     violations = violations_response.json()["violations"]
        #
        #     # Every violation must expose a seekable start timestamp
        #     for violation in violations:
        #         assert "timestamp_start" in violation
        #         assert violation["timestamp_start"] >= 0
        pytest.skip("待实现:视频跳转")


class TestVideoResubmitAPI:
    """Integration tests for the video resubmission endpoint.

    Every test is currently a placeholder that ends in ``pytest.skip``; the
    commented-out code above each skip sketches the intended request and
    assertions.
    """

    # NOTE(review): the sketches pass `headers=headers` but never define
    # `headers`; add a login step or a shared auth fixture before enabling.
    # NOTE(review): httpx 0.28 removed the AsyncClient(app=...) shortcut; the
    # sketches probably need transport=ASGITransport(app=app) instead --
    # confirm against the httpx version this project pins.

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_resubmit_video_success(self) -> None:
        """Resubmitting a video returns 202 and a new video id."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.post(
        #         "/api/v1/videos/video_001/resubmit",
        #         json={
        #             "modification_note": "已修改违规内容",
        #             "modified_sections": ["00:05-00:10"]
        #         },
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 202
        #     data = response.json()
        #     assert data["status"] == "processing"
        #     assert "new_video_id" in data
        pytest.skip("待实现:重新提交 API")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_resubmit_without_modification_note(self) -> None:
        """Resubmitting with an empty body (no modification note)."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.post(
        #         "/api/v1/videos/video_001/resubmit",
        #         json={},
        #         headers=headers
        #     )
        #
        #     # Omitting the modification note should be allowed
        #     # NOTE(review): the assertion below accepts both 202 and 400, so
        #     # it does not pin the behaviour the comment above describes;
        #     # settle the contract and assert a single status code.
        #     assert response.status_code in [202, 400]
        pytest.skip("待实现:无修改说明测试")


class TestVideoListAPI:
    """Integration tests for the video list endpoint (paging and filters).

    Every test is currently a placeholder that ends in ``pytest.skip``; the
    commented-out code above each skip sketches the intended request and
    assertions.
    """

    # NOTE(review): the sketches pass `headers=headers` but never define
    # `headers`; add a login step or a shared auth fixture before enabling.
    # NOTE(review): httpx 0.28 removed the AsyncClient(app=...) shortcut; the
    # sketches probably need transport=ASGITransport(app=app) instead --
    # confirm against the httpx version this project pins.

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_list_videos_with_pagination(self) -> None:
        """Listing videos honours page and page_size."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos",
        #         params={"page": 1, "page_size": 10},
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 200
        #     data = response.json()
        #
        #     assert "items" in data
        #     assert "total" in data
        #     assert "page" in data
        #     assert "page_size" in data
        #     assert len(data["items"]) <= 10
        pytest.skip("待实现:视频列表分页")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_list_videos_filter_by_status(self) -> None:
        """Filtering by status returns only videos in that status."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos",
        #         params={"status": "pending_review"},
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 200
        #     data = response.json()
        #
        #     for item in data["items"]:
        #         assert item["status"] == "pending_review"
        pytest.skip("待实现:状态筛选")

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_list_videos_filter_by_task(self) -> None:
        """Filtering by task_id returns only videos of that task."""
        # TODO: implement the API test
        # async with AsyncClient(app=app, base_url="http://test") as client:
        #     response = await client.get(
        #         "/api/v1/videos",
        #         params={"task_id": "task_001"},
        #         headers=headers
        #     )
        #
        #     assert response.status_code == 200
        #     data = response.json()
        #
        #     for item in data["items"]:
        #         assert item["task_id"] == "task_001"
        pytest.skip("待实现:任务筛选")