# process_manager.py
import json
import logging
import os
import subprocess
import threading
import time
from queue import Queue, Empty
from typing import Any, Dict, Optional, List, Union

import psutil
import requests

from utils import store
from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL


  14. class ProcessManager:
  15. def __init__(self, check_interval: int = 1, timeout: int = 300):
  16. self.processes: Dict[int, dict] = {} # {pid: {meta}}
  17. self.lock = threading.Lock()
  18. self.check_interval = check_interval # 检查间隔(秒)
  19. self.timeout = timeout # 进程超时时间(秒)
  20. self._monitor_thread: Optional[threading.Thread] = None
  21. self._running = False
  22. self.logger = logging.getLogger("ProcessManager")
  23. # 子进程的通信队列相关参数
  24. self._reader_threads: Dict[int, threading.Thread] = {} # pid:
  25. # 汇报信息队列相关参数
  26. self.report_queue = Queue()
  27. self._report_thread = threading.Thread(target=self._report_loop, daemon=True)
  28. self.max_batch = 20
  29. self.report_interval = 1 # 1s发送一次数据
  30. # 创建长连接session,避免多次握手开销
  31. self.report_session = requests.Session()
  32. # 配置日志
  33. self.log_dir: str = "process_logs"
  34. # 确保日志目录存在
  35. os.makedirs(self.log_dir, exist_ok=True)
  36. logging.basicConfig(
  37. level=logging.INFO,
  38. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  39. )
  40. # 准备日志文件
  41. timestamp = time.strftime("%Y%m%d-%H%M%S")
  42. log_prefix = os.path.join(
  43. self.log_dir,
  44. f"proc_{timestamp}_{os.getpid()}"
  45. )
  46. # 打开日志文件
  47. # with open(f"{log_prefix}.stdout", "w") as stdout_f, \
  48. # open(f"{log_prefix}.stderr", "w") as stderr_f:
  49. def _report_loop(self):
  50. # 只有启动监视之后才会运行
  51. self.logger.info("开始监控汇报信息")
  52. while self._running:
  53. try:
  54. batch = []
  55. start_time = time.time()
  56. while len(batch) < self.max_batch:
  57. try:
  58. item = self.report_queue.get_nowait()
  59. self.logger.info(f"从队列中读取到消息{item}")
  60. batch.append(item)
  61. except Empty:
  62. # 此处放置超时检测,当到达指定时间周期后,跳出循环发送数据
  63. # 注意例外情况,当batch列表被填满时,将直接发送
  64. if time.time() - start_time >= self.report_interval:
  65. break
  66. time.sleep(0.1)
  67. continue
  68. # 超时后发送数据
  69. self._report_batch(batch)
  70. except Exception as error:
  71. self.logger.error(f"批量汇报线程出现异常:{error}", exc_info=True)
  72. # 批量汇报函数
  73. def _report_batch(self, batch: list):
  74. # 汇报所有进度信息,-1表示任务终止
  75. report_progress = []
  76. for item in batch:
  77. # 检测item中存放的是什么数据
  78. report_progress.append(item)
  79. # 汇报系统性能信息
  80. report_performance = {}
  81. cpu_system = psutil.cpu_percent()
  82. mem_system = psutil.virtual_memory()
  83. mem_total = float(mem_system.total) / (1024**3)
  84. mem_used = float(mem_system.used) / (1024**3)
  85. # 放入系统性能占用信息
  86. report_performance['system'] = {
  87. 'cpu': round(cpu_system, 1),
  88. 'mem_total': mem_total,
  89. 'mem_used': mem_used,
  90. }
  91. # 放入进程性能占用信息
  92. report_performance['process'] = []
  93. for pid in self.processes:
  94. try:
  95. ps_proc = psutil.Process(pid)
  96. cpu = round(ps_proc.cpu_percent(interval=1) / psutil.cpu_count(), 1)
  97. mem = ps_proc.memory_info().rss / (1024**2) # 转为MB
  98. except psutil.NoSuchProcess:
  99. # 该进程已退出
  100. continue
  101. self.logger.info(f"获取到进程{pid}信息: CPU:{cpu} MEM:{mem}")
  102. report_performance['process'].append({
  103. 'planId': self.processes[pid]['meta']['plan']['id'],
  104. 'missionId': self.processes[pid]['meta']['mission']['id'],
  105. 'pid': pid,
  106. 'cpu': cpu,
  107. 'mem_used': mem,
  108. 'startTime': time.strftime("%Y-%m-%d %H:%M:%S", self.processes[pid]['meta']['startTime']),
  109. 'algorithm': self.processes[pid]['meta']['algorithm'],
  110. })
  111. # 发送汇报请求
  112. response = self.report_session.post(BACKEND_BASE_URL + "/rawDataTrans/", json={
  113. 'result': report_progress,
  114. 'performance': report_performance,
  115. })
  116. # self.logger.info(f"批量信息已向后端服务器汇报 response:{response}")
  117. def spawn(
  118. self,
  119. command: Union[str, List[str]],
  120. cwd: str = None,
  121. env: dict = None,
  122. shell: bool = False,
  123. **meta
  124. ) -> Optional[int]:
  125. """
  126. 创建并监控子进程
  127. :param command: 命令字符串或参数列表
  128. :param cwd: 工作目录
  129. :param env: 环境变量(默认继承当前环境)
  130. :param shell: 是否使用shell执行
  131. :param meta: 附加元数据
  132. :return: 成功返回PID,失败返回None
  133. """
  134. try:
  135. # 创建子进程
  136. env = {**os.environ, **(env or {})}
  137. env.update({
  138. "PYTHONUNBUFFERED": "1", # 禁用缓冲
  139. 'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
  140. 'BACKEND_BASE_URL': BACKEND_BASE_URL,
  141. 'missionId': str(meta['mission']['id']),
  142. 'planId': str(meta['plan']['id']),
  143. })
  144. proc = subprocess.Popen(
  145. command,
  146. cwd=cwd,
  147. env=env,
  148. shell=shell,
  149. stdout=subprocess.PIPE,
  150. stderr=subprocess.PIPE,
  151. text=True,
  152. )
  153. self.logger.info(f"准备创建进程")
  154. # 注册进程信息
  155. with self.lock:
  156. self.processes[proc.pid] = {
  157. "proc": proc,
  158. "command": command,
  159. "start_time": time.time(),
  160. "last_active": time.time(),
  161. "msg_queue": Queue(),
  162. "meta": meta
  163. }
  164. self.logger.info(f"准备开始监听")
  165. self._start_reader(proc.pid, proc.stdout, proc.stderr)
  166. self.logger.info(
  167. f"创建子进程 PID={proc.pid} | "
  168. f"命令: {command} | "
  169. )
  170. return proc.pid
  171. except Exception as e:
  172. self.logger.error(f"创建进程失败: {str(e)}")
  173. return None
  174. def stop(self, missionId: int):
  175. plansStopped = []
  176. pids = [] # to delete
  177. try:
  178. for pid in self.processes:
  179. if int(self.processes[pid]['meta']['mission']['id']) == missionId:
  180. pids.append(pid)
  181. plansStopped.append({'planId': int(self.processes[pid]['meta']['plan']['id'])})
  182. for pid in pids:
  183. self.remove_process(pid)
  184. return True
  185. except Exception as error:
  186. self.logger.error(f"停止pid={pid}的Plan时发生错误,Error:{error}")
  187. return False
  188. def _start_reader(self, pid:int, stdout, stderr):
  189. """为每个子进程启动独立的非阻塞读取线程"""
  190. def reader_loop(pid, out_pipe, queue: Queue):
  191. try:
  192. while True:
  193. # 非阻塞读取 stdout
  194. try:
  195. out_line = out_pipe.readline()
  196. # 管道关闭返回空串
  197. if not out_line:
  198. break
  199. if out_line:
  200. queue.put(('stdout', out_line.strip()))
  201. except (IOError, ValueError):
  202. self.logger.info(f"进程消息读取错误 pid:{pid}")
  203. pass
  204. time.sleep(0.1) # 降低 CPU 占用
  205. except Exception as e:
  206. self.logger.error(f"读取子进程消息错误: {str(e)}")
  207. def reader_err_loop(pid, err_pipe, queue: Queue):
  208. try:
  209. while True:
  210. # 非阻塞读取 stderr
  211. try:
  212. err_line = err_pipe.readline()
  213. # 管道关闭返回空串
  214. if not err_line:
  215. break
  216. if err_line:
  217. queue.put(('stderr', err_line.strip()))
  218. except (IOError, ValueError):
  219. self.logger.info(f"进程错误读取错误 pid:{pid}")
  220. pass
  221. time.sleep(0.1) # 降低 CPU 占用
  222. except Exception as e:
  223. self.logger.error(f"读取子进程消息错误: {str(e)}")
  224. # 创建并启动消息读取线程
  225. t = threading.Thread(
  226. target=reader_loop,
  227. args=(pid, stdout, self.processes[pid]["msg_queue"]),
  228. daemon=True
  229. )
  230. t.start()
  231. # 创建并启动错误读取线程
  232. tE = threading.Thread(
  233. target=reader_err_loop,
  234. args=(pid, stderr, self.processes[pid]["msg_queue"]),
  235. daemon=True
  236. )
  237. tE.start()
  238. self._reader_threads[pid] = [t, tE]
  239. def start_monitoring(self):
  240. """启动后台监控线程"""
  241. if self._running:
  242. return
  243. self._running = True
  244. self._monitor_thread = threading.Thread(
  245. target=self._monitor_loop,
  246. daemon=True # 随主进程退出
  247. )
  248. self._monitor_thread.start()
  249. self.logger.info("启动汇报子线程")
  250. self._report_thread.start()
  251. self.logger.info("启动汇报子线程")
  252. self.logger.info("进程监控线程已启动")
  253. def stop_monitoring(self):
  254. """停止监控"""
  255. self._running = False
  256. if self._monitor_thread:
  257. self._monitor_thread.join()
  258. self.logger.info("进程监控线程已停止")
  259. def remove_process(self, missionId: int, planId: int):
  260. """删除进程监视任务"""
  261. for pid, info in self.processes.items():
  262. if int(info["meta"]['mission']['id']) == int(missionId) and int(info["meta"]["plan"]["id"]) == int(planId):
  263. try:
  264. proc = psutil.Process(pid)
  265. proc.terminate()
  266. except psutil.NoSuchProcess:
  267. self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
  268. except Exception as error:
  269. self.logger.error(f"移除处理进程监视失败 MissionId: {missionId} PlanId: {planId} Error: {error}")
  270. return False
  271. # 不应该在这里移除,保持整个process遍历时字典内容不改变
  272. # # 移除监视进程列表
  273. # del self.processes[pid]
  274. # 清理读取线程
  275. if pid in self._reader_threads:
  276. for t in self._reader_threads[pid]:
  277. t.join()
  278. del self._reader_threads[pid]
  279. self.logger.info(f"移除处理进程监视 MissionId: {missionId} PlanId: {planId}")
  280. return True
  281. self.logger.info(f"该处理进程不在监视进程表中 MissionId: {missionId} PlanId: {planId}")
  282. return True
  283. def remove_process(self, pid_to_del: int):
  284. try:
  285. # 清理监视进程列表
  286. for pid, info in self.processes.items():
  287. if pid == pid_to_del:
  288. missionId = info['meta']['mission']['id']
  289. planId = info["meta"]["plan"]["id"]
  290. proc = psutil.Process(pid)
  291. proc.terminate()
  292. self.logger.info(f"移除处理进程监视成功 MissionId: {missionId} PlanId: {planId}")
  293. except psutil.NoSuchProcess:
  294. self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
  295. except Exception as error:
  296. self.logger.error(f"移除处理进程监视-with pid失败 pid:{pid_to_del}")
  297. return False
  298. # 不应该在这里移除,保持整个process遍历时字典内容不改变
  299. # # 移除监视进程列表
  300. # del self.processes[pid]
  301. # 清理读取线程
  302. if pid_to_del in self._reader_threads:
  303. for t in self._reader_threads[pid_to_del]:
  304. t.join()
  305. del self._reader_threads[pid_to_del]
  306. return True
  307. def _monitor_loop(self):
  308. """监控主循环"""
  309. while self._running:
  310. try:
  311. self._check_processes()
  312. except Exception as e:
  313. self.logger.error(f"监控循环异常: {str(e)}", exc_info=True)
  314. time.sleep(self.check_interval)
  315. def _check_processes(self):
  316. """执行进程状态检查"""
  317. current_time = time.time()
  318. dead_pids = []
  319. reportDatas = []
  320. with self.lock:
  321. for pid, info in self.processes.items():
  322. try:
  323. proc = psutil.Process(pid)
  324. # 检测崩溃
  325. if proc.status() == psutil.STATUS_ZOMBIE:
  326. self.logger.warning(f"进程 {pid} 处于僵尸状态")
  327. dead_pids.append(pid)
  328. self._handle_crash(pid, info)
  329. continue
  330. # 检测超时(假死)
  331. if (current_time - info["last_active"]) > self.timeout:
  332. self.logger.warning(f"进程 {pid} 超时,即将终止")
  333. proc.terminate()
  334. dead_pids.append(pid)
  335. self._handle_timeout(pid, info)
  336. continue
  337. # 更新活跃时间(可根据业务逻辑调整)
  338. info["last_active"] = current_time
  339. except psutil.NoSuchProcess:
  340. # 无法找到进程时,可能子进程已正常退出,读取所有管道中剩余数据,判断是否崩溃
  341. response = {}
  342. try:
  343. while True:
  344. pipe_type, message = info["msg_queue"].get_nowait()
  345. response = self._handle_process_message(pid, pipe_type, message)
  346. except Empty:
  347. pass
  348. except Exception as error:
  349. self.logger.error(f"ERROR:{error}")
  350. if 'finished' in response and response['finished']:
  351. # 虽然已经找不到进程,但是由于进程已正常退出
  352. # 将获取的result缓存,待释放锁后向调度器汇报
  353. reportDatas.append({
  354. 'missionId': info["meta"]["mission"]["id"],
  355. 'planId': info["meta"]["plan"]["id"],
  356. 'state': 'DONE',
  357. 'results': response['results'],
  358. })
  359. self.remove_process(pid)
  360. else:
  361. # 非正常退出(未传递finished信息)
  362. self._handle_crash(pid, info)
  363. # 无论如何都加入待清理列表
  364. dead_pids.append(pid)
  365. # 正常读取子进程输出
  366. try:
  367. count = 0
  368. while True:
  369. pipe_type, message = info["msg_queue"].get_nowait()
  370. count += 1
  371. self.logger.info(f"Count is:{count}")
  372. response = self._handle_process_message(pid, pipe_type, message)
  373. if 'finished' in response and response['finished']:
  374. # 正常退出
  375. # 将获取的result缓存,待释放锁后向调度器汇报
  376. reportDatas.append({
  377. 'missionId': int(info["meta"]["mission"]["id"]),
  378. 'planId': int(info["meta"]["plan"]["id"]),
  379. 'state': 'DONE',
  380. 'results': response['results'],
  381. })
  382. self.remove_process(pid)
  383. dead_pids.append(pid)
  384. except Empty:
  385. pass
  386. # 清理已终止进程
  387. for pid in dead_pids:
  388. del self.processes[pid]
  389. # 清理读取线程
  390. if pid in self._reader_threads:
  391. for t in self._reader_threads[pid]:
  392. t.join()
  393. del self._reader_threads[pid]
  394. # 锁已释放
  395. # 依次提交已完成任务,获取下一步任务
  396. # 注意在收到results通信时,就已经将结果发送至django,这里不需要处理
  397. for report in reportDatas:
  398. # 向store提交
  399. missionId = report['missionId']
  400. planId=report['planId']
  401. results=report['results']
  402. for nextTask in store.solveMission(missionId=missionId, planId=planId, results=results):
  403. task = store.prepareTask(missionId=missionId, planId=nextTask['id'])
  404. if self.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission'], startTime=time.localtime(), algorithm=task['algorithm']):
  405. self.logger.info(f"创建后续计算任务成功 MissionId:{missionId} PlanId:{task['plan']['id']}")
  406. else:
  407. self.logger.error(f"创建后续计算任务失败 MissionId:{missionId} PlanId:{task['plan']['id']}")
  408. # 任务无法继续,向django汇报下一个任务失败
  409. self.report_queue.put({
  410. 'missionId': task['mission']['id'],
  411. 'planId': task['plan']['id'],
  412. 'progress': -1, # -1 表示单个任务失败 -2 表示全体mission失败
  413. })
  414. # response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json={
  415. # 'missionId': task['mission']['id'],
  416. # 'planId': task['plan']['id'],
  417. # 'progress': -1, # -1 表示单个任务失败 -2 表示全体mission失败
  418. # })
  419. def _handle_process_message(self, pid:int, pipe_type:str, message:str):
  420. """处理来自子进程的通信消息"""
  421. try:
  422. # 解析 JSON 格式消息
  423. data = json.loads(message)
  424. self.logger.info(f"收到进程消息 PID={pid}: {data}")
  425. # 更新最后活跃时间
  426. self.processes[pid]["last_active"] = time.time()
  427. # 处理完成消息
  428. msg = data.get("msg")
  429. if msg == "progress":
  430. # 获得进度汇报,向django汇报
  431. self.report_queue.put(data.get("data"))
  432. # response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
  433. return {'finished': False}
  434. if msg == "result":
  435. # 获得返回结果,向django汇报
  436. self.report_queue.put(data.get("data"))
  437. # response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
  438. self.logger.info(f"进程 {pid} 报告已完成")
  439. # 标记该进程正常退出
  440. return {'finished': True, 'results': data.get('data')}
  441. return {'finished': False}
  442. except json.JSONDecodeError:
  443. self.logger.warning(f"无效消息格式 PID={pid}: {message}")
  444. return {'finished': False}
  445. def _handle_crash(self, pid: int, info: dict):
  446. """进程崩溃处理逻辑"""
  447. # 读取最后10行错误日志
  448. # try:
  449. # with open(info["log_stderr"]) as f:
  450. # last_lines = "".join(f.readlines()[-10:])
  451. # except Exception:
  452. # last_lines = "无法读取日志"
  453. self.logger.error(
  454. f"进程崩溃 PID={pid}\n"
  455. f"命令: {info['command']}\n"
  456. )
  457. # 发现进程崩溃,向django汇报任务失败
  458. self.report_queue.put({
  459. 'missionId': info['meta']['mission']['id'],
  460. 'planId': info['meta']['plan']['id'],
  461. 'progress': -1, # -1 表示单个任务失败 -2 表示全体mission失败
  462. })
  463. # response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'progress': -1}) # -1表示单个任务失败
  464. # 调用store终止该mission
  465. store.removeMission(missionId=info['meta']['mission']['id'])
  466. self.logger.error(f"任务进程发生崩溃,Mission {info['meta']['mission']['id']}已终止")
  467. def _handle_timeout(self, pid: int, info: dict):
  468. """进程超时处理逻辑"""
  469. # 记录诊断信息
  470. self.logger.error(f"进程超时处理 PID={pid} | 运行时间: {time.time() - info['start_time']:.1f}s")
  471. # 调用store终止该mission
  472. store.removeMission(missionId=info['meta']['mission']['id'])
  473. self.logger.error(f"任务进程发生超时,Mission {info['meta']['mission']['id']}已终止")
  474. def __del__(self):
  475. self.report_session.close()