123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- from flask import Flask, request, jsonify
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy import text
- import logging
- import csv
- from pathlib import Path
- import requests
- from utils import Store, TaskState
- from processManager import ProcessManager
# Directory containing this module, and the SQLite database file located in
# the sibling ``backend`` directory.
BASE_DIR = Path(__file__).resolve().parent
DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3'

logger = logging.getLogger("Scheduler")
store = Store()

app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI=f'sqlite:///{DB_DIR}?check_same_thread=False',
    SQLALCHEMY_ENGINE_OPTIONS={
        'pool_pre_ping': True,
        'pool_recycle': 3600,
        'connect_args': {'timeout': 20},
    },
)
db = SQLAlchemy(app)

# Create and start the subprocess manager. It is built with its default
# arguments; the check_interval=1 / timeout=600 overrides were left disabled.
manager = ProcessManager()
manager.start_monitoring()
@app.route('/', methods=['GET'])
def test():
    """Return the literal string "OK" for ``GET /``."""
    return "OK"
@app.route('/fetchData', methods=['GET'])
def fetchData():
    """Return the stored input data for one plan of a mission as JSON.

    Query parameters:
        missionId: integer id of the mission.
        planId: integer id of the plan.

    Returns:
        The JSON-encoded value of ``store.fetchData(missionId, planId)`` when
        it is truthy, otherwise an empty JSON list. When either id is missing
        or is not an integer, an empty JSON list with HTTP status 400.
    """
    # With ``type=int`` a missing or non-integer value yields None instead of
    # raising, so bad requests no longer surface as an unhandled exception.
    missionId = request.args.get("missionId", type=int)
    planId = request.args.get("planId", type=int)
    if missionId is None or planId is None:
        logger.error(
            f"获取计算任务输入数据错误,参数无效 "
            f"MissionId:{request.args.get('missionId')} PlanId:{request.args.get('planId')}"
        )
        # Keep the body shape of the existing failure path (an empty list).
        return jsonify([]), 400

    data = store.fetchData(missionId, planId)
    if data:
        return jsonify(data)

    logger.error(f"获取计算任务输入数据错误 MissionId:{missionId} PlanId:{planId}")
    return jsonify([])
@app.route('/report', methods=['POST'])
def report():
    """Handle a state report for one plan of a mission.

    JSON body:
        missionId, planId: ids, converted with ``int``.
        state: "CRASH", "DONE", "DEAD" or "TIMEOUT"; anything else is
            rejected.
        results: forwarded to ``store.solveMission``; required only when
            ``state`` is "DONE".

    Returns:
        ``{"code": "OK"}`` or ``{"code": "ERROR"}`` as JSON.
    """
    payload = request.json
    missionId = int(payload['missionId'])
    planId = int(payload['planId'])
    state = payload['state']

    if state == "CRASH":
        # The process can no longer be found; check whether the task had
        # already been recorded as finished.
        if store.checkActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
            return jsonify({"code": "OK"})
        return jsonify({"code": "ERROR"})

    if state == "DONE":
        # Only this branch consumes the results, so only this branch requires
        # them. Read them before touching the store so a malformed report
        # leaves the task state unchanged.
        results = payload['results']
        if not store.updateActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
            logger.error(f"计算任务无法完成: MissionId:{missionId} PlanId:{planId}")
            # Return here rather than falling through to the
            # "unrecognised state" log below: the state was recognised, the
            # store update is what failed.
            return jsonify({"code": "ERROR"})

        logger.info(f"计算任务完成: MissionId:{missionId} PlanId:{planId}")
        # The current task is finished; look up the follow-up tasks.
        nextTasks = store.solveMission(missionId=missionId, planId=planId, results=results)
        logger.info(f"Next Tasks{nextTasks}")
        for nextTask in nextTasks:
            task = store.prepareTask(missionId=missionId, planId=nextTask['id'])
            if manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
                store.addActiveTask(missionId=missionId, planId=task['plan']['id'])
                logger.info(f"创建后续计算任务成功 MissionId:{missionId} PlanId:{task['plan']['id']}")
            else:
                logger.info(f"创建后续计算任务失败 MissionId:{missionId} PlanId:{task['plan']['id']}")
        return jsonify({"code": "OK"})

    # TODO: the handling of timed-out tasks still needs to be completed.
    if state in ("DEAD", "TIMEOUT"):
        if store.removeActiveTask(missionId=missionId, planId=planId):
            logger.info(f"移除终止计算任务: MissionId:{missionId} PlanId:{planId}")
            return jsonify({"code": "OK"})
        logger.error(f"移除错误计算任务失败,已强制清除: MissionId:{missionId} PlanId:{planId}")
        return jsonify({"code": "ERROR"})

    # Design note: the controller only builds the task queue and launches
    # subprocesses. A subprocess sends a report when it finishes, the queue is
    # consulted and execution continues. Results are also pushed to Django so
    # that they reach the database and the polling front end can fetch them.
    logger.error(f"无法识别的任务状态: MissionId:{missionId} PlanId:{planId} State:{state}")
    return jsonify({"code": "ERROR"})
@app.route('/addMission', methods=['POST'])
def start_task():
    """Register a mission with its plans and launch its initial tasks.

    JSON body:
        mission: mission description; its ``id`` entry is used.
        plans: the plans belonging to the mission.

    Returns:
        ``{"code": "OK", "status": "started"}`` when ``store.addMission``
        accepts the mission, otherwise
        ``{"code": "ERROR", "status": "duplicated"}``.
    """
    mission = request.json['mission']
    plans = request.json['plans']

    # Add the mission to the mission list; bail out if the store refuses it.
    if not store.addMission(mission, plans):
        return jsonify({"code": "ERROR", "status": "duplicated"})

    # Read the task list and start each task.
    for task in store.initMissionTasks(missionId=mission['id']):
        launched = manager.spawn(
            command=task['command'],
            cwd=task['cwd'],
            plan=task['plan'],
            mission=task['mission'],
        )
        if launched:
            store.addActiveTask(missionId=mission['id'], planId=task['plan']['id'])
            continue
        logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}")

    return jsonify({"code": "OK", "status": "started"})
@app.route('/pauseMission', methods=['POST'])
def pause_task():
    """Pause a mission by stopping its running subprocess.

    JSON body:
        mission: mission description; its ``id`` entry is converted with
            ``int`` and passed to ``manager.stop``.

    Returns:
        ``{"code": "OK", "status": "paused"}`` when ``manager.stop`` returns a
        truthy value, otherwise
        ``{"code": "ERROR", "status": "Failed to pause"}``.
    """
    mission = request.json['mission']
    # A mission is paused by stopping the algorithm subprocess that is
    # currently running for it, while its task stack is kept for a later
    # resume command. The store already records every plan the mission is
    # executing, so it needs no special handling here.
    stopped = manager.stop(missionId=int(mission['id']))
    if not stopped:
        return jsonify({"code": "ERROR", "status": "Failed to pause"})
    return jsonify({"code": "OK", "status": "paused"})
-
@app.route('/resumeMission', methods=['POST'])
def resume_task():
    """Resume a mission by relaunching the tasks the store reports for it.

    JSON body:
        mission: mission description; its ``id`` entry is converted with
            ``int``.

    Returns:
        ``{"code": "OK", "status": "resumed"}`` when every task returned by
        ``store.resumeMission`` was spawned (including the case where there
        was nothing to spawn), otherwise
        ``{"code": "ERROR", "status": "Failed to resume"}``.
    """
    mission = request.json['mission']
    missionId = int(mission['id'])

    allSpawned = True
    for nextTask in store.resumeMission(missionId=missionId):
        task = store.prepareTask(missionId=missionId, planId=nextTask['id'])
        if manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
            logger.info(f"任务 MissionId:{mission['id']} 已恢复")
        else:
            logger.info(f"任务 MissionId:{mission['id']} 恢复运行出错")
            allSpawned = False

    # A Flask view must return a response; returning nothing makes every call
    # fail with an internal error even when the tasks were resumed.
    if allSpawned:
        return jsonify({"code": "OK", "status": "resumed"})
    return jsonify({"code": "ERROR", "status": "Failed to resume"})
# NOTE(review): this handler reads ``request.json`` but was never registered
# with the app, so it could not be reached. The path follows the naming of the
# other mission routes (/addMission, /pauseMission, /resumeMission) - confirm
# it matches what the caller expects.
@app.route('/stopMission', methods=['POST'])
def stop_task():
    """Stop a mission permanently and drop its stored context.

    JSON body:
        mission: mission description; its ``id`` entry is converted with
            ``int``.

    Returns:
        ``{"code": "OK", "status": "stopped"}`` when ``store.stopMission``
        succeeds, otherwise ``{"code": "ERROR", "status": "Failed to stop"}``.
    """
    # A stopped mission cannot be resumed, so all of its context can simply be
    # deleted.
    mission = request.json['mission']
    missionId = int(mission['id'])

    # Failing to stop the process is logged but does not prevent the store
    # clean-up below.
    if not manager.stop(missionId=missionId):
        logger.info(f"停止 MissionId:{mission['id']} 出错,未能停止现有进程")

    if store.stopMission(missionId=missionId):
        logger.info(f"停止 MissionId:{mission['id']} 成功")
        return jsonify({"code": "OK", "status": "stopped"})

    logger.info(f"停止 MissionId:{mission['id']} 失败")
    return jsonify({"code": "ERROR", "status": "Failed to stop"})
@app.route('/get_status/<task_id>')
def get_status(task_id):
    """Return a fixed placeholder status as JSON.

    Args:
        task_id: path segment from the URL; not used yet.
    """
    # TODO: implement the real status-query logic.
    placeholder = {"status": "running", "progress": 75}
    return jsonify(placeholder)
# When executed as a script, serve the app on port 5000 with debug mode off.
if __name__ == '__main__':
    app.run(debug=False, port=5000)
|