processManager.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. # process_manager.py
  2. import subprocess
  3. import psutil
  4. import logging
  5. import threading
  6. import time
  7. import os
  8. import json
  9. from queue import Queue, Empty
  10. from typing import Dict, Optional, List, Union
  11. import requests
  12. from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL
  13. class ProcessManager:
  14. def __init__(self, check_interval: int = 1, timeout: int = 300):
  15. self.processes: Dict[int, dict] = {} # {pid: {meta}}
  16. self.lock = threading.Lock()
  17. self.check_interval = check_interval # 检查间隔(秒)
  18. self.timeout = timeout # 进程超时时间(秒)
  19. self._monitor_thread: Optional[threading.Thread] = None
  20. self._running = False
  21. # 与子进程的通信队列相关参数
  22. self._reader_threads: Dict[int, threading.Thread] = {} # pid:
  23. # 配置日志
  24. self.log_dir: str = "process_logs"
  25. # 确保日志目录存在
  26. os.makedirs(self.log_dir, exist_ok=True)
  27. logging.basicConfig(
  28. level=logging.INFO,
  29. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  30. )
  31. self.logger = logging.getLogger("ProcessManager")
  32. # 准备日志文件
  33. timestamp = time.strftime("%Y%m%d-%H%M%S")
  34. log_prefix = os.path.join(
  35. self.log_dir,
  36. f"proc_{timestamp}_{os.getpid()}"
  37. )
  38. # 打开日志文件
  39. # with open(f"{log_prefix}.stdout", "w") as stdout_f, \
  40. # open(f"{log_prefix}.stderr", "w") as stderr_f:
  def spawn(
      self,
      command: Union[str, List[str]],
      cwd: Optional[str] = None,
      env: Optional[dict] = None,
      shell: bool = False,
      **meta
  ) -> Optional[int]:
      """Start a child process and register it for monitoring.

      The child inherits the current environment, overlaid with ``env`` and
      with the scheduler/backend URLs plus the mission/plan ids.

      :param command: command string or argument list
      :param cwd: working directory (None keeps the current one)
      :param env: extra environment variables
      :param shell: run the command through the shell
      :param meta: extra metadata; must contain ``mission`` and ``plan``
          mappings that each have an ``id`` key
      :return: the child's PID on success, None on failure
      """
      proc = None
      try:
          child_env = {**os.environ, **(env or {})}
          child_env.update({
              "PYTHONUNBUFFERED": "1",  # child output is flushed line by line
              'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
              'BACKEND_BASE_URL': BACKEND_BASE_URL,
              'missionId': str(meta['mission']['id']),
              'planId': str(meta['plan']['id']),
          })
          proc = subprocess.Popen(
              command,
              cwd=cwd,
              env=child_env,
              shell=shell,
              stdout=subprocess.PIPE,
              stderr=subprocess.PIPE,
              text=True,
          )
          self.logger.info(f"准备创建进程")
          now = time.time()
          with self.lock:
              self.processes[proc.pid] = {
                  "proc": proc,
                  "command": command,
                  "start_time": now,
                  "last_active": now,
                  "msg_queue": Queue(),
                  "meta": meta,
              }
          self.logger.info(f"准备开始监听")
          self._start_reader(proc.pid, proc.stdout, proc.stderr)
          self.logger.info(
              f"创建子进程 PID={proc.pid} | "
              f"命令: {command} | "
          )
          return proc.pid
      except Exception as e:
          self.logger.error(f"创建进程失败: {str(e)}", exc_info=True)
          if proc is not None:
              # The child started but could not be registered or watched.
              # Do not leave it running as an unmonitored orphan.
              with self.lock:
                  self.processes.pop(proc.pid, None)
              self._reader_threads.pop(proc.pid, None)
              try:
                  proc.kill()
              except OSError:
                  # Best effort: the child may already be gone.
                  pass
          return None
  98. def stop(self, missionId: int):
  99. plansStopped = []
  100. pids = [] # to delete
  101. for pid in self.processes:
  102. if int(self.processes[pid]['meta']['mission']['id']) == missionId:
  103. pids.append(pid)
  104. plansStopped.append({'planId': int(self.processes[pid]['meta']['plan']['id'])})
  105. for pid in pids:
  106. self.remove_process(pid)
  107. def _start_reader(self, pid:int, stdout, stderr):
  108. """为每个子进程启动独立的非阻塞读取线程"""
  109. def reader_loop(pid, out_pipe, queue: Queue):
  110. try:
  111. while True:
  112. # 非阻塞读取 stdout
  113. try:
  114. out_line = out_pipe.readline()
  115. # 管道关闭返回空串
  116. if not out_line:
  117. break
  118. if out_line:
  119. queue.put(('stdout', out_line.strip()))
  120. except (IOError, ValueError):
  121. self.logger.info(f"进程消息读取错误 pid:{pid}")
  122. pass
  123. time.sleep(0.1) # 降低 CPU 占用
  124. except Exception as e:
  125. self.logger.error(f"读取子进程消息错误: {str(e)}")
  126. def reader_err_loop(pid, err_pipe, queue: Queue):
  127. try:
  128. while True:
  129. # 非阻塞读取 stderr
  130. try:
  131. err_line = err_pipe.readline()
  132. # 管道关闭返回空串
  133. if not err_line:
  134. break
  135. if err_line:
  136. queue.put(('stderr', err_line.strip()))
  137. except (IOError, ValueError):
  138. self.logger.info(f"进程错误读取错误 pid:{pid}")
  139. pass
  140. time.sleep(0.1) # 降低 CPU 占用
  141. except Exception as e:
  142. self.logger.error(f"读取子进程消息错误: {str(e)}")
  143. # 创建并启动消息读取线程
  144. t = threading.Thread(
  145. target=reader_loop,
  146. args=(pid, stdout, self.processes[pid]["msg_queue"]),
  147. daemon=True
  148. )
  149. t.start()
  150. # 创建并启动错误读取线程
  151. tE = threading.Thread(
  152. target=reader_err_loop,
  153. args=(pid, stderr, self.processes[pid]["msg_queue"]),
  154. daemon=True
  155. )
  156. tE.start()
  157. self._reader_threads[pid] = [t, tE]
  158. def start_monitoring(self):
  159. """启动后台监控线程"""
  160. if self._running:
  161. return
  162. self._running = True
  163. self._monitor_thread = threading.Thread(
  164. target=self._monitor_loop,
  165. daemon=True # 随主进程退出
  166. )
  167. self._monitor_thread.start()
  168. self.logger.info("进程监控线程已启动")
  169. def stop_monitoring(self):
  170. """停止监控"""
  171. self._running = False
  172. if self._monitor_thread:
  173. self._monitor_thread.join()
  174. self.logger.info("进程监控线程已停止")
  175. def remove_process(self, missionId: int, planId: int):
  176. """删除进程监视任务"""
  177. for pid, info in self.processes.items():
  178. if int(info["meta"]['mission']['id']) == int(missionId) and int(info["meta"]["plan"]["id"]) == int(planId):
  179. try:
  180. proc = psutil.Process(pid)
  181. proc.terminate()
  182. del self.processes[pid]
  183. # 清理读取线程
  184. if pid in self._reader_threads:
  185. for t in self._reader_threads[pid]:
  186. t.join()
  187. del self._reader_threads[pid]
  188. self.logger.info(f"移除处理进程监视 MissionId: {missionId} PlanId: {planId}")
  189. return True
  190. except psutil.NoSuchProcess:
  191. self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
  192. return True
  193. except Exception as error:
  194. self.logger.error(f"移除处理进程监视失败 MissionId: {missionId} PlanId: {planId} Error: {error}")
  195. return False
  196. self.logger.info(f"该处理进程不在监视中 MissionId: {missionId} PlanId: {planId}")
  197. return True
  198. def remove_process(self, pid_to_del: int):
  199. self.logger.error(f"移除处理进程-with pid")
  200. try:
  201. # 清理读取线程
  202. if pid_to_del in self._reader_threads:
  203. for t in self._reader_threads[pid_to_del]:
  204. t.join()
  205. del self._reader_threads[pid_to_del]
  206. for pid, info in self.processes.items():
  207. if pid == pid_to_del:
  208. missionId = info['meta']['mission']['id']
  209. planId = info["meta"]["plan"]["id"]
  210. proc = psutil.Process(pid)
  211. proc.terminate()
  212. del self.processes[pid]
  213. self.logger.info(f"移除处理进程监视成功 MissionId: {missionId} PlanId: {planId}")
  214. return
  215. self.logger.error(f"未找到请求移除的进程 MissionId: {missionId} PlanId: {planId}")
  216. except psutil.NoSuchProcess:
  217. self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
  218. except Exception as error:
  219. self.logger.error(f"移除处理进程监视-with pid失败 pid:{pid_to_del}")
  220. def _monitor_loop(self):
  221. """监控主循环"""
  222. while self._running:
  223. try:
  224. self._check_processes()
  225. except Exception as e:
  226. self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)
  227. time.sleep(self.check_interval)
  228. def _check_processes(self):
  229. """执行进程状态检查"""
  230. current_time = time.time()
  231. dead_pids = []
  232. reportDatas = []
  233. with self.lock:
  234. for pid, info in self.processes.items():
  235. try:
  236. proc = psutil.Process(pid)
  237. # 检测崩溃
  238. if proc.status() == psutil.STATUS_ZOMBIE:
  239. self.logger.warning(f"进程 {pid} 处于僵尸状态")
  240. dead_pids.append(pid)
  241. self._handle_crash(pid, info)
  242. continue
  243. # 检测超时(假死)
  244. if (current_time - info["last_active"]) > self.timeout:
  245. self.logger.warning(f"进程 {pid} 超时,即将终止")
  246. proc.terminate()
  247. dead_pids.append(pid)
  248. self._handle_timeout(pid, info)
  249. continue
  250. # 更新活跃时间(可根据业务逻辑调整)
  251. info["last_active"] = current_time
  252. except psutil.NoSuchProcess:
  253. # 无法找到进程时,可能子进程已正常退出,读取所有管道中剩余数据,判断是否崩溃
  254. response = {}
  255. try:
  256. while True:
  257. self.logger.info(f"111")
  258. pipe_type, message = info["msg_queue"].get_nowait()
  259. self.logger.info(f"222")
  260. response = self._handle_process_message(pid, pipe_type, message)
  261. self.logger.info(f"返回信息1{response}")
  262. except Empty:
  263. self.logger.info(f"333")
  264. pass
  265. except Exception as error:
  266. self.logger.error(f"ERROR:{error}")
  267. self.logger.info(f"返回信息2{response}")
  268. if 'finished' in response and response['finished']:
  269. # 虽然已经找不到进程,但是由于进程已正常退出
  270. # 将获取的result缓存,待释放锁后向调度器汇报
  271. reportDatas.append({
  272. 'missionId': info["meta"]["mission"]["id"],
  273. 'planId': info["meta"]["plan"]["id"],
  274. 'state': 'DONE',
  275. 'results': response['results'],
  276. })
  277. self.remove_process(pid)
  278. else:
  279. # 非正常退出(未传递finished信息)
  280. self._handle_crash(pid, info)
  281. # 无论如何都加入待清理列表
  282. dead_pids.append(pid)
  283. # 正常读取子进程输出
  284. try:
  285. while True:
  286. pipe_type, message = info["msg_queue"].get_nowait()
  287. response = self._handle_process_message(pid, pipe_type, message)
  288. if 'finished' in response and response['finished']:
  289. # 正常退出
  290. # 将获取的result缓存,待释放锁后向调度器汇报
  291. reportDatas.append({
  292. 'missionId': info["meta"]["mission"]["id"],
  293. 'planId': info["meta"]["plan"]["id"],
  294. 'state': 'DONE',
  295. 'results': response['results'],
  296. })
  297. self.remove_process(pid)
  298. dead_pids.append(pid)
  299. except Empty:
  300. pass
  301. # 清理已终止进程
  302. for pid in dead_pids:
  303. del self.processes[pid]
  304. # 清理读取线程
  305. if pid in self._reader_threads:
  306. for t in self._reader_threads[pid]:
  307. t.join()
  308. del self._reader_threads[pid]
  309. # 锁已释放
  310. # 依次汇报已结束任务,获取下一步任务
  311. for report in reportDatas:
  312. response = requests.post(SCHEDULER_BASE_URL + "/report", json=report)
  313. self.logger.info(f"进程结果已提交调度器 mission:{report['missionId']} plan:{report['planId']} response:{response}")
  314. def _handle_process_message(self, pid:int, pipe_type:str, message:str):
  315. """处理来自子进程的通信消息"""
  316. try:
  317. # 解析 JSON 格式消息
  318. data = json.loads(message)
  319. self.logger.info(f"收到进程消息 PID={pid}: {data}")
  320. # 更新最后活跃时间
  321. self.processes[pid]["last_active"] = time.time()
  322. # 处理完成消息
  323. msg = data.get("msg")
  324. if msg == "progress":
  325. # 获得进度汇报,向django汇报
  326. response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
  327. return {'finished': False}
  328. if msg == "result":
  329. # 获得返回结果,向django汇报
  330. response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
  331. self.logger.info(f"进程结果已向后端服务器反馈 pid:{pid} response:{response}")
  332. self.logger.info(f"进程 {pid} 报告已完成")
  333. # 标记该进程正常退出
  334. return {'finished': True, 'results': data.get('data')}
  335. return {'finished': False}
  336. except json.JSONDecodeError:
  337. self.logger.warning(f"无效消息格式 PID={pid}: {message}")
  338. return {'finished': False}
  def _handle_crash(self, pid: int, info: dict):
      """Log a child that exited without reporting a result and mark its plan as failed.

      Sends ``progress: -1`` for the plan to the backend (``/rawDataTrans/``)
      and ``state: 'DEAD'`` to the scheduler (``/report``). Each request is
      attempted independently. A failure is logged rather than raised, so it
      cannot abort the caller's monitoring pass or suppress the other report.
      """
      last_lines = "无法读取日志"
      # NOTE(review): no visible code stores "log_stderr" in the process info
      # (stderr lines go to msg_queue instead), so the fallback text above is
      # what normally gets logged.
      log_path = info.get("log_stderr")
      if log_path:
          try:
              with open(log_path, errors="replace") as f:
                  # Keep only the last 10 lines of the error log.
                  last_lines = "".join(f.readlines()[-10:])
          except OSError:
              # Best effort: keep the fallback text if the file is unreadable.
              pass
      self.logger.error(
          f"进程崩溃 PID={pid}\n"
          f"命令: {info['command']}\n"
          f"最后错误输出:\n{last_lines}"
      )
      mission_id = info['meta']['mission']['id']
      plan_id = info['meta']['plan']['id']
      # Tell the backend the plan failed.
      try:
          requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json={'missionId': mission_id, 'planId': plan_id, 'progress': -1}, timeout=30)
      except requests.RequestException as error:
          self.logger.error(f"向后端汇报进程崩溃失败 PID={pid} Error: {error}")
      # Tell the scheduler the process died.
      try:
          requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': mission_id, 'planId': plan_id, 'state': 'DEAD', 'results': None}, timeout=30)
      except requests.RequestException as error:
          self.logger.error(f"向调度器汇报进程崩溃失败 PID={pid} Error: {error}")
  def _handle_timeout(self, pid: int, info: dict):
      """Log a child terminated for inactivity and report its plan as TIMEOUT to the scheduler.

      A failed HTTP request is logged instead of raised, so it cannot abort
      the caller's monitoring pass.
      """
      self.logger.error(f"进程超时处理 PID={pid} | 运行时间: {time.time() - info['start_time']:.1f}s")
      report = {
          'missionId': info['meta']['mission']['id'],
          'planId': info['meta']['plan']['id'],
          'state': 'TIMEOUT',
          'results': None,
      }
      try:
          requests.post(SCHEDULER_BASE_URL + '/report', json=report, timeout=30)
      except requests.RequestException as error:
          self.logger.error(f"向调度器汇报进程超时失败 PID={pid} Error: {error}")
  361. response = requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'TIMEOUT', 'results': None})