# process_manager.py
import subprocess
import psutil
import logging
import threading
import time
import os
import json
from queue import Queue, Empty
from typing import Dict, Optional, List, Union

import requests

from utils import store
from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL


class ProcessManager:
    """Spawn worker subprocesses, relay their messages and supervise them.

    Each child is started with piped stdout/stderr.  Two daemon threads per
    child push every output line onto a per-process queue; a background
    monitor thread periodically drains those queues, forwards JSON messages
    to the backend, detects exits/timeouts and asks ``store`` for follow-up
    tasks once a child reports its result.

    Locking: ``self.lock`` only guards the ``processes`` / ``_reader_threads``
    tables.  Network calls and ``store`` callbacks are always made with the
    lock released so that they can safely call back into this object.
    """

    # Seconds to wait after terminate() before escalating to kill().
    _TERMINATE_GRACE = 3.0
    # Upper bound (per thread) when waiting for a reader thread to finish.
    _READER_JOIN_TIMEOUT = 2.0
    # Timeout in seconds for every HTTP request sent to the backend.
    _HTTP_TIMEOUT = 30

    def __init__(self, check_interval: int = 1, timeout: int = 300):
        """Create an idle manager.

        :param check_interval: seconds between two monitor passes
        :param timeout: seconds of inactivity after which a child is killed
        """
        self.processes: Dict[int, dict] = {}  # {pid: {meta}}
        self.lock = threading.Lock()
        self.check_interval = check_interval  # polling interval (seconds)
        self.timeout = timeout  # per-process inactivity timeout (seconds)
        self._monitor_thread: Optional[threading.Thread] = None
        self._running = False

        # pid -> [stdout reader thread, stderr reader thread]
        self._reader_threads: Dict[int, List[threading.Thread]] = {}

        # Logging setup; the log directory is created eagerly.
        self.log_dir: str = "process_logs"
        os.makedirs(self.log_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger("ProcessManager")

    def spawn(
        self,
        command: Union[str, List[str]],
        cwd: Optional[str] = None,
        env: Optional[dict] = None,
        shell: bool = False,
        **meta
    ) -> Optional[int]:
        """Create a child process and register it for monitoring.

        :param command: command string or argument list
        :param cwd: working directory of the child
        :param env: extra environment variables (merged over the current
            environment)
        :param shell: whether to run the command through the shell
        :param meta: additional metadata; ``meta['mission']['id']`` and
            ``meta['plan']['id']`` are required and exported to the child
        :return: the child's PID on success, ``None`` on failure
        """
        proc = None
        try:
            child_env = {**os.environ, **(env or {})}
            child_env.update({
                "PYTHONUNBUFFERED": "1",  # disable output buffering in Python children
                'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
                'BACKEND_BASE_URL': BACKEND_BASE_URL,
                'missionId': str(meta['mission']['id']),
                'planId': str(meta['plan']['id']),
            })

            self.logger.info("准备创建进程")
            proc = subprocess.Popen(
                command,
                cwd=cwd,
                env=child_env,
                shell=shell,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Undecodable bytes must not kill the reader threads.
                errors="replace",
            )

            # Register the process and its readers atomically so the monitor
            # never observes a process without reader threads.
            with self.lock:
                self.processes[proc.pid] = {
                    "proc": proc,
                    "command": command,
                    "start_time": time.time(),
                    "last_active": time.time(),
                    "msg_queue": Queue(),
                    "meta": meta
                }
                self.logger.info("准备开始监听")
                self._start_reader(proc.pid, proc.stdout, proc.stderr)

            self.logger.info(
                f"创建子进程 PID={proc.pid} | "
                f"命令: {command} | "
            )
            return proc.pid

        except Exception as e:
            self.logger.error(f"创建进程失败: {str(e)}")
            if proc is not None:
                # The child was started but could not be registered: do not
                # leave it running unsupervised.
                try:
                    self._terminate(proc)
                except Exception as cleanup_error:
                    self.logger.error(f"创建进程失败: {str(cleanup_error)}")
                self._forget(proc.pid)
            return None

    def stop(self, missionId: int):
        """Terminate every tracked process that belongs to ``missionId``.

        :param missionId: mission whose processes should be stopped
        :return: list of ``{'planId': int}`` for each process that was removed
        """
        mission_id = int(missionId)
        # Snapshot under the lock; removal happens afterwards because
        # remove_process() acquires the lock itself.
        with self.lock:
            targets = [
                (pid, int(info['meta']['plan']['id']))
                for pid, info in self.processes.items()
                if int(info['meta']['mission']['id']) == mission_id
            ]

        plansStopped = []
        for pid, plan_id in targets:
            if self.remove_process(pid):
                plansStopped.append({'planId': plan_id})
        return plansStopped

    def _start_reader(self, pid: int, stdout, stderr):
        """Start one reader thread per pipe for the process ``pid``.

        Every non-blank line is stripped and pushed onto the process'
        ``msg_queue`` as ``('stdout' | 'stderr', line)``.  The process must
        already be present in ``self.processes``.
        """
        queue: Queue = self.processes[pid]["msg_queue"]

        def pump(pipe, pipe_type: str, read_error_msg: str):
            try:
                while True:
                    try:
                        # readline() blocks until a line or EOF is available,
                        # so no sleep is needed between reads.
                        line = pipe.readline()
                    except UnicodeDecodeError:
                        # Bad bytes in one chunk: skip it and keep reading.
                        self.logger.info(read_error_msg)
                        continue
                    except (OSError, ValueError):
                        # Pipe closed or broken: retrying would spin forever.
                        self.logger.info(read_error_msg)
                        break
                    if not line:  # empty string means EOF
                        break
                    text = line.strip()
                    if text:
                        queue.put((pipe_type, text))
            except Exception as e:
                self.logger.error(f"读取子进程消息错误: {str(e)}")
            finally:
                try:
                    pipe.close()
                except (OSError, ValueError):
                    pass  # best effort: the pipe is being discarded anyway

        readers = [
            threading.Thread(
                target=pump,
                args=(stdout, 'stdout', f"进程消息读取错误 pid:{pid}"),
                daemon=True
            ),
            threading.Thread(
                target=pump,
                args=(stderr, 'stderr', f"进程错误读取错误 pid:{pid}"),
                daemon=True
            ),
        ]
        self._reader_threads[pid] = readers
        for reader in readers:
            reader.start()

    def start_monitoring(self):
        """Start the background monitor thread (no-op if already running)."""
        if self._running:
            return

        self._running = True
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            daemon=True  # exits together with the main process
        )
        self._monitor_thread.start()
        self.logger.info("进程监控线程已启动")

    def stop_monitoring(self):
        """Stop the monitor thread and wait for it to finish."""
        self._running = False
        if self._monitor_thread:
            self._monitor_thread.join()
            self._monitor_thread = None
            self.logger.info("进程监控线程已停止")

    def remove_process(self, pid_to_del: int, planId: Optional[int] = None) -> bool:
        """Terminate a tracked process and drop it from the monitor tables.

        Two call forms are accepted:

        * ``remove_process(pid)`` - remove the process with that PID.
        * ``remove_process(missionId, planId)`` - when ``planId`` is given the
          first argument is interpreted as a mission id and the matching
          process is looked up.

        :return: ``True`` when the process was removed or was not tracked,
            ``False`` when terminating it failed (it then stays tracked)
        """
        if planId is not None:
            missionId = pid_to_del
            found = self._find_pid(missionId, planId)
            if found is None:
                self.logger.info(f"该处理进程不在监视进程表中 MissionId: {missionId} PlanId: {planId}")
                return True
            pid_to_del = found

        with self.lock:
            info = self.processes.get(pid_to_del)
        if info is None:
            self.logger.info(f"该处理进程不在监视进程表中 pid:{pid_to_del}")
            return True

        missionId = info['meta']['mission']['id']
        planId = info["meta"]["plan"]["id"]
        proc = info["proc"]
        try:
            if proc.poll() is None:
                self._terminate(proc)
                self.logger.info(f"移除处理进程监视成功 MissionId: {missionId} PlanId: {planId}")
            else:
                self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
        except Exception:
            self.logger.error(f"移除处理进程监视-with pid失败 pid:{pid_to_del}", exc_info=True)
            return False

        self._forget(pid_to_del)
        return True

    def _find_pid(self, missionId: int, planId: int) -> Optional[int]:
        """Return the PID tracked for ``(missionId, planId)`` or ``None``."""
        mission_id, plan_id = int(missionId), int(planId)
        with self.lock:
            for pid, info in self.processes.items():
                meta = info["meta"]
                if int(meta['mission']['id']) == mission_id and int(meta["plan"]["id"]) == plan_id:
                    return pid
        return None

    def _terminate(self, proc: subprocess.Popen) -> None:
        """Stop ``proc``: terminate(), then kill() if it ignores the request.

        Also reaps the child so it does not linger as a zombie.  Does nothing
        if the process has already exited.
        """
        if proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=self._TERMINATE_GRACE)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=self._TERMINATE_GRACE)

    def _join_readers(self, readers) -> None:
        """Wait (bounded) for reader threads to reach EOF on their pipes."""
        current = threading.current_thread()
        for reader in list(readers):
            if reader is not current and reader.is_alive():
                reader.join(timeout=self._READER_JOIN_TIMEOUT)

    def _forget(self, pid: int) -> Optional[dict]:
        """Drop ``pid`` from the tables and wait for its reader threads.

        Safe to call for a PID that is not (or no longer) tracked.
        :return: the removed process record, or ``None``
        """
        with self.lock:
            info = self.processes.pop(pid, None)
            readers = self._reader_threads.pop(pid, [])
        self._join_readers(readers)
        return info

    def _monitor_loop(self):
        """Main loop of the monitor thread."""
        while self._running:
            try:
                self._check_processes()
            except Exception as e:
                self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)
            time.sleep(self.check_interval)

    def _check_processes(self):
        """Run one monitoring pass over all tracked processes."""
        current_time = time.time()

        # Work on a snapshot: the handlers below add/remove table entries.
        with self.lock:
            tracked = list(self.processes.items())

        reportDatas = []
        for pid, info in tracked:
            try:
                report = self._check_one(pid, info, current_time)
            except Exception as e:
                # One misbehaving process must not abort the whole pass.
                self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)
                continue
            if report is not None:
                reportDatas.append(report)

        # Submit finished plans one by one and start their follow-up tasks.
        # The results themselves were already sent to the backend when the
        # "result" message was received.
        for report in reportDatas:
            try:
                self._dispatch_followups(report)
            except Exception as e:
                self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)

    def _check_one(self, pid: int, info: dict, current_time: float) -> Optional[dict]:
        """Inspect a single process.

        :return: a ``DONE`` report dict when the process delivered its result
            during this pass, otherwise ``None``
        """
        with self.lock:
            if self.processes.get(pid) is not info:
                return None  # removed concurrently (e.g. by stop())

        proc = info["proc"]
        # poll() also reaps the child, so an exited process is detected
        # directly instead of showing up as a zombie.
        exited = proc.poll() is not None
        if exited:
            # Let the readers reach EOF so the queue holds all final output.
            self._join_readers(self._reader_threads.get(pid, ()))

        finished = self._drain_messages(pid, info)
        if finished is not None:
            # Normal completion: stop the child (if still alive) and report.
            if not self.remove_process(pid):
                self._forget(pid)
            return {
                'missionId': int(info["meta"]["mission"]["id"]),
                'planId': int(info["meta"]["plan"]["id"]),
                'state': 'DONE',
                'results': finished['results'],
            }

        if exited:
            # Exited without ever sending a "result" message.
            self._forget(pid)
            self._handle_crash(pid, info)
            return None

        # Inactivity (hang) detection.
        if (current_time - info["last_active"]) > self.timeout:
            self.logger.warning(f"进程 {pid} 超时,即将终止")
            try:
                self._terminate(proc)
            except Exception:
                self.logger.error(f"移除处理进程监视-with pid失败 pid:{pid}", exc_info=True)
            self._forget(pid)
            self._handle_timeout(pid, info)
            return None

        # NOTE(review): refreshing the activity stamp on every pass means the
        # timeout above can only fire if the monitor itself stalls for longer
        # than `timeout`. Kept to preserve current behaviour; drop this line
        # if only child messages should count as activity.
        info["last_active"] = current_time
        return None

    def _drain_messages(self, pid: int, info: dict) -> Optional[dict]:
        """Handle every queued message of ``pid``.

        :return: the first response flagged ``finished`` or ``None``; later
            messages never clear an already received result
        """
        finished = None
        while True:
            try:
                pipe_type, message = info["msg_queue"].get_nowait()
            except Empty:
                break
            try:
                response = self._handle_process_message(pid, pipe_type, message)
            except Exception as error:
                self.logger.error(f"ERROR:{error}")
                continue
            if finished is None and response.get('finished'):
                finished = response
        return finished

    def _dispatch_followups(self, report: dict) -> None:
        """Submit a finished plan to ``store`` and spawn the next tasks."""
        missionId = report['missionId']
        planId = report['planId']
        results = report['results']
        for nextTask in store.solveMission(missionId=missionId, planId=planId, results=results):
            task = store.prepareTask(missionId=missionId, planId=nextTask['id'])
            if self.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
                self.logger.info(f"创建后续计算任务成功 MissionId:{missionId} PlanId:{task['plan']['id']}")
            else:
                self.logger.error(f"创建后续计算任务失败 MissionId:{missionId} PlanId:{task['plan']['id']}")
                # The task cannot continue: report the failure to the backend.
                self._post_backend({
                    'missionId': task['mission']['id'],
                    'planId': task['plan']['id'],
                    'progress': -1,  # -1: single task failed, -2: whole mission failed
                })

    def _post_backend(self, payload):
        """POST ``payload`` as JSON to the backend's ``/rawDataTrans/`` endpoint.

        :return: the response object, or ``None`` if the request failed
        """
        try:
            return requests.post(
                BACKEND_BASE_URL + "/rawDataTrans/",
                json=payload,
                timeout=self._HTTP_TIMEOUT,
            )
        except requests.RequestException as error:
            self.logger.error(f"向后端服务器汇报失败: {error}")
            return None

    def _handle_process_message(self, pid: int, pipe_type: str, message: str):
        """Handle one line received from a child process.

        Lines are expected to be JSON objects of the form
        ``{"msg": "progress" | "result", "data": ...}``; anything else is
        logged and ignored.

        :return: ``{'finished': False}`` or, for a "result" message,
            ``{'finished': True, 'results': <data>}``
        """
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            self.logger.warning(f"无效消息格式 PID={pid}: {message}")
            return {'finished': False}

        if not isinstance(data, dict):
            # Valid JSON but not a message object (e.g. a bare number).
            self.logger.warning(f"无效消息格式 PID={pid}: {message}")
            return {'finished': False}

        self.logger.info(f"收到进程消息 PID={pid}: {data}")

        # Any well-formed message counts as a sign of life.
        info = self.processes.get(pid)
        if info is not None:
            info["last_active"] = time.time()

        msg = data.get("msg")
        if msg == "progress":
            # Forward the progress report to the backend.
            self._post_backend(data.get("data"))
            return {'finished': False}
        if msg == "result":
            # Forward the result to the backend and mark the process done.
            response = self._post_backend(data.get("data"))
            self.logger.info(f"进程结果已向后端服务器反馈 pid:{pid} response:{response}")
            self.logger.info(f"进程 {pid} 报告已完成")
            return {'finished': True, 'results': data.get('data')}

        return {'finished': False}

    def _handle_crash(self, pid: int, info: dict):
        """React to a process that exited without delivering a result."""
        self.logger.error(
            f"进程崩溃 PID={pid}\n"
            f"命令: {info['command']}\n"
        )
        # Report the failed task to the backend (-1: single task failed).
        self._post_backend({
            'missionId': info['meta']['mission']['id'],
            'planId': info['meta']['plan']['id'],
            'progress': -1,
        })
        # Ask store to abort the whole mission.
        store.removeMission(missionId=info['meta']['mission']['id'])
        self.logger.error(f"任务进程发生崩溃,Mission {info['meta']['mission']['id']}已终止")

    def _handle_timeout(self, pid: int, info: dict):
        """React to a process that was terminated for inactivity."""
        # Record diagnostic information.
        self.logger.error(f"进程超时处理 PID={pid} | 运行时间: {time.time() - info['start_time']:.1f}s")
        # Ask store to abort the whole mission.
        store.removeMission(missionId=info['meta']['mission']['id'])
        self.logger.error(f"任务进程发生超时,Mission {info['meta']['mission']['id']}已终止")