utils.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. import requests, csv
  2. from pathlib import Path
  3. from enum import Enum
  4. import logging
  5. logger = logging.getLogger("Store")
  6. SCHEDULER_BASE_URL = "http://localhost:5000"
  7. BACKEND_BASE_URL = "http://localhost:8000/api"
  8. BASE_DIR = Path(__file__).resolve().parent
  9. class TaskState(Enum):
  10. INIT = 0
  11. CALCULATING = 1
  12. DONE = 2
  13. # 从配置文件中读取可用的算法
  14. algoList = []
  15. algoConfig = csv.reader(open('./algorithms.config', 'r'))
  16. for algo in algoConfig:
  17. algoList.append({
  18. 'name': algo[0],
  19. 'path': BASE_DIR / algo[1],
  20. 'command': algo[2].split(' '),
  21. })
  22. class Store:
  23. missions = []
  24. # 添加任务时仅放入第一级待计算plans,后续每个plan完成计算后根据children列表寻找下一个plan继续计算
  25. def addMission(self, mission: dict, plans: list):
  26. # 判断是否存在重复mission
  27. if [m for m in self.missions if m['id'] == mission['id']]:
  28. logger.error("存在重复Mission,无法新建mission")
  29. return False
  30. # 寻找初始并行任务, 默认第一个是根plan,后续所有都是它的子节点
  31. originPlans = []
  32. root = plans.pop(0)
  33. for childId in root['children']:
  34. for p in [plan for plan in plans if plan['id'] == childId]:
  35. # 找到匹配项则从plans中弹出,放入originPlans中
  36. originPlans.append(p)
  37. # 将mission中传递的初始数据赋值给origins
  38. if not 'nodes' in p or not p['nodes']:
  39. if 'nodes' in root:
  40. p['nodes'] = root['nodes']
  41. elif 'nodes' in mission:
  42. p['nodes'] = mission['nodes']
  43. else:
  44. print("无法从初始任务中找到节点信息")
  45. if not 'edges' in p or not p['edges']:
  46. if 'edges' in root:
  47. p['edges'] = root['edges']
  48. elif 'edges' in mission:
  49. p['edges'] = mission['edges']
  50. else:
  51. logger.error("无法从初始任务中找到边信息")
  52. break
  53. tasks = []
  54. # 第一级plans,所有当前任务叫做tasks
  55. tasks.extend([p['id'] for p in originPlans])
  56. self.missions.append({
  57. 'id': mission['id'],
  58. 'plans': plans,
  59. 'tasks': tasks,
  60. })
  61. return True
  62. # 当出现进程错误或需要移除mission时使用
  63. def removeMission(self, missionId: int):
  64. mission = [m for m in self.missions if m['id'] == missionId]
  65. if not mission:
  66. logger.error("移除Mission时出错,未找到该Mission")
  67. return
  68. else:
  69. mission = mission[0]
  70. index = self.missions.index(mission)
  71. del self.missions[index]
  72. logger.info(f"Mission: {missionId} 已移除")
  73. # mission中的某项任务完成计算
  74. def solveMission(self, missionId: int, planId: int, results: dict):
  75. mission = [m for m in self.missions if m['id'] == missionId]
  76. if not mission:
  77. return []
  78. else:
  79. mission = mission[0]
  80. if planId in mission['tasks']:
  81. # 从tasks中弹出当前task
  82. mission['tasks'].remove(planId)
  83. # 在plans中找到对用ID的plan
  84. taskP = [plan for plan in mission['plans'] if plan['id'] == planId][0]
  85. children = taskP['children']
  86. # 如果已经是叶子,则不需要继续调用处理程序
  87. if not children:
  88. # 如果全部处理完成,这里输出一下
  89. logger.info(f"这里的tasks是 {mission['tasks']}")
  90. if not mission['tasks']:
  91. logger.info(f"全部计算任务处理完成 MissionId: {mission['id']}")
  92. # 然后删除全部处理任务
  93. try:
  94. self.missions.remove(mission)
  95. except Exception as error:
  96. logger.error(f"删除完成的处理任务失败 MissionId: {mission['id']} Error: {error}")
  97. return []
  98. # 将完成的task的子节点压入tasks
  99. mission['tasks'].extend(children)
  100. # 将完成节点的结果数据作为子节点的输入数据
  101. for plan in [p for p in mission['plans'] if p['id'] in children]:
  102. plan['nodes'] = results['nodes']
  103. plan['edges'] = results['edges']
  104. # 把已完成的plan从plans中删除
  105. mission['plans'] = [p for p in mission['plans'] if p['id'] != planId]
  106. else:
  107. logger.error("给出的Mission中找不到完成的任务信息")
  108. return []
  109. # 返回下一轮需要调用的任务,如果存在并行,则列表长度大于1
  110. return [plan for plan in mission['plans'] if plan['id'] in children]
  111. def resumeMission(self, missionId: int):
  112. mission = [m for m in self.missions if m['id'] == missionId]
  113. if not mission:
  114. logger.error("未找到要恢复的mission")
  115. else:
  116. mission = mission[0]
  117. # 返回所有的tasks,由于暂停时仅在maanger中停止了进程,而没有在store中处理,因此tasks中保存的就是此前的运行任务
  118. return [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]
  119. def stopMission(self, missionId: int):
  120. mission = [m for m in self.missions if m['id'] == missionId]
  121. if not mission:
  122. logger.error("未找到要停止的mission")
  123. return False
  124. # 停止后不需恢复,所以直接摘除该mission即可
  125. self.missions = [m for m in self.missions if m['id'] != missionId]
  126. return True
  127. def initMissionTasks(self, missionId: int):
  128. mission = [m for m in self.missions if m['id'] == missionId]
  129. if not mission:
  130. logger.error("未找到mission,是否错误传入更新请求")
  131. return []
  132. else:
  133. mission = mission[0]
  134. preparedTasks = []
  135. for plan in [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]:
  136. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  137. if not algorithm:
  138. logger.error("未找到选择的算法,是否需要更新算法列表?")
  139. return []
  140. else:
  141. algorithm = algorithm[0]
  142. preparedTasks.append({
  143. 'mission': {
  144. 'id': mission['id'],
  145. },
  146. 'plan': {
  147. 'id': plan['id'],
  148. },
  149. 'cwd': str(algorithm['path']),
  150. 'command': algorithm['command'],
  151. })
  152. logger.info(f"初始化计算任务:{preparedTasks}")
  153. return preparedTasks
  154. def prepareTask(self, missionId: int, planId: int):
  155. mission = [m for m in self.missions if m['id'] == missionId]
  156. if not mission:
  157. print("未找到mission,是否错误传入更新请求")
  158. return []
  159. else:
  160. mission = mission[0]
  161. #
  162. for plan in [plan for plan in mission['plans'] if plan['id'] == planId]:
  163. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  164. if not algorithm:
  165. print("未找到选择的算法,是否需要更新算法列表?")
  166. return []
  167. else:
  168. algorithm = algorithm[0]
  169. preparedTask = {
  170. 'plan': {
  171. 'id': plan['id'],
  172. },
  173. 'cwd': str(algorithm['path']),
  174. 'command': algorithm['command'],
  175. 'mission': {
  176. 'id': mission['id'],
  177. },
  178. }
  179. return preparedTask
  180. print("未找到plan,是否错误传入更新请求")
  181. return []
  182. def fetchData(self, missionId, planId):
  183. mission = [m for m in self.missions if m['id'] == missionId]
  184. if not mission:
  185. print("未找到mission,是否错误传入更新请求")
  186. return []
  187. else:
  188. mission = mission[0]
  189. #
  190. for plan in mission['plans']:
  191. if plan['id'] == planId:
  192. return {
  193. 'nodes': plan['nodes'],
  194. 'edges': plan['edges'],
  195. }
  196. print("在plans中找不到对应的plan")
  197. return {}
  198. # 被导出的全局store变量
  199. store = Store()