scheduler.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. from flask import Flask, request, jsonify
  2. from flask_sqlalchemy import SQLAlchemy
  3. from sqlalchemy import text
  4. import logging
  5. import csv
  6. from pathlib import Path
  7. import requests
  8. from utils import Store, TaskState
  9. from processManager import ProcessManager
  10. BASE_DIR = Path(__file__).resolve().parent
  11. DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3'
  12. logger = logging.getLogger("Scheduler")
  13. store = Store()
  14. app = Flask(__name__)
  15. app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(DB_DIR) + '?check_same_thread=False'
  16. app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
  17. 'pool_pre_ping': True,
  18. 'pool_recycle': 3600,
  19. 'connect_args': {'timeout': 20}
  20. }
  21. db = SQLAlchemy(app)
  22. # 创建并启动子进程管理器
  23. manager = ProcessManager(
  24. check_interval=10,
  25. timeout=600,
  26. )
  27. manager.start_monitoring()
  28. @app.route('/', methods=['GET'])
  29. def test():
  30. return "OK"
  31. @app.route('/report', methods=['POST'])
  32. def report():
  33. missionId = int(request.json['missionId'])
  34. planId = int(request.json['planId'])
  35. state = request.json['state']
  36. results = request.json['results']
  37. if state == "CRASH":
  38. # 已无法找到进程,检查是否已经结束
  39. if store.checkActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
  40. return jsonify({"code": "OK"})
  41. else:
  42. # logger.info(f"计算任务崩溃: MissionId:{mission['id']} PlanId:{plan['id']}")
  43. return jsonify({"code": "ERROR"})
  44. if state == "DONE":
  45. if store.updateActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
  46. logger.info(f"计算任务完成: MissionId:{missionId} PlanId:{planId}")
  47. return jsonify({"code": "OK"})
  48. else:
  49. logger.error(f"计算任务无法完成: MissionId:{missionId} PlanId:{planId}")
  50. # 超时任务处理逻辑待完善
  51. if state == "DEAD" or state == "TIMEOUT":
  52. if store.removeActiveTask(missionId=missionId, planId=planId):
  53. logger.info(f"移除错误计算任务: MissionId:{missionId} PlanId:{planId}")
  54. return jsonify({"code": "OK"})
  55. else:
  56. logger.error(f"移除错误计算任务失败,已强制清除: MissionId:{missionId} PlanId:{planId}")
  57. return jsonify({"code": "ERROR"})
  58. # 主控只负责构建任务队列,然后调用子进程,子进程运行结束后发送报告,查询队列,继续运行。同时向Django发起访问,将运行结果更新到数据库,前端轮询时即可获取结果
  59. logger.error(f"无法识别的任务状态: MissionId:{missionId} PlanId:{planId} State:{state}")
  60. return jsonify({"code": "ERROR"})
  61. @app.route('/addMission', methods=['POST'])
  62. def start_task():
  63. mission = request.json['mission']
  64. plans = request.json['plans']
  65. # 将mission加入任务列表
  66. if store.addMission(mission, plans):
  67. # 读取task列表,启动任务
  68. for task in store.prepareTasks(missionId=mission['id']):
  69. if manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=mission):
  70. store.addActiveTask(missionId=mission['id'], planId=task['plan']['id'])
  71. else:
  72. logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}")
  73. return jsonify({"status": "started"})
  74. else:
  75. return jsonify({"startus": "duplicated"})
  76. @app.route('/get_status/<task_id>')
  77. def get_status(task_id):
  78. # 实现状态查询逻辑
  79. return jsonify({"status": "running", "progress": 75})
  80. if __name__ == '__main__':
  81. app.run(port=5000)