# process_manager.py
import logging
import os
import subprocess
import threading
import time
import uuid
from collections import deque
from typing import Dict, List, Optional, Union

import psutil
import requests

from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL


class ProcessManager:
    """Spawn child processes, capture their output to log files, and monitor them.

    A background thread (see ``start_monitoring``) periodically checks every
    registered child and reports exits and timeouts to the scheduler's
    ``/report`` endpoint.
    """

    def __init__(
        self,
        check_interval: int = 5,
        timeout: int = 300,
        report_timeout: float = 10.0,
    ):
        """
        :param check_interval: seconds between monitoring passes
        :param timeout: seconds since ``last_active`` after which a process is
            considered hung and terminated
        :param report_timeout: HTTP timeout (seconds) for each scheduler
            report; without one a stalled scheduler blocks the monitor forever
        """
        # {pid: {"proc", "command", "start_time", "last_active",
        #        "log_stdout", "log_stderr", "meta"}}
        self.processes: Dict[int, dict] = {}
        self.lock = threading.Lock()
        self.check_interval = check_interval
        self.timeout = timeout
        self.report_timeout = report_timeout
        self._monitor_thread: Optional[threading.Thread] = None
        self._running = False
        # Set by stop_monitoring() so the loop wakes immediately instead of
        # sleeping out the rest of its interval.
        self._stop_event = threading.Event()

        # Logging setup; make sure the log directory exists.
        self.log_dir: str = "process_logs"
        os.makedirs(self.log_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger("ProcessManager")

    def spawn(
        self,
        command: Union[str, List[str]],
        cwd: Optional[str] = None,
        env: Optional[dict] = None,
        shell: bool = False,
        **meta
    ) -> Optional[int]:
        """Start a child process and register it for monitoring.

        stdout/stderr are redirected to
        ``<log_dir>/proc_<timestamp>_<parent pid>_<tag>.stdout`` / ``.stderr``.

        :param command: command string or argument list
        :param cwd: working directory
        :param env: extra environment variables, merged over ``os.environ``
        :param shell: whether to run the command through the shell
        :param meta: extra metadata; must contain ``mission['id']`` and
            ``plan['id']``, exported to the child as ``missionId``/``planId``
        :return: the child's PID on success, ``None`` on failure (logged)
        """
        try:
            # Resolve required metadata first so a bad call fails before any
            # log files are created.
            mission_id = str(meta['mission']['id'])
            plan_id = str(meta['plan']['id'])

            timestamp = time.strftime("%Y%m%d-%H%M%S")
            # os.getpid() is *this* process's PID and is the same for every
            # spawn; the random tag stops two spawns within the same second
            # from truncating each other's log files.
            log_prefix = os.path.join(
                self.log_dir,
                f"proc_{timestamp}_{os.getpid()}_{uuid.uuid4().hex[:8]}"
            )

            child_env = {**os.environ, **(env or {})}
            child_env.update({
                'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
                'BACKEND_BASE_URL': BACKEND_BASE_URL,
                'missionId': mission_id,
                'planId': plan_id,
            })

            # The child receives its own copies of these file handles, so the
            # parent's handles can be closed once Popen returns.
            with open(f"{log_prefix}.stdout", "w") as stdout_f, \
                    open(f"{log_prefix}.stderr", "w") as stderr_f:
                proc = subprocess.Popen(
                    command,
                    cwd=cwd,
                    env=child_env,
                    shell=shell,
                    stdout=stdout_f,
                    stderr=stderr_f,
                    text=True,
                )

            now = time.time()
            with self.lock:
                self.processes[proc.pid] = {
                    "proc": proc,
                    "command": command,
                    "start_time": now,
                    "last_active": now,
                    "log_stdout": stdout_f.name,
                    "log_stderr": stderr_f.name,
                    "meta": meta,
                }

            self.logger.info(
                f"创建子进程 PID={proc.pid} | "
                f"命令: {command} | "
                f"日志: {log_prefix}.*"
            )
            return proc.pid
        except Exception as e:
            self.logger.error(f"创建进程失败: {str(e)}", exc_info=True)
            return None

    def start_monitoring(self):
        """Start the background monitor thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            daemon=True  # exits together with the main process
        )
        self._monitor_thread.start()
        self.logger.info("进程监控线程已启动")

    def stop_monitoring(self):
        """Stop the monitor thread and wait for its current pass to finish."""
        self._running = False
        self._stop_event.set()
        if self._monitor_thread:
            self._monitor_thread.join()
        self.logger.info("进程监控线程已停止")

    def _monitor_loop(self):
        """Run a monitoring pass every ``check_interval`` seconds until stopped."""
        while self._running:
            try:
                self._check_processes()
            except Exception as e:
                self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)
            # Returns early when stop_monitoring() sets the event.
            self._stop_event.wait(self.check_interval)

    def _check_processes(self):
        """Run one monitoring pass over all registered processes.

        The registry is inspected under ``self.lock``. All slow work
        (terminating processes, HTTP reports) happens after the lock is
        released, so ``spawn`` is never blocked behind the network.
        """
        current_time = time.time()
        exited = []
        timed_out = []

        with self.lock:
            for pid, info in list(self.processes.items()):
                # poll() reaps the child and returns its exit code once it has
                # finished. Looking the PID up with psutil instead sees every
                # unreaped child as a zombie -- including ones that exited
                # normally -- and can hit a recycled PID after reaping.
                if info["proc"].poll() is not None:
                    exited.append((pid, info))
                    continue

                # Hang detection
                if (current_time - info["last_active"]) > self.timeout:
                    timed_out.append((pid, self.processes.pop(pid)))
                    continue

                # NOTE(review): last_active is refreshed on every pass for a
                # live process, so the timeout branch above only fires when
                # two passes are more than `timeout` seconds apart. Replace
                # this with a real liveness signal if hang detection is meant
                # to work.
                info["last_active"] = current_time

        for pid, info in timed_out:
            self.logger.warning(f"进程 {pid} 超时,即将终止")
            self._terminate_tree(info["proc"])
            self._handle_timeout(pid, info)

        for pid, info in exited:
            self._handle_exit(pid, info)

    def _handle_exit(self, pid: int, info: dict):
        """Ask the scheduler how to classify an exited process and act on it.

        The exit is reported with state ``CRASH``. A response ``code`` of
        ``'OK'`` is treated as a normal exit; ``'ERROR'`` is handed to
        ``_handle_crash``. If the request fails or any other code comes back,
        the entry stays registered and is retried on the next pass.
        """
        try:
            response = self._report(info, 'CRASH')
            code = response.json()['code']
        except Exception as e:
            self.logger.error(f"向调度器上报进程 {pid} 退出失败: {str(e)}")
            return

        if code == 'OK':
            self.logger.warning(f"进程 {pid} 已正常退出")
        elif code == 'ERROR':
            self._handle_crash(pid, info)
        else:
            return

        with self.lock:
            self.processes.pop(pid, None)

    def _terminate_tree(self, proc: subprocess.Popen, grace: float = 5.0) -> None:
        """Terminate *proc* and its descendants, killing any still alive after *grace* s.

        With ``shell=True`` the registered PID is the shell, so terminating
        only that PID could leave the actual command running. The child is
        waited on so it does not linger as a zombie.
        """
        try:
            children = psutil.Process(proc.pid).children(recursive=True)
        except psutil.Error:
            children = []

        for child in children:
            try:
                child.terminate()
            except psutil.Error:
                pass
        proc.terminate()

        try:
            proc.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()

        _, alive = psutil.wait_procs(children, timeout=grace)
        for child in alive:
            try:
                child.kill()
            except psutil.Error:
                pass

    def _handle_crash(self, pid: int, info: dict):
        """Log the tail of a crashed process's stderr and report it as DEAD."""
        try:
            # deque(maxlen=10) keeps only the last 10 lines without holding
            # the whole file in memory; errors="replace" tolerates output
            # that is not valid in the default encoding.
            with open(info["log_stderr"], errors="replace") as f:
                last_lines = "".join(deque(f, maxlen=10))
        except Exception:
            last_lines = "无法读取日志"

        self.logger.error(
            f"进程崩溃 PID={pid}\n"
            f"命令: {info['command']}\n"
            f"最后错误输出:\n{last_lines}"
        )
        # Notify the scheduler (Flask service) of the crash.
        self._safe_report(pid, info, 'DEAD')

    def _handle_timeout(self, pid: int, info: dict):
        """Log diagnostics for a timed-out process and report it as TIMEOUT."""
        self.logger.error(
            f"进程超时处理 PID={pid} | "
            f"运行时间: {time.time() - info['start_time']:.1f}s"
        )
        # Notify the scheduler (Flask service) of the timeout.
        self._safe_report(pid, info, 'TIMEOUT')

    def _report(self, info: dict, state: str) -> requests.Response:
        """POST *state* for the mission/plan in ``info['meta']`` to the scheduler."""
        return requests.post(
            SCHEDULER_BASE_URL + '/report',
            json={
                'missionId': info['meta']['mission']['id'],
                'planId': info['meta']['plan']['id'],
                'state': state,
                'results': None,
            },
            timeout=self.report_timeout,
        )

    def _safe_report(self, pid: int, info: dict, state: str) -> None:
        """Best-effort ``_report``: log a failure instead of raising it."""
        try:
            self._report(info, state)
        except Exception as e:
            self.logger.error(f"向调度器上报进程 {pid} 状态 {state} 失败: {str(e)}")