# process_manager.py
"""Subprocess lifecycle manager: spawn, monitor, and report worker processes."""
import subprocess
import psutil
import logging
import threading
import time
import os
import json
from queue import Queue, Empty
from typing import Dict, Optional, List, Union

import requests

from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL


class ProcessManager:
    """Spawns worker subprocesses, watches their health from a background
    thread, relays their JSON stdout/stderr messages to the backend, and
    reports terminal states (DONE / DEAD / TIMEOUT) to the scheduler.
    """

    def __init__(self, check_interval: int = 1, timeout: int = 300):
        """
        :param check_interval: seconds between monitor-loop health checks
        :param timeout: seconds of inactivity before a child is considered hung
        """
        # pid -> {"proc", "command", "start_time", "last_active", "msg_queue", "meta"}
        self.processes: Dict[int, dict] = {}
        self.lock = threading.Lock()
        self.check_interval = check_interval
        self.timeout = timeout
        self._monitor_thread: Optional[threading.Thread] = None
        self._running = False
        # pid -> [stdout reader thread, stderr reader thread]
        self._reader_threads: Dict[int, List[threading.Thread]] = {}

        # Directory for per-process log files (only ensured to exist; the
        # original per-process file-open code was dead/commented out).
        self.log_dir: str = "process_logs"
        os.makedirs(self.log_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger("ProcessManager")

    def spawn(
        self,
        command: Union[str, List[str]],
        cwd: str = None,
        env: dict = None,
        shell: bool = False,
        **meta
    ) -> Optional[int]:
        """Create and register a monitored child process.

        :param command: command string or argv list
        :param cwd: working directory for the child
        :param env: extra environment variables (merged over os.environ)
        :param shell: whether to execute through the shell
        :param meta: extra metadata; MUST contain meta['mission']['id'] and
                     meta['plan']['id'] (they are exported to the child's env)
        :return: the child's PID on success, None on failure
        """
        try:
            # Child environment: inherit ours, overlay the caller's, then
            # inject the service endpoints and task identifiers.
            env = {**os.environ, **(env or {})}
            env.update({
                "PYTHONUNBUFFERED": "1",  # unbuffered stdout so readers see lines promptly
                'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
                'BACKEND_BASE_URL': BACKEND_BASE_URL,
                'missionId': str(meta['mission']['id']),
                'planId': str(meta['plan']['id']),
            })
            proc = subprocess.Popen(
                command,
                cwd=cwd,
                env=env,
                shell=shell,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            # Register bookkeeping before the reader threads start using it.
            with self.lock:
                self.processes[proc.pid] = {
                    "proc": proc,
                    "command": command,
                    "start_time": time.time(),
                    "last_active": time.time(),
                    "msg_queue": Queue(),
                    "meta": meta
                }
            self._start_reader(proc.pid, proc.stdout, proc.stderr)
            self.logger.info(
                f"创建子进程 PID={proc.pid} | "
                f"命令: {command} | "
            )
            return proc.pid
        except Exception as e:
            self.logger.error(f"创建进程失败: {str(e)}")
            return None

    def stop(self, missionId: int) -> List[dict]:
        """Terminate every monitored process belonging to the given mission.

        :return: list of {'planId': ...} entries for the plans that were
                 stopped (the original collected this but never used it)
        """
        plansStopped = []
        pids = []  # collect first: remove_process mutates self.processes
        for pid, info in self.processes.items():
            if int(info['meta']['mission']['id']) == int(missionId):
                pids.append(pid)
                plansStopped.append({'planId': int(info['meta']['plan']['id'])})
        for pid in pids:
            self.remove_process(pid)
        return plansStopped

    def _start_reader(self, pid: int, stdout, stderr) -> None:
        """Start one daemon reader thread per pipe for the given child.

        Each thread blocks in readline() and forwards stripped lines into the
        child's message queue as (pipe_name, line) tuples; it exits when the
        pipe closes (readline() returns '').
        """
        def pump(tag: str, pipe, queue: Queue, err_note: str):
            try:
                while True:
                    try:
                        line = pipe.readline()  # blocks; '' means pipe closed
                        if not line:
                            break
                        queue.put((tag, line.strip()))
                    except (IOError, ValueError):
                        self.logger.info(err_note)
                        time.sleep(0.1)  # avoid a hot spin if the pipe keeps erroring
            except Exception as e:
                self.logger.error(f"读取子进程消息错误: {str(e)}")

        queue = self.processes[pid]["msg_queue"]
        threads = [
            threading.Thread(
                target=pump,
                args=('stdout', stdout, queue, f"进程消息读取错误 pid:{pid}"),
                daemon=True,
            ),
            threading.Thread(
                target=pump,
                args=('stderr', stderr, queue, f"进程错误读取错误 pid:{pid}"),
                daemon=True,
            ),
        ]
        for t in threads:
            t.start()
        self._reader_threads[pid] = threads

    def start_monitoring(self) -> None:
        """Start the background monitor thread (idempotent)."""
        if self._running:
            return
        self._running = True
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            daemon=True  # dies with the main process
        )
        self._monitor_thread.start()
        self.logger.info("进程监控线程已启动")

    def stop_monitoring(self) -> None:
        """Stop the monitor loop and wait for its thread to finish."""
        self._running = False
        if self._monitor_thread:
            self._monitor_thread.join()
        self.logger.info("进程监控线程已停止")

    def _join_readers(self, pid: int, timeout: float = 2.0) -> None:
        """Join and discard the reader threads of a pid.

        Uses a bounded join: a reader blocked in readline() only exits once
        the child's pipes close, so an unbounded join could hang the caller.
        """
        for t in self._reader_threads.pop(pid, []):
            t.join(timeout)

    def remove_process_by_plan(self, missionId: int, planId: int) -> bool:
        """Terminate and unregister the process matching (missionId, planId).

        NOTE(review): this was originally a first ``remove_process`` definition
        that was silently shadowed by ``remove_process(pid)`` below; renamed so
        both lookup styles remain usable.

        :return: True when the process is gone (or was never monitored),
                 False when termination failed unexpectedly
        """
        target_pid = None
        for pid, info in self.processes.items():
            if (int(info["meta"]['mission']['id']) == int(missionId)
                    and int(info["meta"]["plan"]["id"]) == int(planId)):
                target_pid = pid
                break
        if target_pid is None:
            self.logger.info(f"该处理进程不在监视中 MissionId: {missionId} PlanId: {planId}")
            return True
        try:
            psutil.Process(target_pid).terminate()
            del self.processes[target_pid]
            self._join_readers(target_pid)
            self.logger.info(f"移除处理进程监视 MissionId: {missionId} PlanId: {planId}")
            return True
        except psutil.NoSuchProcess:
            # Child already exited on its own; still drop the stale bookkeeping.
            self.processes.pop(target_pid, None)
            self._join_readers(target_pid)
            self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
            return True
        except Exception as error:
            self.logger.error(f"移除处理进程监视失败 MissionId: {missionId} PlanId: {planId} Error: {error}")
            return False

    def remove_process(self, pid_to_del: int) -> None:
        """Terminate the child with the given pid and drop all bookkeeping.

        Termination happens BEFORE joining the reader threads: readers block
        in readline() until the child's pipes close, so the original order
        (join first) could deadlock on a still-running child.
        """
        info = self.processes.pop(pid_to_del, None)
        if info is None:
            self.logger.error(f"未找到请求移除的进程 pid:{pid_to_del}")
            self._join_readers(pid_to_del)
            return
        missionId = info['meta']['mission']['id']
        planId = info["meta"]["plan"]["id"]
        try:
            psutil.Process(pid_to_del).terminate()
            self.logger.info(f"移除处理进程监视成功 MissionId: {missionId} PlanId: {planId}")
        except psutil.NoSuchProcess:
            self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
        except Exception:
            self.logger.error(f"移除处理进程监视-with pid失败 pid:{pid_to_del}")
        self._join_readers(pid_to_del)

    def _monitor_loop(self) -> None:
        """Background loop: run one health check every check_interval seconds."""
        while self._running:
            try:
                self._check_processes()
            except Exception as e:
                self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)
            time.sleep(self.check_interval)

    def _drain_queue(self, pid: int, info: dict) -> dict:
        """Consume all queued reader messages for one child.

        :return: the handler response of the terminal 'result' message if one
                 was seen ({'finished': True, 'results': ...}); otherwise the
                 last response ({} when the queue was empty).
        """
        response: dict = {}
        try:
            while True:
                pipe_type, message = info["msg_queue"].get_nowait()
                resp = self._handle_process_message(pid, pipe_type, message)
                # Once a terminal response is seen, later messages must not hide it.
                if not response.get('finished'):
                    response = resp
        except Empty:
            pass
        except Exception as error:
            self.logger.error(f"ERROR:{error}")
        return response

    def _check_processes(self) -> None:
        """One health-check pass: detect zombies, hangs, and normal exits."""
        current_time = time.time()
        dead_pids = []     # entries to purge from self.processes afterwards
        reportDatas = []   # DONE reports to send once the lock is released
        with self.lock:
            # Iterate a snapshot: the handlers below may delete map entries,
            # and mutating the dict mid-iteration raises RuntimeError.
            for pid, info in list(self.processes.items()):
                try:
                    proc = psutil.Process(pid)
                    # Crash detection: a zombie never becomes active again.
                    if proc.status() == psutil.STATUS_ZOMBIE:
                        self.logger.warning(f"进程 {pid} 处于僵尸状态")
                        dead_pids.append(pid)
                        self._handle_crash(pid, info)
                        continue
                    # Hang detection: no activity within the timeout window.
                    if (current_time - info["last_active"]) > self.timeout:
                        self.logger.warning(f"进程 {pid} 超时,即将终止")
                        proc.terminate()
                        dead_pids.append(pid)
                        self._handle_timeout(pid, info)
                        continue
                    info["last_active"] = current_time
                except psutil.NoSuchProcess:
                    # Child is gone: drain remaining messages to decide whether
                    # it exited normally (sent a 'result') or crashed.
                    response = self._drain_queue(pid, info)
                    if response.get('finished'):
                        # Normal exit: report DONE to the scheduler after the
                        # lock is released.
                        reportDatas.append({
                            'missionId': info["meta"]["mission"]["id"],
                            'planId': info["meta"]["plan"]["id"],
                            'state': 'DONE',
                            'results': response['results'],
                        })
                    else:
                        # Exited without a 'finished' message -> crash.
                        self._handle_crash(pid, info)
                    dead_pids.append(pid)
                    continue
                # Live child: consume whatever it has written so far.
                response = self._drain_queue(pid, info)
                if response.get('finished'):
                    reportDatas.append({
                        'missionId': info["meta"]["mission"]["id"],
                        'planId': info["meta"]["plan"]["id"],
                        'state': 'DONE',
                        'results': response['results'],
                    })
                    # remove_process also pops the entry, so the dead_pids
                    # cleanup below is a no-op for this pid (pop is tolerant).
                    self.remove_process(pid)
                    dead_pids.append(pid)
            # Purge finished/dead entries and their reader threads.
            for pid in dead_pids:
                self.processes.pop(pid, None)
                self._join_readers(pid)
        # Lock released: report terminal DONE states to the scheduler.
        for report in reportDatas:
            response = requests.post(SCHEDULER_BASE_URL + "/report", json=report)
            self.logger.info(f"进程结果已提交调度器 mission:{report['missionId']} plan:{report['planId']} response:{response}")

    def _handle_process_message(self, pid: int, pipe_type: str, message: str) -> dict:
        """Handle one JSON message line received from a child.

        Recognized 'msg' values:
          - "progress": forward payload to the backend; not terminal
          - "result":   forward payload to the backend; terminal
        :return: {'finished': bool}, plus 'results' for a terminal message
        """
        try:
            data = json.loads(message)
            self.logger.info(f"收到进程消息 PID={pid}: {data}")
            # The entry may already have been removed (e.g. when draining the
            # queue after the child exited), so guard the activity update.
            if pid in self.processes:
                self.processes[pid]["last_active"] = time.time()
            msg = data.get("msg")
            if msg == "progress":
                # Progress report -> forward to the django backend.
                requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
                return {'finished': False}
            if msg == "result":
                # Final result -> forward to the django backend, mark finished.
                response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
                self.logger.info(f"进程结果已向后端服务器反馈 pid:{pid} response:{response}")
                self.logger.info(f"进程 {pid} 报告已完成")
                return {'finished': True, 'results': data.get('data')}
            return {'finished': False}
        except json.JSONDecodeError:
            self.logger.warning(f"无效消息格式 PID={pid}: {message}")
            return {'finished': False}

    def _handle_crash(self, pid: int, info: dict) -> None:
        """Report a crashed child: backend gets progress=-1, scheduler gets DEAD."""
        # NOTE(review): "log_stderr" is never written into the info dict by
        # spawn(), so this read normally falls back to the placeholder text —
        # kept guarded for forward compatibility; confirm intent with authors.
        try:
            with open(info["log_stderr"]) as f:
                last_lines = "".join(f.readlines()[-10:])
        except Exception:
            last_lines = "无法读取日志"
        self.logger.error(
            f"进程崩溃 PID={pid}\n"
            f"命令: {info['command']}\n"
            f"最后错误输出:\n{last_lines}"
        )
        # Tell the django backend the task failed (progress -1).
        requests.post(BACKEND_BASE_URL + "/rawDataTrans/",
                      json={'missionId': info['meta']['mission']['id'],
                            'planId': info['meta']['plan']['id'],
                            'progress': -1})
        # Tell the scheduler the process is dead.
        requests.post(SCHEDULER_BASE_URL + '/report',
                      json={'missionId': info['meta']['mission']['id'],
                            'planId': info['meta']['plan']['id'],
                            'state': 'DEAD',
                            'results': None})

    def _handle_timeout(self, pid: int, info: dict) -> None:
        """Report a hung (timed-out) child to the scheduler as TIMEOUT."""
        self.logger.error(f"进程超时处理 PID={pid} | 运行时间: {time.time() - info['start_time']:.1f}s")
        requests.post(SCHEDULER_BASE_URL + '/report',
                      json={'missionId': info['meta']['mission']['id'],
                            'planId': info['meta']['plan']['id'],
                            'state': 'TIMEOUT',
                            'results': None})