""" 脚本预审 API """ import re from typing import Optional from fastapi import APIRouter, Depends, Header from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.schemas.review import ( ScriptReviewRequest, ScriptReviewResponse, Violation, ViolationType, RiskLevel, Position, SoftRiskWarning, SoftRiskAction, ReviewDimension, ReviewDimensions, SellingPointMatch, BriefMatchDetail, ) from app.api.rules import ( get_whitelist_for_brand, get_other_brands_whitelist_terms, get_forbidden_words_for_tenant, get_active_platform_rules, get_competitors_for_brand, _platform_rules, ) from app.services.soft_risk import evaluate_soft_risk from app.services.ai_service import AIServiceFactory from app.services.document_parser import DocumentParser router = APIRouter(prefix="/scripts", tags=["scripts"]) # 内置违禁词库(广告极限词) ABSOLUTE_WORDS = ["最好", "第一", "最佳", "绝对", "100%"] # 功效词库(医疗/功效宣称) EFFICACY_WORDS = ["根治", "治愈", "治疗", "药效", "疗效", "特效"] # 广告语境关键词(用于判断是否为广告场景) AD_CONTEXT_KEYWORDS = ["产品", "购买", "销量", "品质", "推荐", "价格", "优惠", "促销"] def _is_ad_context(content: str, word: str) -> bool: """ 判断是否为广告语境 规则: - 如果内容中包含广告关键词,认为是广告语境 - 如果违禁词出现在明显的非广告句式中,不是广告语境 """ # 非广告语境模式 non_ad_patterns = [ r"他是第一[个名位]", # 他是第一个/名 r"[是为]第一[个名位]", # 是第一个 r"最开心|最高兴|最难忘", # 情感表达 r"第一[次个].*[到来抵达]", # 第一次到达 ] for pattern in non_ad_patterns: if re.search(pattern, content): return False # 检查是否包含广告关键词 return any(kw in content for kw in AD_CONTEXT_KEYWORDS) def _normalize_selling_points(raw_points: list[dict] | None) -> list[dict]: """ 标准化卖点列表,兼容旧 required:bool 格式 返回 [{content, priority}] """ if not raw_points: return [] result = [] for sp in raw_points: content = sp.get("content", "") if not content: continue # 兼容旧格式 if "priority" in sp: priority = sp["priority"] elif "required" in sp: priority = "core" if sp["required"] else "recommended" else: priority = "recommended" result.append({"content": content, "priority": priority}) return result async def _ai_selling_point_analysis( ai_client, content: str, selling_points: list[dict], model: str ) -> list[SellingPointMatch]: """ AI 语义匹配卖点覆盖 只检查 core 和 recommended,跳过 reference。 AI 不可用时回退:简单文本包含检测。 """ # 过滤出需要检查的卖点 points_to_check = [sp for sp in selling_points if sp["priority"] in ("core", "recommended")] reference_points = [sp for sp in selling_points if sp["priority"] == "reference"] # reference 卖点直接标记为匹配 results: list[SellingPointMatch] = [ SellingPointMatch(content=sp["content"], priority="reference", matched=True, evidence="参考信息,不检查") for sp in reference_points ] if not points_to_check: return results if not ai_client: # 回退:简单文本包含 for sp in points_to_check: matched = sp["content"] in content results.append(SellingPointMatch( content=sp["content"], priority=sp["priority"], matched=matched, evidence="文本匹配" if matched else "未检测到相关内容", )) return results try: points_text = "\n".join(f"- [{sp['priority']}] {sp['content']}" for sp in points_to_check) prompt = f"""作为广告合规审核专家,请判断以下脚本内容是否覆盖了每个卖点。 脚本内容: {content} 需要检查的卖点: {points_text} 请以 JSON 数组返回,每项包含: - content: 卖点原文 - matched: true/false(脚本中是否传达了该卖点的含义,语义匹配即可,不要求原文出现) - evidence: 匹配依据(如果匹配,指出脚本中对应的表述;如果不匹配,说明原因) 请只返回 JSON 数组,不要包含其他内容。""" response = await ai_client.chat_completion( messages=[{"role": "user", "content": prompt}], model=model, temperature=0.2, max_tokens=1000, ) import json response_content = response.content.strip() if response_content.startswith("```"): response_content = response_content.split("\n", 1)[1] if response_content.endswith("```"): response_content = response_content.rsplit("\n", 1)[0] ai_results = json.loads(response_content) # 构建结果映射 ai_map = {item.get("content", ""): item for item in ai_results} for sp in points_to_check: ai_item = ai_map.get(sp["content"], {}) results.append(SellingPointMatch( content=sp["content"], priority=sp["priority"], matched=ai_item.get("matched", False), evidence=ai_item.get("evidence", ""), )) except Exception: # AI 失败时回退 for sp in points_to_check: matched = sp["content"] in content results.append(SellingPointMatch( content=sp["content"], priority=sp["priority"], matched=matched, evidence="文本匹配(AI不可用)" if matched else "未检测到(AI不可用)", )) return results async def _ai_brief_overall_analysis( ai_client, content: str, selling_points: list[dict], model: str ) -> dict: """ AI 分析脚本与 Brief 的整体匹配度,输出亮点和问题点。 返回 {"overall_score": int, "highlights": [...], "issues": [...]} AI 不可用时返回空结果。 """ if not ai_client: return {} try: sp_text = "\n".join(f"- [{sp['priority']}] {sp['content']}" for sp in selling_points) if selling_points else "(无卖点要求)" prompt = f"""作为广告内容审核专家,请分析以下脚本与 Brief 要求的整体匹配程度。 脚本内容: {content} Brief 卖点要求: {sp_text} 请从以下角度综合分析,以 JSON 返回: {{ "overall_score": 0-100 的整数(整体匹配度评分), "highlights": ["亮点1", "亮点2"], "issues": ["问题1", "问题2"] }} 分析角度: - 卖点传达是否清晰自然(不要求死板对照,语义传达即可) - 内容氛围和场景是否贴合产品定位 - 表达语气和风格是否合适 - 内容结构和节奏是否流畅 - 是否有吸引力和说服力 要求: - highlights: 脚本做得好的方面,每条一句话,简明具体(如"开头用痛点切入,吸引力强") - issues: 可以改进的方面,每条一句话,简明具体(如"缺少产品使用演示环节") - 每项最多给 4 条,只写最重要的 - 如果整体不错,issues 可以为空数组 - overall_score: 综合考虑各角度的整体分数 请只返回 JSON,不要包含其他内容。""" response = await ai_client.chat_completion( messages=[{"role": "user", "content": prompt}], model=model, temperature=0.3, max_tokens=800, ) import json resp = response.content.strip() if resp.startswith("```"): resp = resp.split("\n", 1)[1] if resp.endswith("```"): resp = resp.rsplit("\n", 1)[0] return json.loads(resp) except Exception: return {} @router.post("/review", response_model=ScriptReviewResponse) async def review_script( request: ScriptReviewRequest, x_tenant_id: str = Header(..., alias="X-Tenant-ID"), db: AsyncSession = Depends(get_db), ) -> ScriptReviewResponse: """ 脚本预审(多维度评分) 四个独立维度: - legal: 法规合规(违禁词、功效词、Brief黑名单词) - platform: 平台规则 - brand_safety: 品牌安全(竞品、其他品牌词) - brief_match: Brief 匹配度(卖点覆盖) """ violations: list[Violation] = [] content = request.content image_data: list[str] | None = None # 如果提供了文件 URL,自动解析文本和提取图片 if request.file_url and request.file_name: try: file_text = await DocumentParser.download_and_parse( request.file_url, request.file_name ) if file_text: content = content + "\n\n" + file_text if content.strip() else file_text except Exception as e: import logging logging.getLogger(__name__).warning(f"文件文本解析失败: {e}") try: image_data = await DocumentParser.download_and_get_images( request.file_url, request.file_name ) except Exception as e: import logging logging.getLogger(__name__).warning(f"文件图片提取失败: {e}") # 获取品牌方配置的所有规则数据 whitelist = await get_whitelist_for_brand(x_tenant_id, request.brand_id, db) tenant_forbidden_words = await get_forbidden_words_for_tenant(x_tenant_id, db) competitors = await get_competitors_for_brand(x_tenant_id, request.brand_id, db) db_platform_rules = await get_active_platform_rules( x_tenant_id, request.brand_id, request.platform.value, db, ) # ===== Step 1: 法规合规检测 (legal) ===== # 1a. 内置违禁词(广告极限词) for word in ABSOLUTE_WORDS: if word in whitelist: continue start = 0 while True: pos = content.find(word, start) if pos == -1: break if not _is_ad_context(content, word): start = pos + 1 continue violations.append(Violation( type=ViolationType.FORBIDDEN_WORD, content=word, severity=RiskLevel.HIGH, dimension="legal", suggestion=f"建议删除或替换违禁词:{word}", position=Position(start=pos, end=pos + len(word)), )) start = pos + 1 # 1b. 功效词检测 for word in EFFICACY_WORDS: if word in whitelist: continue start = 0 while True: pos = content.find(word, start) if pos == -1: break violations.append(Violation( type=ViolationType.EFFICACY_CLAIM, content=word, severity=RiskLevel.HIGH, dimension="legal", suggestion=f"功效宣称词违反广告法,建议删除:{word}", position=Position(start=pos, end=pos + len(word)), )) start = pos + 1 # 1c. Brief 黑名单词 if request.blacklist_words: for item in request.blacklist_words: word = item.get("word", "") reason = item.get("reason", "") if not word or word in whitelist: continue start_pos = 0 while True: pos = content.find(word, start_pos) if pos == -1: break suggestion = f"Brief 黑名单词:{word}" if reason: suggestion += f"({reason})" violations.append(Violation( type=ViolationType.FORBIDDEN_WORD, content=word, severity=RiskLevel.HIGH, dimension="legal", suggestion=suggestion, position=Position(start=pos, end=pos + len(word)), )) start_pos = pos + 1 # 1d. 租户自定义违禁词 → legal 维度 for fw in tenant_forbidden_words: word = fw["word"] if word in whitelist or word in ABSOLUTE_WORDS: continue start = 0 while True: pos = content.find(word, start) if pos == -1: break if not _is_ad_context(content, word): start = pos + 1 continue violations.append(Violation( type=ViolationType.FORBIDDEN_WORD, content=word, severity=RiskLevel.HIGH, dimension="legal", suggestion=f"建议删除或替换违禁词:{word}", position=Position(start=pos, end=pos + len(word)), )) start = pos + 1 # ===== Step 2: 平台规则检测 (platform) ===== already_checked = set(ABSOLUTE_WORDS + [w["word"] for w in tenant_forbidden_words]) platform_forbidden_words: list[str] = [] platform_restricted_words: list[dict] = [] platform_content_requirements: list[str] = [] platform_other_rules: list[dict] = [] # 优先使用品牌方上传的 DB 平台规则,否则用硬编码兜底 if db_platform_rules: platform_forbidden_words = db_platform_rules.get("forbidden_words", []) platform_restricted_words = db_platform_rules.get("restricted_words", []) platform_content_requirements = db_platform_rules.get("content_requirements", []) platform_other_rules = db_platform_rules.get("other_rules", []) else: platform_rule = _platform_rules.get(request.platform.value) if platform_rule: for rule in platform_rule.get("rules", []): if rule.get("type") == "forbidden_word": platform_forbidden_words.extend(rule.get("words", [])) # 2a. 平台违禁词检测 for word in platform_forbidden_words: if word in already_checked or word in whitelist: continue start = 0 while True: pos = content.find(word, start) if pos == -1: break if not _is_ad_context(content, word): start = pos + 1 continue violations.append(Violation( type=ViolationType.FORBIDDEN_WORD, content=word, severity=RiskLevel.MEDIUM, dimension="platform", suggestion=f"违反{request.platform.value}平台规则,建议删除:{word}", position=Position(start=pos, end=pos + len(word)), )) start = pos + 1 # 2b. 平台限制词检测(有条件限制的词语) for rw in platform_restricted_words: word = rw.get("word", "") if not word or word in whitelist: continue if word in content: suggestion = rw.get("suggestion", f"「{word}」为平台限制用语") condition = rw.get("condition", "") if condition: suggestion = f"「{word}」限制条件:{condition}。{suggestion}" violations.append(Violation( type=ViolationType.FORBIDDEN_WORD, content=word, severity=RiskLevel.LOW, dimension="platform", suggestion=suggestion, position=Position(start=content.find(word), end=content.find(word) + len(word)), )) # ===== Step 3: 品牌安全检测 (brand_safety) ===== # 3a. 其他品牌专属词 other_brand_terms = await get_other_brands_whitelist_terms(x_tenant_id, request.brand_id, db) for term, owner_brand in other_brand_terms: if term in content: violations.append(Violation( type=ViolationType.BRAND_SAFETY, content=term, severity=RiskLevel.MEDIUM, dimension="brand_safety", suggestion=f"使用了其他品牌的专属词汇:{term}", position=Position(start=content.find(term), end=content.find(term) + len(term)), )) # 3b. 竞品名称和关键词检测 for comp in competitors: comp_name = comp["name"] if comp_name in whitelist: continue if comp_name in content: violations.append(Violation( type=ViolationType.BRAND_SAFETY, content=comp_name, severity=RiskLevel.HIGH, dimension="brand_safety", suggestion=f"脚本中出现竞品品牌名「{comp_name}」,请删除或替换", position=Position(start=content.find(comp_name), end=content.find(comp_name) + len(comp_name)), )) for kw in comp.get("keywords", []): if not kw or kw in whitelist: continue if kw in content: violations.append(Violation( type=ViolationType.BRAND_SAFETY, content=kw, severity=RiskLevel.MEDIUM, dimension="brand_safety", suggestion=f"脚本中出现竞品「{comp_name}」的关联词「{kw}」,请确认是否需要删除", position=Position(start=content.find(kw), end=content.find(kw) + len(kw)), )) # ===== Step 4: AI 深度分析 ===== # 构建品牌方规则上下文传给 AI brand_rules_context = _build_brand_rules_context( competitors=competitors, tenant_forbidden_words=tenant_forbidden_words, whitelist=whitelist, db_platform_rules=db_platform_rules, platform_content_requirements=platform_content_requirements, platform_other_rules=platform_other_rules, ) ai_violations, ai_warnings = await _ai_deep_analysis( x_tenant_id, content, db, image_data=image_data, platform=request.platform.value, brand_rules_context=brand_rules_context, ) if ai_violations: for v in ai_violations: # 根据类型分配维度 if v.type in (ViolationType.FORBIDDEN_WORD, ViolationType.EFFICACY_CLAIM): v.dimension = "legal" elif v.type == ViolationType.COMPETITOR_LOGO: v.dimension = "brand_safety" else: v.dimension = "brand_safety" violations.append(v) # ===== Step 4b: AI 语境复核(过滤误报) ===== # 将关键词匹配到的违规项交给 AI 复核上下文语义,去除误判 if violations: violations = await _ai_context_verify( x_tenant_id, content, violations, db, ) # ===== Step 5: 卖点语义匹配 + 整体 Brief 匹配分析 ===== selling_points = _normalize_selling_points(request.selling_points) selling_point_matches: list[SellingPointMatch] = [] brief_overall: dict = {} ai_client = None ai_available = False text_model = "gpt-4o" try: ai_client = await AIServiceFactory.get_client(x_tenant_id, db) if ai_client: ai_available = True config = await AIServiceFactory.get_config(x_tenant_id, db) if config: text_model = config.models.get("text", "gpt-4o") except Exception: pass if selling_points: selling_point_matches = await _ai_selling_point_analysis( ai_client, content, selling_points, text_model ) # AI 整体 Brief 匹配分析(亮点 + 问题点) brief_overall = await _ai_brief_overall_analysis( ai_client, content, selling_points, text_model ) # ===== Step 6: 各维度独立评分 ===== def _calc_dimension_score(dim: str) -> tuple[int, int]: dim_violations = [v for v in violations if v.dimension == dim] score = 100 for v in dim_violations: if v.severity == RiskLevel.HIGH: score -= 25 elif v.severity == RiskLevel.MEDIUM: score -= 15 else: score -= 5 return max(0, score), len(dim_violations) legal_score, legal_count = _calc_dimension_score("legal") platform_score, platform_count = _calc_dimension_score("platform") brand_safety_score, brand_safety_count = _calc_dimension_score("brand_safety") # brief_match 评分:基于 min_selling_points 覆盖率 + AI 整体匹配度 checkable = [spm for spm in selling_point_matches if spm.priority in ("core", "recommended")] matched_count = sum(1 for spm in checkable if spm.matched) total_checkable = len(checkable) # 代理商要求的最少体现条数(默认 = 全部 core 数量) core_count = sum(1 for spm in checkable if spm.priority == "core") min_required = request.min_selling_points if request.min_selling_points is not None else core_count # 确保不超过可检查的总数 min_required = min(min_required, total_checkable) if total_checkable > 0 else 0 # 覆盖率得分:matched / min_required(满足要求 = 100 分) if min_required > 0: coverage_ratio = min(matched_count / min_required, 1.0) coverage_score = round(coverage_ratio * 100) elif total_checkable > 0: # 没有要求但有卖点 → 按全量比例 coverage_score = round(matched_count / total_checkable * 100) else: coverage_score = 100 # 无卖点要求 # AI 整体匹配度得分 ai_overall_score = brief_overall.get("overall_score", coverage_score) ai_overall_score = max(0, min(100, ai_overall_score)) # 综合 brief_match 得分 = 覆盖率 60% + 整体匹配度 40% brief_match_score = round(coverage_score * 0.6 + ai_overall_score * 0.4) brief_match_score = max(0, min(100, brief_match_score)) # 构建 BriefMatchDetail highlights = brief_overall.get("highlights", [])[:4] issues_list = brief_overall.get("issues", [])[:4] # 生成评分说明 if min_required > 0: explanation = f"要求至少体现 {min_required} 条卖点,实际匹配 {matched_count} 条(覆盖率 {coverage_score}%),整体匹配度 {ai_overall_score}%" elif total_checkable > 0: explanation = f"共 {total_checkable} 条卖点,匹配 {matched_count} 条(覆盖率 {coverage_score}%),整体匹配度 {ai_overall_score}%" else: explanation = f"整体匹配度 {ai_overall_score}%" brief_match_detail = BriefMatchDetail( total_points=total_checkable, matched_points=matched_count, required_points=min_required, coverage_score=coverage_score, overall_score=ai_overall_score, highlights=highlights, issues=issues_list, explanation=explanation, ) # 加权总分 total_score = round( legal_score * 0.35 + platform_score * 0.25 + brand_safety_score * 0.25 + brief_match_score * 0.15 ) total_score = max(0, min(100, total_score)) # ===== Step 7: 各维度 passed 判定 ===== has_high_legal = any( v.dimension == "legal" and v.severity == RiskLevel.HIGH for v in violations ) legal_passed = legal_score >= 60 and not has_high_legal platform_passed = platform_score >= 60 brand_safety_passed = brand_safety_score >= 70 # brief_match passed: 覆盖率达标(matched >= min_required) brief_match_passed = matched_count >= min_required if min_required > 0 else True dimensions = ReviewDimensions( legal=ReviewDimension(score=legal_score, passed=legal_passed, issue_count=legal_count), platform=ReviewDimension(score=platform_score, passed=platform_passed, issue_count=platform_count), brand_safety=ReviewDimension(score=brand_safety_score, passed=brand_safety_passed, issue_count=brand_safety_count), brief_match=ReviewDimension( score=brief_match_score, passed=brief_match_passed, issue_count=sum(1 for spm in checkable if not spm.matched), ), ) # 向后兼容 missing_points missing_points: list[str] | None = None if selling_point_matches: core_missing = [spm.content for spm in selling_point_matches if spm.priority == "core" and not spm.matched] missing_points = core_missing # 生成摘要 parts = [] if not legal_passed: parts.append(f"法规合规问题 {legal_count} 处") if not platform_passed: parts.append(f"平台规则问题 {platform_count} 处") if not brand_safety_passed: parts.append(f"品牌安全问题 {brand_safety_count} 处") if not brief_match_passed: unmatched = min_required - matched_count parts.append(f"卖点覆盖不足(还差 {unmatched} 条)") if not parts: summary = "脚本内容合规,未发现问题" else: summary = ",".join(parts) # 软性风控评估 soft_warnings: list[SoftRiskWarning] = [] if request.soft_risk_context: soft_warnings = evaluate_soft_risk(request.soft_risk_context) if ai_warnings: soft_warnings.extend(ai_warnings) if missing_points: soft_warnings.append(SoftRiskWarning( code="missing_selling_points", message=f"核心卖点未覆盖:{', '.join(missing_points)}", action_required=SoftRiskAction.NOTE, blocking=False, )) return ScriptReviewResponse( score=total_score, summary=summary, dimensions=dimensions, selling_point_matches=selling_point_matches, brief_match_detail=brief_match_detail, violations=violations, missing_points=missing_points, soft_warnings=soft_warnings, ai_available=ai_available, ) async def _ai_context_verify( tenant_id: str, content: str, violations: list[Violation], db: AsyncSession, ) -> list[Violation]: """ AI 语境复核:将关键词匹配到的违规项交给 AI 判断上下文语义。 例如违禁词"小孩",如果脚本写"这不是小孩玩的",则属于否定语境,不构成违规。 AI 不可用时直接返回原列表(降级为纯关键词匹配)。 """ if not violations: return violations try: ai_client = await AIServiceFactory.get_client(tenant_id, db) if not ai_client: return violations config = await AIServiceFactory.get_config(tenant_id, db) if not config: return violations text_model = config.models.get("text", "gpt-4o") # 构建违规项列表 items_text = [] for i, v in enumerate(violations): # 提取违规词周围的上下文(前后各 40 字符) ctx = "" if v.position and v.position.start is not None: ctx_start = max(0, v.position.start - 40) ctx_end = min(len(content), v.position.end + 40) ctx = content[ctx_start:ctx_end] else: # 没有位置信息,尝试找上下文 pos = content.find(v.content) if pos != -1: ctx_start = max(0, pos - 40) ctx_end = min(len(content), pos + len(v.content) + 40) ctx = content[ctx_start:ctx_end] items_text.append( f"{i}. 词语「{v.content}」| 维度: {v.dimension} | 上下文: ...{ctx}..." ) prompt = f"""你是广告合规审核专家。以下脚本中通过关键词匹配检测到了一些疑似违规项。 请根据脚本的完整上下文语义,判断每一项是否真正构成违规。 完整脚本内容: {content} 检测到的疑似违规项: {chr(10).join(items_text)} 判断标准: - 如果该词出现在否定语境中(如"不是XX"、"不含XX"、"避免XX"),通常不构成违规 - 如果该词用于客观描述、对比说明或免责声明中,需要根据具体语境判断 - 如果该词用于正面宣传、推荐、承诺等语境中,构成违规 - 仅当你非常确定不构成违规时才标记为 false 请以 JSON 数组返回,每项包含: - index: 违规项编号(对应上面的编号) - is_violation: true/false(在上下文中是否真正构成违规) - reason: 简要说明判断理由(20字以内) 请只返回 JSON 数组,不要包含其他内容。""" response = await ai_client.chat_completion( messages=[{"role": "user", "content": prompt}], model=text_model, temperature=0.1, max_tokens=1000, ) import json as _json response_content = response.content.strip() if response_content.startswith("```"): response_content = response_content.split("\n", 1)[1] if response_content.endswith("```"): response_content = response_content.rsplit("\n", 1)[0] ai_results = _json.loads(response_content) # 构建复核结果映射 verify_map: dict[int, dict] = {} for item in ai_results: idx = item.get("index") if idx is not None: verify_map[idx] = item # 过滤误报 verified = [] import logging _logger = logging.getLogger(__name__) for i, v in enumerate(violations): result = verify_map.get(i) if result and not result.get("is_violation", True): reason = result.get("reason", "") _logger.info(f"AI 语境复核排除误报: 「{v.content}」— {reason}") continue verified.append(v) return verified except Exception as e: import logging logging.getLogger(__name__).warning(f"AI 语境复核失败,保留原始结果: {e}") return violations def _build_brand_rules_context( competitors: list[dict], tenant_forbidden_words: list[dict], whitelist: list[str], db_platform_rules: dict | None, platform_content_requirements: list[str], platform_other_rules: list[dict], ) -> str: """构建品牌方规则上下文文本,注入 AI prompt""" sections = [] # 竞品列表 if competitors: comp_lines = [] for c in competitors: kws = ", ".join(c.get("keywords", [])) line = f" - {c['name']}" if kws: line += f"(关键词:{kws})" comp_lines.append(line) sections.append("【竞品品牌列表】脚本中不得出现以下竞品品牌名或关联词:\n" + "\n".join(comp_lines)) # 自定义违禁词 if tenant_forbidden_words: words = [w["word"] for w in tenant_forbidden_words] sections.append(f"【品牌方自定义违禁词】以下词语禁止使用:{', '.join(words)}") # 白名单 if whitelist: sections.append(f"【白名单】以下词语已获授权可以使用,不应标记为违规:{', '.join(whitelist)}") # DB 平台规则中的内容要求和其他规则 if platform_content_requirements: sections.append("【平台内容要求】\n" + "\n".join(f" - {r}" for r in platform_content_requirements)) if platform_other_rules: other_lines = [] for r in platform_other_rules: rule_name = r.get("rule", "") rule_desc = r.get("description", "") other_lines.append(f" - {rule_name}:{rule_desc}") sections.append("【平台其他规则】\n" + "\n".join(other_lines)) # DB 平台规则中的限制词 if db_platform_rules: restricted = db_platform_rules.get("restricted_words", []) if restricted: rw_lines = [] for rw in restricted: word = rw.get("word", "") condition = rw.get("condition", "") rw_lines.append(f" - 「{word}」— {condition}") sections.append("【平台限制用语】以下词语有使用条件限制:\n" + "\n".join(rw_lines)) return "\n\n".join(sections) if sections else "" async def _ai_deep_analysis( tenant_id: str, content: str, db: AsyncSession, image_data: list[str] | None = None, platform: str = "douyin", brand_rules_context: str = "", ) -> tuple[list[Violation], list[SoftRiskWarning]]: """ 使用 AI 进行深度分析(支持纯文本和多模态图片审核) Args: tenant_id: 租户 ID content: 脚本文本内容 db: 数据库会话 image_data: 可选的 base64 图片列表(从文档中提取) platform: 投放平台 brand_rules_context: 品牌方配置的规则上下文 返回 (violations, soft_warnings) AI 分析失败时返回空列表,降级到规则检测 """ platform_labels = { "douyin": "抖音", "xiaohongshu": "小红书", "bilibili": "B站", "kuaishou": "快手", "weibo": "微博", "wechat": "微信", } platform_label = platform_labels.get(platform, platform) # 获取平台特定规则(硬编码兜底) platform_rule_details = _platform_rules.get(platform, {}) platform_rule_text = "" if platform_rule_details: rule_items = [] for rule in platform_rule_details.get("rules", []): if rule.get("type") == "forbidden_word": rule_items.append(f"- 平台违禁词:{', '.join(rule.get('words', []))}") elif rule.get("type") == "duration": if rule.get("min_seconds"): rule_items.append(f"- 最短时长要求:{rule['min_seconds']}秒") if rule_items: platform_rule_text = f"\n\n{platform_label}平台基础规则:\n" + "\n".join(rule_items) # 品牌方配置的规则上下文 brand_context_text = "" if brand_rules_context: brand_context_text = f"\n\n===== 品牌方审核规则配置 =====\n{brand_rules_context}\n=============================" try: # 获取 AI 客户端 ai_client = await AIServiceFactory.get_client(tenant_id, db) if not ai_client: return [], [] # 获取模型配置 config = await AIServiceFactory.get_config(tenant_id, db) if not config: return [], [] text_model = config.models.get("text", "gpt-4o") # 构建基础分析提示 base_prompt = f"""作为广告合规审核专家,请分析以下将在「{platform_label}」平台发布的广告脚本内容,检测潜在的合规风险: 脚本内容: {content} {platform_rule_text}{brand_context_text} 请结合上述所有规则配置,重点检查以下方面: 1. 是否存在隐性的虚假宣传(如暗示疗效但不直接说明) 2. 是否存在容易引起误解的表述 3. 是否存在夸大描述 4. 是否存在可能违反广告法的其他内容 5. 是否违反{platform_label}平台的内容规范和社区规则 6. 是否出现竞品品牌名称或关联词汇(如有竞品列表) 7. 是否符合平台内容要求(如有具体要求)""" # 有图片时追加图片审核要点 if image_data: base_prompt += """ 5. 图片中是否出现竞品品牌 logo 或商标 6. 图片中是否存在违规画面(涉黄、暴力、敏感内容等) 7. 图片中是否存在虚假对比图或误导性图片 8. 图片中的文字是否包含违禁词或夸大宣传""" base_prompt += """ 请以 JSON 数组返回,每项包含: - category: "violation"(硬性违规,明确违法/违规)或 "warning"(软性提醒,需人工判断) - type: 违规类型 (forbidden_word/efficacy_claim/brand_safety/competitor_logo) - content: 问题内容 - severity: 严重程度 (high/medium/low) - suggestion: 修改建议 分类标准: - violation: 违禁词、功效宣称、品牌安全、竞品露出等明确违规 - warning: 夸大描述、易误解表述、潜在风险 如果未发现问题,返回空数组 [] 请只返回 JSON 数组,不要包含其他内容。""" # 根据是否有图片选择纯文本或多模态分析 if image_data: vision_model = config.models.get("vision", text_model) image_urls = [f"data:image/png;base64,{b64}" for b64 in image_data] response = await ai_client.vision_analysis( image_urls=image_urls, prompt=base_prompt, model=vision_model, temperature=0.3, max_tokens=1500, ) else: response = await ai_client.chat_completion( messages=[{"role": "user", "content": base_prompt}], model=text_model, temperature=0.3, max_tokens=1000, ) # 解析 AI 响应 import json try: # 清理响应内容(移除可能的 markdown 标记) response_content = response.content.strip() if response_content.startswith("```"): response_content = response_content.split("\n", 1)[1] if response_content.endswith("```"): response_content = response_content.rsplit("\n", 1)[0] ai_results = json.loads(response_content) violations = [] warnings = [] for item in ai_results: category = item.get("category", "violation") # 默认当硬性违规(安全兜底) violation_type = item.get("type", "forbidden_word") if violation_type == "forbidden_word": vtype = ViolationType.FORBIDDEN_WORD elif violation_type == "efficacy_claim": vtype = ViolationType.EFFICACY_CLAIM elif violation_type == "competitor_logo": vtype = ViolationType.COMPETITOR_LOGO else: vtype = ViolationType.BRAND_SAFETY severity = item.get("severity", "medium") if severity == "high": slevel = RiskLevel.HIGH elif severity == "low": slevel = RiskLevel.LOW else: slevel = RiskLevel.MEDIUM if category == "warning": # 软性提醒 → SoftRiskWarning warnings.append(SoftRiskWarning( code="ai_warning", message=f"{item.get('content', '')}: {item.get('suggestion', '建议修改')}", action_required=SoftRiskAction.NOTE, blocking=False, context={"type": violation_type, "severity": severity}, )) else: # 硬性违规 → Violation violations.append(Violation( type=vtype, content=item.get("content", ""), severity=slevel, suggestion=item.get("suggestion", "建议修改"), )) return violations, warnings except json.JSONDecodeError: return [], [] except Exception: return [], []