diff --git a/start.py b/start.py new file mode 100644 index 0000000..05706a9 --- /dev/null +++ b/start.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +import os +import shutil +import signal +import subprocess +import sys +import time +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parent +STATE_DIR = ROOT / ".tmp" / "dev-manager" +STATE_FILE = STATE_DIR / "state.json" +LOG_FILE = STATE_DIR / "dev.log" +ERR_FILE = STATE_DIR / "dev.err.log" +SUPERVISOR_MODE = "__serve" +API_URL = "http://localhost:3001" +WEB_URL = "http://localhost:5173" +NPM_EXECUTABLE = "npm.cmd" if os.name == "nt" else "npm" + + +def now_iso() -> str: + return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds") + + +def print_line(message: str) -> None: + print(message, flush=True) + + +def ensure_state_dir() -> None: + STATE_DIR.mkdir(parents=True, exist_ok=True) + + +def append_log_header(path: Path, title: str) -> None: + timestamp = now_iso() + with path.open("a", encoding="utf-8") as handle: + handle.write(f"\n[{timestamp}] {title}\n") + + +def read_state() -> dict[str, Any] | None: + if not STATE_FILE.exists(): + return None + + try: + return json.loads(STATE_FILE.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + + +def write_state(state: dict[str, Any]) -> None: + ensure_state_dir() + STATE_FILE.write_text( + json.dumps(state, ensure_ascii=True, indent=2), + encoding="utf-8" + ) + + +def remove_state(expected_token: str | None = None) -> None: + if not STATE_FILE.exists(): + return + + if expected_token is not None: + current = read_state() + if not current or current.get("token") != expected_token: + return + + STATE_FILE.unlink(missing_ok=True) + + +def process_exists(pid: int) -> bool: + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + except OSError: + return False + return True + + +def get_process_commandline(pid: int) -> str | None: + if os.name == "nt": + command = [ + "powershell", + "-NoProfile", + "-Command", + ( + f'$process = Get-CimInstance Win32_Process -Filter "ProcessId = {pid}"; ' + 'if ($process) { [Console]::Out.Write($process.CommandLine) }' + ) + ] + else: + command = ["ps", "-o", "command=", "-p", str(pid)] + + result = subprocess.run( + command, + cwd=str(ROOT), + capture_output=True, + text=True, + encoding="utf-8", + errors="ignore", + check=False + ) + + commandline = result.stdout.strip() + return commandline or None + + +def is_expected_supervisor(state: dict[str, Any]) -> bool: + pid = int(state.get("pid", 0)) + token = str(state.get("token", "")).strip() + if pid <= 0 or not token or not process_exists(pid): + return False + + commandline = get_process_commandline(pid) + if not commandline: + return True + + expected_fragments = [Path(__file__).name, SUPERVISOR_MODE, token] + return all(fragment in commandline for fragment in expected_fragments) + + +def get_running_state() -> dict[str, Any] | None: + state = read_state() + if not state: + return None + + if not is_expected_supervisor(state): + remove_state() + return None + + return state + + +def tail_text(path: Path, line_count: int = 20) -> str: + if not path.exists(): + return "" + + try: + content = path.read_text(encoding="utf-8", errors="ignore") + except OSError: + return "" + + lines = content.splitlines() + if not lines: + return "" + + return "\n".join(lines[-line_count:]) + + +def ensure_npm_available() -> bool: + return shutil.which(NPM_EXECUTABLE) is not None + + +def wait_for_process_exit(pid: int, attempts: int = 20, interval: float = 0.25) -> bool: + for _ in range(attempts): + if not process_exists(pid): + return True + time.sleep(interval) + return not process_exists(pid) + + +def stop_process_tree(pid: int) -> bool: + if os.name == "nt": + graceful = subprocess.run( + ["taskkill", "/PID", str(pid), "/T"], + cwd=str(ROOT), + capture_output=True, + text=True, + check=False + ) + if graceful.returncode == 0: + if wait_for_process_exit(pid): + return True + + forced = subprocess.run( + ["taskkill", "/PID", str(pid), "/T", "/F"], + cwd=str(ROOT), + capture_output=True, + text=True, + check=False + ) + if forced.returncode == 0: + wait_for_process_exit(pid) + return True + return not process_exists(pid) + + try: + os.killpg(os.getpgid(pid), signal.SIGTERM) + except OSError: + return not process_exists(pid) + + if wait_for_process_exit(pid): + return True + + try: + os.killpg(os.getpgid(pid), signal.SIGKILL) + except OSError: + return not process_exists(pid) + + if wait_for_process_exit(pid): + return True + + return not process_exists(pid) + + +def start_supervisor() -> int: + ensure_state_dir() + append_log_header(LOG_FILE, "starting root dev command") + append_log_header(ERR_FILE, "starting root dev command") + + token = uuid.uuid4().hex + creation_flags = 0 + if os.name == "nt": + creation_flags |= getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0) + creation_flags |= getattr(subprocess, "DETACHED_PROCESS", 0) + + process = subprocess.Popen( + [sys.executable, str(Path(__file__).resolve()), SUPERVISOR_MODE, token], + cwd=str(ROOT), + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + creationflags=creation_flags, + close_fds=True + ) + + deadline = time.monotonic() + 10 + state: dict[str, Any] | None = None + while time.monotonic() < deadline: + current = read_state() + if current and current.get("token") == token and is_expected_supervisor(current): + state = current + break + if process.poll() is not None: + break + time.sleep(0.2) + + if not state: + print_line("启动失败:后台 supervisor 没有成功创建。") + error_tail = tail_text(ERR_FILE) + if error_tail: + print_line(error_tail) + return 1 + + time.sleep(3) + state = get_running_state() + if not state: + print_line("启动失败:`npm run dev` 很快退出了。") + error_tail = tail_text(ERR_FILE) + if error_tail: + print_line(error_tail) + else: + log_tail = tail_text(LOG_FILE) + if log_tail: + print_line(log_tail) + return 1 + + print_line("已启动前后端开发环境。") + print_line(f"Web: {WEB_URL}") + print_line(f"API: {API_URL}") + print_line(f"状态文件: {STATE_FILE}") + print_line(f"日志: {LOG_FILE}") + print_line("关闭命令: python start.py stop") + print_line("无参数执行时会自动在启动和关闭之间切换。") + return 0 + + +def stop_supervisor() -> int: + state = get_running_state() + if not state: + remove_state() + print_line("当前没有运行中的开发环境。") + return 0 + + pid = int(state["pid"]) + token = str(state["token"]) + stopped = stop_process_tree(pid) + remove_state(expected_token=token) + + if stopped: + print_line("已关闭前后端开发环境。") + return 0 + + print_line(f"关闭失败:无法结束 supervisor 进程 {pid}。") + return 1 + + +def show_status() -> int: + state = get_running_state() + if not state: + print_line("状态: 未运行") + print_line("启动命令: python start.py start") + return 0 + + print_line("状态: 运行中") + print_line(f"Supervisor PID: {state['pid']}") + child_pid = state.get("childPid") + if child_pid: + print_line(f"Root dev PID: {child_pid}") + print_line(f"启动时间: {state['startedAt']}") + print_line(f"Web: {state['webUrl']}") + print_line(f"API: {state['apiUrl']}") + print_line(f"日志: {state['logFile']}") + print_line(f"错误日志: {state['errorLogFile']}") + return 0 + + +def run_supervisor(token: str) -> int: + ensure_state_dir() + state = { + "pid": os.getpid(), + "token": token, + "startedAt": now_iso(), + "root": str(ROOT), + "logFile": str(LOG_FILE), + "errorLogFile": str(ERR_FILE), + "webUrl": WEB_URL, + "apiUrl": API_URL, + "command": [NPM_EXECUTABLE, "run", "dev"] + } + write_state(state) + + with LOG_FILE.open("a", encoding="utf-8") as stdout_handle, ERR_FILE.open( + "a", + encoding="utf-8" + ) as stderr_handle: + stdout_handle.write(f"[{now_iso()}] supervisor pid={os.getpid()}\n") + stdout_handle.flush() + + child = subprocess.Popen( + [NPM_EXECUTABLE, "run", "dev"], + cwd=str(ROOT), + stdin=subprocess.DEVNULL, + stdout=stdout_handle, + stderr=stderr_handle + ) + + state["childPid"] = child.pid + write_state(state) + + try: + return child.wait() + finally: + append_log_header(LOG_FILE, "root dev command stopped") + remove_state(expected_token=token) + + +def show_usage() -> int: + print_line("用法: python start.py [start|stop|restart|status|toggle]") + print_line("无参数默认执行 toggle。") + return 1 + + +def main() -> int: + action = sys.argv[1].strip().lower() if len(sys.argv) > 1 else "toggle" + + if action == SUPERVISOR_MODE: + if len(sys.argv) < 3: + return 1 + return run_supervisor(sys.argv[2]) + + if action == "start": + running = get_running_state() + if running: + print_line("开发环境已在运行。") + return show_status() + if not ensure_npm_available(): + print_line("未找到 npm,请先安装 Node.js 并确认 npm 已加入 PATH。") + return 1 + return start_supervisor() + + if action == "stop": + return stop_supervisor() + + if action == "restart": + stop_code = stop_supervisor() + if stop_code != 0: + return stop_code + if not ensure_npm_available(): + print_line("未找到 npm,请先安装 Node.js 并确认 npm 已加入 PATH。") + return 1 + return start_supervisor() + + if action == "status": + return show_status() + + if action == "toggle": + if get_running_state(): + return stop_supervisor() + if not ensure_npm_available(): + print_line("未找到 npm,请先安装 Node.js 并确认 npm 已加入 PATH。") + return 1 + return start_supervisor() + + return show_usage() + + +if __name__ == "__main__": + raise SystemExit(main())