123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- from flask import Flask, request, jsonify
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy import text
- import logging
- import csv
- from pathlib import Path
- import requests
- from utils import store, TaskState
- from processManager import ProcessManager
- import time
- BASE_DIR = Path(__file__).resolve().parent
- DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3'
- logger = logging.getLogger("Scheduler")
- app = Flask(__name__)
- app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(DB_DIR) + '?check_same_thread=False'
- app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
- 'pool_pre_ping': True,
- 'pool_recycle': 3600,
- 'connect_args': {'timeout': 20}
- }
- db = SQLAlchemy(app)
- # 创建并启动子进程管理器
- manager = ProcessManager(
- # check_interval=1,
- # timeout=600,
- )
- manager.start_monitoring()
- @app.route('/', methods=['GET'])
- def test():
- return "OK"
- @app.route('/fetchData', methods=['GET'])
- def fetchData():
- missionId = int(request.args.get("missionId"))
- planId = int(request.args.get("planId"))
- data = store.fetchData(missionId, planId)
- if data:
- # logger.info(f"传递初始数据 MissionId:{missionId} PlanId:{planId} Data:{data}")
- return jsonify(data)
- else:
- logger.error(f"获取计算任务输入数据错误 MissionId:{missionId} PlanId:{planId}")
- return jsonify([])
- @app.route('/addMission', methods=['POST'])
- def add_mission():
- mission = request.json['mission']
- plans = request.json['plans']
- # 将mission加入任务列表
- if store.addMission(mission, plans):
- # 读取task列表,启动任务
- for task in store.initMissionTasks(missionId=mission['id']):
- if not manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission'], startTime = time.localtime(), algorithm=task['algorithm']):
- logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}")
- return jsonify({"code": "ERROR", "status": ""})
- return jsonify({"code": "OK", "status": "started"})
- else:
- return jsonify({"code": "ERROR", "status": "Failed to create mission"})
- @app.route('/pauseMission', methods=['POST'])
- def pause_mission():
- mission = request.json['mission']
- # 暂停mission的方法是停止当前正在执行的算法子进程,但是保留mission任务栈,等待恢复命令
- # 在store中,mission的task保存了当前正在执行的所有plan,因此并不需要对store做特殊处理
- plan = manager.stop(missionId=int(mission['id']))
- if plan:
- return jsonify({"code": "OK", "status": "paused"})
- else:
- return jsonify({"code": "ERROR", "status": "Failed to pause"})
-
- @app.route('/resumeMission', methods=['POST'])
- def resume_mission():
- mission = request.json['mission']
- plans = request.json['plans']
- # store中的恢复mission即读取该mission现有的tasks列表,将其中的任务重新加入进程管理器中运行
- # 需要传递plans,预防已经失去保存的mission信息时用于恢复
- resumedTasks = store.resumeMission(missionId=int(mission['id']), plans=plans)
- for nextTask in resumedTasks:
- task = store.prepareTask(missionId=int(mission['id']), planId=nextTask['id'])
- if not manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission'], startTime=time.localtime(), algorithm=task['algorithm']):
- logger.info(f"任务 MissionId:{mission['id']} 恢复运行出错,尝试将其移除")
- # 任务恢复运行出错时,将其彻底停止以避免bug
- if not manager.stop(missionId=int(mission['id'])):
- logger.info(f"移除 MissionId:{mission['id']} 运行中进程出错")
- if store.stopMission(missionId=int(mission['id'])):
- logger.info(f"移除 MissionId:{mission['id']} 成功")
- else:
- logger.info(f"移除 MissionId:{mission['id']} 任务数据出错")
- logger.info(f"任务 MissionId:{mission['id']} 已恢复")
- return jsonify({"code": "OK", "status": "任务已恢复"})
-
-
- @app.route('/stopMission', methods=['POST'])
- def stop_mission():
- # 被停止的mission将无法恢复,因此可以直接删除其所有上下文消息
- mission = request.json['mission']
- # 首先在进程管理器中停止运行的进程
- if not manager.stop(missionId=int(mission['id'])):
- logger.info(f"停止 MissionId:{mission['id']} 出错,未能停止现有进程")
- # 其次在数据管理器中删除该Mission的所有数据
- if store.stopMission(missionId=int(mission['id'])):
- logger.info(f"停止 MissionId:{mission['id']} 成功")
- return jsonify({"code": "OK", "status": "任务已停止"})
- else:
- logger.info(f"停止 MissionId:{mission['id']} 失败")
- return jsonify({"code": "ERROR", "status": "任务停止失败"})
- @app.route('/get_status/<task_id>')
- def get_status(task_id):
- # 实现状态查询逻辑
- return jsonify({"status": "running", "progress": 75})
- if __name__ == '__main__':
- app.run(port=5000, debug=False)
|