123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- # process_manager.py
- import subprocess
- import psutil
- import logging
- import threading
- import time
- import os
- from typing import Dict, Optional, List, Union
- import requests
- from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL
class ProcessManager:
    """Spawn child processes, register them, and watch them from a background thread.

    Every spawned process gets its stdout/stderr redirected to a pair of files
    under ``log_dir``.  A daemon thread periodically inspects the registered
    PIDs with ``psutil`` and reports crashes / timeouts to the scheduler's
    ``/report`` endpoint.
    """

    def __init__(self, check_interval: int = 5, timeout: int = 300,
                 report_timeout: float = 10.0):
        """
        :param check_interval: seconds between two monitoring passes
        :param timeout: seconds since ``last_active`` after which a process is terminated
        :param report_timeout: seconds to wait for the scheduler on each report request
        """
        self.processes: Dict[int, dict] = {}  # {pid: {process info + meta}}
        self.lock = threading.Lock()
        self.check_interval = check_interval
        self.timeout = timeout
        self.report_timeout = report_timeout
        self._monitor_thread: Optional[threading.Thread] = None
        self._running = False
        # Lets stop_monitoring() wake the monitor thread instead of waiting
        # for a full check_interval sleep to elapse.
        self._stop_event = threading.Event()
        # Directory that receives the per-process stdout/stderr files.
        self.log_dir: str = "process_logs"
        os.makedirs(self.log_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger("ProcessManager")

    def _reserve_log_prefix(self) -> str:
        """Return a log-file prefix that no other spawn is using.

        The name is ``proc_<timestamp>_<parent pid>``; when that is already
        taken (two spawns within the same second) a numeric suffix is added.
        The ``.stdout`` file is created with mode ``"x"``, which fails if the
        file exists, so the name is claimed atomically.
        """
        stamp = time.strftime("%Y%m%d-%H%M%S")
        base = os.path.join(self.log_dir, f"proc_{stamp}_{os.getpid()}")
        candidate = base
        attempt = 0
        while True:
            try:
                with open(f"{candidate}.stdout", "x"):
                    return candidate
            except FileExistsError:
                attempt += 1
                candidate = f"{base}_{attempt}"

    def spawn(
        self,
        command: Union[str, List[str]],
        cwd: Optional[str] = None,
        env: Optional[dict] = None,
        shell: bool = False,
        **meta
    ) -> Optional[int]:
        """
        Create a child process and register it for monitoring.

        :param command: command string or argument list
        :param cwd: working directory
        :param env: extra environment variables (merged over the current environment)
        :param shell: whether to run the command through the shell
        :param meta: extra metadata; ``meta['mission']['id']`` and
            ``meta['plan']['id']`` are required and exported to the child as
            ``missionId`` / ``planId``
        :return: the PID on success, ``None`` on any failure (the error is logged)
        """
        try:
            # Validate the metadata and build the environment BEFORE creating
            # any file, so a bad call does not leave empty log files behind.
            mission_id = str(meta['mission']['id'])
            plan_id = str(meta['plan']['id'])
            child_env = {**os.environ, **(env or {})}
            child_env.update({
                'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
                'BACKEND_BASE_URL': BACKEND_BASE_URL,
                'missionId': mission_id,
                'planId': plan_id,
            })

            log_prefix = self._reserve_log_prefix()

            with open(f"{log_prefix}.stdout", "w") as stdout_f, \
                    open(f"{log_prefix}.stderr", "w") as stderr_f:
                proc = subprocess.Popen(
                    command,
                    cwd=cwd,
                    env=child_env,
                    shell=shell,
                    stdout=stdout_f,
                    stderr=stderr_f,
                    text=True,
                )
                started = time.time()
                with self.lock:
                    self.processes[proc.pid] = {
                        "proc": proc,
                        "command": command,
                        "start_time": started,
                        "last_active": started,
                        "log_stdout": stdout_f.name,
                        "log_stderr": stderr_f.name,
                        "meta": meta
                    }
            self.logger.info(
                f"创建子进程 PID={proc.pid} | "
                f"命令: {command} | "
                f"日志: {log_prefix}.*"
            )
            return proc.pid
        except Exception as e:
            # Boundary of the public API: callers expect None, not an exception.
            self.logger.error(f"创建进程失败: {str(e)}")
            return None

    def start_monitoring(self):
        """Start the background monitor thread (no-op when already running)."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            daemon=True  # dies together with the main process
        )
        self._monitor_thread.start()
        self.logger.info("进程监控线程已启动")

    def stop_monitoring(self):
        """Ask the monitor thread to stop and wait until it has finished."""
        self._running = False
        # Wake the thread if it is waiting between two passes.
        self._stop_event.set()
        if self._monitor_thread:
            self._monitor_thread.join()
        self.logger.info("进程监控线程已停止")

    def _monitor_loop(self):
        """Run a monitoring pass every ``check_interval`` seconds until stopped."""
        while self._running:
            try:
                self._check_processes()
            except Exception as e:
                # Never let one bad pass kill the monitor thread.
                self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)

            # Interruptible replacement for time.sleep(): returns early as
            # soon as stop_monitoring() sets the event.
            self._stop_event.wait(self.check_interval)

    def _report(self, info: dict, state: str):
        """POST a state report for the process described by *info* to the scheduler.

        :return: the ``requests`` response object
        :raises: whatever ``requests.post`` raises (connection errors, timeouts)
        """
        meta = info['meta']
        return requests.post(
            SCHEDULER_BASE_URL + '/report',
            json={
                'missionId': meta['mission']['id'],
                'planId': meta['plan']['id'],
                'state': state,
                'results': None,
            },
            # Bounded wait: without it a hung scheduler stalls the monitor forever.
            timeout=self.report_timeout,
        )

    def _check_one(self, pid: int, info: dict, now: float) -> bool:
        """Inspect a single registered process.

        :return: ``True`` when the process is finished and must be removed
            from the registry, ``False`` when it must be kept
        """
        try:
            proc = psutil.Process(pid)
            # An exited child that has not been reaped shows up as a zombie.
            if proc.status() == psutil.STATUS_ZOMBIE:
                self.logger.warning(f"进程 {pid} 处于僵尸状态")
                self._handle_crash(pid, info)
                return True
            # Hang detection.
            if (now - info["last_active"]) > self.timeout:
                self.logger.warning(f"进程 {pid} 超时,即将终止")
                proc.terminate()
                self._handle_timeout(pid, info)
                return True
            # Being alive currently counts as being active.
            info["last_active"] = now
            return False
        except psutil.NoSuchProcess:
            # The PID is gone: let the scheduler's answer to the report
            # decide how the exit is treated.
            response = self._report(info, 'CRASH')
            code = response.json()['code']
            if code == 'OK':
                self.logger.warning(f"进程 {pid} 已正常退出")
                return True
            if code == 'ERROR':
                self._handle_crash(pid, info)
                return True
            # Any other code: keep the entry, it is re-examined next pass.
            return False

    def _check_processes(self):
        """Run one monitoring pass over every registered process.

        The registry is snapshotted under the lock and inspected outside it,
        so slow scheduler requests never block ``spawn``.  A failure while
        handling one process is logged and leaves that entry registered (it
        is retried on the next pass); the remaining processes are still
        checked and the ones that were handled are still removed.

        NOTE(review): ``last_active`` is refreshed on every successful check,
        so the timeout branch only fires when two consecutive checks of a
        process are more than ``timeout`` seconds apart.  Confirm whether
        activity should come from the child instead.
        """
        now = time.time()
        with self.lock:
            snapshot = list(self.processes.items())

        finished = []
        for pid, info in snapshot:
            try:
                if self._check_one(pid, info, now):
                    finished.append((pid, info))
            except Exception as e:
                self.logger.error(f"检查进程 {pid} 时发生异常: {e}", exc_info=True)

        # Drop the processes that are done.
        with self.lock:
            for pid, _ in finished:
                self.processes.pop(pid, None)

        for _, info in finished:
            child = info.get("proc")
            if child is not None:
                # Collect the exit status if the child has already exited so
                # it does not linger as a zombie.
                child.poll()

    def _handle_crash(self, pid: int, info: dict):
        """Log the tail of the process's stderr file and report state ``DEAD``."""
        from collections import deque

        try:
            # errors="replace": the child may have written bytes that are not
            # valid in the default encoding; still show what can be shown.
            with open(info["log_stderr"], errors="replace") as f:
                # Keep only the last 10 lines without loading the whole file.
                last_lines = "".join(deque(f, maxlen=10))
        except Exception:
            last_lines = "无法读取日志"
        self.logger.error(
            f"进程崩溃 PID={pid}\n"
            f"命令: {info['command']}\n"
            f"最后错误输出:\n{last_lines}"
        )
        # Tell the scheduler the process is dead.
        self._report(info, 'DEAD')

    def _handle_timeout(self, pid: int, info: dict):
        """Log how long the process ran and report state ``TIMEOUT``."""
        self.logger.error(f"进程超时处理 PID={pid} | 运行时间: {time.time() - info['start_time']:.1f}s")
        # Tell the scheduler the process timed out.
        self._report(info, 'TIMEOUT')
|