123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- # process_manager.py
- import subprocess
- import psutil
- import logging
- import threading
- import time
- import os
- import json
- from queue import Queue, Empty
- from typing import Dict, Optional, List, Union
- import requests
- from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL
class ProcessManager:
    """Spawn worker subprocesses and supervise them from a background thread.

    Every child gets two daemon reader threads that push its stdout/stderr
    lines into a per-process queue.  The monitor thread (``start_monitoring``)
    periodically drains those queues, forwards JSON protocol messages to the
    backend and reports the final state of each child it finalizes to the
    scheduler:

    * ``{"msg": "progress", "data": ...}`` -> ``data`` POSTed to
      ``BACKEND_BASE_URL + "/rawDataTrans/"``
    * ``{"msg": "result", "data": ...}``   -> ``data`` POSTed to the backend;
      the child is reported as DONE once it exits
    * exit without a result message        -> reported as DEAD
    * no output for ``timeout`` seconds    -> killed; reported as TIMEOUT
      (or DONE if it had already sent its result)

    Processes removed through ``stop``/``remove_process`` are not reported.
    """

    # Seconds before an HTTP call is abandoned (requests has no default timeout).
    _HTTP_TIMEOUT = 30
    # Seconds a terminated child gets to exit before it is killed.
    _EXIT_GRACE = 5
    # Seconds to wait for reader threads to flush a finished child's output.
    _READER_JOIN_TIMEOUT = 2
    # Number of trailing stderr lines kept for crash diagnostics.
    _STDERR_TAIL_LINES = 10

    def __init__(self, check_interval: int = 1, timeout: int = 300):
        """
        :param check_interval: seconds between two monitor passes
        :param timeout: seconds without any output line from a child after
            which it is considered hung and killed; 0/None disables the check
        """
        # {pid: {proc, command, start_time, last_active, msg_queue, meta, stderr_tail, ...}}
        self.processes: Dict[int, dict] = {}
        # Guards self.processes and self._reader_threads.
        self.lock = threading.Lock()
        self.check_interval = check_interval
        self.timeout = timeout
        self._monitor_thread: Optional[threading.Thread] = None
        self._running = False
        # pid -> [stdout reader thread, stderr reader thread]
        self._reader_threads: Dict[int, List[threading.Thread]] = {}
        # Log directory: created on construction, not otherwise used by this class.
        self.log_dir: str = "process_logs"
        os.makedirs(self.log_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger("ProcessManager")

    def spawn(
        self,
        command: Union[str, List[str]],
        cwd: str = None,
        env: dict = None,
        shell: bool = False,
        **meta
    ) -> Optional[int]:
        """
        Start a child process and register it for supervision.

        :param command: command string or argument list
        :param cwd: working directory
        :param env: extra environment variables, merged over os.environ
        :param shell: whether to run the command through the shell
        :param meta: extra metadata; must contain ``meta['mission']['id']`` and
            ``meta['plan']['id']``, exported to the child as the ``missionId``
            and ``planId`` environment variables
        :return: the child's PID on success, None on failure
        """
        try:
            child_env = {**os.environ, **(env or {})}
            child_env.update({
                "PYTHONUNBUFFERED": "1",  # make a Python child flush every line
                'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
                'BACKEND_BASE_URL': BACKEND_BASE_URL,
                'missionId': str(meta['mission']['id']),
                'planId': str(meta['plan']['id']),
            })
            self.logger.info(f"准备创建进程")
            proc = subprocess.Popen(
                command,
                cwd=cwd,
                env=child_env,
                shell=shell,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Undecodable bytes must not kill a reader thread: a dead
                # reader lets the OS pipe fill up and blocks the child.
                errors="replace",
            )
        except Exception as e:
            self.logger.error(f"创建进程失败: {str(e)}")
            return None

        try:
            now = time.time()
            # Register and start the readers under one lock so the monitor
            # never sees a registered child without its reader threads.
            with self.lock:
                self.processes[proc.pid] = {
                    "proc": proc,
                    "command": command,
                    "start_time": now,
                    "last_active": now,
                    "msg_queue": Queue(),
                    "meta": meta,
                    "stderr_tail": [],
                }
                self.logger.info(f"准备开始监听")
                self._start_reader(proc.pid, proc.stdout, proc.stderr)
        except Exception as e:
            # The child runs but cannot be supervised: do not leak it.
            with self.lock:
                self.processes.pop(proc.pid, None)
            self._stop_process(proc)
            self.logger.error(f"创建进程失败: {str(e)}")
            return None

        self.logger.info(
            f"创建子进程 PID={proc.pid} | "
            f"命令: {command} | "
        )
        return proc.pid

    def stop(self, missionId: int):
        """
        Terminate every supervised process belonging to ``missionId``.

        :return: ``[{'planId': int}, ...]`` for the plans that were stopped
        """
        with self.lock:
            targets = [
                (pid, info) for pid, info in self.processes.items()
                if int(info['meta']['mission']['id']) == int(missionId)
            ]
        plansStopped = [{'planId': int(info['meta']['plan']['id'])} for _, info in targets]
        for pid, _ in targets:
            self.remove_process(pid)
        return plansStopped

    def _start_reader(self, pid: int, stdout, stderr):
        """
        Start one daemon reader thread per output pipe of child ``pid``.

        Lines are pushed onto the child's ``msg_queue`` as
        ``(pipe_type, stripped_line)`` tuples.  This method touches the shared
        dicts without locking; the caller is expected to hold ``self.lock``.
        """
        queue = self.processes[pid]["msg_queue"]
        threads = []
        for pipe_type, pipe in (("stdout", stdout), ("stderr", stderr)):
            thread = threading.Thread(
                target=self._read_pipe,
                args=(pid, pipe, pipe_type, queue),
                name=f"proc-{pid}-{pipe_type}",
                daemon=True,
            )
            thread.start()
            threads.append(thread)
        self._reader_threads[pid] = threads

    def _read_pipe(self, pid: int, pipe, pipe_type: str, queue: Queue):
        """Copy lines from ``pipe`` into ``queue`` until EOF, then close the pipe."""
        try:
            # readline() blocks until a full line or EOF (''), so no polling
            # sleep is needed; sleeping per line only throttled the reader.
            for line in iter(pipe.readline, ""):
                queue.put((pipe_type, line.strip()))
        except (OSError, ValueError) as error:
            # ValueError means the pipe was closed: there is no more data, so
            # stop instead of retrying the failing read forever.
            if pipe_type == "stdout":
                self.logger.info(f"进程消息读取错误 pid:{pid} {error}")
            else:
                self.logger.info(f"进程错误读取错误 pid:{pid} {error}")
        except Exception as e:
            self.logger.error(f"读取子进程消息错误: {str(e)}")
        finally:
            try:
                pipe.close()
            except OSError:
                pass

    def start_monitoring(self):
        """Start the background monitor thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            daemon=True  # dies with the main process
        )
        self._monitor_thread.start()
        self.logger.info("进程监控线程已启动")

    def stop_monitoring(self):
        """Ask the monitor thread to stop and wait for it to finish."""
        self._running = False
        if self._monitor_thread:
            self._monitor_thread.join()
        self.logger.info("进程监控线程已停止")

    def remove_process(
        self,
        missionId: Optional[int] = None,
        planId: Optional[int] = None,
        pid_to_del: Optional[int] = None,
    ) -> bool:
        """
        Terminate a supervised process and stop monitoring it.

        Supports both historical call forms:

        * ``remove_process(missionId, planId)`` -- look the process up by ids
        * ``remove_process(pid)`` / ``remove_process(pid_to_del=pid)`` -- by PID

        The removed process is not reported to the scheduler by the monitor.

        :return: True if the process is gone (or was not supervised); False if
            terminating it failed, in which case it stays supervised
        """
        if pid_to_del is None and planId is None:
            pid_to_del = missionId  # single positional argument is a PID
        if pid_to_del is not None:
            pid = int(pid_to_del)
        elif missionId is None:
            raise TypeError("remove_process() needs a pid or both missionId and planId")
        else:
            with self.lock:
                pid = next(
                    (p for p, info in self.processes.items()
                     if int(info["meta"]['mission']['id']) == int(missionId)
                     and int(info["meta"]["plan"]["id"]) == int(planId)),
                    None,
                )
            if pid is None:
                self.logger.info(f"该处理进程不在监视中 MissionId: {missionId} PlanId: {planId}")
                return True

        info = self._claim(pid)
        if info is None:
            self.logger.info(f"未找到请求移除的进程 pid:{pid}")
            return True

        mission_id = info['meta']['mission']['id']
        plan_id = info['meta']['plan']['id']
        try:
            # Terminate first: the readers only finish once the pipes close.
            self._stop_process(info["proc"])
        except Exception as error:
            with self.lock:
                self.processes.setdefault(pid, info)
            self.logger.error(f"移除处理进程监视失败 MissionId: {mission_id} PlanId: {plan_id} Error: {error}")
            return False
        self._join_readers(pid)
        self.logger.info(f"移除处理进程监视 MissionId: {mission_id} PlanId: {plan_id}")
        return True

    def _claim(self, pid: int, expected: Optional[dict] = None) -> Optional[dict]:
        """
        Atomically remove ``pid`` from ``self.processes``.

        Only one caller (monitor or remove_process) can win the claim, so each
        process is finalized and reported at most once.  With ``expected``,
        the claim succeeds only if the registered entry is that very dict.

        :return: the removed info dict, or None if it was not registered
        """
        with self.lock:
            info = self.processes.get(pid)
            if info is None or (expected is not None and info is not expected):
                return None
            del self.processes[pid]
            return info

    def _join_readers(self, pid: int):
        """Forget the reader threads of ``pid`` and wait briefly for them to end.

        The wait is bounded because a grandchild that inherited the pipes can
        keep them open after the child itself has exited.
        """
        with self.lock:
            threads = self._reader_threads.pop(pid, [])
        for thread in threads:
            thread.join(self._READER_JOIN_TIMEOUT)

    def _stop_process(self, proc: subprocess.Popen):
        """Terminate ``proc``, kill it if it ignores that, and reap it."""
        if proc.poll() is not None:
            return
        try:
            proc.terminate()
        except ProcessLookupError:
            pass  # exited between poll() and terminate()
        try:
            proc.wait(timeout=self._EXIT_GRACE)
        except subprocess.TimeoutExpired:
            self.logger.warning(f"进程 {proc.pid} 未响应终止信号,强制结束")
            proc.kill()
            proc.wait(timeout=self._EXIT_GRACE)

    def _post(self, url: str, payload):
        """POST ``payload`` as JSON; log network errors and return None instead of raising."""
        try:
            return requests.post(url, json=payload, timeout=self._HTTP_TIMEOUT)
        except requests.RequestException as error:
            self.logger.error(f"请求失败 url:{url} error:{error}")
            return None

    def _monitor_loop(self):
        """Run ``_check_processes`` every ``check_interval`` seconds until stopped."""
        while self._running:
            try:
                self._check_processes()
            except Exception as e:
                self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)

            time.sleep(self.check_interval)

    def _check_processes(self):
        """
        One monitor pass over all supervised processes.

        For each child: reap it if it exited, process its queued output, then
        finalize it when it has exited or has been silent longer than
        ``self.timeout``.  No network call is made while holding ``self.lock``.
        """
        with self.lock:
            snapshot = list(self.processes.items())

        reportDatas = []
        for pid, info in snapshot:
            proc = info["proc"]
            # poll() reaps an exited child (unreaped children stay zombies).
            exited = proc.poll() is not None
            if exited:
                # Let the readers push the child's last lines before draining.
                self._join_readers(pid)
            final = self._drain_messages(pid, info)
            if final is not None:
                info["final"] = final  # remembered until the child exits
            final = info.get("final")

            if not exited:
                idle = time.time() - info["last_active"]
                if not self.timeout or idle <= self.timeout:
                    continue  # still running normally
            if self._claim(pid, info) is None:
                continue  # removed concurrently via stop()/remove_process()

            if not exited:
                self.logger.warning(f"进程 {pid} 超时,即将终止")
                self._stop_process(proc)
                self._join_readers(pid)

            if final is not None:
                # Cache the result; reported to the scheduler after the loop.
                reportDatas.append({
                    'missionId': info["meta"]["mission"]["id"],
                    'planId': info["meta"]["plan"]["id"],
                    'state': 'DONE',
                    'results': final['results'],
                })
            elif exited:
                self._handle_crash(pid, info)
            else:
                self._handle_timeout(pid, info)

        # Report finished tasks in order so the scheduler can dispatch the next step.
        for report in reportDatas:
            response = self._post(SCHEDULER_BASE_URL + "/report", report)
            self.logger.info(f"进程结果已提交调度器 mission:{report['missionId']} plan:{report['planId']} response:{response}")

    def _drain_messages(self, pid: int, info: dict) -> Optional[dict]:
        """
        Process every line currently queued for ``pid`` without blocking.

        Any output line counts as activity for hang detection; the last few
        stderr lines are kept in ``info["stderr_tail"]`` for crash reports.

        :return: the ``{'finished': True, 'results': ...}`` response of the
            last result message seen in this call, or None
        """
        final = None
        queue = info["msg_queue"]
        while True:
            try:
                pipe_type, message = queue.get_nowait()
            except Empty:
                return final
            info["last_active"] = time.time()
            if pipe_type == "stderr":
                tail = info.setdefault("stderr_tail", [])
                tail.append(message)
                del tail[:-self._STDERR_TAIL_LINES]
            try:
                response = self._handle_process_message(pid, pipe_type, message)
            except Exception as error:
                self.logger.error(f"处理进程消息失败 PID={pid}: {error}", exc_info=True)
                continue
            if response.get('finished'):
                final = response

    def _handle_process_message(self, pid: int, pipe_type: str, message: str) -> dict:
        """
        Interpret one output line of child ``pid``.

        Lines that are not a JSON object are logged and ignored.  ``progress``
        and ``result`` messages have their ``data`` forwarded to the backend.

        :return: ``{'finished': True, 'results': data}`` for a result message,
            otherwise ``{'finished': False}``
        """
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            self.logger.warning(f"无效消息格式 PID={pid}: {message}")
            return {'finished': False}
        if not isinstance(data, dict):
            # Valid JSON (e.g. a bare number) but not a protocol message.
            self.logger.warning(f"无效消息格式 PID={pid}: {message}")
            return {'finished': False}

        self.logger.info(f"收到进程消息 PID={pid}: {data}")
        # The entry may already have been removed; .get avoids a KeyError.
        info = self.processes.get(pid)
        if info is not None:
            info["last_active"] = time.time()

        msg = data.get("msg")
        if msg == "progress":
            # Progress report: forward to the backend (django).
            self._post(BACKEND_BASE_URL + "/rawDataTrans/", data.get("data"))
            return {'finished': False}
        if msg == "result":
            # Final result: forward to the backend and mark the child finished.
            response = self._post(BACKEND_BASE_URL + "/rawDataTrans/", data.get("data"))
            self.logger.info(f"进程结果已向后端服务器反馈 pid:{pid} response:{response}")
            self.logger.info(f"进程 {pid} 报告已完成")
            return {'finished': True, 'results': data.get('data')}
        return {'finished': False}

    def _handle_crash(self, pid: int, info: dict):
        """Log a child that exited without a result and report the task as failed."""
        tail = info.get("stderr_tail") or []
        last_lines = "\n".join(tail) if tail else "(无 stderr 输出)"
        proc = info.get("proc")
        self.logger.error(
            f"进程崩溃 PID={pid}\n"
            f"命令: {info['command']}\n"
            f"退出码: {proc.returncode if proc is not None else None}\n"
            f"最后错误输出:\n{last_lines}"
        )
        mission_id = info['meta']['mission']['id']
        plan_id = info['meta']['plan']['id']
        # Tell the backend (django) the task failed.
        self._post(BACKEND_BASE_URL + "/rawDataTrans/", {'missionId': mission_id, 'planId': plan_id, 'progress': -1})
        # Tell the scheduler (flask) the process died.
        self._post(SCHEDULER_BASE_URL + '/report', {'missionId': mission_id, 'planId': plan_id, 'state': 'DEAD', 'results': None})

    def _handle_timeout(self, pid: int, info: dict):
        """Log a child killed for inactivity and report TIMEOUT to the scheduler."""
        self.logger.error(f"进程超时处理 PID={pid} | 运行时间: {time.time() - info['start_time']:.1f}s")
        self._post(SCHEDULER_BASE_URL + '/report', {'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'TIMEOUT', 'results': None})
|