12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- from flask import Flask, request, jsonify
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy import text
- import logging
- import csv
- from pathlib import Path
- import requests
- from utils import Store, TaskState
- from processManager import ProcessManager
- BASE_DIR = Path(__file__).resolve().parent
- DB_DIR = BASE_DIR.parent / 'backend' / 'db.sqlite3'
- logger = logging.getLogger("Scheduler")
- store = Store()
- app = Flask(__name__)
- app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + str(DB_DIR) + '?check_same_thread=False'
- app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
- 'pool_pre_ping': True,
- 'pool_recycle': 3600,
- 'connect_args': {'timeout': 20}
- }
- db = SQLAlchemy(app)
- # 创建并启动子进程管理器
- manager = ProcessManager(
- check_interval=10,
- timeout=600,
- )
- manager.start_monitoring()
- @app.route('/', methods=['GET'])
- def test():
- return "OK"
- @app.route('/report', methods=['POST'])
- def report():
- missionId = int(request.json['missionId'])
- planId = int(request.json['planId'])
- state = request.json['state']
- results = request.json['results']
- if state == "CRASH":
- # 已无法找到进程,检查是否已经结束
- if store.checkActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
- return jsonify({"code": "OK"})
- else:
- # logger.info(f"计算任务崩溃: MissionId:{mission['id']} PlanId:{plan['id']}")
- return jsonify({"code": "ERROR"})
- if state == "DONE":
- if store.updateActiveTask(missionId=missionId, planId=planId, state=TaskState.DONE):
- logger.info(f"计算任务完成: MissionId:{missionId} PlanId:{planId}")
- return jsonify({"code": "OK"})
- else:
- logger.error(f"计算任务无法完成: MissionId:{missionId} PlanId:{planId}")
- # 超时任务处理逻辑待完善
- if state == "DEAD" or state == "TIMEOUT":
- if store.removeActiveTask(missionId=missionId, planId=planId):
- logger.info(f"移除错误计算任务: MissionId:{missionId} PlanId:{planId}")
- return jsonify({"code": "OK"})
- else:
- logger.error(f"移除错误计算任务失败,已强制清除: MissionId:{missionId} PlanId:{planId}")
- return jsonify({"code": "ERROR"})
- # 主控只负责构建任务队列,然后调用子进程,子进程运行结束后发送报告,查询队列,继续运行。同时向Django发起访问,将运行结果更新到数据库,前端轮询时即可获取结果
- logger.error(f"无法识别的任务状态: MissionId:{missionId} PlanId:{planId} State:{state}")
- return jsonify({"code": "ERROR"})
- @app.route('/addMission', methods=['POST'])
- def start_task():
- mission = request.json['mission']
- plans = request.json['plans']
- # 将mission加入任务列表
- if store.addMission(mission, plans):
- # 读取task列表,启动任务
- for task in store.prepareTasks(missionId=mission['id']):
- if manager.spawn(command=task['command'], cwd=task['cwd'], plan=task['plan'], mission=mission):
- store.addActiveTask(missionId=mission['id'], planId=task['plan']['id'])
- else:
- logger.error(f"创建计算任务失败 :MissionId:{mission['id']} PlanId:{task['plan']['id']}")
- return jsonify({"status": "started"})
- else:
- return jsonify({"startus": "duplicated"})
- @app.route('/get_status/<task_id>')
- def get_status(task_id):
- # 实现状态查询逻辑
- return jsonify({"status": "running", "progress": 75})
- if __name__ == '__main__':
- app.run(port=5000)
|