api_calculate.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. from django.contrib import auth
  2. from rest_framework.views import APIView
  3. from api.utils import *
  4. from api.models import File, Mission
  5. import requests
  6. import logging
  7. import json
  8. logger = logging.getLogger('calculate')
  9. class CalculateAPI(APIView):
  10. def post(self, request):
  11. user = request.user
  12. try:
  13. if request.data.get('mission'):
  14. missionId = request.data.get('mission')
  15. elif request.data.get('missionId'):
  16. missionId = request.data.get('missionId')
  17. mission = Mission.objects.get(id=int(missionId))
  18. except Mission.DoesNotExist:
  19. return failed(message="处理任务控制失败,未找到处理任务")
  20. except Exception as error:
  21. print("处理任务控制失败", error)
  22. return failed(message="处理任务控制失败,未找到处理任务")
  23. # 检测用户权限
  24. if not mission.user == user and not user.identity == 'admin':
  25. logger.error(f"未授权用户{user.username}尝试执行任务{mission.name}:{mission.id}的计算操作")
  26. return failed(message="处理任务控制失败,用户没有操作权限")
  27. command = request.data.get('command')
  28. try:
  29. assert command in ['start', 'pause', 'stop']
  30. except Exception as error:
  31. print("处理任务控制代码错误")
  32. return failed(message="处理任务控制失败,控制代码错误")
  33. # 进行状态检查
  34. if command == 'start':
  35. # 如任务已经启动,则不操作
  36. if not mission.state in ['init', 'pause', 'stop']:
  37. return success(message="任务正在进行中")
  38. if mission.state in ['done']:
  39. return success(message="任务已完成")
  40. else:
  41. # 非启动任务需要检查任务是否已经开始
  42. if not mission.state == 'calculating' and command == 'pause':
  43. return failed(message="任务没有在运行,无法暂停")
  44. if not ( mission.state == 'calculating' or mission.state == 'pause') and command == 'stop':
  45. return failed(message="任务没有在运行或暂停,无法停止")
  46. # 向调度程序提交计算任务
  47. # mission = request.json['mission']
  48. # plans = request.json['plans']
  49. # 根据控制指令不同,执行不同操作
  50. # 启动计算任务,构造通用数据结构
  51. if command == 'start':
  52. calculateData = {
  53. 'mission': {
  54. 'id': mission.id,
  55. },
  56. 'plans': []
  57. }
  58. rootPlan = mission.own_plans.get(parent=None)
  59. # 如果是恢复历史任务,则需要进行特殊处理未下一步运行提供所有plan的原料数据,因此此时可能scheduler已经终止过,过程数据已丢失
  60. # 如果是从初始状态或停止状态启动
  61. if mission.state == 'pause':
  62. # 如果是暂停中恢复,需要找出当前停在哪里,将父节点的结果作为原料输入
  63. # 找出最新plan
  64. lastPlans = [child for child in mission.own_plans.filter(parent=rootPlan).all()]
  65. latestPlans = []
  66. while lastPlans:
  67. currentPlan = lastPlans.pop()
  68. # 检查当前currentPlan是否已经计算完毕,如果已经计算完毕则继续找它的子节点,直到找到未计算完的
  69. # 注意可能当前plan的result已经被删除
  70. if hasattr(currentPlan, 'own_result') and currentPlan.own_result.progress == 100:
  71. # 计算完毕,将子节点加入寻找列表
  72. lastPlans.extend([child for child in mission.own_plans.filter(parent=currentPlan).all()])
  73. else:
  74. # 没有计算完毕,从这里恢复,需要用父节点结果作为自己的输入
  75. parentPlan = currentPlan.parent
  76. # 判断是否父节点是根节点,是则用mission的数据作为输入
  77. if parentPlan.parent == None:
  78. nodesJson = mission.nodeFile.toJson(request)
  79. edgesJson = mission.edgeFile.toJson(request)
  80. else:
  81. nodesJson = parentPlan.own_result.nodeFile.toJson(request)
  82. edgesJson = parentPlan.own_result.edgeFile.toJson(request)
  83. latestPlans.append(currentPlan)
  84. calculateData['plans'].append({
  85. 'id': currentPlan.id,
  86. 'algorithm': currentPlan.algorithm.name,
  87. 'nodes': nodesJson,
  88. 'edges': edgesJson,
  89. 'children': list(mission.own_plans.filter(parent=currentPlan).values_list('id', flat=True)),
  90. # 新式控制方法,检测到带有root字段,则为初始节点
  91. 'root': True,
  92. })
  93. # 开始弹栈,将所有后续子节点加入
  94. while latestPlans:
  95. currentPlan = latestPlans.pop()
  96. for child in mission.own_plans.filter(parent=currentPlan).all():
  97. latestPlans.append(child)
  98. # 子节点既没有初始数据也没有root标记
  99. calculateData['plans'].append({
  100. 'id': child.id,
  101. 'algorithm': child.algorithm.name,
  102. 'nodes': None,
  103. 'edges': None,
  104. 'children': list(mission.own_plans.filter(parent=child).values_list('id', flat=True)),
  105. })
  106. response = requests.post(SCHEDULER_BASE_URL + '/resumeMission', json=calculateData)
  107. print(response.json())
  108. if response.json()['code'] == 'OK':
  109. # 更新mission的运行状态
  110. mission.state = 'calculating'
  111. mission.save()
  112. return success(message="恢复计算任务成功")
  113. else:
  114. return failed(message="恢复计算任务失败")
  115. # 如果不是恢复计算任务,则正常计算
  116. calculateData['plans'].append({
  117. 'id': rootPlan.id,
  118. 'nodes': mission.nodeFile.toJson(request),
  119. 'edges': mission.edgeFile.toJson(request),
  120. 'children': list(mission.own_plans.filter(parent=rootPlan).values_list('id', flat=True)),
  121. })
  122. rootPlans = [ child for child in mission.own_plans.filter(parent=rootPlan)]
  123. while rootPlans:
  124. tempPlans = rootPlans.copy()
  125. rootPlans = []
  126. for p in tempPlans:
  127. children = [ child for child in mission.own_plans.filter(parent=p)]
  128. # 判断是否父节点存在计算结果,有则作为子节点输入
  129. if hasattr(p.parent, 'own_result') and p.parent.own_result.nodeFile and p.parent.own_result.edgeFile:
  130. calculateData['plans'].append({
  131. 'id': p.id,
  132. 'algorithm': p.algorithm.name,
  133. 'nodes': p.parent.own_result.nodeFile.toJson(request),
  134. 'edges': p.parent.own_result.edgeFile.toJson(request),
  135. 'children': [child.id for child in children],
  136. })
  137. else:
  138. calculateData['plans'].append({
  139. 'id': p.id,
  140. 'algorithm': p.algorithm.name,
  141. 'nodes': None,
  142. 'edges': None,
  143. 'children': [child.id for child in children],
  144. })
  145. rootPlans.extend(children)
  146. # logger.warning(calculateData)
  147. response = requests.post(SCHEDULER_BASE_URL + '/addMission', json=calculateData)
  148. print(response.json())
  149. if response.json()['code'] == 'OK':
  150. # 更新mission的运行状态
  151. mission.state = 'calculating'
  152. mission.save()
  153. return success(message="启动计算任务成功")
  154. else:
  155. return failed(message="启动计算任务失败")
  156. # 暂停计算
  157. if command == 'pause':
  158. # 暂停任务时仅需要传递mission的id
  159. calculateData = {
  160. 'mission': {
  161. 'id': mission.id,
  162. }
  163. }
  164. response = requests.post(SCHEDULER_BASE_URL + '/pauseMission', json=calculateData)
  165. if response.json()['code'] == 'OK':
  166. mission.state = 'pause'
  167. mission.save()
  168. # 暂停后,所有当前未完成任务全部变为初始状态,删除其result
  169. for plan in mission.own_plans.all():
  170. if hasattr(plan, 'own_result') and plan.own_result.progress != 100:
  171. result = plan.own_result
  172. result.delete()
  173. return success(message="暂停计算任务成功")
  174. else:
  175. print(response)
  176. return failed(message="暂停计算任务失败", data=response)
  177. # 停止计算
  178. if command == 'stop':
  179. # 停止任务时仅需要传递mission的id
  180. calculateData = {
  181. 'mission': {
  182. 'id': mission.id,
  183. }
  184. }
  185. response = requests.post(SCHEDULER_BASE_URL + '/stopMission', json=calculateData)
  186. if response.json()['code'] == 'OK':
  187. # 停止后所有任务相关数据删除,恢复初始状态
  188. mission.state = 'stop'
  189. mission.save()
  190. # 停止后,所有plan的进度需要全部归零,即删除所有result,或将result的进度改为-2
  191. # 尝试删除所有result方案,下次运行时会重新创建result
  192. for plan in mission.own_plans.all():
  193. if hasattr(plan, 'own_result'):
  194. result = plan.own_result
  195. result.delete()
  196. return success(message="暂停停止计算任务成功")
  197. else:
  198. print(response)
  199. return failed(message="停止计算任务失败", data=response)