videos1.0/backend/tests/integration/test_api_video.py
Your Name f87ae48ad5 feat: 实现 FastAPI REST API 端点和集成测试
- 添加认证 API (登录/token验证)
- 添加 Brief API (上传/解析/导入/冲突检测)
- 添加视频 API (上传/断点续传/审核/违规/预览/重提交)
- 添加审核 API (决策/批量审核/申诉/历史)
- 实现基于角色的权限控制
- 更新集成测试,49 个测试全部通过
- 总体测试覆盖率 89.63%

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 18:08:12 +08:00

364 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频 API 集成测试
TDD 测试用例 - 测试视频上传、审核相关 API 接口
接口规范参考：DevelopmentPlan.md 第 7 章
验收标准参考：FeatureSummary.md F-10~F-18
"""
import pytest
from typing import Any
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def auth_headers():
    """Log in with the ``creator@test.com`` test account and build auth headers.

    Posts fixed test credentials to ``/api/v1/auth/login`` through an
    in-process ASGI transport and wraps the returned ``access_token`` in an
    ``Authorization`` header.

    Returns:
        A dict of the form ``{"Authorization": "Bearer <token>"}``.

    Raises:
        httpx.HTTPStatusError: If the login request does not succeed.
    """
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        login_response = await client.post("/api/v1/auth/login", json={
            "email": "creator@test.com",
            "password": "password"
        })
        # Fail fast with the real HTTP status rather than an opaque KeyError
        # on "access_token" in every test that depends on this fixture.
        login_response.raise_for_status()
        token = login_response.json()["access_token"]
        return {"Authorization": f"Bearer {token}"}
class TestVideoUploadAPI:
    """Integration tests for the video upload endpoints."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_upload_video_success(self, auth_headers) -> None:
        """A small MP4 upload is expected to return 202 plus a video_id."""
        upload_file = ("test.mp4", b"video content", "video/mp4")
        form_fields = {
            "task_id": "task_001",
            "title": "测试视频",
        }
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.post(
                "/api/v1/videos/upload",
                files={"file": upload_file},
                data=form_fields,
                headers=auth_headers,
            )
            assert resp.status_code == 202
            body = resp.json()
            assert "video_id" in body
            assert body["status"] == "processing"

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_upload_oversized_video_returns_413(self, auth_headers) -> None:
        """A 101 MiB upload is expected to be rejected with 413.

        The error detail must mention "100MB".
        """
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            # Build a payload just above the 100MB limit named in the error.
            oversized_bytes = 101 * 1024 * 1024
            payload = b"x" * oversized_bytes
            resp = await http.post(
                "/api/v1/videos/upload",
                files={"file": ("large.mp4", payload, "video/mp4")},
                data={"task_id": "task_001"},
                headers=auth_headers,
            )
            assert resp.status_code == 413
            assert "100MB" in resp.json()["detail"]

    @pytest.mark.integration
    @pytest.mark.asyncio
    @pytest.mark.parametrize("filename,expected_status", [
        ("test.mp4", 202),
        ("test.mov", 202),
        ("test.avi", 400),  # AVI - not supported
        ("test.mkv", 400),  # MKV - not supported
        ("test.pdf", 400),
    ])
    async def test_upload_video_format_validation(
        self,
        auth_headers,
        filename: str,
        expected_status: int,
    ) -> None:
        """Only ``.mp4`` and ``.mov`` filenames are expected to be accepted.

        The declared content type is always ``video/mp4``; only the filename
        varies between cases.
        """
        upload_file = (filename, b"content", "video/mp4")
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.post(
                "/api/v1/videos/upload",
                files={"file": upload_file},
                data={"task_id": "task_001"},
                headers=auth_headers,
            )
            assert resp.status_code == expected_status

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_resumable_upload(self, auth_headers) -> None:
        """Initialise a resumable upload, then send its first chunk."""
        init_payload = {
            "filename": "large_video.mp4",
            "file_size": 50 * 1024 * 1024,
            "task_id": "task_001",
        }
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            # Step 1: open the upload session.
            init_resp = await http.post(
                "/api/v1/videos/upload/init",
                json=init_payload,
                headers=auth_headers,
            )
            assert init_resp.status_code == 200
            upload_id = init_resp.json()["upload_id"]

            # Step 2: push a single 1 MiB chunk at index 0.
            first_chunk = b"x" * 1024 * 1024
            chunk_resp = await http.post(
                f"/api/v1/videos/upload/{upload_id}/chunk",
                files={"chunk": ("chunk_0", first_chunk)},
                data={"chunk_index": 0},
                headers=auth_headers,
            )
            assert chunk_resp.status_code == 200
            assert chunk_resp.json()["received_chunks"] == 1
class TestVideoAuditAPI:
    """Integration tests for the video audit-result endpoint."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_audit_result_success(self, auth_headers) -> None:
        """The audit report for ``video_001`` returns 200 with all report fields."""
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.get(
                "/api/v1/videos/video_001/audit",
                headers=auth_headers,
            )
            assert resp.status_code == 200
            report = resp.json()
            # Check the structure of the audit report.
            expected_fields = (
                "report_id",
                "video_id",
                "status",
                "violations",
                "brief_compliance",
                "processing_time_ms",
            )
            for field in expected_fields:
                assert field in report

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_audit_result_processing(self, auth_headers) -> None:
        """A still-processing video reports status "processing" and a progress field."""
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.get(
                "/api/v1/videos/video_processing/audit",
                headers=auth_headers,
            )
            assert resp.status_code == 200
            report = resp.json()
            assert report["status"] == "processing"
            assert "progress" in report

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_nonexistent_video_returns_404(self, auth_headers) -> None:
        """Requesting the audit of an unknown video id is expected to return 404."""
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.get(
                "/api/v1/videos/nonexistent_id/audit",
                headers=auth_headers,
            )
            assert resp.status_code == 404
class TestViolationEvidenceAPI:
    """Integration tests for the violation-evidence endpoint."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_violation_evidence(self, auth_headers) -> None:
        """Evidence for ``vio_001`` returns 200 with screenshot and timestamp fields."""
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get(
                "/api/v1/videos/video_001/violations/vio_001/evidence",
                headers=auth_headers
            )
            assert response.status_code == 200
            data = response.json()
            assert "violation_id" in data
            assert "evidence_type" in data
            assert "screenshot_url" in data
            assert "timestamp_start" in data
            assert "timestamp_end" in data
            assert "content" in data

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_evidence_screenshot_accessible(self, auth_headers) -> None:
        """The evidence screenshot URL points under ``/static/screenshots/``.

        Fetching the screenshot itself needs a static file server, so only
        the URL format is verified here.
        """
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            evidence_response = await client.get(
                "/api/v1/videos/video_001/violations/vio_001/evidence",
                headers=auth_headers
            )
            # Check the status first so a failed request is reported as such,
            # not as a KeyError on "screenshot_url".
            assert evidence_response.status_code == 200
            screenshot_url = evidence_response.json()["screenshot_url"]
            assert screenshot_url.startswith("/static/screenshots/")
class TestVideoPreviewAPI:
    """Integration tests for the video preview and violation-list endpoints."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_get_video_preview_with_timestamp(self, auth_headers) -> None:
        """A preview request with start/end milliseconds returns 200.

        The response must contain ``preview_url``, ``start_ms`` and ``end_ms``.
        """
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get(
                "/api/v1/videos/video_001/preview",
                params={"start_ms": 5000, "end_ms": 10000},
                headers=auth_headers
            )
            assert response.status_code == 200
            data = response.json()
            assert "preview_url" in data
            assert "start_ms" in data
            assert "end_ms" in data

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_video_seek_to_violation(self, auth_headers) -> None:
        """Every listed violation carries a non-negative ``timestamp_start``.

        Note: the per-item checks pass vacuously if the list is empty.
        """
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            # Fetch the list of violations for the video.
            violations_response = await client.get(
                "/api/v1/videos/video_001/violations",
                headers=auth_headers
            )
            # Check the status before indexing into the body, matching the
            # other tests in this module and giving a clearer failure.
            assert violations_response.status_code == 200
            violations = violations_response.json()["violations"]
            # Each violation needs a timestamp the player can seek to.
            for violation in violations:
                assert "timestamp_start" in violation
                assert violation["timestamp_start"] >= 0
class TestVideoResubmitAPI:
    """Integration tests for the video resubmission endpoint."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_resubmit_video_success(self, auth_headers) -> None:
        """Resubmitting with a note and sections returns 202 and a new_video_id."""
        resubmit_payload = {
            "modification_note": "已修改违规内容",
            "modified_sections": ["00:05-00:10"],
        }
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.post(
                "/api/v1/videos/video_001/resubmit",
                json=resubmit_payload,
                headers=auth_headers,
            )
            assert resp.status_code == 202
            body = resp.json()
            assert body["status"] == "processing"
            assert "new_video_id" in body

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_resubmit_without_modification_note(self, auth_headers) -> None:
        """Resubmitting with an empty JSON body is still expected to return 202."""
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.post(
                "/api/v1/videos/video_001/resubmit",
                json={},
                headers=auth_headers,
            )
            # Omitting the modification note should be allowed.
            assert resp.status_code == 202
class TestVideoListAPI:
    """Integration tests for the video listing endpoint."""

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_list_videos_with_pagination(self, auth_headers) -> None:
        """A paginated listing returns paging metadata and at most 10 items."""
        paging = {"page": 1, "page_size": 10}
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.get(
                "/api/v1/videos",
                params=paging,
                headers=auth_headers,
            )
            assert resp.status_code == 200
            listing = resp.json()
            for field in ("items", "total", "page", "page_size"):
                assert field in listing
            assert len(listing["items"]) <= 10

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_list_videos_filter_by_status(self, auth_headers) -> None:
        """Filtering by status "completed" returns only completed videos."""
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.get(
                "/api/v1/videos",
                params={"status": "completed"},
                headers=auth_headers,
            )
            assert resp.status_code == 200
            listing = resp.json()
            assert all(
                video["status"] == "completed" for video in listing["items"]
            )

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_list_videos_filter_by_task(self, auth_headers) -> None:
        """Filtering by task_id "task_001" returns only that task's videos."""
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as http:
            resp = await http.get(
                "/api/v1/videos",
                params={"task_id": "task_001"},
                headers=auth_headers,
            )
            assert resp.status_code == 200
            listing = resp.json()
            assert all(
                video["task_id"] == "task_001" for video in listing["items"]
            )