123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import requests, csv
- from pathlib import Path
- from enum import Enum
- SCHEDULER_BASE_URL = "http://localhost:5000"
- BACKEND_BASE_URL = "http://localhost:8000/api"
- BASE_DIR = Path(__file__).resolve().parent
- class TaskState(Enum):
- INIT = 0
- CALCULATING = 1
- DONE = 2
- # 从配置文件中读取可用的算法
- algoList = []
- algoConfig = csv.reader(open('./algorithms.config', 'r'))
- for algo in algoConfig:
- algoList.append({
- 'name': algo[0],
- 'path': BASE_DIR / algo[1],
- 'command': algo[2],
- })
- class Store:
- missions = []
- activeTasks = []
- def addActiveTask(self, missionId: int, planId: int):
- self.activeTasks.append({
- 'missionId': missionId,
- 'planId': planId,
- 'state': TaskState.INIT,
- })
-
- def checkActiveTask(self, missionId: int, planId: int, state: TaskState):
- for task in self.activeTasks:
- if task['missionId'] == missionId and task['planId'] == planId:
- if task['state'] == state:
- return True
- break
- return False
- def updateActiveTask(self, missionId: int, planId: int, state: TaskState):
- for task in self.activeTasks:
- if task['missionId'] == missionId and task['planId'] == planId:
- # 已经DONE的task不应该再被修改状态
- if task['state'] != TaskState.DONE:
- task['state'] = state
- return True
- break
- return False
- def removeActiveTask(self, missionId: int, planId: int):
- originLen = len(self.activeTasks)
- self.activeTasks = [task for task in self.activeTasks if task['missionId'] != missionId and task['planId'] != planId]
- if originLen != (len(self.activeTasks) + 1):
- # 没有删除或者删除多于一个?
- print("没有删除Task或删除多于一个Task")
- return False
- return True
- # 添加任务时仅放入第一级待计算plans,后续每个plan完成计算后根据children列表寻找下一个plan继续计算
- def addMission(self, mission: dict, plans: list):
- # 判断是否存在重复mission
- if [m for m in self.missions if m['id'] == mission['id']]:
- print("存在重复Mission,无法新建mission")
- return False
- # 寻找初始并行任务, 默认第一个是根plan,后续所有都是它的子节点
- originPlans = []
- root = plans.pop(0)
- for childId in root['children']:
- for p in [plan for plan in plans if plan['id'] == childId]:
- # 找到匹配项则从plans中弹出,放入originPlans中
- originPlans.append(p)
- break
-
- tasks = []
- # 第一级plans
- tasks.extend([p['id'] for p in originPlans])
- self.missions.append({
- 'id': mission['id'],
- 'plans': plans,
- 'tasks': tasks,
- })
- return True
-
- # mission中的某项任务完成计算
- def solveMission(self, missionId: int, planId: int, result: dict):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- print("未找到mission,是否错误传入更新请求")
- return []
- else:
- mission = mission[0]
- if planId in mission['tasks']:
- # 从tasks中弹出当前task
- mission['tasks'].remove(planId)
- # 在plans中找到对用ID的plan
- taskP = [plan for plan in mission['plans'] if plan['id'] == planId][0]
- children = taskP['children']
- # 如果已经是叶子,则不需要继续调用处理程序
- if not children:
- return []
- # 将完成的task的子节点压入tasks
- mission['tasks'].extend(children)
- for plan in [p for p in mission['plans'] if p['id'] in children]:
- plan['nodes'] = result['nodes']
- plan['edges'] = result['edges']
- # 把已完成的plan从plans中删除
- mission['plans'] = [p for p in mission['plans'] if p['id'] != planId]
- else:
- print("给出的不是task列表中计算任务?")
- return []
- # 返回下一轮需要调用的任务,如果存在并行,则列表长度大于1
- return [plan for plan in mission['plans'] if plan['id'] in children]
-
- def prepareTasks(self, missionId: int):
- mission = [m for m in self.missions if m['id'] == missionId]
- if not mission:
- print("未找到mission,是否错误传入更新请求")
- return []
- else:
- mission = mission[0]
- preparedTasks = []
- for plan in [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]:
- algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
- if not algorithm:
- print("未找到选择的算法,是否需要更新算法列表?")
- return []
- else:
- algorithm = algorithm[0]
- preparedTasks.append({
- 'plan': plan,
- 'cwd': str(algorithm['path']),
- 'command': algorithm['command'],
- })
- return preparedTasks
|