"""Export endpoint: serves the most recently cached query result as a file."""

from datetime import datetime
from io import BytesIO
from typing import Literal

from fastapi import APIRouter, Query, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from app.services.export_service import generate_excel, generate_csv
from app.core.logging import get_logger

router = APIRouter()
logger = get_logger(__name__)

# Holds the most recent query result. Simplified implementation: this is
# plain module-level state; production should use Redis or a similar cache.
_cached_data: list = []


def set_export_data(data: list) -> None:
    """Replace the cached export data with ``data``.

    The object is stored by reference (no copy is made).
    """
    global _cached_data
    _cached_data = data


def get_export_data() -> list:
    """Return the currently cached export data (the stored object itself)."""
    return _cached_data


@router.get("/export")
async def export_data(
    # NOTE: ``format`` shadows the builtin, but it is the public query
    # parameter name and must stay as-is for API compatibility.
    format: Literal["xlsx", "csv"] = Query("xlsx", description="导出格式"),
):
    """Export the cached query result as a downloadable file.

    Args:
        format: Export format, either ``"xlsx"`` or ``"csv"``.

    Returns:
        A ``StreamingResponse`` carrying the generated file as an attachment,
        a 400 ``JSONResponse`` when the cache is empty, or a 500
        ``JSONResponse`` when generating the file or building the response
        fails.
    """
    data = get_export_data()

    # Guard clause: nothing cached yet, so there is nothing to export.
    if not data:
        logger.warning("Export requested but no data available")
        return JSONResponse(
            status_code=400,
            content={"success": False, "error": "无数据可导出,请先执行查询"},
        )

    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        logger.info(f"Exporting {len(data)} records as {format}")

        if format == "xlsx":
            content = generate_excel(data)
            media_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            filename = f"kol_data_{timestamp}.xlsx"
        else:
            content = generate_csv(data)
            media_type = "text/csv; charset=utf-8"
            filename = f"kol_data_{timestamp}.csv"

        response = StreamingResponse(
            BytesIO(content),
            media_type=media_type,
            headers={
                "Content-Disposition": f'attachment; filename="{filename}"',
            },
        )
    except Exception as e:
        # Top-level boundary for this endpoint: record the full traceback
        # (logger.exception) and return a generic error to the client.
        logger.exception(f"Export error: {e}")
        return JSONResponse(
            status_code=500,
            content={"success": False, "error": "导出失败,请重试"},
        )

    # Logged only once the response object exists, so a failure while
    # building it is never preceded by a misleading success message.
    logger.info(f"Export successful: {filename}")
    return response