"""Flask scheduler service.

Exposes HTTP endpoints for registering missions, pausing/resuming/stopping
them, and receiving state reports from compute subprocesses. Mission and task
bookkeeping is delegated to ``Store``; subprocess lifecycle is delegated to
``ProcessManager``.
"""
import csv
import logging
from pathlib import Path

import requests
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text

from utils import Store, TaskState
from processManager import ProcessManager

BASE_DIR = Path(__file__).resolve().parent
DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3'

logger = logging.getLogger("Scheduler")
store = Store()

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(DB_DIR) + '?check_same_thread=False'
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_pre_ping': True,
    'pool_recycle': 3600,
    'connect_args': {'timeout': 20}
}
db = SQLAlchemy(app)

# Create and start the subprocess manager.
manager = ProcessManager(
    # check_interval=1,
    # timeout=600,
)
manager.start_monitoring()


def _spawn_task(task):
    """Launch the subprocess described by a prepared *task* dict.

    Reads the ``command``, ``cwd``, ``plan`` and ``mission`` keys of *task*
    and returns whatever ``manager.spawn`` returns (used as a truthy/falsy
    success flag by the callers).
    """
    return manager.spawn(
        command=task['command'],
        cwd=task['cwd'],
        plan=task['plan'],
        mission=task['mission'],
    )


@app.route('/', methods=['GET'])
def test():
    """Liveness probe: always answers with the plain text ``OK``."""
    return "OK"


@app.route('/fetchData', methods=['GET'])
def fetchData():
    """Return the stored input data for one plan of a mission.

    Query parameters ``missionId`` and ``planId`` are converted with
    ``int()``; a missing or non-numeric value makes that conversion raise.
    Responds with the JSON-encoded result of ``store.fetchData`` or, when
    that result is falsy, logs an error and responds with an empty JSON list.
    """
    missionId = int(request.args.get("missionId"))
    planId = int(request.args.get("planId"))
    data = store.fetchData(missionId, planId)
    if data:
        return jsonify(data)
    logger.error(f"获取计算任务输入数据错误 MissionId:{missionId} PlanId:{planId}")
    return jsonify([])


@app.route('/report', methods=['POST'])
def report():
    """Handle a state report for a compute task.

    Expects a JSON body with ``missionId``, ``planId``, ``state`` and
    ``results``. Handled states are ``CRASH``, ``DONE``, ``DEAD`` and
    ``TIMEOUT``; anything else is logged as unrecognised. The response is
    always ``{"code": "OK"}`` or ``{"code": "ERROR"}``.
    """
    missionId = int(request.json['missionId'])
    planId = int(request.json['planId'])
    state = request.json['state']
    results = request.json['results']

    if state == "CRASH":
        # The process can no longer be found; check whether the task had
        # in fact already been marked as finished.
        if store.checkActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
            return jsonify({"code": "OK"})
        return jsonify({"code": "ERROR"})

    if state == "DONE":
        if store.updateActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
            logger.info(f"计算任务完成: MissionId:{missionId} PlanId:{planId}")
            # Current task finished: look up the follow-up tasks and start them.
            nextTasks = store.solveMission(missionId=missionId, planId=planId, results=results)
            logger.info(f"Next Tasks{nextTasks}")
            for nextTask in nextTasks:
                task = store.prepareTask(missionId=missionId, planId=nextTask['id'])
                if _spawn_task(task):
                    store.addActiveTask(missionId=missionId, planId=task['plan']['id'])
                    logger.info(f"创建后续计算任务成功 MissionId:{missionId} PlanId:{task['plan']['id']}")
                else:
                    logger.info(f"创建后续计算任务失败 MissionId:{missionId} PlanId:{task['plan']['id']}")
            return jsonify({"code": "OK"})
        logger.error(f"计算任务无法完成: MissionId:{missionId} PlanId:{planId}")
        # Return here so a failed DONE update is not additionally logged
        # below as an unrecognised state; the response payload is the same.
        return jsonify({"code": "ERROR"})

    # TODO: the handling of timed-out tasks still needs to be completed.
    if state == "DEAD" or state == "TIMEOUT":
        if store.removeActiveTask(missionId=missionId, planId=planId):
            logger.info(f"移除终止计算任务: MissionId:{missionId} PlanId:{planId}")
            return jsonify({"code": "OK"})
        logger.error(f"移除错误计算任务失败,已强制清除: MissionId:{missionId} PlanId:{planId}")
        return jsonify({"code": "ERROR"})

    # Design note: the scheduler only builds the task queue and launches
    # subprocesses; a subprocess sends a report when it ends, the queue is
    # queried and execution continues. Results are also meant to be pushed to
    # Django so they land in the database and the polling frontend sees them.
    logger.error(f"无法识别的任务状态: MissionId:{missionId} PlanId:{planId} State:{state}")
    return jsonify({"code": "ERROR"})


@app.route('/addMission', methods=['POST'])
def start_task():
    """Register a mission and start its initial tasks.

    Expects a JSON body with ``mission`` and ``plans``. When
    ``store.addMission`` accepts the mission, every task yielded by
    ``store.initMissionTasks`` is spawned and, on success, recorded as
    active. Responds with status ``started``, or ``duplicated`` when the
    store rejects the mission.
    """
    mission = request.json['mission']
    plans = request.json['plans']
    # Add the mission to the mission list.
    if not store.addMission(mission, plans):
        return jsonify({"code": "ERROR", "status": "duplicated"})
    # Read the task list and launch each task.
    for task in store.initMissionTasks(missionId=mission['id']):
        if _spawn_task(task):
            store.addActiveTask(missionId=mission['id'], planId=task['plan']['id'])
        else:
            logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}")
    return jsonify({"code": "OK", "status": "started"})


@app.route('/pauseMission', methods=['POST'])
def pause_task():
    """Pause a mission by stopping its running subprocesses.

    Expects a JSON body with ``mission`` (containing ``id``). Responds with
    status ``paused`` when ``manager.stop`` returns a truthy value, otherwise
    with an error.
    """
    mission = request.json['mission']
    # Pausing stops the algorithm subprocesses that are currently running but
    # keeps the mission's task stack, waiting for a resume command.
    # The store's mission tasks already hold every plan being executed, so
    # the store needs no special handling here.
    plan = manager.stop(missionId=int(mission['id']))
    if plan:
        return jsonify({"code": "OK", "status": "paused"})
    return jsonify({"code": "ERROR", "status": "Failed to pause"})


@app.route('/resumeMission', methods=['POST'])
def resume_task():
    """Resume a paused mission by re-spawning its pending tasks.

    Expects a JSON body with ``mission`` (containing ``id``). Every task
    returned by ``store.resumeMission`` is prepared and spawned. Responds
    with status ``resumed`` when no spawn failed, otherwise with an error.
    """
    mission = request.json['mission']
    mission_id = int(mission['id'])
    all_spawned = True
    for nextTask in store.resumeMission(missionId=mission_id):
        task = store.prepareTask(missionId=mission_id, planId=nextTask['id'])
        if _spawn_task(task):
            logger.info(f"任务 MissionId:{mission['id']} 已恢复")
        else:
            all_spawned = False
            logger.info(f"任务 MissionId:{mission['id']} 恢复运行出错")
    # A Flask view must return a response; returning None raises a TypeError.
    if all_spawned:
        return jsonify({"code": "OK", "status": "resumed"})
    return jsonify({"code": "ERROR", "status": "Failed to resume"})


# NOTE(review): this handler originally had no route decorator and was
# therefore unreachable; '/stopMission' follows the naming of the sibling
# endpoints — confirm against the caller.
@app.route('/stopMission', methods=['POST'])
def stop_task():
    """Stop a mission for good and drop its stored context.

    Expects a JSON body with ``mission`` (containing ``id``). The running
    subprocesses are stopped first (a failure there is only logged), then the
    mission is removed via ``store.stopMission``. Responds with status
    ``stopped`` when the store reports success, otherwise with an error.
    """
    # A stopped mission cannot be resumed, so all of its context can be
    # deleted outright.
    mission = request.json['mission']
    mission_id = int(mission['id'])
    plan = manager.stop(missionId=mission_id)
    if not plan:
        logger.info(f"停止 MissionId:{mission['id']} 出错,未能停止现有进程")
    if store.stopMission(missionId=mission_id):
        logger.info(f"停止 MissionId:{mission['id']} 成功")
        return jsonify({"code": "OK", "status": "stopped"})
    logger.info(f"停止 MissionId:{mission['id']} 失败")
    return jsonify({"code": "ERROR", "status": "Failed to stop"})


@app.route('/get_status/<task_id>')
def get_status(task_id):
    """Return the status of a task.

    The URL must carry ``task_id`` as a path variable, because the view
    function takes it as a required argument.
    """
    # TODO: implement the real status lookup; the values below are placeholders.
    return jsonify({"status": "running", "progress": 75})


if __name__ == '__main__':
    app.run(port=5000, debug=False)