kol-insight/backend/tests/test_database.py
zfc cdc364cb2a test: 提升测试覆盖率至 99%
- 新增 brand_api 测试: 覆盖所有异常处理分支
- 新增 main.py 测试: root 和 health 端点
- 新增 logging 测试: setup_logging 和 get_logger
- 新增 KolVideo __repr__ 测试
- 总计 67 个测试全部通过

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 14:44:54 +08:00

193 lines
6.6 KiB
Python

import pytest
from sqlalchemy import select
from app.models import KolVideo
from app.core.logging import setup_logging, get_logger
class TestLogging:
"""Tests for logging module."""
def test_setup_logging(self):
"""Test logging setup."""
# Should not raise
setup_logging()
def test_get_logger(self):
"""Test getting a logger."""
logger = get_logger("test")
assert logger is not None
assert logger.name == "test"
class TestKolVideoModel:
"""Tests for KolVideo model."""
async def test_create_video(self, test_session, sample_video_data):
"""Test creating a video record."""
video = KolVideo(**sample_video_data)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
)
saved_video = result.scalar_one()
assert saved_video.item_id == sample_video_data["item_id"]
assert saved_video.title == sample_video_data["title"]
assert saved_video.star_id == sample_video_data["star_id"]
async def test_query_by_star_id(self, test_session, sample_video_data):
"""Test querying videos by star_id."""
video = KolVideo(**sample_video_data)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.star_id == sample_video_data["star_id"])
)
videos = result.scalars().all()
assert len(videos) == 1
assert videos[0].star_id == sample_video_data["star_id"]
async def test_query_by_star_unique_id(self, test_session, sample_video_data):
"""Test querying videos by star_unique_id."""
video = KolVideo(**sample_video_data)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(
KolVideo.star_unique_id == sample_video_data["star_unique_id"]
)
)
videos = result.scalars().all()
assert len(videos) == 1
assert videos[0].star_unique_id == sample_video_data["star_unique_id"]
async def test_query_by_nickname_like(self, test_session, sample_video_data):
"""Test querying videos by nickname using LIKE."""
video = KolVideo(**sample_video_data)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.star_nickname.like("%测试%"))
)
videos = result.scalars().all()
assert len(videos) == 1
assert "测试" in videos[0].star_nickname
async def test_batch_query_by_star_ids(self, test_session, sample_video_data):
"""Test batch querying videos by multiple star_ids."""
video1 = KolVideo(**sample_video_data)
video2_data = sample_video_data.copy()
video2_data["item_id"] = "test_item_002"
video2_data["star_id"] = "star_002"
video2 = KolVideo(**video2_data)
test_session.add_all([video1, video2])
await test_session.commit()
star_ids = ["star_001", "star_002"]
result = await test_session.execute(
select(KolVideo).where(KolVideo.star_id.in_(star_ids))
)
videos = result.scalars().all()
assert len(videos) == 2
async def test_video_default_values(self, test_session):
"""Test that default values are set correctly."""
video = KolVideo(
item_id="test_defaults",
star_id="star_test",
star_unique_id="unique_test",
star_nickname="测试默认值",
)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.item_id == "test_defaults")
)
saved_video = result.scalar_one()
assert saved_video.natural_play_cnt == 0
assert saved_video.heated_play_cnt == 0
assert saved_video.total_play_cnt == 0
assert saved_video.estimated_video_cost == 0
async def test_video_nullable_fields(self, test_session):
"""Test that nullable fields can be None."""
video = KolVideo(
item_id="test_nullable",
star_id="star_nullable",
star_unique_id="unique_nullable",
star_nickname="测试可空字段",
)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.item_id == "test_nullable")
)
saved_video = result.scalar_one()
assert saved_video.title is None
assert saved_video.video_url is None
assert saved_video.brand_id is None
assert saved_video.new_a3_rate is None
async def test_update_video(self, test_session, sample_video_data):
"""Test updating a video record."""
video = KolVideo(**sample_video_data)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
)
saved_video = result.scalar_one()
saved_video.title = "更新后的标题"
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
)
updated_video = result.scalar_one()
assert updated_video.title == "更新后的标题"
async def test_delete_video(self, test_session, sample_video_data):
"""Test deleting a video record."""
video = KolVideo(**sample_video_data)
test_session.add(video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
)
saved_video = result.scalar_one()
await test_session.delete(saved_video)
await test_session.commit()
result = await test_session.execute(
select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
)
assert result.scalar_one_or_none() is None
async def test_video_repr(self, test_session, sample_video_data):
"""Test KolVideo __repr__ method."""
video = KolVideo(**sample_video_data)
test_session.add(video)
await test_session.commit()
repr_str = repr(video)
assert "KolVideo" in repr_str
assert sample_video_data["item_id"] in repr_str
assert sample_video_data["title"] in repr_str