Your Name 0b3dfa3c52 feat: AI 审核自动驳回 + 功效词可配置 + UI 修复
- AI 自动驳回:法规/品牌安全 HIGH 违规或总分<40 自动打回上传阶段
- 功效词可配置:从硬编码改为品牌方在规则页面自行管理
- 驳回通知:AI 驳回时只通知达人,含具体原因
- 达人端:脚本/视频页面展示 AI 驳回原因 + 重新上传入口
- 规则页面:新增"功效词"分类
- 种子数据:新增 6 条默认功效词
- 其他:代理商管理下拉修复、AI 配置模型列表扩展、视觉模型标签修正、规则编辑放开限制

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 20:24:32 +08:00

1030 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
脚本预审 API
"""
import re
from typing import Optional
from fastapi import APIRouter, Depends, Header
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas.review import (
ScriptReviewRequest,
ScriptReviewResponse,
Violation,
ViolationType,
RiskLevel,
Position,
SoftRiskWarning,
SoftRiskAction,
ReviewDimension,
ReviewDimensions,
SellingPointMatch,
BriefMatchDetail,
)
from app.api.rules import (
get_whitelist_for_brand,
get_other_brands_whitelist_terms,
get_forbidden_words_for_tenant,
get_active_platform_rules,
get_competitors_for_brand,
_platform_rules,
)
from app.services.soft_risk import evaluate_soft_risk
from app.services.ai_service import AIServiceFactory
from app.services.document_parser import DocumentParser
# Router for the script pre-review endpoints (mounted under /scripts).
router = APIRouter(prefix="/scripts", tags=["scripts"])
# Built-in forbidden-word list: absolute/superlative advertising terms
# ("best", "No.1", "absolute", "100%") restricted by advertising law.
ABSOLUTE_WORDS = ["最好", "第一", "最佳", "绝对", "100%"]
# Default efficacy-claim words (therapeutic/medical claims). Used as a
# fallback when the brand has not configured its own "功效词" category.
DEFAULT_EFFICACY_WORDS = ["根治", "治愈", "治疗", "药效", "疗效", "特效"]
# Keywords that signal an advertising context; consumed by _is_ad_context
# to decide whether a matched forbidden word actually appears in ad copy.
AD_CONTEXT_KEYWORDS = ["产品", "购买", "销量", "品质", "推荐", "价格", "优惠", "促销"]
def _is_ad_context(content: str, word: str) -> bool:
"""
判断是否为广告语境
规则:
- 如果内容中包含广告关键词,认为是广告语境
- 如果违禁词出现在明显的非广告句式中,不是广告语境
"""
# 非广告语境模式
non_ad_patterns = [
r"他是第一[个名位]", # 他是第一个/名
r"[是为]第一[个名位]", # 是第一个
r"最开心|最高兴|最难忘", # 情感表达
r"第一[次个].*[到来抵达]", # 第一次到达
]
for pattern in non_ad_patterns:
if re.search(pattern, content):
return False
# 检查是否包含广告关键词
return any(kw in content for kw in AD_CONTEXT_KEYWORDS)
def _normalize_selling_points(raw_points: list[dict] | None) -> list[dict]:
"""
标准化卖点列表,兼容旧 required:bool 格式
返回 [{content, priority}]
"""
if not raw_points:
return []
result = []
for sp in raw_points:
content = sp.get("content", "")
if not content:
continue
# 兼容旧格式
if "priority" in sp:
priority = sp["priority"]
elif "required" in sp:
priority = "core" if sp["required"] else "recommended"
else:
priority = "recommended"
result.append({"content": content, "priority": priority})
return result
async def _ai_selling_point_analysis(
    ai_client, content: str, selling_points: list[dict], model: str
) -> list[SellingPointMatch]:
    """
    Use AI to semantically check selling-point coverage in the script.

    Only "core" and "recommended" points are checked; "reference" points
    are skipped and always reported as matched. When the AI client is
    unavailable or the call fails, falls back to naive substring matching.
    """
    # Points that actually need semantic checking.
    points_to_check = [sp for sp in selling_points if sp["priority"] in ("core", "recommended")]
    reference_points = [sp for sp in selling_points if sp["priority"] == "reference"]
    # "reference" points are informational only — mark them matched outright.
    results: list[SellingPointMatch] = [
        SellingPointMatch(content=sp["content"], priority="reference", matched=True, evidence="参考信息,不检查")
        for sp in reference_points
    ]
    if not points_to_check:
        return results
    if not ai_client:
        # Fallback: naive substring containment.
        for sp in points_to_check:
            matched = sp["content"] in content
            results.append(SellingPointMatch(
                content=sp["content"], priority=sp["priority"], matched=matched,
                evidence="文本匹配" if matched else "未检测到相关内容",
            ))
        return results
    try:
        points_text = "\n".join(f"- [{sp['priority']}] {sp['content']}" for sp in points_to_check)
        prompt = f"""作为广告合规审核专家,请判断以下脚本内容是否覆盖了每个卖点。
脚本内容:
{content}
需要检查的卖点:
{points_text}
请以 JSON 数组返回,每项包含:
- content: 卖点原文
- matched: true/false脚本中是否传达了该卖点的含义语义匹配即可不要求原文出现
- evidence: 匹配依据(如果匹配,指出脚本中对应的表述;如果不匹配,说明原因)
请只返回 JSON 数组,不要包含其他内容。"""
        response = await ai_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            temperature=0.2,
            max_tokens=1000,
        )
        import json
        response_content = response.content.strip()
        # Strip markdown code fences the model may wrap around the JSON.
        if response_content.startswith("```"):
            response_content = response_content.split("\n", 1)[1]
        if response_content.endswith("```"):
            response_content = response_content.rsplit("\n", 1)[0]
        ai_results = json.loads(response_content)
        # Map AI results back to points via their exact content string;
        # a point absent from the response counts as unmatched.
        ai_map = {item.get("content", ""): item for item in ai_results}
        for sp in points_to_check:
            ai_item = ai_map.get(sp["content"], {})
            results.append(SellingPointMatch(
                content=sp["content"],
                priority=sp["priority"],
                matched=ai_item.get("matched", False),
                evidence=ai_item.get("evidence", ""),
            ))
    except Exception:
        # AI call/parse failed — degrade to substring containment.
        for sp in points_to_check:
            matched = sp["content"] in content
            results.append(SellingPointMatch(
                content=sp["content"], priority=sp["priority"], matched=matched,
                evidence="文本匹配AI不可用" if matched else "未检测到AI不可用",
            ))
    return results
async def _ai_brief_overall_analysis(
    ai_client, content: str, selling_points: list[dict], model: str
) -> dict:
    """
    Use AI to rate the script's overall match against the Brief, producing
    highlights and improvement points.

    Returns {"overall_score": int, "highlights": [...], "issues": [...]};
    returns {} when the AI client is unavailable or the call fails.
    """
    if not ai_client:
        return {}
    try:
        sp_text = "\n".join(f"- [{sp['priority']}] {sp['content']}" for sp in selling_points) if selling_points else "(无卖点要求)"
        prompt = f"""作为广告内容审核专家,请分析以下脚本与 Brief 要求的整体匹配程度。
脚本内容:
{content}
Brief 卖点要求:
{sp_text}
请从以下角度综合分析,以 JSON 返回:
{{
"overall_score": 0-100 的整数(整体匹配度评分),
"highlights": ["亮点1", "亮点2"],
"issues": ["问题1", "问题2"]
}}
分析角度:
- 卖点传达是否清晰自然(不要求死板对照,语义传达即可)
- 内容氛围和场景是否贴合产品定位
- 表达语气和风格是否合适
- 内容结构和节奏是否流畅
- 是否有吸引力和说服力
要求:
- highlights: 脚本做得好的方面,每条一句话,简明具体(如"开头用痛点切入,吸引力强"
- issues: 可以改进的方面,每条一句话,简明具体(如"缺少产品使用演示环节"
- 每项最多给 4 条,只写最重要的
- 如果整体不错issues 可以为空数组
- overall_score: 综合考虑各角度的整体分数
请只返回 JSON不要包含其他内容。"""
        response = await ai_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=model,
            temperature=0.3,
            max_tokens=800,
        )
        import json
        resp = response.content.strip()
        # Strip markdown code fences the model may wrap around the JSON.
        if resp.startswith("```"):
            resp = resp.split("\n", 1)[1]
        if resp.endswith("```"):
            resp = resp.rsplit("\n", 1)[0]
        return json.loads(resp)
    except Exception:
        # Best-effort analysis: any failure yields an empty result.
        return {}
@router.post("/review", response_model=ScriptReviewResponse)
async def review_script(
    request: ScriptReviewRequest,
    x_tenant_id: str = Header(..., alias="X-Tenant-ID"),
    db: AsyncSession = Depends(get_db),
) -> ScriptReviewResponse:
    """
    Script pre-review with multi-dimension scoring.

    Four independent dimensions are scored:
    - legal: regulatory compliance (forbidden words, efficacy claims,
      Brief blacklist words)
    - platform: platform-specific rules
    - brand_safety: brand safety (competitors, other brands' exclusive terms)
    - brief_match: Brief match degree (selling-point coverage)
    """
    violations: list[Violation] = []
    content = request.content
    image_data: list[str] | None = None
    # If a file URL is supplied, parse its text and extract embedded images;
    # both steps are best-effort and only log a warning on failure.
    if request.file_url and request.file_name:
        try:
            file_text = await DocumentParser.download_and_parse(
                request.file_url, request.file_name
            )
            if file_text:
                content = content + "\n\n" + file_text if content.strip() else file_text
        except Exception as e:
            import logging
            logging.getLogger(__name__).warning(f"文件文本解析失败: {e}")
        try:
            image_data = await DocumentParser.download_and_get_images(
                request.file_url, request.file_name
            )
        except Exception as e:
            import logging
            logging.getLogger(__name__).warning(f"文件图片提取失败: {e}")
    # Load all rule data configured by the brand/tenant.
    whitelist = await get_whitelist_for_brand(x_tenant_id, request.brand_id, db)
    all_tenant_words = await get_forbidden_words_for_tenant(x_tenant_id, db)
    # Split efficacy words from ordinary forbidden words; fall back to the
    # built-in efficacy list when the brand has not configured any.
    efficacy_words = [w["word"] for w in all_tenant_words if w.get("category") == "功效词"]
    if not efficacy_words:
        efficacy_words = list(DEFAULT_EFFICACY_WORDS)
    tenant_forbidden_words = [w for w in all_tenant_words if w.get("category") != "功效词"]
    competitors = await get_competitors_for_brand(x_tenant_id, request.brand_id, db)
    db_platform_rules = await get_active_platform_rules(
        x_tenant_id, request.brand_id, request.platform.value, db,
    )
    # ===== Step 1: legal-compliance checks (legal) =====
    # 1a. Built-in forbidden words (advertising superlatives). Every
    # occurrence is flagged; whitelisted words and non-ad contexts are skipped.
    for word in ABSOLUTE_WORDS:
        if word in whitelist:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            if not _is_ad_context(content, word):
                start = pos + 1
                continue
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.HIGH, dimension="legal",
                suggestion=f"建议删除或替换违禁词:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1
    # 1b. Efficacy-claim words (brand-configured, default list as fallback).
    # No ad-context filter here: efficacy claims are flagged unconditionally.
    for word in efficacy_words:
        if word in whitelist:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            violations.append(Violation(
                type=ViolationType.EFFICACY_CLAIM,
                content=word, severity=RiskLevel.HIGH, dimension="legal",
                suggestion=f"功效宣称词违反广告法,建议删除:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1
    # 1c. Brief blacklist words supplied with the request.
    if request.blacklist_words:
        for item in request.blacklist_words:
            word = item.get("word", "")
            reason = item.get("reason", "")
            if not word or word in whitelist:
                continue
            start_pos = 0
            while True:
                pos = content.find(word, start_pos)
                if pos == -1:
                    break
                suggestion = f"Brief 黑名单词:{word}"
                if reason:
                    suggestion += f"{reason}"
                violations.append(Violation(
                    type=ViolationType.FORBIDDEN_WORD,
                    content=word, severity=RiskLevel.HIGH, dimension="legal",
                    suggestion=suggestion,
                    position=Position(start=pos, end=pos + len(word)),
                ))
                start_pos = pos + 1
    # 1d. Tenant-defined forbidden words → legal dimension. Words already
    # covered by the built-in list are skipped to avoid double-flagging.
    for fw in tenant_forbidden_words:
        word = fw["word"]
        if word in whitelist or word in ABSOLUTE_WORDS:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            if not _is_ad_context(content, word):
                start = pos + 1
                continue
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.HIGH, dimension="legal",
                suggestion=f"建议删除或替换违禁词:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1
    # ===== Step 2: platform-rule checks (platform) =====
    # Words already flagged in Step 1 are excluded from platform checks.
    already_checked = set(ABSOLUTE_WORDS + efficacy_words + [w["word"] for w in tenant_forbidden_words])
    platform_forbidden_words: list[str] = []
    platform_restricted_words: list[dict] = []
    platform_content_requirements: list[str] = []
    platform_other_rules: list[dict] = []
    # Prefer brand-uploaded DB platform rules; fall back to hard-coded defaults.
    if db_platform_rules:
        platform_forbidden_words = db_platform_rules.get("forbidden_words", [])
        platform_restricted_words = db_platform_rules.get("restricted_words", [])
        platform_content_requirements = db_platform_rules.get("content_requirements", [])
        platform_other_rules = db_platform_rules.get("other_rules", [])
    else:
        platform_rule = _platform_rules.get(request.platform.value)
        if platform_rule:
            for rule in platform_rule.get("rules", []):
                if rule.get("type") == "forbidden_word":
                    platform_forbidden_words.extend(rule.get("words", []))
    # 2a. Platform forbidden words (MEDIUM severity, ad context required).
    for word in platform_forbidden_words:
        if word in already_checked or word in whitelist:
            continue
        start = 0
        while True:
            pos = content.find(word, start)
            if pos == -1:
                break
            if not _is_ad_context(content, word):
                start = pos + 1
                continue
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.MEDIUM, dimension="platform",
                suggestion=f"违反{request.platform.value}平台规则,建议删除:{word}",
                position=Position(start=pos, end=pos + len(word)),
            ))
            start = pos + 1
    # 2b. Platform restricted words (conditionally allowed terms, LOW
    # severity). Only the first occurrence is reported per word.
    for rw in platform_restricted_words:
        word = rw.get("word", "")
        if not word or word in whitelist:
            continue
        if word in content:
            suggestion = rw.get("suggestion", f"{word}」为平台限制用语")
            condition = rw.get("condition", "")
            if condition:
                suggestion = f"{word}」限制条件:{condition}{suggestion}"
            violations.append(Violation(
                type=ViolationType.FORBIDDEN_WORD,
                content=word, severity=RiskLevel.LOW, dimension="platform",
                suggestion=suggestion,
                position=Position(start=content.find(word), end=content.find(word) + len(word)),
            ))
    # ===== Step 3: brand-safety checks (brand_safety) =====
    # 3a. Exclusive terms owned by other brands of the same tenant.
    other_brand_terms = await get_other_brands_whitelist_terms(x_tenant_id, request.brand_id, db)
    for term, owner_brand in other_brand_terms:
        if term in content:
            violations.append(Violation(
                type=ViolationType.BRAND_SAFETY,
                content=term, severity=RiskLevel.MEDIUM, dimension="brand_safety",
                suggestion=f"使用了其他品牌的专属词汇:{term}",
                position=Position(start=content.find(term), end=content.find(term) + len(term)),
            ))
    # 3b. Competitor brand names (HIGH) and their keywords (MEDIUM).
    for comp in competitors:
        comp_name = comp["name"]
        if comp_name in whitelist:
            continue
        if comp_name in content:
            violations.append(Violation(
                type=ViolationType.BRAND_SAFETY,
                content=comp_name, severity=RiskLevel.HIGH, dimension="brand_safety",
                suggestion=f"脚本中出现竞品品牌名「{comp_name}」,请删除或替换",
                position=Position(start=content.find(comp_name), end=content.find(comp_name) + len(comp_name)),
            ))
        for kw in comp.get("keywords", []):
            if not kw or kw in whitelist:
                continue
            if kw in content:
                violations.append(Violation(
                    type=ViolationType.BRAND_SAFETY,
                    content=kw, severity=RiskLevel.MEDIUM, dimension="brand_safety",
                    suggestion=f"脚本中出现竞品「{comp_name}」的关联词「{kw}」,请确认是否需要删除",
                    position=Position(start=content.find(kw), end=content.find(kw) + len(kw)),
                ))
    # ===== Step 4: AI deep analysis =====
    # Build the brand-rule context text that is injected into the AI prompt.
    brand_rules_context = _build_brand_rules_context(
        competitors=competitors,
        tenant_forbidden_words=tenant_forbidden_words,
        whitelist=whitelist,
        db_platform_rules=db_platform_rules,
        platform_content_requirements=platform_content_requirements,
        platform_other_rules=platform_other_rules,
    )
    ai_violations, ai_warnings = await _ai_deep_analysis(
        x_tenant_id, content, db,
        image_data=image_data,
        platform=request.platform.value,
        brand_rules_context=brand_rules_context,
    )
    if ai_violations:
        for v in ai_violations:
            # Assign a scoring dimension based on the violation type;
            # anything non-legal defaults to brand_safety.
            if v.type in (ViolationType.FORBIDDEN_WORD, ViolationType.EFFICACY_CLAIM):
                v.dimension = "legal"
            elif v.type == ViolationType.COMPETITOR_LOGO:
                v.dimension = "brand_safety"
            else:
                v.dimension = "brand_safety"
            violations.append(v)
    # ===== Step 4b: AI context re-check (filter false positives) =====
    # Hand keyword-matched violations to the AI for contextual judgement;
    # confidently cleared items are dropped.
    if violations:
        violations = await _ai_context_verify(
            x_tenant_id, content, violations, db,
        )
    # ===== Step 5: selling-point semantic match + overall Brief analysis =====
    selling_points = _normalize_selling_points(request.selling_points)
    selling_point_matches: list[SellingPointMatch] = []
    brief_overall: dict = {}
    ai_client = None
    ai_available = False
    text_model = "gpt-4o"
    try:
        ai_client = await AIServiceFactory.get_client(x_tenant_id, db)
        if ai_client:
            ai_available = True
            config = await AIServiceFactory.get_config(x_tenant_id, db)
            if config:
                text_model = config.models.get("text", "gpt-4o")
    except Exception:
        pass
    if selling_points:
        selling_point_matches = await _ai_selling_point_analysis(
            ai_client, content, selling_points, text_model
        )
    # AI overall Brief-match analysis (highlights + issues).
    brief_overall = await _ai_brief_overall_analysis(
        ai_client, content, selling_points, text_model
    )
    # ===== Step 6: independent score per dimension =====
    def _calc_dimension_score(dim: str) -> tuple[int, int]:
        # HIGH costs 25, MEDIUM 15, LOW 5 points off a base of 100 (floor 0).
        dim_violations = [v for v in violations if v.dimension == dim]
        score = 100
        for v in dim_violations:
            if v.severity == RiskLevel.HIGH:
                score -= 25
            elif v.severity == RiskLevel.MEDIUM:
                score -= 15
            else:
                score -= 5
        return max(0, score), len(dim_violations)
    legal_score, legal_count = _calc_dimension_score("legal")
    platform_score, platform_count = _calc_dimension_score("platform")
    brand_safety_score, brand_safety_count = _calc_dimension_score("brand_safety")
    # brief_match score: min_selling_points coverage + AI overall match degree.
    checkable = [spm for spm in selling_point_matches if spm.priority in ("core", "recommended")]
    matched_count = sum(1 for spm in checkable if spm.matched)
    total_checkable = len(checkable)
    # Minimum number of points the agency requires (default: all core points).
    core_count = sum(1 for spm in checkable if spm.priority == "core")
    min_required = request.min_selling_points if request.min_selling_points is not None else core_count
    # Never require more than what is actually checkable.
    min_required = min(min_required, total_checkable) if total_checkable > 0 else 0
    # Coverage score: matched / min_required (meeting the requirement = 100).
    if min_required > 0:
        coverage_ratio = min(matched_count / min_required, 1.0)
        coverage_score = round(coverage_ratio * 100)
    elif total_checkable > 0:
        # No explicit requirement but points exist → proportional over all.
        coverage_score = round(matched_count / total_checkable * 100)
    else:
        coverage_score = 100  # no selling-point requirement at all
    # AI overall match-degree score; falls back to the coverage score when
    # the AI analysis returned nothing.
    ai_overall_score = brief_overall.get("overall_score", coverage_score)
    ai_overall_score = max(0, min(100, ai_overall_score))
    # Combined brief_match score = coverage 60% + overall match 40%.
    brief_match_score = round(coverage_score * 0.6 + ai_overall_score * 0.4)
    brief_match_score = max(0, min(100, brief_match_score))
    # Build the BriefMatchDetail payload (highlights/issues capped at 4).
    highlights = brief_overall.get("highlights", [])[:4]
    issues_list = brief_overall.get("issues", [])[:4]
    # Human-readable explanation of how the score was derived.
    if min_required > 0:
        explanation = f"要求至少体现 {min_required} 条卖点,实际匹配 {matched_count} 条(覆盖率 {coverage_score}%),整体匹配度 {ai_overall_score}%"
    elif total_checkable > 0:
        explanation = f"{total_checkable} 条卖点,匹配 {matched_count} 条(覆盖率 {coverage_score}%),整体匹配度 {ai_overall_score}%"
    else:
        explanation = f"整体匹配度 {ai_overall_score}%"
    brief_match_detail = BriefMatchDetail(
        total_points=total_checkable,
        matched_points=matched_count,
        required_points=min_required,
        coverage_score=coverage_score,
        overall_score=ai_overall_score,
        highlights=highlights,
        issues=issues_list,
        explanation=explanation,
    )
    # Weighted total score across the four dimensions.
    total_score = round(
        legal_score * 0.35
        + platform_score * 0.25
        + brand_safety_score * 0.25
        + brief_match_score * 0.15
    )
    total_score = max(0, min(100, total_score))
    # ===== Step 7: pass/fail per dimension =====
    # Any HIGH legal violation fails the legal dimension regardless of score.
    has_high_legal = any(
        v.dimension == "legal" and v.severity == RiskLevel.HIGH for v in violations
    )
    legal_passed = legal_score >= 60 and not has_high_legal
    platform_passed = platform_score >= 60
    brand_safety_passed = brand_safety_score >= 70
    # brief_match passes when coverage meets the requirement (matched >= min_required).
    brief_match_passed = matched_count >= min_required if min_required > 0 else True
    dimensions = ReviewDimensions(
        legal=ReviewDimension(score=legal_score, passed=legal_passed, issue_count=legal_count),
        platform=ReviewDimension(score=platform_score, passed=platform_passed, issue_count=platform_count),
        brand_safety=ReviewDimension(score=brand_safety_score, passed=brand_safety_passed, issue_count=brand_safety_count),
        brief_match=ReviewDimension(
            score=brief_match_score, passed=brief_match_passed,
            issue_count=sum(1 for spm in checkable if not spm.matched),
        ),
    )
    # Backwards-compatible missing_points field (unmatched core points only).
    missing_points: list[str] | None = None
    if selling_point_matches:
        core_missing = [spm.content for spm in selling_point_matches if spm.priority == "core" and not spm.matched]
        missing_points = core_missing
    # Build the summary line from the failing dimensions.
    parts = []
    if not legal_passed:
        parts.append(f"法规合规问题 {legal_count}")
    if not platform_passed:
        parts.append(f"平台规则问题 {platform_count}")
    if not brand_safety_passed:
        parts.append(f"品牌安全问题 {brand_safety_count}")
    if not brief_match_passed:
        unmatched = min_required - matched_count
        parts.append(f"卖点覆盖不足(还差 {unmatched} 条)")
    if not parts:
        summary = "脚本内容合规,未发现问题"
    else:
        summary = "".join(parts)
    # Soft-risk evaluation (non-blocking warnings).
    soft_warnings: list[SoftRiskWarning] = []
    if request.soft_risk_context:
        soft_warnings = evaluate_soft_risk(request.soft_risk_context)
    if ai_warnings:
        soft_warnings.extend(ai_warnings)
    if missing_points:
        soft_warnings.append(SoftRiskWarning(
            code="missing_selling_points",
            message=f"核心卖点未覆盖:{', '.join(missing_points)}",
            action_required=SoftRiskAction.NOTE,
            blocking=False,
        ))
    return ScriptReviewResponse(
        score=total_score,
        summary=summary,
        dimensions=dimensions,
        selling_point_matches=selling_point_matches,
        brief_match_detail=brief_match_detail,
        violations=violations,
        missing_points=missing_points,
        soft_warnings=soft_warnings,
        ai_available=ai_available,
    )
async def _ai_context_verify(
    tenant_id: str,
    content: str,
    violations: list[Violation],
    db: AsyncSession,
) -> list[Violation]:
    """
    AI contextual re-check: hand keyword-matched violations to the AI to
    judge their surrounding semantics and drop false positives.

    Example: for the forbidden word "小孩", the phrase "这不是小孩玩的" is a
    negation context and should not count as a violation.

    When the AI is unavailable or the call fails, the original list is
    returned unchanged (degrades to pure keyword matching).
    """
    if not violations:
        return violations
    try:
        ai_client = await AIServiceFactory.get_client(tenant_id, db)
        if not ai_client:
            return violations
        config = await AIServiceFactory.get_config(tenant_id, db)
        if not config:
            return violations
        text_model = config.models.get("text", "gpt-4o")
        # Build the numbered list of suspect items, each with nearby context.
        items_text = []
        for i, v in enumerate(violations):
            # Extract up to 40 characters on each side of the matched word.
            ctx = ""
            if v.position and v.position.start is not None:
                ctx_start = max(0, v.position.start - 40)
                ctx_end = min(len(content), v.position.end + 40)
                ctx = content[ctx_start:ctx_end]
            else:
                # No position info — locate the first occurrence instead.
                pos = content.find(v.content)
                if pos != -1:
                    ctx_start = max(0, pos - 40)
                    ctx_end = min(len(content), pos + len(v.content) + 40)
                    ctx = content[ctx_start:ctx_end]
            items_text.append(
                f"{i}. 词语「{v.content}」| 维度: {v.dimension} | 上下文: ...{ctx}..."
            )
        prompt = f"""你是广告合规审核专家。以下脚本中通过关键词匹配检测到了一些疑似违规项。
请根据脚本的完整上下文语义,判断每一项是否真正构成违规。
完整脚本内容:
{content}
检测到的疑似违规项:
{chr(10).join(items_text)}
判断标准:
- 如果该词出现在否定语境中(如"不是XX""不含XX""避免XX"),通常不构成违规
- 如果该词用于客观描述、对比说明或免责声明中,需要根据具体语境判断
- 如果该词用于正面宣传、推荐、承诺等语境中,构成违规
- 仅当你非常确定不构成违规时才标记为 false
请以 JSON 数组返回,每项包含:
- index: 违规项编号(对应上面的编号)
- is_violation: true/false在上下文中是否真正构成违规
- reason: 简要说明判断理由20字以内
请只返回 JSON 数组,不要包含其他内容。"""
        response = await ai_client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            model=text_model,
            temperature=0.1,
            max_tokens=1000,
        )
        import json as _json
        response_content = response.content.strip()
        # Strip markdown code fences the model may wrap around the JSON.
        if response_content.startswith("```"):
            response_content = response_content.split("\n", 1)[1]
        if response_content.endswith("```"):
            response_content = response_content.rsplit("\n", 1)[0]
        ai_results = _json.loads(response_content)
        # Map each re-check result back to its violation index.
        verify_map: dict[int, dict] = {}
        for item in ai_results:
            idx = item.get("index")
            if idx is not None:
                verify_map[idx] = item
        # Keep a violation unless the AI explicitly cleared it; missing or
        # malformed entries default to "still a violation" (safe fallback).
        verified = []
        import logging
        _logger = logging.getLogger(__name__)
        for i, v in enumerate(violations):
            result = verify_map.get(i)
            if result and not result.get("is_violation", True):
                reason = result.get("reason", "")
                _logger.info(f"AI 语境复核排除误报: 「{v.content}」— {reason}")
                continue
            verified.append(v)
        return verified
    except Exception as e:
        import logging
        logging.getLogger(__name__).warning(f"AI 语境复核失败,保留原始结果: {e}")
        return violations
def _build_brand_rules_context(
competitors: list[dict],
tenant_forbidden_words: list[dict],
whitelist: list[str],
db_platform_rules: dict | None,
platform_content_requirements: list[str],
platform_other_rules: list[dict],
) -> str:
"""构建品牌方规则上下文文本,注入 AI prompt"""
sections = []
# 竞品列表
if competitors:
comp_lines = []
for c in competitors:
kws = ", ".join(c.get("keywords", []))
line = f" - {c['name']}"
if kws:
line += f"(关键词:{kws}"
comp_lines.append(line)
sections.append("【竞品品牌列表】脚本中不得出现以下竞品品牌名或关联词:\n" + "\n".join(comp_lines))
# 自定义违禁词
if tenant_forbidden_words:
words = [w["word"] for w in tenant_forbidden_words]
sections.append(f"【品牌方自定义违禁词】以下词语禁止使用:{', '.join(words)}")
# 白名单
if whitelist:
sections.append(f"【白名单】以下词语已获授权可以使用,不应标记为违规:{', '.join(whitelist)}")
# DB 平台规则中的内容要求和其他规则
if platform_content_requirements:
sections.append("【平台内容要求】\n" + "\n".join(f" - {r}" for r in platform_content_requirements))
if platform_other_rules:
other_lines = []
for r in platform_other_rules:
rule_name = r.get("rule", "")
rule_desc = r.get("description", "")
other_lines.append(f" - {rule_name}{rule_desc}")
sections.append("【平台其他规则】\n" + "\n".join(other_lines))
# DB 平台规则中的限制词
if db_platform_rules:
restricted = db_platform_rules.get("restricted_words", [])
if restricted:
rw_lines = []
for rw in restricted:
word = rw.get("word", "")
condition = rw.get("condition", "")
rw_lines.append(f" - 「{word}」— {condition}")
sections.append("【平台限制用语】以下词语有使用条件限制:\n" + "\n".join(rw_lines))
return "\n\n".join(sections) if sections else ""
async def _ai_deep_analysis(
    tenant_id: str,
    content: str,
    db: AsyncSession,
    image_data: list[str] | None = None,
    platform: str = "douyin",
    brand_rules_context: str = "",
) -> tuple[list[Violation], list[SoftRiskWarning]]:
    """
    Run AI deep analysis of the script (plain text, or multimodal when
    document images were extracted).

    Args:
        tenant_id: Tenant ID used to resolve the AI client and config.
        content: Script text content.
        db: Database session.
        image_data: Optional list of base64-encoded images extracted from
            the uploaded document; presence switches to vision analysis.
        platform: Target publishing platform key (e.g. "douyin").
        brand_rules_context: Brand-configured rule context injected into
            the prompt (may be empty).

    Returns:
        (violations, soft_warnings): hard violations as ``Violation`` items
        and soft reminders as ``SoftRiskWarning`` items. Any AI failure
        degrades gracefully to ([], []) so the rule-based checks still stand.
    """
    platform_labels = {
        "douyin": "抖音", "xiaohongshu": "小红书", "bilibili": "B站",
        "kuaishou": "快手", "weibo": "微博", "wechat": "微信",
    }
    platform_label = platform_labels.get(platform, platform)
    # Platform-specific rule text (hard-coded fallback rule set).
    platform_rule_details = _platform_rules.get(platform, {})
    platform_rule_text = ""
    if platform_rule_details:
        rule_items = []
        for rule in platform_rule_details.get("rules", []):
            if rule.get("type") == "forbidden_word":
                rule_items.append(f"- 平台违禁词:{', '.join(rule.get('words', []))}")
            elif rule.get("type") == "duration":
                if rule.get("min_seconds"):
                    rule_items.append(f"- 最短时长要求:{rule['min_seconds']}")
        if rule_items:
            platform_rule_text = f"\n\n{platform_label}平台基础规则:\n" + "\n".join(rule_items)
    # Brand-configured rule context appended to the prompt.
    brand_context_text = ""
    if brand_rules_context:
        brand_context_text = f"\n\n===== 品牌方审核规则配置 =====\n{brand_rules_context}\n============================="
    try:
        # Resolve the AI client; an absent client disables deep analysis.
        ai_client = await AIServiceFactory.get_client(tenant_id, db)
        if not ai_client:
            return [], []
        # Resolve model configuration.
        config = await AIServiceFactory.get_config(tenant_id, db)
        if not config:
            return [], []
        text_model = config.models.get("text", "gpt-4o")
        # Base analysis prompt with a numbered checklist (items 1-7).
        base_prompt = f"""作为广告合规审核专家,请分析以下将在「{platform_label}」平台发布的广告脚本内容,检测潜在的合规风险:
脚本内容:
{content}
{platform_rule_text}{brand_context_text}
请结合上述所有规则配置,重点检查以下方面:
1. 是否存在隐性的虚假宣传(如暗示疗效但不直接说明)
2. 是否存在容易引起误解的表述
3. 是否存在夸大描述
4. 是否存在可能违反广告法的其他内容
5. 是否违反{platform_label}平台的内容规范和社区规则
6. 是否出现竞品品牌名称或关联词汇(如有竞品列表)
7. 是否符合平台内容要求(如有具体要求)"""
        # With images, extend the checklist with image-specific checks.
        # Fixed: these items previously restarted at 5, duplicating the
        # numbers of the base checklist; they now continue at 8-11.
        if image_data:
            base_prompt += """
8. 图片中是否出现竞品品牌 logo 或商标
9. 图片中是否存在违规画面(涉黄、暴力、敏感内容等)
10. 图片中是否存在虚假对比图或误导性图片
11. 图片中的文字是否包含违禁词或夸大宣传"""
        base_prompt += """
请以 JSON 数组返回,每项包含:
- category: "violation"(硬性违规,明确违法/违规)或 "warning"(软性提醒,需人工判断)
- type: 违规类型 (forbidden_word/efficacy_claim/brand_safety/competitor_logo)
- content: 问题内容
- severity: 严重程度 (high/medium/low)
- suggestion: 修改建议
分类标准:
- violation: 违禁词、功效宣称、品牌安全、竞品露出等明确违规
- warning: 夸大描述、易误解表述、潜在风险
如果未发现问题,返回空数组 []
请只返回 JSON 数组,不要包含其他内容。"""
        # Multimodal vision analysis when images exist, else text chat.
        if image_data:
            vision_model = config.models.get("vision", text_model)
            image_urls = [f"data:image/png;base64,{b64}" for b64 in image_data]
            response = await ai_client.vision_analysis(
                image_urls=image_urls,
                prompt=base_prompt,
                model=vision_model,
                temperature=0.3,
                max_tokens=1500,
            )
        else:
            response = await ai_client.chat_completion(
                messages=[{"role": "user", "content": base_prompt}],
                model=text_model,
                temperature=0.3,
                max_tokens=1000,
            )
        # Parse the AI response.
        import json
        try:
            # Strip markdown code fences the model may wrap around the JSON.
            response_content = response.content.strip()
            if response_content.startswith("```"):
                response_content = response_content.split("\n", 1)[1]
            if response_content.endswith("```"):
                response_content = response_content.rsplit("\n", 1)[0]
            ai_results = json.loads(response_content)
            violations = []
            warnings = []
            for item in ai_results:
                # Default to a hard violation when unclassified (safe fallback).
                category = item.get("category", "violation")
                violation_type = item.get("type", "forbidden_word")
                if violation_type == "forbidden_word":
                    vtype = ViolationType.FORBIDDEN_WORD
                elif violation_type == "efficacy_claim":
                    vtype = ViolationType.EFFICACY_CLAIM
                elif violation_type == "competitor_logo":
                    vtype = ViolationType.COMPETITOR_LOGO
                else:
                    vtype = ViolationType.BRAND_SAFETY
                severity = item.get("severity", "medium")
                if severity == "high":
                    slevel = RiskLevel.HIGH
                elif severity == "low":
                    slevel = RiskLevel.LOW
                else:
                    slevel = RiskLevel.MEDIUM
                if category == "warning":
                    # Soft reminder → SoftRiskWarning (non-blocking note).
                    warnings.append(SoftRiskWarning(
                        code="ai_warning",
                        message=f"{item.get('content', '')}: {item.get('suggestion', '建议修改')}",
                        action_required=SoftRiskAction.NOTE,
                        blocking=False,
                        context={"type": violation_type, "severity": severity},
                    ))
                else:
                    # Hard violation → Violation (no position: AI gives none).
                    violations.append(Violation(
                        type=vtype,
                        content=item.get("content", ""),
                        severity=slevel,
                        suggestion=item.get("suggestion", "建议修改"),
                    ))
            return violations, warnings
        except json.JSONDecodeError:
            # Unparseable AI output — degrade to rule-based results only.
            return [], []
    except Exception:
        # Any client/config/call failure degrades gracefully.
        return [], []