processManager.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
# process_manager.py
import logging
import os
import subprocess
import threading
import time
import uuid
from collections import deque
from typing import Dict, List, Optional, Union

import psutil
import requests

from utils import BACKEND_BASE_URL, SCHEDULER_BASE_URL
  11. class ProcessManager:
  12. def __init__(self, check_interval: int = 5, timeout: int = 300):
  13. self.processes: Dict[int, dict] = {} # {pid: {meta}}
  14. self.lock = threading.Lock()
  15. self.check_interval = check_interval # 检查间隔(秒)
  16. self.timeout = timeout # 进程超时时间(秒)
  17. self._monitor_thread: Optional[threading.Thread] = None
  18. self._running = False
  19. # 配置日志
  20. self.log_dir: str = "process_logs"
  21. # 确保日志目录存在
  22. os.makedirs(self.log_dir, exist_ok=True)
  23. logging.basicConfig(
  24. level=logging.INFO,
  25. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  26. )
  27. self.logger = logging.getLogger("ProcessManager")
  28. def spawn(
  29. self,
  30. command: Union[str, List[str]],
  31. cwd: str = None,
  32. env: dict = None,
  33. shell: bool = False,
  34. **meta
  35. ) -> Optional[int]:
  36. """
  37. 创建并监控子进程
  38. :param command: 命令字符串或参数列表
  39. :param cwd: 工作目录
  40. :param env: 环境变量(默认继承当前环境)
  41. :param shell: 是否使用shell执行
  42. :param meta: 附加元数据
  43. :return: 成功返回PID,失败返回None
  44. """
  45. try:
  46. # 准备日志文件
  47. timestamp = time.strftime("%Y%m%d-%H%M%S")
  48. log_prefix = os.path.join(
  49. self.log_dir,
  50. f"proc_{timestamp}_{os.getpid()}"
  51. )
  52. # 打开日志文件
  53. with open(f"{log_prefix}.stdout", "w") as stdout_f, \
  54. open(f"{log_prefix}.stderr", "w") as stderr_f:
  55. # 创建子进程
  56. env = {**os.environ, **(env or {})}
  57. env.update({
  58. 'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
  59. 'BACKEND_BASE_URL': BACKEND_BASE_URL,
  60. 'missionId': str(meta['mission']['id']),
  61. 'planId': str(meta['plan']['id']),
  62. })
  63. proc = subprocess.Popen(
  64. command,
  65. cwd=cwd,
  66. env=env,
  67. shell=shell,
  68. stdout=stdout_f,
  69. stderr=stderr_f,
  70. text=True,
  71. )
  72. # 注册进程信息
  73. with self.lock:
  74. self.processes[proc.pid] = {
  75. "proc": proc,
  76. "command": command,
  77. "start_time": time.time(),
  78. "last_active": time.time(),
  79. "log_stdout": stdout_f.name,
  80. "log_stderr": stderr_f.name,
  81. "meta": meta
  82. }
  83. self.logger.info(
  84. f"创建子进程 PID={proc.pid} | "
  85. f"命令: {command} | "
  86. f"日志: {log_prefix}.*"
  87. )
  88. return proc.pid
  89. except Exception as e:
  90. self.logger.error(f"创建进程失败: {str(e)}")
  91. return None
  92. def start_monitoring(self):
  93. """启动后台监控线程"""
  94. if self._running:
  95. return
  96. self._running = True
  97. self._monitor_thread = threading.Thread(
  98. target=self._monitor_loop,
  99. daemon=True # 随主进程退出
  100. )
  101. self._monitor_thread.start()
  102. self.logger.info("进程监控线程已启动")
  103. def stop_monitoring(self):
  104. """停止监控"""
  105. self._running = False
  106. if self._monitor_thread:
  107. self._monitor_thread.join()
  108. self.logger.info("进程监控线程已停止")
  109. def _monitor_loop(self):
  110. """监控主循环"""
  111. while self._running:
  112. try:
  113. self._check_processes()
  114. except Exception as e:
  115. self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)
  116. time.sleep(self.check_interval)
  117. def _check_processes(self):
  118. """执行进程状态检查"""
  119. current_time = time.time()
  120. dead_pids = []
  121. with self.lock:
  122. for pid, info in self.processes.items():
  123. try:
  124. proc = psutil.Process(pid)
  125. # 检测崩溃
  126. if proc.status() == psutil.STATUS_ZOMBIE:
  127. self.logger.warning(f"进程 {pid} 处于僵尸状态")
  128. dead_pids.append(pid)
  129. self._handle_crash(pid, info)
  130. continue
  131. # 检测超时(假死)
  132. if (current_time - info["last_active"]) > self.timeout:
  133. self.logger.warning(f"进程 {pid} 超时,即将终止")
  134. proc.terminate()
  135. dead_pids.append(pid)
  136. self._handle_timeout(pid, info)
  137. continue
  138. # 更新活跃时间(可根据业务逻辑调整)
  139. info["last_active"] = current_time
  140. except psutil.NoSuchProcess:
  141. response = requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'CRASH', 'results': None})
  142. code = response.json()['code']
  143. if code == 'OK':
  144. self.logger.warning(f"进程 {pid} 已正常退出")
  145. dead_pids.append(pid)
  146. if code == 'ERROR':
  147. dead_pids.append(pid)
  148. self._handle_crash(pid, info)
  149. # 清理已终止进程
  150. for pid in dead_pids:
  151. del self.processes[pid]
  152. def _handle_crash(self, pid: int, info: dict):
  153. """进程崩溃处理逻辑"""
  154. # 读取最后10行错误日志
  155. try:
  156. with open(info["log_stderr"]) as f:
  157. last_lines = "".join(f.readlines()[-10:])
  158. except Exception:
  159. last_lines = "无法读取日志"
  160. self.logger.error(
  161. f"进程崩溃 PID={pid}\n"
  162. f"命令: {info['command']}\n"
  163. f"最后错误输出:\n{last_lines}"
  164. )
  165. # 向flask发送进程崩溃信息
  166. response = requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'DEAD', 'results': None})
  167. def _handle_timeout(self, pid: int, info: dict):
  168. """进程超时处理逻辑"""
  169. # 记录诊断信息
  170. self.logger.error(f"进程超时处理 PID={pid} | 运行时间: {time.time() - info['start_time']:.1f}s")
  171. # 向flask发送进程超时信息
  172. response = requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'TIMEOUT', 'results': None})