scheduler.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. from flask import Flask, request, jsonify
  2. from flask_sqlalchemy import SQLAlchemy
  3. from sqlalchemy import text
  4. import logging
  5. import csv
  6. from pathlib import Path
  7. import requests
  8. from utils import store, TaskState
  9. from processManager import ProcessManager
  10. BASE_DIR = Path(__file__).resolve().parent
  11. DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3'
  12. logger = logging.getLogger("Scheduler")
  13. app = Flask(__name__)
  14. app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(DB_DIR) + '?check_same_thread=False'
  15. app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
  16. 'pool_pre_ping': True,
  17. 'pool_recycle': 3600,
  18. 'connect_args': {'timeout': 20}
  19. }
  20. db = SQLAlchemy(app)
  21. # 创建并启动子进程管理器
  22. manager = ProcessManager(
  23. # check_interval=1,
  24. # timeout=600,
  25. )
  26. manager.start_monitoring()
  27. @app.route('/', methods=['GET'])
  28. def test():
  29. return "OK"
  30. @app.route('/fetchData', methods=['GET'])
  31. def fetchData():
  32. missionId = int(request.args.get("missionId"))
  33. planId = int(request.args.get("planId"))
  34. data = store.fetchData(missionId, planId)
  35. if data:
  36. # logger.info(f"传递初始数据 MissionId:{missionId} PlanId:{planId} Data:{data}")
  37. return jsonify(data)
  38. else:
  39. logger.error(f"获取计算任务输入数据错误 MissionId:{missionId} PlanId:{planId}")
  40. return jsonify([])
  41. @app.route('/addMission', methods=['POST'])
  42. def add_mission():
  43. mission = request.json['mission']
  44. plans = request.json['plans']
  45. # 将mission加入任务列表
  46. if store.addMission(mission, plans):
  47. # 读取task列表,启动任务
  48. for task in store.initMissionTasks(missionId=mission['id']):
  49. if not manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
  50. logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}")
  51. return jsonify({"code": "ERROR", "status": ""})
  52. return jsonify({"code": "OK", "status": "started"})
  53. else:
  54. return jsonify({"code": "ERROR", "status": "Failed to create mission"})
  55. @app.route('/pauseMission', methods=['POST'])
  56. def pause_mission():
  57. mission = request.json['mission']
  58. # 暂停mission的方法是停止当前正在执行的算法子进程,但是保留mission任务栈,等待恢复命令
  59. # 在store中,mission的task保存了当前正在执行的所有plan,因此并不需要对store做特殊处理
  60. plan = manager.stop(missionId=int(mission['id']))
  61. if plan:
  62. return jsonify({"code": "OK", "status": "paused"})
  63. else:
  64. return jsonify({"code": "ERROR", "status": "Failed to pause"})
  65. @app.route('/resumeMission', methods=['POST'])
  66. def resume_mission():
  67. mission = request.json['mission']
  68. # store中的恢复mission即读取该mission现有的tasks列表,将其中的任务重新加入进程管理器中运行
  69. resumedTasks = store.resumeMission(missionId=int(mission['id']))
  70. for nextTask in resumedTasks:
  71. task = store.prepareTask(missionId=int(mission['id']), planId=nextTask['id'])
  72. if not manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
  73. logger.info(f"任务 MissionId:{mission['id']} 恢复运行出错,尝试将其移除")
  74. # 任务恢复运行出错时,将其彻底停止以避免bug
  75. if not manager.stop(missionId=int(mission['id'])):
  76. logger.info(f"移除 MissionId:{mission['id']} 运行中进程出错")
  77. if store.stopMission(missionId=int(mission['id'])):
  78. logger.info(f"移除 MissionId:{mission['id']} 成功")
  79. else:
  80. logger.info(f"移除 MissionId:{mission['id']} 任务数据出错")
  81. logger.info(f"任务 MissionId:{mission['id']} 已恢复")
  82. @app.route('/stopMission', methods=['POST'])
  83. def stop_mission():
  84. # 被停止的mission将无法恢复,因此可以直接删除其所有上下文消息
  85. mission = request.json['mission']
  86. # 首先在进程管理器中停止运行的进程
  87. if not manager.stop(missionId=int(mission['id'])):
  88. logger.info(f"停止 MissionId:{mission['id']} 出错,未能停止现有进程")
  89. # 其次在数据管理器中删除该Mission的所有数据
  90. if store.stopMission(missionId=int(mission['id'])):
  91. logger.info(f"停止 MissionId:{mission['id']} 成功")
  92. else:
  93. logger.info(f"停止 MissionId:{mission['id']} 失败")
  94. @app.route('/get_status/<task_id>')
  95. def get_status(task_id):
  96. # 实现状态查询逻辑
  97. return jsonify({"status": "running", "progress": 75})
  98. if __name__ == '__main__':
  99. app.run(port=5000, debug=False)