utils.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. import requests, csv
  2. from pathlib import Path
  3. from enum import Enum
  4. import logging
  5. logger = logging.getLogger("Store")
  6. SCHEDULER_BASE_URL = "http://localhost:5000"
  7. BACKEND_BASE_URL = "http://localhost:8000/api"
  8. BASE_DIR = Path(__file__).resolve().parent
  9. class TaskState(Enum):
  10. INIT = 0
  11. CALCULATING = 1
  12. DONE = 2
  13. # 从配置文件中读取可用的算法
  14. algoList = []
  15. algoConfig = csv.reader(open('./algorithms.config', 'r'))
  16. for algo in algoConfig:
  17. algoList.append({
  18. 'name': algo[0],
  19. 'path': BASE_DIR / algo[1],
  20. 'command': algo[2].split(' '),
  21. })
  22. class Store:
  23. missions = []
  24. activeTasks = []
  25. def addActiveTask(self, missionId: int, planId: int):
  26. self.activeTasks.append({
  27. 'missionId': missionId,
  28. 'planId': planId,
  29. 'state': TaskState.INIT,
  30. })
  31. def checkActiveTask(self, missionId: int, planId: int, state: TaskState):
  32. for task in self.activeTasks:
  33. if task['missionId'] == missionId and task['planId'] == planId:
  34. if task['state'] == state:
  35. return True
  36. break
  37. return False
  38. def updateActiveTask(self, missionId: int, planId: int, state: TaskState):
  39. for task in self.activeTasks:
  40. if task['missionId'] == missionId and task['planId'] == planId:
  41. # 已经DONE的task不应该再被修改状态
  42. if task['state'] != TaskState.DONE:
  43. task['state'] = state
  44. return True
  45. break
  46. return False
  47. def removeActiveTask(self, missionId: int, planId: int):
  48. logger.info(f"selfACTIVETASL{self.activeTasks}")
  49. self.activeTasks = [task for task in self.activeTasks if task['missionId'] != missionId and task['planId'] != planId]
  50. logger.info(f"selfACTIVETASL{self.activeTasks}")
  51. return True
  52. # 添加任务时仅放入第一级待计算plans,后续每个plan完成计算后根据children列表寻找下一个plan继续计算
  53. def addMission(self, mission: dict, plans: list):
  54. # 判断是否存在重复mission
  55. if [m for m in self.missions if m['id'] == mission['id']]:
  56. print("存在重复Mission,无法新建mission")
  57. return False
  58. # 寻找初始并行任务, 默认第一个是根plan,后续所有都是它的子节点
  59. originPlans = []
  60. root = plans.pop(0)
  61. for childId in root['children']:
  62. for p in [plan for plan in plans if plan['id'] == childId]:
  63. # 找到匹配项则从plans中弹出,放入originPlans中
  64. originPlans.append(p)
  65. # 将mission中传递的初始数据赋值给origins
  66. if not 'nodes' in p or not p['nodes']:
  67. if 'nodes' in root:
  68. p['nodes'] = root['nodes']
  69. elif 'nodes' in mission:
  70. p['nodes'] = mission['nodes']
  71. else:
  72. print("无法从初始任务中找到节点信息")
  73. if not 'edges' in p or not p['edges']:
  74. if 'edges' in root:
  75. p['edges'] = root['edges']
  76. elif 'edges' in mission:
  77. p['edges'] = mission['edges']
  78. else:
  79. print("无法从初始任务中找到边信息")
  80. break
  81. tasks = []
  82. # 第一级plans
  83. tasks.extend([p['id'] for p in originPlans])
  84. self.missions.append({
  85. 'id': mission['id'],
  86. 'plans': plans,
  87. 'tasks': tasks,
  88. })
  89. return True
  90. # mission中的某项任务完成计算
  91. def solveMission(self, missionId: int, planId: int, results: dict):
  92. mission = [m for m in self.missions if m['id'] == missionId]
  93. if not mission:
  94. return []
  95. else:
  96. mission = mission[0]
  97. if planId in mission['tasks']:
  98. # 从tasks中弹出当前task
  99. mission['tasks'].remove(planId)
  100. # 在plans中找到对用ID的plan
  101. taskP = [plan for plan in mission['plans'] if plan['id'] == planId][0]
  102. children = taskP['children']
  103. # 如果已经是叶子,则不需要继续调用处理程序
  104. if not children:
  105. # 如果全部处理完成,这里输出一下
  106. logger.info(f"这里的tasks是 {mission['tasks']}")
  107. if not mission['tasks']:
  108. logger.info(f"全部计算任务处理完成 MissionId: {mission['id']}")
  109. # 然后删除全部处理任务
  110. try:
  111. self.missions.remove(mission)
  112. except Exception as error:
  113. logger.error(f"删除完成的处理任务失败 MissionId: {mission['id']} Error: {error}")
  114. return []
  115. # 将完成的task的子节点压入tasks
  116. mission['tasks'].extend(children)
  117. # 将完成节点的结果数据作为子节点的输入数据
  118. for plan in [p for p in mission['plans'] if p['id'] in children]:
  119. plan['nodes'] = results['nodes']
  120. plan['edges'] = results['edges']
  121. # 把已完成的plan从plans中删除
  122. mission['plans'] = [p for p in mission['plans'] if p['id'] != planId]
  123. else:
  124. print("给出的不是task列表中计算任务?")
  125. return []
  126. # 返回下一轮需要调用的任务,如果存在并行,则列表长度大于1
  127. return [plan for plan in mission['plans'] if plan['id'] in children]
  128. def resumeMission(self, missionId: int):
  129. mission = [m for m in self.missions if m['id'] == missionId]
  130. if not mission:
  131. logger.info("未找到要恢复的mission")
  132. else:
  133. mission = mission[0]
  134. # 返回所有的tasks,由于暂停时仅在maanger中停止了进程,而没有在store中处理,因此tasks中保存的就是此前的运行任务
  135. return [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]
  136. def stopMission(self, missionId: int):
  137. mission = [m for m in self.missions if m['id'] == missionId]
  138. if not mission:
  139. logger.info("未找到要停止的mission")
  140. return False
  141. # 停止后不需恢复,所以直接摘除该mission即可
  142. self.missions = [m for m in self.missions if m['id'] != missionId]
  143. return True
  144. def initMissionTasks(self, missionId: int):
  145. mission = [m for m in self.missions if m['id'] == missionId]
  146. if not mission:
  147. logger.info("未找到mission,是否错误传入更新请求")
  148. return []
  149. else:
  150. mission = mission[0]
  151. preparedTasks = []
  152. for plan in [plan for plan in mission['plans'] if plan['id'] in mission['tasks']]:
  153. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  154. if not algorithm:
  155. print("未找到选择的算法,是否需要更新算法列表?")
  156. return []
  157. else:
  158. algorithm = algorithm[0]
  159. preparedTasks.append({
  160. 'plan': {
  161. 'id': plan['id'],
  162. },
  163. 'cwd': str(algorithm['path']),
  164. 'command': algorithm['command'],
  165. 'mission': {
  166. 'id': mission['id'],
  167. },
  168. })
  169. print("initMissionTasks is :", preparedTasks)
  170. return preparedTasks
  171. def prepareTask(self, missionId: int, planId: int):
  172. mission = [m for m in self.missions if m['id'] == missionId]
  173. if not mission:
  174. print("未找到mission,是否错误传入更新请求")
  175. return []
  176. else:
  177. mission = mission[0]
  178. #
  179. for plan in [plan for plan in mission['plans'] if plan['id'] == planId]:
  180. algorithm = [algo for algo in algoList if algo['name'] == plan['algorithm']]
  181. if not algorithm:
  182. print("未找到选择的算法,是否需要更新算法列表?")
  183. return []
  184. else:
  185. algorithm = algorithm[0]
  186. preparedTask = {
  187. 'plan': {
  188. 'id': plan['id'],
  189. },
  190. 'cwd': str(algorithm['path']),
  191. 'command': algorithm['command'],
  192. 'mission': {
  193. 'id': mission['id'],
  194. },
  195. }
  196. return preparedTask
  197. print("未找到plan,是否错误传入更新请求")
  198. return []
  199. def fetchData(self, missionId, planId):
  200. mission = [m for m in self.missions if m['id'] == missionId]
  201. if not mission:
  202. print("未找到mission,是否错误传入更新请求")
  203. return []
  204. else:
  205. mission = mission[0]
  206. #
  207. for plan in mission['plans']:
  208. if plan['id'] == planId:
  209. return {
  210. 'nodes': plan['nodes'],
  211. 'edges': plan['edges'],
  212. }
  213. print("在plans中找不到对应的plan")
  214. return {}