|
@@ -5,6 +5,8 @@ import logging
|
|
import threading
|
|
import threading
|
|
import time
|
|
import time
|
|
import os
|
|
import os
|
|
|
|
+import json
|
|
|
|
+from queue import Queue, Empty
|
|
from typing import Dict, Optional, List, Union
|
|
from typing import Dict, Optional, List, Union
|
|
import requests
|
|
import requests
|
|
|
|
|
|
@@ -12,7 +14,7 @@ from utils import SCHEDULER_BASE_URL, BACKEND_BASE_URL
|
|
|
|
|
|
|
|
|
|
class ProcessManager:
|
|
class ProcessManager:
|
|
- def __init__(self, check_interval: int = 5, timeout: int = 300):
|
|
|
|
|
|
+ def __init__(self, check_interval: int = 1, timeout: int = 300):
|
|
self.processes: Dict[int, dict] = {} # {pid: {meta}}
|
|
self.processes: Dict[int, dict] = {} # {pid: {meta}}
|
|
self.lock = threading.Lock()
|
|
self.lock = threading.Lock()
|
|
self.check_interval = check_interval # 检查间隔(秒)
|
|
self.check_interval = check_interval # 检查间隔(秒)
|
|
@@ -20,6 +22,9 @@ class ProcessManager:
|
|
self._monitor_thread: Optional[threading.Thread] = None
|
|
self._monitor_thread: Optional[threading.Thread] = None
|
|
self._running = False
|
|
self._running = False
|
|
|
|
|
|
|
|
+ # 与子进程的通信队列相关参数
|
|
|
|
+ self._reader_threads: Dict[int, threading.Thread] = {} # pid:
|
|
|
|
+
|
|
# 配置日志
|
|
# 配置日志
|
|
self.log_dir: str = "process_logs"
|
|
self.log_dir: str = "process_logs"
|
|
# 确保日志目录存在
|
|
# 确保日志目录存在
|
|
@@ -30,6 +35,17 @@ class ProcessManager:
|
|
)
|
|
)
|
|
self.logger = logging.getLogger("ProcessManager")
|
|
self.logger = logging.getLogger("ProcessManager")
|
|
|
|
|
|
|
|
+ # 准备日志文件
|
|
|
|
+ timestamp = time.strftime("%Y%m%d-%H%M%S")
|
|
|
|
+ log_prefix = os.path.join(
|
|
|
|
+ self.log_dir,
|
|
|
|
+ f"proc_{timestamp}_{os.getpid()}"
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # 打开日志文件
|
|
|
|
+ # with open(f"{log_prefix}.stdout", "w") as stdout_f, \
|
|
|
|
+ # open(f"{log_prefix}.stderr", "w") as stderr_f:
|
|
|
|
+
|
|
def spawn(
|
|
def spawn(
|
|
self,
|
|
self,
|
|
command: Union[str, List[str]],
|
|
command: Union[str, List[str]],
|
|
@@ -48,58 +64,112 @@ class ProcessManager:
|
|
:return: 成功返回PID,失败返回None
|
|
:return: 成功返回PID,失败返回None
|
|
"""
|
|
"""
|
|
try:
|
|
try:
|
|
- # 准备日志文件
|
|
|
|
- timestamp = time.strftime("%Y%m%d-%H%M%S")
|
|
|
|
- log_prefix = os.path.join(
|
|
|
|
- self.log_dir,
|
|
|
|
- f"proc_{timestamp}_{os.getpid()}"
|
|
|
|
|
|
+ # 创建子进程
|
|
|
|
+ env = {**os.environ, **(env or {})}
|
|
|
|
+ env.update({
|
|
|
|
+ "PYTHONUNBUFFERED": "1", # 禁用缓冲
|
|
|
|
+ 'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
|
|
|
|
+ 'BACKEND_BASE_URL': BACKEND_BASE_URL,
|
|
|
|
+ 'missionId': str(meta['mission']['id']),
|
|
|
|
+ 'planId': str(meta['plan']['id']),
|
|
|
|
+ })
|
|
|
|
+ proc = subprocess.Popen(
|
|
|
|
+ command,
|
|
|
|
+ cwd=cwd,
|
|
|
|
+ env=env,
|
|
|
|
+ shell=shell,
|
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
|
+ stderr=subprocess.PIPE,
|
|
|
|
+ text=True,
|
|
)
|
|
)
|
|
-
|
|
|
|
- # 打开日志文件
|
|
|
|
- with open(f"{log_prefix}.stdout", "w") as stdout_f, \
|
|
|
|
- open(f"{log_prefix}.stderr", "w") as stderr_f:
|
|
|
|
-
|
|
|
|
- # 创建子进程
|
|
|
|
- env = {**os.environ, **(env or {})}
|
|
|
|
- env.update({
|
|
|
|
- 'SCHEDULER_BASE_URL': SCHEDULER_BASE_URL,
|
|
|
|
- 'BACKEND_BASE_URL': BACKEND_BASE_URL,
|
|
|
|
- 'missionId': str(meta['mission']['id']),
|
|
|
|
- 'planId': str(meta['plan']['id']),
|
|
|
|
- })
|
|
|
|
- proc = subprocess.Popen(
|
|
|
|
- command,
|
|
|
|
- cwd=cwd,
|
|
|
|
- env=env,
|
|
|
|
- shell=shell,
|
|
|
|
- stdout=stdout_f,
|
|
|
|
- stderr=stderr_f,
|
|
|
|
- text=True,
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- # 注册进程信息
|
|
|
|
- with self.lock:
|
|
|
|
- self.processes[proc.pid] = {
|
|
|
|
- "proc": proc,
|
|
|
|
- "command": command,
|
|
|
|
- "start_time": time.time(),
|
|
|
|
- "last_active": time.time(),
|
|
|
|
- "log_stdout": stdout_f.name,
|
|
|
|
- "log_stderr": stderr_f.name,
|
|
|
|
- "meta": meta
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- self.logger.info(
|
|
|
|
- f"创建子进程 PID={proc.pid} | "
|
|
|
|
- f"命令: {command} | "
|
|
|
|
- f"日志: {log_prefix}.*"
|
|
|
|
- )
|
|
|
|
- return proc.pid
|
|
|
|
|
|
+ self.logger.info(f"准备创建进程")
|
|
|
|
+ # 注册进程信息
|
|
|
|
+ with self.lock:
|
|
|
|
+ self.processes[proc.pid] = {
|
|
|
|
+ "proc": proc,
|
|
|
|
+ "command": command,
|
|
|
|
+ "start_time": time.time(),
|
|
|
|
+ "last_active": time.time(),
|
|
|
|
+ "msg_queue": Queue(),
|
|
|
|
+ "meta": meta
|
|
|
|
+ }
|
|
|
|
+ self.logger.info(f"准备开始监听")
|
|
|
|
+ self._start_reader(proc.pid, proc.stdout, proc.stderr)
|
|
|
|
+
|
|
|
|
+ self.logger.info(
|
|
|
|
+ f"创建子进程 PID={proc.pid} | "
|
|
|
|
+ f"命令: {command} | "
|
|
|
|
+ )
|
|
|
|
+ return proc.pid
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
self.logger.error(f"创建进程失败: {str(e)}")
|
|
self.logger.error(f"创建进程失败: {str(e)}")
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
def stop(self, missionId: int):
    """Stop every tracked process that belongs to the given mission.

    Each matching pid is handed to ``self.remove_process``.

    :param missionId: mission id compared against
        ``int(info['meta']['mission']['id'])`` of every tracked process
    :return: list of ``{'planId': <int>}`` dicts, one per process that was
        handed to ``remove_process`` (empty when nothing matched)
    """
    plansStopped = []
    pids = []  # pids to remove once the scan is finished
    # Scan a snapshot: remove_process() deletes entries from self.processes,
    # and other threads may mutate the dict while we iterate.
    for pid, info in list(self.processes.items()):
        if int(info['meta']['mission']['id']) == missionId:
            pids.append(pid)
            plansStopped.append({'planId': int(info['meta']['plan']['id'])})
    for pid in pids:
        self.remove_process(pid)
    # The original built this list but dropped it; returning it lets callers
    # see which plans were stopped (callers that ignore the result still work).
    return plansStopped
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ def _start_reader(self, pid:int, stdout, stderr):
|
|
|
|
+ """为每个子进程启动独立的非阻塞读取线程"""
|
|
|
|
+ def reader_loop(pid, out_pipe, queue: Queue):
|
|
|
|
+ try:
|
|
|
|
+ while True:
|
|
|
|
+ # 非阻塞读取 stdout
|
|
|
|
+ try:
|
|
|
|
+ out_line = out_pipe.readline()
|
|
|
|
+ # 管道关闭返回空串
|
|
|
|
+ if not out_line:
|
|
|
|
+ break
|
|
|
|
+ if out_line:
|
|
|
|
+ queue.put(('stdout', out_line.strip()))
|
|
|
|
+ except (IOError, ValueError):
|
|
|
|
+ self.logger.info(f"进程消息读取错误 pid:{pid}")
|
|
|
|
+ pass
|
|
|
|
+ time.sleep(0.1) # 降低 CPU 占用
|
|
|
|
+ except Exception as e:
|
|
|
|
+ self.logger.error(f"读取子进程消息错误: {str(e)}")
|
|
|
|
+ def reader_err_loop(pid, err_pipe, queue: Queue):
|
|
|
|
+ try:
|
|
|
|
+ while True:
|
|
|
|
+ # 非阻塞读取 stderr
|
|
|
|
+ try:
|
|
|
|
+ err_line = err_pipe.readline()
|
|
|
|
+ # 管道关闭返回空串
|
|
|
|
+ if not err_line:
|
|
|
|
+ break
|
|
|
|
+ if err_line:
|
|
|
|
+ queue.put(('stderr', err_line.strip()))
|
|
|
|
+ except (IOError, ValueError):
|
|
|
|
+ self.logger.info(f"进程错误读取错误 pid:{pid}")
|
|
|
|
+ pass
|
|
|
|
+ time.sleep(0.1) # 降低 CPU 占用
|
|
|
|
+ except Exception as e:
|
|
|
|
+ self.logger.error(f"读取子进程消息错误: {str(e)}")
|
|
|
|
+
|
|
|
|
+ # 创建并启动消息读取线程
|
|
|
|
+ t = threading.Thread(
|
|
|
|
+ target=reader_loop,
|
|
|
|
+ args=(pid, stdout, self.processes[pid]["msg_queue"]),
|
|
|
|
+ daemon=True
|
|
|
|
+ )
|
|
|
|
+ t.start()
|
|
|
|
+ # 创建并启动错误读取线程
|
|
|
|
+ tE = threading.Thread(
|
|
|
|
+ target=reader_err_loop,
|
|
|
|
+ args=(pid, stderr, self.processes[pid]["msg_queue"]),
|
|
|
|
+ daemon=True
|
|
|
|
+ )
|
|
|
|
+ tE.start()
|
|
|
|
+ self._reader_threads[pid] = [t, tE]
|
|
|
|
+
|
|
def start_monitoring(self):
|
|
def start_monitoring(self):
|
|
"""启动后台监控线程"""
|
|
"""启动后台监控线程"""
|
|
if self._running:
|
|
if self._running:
|
|
@@ -128,14 +198,47 @@ class ProcessManager:
|
|
proc = psutil.Process(pid)
|
|
proc = psutil.Process(pid)
|
|
proc.terminate()
|
|
proc.terminate()
|
|
del self.processes[pid]
|
|
del self.processes[pid]
|
|
|
|
+ # 清理读取线程
|
|
|
|
+ if pid in self._reader_threads:
|
|
|
|
+ for t in self._reader_threads[pid]:
|
|
|
|
+ t.join()
|
|
|
|
+ del self._reader_threads[pid]
|
|
self.logger.info(f"移除处理进程监视 MissionId: {missionId} PlanId: {planId}")
|
|
self.logger.info(f"移除处理进程监视 MissionId: {missionId} PlanId: {planId}")
|
|
return True
|
|
return True
|
|
|
|
+ except psutil.NoSuchProcess:
|
|
|
|
+ self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
|
|
|
|
+ return True
|
|
except Exception as error:
|
|
except Exception as error:
|
|
self.logger.error(f"移除处理进程监视失败 MissionId: {missionId} PlanId: {planId} Error: {error}")
|
|
self.logger.error(f"移除处理进程监视失败 MissionId: {missionId} PlanId: {planId} Error: {error}")
|
|
return False
|
|
return False
|
|
self.logger.info(f"该处理进程不在监视中 MissionId: {missionId} PlanId: {planId}")
|
|
self.logger.info(f"该处理进程不在监视中 MissionId: {missionId} PlanId: {planId}")
|
|
return True
|
|
return True
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
def remove_process(self, pid_to_del: int):
    """Terminate a tracked process by pid and stop tracking it.

    Looks the pid up in ``self.processes``, sends it a terminate request via
    ``psutil`` and deletes its entry. The reader threads registered for the
    pid in ``self._reader_threads`` are then joined (with a bounded wait)
    and forgotten. All failures are logged; nothing is raised to the caller.

    If ``psutil`` reports that the process no longer exists, the entry in
    ``self.processes`` is left in place, exactly as before this rewrite, so
    callers that clean the entry up themselves keep working.

    :param pid_to_del: pid of the process to terminate
    """
    self.logger.error(f"移除处理进程-with pid")
    # Upper bound for waiting on each reader thread. They are daemon threads
    # and finish on their own once the child's pipes reach EOF.
    join_timeout = 2.0
    missionId = planId = None
    try:
        info = self.processes.get(pid_to_del)
        if info is None:
            # The ids are unknown here, so report the pid instead of
            # formatting names that were never assigned.
            self.logger.error(f"未找到请求移除的进程 pid:{pid_to_del}")
            return
        missionId = info['meta']['mission']['id']
        planId = info["meta"]["plan"]["id"]
        psutil.Process(pid_to_del).terminate()
        del self.processes[pid_to_del]
        self.logger.info(f"移除处理进程监视成功 MissionId: {missionId} PlanId: {planId}")
    except psutil.NoSuchProcess:
        self.logger.error(f"进程已自行退出 MissionId: {missionId} PlanId: {planId}")
    except Exception as error:
        self.logger.error(f"移除处理进程监视-with pid失败 pid:{pid_to_del} Error: {error}")
    finally:
        # Join AFTER terminating: the readers block in readline() until the
        # child closes its pipes, so joining first would wait for the child
        # to exit on its own.
        for reader in self._reader_threads.pop(pid_to_del, []):
            try:
                reader.join(timeout=join_timeout)
            except RuntimeError as error:
                self.logger.error(f"移除处理进程监视-with pid失败 pid:{pid_to_del} Error: {error}")
|
|
|
|
+
|
|
|
|
+
|
|
|
|
|
|
def _monitor_loop(self):
|
|
def _monitor_loop(self):
|
|
"""监控主循环"""
|
|
"""监控主循环"""
|
|
@@ -151,12 +254,11 @@ class ProcessManager:
|
|
"""执行进程状态检查"""
|
|
"""执行进程状态检查"""
|
|
current_time = time.time()
|
|
current_time = time.time()
|
|
dead_pids = []
|
|
dead_pids = []
|
|
-
|
|
|
|
|
|
+ reportDatas = []
|
|
with self.lock:
|
|
with self.lock:
|
|
for pid, info in self.processes.items():
|
|
for pid, info in self.processes.items():
|
|
try:
|
|
try:
|
|
proc = psutil.Process(pid)
|
|
proc = psutil.Process(pid)
|
|
-
|
|
|
|
# 检测崩溃
|
|
# 检测崩溃
|
|
if proc.status() == psutil.STATUS_ZOMBIE:
|
|
if proc.status() == psutil.STATUS_ZOMBIE:
|
|
self.logger.warning(f"进程 {pid} 处于僵尸状态")
|
|
self.logger.warning(f"进程 {pid} 处于僵尸状态")
|
|
@@ -176,18 +278,104 @@ class ProcessManager:
|
|
info["last_active"] = current_time
|
|
info["last_active"] = current_time
|
|
|
|
|
|
except psutil.NoSuchProcess:
|
|
except psutil.NoSuchProcess:
|
|
- response = requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'CRASH', 'results': None})
|
|
|
|
- code = response.json()['code']
|
|
|
|
- if code == 'OK':
|
|
|
|
- self.logger.warning(f"进程 {pid} 已正常退出")
|
|
|
|
- dead_pids.append(pid)
|
|
|
|
- if code == 'ERROR':
|
|
|
|
- dead_pids.append(pid)
|
|
|
|
|
|
+ # 无法找到进程时,可能子进程已正常退出,读取所有管道中剩余数据,判断是否崩溃
|
|
|
|
+ response = {}
|
|
|
|
+ try:
|
|
|
|
+ while True:
|
|
|
|
+ self.logger.info(f"111")
|
|
|
|
+ pipe_type, message = info["msg_queue"].get_nowait()
|
|
|
|
+ self.logger.info(f"222")
|
|
|
|
+ response = self._handle_process_message(pid, pipe_type, message)
|
|
|
|
+ self.logger.info(f"返回信息1{response}")
|
|
|
|
+ except Empty:
|
|
|
|
+ self.logger.info(f"333")
|
|
|
|
+ pass
|
|
|
|
+ except Exception as error:
|
|
|
|
+ self.logger.error(f"ERROR:{error}")
|
|
|
|
+ self.logger.info(f"返回信息2{response}")
|
|
|
|
+ if 'finished' in response and response['finished']:
|
|
|
|
+ # 虽然已经找不到进程,但是由于进程已正常退出
|
|
|
|
+ # 将获取的result缓存,待释放锁后向调度器汇报
|
|
|
|
+ reportDatas.append({
|
|
|
|
+ 'missionId': info["meta"]["mission"]["id"],
|
|
|
|
+ 'planId': info["meta"]["plan"]["id"],
|
|
|
|
+ 'state': 'DONE',
|
|
|
|
+ 'results': response['results'],
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ self.remove_process(pid)
|
|
|
|
+ else:
|
|
|
|
+ # 非正常退出(未传递finished信息)
|
|
self._handle_crash(pid, info)
|
|
self._handle_crash(pid, info)
|
|
|
|
|
|
|
|
+ # 无论如何都加入待清理列表
|
|
|
|
+ dead_pids.append(pid)
|
|
|
|
+
|
|
|
|
+ # 正常读取子进程输出
|
|
|
|
+ try:
|
|
|
|
+ while True:
|
|
|
|
+ pipe_type, message = info["msg_queue"].get_nowait()
|
|
|
|
+ response = self._handle_process_message(pid, pipe_type, message)
|
|
|
|
+ if 'finished' in response and response['finished']:
|
|
|
|
+ # 正常退出
|
|
|
|
+ # 将获取的result缓存,待释放锁后向调度器汇报
|
|
|
|
+ reportDatas.append({
|
|
|
|
+ 'missionId': info["meta"]["mission"]["id"],
|
|
|
|
+ 'planId': info["meta"]["plan"]["id"],
|
|
|
|
+ 'state': 'DONE',
|
|
|
|
+ 'results': response['results'],
|
|
|
|
+ })
|
|
|
|
+ self.remove_process(pid)
|
|
|
|
+ dead_pids.append(pid)
|
|
|
|
+ except Empty:
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+
|
|
# 清理已终止进程
|
|
# 清理已终止进程
|
|
for pid in dead_pids:
|
|
for pid in dead_pids:
|
|
del self.processes[pid]
|
|
del self.processes[pid]
|
|
|
|
+ # 清理读取线程
|
|
|
|
+ if pid in self._reader_threads:
|
|
|
|
+ for t in self._reader_threads[pid]:
|
|
|
|
+ t.join()
|
|
|
|
+ del self._reader_threads[pid]
|
|
|
|
+ # 锁已释放
|
|
|
|
+ # 依次汇报已结束任务,获取下一步任务
|
|
|
|
+ for report in reportDatas:
|
|
|
|
+ response = requests.post(SCHEDULER_BASE_URL + "/report", json=report)
|
|
|
|
+ self.logger.info(f"进程结果已提交调度器 mission:{report['missionId']} plan:{report['planId']} response:{response}")
|
|
|
|
+
|
|
|
|
+ def _handle_process_message(self, pid:int, pipe_type:str, message:str):
|
|
|
|
+ """处理来自子进程的通信消息"""
|
|
|
|
+ try:
|
|
|
|
+ # 解析 JSON 格式消息
|
|
|
|
+ data = json.loads(message)
|
|
|
|
+ self.logger.info(f"收到进程消息 PID={pid}: {data}")
|
|
|
|
+
|
|
|
|
+ # 更新最后活跃时间
|
|
|
|
+ self.processes[pid]["last_active"] = time.time()
|
|
|
|
+
|
|
|
|
+ # 处理完成消息
|
|
|
|
+ msg = data.get("msg")
|
|
|
|
+ if msg == "progress":
|
|
|
|
+ # 获得进度汇报,向django汇报
|
|
|
|
+ response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
|
|
|
|
+ return {'finished': False}
|
|
|
|
+
|
|
|
|
+ if msg == "result":
|
|
|
|
+ # 获得返回结果,向django汇报
|
|
|
|
+ response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json=data.get("data"))
|
|
|
|
+ self.logger.info(f"进程结果已向后端服务器反馈 pid:{pid} response:{response}")
|
|
|
|
+
|
|
|
|
+ self.logger.info(f"进程 {pid} 报告已完成")
|
|
|
|
+ # 标记该进程正常退出
|
|
|
|
+ return {'finished': True, 'results': data.get('data')}
|
|
|
|
+ return {'finished': False}
|
|
|
|
+
|
|
|
|
+ except json.JSONDecodeError:
|
|
|
|
+ self.logger.warning(f"无效消息格式 PID={pid}: {message}")
|
|
|
|
+ return {'finished': False}
|
|
|
|
+
|
|
|
|
|
|
def _handle_crash(self, pid: int, info: dict):
|
|
def _handle_crash(self, pid: int, info: dict):
|
|
"""进程崩溃处理逻辑"""
|
|
"""进程崩溃处理逻辑"""
|
|
@@ -203,6 +391,8 @@ class ProcessManager:
|
|
f"命令: {info['command']}\n"
|
|
f"命令: {info['command']}\n"
|
|
f"最后错误输出:\n{last_lines}"
|
|
f"最后错误输出:\n{last_lines}"
|
|
)
|
|
)
|
|
|
|
+ # 发现进程崩溃,向django汇报任务失败
|
|
|
|
+ response = requests.post(BACKEND_BASE_URL + "/rawDataTrans/", json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'progress': -1})
|
|
# 向flask发送进程崩溃信息
|
|
# 向flask发送进程崩溃信息
|
|
response = requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'DEAD', 'results': None})
|
|
response = requests.post(SCHEDULER_BASE_URL + '/report', json={'missionId': info['meta']['mission']['id'], 'planId': info['meta']['plan']['id'], 'state': 'DEAD', 'results': None})
|
|
|
|
|