utils.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import requests, csv
  2. from pathlib import Path
  3. from enum import Enum
  4. import logging
  5. logger = logging.getLogger("Store")
  6. SCHEDULER_BASE_URL = "http://localhost:5000"
  7. BACKEND_BASE_URL = "http://localhost:8000/api"
  8. BASE_DIR = Path(__file__).resolve().parent
  9. class TaskState(Enum):
  10. INIT = 0
  11. CALCULATING = 1
  12. DONE = 2
  13. # 从配置文件中读取可用的算法
  14. algoList = []
  15. algoConfig = csv.reader(open('./algorithms.config', 'r'))
  16. for algo in algoConfig:
  17. algoList.append({
  18. 'name': algo[0],
  19. 'path': BASE_DIR / algo[1],
  20. 'command': algo[2],
  21. })
  22. class Store:
  23. missions = []
  24. activeTasks = []
  25. def addActiveTask(self, missionId: int, planId: int):
  26. self.activeTasks.append({
  27. 'missionId': missionId,
  28. 'planId': planId,
  29. 'state': TaskState.INIT,
  30. })
  31. def checkActiveTask(self, missionId: int, planId: int, state: TaskState):
  32. for task in self.activeTasks:
  33. if task['missionId'] == missionId and task['planId'] == planId:
  34. if task['state'] == state:
  35. return True
  36. break
  37. return False
  38. def updateActiveTask(self, missionId: int, planId: int, state: TaskState):
  39. for task in self.activeTasks:
  40. if task['missionId'] == missionId and task['planId'] == planId:
  41. # 已经DONE的task不应该再被修改状态
  42. if task['state'] != TaskState.DONE:
  43. task['state'] = state
  44. return True
  45. break
  46. return False
  47. def removeActiveTask(self, missionId: int, planId: int):
  48. originLen = len(self.activeTasks)
  49. self.activeTasks = [task for task in self.activeTasks if task['missionId'] != missionId and task['planId'] != planId]
  50. if originLen != (len(self.activeTasks) + 1):
  51. # 没有删除或者删除多于一个?
  52. print("没有删除Task或删除多于一个Task")
  53. return False
  54. return True
  55. # 添加任务时仅放入第一级待计算plans,后续每个plan完成计算后根据children列表寻找下一个plan继续计算
  56. def addMission(self, mission: dict, plans: list):
  57. # 判断是否存在重复mission
  58. if [m for m in self.missions if m['id'] == mission['id']]:
  59. print("存在重复Mission,无法新建mission")
  60. return False
  61. # 寻找初始并行任务, 默认第一个是根plan,后续所有都是它的子节点
  62. originPlans = []
  63. root = plans.pop(0)
  64. for childId in root['children']:
  65. for p in [plan for plan in plans if plan['id'] == childId]:
  66. # 找到匹配项则从plans中弹出,放入originPlans中
  67. originPlans.append(p)
  68. # 将mission中传递的初始数据赋值给origins
  69. if not 'nodes' in p or not p['nodes']:
  70. if 'nodes' in root:
  71. p['nodes'] = root['nodes']
  72. elif 'nodes' in mission:
  73. p['nodes'] = mission['nodes']
  74. else:
  75. print("无法从初始任务中找到节点信息")
  76. if not 'edges' in p or not p['edges']:
  77. if 'edges' in root:
  78. p['edges'] = root['edges']
  79. elif 'edges' in mission:
  80. p['edges'] = mission['edges']
  81. else:
  82. print("无法从初始任务中找到边信息")
  83. break
  84. tasks = []
  85. # 第一级plans
  86. tasks.extend([p['id'] for p in originPlans])
  87. self.missions.append({
  88. 'id': mission['id'],
  89. 'plans': plans,
  90. 'tasks': tasks,
  91. })
  92. return True
  93. # mission中的某项任务完成计算
  94. def solveMission(self, missionId: int, planId: int, results: dict):
  95. mission = [m for m in self.missions if m['id'] == missionId]
  96. if not mission:
  97. return []
  98. else:
  99. mission = mission[0]
  100. if planId in mission['tasks']:
  101. # 从tasks中弹出当前task
  102. mission['tasks'].remove(planId)
  103. # 在plans中找到对用ID的plan
  104. taskP = [plan for plan in mission['plans'] if plan['id'] == planId][0]
  105. children = taskP['children']
  106. # 如果已经是叶子,则不需要继续调用处理程序
  107. if not children:
  108. # 如果全部处理完成,这里输出一下
  109. logger.info(f"这里的tasks是 {mission['tasks']}")
  110. if not mission['tasks']:
  111. logger.info(f"全部计算任务处理完成 MissionId: {mission['id']}")
  112. # 然后删除全部处理任务
  113. try:
  114. self.missions.remove(mission)
  115. except Exception as error:
  116. logger.error(f"删除完成的处理任务失败 MissionId: {mission['id']} Error: {error}")
  117. return []
  118. # 将完成的task的子节点压入tasks
  119. mission['tasks'].extend(children)
  120. # 将完成节点的结果数据作为子节点的输入数据
  121. for plan in [p for p in mission['plans'] if p['id'] in children]:
  122. plan['nodes'] = results['nodes']
  123. plan['edges'] = results['edges']
  124. # 把已完成的plan从plans中删除
  125. mission['plans'] = [p for p in mission['plans'] if p['id'] != planId]
  126. else:
  127. print("给出的不是task列表中计算任务?")
  128. return []
  129. # 返回下一轮需要调用的任务,如果存在并行,则列表长度大于1
  130. return [plan for plan in mission['plans'] if plan['id'] in children]
  131. def initMissionTasks(self, missionId: int):
  132. mission = [m for m in self.missions if m['id'] == missionId]
  133. if not mission:
  134. print("未找到mission,是否错误传入更新请求")
  135. return []
  136. else:
  137. mission = mission[0]
  138. preparedTasks = []
  139. for plan in [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]:
  140. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  141. if not algorithm:
  142. print("未找到选择的算法,是否需要更新算法列表?")
  143. return []
  144. else:
  145. algorithm = algorithm[0]
  146. preparedTasks.append({
  147. 'plan': {
  148. 'id': plan['id'],
  149. },
  150. 'cwd': str(algorithm['path']),
  151. 'command': algorithm['command'],
  152. 'mission': {
  153. 'id': mission['id'],
  154. },
  155. })
  156. print("initMissionTasks is :", preparedTasks)
  157. return preparedTasks
  158. def prepareTask(self, missionId: int, planId: int):
  159. mission = [m for m in self.missions if m['id'] == missionId]
  160. if not mission:
  161. print("未找到mission,是否错误传入更新请求")
  162. return []
  163. else:
  164. mission = mission[0]
  165. #
  166. for plan in [plan for plan in mission['plans'] if plan['id'] == planId]:
  167. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  168. if not algorithm:
  169. print("未找到选择的算法,是否需要更新算法列表?")
  170. return []
  171. else:
  172. algorithm = algorithm[0]
  173. preparedTask = {
  174. 'plan': {
  175. 'id': plan['id'],
  176. },
  177. 'cwd': str(algorithm['path']),
  178. 'command': algorithm['command'],
  179. 'mission': {
  180. 'id': mission['id'],
  181. },
  182. }
  183. return preparedTask
  184. print("未找到plan,是否错误传入更新请求")
  185. return []
  186. def fetchData(self, missionId, planId):
  187. mission = [m for m in self.missions if m['id'] == missionId]
  188. if not mission:
  189. print("未找到mission,是否错误传入更新请求")
  190. return []
  191. else:
  192. mission = mission[0]
  193. #
  194. for plan in mission['plans']:
  195. if plan['id'] == planId:
  196. return {
  197. 'nodes': plan['nodes'],
  198. 'edges': plan['edges'],
  199. }
  200. print("在plans中找不到对应的plan")
  201. return {}