"""
脚本预审 API
"""
|
||
import re
|
||
from typing import Optional
|
||
from fastapi import APIRouter, Depends, Header
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.database import get_db
|
||
from app.schemas.review import (
|
||
ScriptReviewRequest,
|
||
ScriptReviewResponse,
|
||
Violation,
|
||
ViolationType,
|
||
RiskLevel,
|
||
Position,
|
||
SoftRiskWarning,
|
||
SoftRiskAction,
|
||
ReviewDimension,
|
||
ReviewDimensions,
|
||
SellingPointMatch,
|
||
BriefMatchDetail,
|
||
)
|
||
from app.api.rules import (
|
||
get_whitelist_for_brand,
|
||
get_other_brands_whitelist_terms,
|
||
get_forbidden_words_for_tenant,
|
||
get_active_platform_rules,
|
||
get_competitors_for_brand,
|
||
_platform_rules,
|
||
)
|
||
from app.services.soft_risk import evaluate_soft_risk
|
||
from app.services.ai_service import AIServiceFactory
|
||
from app.services.document_parser import DocumentParser
|
||
|
||
router = APIRouter(prefix="/scripts", tags=["scripts"])
|
||
|
||
# 内置违禁词库(广告极限词)
|
||
ABSOLUTE_WORDS = ["最好", "第一", "最佳", "绝对", "100%"]
|
||
|
||
# 功效词从品牌方规则库加载(category="功效词"),未配置则不检查
|
||
|
||
# 广告语境关键词(用于判断是否为广告场景)
|
||
AD_CONTEXT_KEYWORDS = ["产品", "购买", "销量", "品质", "推荐", "价格", "优惠", "促销"]
|
||
|
||
|
||
def _is_ad_context(content: str, word: str) -> bool:
|
||
"""
|
||
判断是否为广告语境
|
||
|
||
规则:
|
||
- 如果内容中包含广告关键词,认为是广告语境
|
||
- 如果违禁词出现在明显的非广告句式中,不是广告语境
|
||
"""
|
||
# 非广告语境模式
|
||
non_ad_patterns = [
|
||
r"他是第一[个名位]", # 他是第一个/名
|
||
r"[是为]第一[个名位]", # 是第一个
|
||
r"最开心|最高兴|最难忘", # 情感表达
|
||
r"第一[次个].*[到来抵达]", # 第一次到达
|
||
]
|
||
|
||
for pattern in non_ad_patterns:
|
||
if re.search(pattern, content):
|
||
return False
|
||
|
||
# 检查是否包含广告关键词
|
||
return any(kw in content for kw in AD_CONTEXT_KEYWORDS)
|
||
|
||
|
||
def _normalize_selling_points(raw_points: list[dict] | None) -> list[dict]:
|
||
"""
|
||
标准化卖点列表,兼容旧 required:bool 格式
|
||
返回 [{content, priority}]
|
||
"""
|
||
if not raw_points:
|
||
return []
|
||
result = []
|
||
for sp in raw_points:
|
||
content = sp.get("content", "")
|
||
if not content:
|
||
continue
|
||
# 兼容旧格式
|
||
if "priority" in sp:
|
||
priority = sp["priority"]
|
||
elif "required" in sp:
|
||
priority = "core" if sp["required"] else "recommended"
|
||
else:
|
||
priority = "recommended"
|
||
result.append({"content": content, "priority": priority})
|
||
return result
|
||
|
||
|
||
async def _ai_selling_point_analysis(
    ai_client, content: str, selling_points: list[dict], model: str
) -> list[SellingPointMatch]:
    """
    Semantic selling-point coverage check via the AI service.

    Only "core" and "recommended" points are checked; "reference" points
    are skipped and always reported as matched. When no AI client is
    available, or the AI call/parsing fails, the check degrades to a
    plain substring containment test.

    Args:
        ai_client: AI chat client (may be None — triggers the fallback).
        content: full script text to analyse.
        selling_points: normalized points, each ``{content, priority}``.
        model: model name passed to ``chat_completion``.

    Returns:
        One SellingPointMatch per input selling point.
    """
    # Partition into points that need semantic checking vs. pure reference.
    points_to_check = [sp for sp in selling_points if sp["priority"] in ("core", "recommended")]
    reference_points = [sp for sp in selling_points if sp["priority"] == "reference"]

    # Reference points are informational only — mark them matched up front.
    results: list[SellingPointMatch] = [
        SellingPointMatch(content=sp["content"], priority="reference", matched=True, evidence="参考信息,不检查")
        for sp in reference_points
    ]

    if not points_to_check:
        return results

    if not ai_client:
        # Fallback: simple substring containment when no AI client exists.
        for sp in points_to_check:
            matched = sp["content"] in content
            results.append(SellingPointMatch(
                content=sp["content"], priority=sp["priority"], matched=matched,
                evidence="文本匹配" if matched else "未检测到相关内容",
            ))
        return results

    try:
        points_text = "\n".join(f"- [{sp['priority']}] {sp['content']}" for sp in points_to_check)
        prompt = f"""作为广告合规审核专家,请判断以下脚本内容是否覆盖了每个卖点。

脚本内容:
{content}

需要检查的卖点:
{points_text}

请以 JSON 数组返回,每项包含:
- content: 卖点原文
- matched: true/false(脚本中是否传达了该卖点的含义,语义匹配即可,不要求原文出现)
- evidence: 匹配依据(如果匹配,指出脚本中对应的表述;如果不匹配,说明原因)

请只返回 JSON 数组,不要包含其他内容。"""

        response = await ai_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            temperature=0.2,
            max_tokens=1000,
        )

        import json
        response_content = response.content.strip()
        # Strip a possible markdown code fence around the JSON payload.
        if response_content.startswith("```"):
            response_content = response_content.split("\n", 1)[1]
        if response_content.endswith("```"):
            response_content = response_content.rsplit("\n", 1)[0]

        ai_results = json.loads(response_content)

        # Index AI verdicts by the original selling-point text; a point the
        # AI did not mention falls back to matched=False with no evidence.
        ai_map = {item.get("content", ""): item for item in ai_results}
        for sp in points_to_check:
            ai_item = ai_map.get(sp["content"], {})
            results.append(SellingPointMatch(
                content=sp["content"],
                priority=sp["priority"],
                matched=ai_item.get("matched", False),
                evidence=ai_item.get("evidence", ""),
            ))
    except Exception:
        # Any AI/parsing failure degrades to the substring fallback.
        for sp in points_to_check:
            matched = sp["content"] in content
            results.append(SellingPointMatch(
                content=sp["content"], priority=sp["priority"], matched=matched,
                evidence="文本匹配(AI不可用)" if matched else "未检测到(AI不可用)",
            ))

    return results
|
||
|
||
|
||
async def _ai_brief_overall_analysis(
    ai_client, content: str, selling_points: list[dict], model: str
) -> dict:
    """
    Ask the AI for an overall script-vs-Brief match assessment.

    Returns ``{"overall_score": int, "highlights": [...], "issues": [...]}``
    parsed straight from the model's JSON reply (keys are therefore not
    guaranteed — callers use ``.get``). Returns ``{}`` when no AI client
    is available or the call/parsing fails.
    """
    if not ai_client:
        return {}

    try:
        sp_text = "\n".join(f"- [{sp['priority']}] {sp['content']}" for sp in selling_points) if selling_points else "(无卖点要求)"
        prompt = f"""作为广告内容审核专家,请分析以下脚本与 Brief 要求的整体匹配程度。

脚本内容:
{content}

Brief 卖点要求:
{sp_text}

请从以下角度综合分析,以 JSON 返回:
{{
  "overall_score": 0-100 的整数(整体匹配度评分),
  "highlights": ["亮点1", "亮点2"],
  "issues": ["问题1", "问题2"]
}}

分析角度:
- 卖点传达是否清晰自然(不要求死板对照,语义传达即可)
- 内容氛围和场景是否贴合产品定位
- 表达语气和风格是否合适
- 内容结构和节奏是否流畅
- 是否有吸引力和说服力

要求:
- highlights: 脚本做得好的方面,每条一句话,简明具体(如"开头用痛点切入,吸引力强")
- issues: 可以改进的方面,每条一句话,简明具体(如"缺少产品使用演示环节")
- 每项最多给 4 条,只写最重要的
- 如果整体不错,issues 可以为空数组
- overall_score: 综合考虑各角度的整体分数

请只返回 JSON,不要包含其他内容。"""

        response = await ai_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            temperature=0.3,
            max_tokens=800,
        )

        import json
        resp = response.content.strip()
        # Strip a possible markdown code fence around the JSON payload.
        if resp.startswith("```"):
            resp = resp.split("\n", 1)[1]
        if resp.endswith("```"):
            resp = resp.rsplit("\n", 1)[0]
        return json.loads(resp)
    except Exception:
        # Best-effort analysis: any failure simply yields no extra detail.
        return {}
|
||
|
||
|
||
@router.post("/review", response_model=ScriptReviewResponse)
async def review_script(
    request: ScriptReviewRequest,
    x_tenant_id: str = Header(..., alias="X-Tenant-ID"),
    db: AsyncSession = Depends(get_db),
) -> ScriptReviewResponse:
    """
    Script pre-review with multi-dimension scoring.

    Four independent dimensions:
    - legal: legal compliance (forbidden words, efficacy claims, Brief blacklist)
    - platform: platform rules
    - brand_safety: brand safety (competitors, other brands' exclusive terms)
    - brief_match: Brief match (selling-point coverage)
    """
    violations: list[Violation] = []
    content = request.content
    image_data: list[str] | None = None

    # If a file URL is supplied, parse its text and extract its images;
    # both steps are best-effort and only log a warning on failure.
    if request.file_url and request.file_name:
        try:
            file_text = await DocumentParser.download_and_parse(
                request.file_url, request.file_name
            )
            if file_text:
                # Append parsed text to the inline content, or use it alone
                # when the inline content is blank.
                content = content + "\n\n" + file_text if content.strip() else file_text
        except Exception as e:
            import logging
            logging.getLogger(__name__).warning(f"文件文本解析失败: {e}")

        try:
            image_data = await DocumentParser.download_and_get_images(
                request.file_url, request.file_name
            )
        except Exception as e:
            import logging
            logging.getLogger(__name__).warning(f"文件图片提取失败: {e}")

    # Load every rule set configured by the brand/tenant.
    whitelist = await get_whitelist_for_brand(x_tenant_id, request.brand_id, db)
    all_tenant_words = await get_forbidden_words_for_tenant(x_tenant_id, db)
    # Split efficacy words from ordinary forbidden words; when the brand has
    # configured no efficacy words, that check is simply skipped.
    efficacy_words = [w["word"] for w in all_tenant_words if w.get("category") == "功效词"]
    tenant_forbidden_words = [w for w in all_tenant_words if w.get("category") != "功效词"]
    competitors = await get_competitors_for_brand(x_tenant_id, request.brand_id, db)
    db_platform_rules = await get_active_platform_rules(
        x_tenant_id, request.brand_id, request.platform.value, db,
    )

    # ===== Step 1: legal compliance checks (dimension "legal") =====

    # 1a. Built-in forbidden words (absolute/superlative ad terms).
    # Every occurrence is reported; occurrences in non-ad contexts are skipped.
    for word in ABSOLUTE_WORDS:
        if word in whitelist:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            if not _is_ad_context(content, word):
                start = pos + 1
                continue
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.HIGH, dimension="legal",
                suggestion=f"建议删除或替换违禁词:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1

    # 1b. Efficacy words (loaded from brand config; not checked when empty).
    # Note: unlike 1a, no ad-context filtering — every occurrence is flagged.
    for word in efficacy_words:
        if word in whitelist:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            violations.append(Violation(
                type=ViolationType.EFFICACY_CLAIM,
                content=word, severity=RiskLevel.HIGH, dimension="legal",
                suggestion=f"功效宣称词违反广告法,建议删除:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1

    # 1c. Brief-level blacklist words supplied with the request.
    if request.blacklist_words:
        for item in request.blacklist_words:
            word = item.get("word", "")
            reason = item.get("reason", "")
            if not word or word in whitelist:
                continue
            start_pos = 0
            while True:
                pos = content.find(word, start_pos)
                if pos == -1:
                    break
                suggestion = f"Brief 黑名单词:{word}"
                if reason:
                    suggestion += f"({reason})"
                violations.append(Violation(
                    type=ViolationType.FORBIDDEN_WORD,
                    content=word, severity=RiskLevel.HIGH, dimension="legal",
                    suggestion=suggestion,
                    position=Position(start=pos, end=pos + len(word)),
                ))
                start_pos = pos + 1

    # 1d. Tenant-defined forbidden words -> "legal" dimension.
    # Words already covered by ABSOLUTE_WORDS are skipped to avoid duplicates.
    for fw in tenant_forbidden_words:
        word = fw["word"]
        if word in whitelist or word in ABSOLUTE_WORDS:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            if not _is_ad_context(content, word):
                start = pos + 1
                continue
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.HIGH, dimension="legal",
                suggestion=f"建议删除或替换违禁词:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1

    # ===== Step 2: platform rule checks (dimension "platform") =====
    # Words already flagged by Step 1 must not be double-counted here.
    already_checked = set(ABSOLUTE_WORDS + efficacy_words + [w["word"] for w in tenant_forbidden_words])
    platform_forbidden_words: list[str] = []
    platform_restricted_words: list[dict] = []
    platform_content_requirements: list[str] = []
    platform_other_rules: list[dict] = []

    # Prefer brand-uploaded DB platform rules; fall back to the hard-coded set.
    if db_platform_rules:
        platform_forbidden_words = db_platform_rules.get("forbidden_words", [])
        platform_restricted_words = db_platform_rules.get("restricted_words", [])
        platform_content_requirements = db_platform_rules.get("content_requirements", [])
        platform_other_rules = db_platform_rules.get("other_rules", [])
    else:
        platform_rule = _platform_rules.get(request.platform.value)
        if platform_rule:
            for rule in platform_rule.get("rules", []):
                if rule.get("type") == "forbidden_word":
                    platform_forbidden_words.extend(rule.get("words", []))

    # 2a. Platform forbidden words (MEDIUM severity, ad-context filtered).
    for word in platform_forbidden_words:
        if word in already_checked or word in whitelist:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            if not _is_ad_context(content, word):
                start = pos + 1
                continue
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.MEDIUM, dimension="platform",
                suggestion=f"违反{request.platform.value}平台规则,建议删除:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1

    # 2b. Platform restricted words (conditionally allowed terms).
    # Only the first occurrence is reported, at LOW severity.
    for rw in platform_restricted_words:
        word = rw.get("word", "")
        if not word or word in whitelist:
            continue
        if word in content:
            suggestion = rw.get("suggestion", f"「{word}」为平台限制用语")
            condition = rw.get("condition", "")
            if condition:
                suggestion = f"「{word}」限制条件:{condition}。{suggestion}"
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.LOW, dimension="platform",
                suggestion=suggestion,
                position=Position(start=content.find(word), end=content.find(word) + len(word)),
            ))

    # ===== Step 3: brand safety checks (dimension "brand_safety") =====

    # 3a. Exclusive terms belonging to other brands of the same tenant.
    other_brand_terms = await get_other_brands_whitelist_terms(x_tenant_id, request.brand_id, db)
    for term, owner_brand in other_brand_terms:
        if term in content:
            violations.append(Violation(
                type=ViolationType.BRAND_SAFETY,
                content=term, severity=RiskLevel.MEDIUM, dimension="brand_safety",
                suggestion=f"使用了其他品牌的专属词汇:{term}",
                position=Position(start=content.find(term), end=content.find(term) + len(term)),
            ))

    # 3b. Competitor names (HIGH) and their associated keywords (MEDIUM).
    for comp in competitors:
        comp_name = comp["name"]
        if comp_name in whitelist:
            continue
        if comp_name in content:
            violations.append(Violation(
                type=ViolationType.BRAND_SAFETY,
                content=comp_name, severity=RiskLevel.HIGH, dimension="brand_safety",
                suggestion=f"脚本中出现竞品品牌名「{comp_name}」,请删除或替换",
                position=Position(start=content.find(comp_name), end=content.find(comp_name) + len(comp_name)),
            ))
        for kw in comp.get("keywords", []):
            if not kw or kw in whitelist:
                continue
            if kw in content:
                violations.append(Violation(
                    type=ViolationType.BRAND_SAFETY,
                    content=kw, severity=RiskLevel.MEDIUM, dimension="brand_safety",
                    suggestion=f"脚本中出现竞品「{comp_name}」的关联词「{kw}」,请确认是否需要删除",
                    position=Position(start=content.find(kw), end=content.find(kw) + len(kw)),
                ))

    # ===== Step 4: AI deep analysis =====
    # Feed the brand rule configuration to the AI as prompt context.
    brand_rules_context = _build_brand_rules_context(
        competitors=competitors,
        tenant_forbidden_words=tenant_forbidden_words,
        whitelist=whitelist,
        db_platform_rules=db_platform_rules,
        platform_content_requirements=platform_content_requirements,
        platform_other_rules=platform_other_rules,
    )
    ai_violations, ai_warnings = await _ai_deep_analysis(
        x_tenant_id, content, db,
        image_data=image_data,
        platform=request.platform.value,
        brand_rules_context=brand_rules_context,
    )
    if ai_violations:
        for v in ai_violations:
            # Route each AI-detected violation into a scoring dimension.
            if v.type in (ViolationType.FORBIDDEN_WORD, ViolationType.EFFICACY_CLAIM):
                v.dimension = "legal"
            elif v.type == ViolationType.COMPETITOR_LOGO:
                v.dimension = "brand_safety"
            else:
                # Default bucket for any other AI-reported type.
                v.dimension = "brand_safety"
            violations.append(v)

    # ===== Step 4b: AI context re-check (false-positive filtering) =====
    # Keyword hits are re-examined by the AI in full context to drop
    # misfires (e.g. negated usages); no-op when AI is unavailable.
    if violations:
        violations = await _ai_context_verify(
            x_tenant_id, content, violations, db,
        )

    # ===== Step 5: selling-point semantic match + overall Brief analysis =====
    selling_points = _normalize_selling_points(request.selling_points)
    selling_point_matches: list[SellingPointMatch] = []
    brief_overall: dict = {}

    # Resolve the AI client and text model once for both analyses below.
    ai_client = None
    ai_available = False
    text_model = "gpt-4o"
    try:
        ai_client = await AIServiceFactory.get_client(x_tenant_id, db)
        if ai_client:
            ai_available = True
            config = await AIServiceFactory.get_config(x_tenant_id, db)
            if config:
                text_model = config.models.get("text", "gpt-4o")
    except Exception:
        # AI stays unavailable; downstream calls degrade gracefully.
        pass

    if selling_points:
        selling_point_matches = await _ai_selling_point_analysis(
            ai_client, content, selling_points, text_model
        )

    # Overall Brief match analysis (highlights + issues).
    brief_overall = await _ai_brief_overall_analysis(
        ai_client, content, selling_points, text_model
    )

    # ===== Step 6: per-dimension scoring =====
    def _calc_dimension_score(dim: str) -> tuple[int, int]:
        # Start from 100 and deduct per violation: HIGH -25, MEDIUM -15,
        # anything else -5; floor at 0. Returns (score, violation_count).
        dim_violations = [v for v in violations if v.dimension == dim]
        score = 100
        for v in dim_violations:
            if v.severity == RiskLevel.HIGH:
                score -= 25
            elif v.severity == RiskLevel.MEDIUM:
                score -= 15
            else:
                score -= 5
        return max(0, score), len(dim_violations)

    legal_score, legal_count = _calc_dimension_score("legal")
    platform_score, platform_count = _calc_dimension_score("platform")
    brand_safety_score, brand_safety_count = _calc_dimension_score("brand_safety")

    # brief_match score: min_selling_points coverage + AI overall match.
    checkable = [spm for spm in selling_point_matches if spm.priority in ("core", "recommended")]
    matched_count = sum(1 for spm in checkable if spm.matched)
    total_checkable = len(checkable)

    # Minimum number of points the agency requires (default: all core points).
    core_count = sum(1 for spm in checkable if spm.priority == "core")
    min_required = request.min_selling_points if request.min_selling_points is not None else core_count
    # Never require more than the number of checkable points.
    min_required = min(min_required, total_checkable) if total_checkable > 0 else 0

    # Coverage score: matched / min_required, capped at 100.
    if min_required > 0:
        coverage_ratio = min(matched_count / min_required, 1.0)
        coverage_score = round(coverage_ratio * 100)
    elif total_checkable > 0:
        # No explicit requirement but points exist -> proportional coverage.
        coverage_score = round(matched_count / total_checkable * 100)
    else:
        coverage_score = 100  # No selling-point requirements at all.

    # AI overall match score; defaults to the coverage score when the AI
    # analysis returned nothing. Clamped to [0, 100].
    ai_overall_score = brief_overall.get("overall_score", coverage_score)
    ai_overall_score = max(0, min(100, ai_overall_score))

    # Combined brief_match score = 60% coverage + 40% AI overall match.
    brief_match_score = round(coverage_score * 0.6 + ai_overall_score * 0.4)
    brief_match_score = max(0, min(100, brief_match_score))

    # Build the BriefMatchDetail payload (top 4 highlights/issues only).
    highlights = brief_overall.get("highlights", [])[:4]
    issues_list = brief_overall.get("issues", [])[:4]

    # Human-readable explanation of how the score was derived.
    if min_required > 0:
        explanation = f"要求至少体现 {min_required} 条卖点,实际匹配 {matched_count} 条(覆盖率 {coverage_score}%),整体匹配度 {ai_overall_score}%"
    elif total_checkable > 0:
        explanation = f"共 {total_checkable} 条卖点,匹配 {matched_count} 条(覆盖率 {coverage_score}%),整体匹配度 {ai_overall_score}%"
    else:
        explanation = f"整体匹配度 {ai_overall_score}%"

    brief_match_detail = BriefMatchDetail(
        total_points=total_checkable,
        matched_points=matched_count,
        required_points=min_required,
        coverage_score=coverage_score,
        overall_score=ai_overall_score,
        highlights=highlights,
        issues=issues_list,
        explanation=explanation,
    )

    # Weighted total score across the four dimensions.
    total_score = round(
        legal_score * 0.35
        + platform_score * 0.25
        + brand_safety_score * 0.25
        + brief_match_score * 0.15
    )
    total_score = max(0, min(100, total_score))

    # ===== Step 7: per-dimension pass/fail =====
    # Legal fails outright on any HIGH-severity legal violation.
    has_high_legal = any(
        v.dimension == "legal" and v.severity == RiskLevel.HIGH for v in violations
    )
    legal_passed = legal_score >= 60 and not has_high_legal
    platform_passed = platform_score >= 60
    brand_safety_passed = brand_safety_score >= 70
    # brief_match passes when coverage meets the requirement.
    brief_match_passed = matched_count >= min_required if min_required > 0 else True

    dimensions = ReviewDimensions(
        legal=ReviewDimension(score=legal_score, passed=legal_passed, issue_count=legal_count),
        platform=ReviewDimension(score=platform_score, passed=platform_passed, issue_count=platform_count),
        brand_safety=ReviewDimension(score=brand_safety_score, passed=brand_safety_passed, issue_count=brand_safety_count),
        brief_match=ReviewDimension(
            score=brief_match_score, passed=brief_match_passed,
            issue_count=sum(1 for spm in checkable if not spm.matched),
        ),
    )

    # Backward-compatible missing_points (unmatched core points only).
    missing_points: list[str] | None = None
    if selling_point_matches:
        core_missing = [spm.content for spm in selling_point_matches if spm.priority == "core" and not spm.matched]
        missing_points = core_missing

    # Build the one-line summary from the failed dimensions.
    parts = []
    if not legal_passed:
        parts.append(f"法规合规问题 {legal_count} 处")
    if not platform_passed:
        parts.append(f"平台规则问题 {platform_count} 处")
    if not brand_safety_passed:
        parts.append(f"品牌安全问题 {brand_safety_count} 处")
    if not brief_match_passed:
        unmatched = min_required - matched_count
        parts.append(f"卖点覆盖不足(还差 {unmatched} 条)")
    if not parts:
        summary = "脚本内容合规,未发现问题"
    else:
        summary = ",".join(parts)

    # Soft risk-control evaluation (non-blocking advisories).
    soft_warnings: list[SoftRiskWarning] = []
    if request.soft_risk_context:
        soft_warnings = evaluate_soft_risk(request.soft_risk_context)
    if ai_warnings:
        soft_warnings.extend(ai_warnings)
    if missing_points:
        soft_warnings.append(SoftRiskWarning(
            code="missing_selling_points",
            message=f"核心卖点未覆盖:{', '.join(missing_points)}",
            action_required=SoftRiskAction.NOTE,
            blocking=False,
        ))

    return ScriptReviewResponse(
        score=total_score,
        summary=summary,
        dimensions=dimensions,
        selling_point_matches=selling_point_matches,
        brief_match_detail=brief_match_detail,
        violations=violations,
        missing_points=missing_points,
        soft_warnings=soft_warnings,
        ai_available=ai_available,
    )
|
||
|
||
|
||
async def _ai_context_verify(
    tenant_id: str,
    content: str,
    violations: list[Violation],
    db: AsyncSession,
) -> list[Violation]:
    """
    AI context re-check: hand keyword-matched violations to the AI to judge
    their surrounding semantics.

    Example: the forbidden word "小孩" inside "这不是小孩玩的" sits in a
    negated context and is not a real violation.
    When the AI is unavailable or fails, the original list is returned
    unchanged (degrading to pure keyword matching).
    """
    if not violations:
        return violations

    try:
        ai_client = await AIServiceFactory.get_client(tenant_id, db)
        if not ai_client:
            return violations

        config = await AIServiceFactory.get_config(tenant_id, db)
        if not config:
            return violations

        text_model = config.models.get("text", "gpt-4o")

        # Build the numbered list of suspect items for the prompt.
        items_text = []
        for i, v in enumerate(violations):
            # Extract ±40 characters of context around the matched word.
            ctx = ""
            if v.position and v.position.start is not None:
                ctx_start = max(0, v.position.start - 40)
                ctx_end = min(len(content), v.position.end + 40)
                ctx = content[ctx_start:ctx_end]
            else:
                # No recorded position — locate the word to get some context.
                pos = content.find(v.content)
                if pos != -1:
                    ctx_start = max(0, pos - 40)
                    ctx_end = min(len(content), pos + len(v.content) + 40)
                    ctx = content[ctx_start:ctx_end]

            items_text.append(
                f"{i}. 词语「{v.content}」| 维度: {v.dimension} | 上下文: ...{ctx}..."
            )

        prompt = f"""你是广告合规审核专家。以下脚本中通过关键词匹配检测到了一些疑似违规项。
请根据脚本的完整上下文语义,判断每一项是否真正构成违规。

完整脚本内容:
{content}

检测到的疑似违规项:
{chr(10).join(items_text)}

判断标准:
- 如果该词出现在否定语境中(如"不是XX"、"不含XX"、"避免XX"),通常不构成违规
- 如果该词用于客观描述、对比说明或免责声明中,需要根据具体语境判断
- 如果该词用于正面宣传、推荐、承诺等语境中,构成违规
- 仅当你非常确定不构成违规时才标记为 false

请以 JSON 数组返回,每项包含:
- index: 违规项编号(对应上面的编号)
- is_violation: true/false(在上下文中是否真正构成违规)
- reason: 简要说明判断理由(20字以内)

请只返回 JSON 数组,不要包含其他内容。"""

        response = await ai_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=text_model,
            temperature=0.1,
            max_tokens=1000,
        )

        import json as _json
        response_content = response.content.strip()
        # Strip a possible markdown code fence around the JSON payload.
        if response_content.startswith("```"):
            response_content = response_content.split("\n", 1)[1]
        if response_content.endswith("```"):
            response_content = response_content.rsplit("\n", 1)[0]

        ai_results = _json.loads(response_content)

        # Map AI verdicts back to violation indices.
        verify_map: dict[int, dict] = {}
        for item in ai_results:
            idx = item.get("index")
            if idx is not None:
                verify_map[idx] = item

        # Drop items the AI explicitly marked as non-violations; items the
        # AI did not mention (or marked true) are kept.
        verified = []
        import logging
        _logger = logging.getLogger(__name__)
        for i, v in enumerate(violations):
            result = verify_map.get(i)
            if result and not result.get("is_violation", True):
                reason = result.get("reason", "")
                _logger.info(f"AI 语境复核排除误报: 「{v.content}」— {reason}")
                continue
            verified.append(v)

        return verified

    except Exception as e:
        import logging
        logging.getLogger(__name__).warning(f"AI 语境复核失败,保留原始结果: {e}")
        return violations
|
||
|
||
|
||
def _build_brand_rules_context(
|
||
competitors: list[dict],
|
||
tenant_forbidden_words: list[dict],
|
||
whitelist: list[str],
|
||
db_platform_rules: dict | None,
|
||
platform_content_requirements: list[str],
|
||
platform_other_rules: list[dict],
|
||
) -> str:
|
||
"""构建品牌方规则上下文文本,注入 AI prompt"""
|
||
sections = []
|
||
|
||
# 竞品列表
|
||
if competitors:
|
||
comp_lines = []
|
||
for c in competitors:
|
||
kws = ", ".join(c.get("keywords", []))
|
||
line = f" - {c['name']}"
|
||
if kws:
|
||
line += f"(关键词:{kws})"
|
||
comp_lines.append(line)
|
||
sections.append("【竞品品牌列表】脚本中不得出现以下竞品品牌名或关联词:\n" + "\n".join(comp_lines))
|
||
|
||
# 自定义违禁词
|
||
if tenant_forbidden_words:
|
||
words = [w["word"] for w in tenant_forbidden_words]
|
||
sections.append(f"【品牌方自定义违禁词】以下词语禁止使用:{', '.join(words)}")
|
||
|
||
# 白名单
|
||
if whitelist:
|
||
sections.append(f"【白名单】以下词语已获授权可以使用,不应标记为违规:{', '.join(whitelist)}")
|
||
|
||
# DB 平台规则中的内容要求和其他规则
|
||
if platform_content_requirements:
|
||
sections.append("【平台内容要求】\n" + "\n".join(f" - {r}" for r in platform_content_requirements))
|
||
|
||
if platform_other_rules:
|
||
other_lines = []
|
||
for r in platform_other_rules:
|
||
rule_name = r.get("rule", "")
|
||
rule_desc = r.get("description", "")
|
||
other_lines.append(f" - {rule_name}:{rule_desc}")
|
||
sections.append("【平台其他规则】\n" + "\n".join(other_lines))
|
||
|
||
# DB 平台规则中的限制词
|
||
if db_platform_rules:
|
||
restricted = db_platform_rules.get("restricted_words", [])
|
||
if restricted:
|
||
rw_lines = []
|
||
for rw in restricted:
|
||
word = rw.get("word", "")
|
||
condition = rw.get("condition", "")
|
||
rw_lines.append(f" - 「{word}」— {condition}")
|
||
sections.append("【平台限制用语】以下词语有使用条件限制:\n" + "\n".join(rw_lines))
|
||
|
||
return "\n\n".join(sections) if sections else ""
|
||
|
||
|
||
async def _ai_deep_analysis(
    tenant_id: str,
    content: str,
    db: AsyncSession,
    image_data: list[str] | None = None,
    platform: str = "douyin",
    brand_rules_context: str = "",
) -> tuple[list[Violation], list[SoftRiskWarning]]:
    """
    Deep AI analysis of the script (text-only, or multimodal when images
    were extracted from the uploaded document).

    Args:
        tenant_id: tenant ID used to resolve the AI client/config.
        content: script text to analyse.
        db: database session.
        image_data: optional list of base64 images extracted from the file.
        platform: target publishing platform key (e.g. "douyin").
        brand_rules_context: brand rule configuration text for the prompt.

    Returns:
        ``(violations, soft_warnings)``; both empty on any failure so the
        caller degrades to pure rule-based detection.
    """
    platform_labels = {
        "douyin": "抖音", "xiaohongshu": "小红书", "bilibili": "B站",
        "kuaishou": "快手", "weibo": "微博", "wechat": "微信",
    }
    # Unknown platforms fall back to the raw platform key as the label.
    platform_label = platform_labels.get(platform, platform)

    # Hard-coded platform rules serve as fallback prompt context.
    platform_rule_details = _platform_rules.get(platform, {})
    platform_rule_text = ""
    if platform_rule_details:
        rule_items = []
        for rule in platform_rule_details.get("rules", []):
            if rule.get("type") == "forbidden_word":
                rule_items.append(f"- 平台违禁词:{', '.join(rule.get('words', []))}")
            elif rule.get("type") == "duration":
                if rule.get("min_seconds"):
                    rule_items.append(f"- 最短时长要求:{rule['min_seconds']}秒")
        if rule_items:
            platform_rule_text = f"\n\n{platform_label}平台基础规则:\n" + "\n".join(rule_items)

    # Brand-configured rule context block.
    brand_context_text = ""
    if brand_rules_context:
        brand_context_text = f"\n\n===== 品牌方审核规则配置 =====\n{brand_rules_context}\n============================="

    try:
        # Resolve the tenant's AI client; no client -> no AI analysis.
        ai_client = await AIServiceFactory.get_client(tenant_id, db)
        if not ai_client:
            return [], []

        # Resolve model configuration.
        config = await AIServiceFactory.get_config(tenant_id, db)
        if not config:
            return [], []

        text_model = config.models.get("text", "gpt-4o")

        # Base analysis prompt.
        base_prompt = f"""作为广告合规审核专家,请分析以下将在「{platform_label}」平台发布的广告脚本内容,检测潜在的合规风险:

脚本内容:
{content}
{platform_rule_text}{brand_context_text}

请结合上述所有规则配置,重点检查以下方面:
1. 是否存在隐性的虚假宣传(如暗示疗效但不直接说明)
2. 是否存在容易引起误解的表述
3. 是否存在夸大描述
4. 是否存在可能违反广告法的其他内容
5. 是否违反{platform_label}平台的内容规范和社区规则
6. 是否出现竞品品牌名称或关联词汇(如有竞品列表)
7. 是否符合平台内容要求(如有具体要求)"""

        # With images, append image-specific review points.
        # NOTE(review): the appended checklist reuses numbers 5-8 while the
        # base list above already ends at 7 — the prompt numbering is
        # duplicated; harmless to the model but worth fixing upstream.
        if image_data:
            base_prompt += """
5. 图片中是否出现竞品品牌 logo 或商标
6. 图片中是否存在违规画面(涉黄、暴力、敏感内容等)
7. 图片中是否存在虚假对比图或误导性图片
8. 图片中的文字是否包含违禁词或夸大宣传"""

        base_prompt += """

请以 JSON 数组返回,每项包含:
- category: "violation"(硬性违规,明确违法/违规)或 "warning"(软性提醒,需人工判断)
- type: 违规类型 (forbidden_word/efficacy_claim/brand_safety/competitor_logo)
- content: 问题内容
- severity: 严重程度 (high/medium/low)
- suggestion: 修改建议

分类标准:
- violation: 违禁词、功效宣称、品牌安全、竞品露出等明确违规
- warning: 夸大描述、易误解表述、潜在风险

如果未发现问题,返回空数组 []

请只返回 JSON 数组,不要包含其他内容。"""

        # Choose multimodal vision analysis when images exist, else text chat.
        if image_data:
            vision_model = config.models.get("vision", text_model)
            image_urls = [f"data:image/png;base64,{b64}" for b64 in image_data]
            response = await ai_client.vision_analysis(
                image_urls=image_urls,
                prompt=base_prompt,
                model=vision_model,
                temperature=0.3,
                max_tokens=1500,
            )
        else:
            response = await ai_client.chat_completion(
                messages=[{"role": "user", "content": base_prompt}],
                model=text_model,
                temperature=0.3,
                max_tokens=1000,
            )

        # Parse the AI response.
        import json
        try:
            # Strip a possible markdown code fence around the JSON payload.
            response_content = response.content.strip()
            if response_content.startswith("```"):
                response_content = response_content.split("\n", 1)[1]
            if response_content.endswith("```"):
                response_content = response_content.rsplit("\n", 1)[0]

            ai_results = json.loads(response_content)

            violations = []
            warnings = []
            for item in ai_results:
                # Missing category defaults to a hard violation (safe default).
                category = item.get("category", "violation")

                # Map the free-form type string onto the ViolationType enum.
                violation_type = item.get("type", "forbidden_word")
                if violation_type == "forbidden_word":
                    vtype = ViolationType.FORBIDDEN_WORD
                elif violation_type == "efficacy_claim":
                    vtype = ViolationType.EFFICACY_CLAIM
                elif violation_type == "competitor_logo":
                    vtype = ViolationType.COMPETITOR_LOGO
                else:
                    vtype = ViolationType.BRAND_SAFETY

                # Map severity string onto the RiskLevel enum (default MEDIUM).
                severity = item.get("severity", "medium")
                if severity == "high":
                    slevel = RiskLevel.HIGH
                elif severity == "low":
                    slevel = RiskLevel.LOW
                else:
                    slevel = RiskLevel.MEDIUM

                if category == "warning":
                    # Soft advisory -> SoftRiskWarning (non-blocking).
                    warnings.append(SoftRiskWarning(
                        code="ai_warning",
                        message=f"{item.get('content', '')}: {item.get('suggestion', '建议修改')}",
                        action_required=SoftRiskAction.NOTE,
                        blocking=False,
                        context={"type": violation_type, "severity": severity},
                    ))
                else:
                    # Hard violation -> Violation (no position: AI gives none).
                    violations.append(Violation(
                        type=vtype,
                        content=item.get("content", ""),
                        severity=slevel,
                        suggestion=item.get("suggestion", "建议修改"),
                    ))

            return violations, warnings

        except json.JSONDecodeError:
            # Unparseable AI output — treat as "no findings".
            return [], []

    except Exception:
        # Any client/config/call failure degrades to rule-based detection.
        return [], []
|