import requests, csv from pathlib import Path from enum import Enum import logging logger = logging.getLogger("Store") SCHEDULER_BASE_URL = "http://localhost:5000" BACKEND_BASE_URL = "http://localhost:8000/api" BASE_DIR = Path(__file__).resolve().parent class TaskState(Enum): INIT = 0 CALCULATING = 1 DONE = 2 # 从配置文件中读取可用的算法 algoList = [] algoConfig = csv.reader(open('./algorithms.config', 'r')) for algo in algoConfig: algoList.append({ 'name': algo[0], 'path': BASE_DIR / algo[1], 'command': algo[2].split(' '), }) class Store: missions = [] # 添加任务时仅放入第一级待计算plans,后续每个plan完成计算后根据children列表寻找下一个plan继续计算 def addMission(self, mission: dict, plans: list): # 判断是否存在重复mission if [m for m in self.missions if m['id'] == mission['id']]: logger.error("存在重复Mission,无法新建mission") return False # 寻找初始并行任务, 默认第一个是根plan,后续所有都是它的子节点 originPlans = [] root = plans.pop(0) for childId in root['children']: for p in [plan for plan in plans if plan['id'] == childId]: # 找到匹配项则从plans中弹出,放入originPlans中 originPlans.append(p) # 将mission中传递的初始数据赋值给origins if not 'nodes' in p or not p['nodes']: if 'nodes' in root: p['nodes'] = root['nodes'] elif 'nodes' in mission: p['nodes'] = mission['nodes'] else: print("无法从初始任务中找到节点信息") if not 'edges' in p or not p['edges']: if 'edges' in root: p['edges'] = root['edges'] elif 'edges' in mission: p['edges'] = mission['edges'] else: logger.error("无法从初始任务中找到边信息") break tasks = [] # 第一级plans,所有当前任务叫做tasks tasks.extend([p['id'] for p in originPlans]) self.missions.append({ 'id': mission['id'], 'plans': plans, 'tasks': tasks, }) return True # 当出现进程错误或需要移除mission时使用 def removeMission(self, missionId: int): mission = [m for m in self.missions if int(m['id']) == missionId] if not mission: logger.error("移除Mission时出错,未找到该Mission") return else: mission = mission[0] index = self.missions.index(mission) del self.missions[index] logger.info(f"Mission: {missionId} 已移除") # mission中的某项任务完成计算 def solveMission(self, missionId: int, planId: int, results: dict): mission = [m for m in self.missions if m['id'] == missionId] if not mission: return [] else: mission = mission[0] if planId in mission['tasks']: # 从tasks中弹出当前task mission['tasks'].remove(planId) # 在plans中找到对用ID的plan taskP = [plan for plan in mission['plans'] if plan['id'] == planId][0] children = taskP['children'] # 如果已经是叶子,则不需要继续调用处理程序 if not children: # 如果全部处理完成,这里输出一下 logger.info(f"这里的tasks是 {mission['tasks']}") if not mission['tasks']: logger.info(f"全部计算任务处理完成 MissionId: {mission['id']}") # 然后删除全部处理任务 try: self.missions.remove(mission) except Exception as error: logger.error(f"删除完成的处理任务失败 MissionId: {mission['id']} Error: {error}") return [] # 将完成的task的子节点压入tasks mission['tasks'].extend(children) # 将完成节点的结果数据作为子节点的输入数据 for plan in [p for p in mission['plans'] if p['id'] in children]: plan['nodes'] = results['nodes'] plan['edges'] = results['edges'] # 把已完成的plan从plans中删除 mission['plans'] = [p for p in mission['plans'] if p['id'] != planId] else: logger.error("给出的Mission中找不到完成的任务信息") return [] # 返回下一轮需要调用的任务,如果存在并行,则列表长度大于1 return [plan for plan in mission['plans'] if plan['id'] in children] def resumeMission(self, missionId: int, plans: list): mission = [m for m in self.missions if int(m['id']) == missionId] if not mission: logger.error("未找到要恢复的mission,依据传递plans重新创建") # 如果没有找到要恢复的Mission,就重新构造mission # 此时传递的plans中全部都是第一级任务,应直接装入Task列表 tasks = [plan['id'] for plan in plans if 'root' in plan] mission = { 'id': missionId, 'plans': plans, 'tasks': tasks, } # 构造mission之后还需要加入missions以便后续继续追踪 self.missions.append(mission) else: logger.info("已找到要恢复的mission,正在恢复任务") mission = mission[0] # 返回所有的tasks,由于暂停时仅在maanger中停止了进程,而没有在store中处理,因此tasks中保存的就是此前的运行任务 return [plan for plan in mission['plans'] if plan['id'] in mission['tasks']] def stopMission(self, missionId: int): mission = [m for m in self.missions if int(m['id']) == missionId] if not mission: logger.error("未找到要停止的mission") return False # 停止后不需恢复,所以直接摘除该mission即可 self.missions = [m for m in self.missions if int(m['id']) != missionId] return True def initMissionTasks(self, missionId: int): mission = [m for m in self.missions if m['id'] == missionId] if not mission: logger.error("未找到mission,是否错误传入更新请求") return [] else: mission = mission[0] preparedTasks = [] for plan in [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]: algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']] if not algorithm: logger.error("未找到选择的算法,是否需要更新算法列表?") return [] else: algorithm = algorithm[0] preparedTasks.append({ 'mission': { 'id': mission['id'], }, 'plan': { 'id': plan['id'], }, 'cwd': str(algorithm['path']), 'command': algorithm['command'], 'algorithm': plan['algorithm'], }) logger.info(f"初始化计算任务:{preparedTasks}") return preparedTasks def prepareTask(self, missionId: int, planId: int): mission = [m for m in self.missions if m['id'] == missionId] if not mission: print("未找到mission,是否错误传入更新请求") return [] else: mission = mission[0] # for plan in [plan for plan in mission['plans'] if plan['id'] == planId]: algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']] if not algorithm: print("未找到选择的算法,是否需要更新算法列表?") return [] else: algorithm = algorithm[0] preparedTask = { 'plan': { 'id': plan['id'], }, 'cwd': str(algorithm['path']), 'command': algorithm['command'], 'mission': { 'id': mission['id'], }, 'algorithm': plan['algorithm'], } return preparedTask print("未找到plan,是否错误传入更新请求") return [] def fetchData(self, missionId, planId): mission = [m for m in self.missions if m['id'] == missionId] if not mission: print("未找到mission,是否错误传入更新请求") return [] else: mission = mission[0] # for plan in mission['plans']: if plan['id'] == planId: return { 'nodes': plan['nodes'], 'edges': plan['edges'], } print("在plans中找不到对应的plan") return {} # 被导出的全局store变量 store = Store()