utils.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. import requests, csv
  2. from pathlib import Path
  3. from enum import Enum
  4. import logging
  5. logger = logging.getLogger("Store")
  6. SCHEDULER_BASE_URL = "http://localhost:5000"
  7. BACKEND_BASE_URL = "http://localhost:8000/api"
  8. BASE_DIR = Path(__file__).resolve().parent
  9. class TaskState(Enum):
  10. INIT = 0
  11. CALCULATING = 1
  12. DONE = 2
  13. # 从配置文件中读取可用的算法
  14. algoList = []
  15. algoConfig = csv.reader(open('./algorithms.config', 'r'))
  16. for algo in algoConfig:
  17. algoList.append({
  18. 'name': algo[0],
  19. 'path': BASE_DIR / algo[1],
  20. 'command': algo[2].split(' '),
  21. })
  22. class Store:
  23. missions = []
  24. # 添加任务时仅放入第一级待计算plans,后续每个plan完成计算后根据children列表寻找下一个plan继续计算
  25. def addMission(self, mission: dict, plans: list):
  26. # 判断是否存在重复mission
  27. if [m for m in self.missions if m['id'] == mission['id']]:
  28. logger.error("存在重复Mission,无法新建mission")
  29. return False
  30. # 寻找初始并行任务, 默认第一个是根plan,后续所有都是它的子节点
  31. originPlans = []
  32. root = plans.pop(0)
  33. for childId in root['children']:
  34. for p in [plan for plan in plans if plan['id'] == childId]:
  35. # 找到匹配项则从plans中弹出,放入originPlans中
  36. originPlans.append(p)
  37. # 将mission中传递的初始数据赋值给origins
  38. if not 'nodes' in p or not p['nodes']:
  39. if 'nodes' in root:
  40. p['nodes'] = root['nodes']
  41. elif 'nodes' in mission:
  42. p['nodes'] = mission['nodes']
  43. else:
  44. print("无法从初始任务中找到节点信息")
  45. if not 'edges' in p or not p['edges']:
  46. if 'edges' in root:
  47. p['edges'] = root['edges']
  48. elif 'edges' in mission:
  49. p['edges'] = mission['edges']
  50. else:
  51. logger.error("无法从初始任务中找到边信息")
  52. break
  53. tasks = []
  54. # 第一级plans,所有当前任务叫做tasks
  55. tasks.extend([p['id'] for p in originPlans])
  56. self.missions.append({
  57. 'id': mission['id'],
  58. 'plans': plans,
  59. 'tasks': tasks,
  60. })
  61. return True
  62. # 当出现进程错误或需要移除mission时使用
  63. def removeMission(self, missionId: int):
  64. mission = [m for m in self.missions if int(m['id']) == missionId]
  65. if not mission:
  66. logger.error("移除Mission时出错,未找到该Mission")
  67. return
  68. else:
  69. mission = mission[0]
  70. index = self.missions.index(mission)
  71. del self.missions[index]
  72. logger.info(f"Mission: {missionId} 已移除")
  73. # mission中的某项任务完成计算
  74. def solveMission(self, missionId: int, planId: int, results: dict):
  75. mission = [m for m in self.missions if m['id'] == missionId]
  76. if not mission:
  77. return []
  78. else:
  79. mission = mission[0]
  80. if planId in mission['tasks']:
  81. # 从tasks中弹出当前task
  82. mission['tasks'].remove(planId)
  83. # 在plans中找到对用ID的plan
  84. taskP = [plan for plan in mission['plans'] if plan['id'] == planId][0]
  85. children = taskP['children']
  86. # 如果已经是叶子,则不需要继续调用处理程序
  87. if not children:
  88. # 如果全部处理完成,这里输出一下
  89. logger.info(f"这里的tasks是 {mission['tasks']}")
  90. if not mission['tasks']:
  91. logger.info(f"全部计算任务处理完成 MissionId: {mission['id']}")
  92. # 然后删除全部处理任务
  93. try:
  94. self.missions.remove(mission)
  95. except Exception as error:
  96. logger.error(f"删除完成的处理任务失败 MissionId: {mission['id']} Error: {error}")
  97. return []
  98. # 将完成的task的子节点压入tasks
  99. mission['tasks'].extend(children)
  100. # 将完成节点的结果数据作为子节点的输入数据
  101. for plan in [p for p in mission['plans'] if p['id'] in children]:
  102. plan['nodes'] = results['nodes']
  103. plan['edges'] = results['edges']
  104. # 把已完成的plan从plans中删除
  105. mission['plans'] = [p for p in mission['plans'] if p['id'] != planId]
  106. else:
  107. logger.error("给出的Mission中找不到完成的任务信息")
  108. return []
  109. # 返回下一轮需要调用的任务,如果存在并行,则列表长度大于1
  110. return [plan for plan in mission['plans'] if plan['id'] in children]
  111. def resumeMission(self, missionId: int, plans: list):
  112. mission = [m for m in self.missions if int(m['id']) == missionId]
  113. if not mission:
  114. logger.error("未找到要恢复的mission,依据传递plans重新创建")
  115. # 如果没有找到要恢复的Mission,就重新构造mission
  116. # 此时传递的plans中全部都是第一级任务,应直接装入Task列表
  117. tasks = [plan['id'] for plan in plans if 'root' in plan]
  118. mission = {
  119. 'id': missionId,
  120. 'plans': plans,
  121. 'tasks': tasks,
  122. }
  123. # 构造mission之后还需要加入missions以便后续继续追踪
  124. self.missions.append(mission)
  125. else:
  126. logger.info("已找到要恢复的mission,正在恢复任务")
  127. mission = mission[0]
  128. # 返回所有的tasks,由于暂停时仅在maanger中停止了进程,而没有在store中处理,因此tasks中保存的就是此前的运行任务
  129. return [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]
  130. def stopMission(self, missionId: int):
  131. mission = [m for m in self.missions if int(m['id']) == missionId]
  132. if not mission:
  133. logger.error("未找到要停止的mission")
  134. return False
  135. # 停止后不需恢复,所以直接摘除该mission即可
  136. self.missions = [m for m in self.missions if int(m['id']) != missionId]
  137. return True
  138. def initMissionTasks(self, missionId: int):
  139. mission = [m for m in self.missions if m['id'] == missionId]
  140. if not mission:
  141. logger.error("未找到mission,是否错误传入更新请求")
  142. return []
  143. else:
  144. mission = mission[0]
  145. preparedTasks = []
  146. for plan in [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]:
  147. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  148. if not algorithm:
  149. logger.error("未找到选择的算法,是否需要更新算法列表?")
  150. return []
  151. else:
  152. algorithm = algorithm[0]
  153. preparedTasks.append({
  154. 'mission': {
  155. 'id': mission['id'],
  156. },
  157. 'plan': {
  158. 'id': plan['id'],
  159. },
  160. 'cwd': str(algorithm['path']),
  161. 'command': algorithm['command'],
  162. 'algorithm': plan['algorithm'],
  163. })
  164. logger.info(f"初始化计算任务:{preparedTasks}")
  165. return preparedTasks
  166. def prepareTask(self, missionId: int, planId: int):
  167. mission = [m for m in self.missions if m['id'] == missionId]
  168. if not mission:
  169. print("未找到mission,是否错误传入更新请求")
  170. return []
  171. else:
  172. mission = mission[0]
  173. #
  174. for plan in [plan for plan in mission['plans'] if plan['id'] == planId]:
  175. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  176. if not algorithm:
  177. print("未找到选择的算法,是否需要更新算法列表?")
  178. return []
  179. else:
  180. algorithm = algorithm[0]
  181. preparedTask = {
  182. 'plan': {
  183. 'id': plan['id'],
  184. },
  185. 'cwd': str(algorithm['path']),
  186. 'command': algorithm['command'],
  187. 'mission': {
  188. 'id': mission['id'],
  189. },
  190. 'algorithm': plan['algorithm'],
  191. }
  192. return preparedTask
  193. print("未找到plan,是否错误传入更新请求")
  194. return []
  195. def fetchData(self, missionId, planId):
  196. mission = [m for m in self.missions if m['id'] == missionId]
  197. if not mission:
  198. print("未找到mission,是否错误传入更新请求")
  199. return []
  200. else:
  201. mission = mission[0]
  202. #
  203. for plan in mission['plans']:
  204. if plan['id'] == planId:
  205. return {
  206. 'nodes': plan['nodes'],
  207. 'edges': plan['edges'],
  208. }
  209. print("在plans中找不到对应的plan")
  210. return {}
  211. # 被导出的全局store变量
  212. store = Store()