from flask import Flask, request, jsonify from flask_sqlalchemy import SQLAlchemy from sqlalchemy import text import logging import csv from pathlib import Path import requests from utils import store, TaskState from processManager import ProcessManager BASE_DIR = Path(__file__).resolve().parent DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3' logger = logging.getLogger("Scheduler") app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(DB_DIR) + '?check_same_thread=False' app.config['SQLALCHEMY_ENGINE_OPTIONS'] = { 'pool_pre_ping': True, 'pool_recycle': 3600, 'connect_args': {'timeout': 20} } db = SQLAlchemy(app) # 创建并启动子进程管理器 manager = ProcessManager( # check_interval=1, # timeout=600, ) manager.start_monitoring() @app.route('/', methods=['GET']) def test(): return "OK" @app.route('/fetchData', methods=['GET']) def fetchData(): missionId = int(request.args.get("missionId")) planId = int(request.args.get("planId")) data = store.fetchData(missionId, planId) if data: # logger.info(f"传递初始数据 MissionId:{missionId} PlanId:{planId} Data:{data}") return jsonify(data) else: logger.error(f"获取计算任务输入数据错误 MissionId:{missionId} PlanId:{planId}") return jsonify([]) @app.route('/addMission', methods=['POST']) def add_mission(): mission = request.json['mission'] plans = request.json['plans'] # 将mission加入任务列表 if store.addMission(mission, plans): # 读取task列表,启动任务 for task in store.initMissionTasks(missionId=mission['id']): if not manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']): logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}") return jsonify({"code": "ERROR", "status": ""}) return jsonify({"code": "OK", "status": "started"}) else: return jsonify({"code": "ERROR", "status": "Failed to create mission"}) @app.route('/pauseMission', methods=['POST']) def pause_mission(): mission = request.json['mission'] # 暂停mission的方法是停止当前正在执行的算法子进程,但是保留mission任务栈,等待恢复命令 # 在store中,mission的task保存了当前正在执行的所有plan,因此并不需要对store做特殊处理 plan = manager.stop(missionId=int(mission['id'])) if plan: return jsonify({"code": "OK", "status": "paused"}) else: return jsonify({"code": "ERROR", "status": "Failed to pause"}) @app.route('/resumeMission', methods=['POST']) def resume_mission(): mission = request.json['mission'] # store中的恢复mission即读取该mission现有的tasks列表,将其中的任务重新加入进程管理器中运行 resumedTasks = store.resumeMission(missionId=int(mission['id'])) for nextTask in resumedTasks: task = store.prepareTask(missionId=int(mission['id']), planId=nextTask['id']) if not manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']): logger.info(f"任务 MissionId:{mission['id']} 恢复运行出错,尝试将其移除") # 任务恢复运行出错时,将其彻底停止以避免bug if not manager.stop(missionId=int(mission['id'])): logger.info(f"移除 MissionId:{mission['id']} 运行中进程出错") if store.stopMission(missionId=int(mission['id'])): logger.info(f"移除 MissionId:{mission['id']} 成功") else: logger.info(f"移除 MissionId:{mission['id']} 任务数据出错") logger.info(f"任务 MissionId:{mission['id']} 已恢复") @app.route('/stopMission', methods=['POST']) def stop_mission(): # 被停止的mission将无法恢复,因此可以直接删除其所有上下文消息 mission = request.json['mission'] # 首先在进程管理器中停止运行的进程 if not manager.stop(missionId=int(mission['id'])): logger.info(f"停止 MissionId:{mission['id']} 出错,未能停止现有进程") # 其次在数据管理器中删除该Mission的所有数据 if store.stopMission(missionId=int(mission['id'])): logger.info(f"停止 MissionId:{mission['id']} 成功") else: logger.info(f"停止 MissionId:{mission['id']} 失败") @app.route('/get_status/') def get_status(task_id): # 实现状态查询逻辑 return jsonify({"status": "running", "progress": 75}) if __name__ == '__main__': app.run(port=5000, debug=False)