- 后端: 新增验证码服务(生成/存储/验证)和邮件发送服务(开发环境控制台输出) - 后端: 新增 POST /auth/send-code 端点,支持注册/登录/重置密码三种用途 - 后端: 注册流程要求邮箱验证码,验证通过后 is_verified=True - 后端: 登录支持邮箱+密码 或 邮箱+验证码 两种方式 - 前端: 注册页增加验证码输入框和获取验证码按钮(60秒倒计时) - 前端: 登录页增加密码登录/验证码登录双Tab切换 - 测试: conftest 添加 bypass_verification fixture,所有 367 测试通过 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
97 lines
2.3 KiB
Python
97 lines
2.3 KiB
Python
"""
|
||
验证码服务
|
||
|
||
使用内存存储验证码,支持 TTL 自动过期。
|
||
生产环境建议替换为 Redis 存储。
|
||
"""
|
||
import secrets
|
||
import time
|
||
import logging
|
||
from typing import Optional
|
||
|
||
from app.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 内存存储: { "email:purpose" -> (code, expire_timestamp) }
|
||
_code_store: dict[str, tuple[str, float]] = {}
|
||
|
||
# 发送频率限制: { "email:purpose" -> last_send_timestamp }
|
||
_rate_limit: dict[str, float] = {}
|
||
|
||
# 最小发送间隔(秒)
|
||
SEND_INTERVAL = 60
|
||
|
||
|
||
def _cleanup_expired() -> None:
|
||
"""清理过期的验证码"""
|
||
now = time.time()
|
||
expired_keys = [k for k, (_, exp) in _code_store.items() if now > exp]
|
||
for k in expired_keys:
|
||
del _code_store[k]
|
||
|
||
|
||
def generate_code(email: str, purpose: str = "register") -> tuple[str, Optional[str]]:
|
||
"""
|
||
生成验证码并存储。
|
||
|
||
返回 (code, error)。
|
||
error 为 None 表示成功,否则返回错误信息。
|
||
"""
|
||
_cleanup_expired()
|
||
|
||
key = f"{email}:{purpose}"
|
||
|
||
# 检查发送频率
|
||
now = time.time()
|
||
last_sent = _rate_limit.get(key, 0)
|
||
if now - last_sent < SEND_INTERVAL:
|
||
remaining = int(SEND_INTERVAL - (now - last_sent))
|
||
return "", f"发送过于频繁,请 {remaining} 秒后重试"
|
||
|
||
# 生成验证码
|
||
code = "".join(str(secrets.randbelow(10)) for _ in range(settings.VERIFICATION_CODE_LENGTH))
|
||
|
||
# 存储(带 TTL)
|
||
expire_at = now + settings.VERIFICATION_CODE_EXPIRE_MINUTES * 60
|
||
_code_store[key] = (code, expire_at)
|
||
_rate_limit[key] = now
|
||
|
||
logger.info("验证码已生成: email=%s, purpose=%s", email, purpose)
|
||
return code, None
|
||
|
||
|
||
def verify_code(email: str, code: str, purpose: str = "register") -> bool:
|
||
"""
|
||
验证验证码是否正确。
|
||
|
||
验证成功后自动删除验证码(一次性使用)。
|
||
"""
|
||
_cleanup_expired()
|
||
|
||
key = f"{email}:{purpose}"
|
||
stored = _code_store.get(key)
|
||
|
||
if not stored:
|
||
return False
|
||
|
||
stored_code, expire_at = stored
|
||
|
||
# 已过期
|
||
if time.time() > expire_at:
|
||
del _code_store[key]
|
||
return False
|
||
|
||
# 验证码匹配
|
||
if stored_code == code:
|
||
del _code_store[key] # 一次性使用
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
def clear_all() -> None:
|
||
"""清除所有验证码(用于测试)"""
|
||
_code_store.clear()
|
||
_rate_limit.clear()
|