|
- import requests, csv
- from pathlib import Path
- from enum import Enum
- import logging
- logger = logging.getLogger("Store")
- SCHEDULER_BASE_URL = "http://localhost:5000"
- BACKEND_BASE_URL = "http://localhost:8000/api"
- BASE_DIR = Path(__file__).resolve().parent
- class TaskState(Enum):
- INIT = 0
- CALCULATING = 1
- DONE = 2
- # 从配置文件中读取可用的算法
- algoList = []
- algoConfig = csv.reader(open('./algorithms.config', 'r'))
- for algo in algoConfig:
- algoList.append({
- 'name': algo[0],
- 'path': BASE_DIR / algo[1],
- 'command': algo[2].split(' '),
- })
- class Store:
- missions = []
- # 添加任务时仅放入第一级待计算plans,后续每个plan完成计算后根据children列表寻找下一个plan继续计算
- def addMission(self, mission: dict, plans: list):
- # 判断是否存在重复mission
- if [m for m in self.missions if m['id'] == mission['id']]:
- logger.error("存在重复Mission,无法新建mission")
- return False
- # 寻找初始并行任务, 默认第一个是根plan,后续所有都是它的子节点
- originPlans = []
- root = plans.pop(0)
- for childId in root['children']:
- for p in [plan for plan in plans if plan['id'] == childId]:
- # 找到匹配项则从plans中弹出,放入originPlans中
- originPlans.append(p)
- # 将mission中传递的初始数据赋值给origins
- if not 'nodes' in p or not p['nodes']:
- if 'nodes' in root:
- p['nodes'] = root['nodes']
- elif 'nodes' in mission:
- p['nodes'] = mission['nodes']
- else:
- print("无法从初始任务中找到节点信息")
- if not 'edges' in p or not p['edges']:
- if 'edges' in root:
- p['edges'] = root['edges']
- elif 'edges' in mission:
- p['edges'] = mission['edges']
- else:
- logger.error("无法从初始任务中找到边信息")
- break
-
- tasks = []
- # 第一级plans,所有当前任务叫做tasks
- tasks.extend([p['id'] for p in originPlans])
- self.missions.append({
- 'id': mission['id'],
- 'plans': plans,
- 'tasks': tasks,
- })
- return True
-
- # 当出现进程错误或需要移除mission时使用
- def removeMission(self, missionId: int):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- logger.error("移除Mission时出错,未找到该Mission")
- return
- else:
- mission = mission[0]
- index = self.missions.index(mission)
- del self.missions[index]
- logger.info(f"Mission: {missionId} 已移除")
-
- # mission中的某项任务完成计算
- def solveMission(self, missionId: int, planId: int, results: dict):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- return []
- else:
- mission = mission[0]
- if planId in mission['tasks']:
- # 从tasks中弹出当前task
- mission['tasks'].remove(planId)
- # 在plans中找到对用ID的plan
- taskP = [plan for plan in mission['plans'] if plan['id'] == planId][0]
- children = taskP['children']
- # 如果已经是叶子,则不需要继续调用处理程序
- if not children:
- # 如果全部处理完成,这里输出一下
- logger.info(f"这里的tasks是 {mission['tasks']}")
- if not mission['tasks']:
- logger.info(f"全部计算任务处理完成 MissionId: {mission['id']}")
- # 然后删除全部处理任务
- try:
- self.missions.remove(mission)
- except Exception as error:
- logger.error(f"删除完成的处理任务失败 MissionId: {mission['id']} Error: {error}")
- return []
- # 将完成的task的子节点压入tasks
- mission['tasks'].extend(children)
- # 将完成节点的结果数据作为子节点的输入数据
- for plan in [p for p in mission['plans'] if p['id'] in children]:
- plan['nodes'] = results['nodes']
- plan['edges'] = results['edges']
- # 把已完成的plan从plans中删除
- mission['plans'] = [p for p in mission['plans'] if p['id'] != planId]
- else:
- logger.error("给出的Mission中找不到完成的任务信息")
- return []
- # 返回下一轮需要调用的任务,如果存在并行,则列表长度大于1
- return [plan for plan in mission['plans'] if plan['id'] in children]
-
- def resumeMission(self, missionId: int):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- logger.error("未找到要恢复的mission")
- else:
- mission = mission[0]
- # 返回所有的tasks,由于暂停时仅在maanger中停止了进程,而没有在store中处理,因此tasks中保存的就是此前的运行任务
- return [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]
-
- def stopMission(self, missionId: int):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- logger.error("未找到要停止的mission")
- return False
- # 停止后不需恢复,所以直接摘除该mission即可
- self.missions = [m for m in self.missions if m['id'] != missionId]
- return True
- def initMissionTasks(self, missionId: int):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- logger.error("未找到mission,是否错误传入更新请求")
- return []
- else:
- mission = mission[0]
- preparedTasks = []
- for plan in [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]:
- algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
- if not algorithm:
- logger.error("未找到选择的算法,是否需要更新算法列表?")
- return []
- else:
- algorithm = algorithm[0]
- preparedTasks.append({
- 'mission': {
- 'id': mission['id'],
- },
- 'plan': {
- 'id': plan['id'],
- },
- 'cwd': str(algorithm['path']),
- 'command': algorithm['command'],
- })
- logger.info(f"初始化计算任务:{preparedTasks}")
- return preparedTasks
-
- def prepareTask(self, missionId: int, planId: int):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- print("未找到mission,是否错误传入更新请求")
- return []
- else:
- mission = mission[0]
- #
- for plan in [plan for plan in mission['plans'] if plan['id'] == planId]:
- algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
- if not algorithm:
- print("未找到选择的算法,是否需要更新算法列表?")
- return []
- else:
- algorithm = algorithm[0]
- preparedTask = {
- 'plan': {
- 'id': plan['id'],
- },
- 'cwd': str(algorithm['path']),
- 'command': algorithm['command'],
- 'mission': {
- 'id': mission['id'],
- },
- }
- return preparedTask
- print("未找到plan,是否错误传入更新请求")
- return []
-
- def fetchData(self, missionId, planId):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- print("未找到mission,是否错误传入更新请求")
- return []
- else:
- mission = mission[0]
- #
- for plan in mission['plans']:
- if plan['id'] == planId:
- return {
- 'nodes': plan['nodes'],
- 'edges': plan['edges'],
- }
- print("在plans中找不到对应的plan")
- return {}
- # 被导出的全局store变量
- store = Store()
|