scheduler.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from flask import Flask, request, jsonify
  2. from flask_sqlalchemy import SQLAlchemy
  3. from sqlalchemy import text
  4. import logging
  5. import csv
  6. from pathlib import Path
  7. import requests
  8. from utils import Store, TaskState
  9. from processManager import ProcessManager
  10. BASE_DIR = Path(__file__).resolve().parent
  11. DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3'
  12. logger = logging.getLogger("Scheduler")
  13. store = Store()
  14. app = Flask(__name__)
  15. app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(DB_DIR) + '?check_same_thread=False'
  16. app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
  17. 'pool_pre_ping': True,
  18. 'pool_recycle': 3600,
  19. 'connect_args': {'timeout': 20}
  20. }
  21. db = SQLAlchemy(app)
  22. # 创建并启动子进程管理器
  23. manager = ProcessManager(
  24. # check_interval=1,
  25. # timeout=600,
  26. )
  27. manager.start_monitoring()
  28. @app.route('/', methods=['GET'])
  29. def test():
  30. return "OK"
  31. @app.route('/fetchData', methods=['GET'])
  32. def fetchData():
  33. missionId = int(request.args.get("missionId"))
  34. planId = int(request.args.get("planId"))
  35. data = store.fetchData(missionId, planId)
  36. if data:
  37. # logger.info(f"传递初始数据 MissionId:{missionId} PlanId:{planId} Data:{data}")
  38. return jsonify(data)
  39. else:
  40. logger.error(f"获取计算任务输入数据错误 MissionId:{missionId} PlanId:{planId}")
  41. return jsonify([])
  42. @app.route('/report', methods=['POST'])
  43. def report():
  44. missionId = int(request.json['missionId'])
  45. planId = int(request.json['planId'])
  46. state = request.json['state']
  47. results = request.json['results']
  48. if state == "CRASH":
  49. # 已无法找到进程,检查是否已经结束
  50. if store.checkActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
  51. return jsonify({"code": "OK"})
  52. else:
  53. # logger.info(f"计算任务崩溃: MissionId:{mission['id']} PlanId:{plan['id']}")
  54. return jsonify({"code": "ERROR"})
  55. if state == "DONE":
  56. if store.updateActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
  57. logger.info(f"计算任务完成: MissionId:{missionId} PlanId:{planId}")
  58. # 现有任务完成,搜索下一任务
  59. nextTasks = store.solveMission(missionId=missionId, planId=planId, results=results)
  60. logger.info(f"Next Tasks{nextTasks}")
  61. for nextTask in nextTasks:
  62. task = store.prepareTask(missionId=missionId, planId=nextTask['id'])
  63. if manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
  64. store.addActiveTask(missionId=missionId, planId=task['plan']['id'])
  65. logger.info(f"创建后续计算任务成功 MissionId:{missionId} PlanId:{task['plan']['id']}")
  66. else:
  67. logger.info(f"创建后续计算任务失败 MissionId:{missionId} PlanId:{task['plan']['id']}")
  68. return jsonify({"code": "OK"})
  69. else:
  70. logger.error(f"计算任务无法完成: MissionId:{missionId} PlanId:{planId}")
  71. # 超时任务处理逻辑待完善
  72. if state == "DEAD" or state == "TIMEOUT":
  73. if store.removeActiveTask(missionId=missionId, planId=planId):
  74. logger.info(f"移除终止计算任务: MissionId:{missionId} PlanId:{planId}")
  75. return jsonify({"code": "OK"})
  76. else:
  77. logger.error(f"移除错误计算任务失败,已强制清除: MissionId:{missionId} PlanId:{planId}")
  78. return jsonify({"code": "ERROR"})
  79. # 主控只负责构建任务队列,然后调用子进程,子进程运行结束后发送报告,查询队列,继续运行。同时向Django发起访问,将运行结果更新到数据库,前端轮询时即可获取结果
  80. logger.error(f"无法识别的任务状态: MissionId:{missionId} PlanId:{planId} State:{state}")
  81. return jsonify({"code": "ERROR"})
  82. @app.route('/addMission', methods=['POST'])
  83. def start_task():
  84. mission = request.json['mission']
  85. plans = request.json['plans']
  86. # 将mission加入任务列表
  87. if store.addMission(mission, plans):
  88. # 读取task列表,启动任务
  89. for task in store.initMissionTasks(missionId=mission['id']):
  90. if manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
  91. store.addActiveTask(missionId=mission['id'], planId=task['plan']['id'])
  92. else:
  93. logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}")
  94. return jsonify({"code": "OK", "status": "started"})
  95. else:
  96. return jsonify({"code": "ERROR", "status": "duplicated"})
  97. @app.route('/pauseMission', methods=['POST'])
  98. def pause_task():
  99. mission = request.json['mission']
  100. # 暂停mission的方法是停止当前正在执行的算法子进程,但是保留mission任务栈,等待恢复命令
  101. # 在store中,mission的task保存了当前正在执行的所有plan,因此并不需要对store做特殊处理
  102. plan = manager.stop(missionId=int(mission['id']))
  103. if plan:
  104. return jsonify({"code": "OK", "status": "paused"})
  105. else:
  106. return jsonify({"code": "ERROR", "status": "Failed to pause"})
  107. @app.route('/resumeMission', methods=['POST'])
  108. def resume_task():
  109. mission = request.json['mission']
  110. resumedTasks = store.resumeMission(missionId=int(mission['id']))
  111. for nextTask in resumedTasks:
  112. task = store.prepareTask(missionId=int(mission['id']), planId=nextTask['id'])
  113. if manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=task['mission']):
  114. logger.info(f"任务 MissionId:{mission['id']} 已恢复")
  115. else:
  116. logger.info(f"任务 MissionId:{mission['id']} 恢复运行出错")
  117. def stop_task():
  118. # 被停止的mission将无法恢复,因此可以直接删除其所有上下文消息
  119. mission = request.json['mission']
  120. plan = manager.stop(missionId=int(mission['id']))
  121. if not plan:
  122. logger.info(f"停止 MissionId:{mission['id']} 出错,未能停止现有进程")
  123. if store.stopMission(missionId=int(mission['id'])):
  124. logger.info(f"停止 MissionId:{mission['id']} 成功")
  125. else:
  126. logger.info(f"停止 MissionId:{mission['id']} 失败")
  127. @app.route('/get_status/<task_id>')
  128. def get_status(task_id):
  129. # 实现状态查询逻辑
  130. return jsonify({"status": "running", "progress": 75})
  131. if __name__ == '__main__':
  132. app.run(port=5000, debug=False)