feat: 更新配置和增强日志记录功能

- 修改config.py中的文件路径和定时发送时间
- 在各个脚本中添加日志记录功能,提升错误追踪和调试能力
- 更新README.md,详细说明程序功能和使用方法
- 重构scheduler.py、sendmsg.py、send_openmsg.py和send_filemsg.py,增强代码可读性和可维护性
This commit is contained in:
teddy 2025-08-21 15:22:55 +08:00
parent aaf5f59b59
commit f96d1eb08d
10 changed files with 867 additions and 124 deletions

103
README.md
View File

@ -0,0 +1,103 @@
# WXMSG - 微信消息自动发送程序
## 功能概述
这是一个自动发送每日AI新闻图片和消息到微信的程序具有完整的日志记录和错误处理功能。
## 主要功能
- **定时发送**: 每天在指定时间自动发送图片和消息
- **容错机制**: 微信失败时自动发送飞书提醒
- **详细日志**: 完整的操作日志记录,便于问题排查
- **进程管理**: 支持后台运行和进程控制
## 日志功能
### 日志文件位置
所有日志文件保存在 `wxauto_logs/` 目录下,按日期和模块分类:
- `sendmsg_YYYYMMDD.log` - 主发送模块日志
- `scheduler_YYYYMMDD.log` - 定时任务调度器日志
- `send_openmsg_YYYYMMDD.log` - 飞书提醒消息日志
- `send_filemsg_YYYYMMDD.log` - 文件未找到提醒日志
- `service_runner_YYYYMMDD.log` - 后台服务启动日志
- `stop_scheduler_YYYYMMDD.log` - 进程停止日志
### 日志级别
- **INFO**: 正常操作信息
- **WARNING**: 警告信息
- **ERROR**: 错误信息
- **DEBUG**: 调试信息
### 日志内容
每个日志条目包含:
- 时间戳
- 模块名称
- 日志级别
- 函数名和行号
- 详细消息
## 使用方法
### 1. 直接运行
```bash
python sendmsg.py
```
### 2. 启动定时任务
```bash
python scheduler.py
```
### 3. 后台运行
```bash
python service_runner.py
```
### 4. 停止服务
```bash
python stop_scheduler.py
```
## 配置说明
`config.py` 中配置:
- 发送时间
- 文件路径
- 消息内容
- 飞书API配置
## 故障排查
### 查看日志
1. 检查 `wxauto_logs/` 目录下的最新日志文件
2. 根据时间戳找到相关操作的日志
3. 查看ERROR级别的日志了解错误详情
### 常见问题
1. **微信客户端问题**: 查看 `sendmsg_*.log` 中的微信初始化日志
2. **文件未找到**: 检查文件路径配置和文件是否存在
3. **飞书API问题**: 查看 `send_openmsg_*.log``send_filemsg_*.log`
4. **进程管理问题**: 查看 `service_runner_*.log``stop_scheduler_*.log`
### 日志示例
```
2024-01-15 11:40:00 - sendmsg - INFO - send_daily_message:25 - 开始执行每日消息发送任务,当前尝试次数: 0
2024-01-15 11:40:00 - sendmsg - INFO - send_daily_message:30 - 目标日期: 2024-01-15
2024-01-15 11:40:00 - sendmsg - INFO - send_daily_message:31 - 目标文件: 2024-01-15.jpg
2024-01-15 11:40:00 - sendmsg - INFO - send_daily_message:32 - 完整文件路径: Z:\2024-01-15.jpg
```
## 依赖要求
- Python 3.13+
- wxauto
- lark-oapi
- schedule
- psutil (可选,用于进程管理)
## 注意事项
1. 确保微信客户端已登录
2. 检查文件路径配置是否正确
3. 验证飞书API配置是否有效
4. 定期检查日志文件大小,避免占用过多磁盘空间

View File

@ -2,9 +2,9 @@ CONFIG = {
"app_id": "cli_a8d64a7be63a500e",
"app_secret": "mcK8aTiq0CLtkzGs2aTZpcnom5J4o6yB",
"open_id": ["ou_c6466a45623096cf7a34d94fe30c6c73", "ou_3b94d0caf83dbced8b0e26af4852a281"],
"file_path": "F:\\",
"file_path": "Z:\\",
"messages_reciever": "文件传输助手",
"checking_time": "15:24",
"sending_time": "15:26",
"checking_time": "15:19",
"sending_time": "15:20",
"message": "新的一天从华智长盈每日AI新闻开始让我们一起看看今天AI圈有啥新鲜事"
}

50
logger_config.py Normal file
View File

@ -0,0 +1,50 @@
import logging
import os
from datetime import datetime
from pathlib import Path
def setup_logger(name, log_file=None, level=logging.INFO):
"""设置日志记录器"""
# 创建logs目录
logs_dir = Path("wxauto_logs")
logs_dir.mkdir(exist_ok=True)
# 如果没有指定日志文件,使用默认命名
if log_file is None:
timestamp = datetime.now().strftime("%Y%m%d")
log_file = logs_dir / f"{name}_{timestamp}.log"
# 创建logger
logger = logging.getLogger(name)
logger.setLevel(level)
# 避免重复添加handler
if logger.handlers:
return logger
# 创建文件处理器
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(level)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
# 创建格式器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加处理器到logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
def get_logger(name):
"""获取已配置的logger"""
return logging.getLogger(name)

View File

@ -5,35 +5,94 @@ import sys
import os
from datetime import datetime
from config import CONFIG
from logger_config import setup_logger
# 设置日志记录器
logger = setup_logger('scheduler')
def run_sendmsg():
"""执行sendmsg.py脚本"""
script_path = os.path.join(os.path.dirname(__file__), 'sendmsg.py')
logger.info("=" * 50)
logger.info("开始执行每日消息发送任务...")
logger.info(f"脚本路径: {script_path}")
logger.info(f"当前工作目录: {os.getcwd()}")
try:
print(f"[{datetime.now()}] 开始执行每日消息发送任务...")
logger.info("正在启动sendmsg.py子进程...")
result = subprocess.run([sys.executable, script_path],
capture_output=True, text=True, cwd=os.path.dirname(__file__))
logger.info(f"子进程执行完成,返回码: {result.returncode}")
if result.returncode == 0:
print(f"[{datetime.now()}] 任务执行成功")
logger.info("任务执行成功")
if result.stdout:
logger.info(f"标准输出: {result.stdout}")
else:
print(f"[{datetime.now()}] 任务执行失败: {result.stderr}")
logger.error("任务执行失败")
if result.stderr:
logger.error(f"错误输出: {result.stderr}")
if result.stdout:
logger.info(f"标准输出: {result.stdout}")
except Exception as e:
print(f"[{datetime.now()}] 执行出错: {str(e)}")
logger.error(f"执行出错: {str(e)}")
logger.error(f"错误类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
logger.info("=" * 50)
def main():
"""主函数 - 设置定时任务"""
# 每天07:55执行任务
logger.info("=" * 50)
logger.info("定时任务调度器启动")
logger.info("=" * 50)
# 显示配置信息
logger.info(f"检查时间: {CONFIG['checking_time']}")
logger.info(f"发送时间: {CONFIG['sending_time']}")
logger.info(f"文件路径: {CONFIG['file_path']}")
logger.info(f"消息接收者: {CONFIG['messages_reciever']}")
logger.info(f"飞书接收者数量: {len(CONFIG['open_id'])}")
# 每天在配置的时间执行任务
schedule.every().day.at(CONFIG['checking_time']).do(run_sendmsg)
print(f"[{datetime.now()}] 定时任务已启动将在每天07:55执行sendmsg.py")
print("按Ctrl+C停止程序")
logger.info(f"定时任务已设置,将在每天 {CONFIG['checking_time']} 执行sendmsg.py")
logger.info("按Ctrl+C停止程序")
# 显示下次执行时间
next_run = schedule.next_run()
if next_run:
logger.info(f"下次执行时间: {next_run.strftime('%Y-%m-%d %H:%M:%S')}")
try:
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
# 每分钟显示一次状态信息
current_time = datetime.now()
logger.debug(f"当前时间: {current_time.strftime('%Y-%m-%d %H:%M:%S')}, 等待下次执行...")
except KeyboardInterrupt:
print(f"\n[{datetime.now()}] 定时任务已停止")
logger.info(f"\n收到中断信号,正在停止定时任务...")
logger.info(f"定时任务已停止")
except Exception as e:
logger.error(f"定时任务运行过程中出现异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
raise
if __name__ == "__main__":
try:
main()
except Exception as e:
logger.error(f"程序启动失败: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
sys.exit(1)

View File

@ -1,20 +1,42 @@
import json
import lark_oapi as lark
from lark_oapi.api.im.v1 import *
from config import CONFIG
from logger_config import setup_logger
# 设置日志记录器
logger = setup_logger('send_filemsg')
def main():
logger.info("=" * 50)
logger.info("开始执行文件未找到的飞书提醒消息发送")
logger.info("=" * 50)
Local_openid = CONFIG['open_id']
logger.info(f"目标接收者数量: {len(Local_openid)}")
logger.info(f"接收者open_id列表: {Local_openid}")
# 创建client
logger.info("正在创建飞书客户端...")
try:
client = lark.Client.builder() \
.app_id(CONFIG['app_id']) \
.app_secret(CONFIG['app_secret']) \
.log_level(lark.LogLevel.DEBUG) \
.build()
logger.info("飞书客户端创建成功")
except Exception as e:
logger.error(f"飞书客户端创建失败: {str(e)}")
logger.error(f"错误类型: {type(e).__name__}")
return
for local_openid in Local_openid:
success_count = 0
fail_count = 0
for i, local_openid in enumerate(Local_openid):
logger.info(f"正在处理第 {i+1}/{len(Local_openid)} 个接收者: {local_openid}")
try:
# 构造请求对象
request: CreateMessageRequest = CreateMessageRequest.builder() \
.receive_id_type("open_id") \
@ -25,17 +47,48 @@ def main():
.build()) \
.build()
logger.info("请求对象构造成功")
logger.info(f"请求内容: 未找到指定图片,请手动发送")
# 发起请求
logger.info("正在发送飞书消息...")
response: CreateMessageResponse = client.im.v1.message.create(request)
# 处理失败返回
if not response.success():
lark.logger.error(
f"client.im.v1.message.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}")
return
logger.error(f"消息发送失败 - 接收者: {local_openid}")
logger.error(f"错误代码: {response.code}")
logger.error(f"错误信息: {response.msg}")
logger.error(f"日志ID: {response.get_log_id()}")
logger.error(f"响应内容: {json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}")
fail_count += 1
else:
logger.info(f"消息发送成功 - 接收者: {local_openid}")
logger.info(f"响应数据: {lark.JSON.marshal(response.data, indent=4)}")
success_count += 1
# 处理业务结果
lark.logger.info(lark.JSON.marshal(response.data, indent=4))
except Exception as e:
logger.error(f"处理接收者 {local_openid} 时出现异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
fail_count += 1
# 总结报告
logger.info("=" * 50)
logger.info("文件未找到的飞书提醒消息发送完成")
logger.info(f"成功发送: {success_count}")
logger.info(f"发送失败: {fail_count}")
logger.info(f"总计处理: {len(Local_openid)} 个接收者")
logger.info("=" * 50)
if __name__ == "__main__":
try:
main()
logger.info("程序执行完成")
except Exception as e:
logger.error(f"程序执行过程中出现未捕获的异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
exit(1)

View File

@ -1,21 +1,43 @@
import json
import lark_oapi as lark
from lark_oapi.api.im.v1 import *
from config import CONFIG
from logger_config import setup_logger
# 设置日志记录器
logger = setup_logger('send_openmsg')
def main():
logger.info("=" * 50)
logger.info("开始执行飞书提醒消息发送")
logger.info("=" * 50)
Local_openid = CONFIG['open_id']
logger.info(f"目标接收者数量: {len(Local_openid)}")
logger.info(f"接收者open_id列表: {Local_openid}")
# 创建client
logger.info("正在创建飞书客户端...")
try:
client = lark.Client.builder() \
.app_id(CONFIG['app_id']) \
.app_secret(CONFIG['app_secret']) \
.log_level(lark.LogLevel.DEBUG) \
.build()
logger.info("飞书客户端创建成功")
except Exception as e:
logger.error(f"飞书客户端创建失败: {str(e)}")
logger.error(f"错误类型: {type(e).__name__}")
return
# 构造请求对象
for local_openid in Local_openid:
success_count = 0
fail_count = 0
for i, local_openid in enumerate(Local_openid):
logger.info(f"正在处理第 {i+1}/{len(Local_openid)} 个接收者: {local_openid}")
try:
request: CreateMessageRequest = CreateMessageRequest.builder() \
.receive_id_type("open_id") \
.request_body(CreateMessageRequestBody.builder()
@ -25,17 +47,48 @@ def main():
.build()) \
.build()
logger.info("请求对象构造成功")
logger.info(f"请求内容: 请打开微信或者登录微信")
# 发起请求
logger.info("正在发送飞书消息...")
response: CreateMessageResponse = client.im.v1.message.create(request)
# 处理失败返回
if not response.success():
lark.logger.error(
f"client.im.v1.message.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}")
return
logger.error(f"消息发送失败 - 接收者: {local_openid}")
logger.error(f"错误代码: {response.code}")
logger.error(f"错误信息: {response.msg}")
logger.error(f"日志ID: {response.get_log_id()}")
logger.error(f"响应内容: {json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}")
fail_count += 1
else:
logger.info(f"消息发送成功 - 接收者: {local_openid}")
logger.info(f"响应数据: {lark.JSON.marshal(response.data, indent=4)}")
success_count += 1
# 处理业务结果
lark.logger.info(lark.JSON.marshal(response.data, indent=4))
except Exception as e:
logger.error(f"处理接收者 {local_openid} 时出现异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
fail_count += 1
# 总结报告
logger.info("=" * 50)
logger.info("飞书提醒消息发送完成")
logger.info(f"成功发送: {success_count}")
logger.info(f"发送失败: {fail_count}")
logger.info(f"总计处理: {len(Local_openid)} 个接收者")
logger.info("=" * 50)
if __name__ == "__main__":
try:
main()
logger.info("程序执行完成")
except Exception as e:
logger.error(f"程序执行过程中出现未捕获的异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
exit(1)

View File

@ -3,9 +3,12 @@ from datetime import date, datetime, time as dt_time
import sys
import os
import subprocess
import json
from config import CONFIG
import time
from logger_config import setup_logger
# 设置日志记录器
logger = setup_logger('sendmsg')
def wait_until_time():
"""等待到配置的发送时间"""
@ -22,61 +25,116 @@ def wait_until_time():
target_time = target_time.replace(day=target_time.day + 1)
wait_seconds = (target_time - now).total_seconds()
print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"将在 {target_time.strftime('%Y-%m-%d %H:%M:%S')} 执行发送操作")
print(f"等待时间: {wait_seconds/3600:.2f} 小时")
logger.info(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"目标发送时间: {target_time.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"等待时间: {wait_seconds/3600:.2f} 小时 ({wait_seconds:.0f} 秒)")
time.sleep(wait_seconds)
logger.info("等待完成,开始执行发送操作")
def send_daily_message(count):
"""发送每日消息的主函数"""
logger.info(f"开始执行每日消息发送任务,当前尝试次数: {count}")
today = date.today()
formatted_date = today.strftime('%Y-%m-%d')
file_name = formatted_date + ".jpg"
file_path = CONFIG['file_path'] + file_name
logger.info(f"目标日期: {formatted_date}")
logger.info(f"目标文件: {file_name}")
logger.info(f"完整文件路径: {file_path}")
num = count
if num >= 4:
print("已尝试4次程序将退出")
logger.error("已尝试4次程序将退出")
sys.exit(0) # 直接终止程序
try:
logger.info("尝试初始化微信客户端...")
wx = WeChat()
except:
logger.info("微信客户端初始化成功")
except Exception as e:
logger.error(f"微信客户端初始化失败: {str(e)}")
logger.info("尝试发送飞书提醒消息...")
subprocess.run([sys.executable, 'send_openmsg.py'])
if num<4:
logger.info(f"等待60秒后重试当前尝试次数: {num+1}")
time.sleep(60)
num += 1
send_daily_message(num)
else:
print("已尝试4次程序将退出")
logger.error("已尝试4次程序将退出")
sys.exit(0) # 直接终止程序
msg = CONFIG['message']
who = CONFIG['messages_reciever']
logger.info(f"消息内容: {msg}")
logger.info(f"接收者: {who}")
if os.path.isfile(file_path):
print("找到了指定文件!")
logger.info("找到了指定文件!")
logger.info(f"文件大小: {os.path.getsize(file_path)} 字节")
# 等待到配置的时间再执行发送操作
wait_until_time()
print("开始发送文件和消息...")
logger.info("开始发送文件和消息...")
try:
# 发送文件
logger.info(f"正在发送文件: {file_path}")
wx.SendFiles(filepath=file_path, who=who)
logger.info("文件发送成功")
# 发送消息
logger.info(f"正在发送消息: {msg}")
wx.SendMsg(msg=msg, who=who)
print("发送完成!")
logger.info("消息发送成功")
logger.info("所有内容发送完成!")
return True
else:
print("没找到指定文件")
except Exception as e:
logger.error(f"发送过程中出现错误: {str(e)}")
logger.error(f"错误类型: {type(e).__name__}")
# 发送失败时也尝试发送飞书提醒
subprocess.run([sys.executable, 'send_filemsg.py'])
if num<4:
logger.info(f"等待60秒后重试当前尝试次数: {num+1}")
time.sleep(60)
num += 1
send_daily_message(num)
else:
print("已尝试4次程序将退出")
logger.error("已尝试4次程序将退出")
sys.exit(0)
else:
logger.warning(f"没找到指定文件: {file_path}")
logger.info("尝试发送飞书提醒消息...")
subprocess.run([sys.executable, 'send_filemsg.py'])
if num<4:
logger.info(f"等待60秒后重试当前尝试次数: {num+1}")
time.sleep(60)
num += 1
send_daily_message(num)
else:
logger.error("已尝试4次程序将退出")
sys.exit(0) # 直接终止程序
# 如果直接运行此脚本,执行发送消息功能
if __name__ == "__main__":
logger.info("=" * 50)
logger.info("开始执行微信消息发送程序")
logger.info("=" * 50)
count = 0
try:
send_daily_message(count)
logger.info("程序执行完成")
except Exception as e:
logger.error(f"程序执行过程中出现未捕获的异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
sys.exit(1)

View File

@ -2,25 +2,99 @@ import subprocess
import sys
import os
from pathlib import Path
from logger_config import setup_logger
# 设置日志记录器
logger = setup_logger('service_runner')
def run_in_background():
"""在后台运行定时任务"""
logger.info("=" * 50)
logger.info("开始启动后台定时任务")
logger.info("=" * 50)
script_path = Path(__file__).parent / "scheduler.py"
pid_file = Path(__file__).parent / "scheduler_pid.txt"
logger.info(f"脚本路径: {script_path}")
logger.info(f"PID文件路径: {pid_file}")
logger.info(f"当前工作目录: {os.getcwd()}")
# 检查脚本文件是否存在
if not script_path.exists():
logger.error(f"脚本文件不存在: {script_path}")
return False
# 检查是否已经有PID文件
if pid_file.exists():
logger.warning(f"发现已存在的PID文件: {pid_file}")
try:
with open(pid_file, 'r') as f:
old_pid = f.read().strip()
logger.warning(f"已存在的进程ID: {old_pid}")
# 检查进程是否还在运行
try:
import psutil
if psutil.pid_exists(int(old_pid)):
logger.warning(f"进程 {old_pid} 仍在运行,请先停止它")
return False
else:
logger.info(f"进程 {old_pid} 已不存在删除旧的PID文件")
pid_file.unlink()
except ImportError:
logger.warning("无法检查进程状态,建议手动确认")
return False
except Exception as e:
logger.error(f"读取PID文件失败: {str(e)}")
pid_file.unlink() # 删除损坏的PID文件
# 使用pythonw.exe在后台运行无窗口
pythonw_path = sys.executable.replace('python.exe', 'pythonw.exe')
logger.info(f"Python解释器路径: {sys.executable}")
logger.info(f"Pythonw路径: {pythonw_path}")
if not os.path.exists(pythonw_path):
logger.warning(f"pythonw.exe不存在使用python.exe替代")
pythonw_path = sys.executable
try:
logger.info("正在启动后台进程...")
process = subprocess.Popen([pythonw_path, str(script_path)],
cwd=str(Path(__file__).parent),
creationflags=subprocess.CREATE_NO_WINDOW)
logger.info(f"后台进程启动成功进程ID: {process.pid}")
# 保存进程ID到文件
with open(pid_file, 'w') as f:
f.write(str(process.pid))
print(f"定时任务已在后台启动进程ID: {process.pid}")
print(f"进程ID已保存到: {pid_file}")
logger.info(f"进程ID已保存到: {pid_file}")
logger.info("=" * 50)
logger.info("后台定时任务启动完成")
logger.info("=" * 50)
return True
except Exception as e:
logger.error(f"启动后台进程失败: {str(e)}")
logger.error(f"错误类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
return False
if __name__ == "__main__":
run_in_background()
try:
success = run_in_background()
if success:
logger.info("程序执行完成")
else:
logger.error("程序执行失败")
exit(1)
except Exception as e:
logger.error(f"程序执行过程中出现未捕获的异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
exit(1)

View File

@ -3,40 +3,67 @@ import sys
import os
import psutil
from pathlib import Path
from logger_config import setup_logger
# 设置日志记录器
logger = setup_logger('stop_scheduler')
def stop_scheduler():
"""停止定时任务进程和相关的sendmsg.py进程"""
logger.info("=" * 50)
logger.info("开始停止定时任务进程")
logger.info("=" * 50)
stopped_count = 0
current_dir = str(Path(__file__).parent)
logger.info(f"当前工作目录: {current_dir}")
# 方法0: 从PID文件中读取进程ID
pid_file = Path(__file__).parent / "scheduler_pid.txt"
if pid_file.exists():
logger.info(f"发现PID文件: {pid_file}")
try:
with open(pid_file, 'r') as f:
pid = int(f.read().strip())
logger.info(f"从PID文件中读取到进程ID: {pid}")
try:
process = psutil.Process(pid)
print(f"从PID文件中找到进程: PID={pid}")
logger.info(f"找到进程: PID={pid}, 名称={process.name()}")
logger.info(f"进程命令行: {' '.join(process.cmdline())}")
process.terminate()
logger.info(f"已发送终止信号给进程 PID={pid}")
try:
process.wait(timeout=5) # 等待进程正常结束
print(f"已终止进程 PID={pid}")
logger.info(f"进程 PID={pid} 已正常终止")
except psutil.TimeoutExpired:
logger.warning(f"进程 PID={pid} 未在5秒内终止强制结束")
process.kill()
logger.info(f"已强制结束进程 PID={pid}")
stopped_count += 1
# 删除PID文件
pid_file.unlink()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
print(f"PID文件中的进程 {pid} 已不存在或无法访问")
logger.info("PID文件已删除")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
logger.warning(f"PID文件中的进程 {pid} 已不存在或无法访问: {str(e)}")
# 删除过期的PID文件
pid_file.unlink()
logger.info("过期的PID文件已删除")
except Exception as e:
print(f"从PID文件读取进程ID失败: {str(e)}")
logger.error(f"从PID文件读取进程ID失败: {str(e)}")
logger.error(f"错误类型: {type(e).__name__}")
else:
print("未找到PID文件尝试其他方法查找进程...")
logger.info("未找到PID文件尝试其他方法查找进程...")
try:
# 方法1: 使用psutil精确查找进程
current_dir = str(Path(__file__).parent)
logger.info("使用psutil查找相关进程...")
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
@ -46,72 +73,107 @@ def stop_scheduler():
# 检查是否是我们的scheduler.py或sendmsg.py进程
cmdline_str = ' '.join(cmdline)
if ('scheduler.py' in cmdline_str or 'sendmsg.py' in cmdline_str) and current_dir in cmdline_str:
print(f"找到进程: PID={proc.info['pid']}, 命令行={' '.join(cmdline)}")
logger.info(f"找到目标进程: PID={proc.info['pid']}")
logger.info(f"进程名称: {proc.info['name']}")
logger.info(f"命令行: {cmdline_str}")
try:
proc.terminate()
logger.info(f"已发送终止信号给进程 PID={proc.info['pid']}")
try:
proc.wait(timeout=5) # 等待进程正常结束
print(f"已终止进程 PID={proc.info['pid']}")
logger.info(f"进程 PID={proc.info['pid']} 已正常终止")
except psutil.TimeoutExpired:
logger.warning(f"进程 PID={proc.info['pid']} 未在5秒内终止强制结束")
proc.kill()
logger.info(f"已强制结束进程 PID={proc.info['pid']}")
stopped_count += 1
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
logger.warning(f"进程 {proc.info['pid']} 已不存在或无法访问: {str(e)}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
logger.debug(f"跳过进程 {proc.info.get('pid', 'unknown')}: {str(e)}")
continue
except ImportError:
print("psutil模块未安装使用备用方法...")
logger.warning("psutil模块未安装使用备用方法...")
# 方法2: 备用方法 - 改进的tasklist方式
try:
logger.info("使用wmic命令查找进程...")
# 获取详细的进程信息
result = subprocess.run(['wmic', 'process', 'where', 'name="pythonw.exe"', 'get', 'ProcessId,CommandLine', '/format:csv'],
capture_output=True, text=True, encoding='gbk')
lines = result.stdout.strip().split('\n')
current_dir = str(Path(__file__).parent)
for line in lines[1:]: # 跳过标题行
if line.strip() and ('scheduler.py' in line or 'sendmsg.py' in line) and current_dir in line:
logger.info(f"找到目标进程: {line}")
# 提取PID
parts = line.split(',')
if len(parts) >= 2:
pid = parts[-1].strip()
if pid.isdigit():
logger.info(f"正在终止进程 PID={pid}")
subprocess.run(['taskkill', '/F', '/PID', pid], check=True)
print(f"已终止进程 PID={pid}")
logger.info(f"已终止进程 PID={pid}")
stopped_count += 1
except subprocess.CalledProcessError as e:
print(f"备用方法执行失败: {e}")
logger.error(f"wmic方法执行失败: {e}")
# 方法3: 最后的备用方案
try:
logger.info("使用tasklist命令查找进程...")
result = subprocess.run(['tasklist', '/FI', 'IMAGENAME eq pythonw.exe', '/FO', 'CSV'],
capture_output=True, text=True)
if 'pythonw.exe' in result.stdout:
print("警告: 找到pythonw.exe进程但无法确定是否为目标进程")
logger.warning("警告: 找到pythonw.exe进程但无法确定是否为目标进程")
logger.warning("建议手动检查进程列表")
logger.info("进程列表:")
logger.info(result.stdout)
response = input("是否强制结束所有pythonw.exe进程(y/N): ")
if response.lower() == 'y':
logger.info("正在强制结束所有pythonw.exe进程...")
subprocess.run(['taskkill', '/F', '/IM', 'pythonw.exe'], check=True)
print("已强制结束所有pythonw.exe进程")
logger.info("已强制结束所有pythonw.exe进程")
stopped_count += 1
except subprocess.CalledProcessError as e:
print(f"最后备用方案失败: {e}")
logger.error(f"最后备用方案失败: {e}")
# 总结报告
logger.info("=" * 50)
if stopped_count > 0:
print(f"成功停止了 {stopped_count} 个定时任务相关进程(scheduler.py或sendmsg.py)")
logger.info(f"成功停止了 {stopped_count} 个定时任务相关进程(scheduler.py或sendmsg.py)")
else:
print("未找到运行中的定时任务相关进程(scheduler.py或sendmsg.py)")
logger.info("未找到运行中的定时任务相关进程(scheduler.py或sendmsg.py)")
logger.info("=" * 50)
# 额外检查:查看是否还有相关进程
print("\n检查剩余进程...")
logger.info("检查剩余进程...")
try:
result = subprocess.run(['tasklist', '/FI', 'IMAGENAME eq pythonw.exe'],
capture_output=True, text=True)
if 'pythonw.exe' in result.stdout:
print("仍有pythonw.exe进程在运行:")
print(result.stdout)
logger.info("仍有pythonw.exe进程在运行:")
logger.info(result.stdout)
else:
print("没有发现pythonw.exe进程")
except:
pass
logger.info("没有发现pythonw.exe进程")
except Exception as e:
logger.warning(f"检查剩余进程失败: {str(e)}")
if __name__ == "__main__":
try:
stop_scheduler()
logger.info("程序执行完成")
except Exception as e:
logger.error(f"程序执行过程中出现未捕获的异常: {str(e)}")
logger.error(f"异常类型: {type(e).__name__}")
import traceback
logger.error(f"异常堆栈: {traceback.format_exc()}")
exit(1)

231
view_logs.py Normal file
View File

@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
日志查看工具
用于快速查看和搜索wxmsg程序的日志文件
"""
import os
import sys
from pathlib import Path
from datetime import datetime, timedelta
import argparse
def list_log_files():
"""列出所有可用的日志文件"""
logs_dir = Path("wxauto_logs")
if not logs_dir.exists():
print("❌ 日志目录不存在: wxauto_logs/")
return []
log_files = []
for log_file in logs_dir.glob("*.log"):
stat = log_file.stat()
size_mb = stat.st_size / (1024 * 1024)
modified_time = datetime.fromtimestamp(stat.st_mtime)
log_files.append({
'name': log_file.name,
'path': log_file,
'size_mb': size_mb,
'modified': modified_time
})
# 按修改时间排序,最新的在前
log_files.sort(key=lambda x: x['modified'], reverse=True)
return log_files
def show_log_files():
"""显示日志文件列表"""
log_files = list_log_files()
if not log_files:
print("📁 没有找到日志文件")
return
print("📋 可用的日志文件:")
print("=" * 80)
print(f"{'文件名':<30} {'大小(MB)':<10} {'最后修改':<20} {'状态'}")
print("=" * 80)
for log_file in log_files:
status = "🟢 最新" if log_file['modified'].date() == datetime.now().date() else "🟡 旧"
print(f"{log_file['name']:<30} {log_file['size_mb']:<10.2f} {log_file['modified'].strftime('%Y-%m-%d %H:%M:%S'):<20} {status}")
print("=" * 80)
def view_log_file(filename, lines=50, search=None, level=None):
"""查看指定的日志文件"""
logs_dir = Path("wxauto_logs")
log_path = logs_dir / filename
if not log_path.exists():
print(f"❌ 日志文件不存在: {filename}")
return
print(f"📖 查看日志文件: {filename}")
print(f"📁 文件路径: {log_path.absolute()}")
print(f"📊 文件大小: {log_path.stat().st_size / 1024:.2f} KB")
print("=" * 80)
try:
with open(log_path, 'r', encoding='utf-8') as f:
all_lines = f.readlines()
# 过滤行
filtered_lines = []
for line in all_lines:
# 按级别过滤
if level and level.upper() not in line.upper():
continue
# 按搜索词过滤
if search and search.lower() not in line.lower():
continue
filtered_lines.append(line)
# 显示最后N行
if lines > 0:
filtered_lines = filtered_lines[-lines:]
if not filtered_lines:
print("🔍 没有找到匹配的日志条目")
return
print(f"📝 显示 {len(filtered_lines)} 条日志条目:")
print("-" * 80)
for line in filtered_lines:
line = line.strip()
if not line:
continue
# 高亮显示不同级别的日志
if "ERROR" in line:
print(f"🔴 {line}")
elif "WARNING" in line:
print(f"🟡 {line}")
elif "INFO" in line:
print(f"🔵 {line}")
elif "DEBUG" in line:
print(f"{line}")
else:
print(f" {line}")
except Exception as e:
print(f"❌ 读取日志文件失败: {str(e)}")
def search_logs(search_term, level=None, days=1):
"""搜索所有日志文件"""
logs_dir = Path("wxauto_logs")
if not logs_dir.exists():
print("❌ 日志目录不存在: wxauto_logs/")
return
print(f"🔍 在所有日志文件中搜索: '{search_term}'")
if level:
print(f"📊 日志级别: {level}")
print(f"📅 搜索范围: 最近 {days}")
print("=" * 80)
found_count = 0
cutoff_date = datetime.now() - timedelta(days=days)
for log_file in logs_dir.glob("*.log"):
try:
stat = log_file.stat()
modified_time = datetime.fromtimestamp(stat.st_mtime)
# 检查文件是否在时间范围内
if modified_time < cutoff_date:
continue
with open(log_file, 'r', encoding='utf-8') as f:
file_lines = f.readlines()
matching_lines = []
for line in file_lines:
line = line.strip()
if not line:
continue
# 按级别过滤
if level and level.upper() not in line.upper():
continue
# 按搜索词过滤
if search_term.lower() in line.lower():
matching_lines.append(line)
if matching_lines:
print(f"\n📁 文件: {log_file.name}")
print(f"📅 修改时间: {modified_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🔍 找到 {len(matching_lines)} 条匹配记录:")
print("-" * 60)
for line in matching_lines[:20]: # 限制每个文件显示20条
# 高亮显示不同级别的日志
if "ERROR" in line:
print(f"🔴 {line}")
elif "WARNING" in line:
print(f"🟡 {line}")
elif "INFO" in line:
print(f"🔵 {line}")
elif "DEBUG" in line:
print(f"{line}")
else:
print(f" {line}")
if len(matching_lines) > 20:
print(f"... 还有 {len(matching_lines) - 20} 条记录")
found_count += len(matching_lines)
except Exception as e:
print(f"❌ 读取文件 {log_file.name} 失败: {str(e)}")
print("=" * 80)
print(f"🔍 搜索完成,总共找到 {found_count} 条匹配记录")
def main():
parser = argparse.ArgumentParser(description="WXMSG日志查看工具")
parser.add_argument("action", choices=["list", "view", "search"],
help="操作类型: list(列出文件), view(查看文件), search(搜索)")
parser.add_argument("--file", "-f", help="要查看的日志文件名")
parser.add_argument("--lines", "-n", type=int, default=50,
help="显示的行数 (默认: 50, 0表示全部)")
parser.add_argument("--search", "-s", help="搜索关键词")
parser.add_argument("--level", "-l", choices=["INFO", "WARNING", "ERROR", "DEBUG"],
help="按日志级别过滤")
parser.add_argument("--days", "-d", type=int, default=1,
help="搜索最近几天的日志 (默认: 1)")
args = parser.parse_args()
if args.action == "list":
show_log_files()
elif args.action == "view":
if not args.file:
print("❌ 请指定要查看的日志文件名")
print("💡 使用 --file 参数或先运行 'python view_logs.py list' 查看可用文件")
return
view_log_file(args.file, args.lines, args.search, args.level)
elif args.action == "search":
if not args.search:
print("❌ 请指定搜索关键词")
print("💡 使用 --search 参数指定要搜索的内容")
return
search_logs(args.search, args.level, args.days)
if __name__ == "__main__":
if len(sys.argv) == 1:
print("🔧 WXMSG日志查看工具")
print("=" * 40)
print("用法:")
print(" python view_logs.py list # 列出所有日志文件")
print(" python view_logs.py view -f filename.log # 查看指定日志文件")
print(" python view_logs.py search -s keyword # 搜索日志内容")
print("\n更多帮助: python view_logs.py --help")
print("=" * 40)
else:
main()