import pytest
from sqlalchemy import select

from app.models import KolVideo


class TestKolVideoModel:
    """Persistence tests for the ``KolVideo`` model.

    Each test inserts rows through the ``test_session`` fixture, commits,
    and reads them back with a ``select`` statement. ``sample_video_data``
    is a fixture providing keyword arguments for ``KolVideo``; the tests
    read its ``item_id``, ``title``, ``star_id`` and ``star_unique_id`` keys.

    NOTE(review): both fixtures are defined outside this file (presumably in
    a conftest), and no asyncio marker is visible here, so the async test
    mode is presumably configured project-wide -- confirm. The row-count
    assertions also assume each test starts with an empty table.
    """

    async def test_create_video(self, test_session, sample_video_data):
        """Test creating a video record."""
        video = KolVideo(**sample_video_data)
        test_session.add(video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
        )
        # scalar_one() raises unless exactly one row matches.
        saved_video = result.scalar_one()

        assert saved_video.item_id == sample_video_data["item_id"]
        assert saved_video.title == sample_video_data["title"]
        assert saved_video.star_id == sample_video_data["star_id"]

    async def test_query_by_star_id(self, test_session, sample_video_data):
        """Test querying videos by star_id."""
        video = KolVideo(**sample_video_data)
        test_session.add(video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.star_id == sample_video_data["star_id"])
        )
        videos = result.scalars().all()

        assert len(videos) == 1
        assert videos[0].star_id == sample_video_data["star_id"]

    async def test_query_by_star_unique_id(self, test_session, sample_video_data):
        """Test querying videos by star_unique_id."""
        video = KolVideo(**sample_video_data)
        test_session.add(video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(
                KolVideo.star_unique_id == sample_video_data["star_unique_id"]
            )
        )
        videos = result.scalars().all()

        assert len(videos) == 1
        assert videos[0].star_unique_id == sample_video_data["star_unique_id"]

    async def test_query_by_nickname_like(self, test_session, sample_video_data):
        """Test querying videos by nickname using LIKE."""
        video = KolVideo(**sample_video_data)
        test_session.add(video)
        await test_session.commit()

        # NOTE(review): relies on the fixture's star_nickname containing
        # this substring -- verify against the fixture definition.
        result = await test_session.execute(
            select(KolVideo).where(KolVideo.star_nickname.like("%测试%"))
        )
        videos = result.scalars().all()

        assert len(videos) == 1
        assert "测试" in videos[0].star_nickname

    async def test_batch_query_by_star_ids(self, test_session, sample_video_data):
        """Test batch querying videos by multiple star_ids."""
        video1 = KolVideo(**sample_video_data)

        # Second row: same payload with a different item_id and star_id.
        video2_data = sample_video_data.copy()
        video2_data["item_id"] = "test_item_002"
        video2_data["star_id"] = "star_002"
        video2 = KolVideo(**video2_data)

        test_session.add_all([video1, video2])
        await test_session.commit()

        # Build the IN (...) list from the rows actually inserted instead of
        # hard-coding the fixture's star_id, so the test keeps working if the
        # fixture value changes.
        star_ids = [sample_video_data["star_id"], video2_data["star_id"]]
        result = await test_session.execute(
            select(KolVideo).where(KolVideo.star_id.in_(star_ids))
        )
        videos = result.scalars().all()

        assert len(videos) == 2
        # Check which rows came back, not only how many.
        assert {v.star_id for v in videos} == set(star_ids)

    async def test_video_default_values(self, test_session):
        """Test that default values are set correctly."""
        # Only these four fields are supplied; the counters are left unset.
        video = KolVideo(
            item_id="test_defaults",
            star_id="star_test",
            star_unique_id="unique_test",
            star_nickname="测试默认值",
        )
        test_session.add(video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.item_id == "test_defaults")
        )
        saved_video = result.scalar_one()

        assert saved_video.natural_play_cnt == 0
        assert saved_video.heated_play_cnt == 0
        assert saved_video.total_play_cnt == 0
        assert saved_video.estimated_video_cost == 0

    async def test_video_nullable_fields(self, test_session):
        """Test that nullable fields can be None."""
        video = KolVideo(
            item_id="test_nullable",
            star_id="star_nullable",
            star_unique_id="unique_nullable",
            star_nickname="测试可空字段",
        )
        test_session.add(video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.item_id == "test_nullable")
        )
        saved_video = result.scalar_one()

        assert saved_video.title is None
        assert saved_video.video_url is None
        assert saved_video.brand_id is None
        assert saved_video.new_a3_rate is None

    async def test_update_video(self, test_session, sample_video_data):
        """Test updating a video record."""
        video = KolVideo(**sample_video_data)
        test_session.add(video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
        )
        saved_video = result.scalar_one()

        # Change the loaded instance and commit the modification.
        saved_video.title = "更新后的标题"
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
        )
        updated_video = result.scalar_one()

        assert updated_video.title == "更新后的标题"

    async def test_delete_video(self, test_session, sample_video_data):
        """Test deleting a video record."""
        video = KolVideo(**sample_video_data)
        test_session.add(video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
        )
        saved_video = result.scalar_one()

        await test_session.delete(saved_video)
        await test_session.commit()

        result = await test_session.execute(
            select(KolVideo).where(KolVideo.item_id == sample_video_data["item_id"])
        )
        # scalar_one_or_none() returns None when no row matches.
        assert result.scalar_one_or_none() is None