from django.contrib import auth
from rest_framework.views import APIView
from api.utils import *
from api.models import File, Mission
import requests
import logging
import json

logger = logging.getLogger('calculate')


class CalculateAPI(APIView):
    """Start, pause, or stop the calculation of a Mission through the scheduler service.

    The request body must carry the mission id (as ``mission`` or ``missionId``)
    and a ``command`` (one of ``COMMANDS``). The mission's plan tree is serialized
    and POSTed to ``SCHEDULER_BASE_URL`` (provided by ``api.utils``). Local state
    (``mission.state`` and plan results) is only updated when the scheduler
    answers with ``{'code': 'OK', ...}``.
    """

    # Control commands accepted in the request body.
    COMMANDS = ('start', 'pause', 'stop')

    def post(self, request):
        """Validate the request, check mission state/permissions, then dispatch the command."""
        user = request.user

        # 'mission' takes precedence over 'missionId'; a missing/empty id is a lookup failure.
        mission_id = request.data.get('mission') or request.data.get('missionId')
        if not mission_id:
            return failed(message="处理任务控制失败,未找到处理任务")
        try:
            mission = Mission.objects.get(id=int(mission_id))
        except Mission.DoesNotExist:
            return failed(message="处理任务控制失败,未找到处理任务")
        except Exception:
            # Request boundary: e.g. a non-numeric id. Report as "not found" but keep the trace.
            logger.exception("处理任务控制失败")
            return failed(message="处理任务控制失败,未找到处理任务")

        # Only the mission owner or an admin may control the calculation.
        if mission.user != user and user.identity != 'admin':
            logger.error("未授权用户%s尝试执行任务%s:%s的计算操作",
                         user.username, mission.name, mission.id)
            return failed(message="处理任务控制失败,用户没有操作权限")

        # Explicit check instead of `assert`, which is stripped under `python -O`.
        command = request.data.get('command')
        if command not in self.COMMANDS:
            logger.warning("处理任务控制代码错误: %r", command)
            return failed(message="处理任务控制失败,控制代码错误")

        if command == 'start':
            # 'done' must be tested first: it is not a startable state, so testing the
            # startable states first would wrongly report a finished mission as running.
            if mission.state == 'done':
                return success(message="任务已完成")
            if mission.state not in ('init', 'pause', 'stop'):
                return success(message="任务正在进行中")
            return self._start(mission)

        # pause / stop only make sense for a running mission.
        if mission.state != 'calculating':
            return failed(message="任务没有在运行,无法暂停或停止")
        if command == 'pause':
            return self._pause(mission)
        return self._stop(mission)

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _child_plans(mission, parent):
        """Return, as a list, the plans of *mission* whose parent is *parent*."""
        return list(mission.own_plans.filter(parent=parent))

    @staticmethod
    def _child_plan_ids(mission, parent):
        """Return the ids of the plans of *mission* whose parent is *parent*."""
        return list(mission.own_plans.filter(parent=parent).values_list('id', flat=True))

    @staticmethod
    def _post_to_scheduler(endpoint, payload):
        """POST *payload* as JSON to the scheduler *endpoint*.

        Returns ``(ok, body)``: ``body`` is the parsed JSON response (``None`` if the
        request failed or the body was not JSON) and ``ok`` is True only when the
        body is a dict whose ``code`` is ``'OK'``.
        """
        try:
            response = requests.post(SCHEDULER_BASE_URL + endpoint, json=payload)
            body = response.json()
        except (requests.RequestException, ValueError):
            # Network failure or non-JSON reply: treat as a scheduler failure, not a 500.
            logger.exception("scheduler request %s failed", endpoint)
            return False, None
        logger.info("scheduler %s response: %s", endpoint, body)
        return isinstance(body, dict) and body.get('code') == 'OK', body

    @classmethod
    def _build_resume_plans(cls, mission, root_plan):
        """Build the plan list used to resume a paused mission.

        Starting below the root, descend through plans whose result reports
        progress 100. Each first unfinished plan becomes an entry point
        (``'root': True``) fed with its parent's result, or with the mission's own
        input files when its parent is the root plan. All descendants of the entry
        points are then appended with no input data. Because the scheduler may
        have been restarted since the pause, the entry points' input is sent
        explicitly rather than relying on the scheduler's intermediate state.
        """
        plans = []
        pending = cls._child_plans(mission, root_plan)
        entry_points = []
        while pending:
            plan = pending.pop()
            # The plan's result may have been deleted (unfinished results are removed on pause).
            if hasattr(plan, 'own_result') and plan.own_result.progress == 100:
                # Finished: keep searching among its children.
                pending.extend(cls._child_plans(mission, plan))
                continue
            # Unfinished: resume from here, using the parent's output as input.
            parent = plan.parent
            if parent.parent is None:
                # Parent is the root plan, so use the mission's own input files.
                nodes_json = mission.nodeFile.toJson()
                edges_json = mission.edgeFile.toJson()
            else:
                nodes_json = parent.own_result.nodeFile.toJson()
                edges_json = parent.own_result.edgeFile.toJson()
            entry_points.append(plan)
            plans.append({
                'id': plan.id,
                'algorithm': plan.algorithm.name,
                'nodes': nodes_json,
                'edges': edges_json,
                'children': cls._child_plan_ids(mission, plan),
                # New-style control: the 'root' flag marks an entry-point plan.
                'root': True,
            })

        # Append every descendant of the entry points; they carry neither data nor 'root'.
        stack = entry_points
        while stack:
            plan = stack.pop()
            for child in mission.own_plans.filter(parent=plan):
                stack.append(child)
                plans.append({
                    'id': child.id,
                    'algorithm': child.algorithm.name,
                    'nodes': None,
                    'edges': None,
                    'children': cls._child_plan_ids(mission, child),
                })
        return plans

    @classmethod
    def _build_start_plans(cls, mission, root_plan):
        """Build the plan list for a fresh start (from 'init' or 'stop').

        The root plan entry carries the mission's input files. The other plans
        are listed level by level (breadth first). Each one receives its parent's
        result files when the parent has a result with both files, else ``None``.
        """
        plans = [{
            'id': root_plan.id,
            'nodes': mission.nodeFile.toJson(),
            'edges': mission.edgeFile.toJson(),
            'children': cls._child_plan_ids(mission, root_plan),
        }]
        level = cls._child_plans(mission, root_plan)
        while level:
            next_level = []
            for plan in level:
                children = cls._child_plans(mission, plan)
                # A missing reverse one-to-one raises an AttributeError subclass, so getattr works.
                parent_result = getattr(plan.parent, 'own_result', None)
                has_input = (parent_result is not None
                             and parent_result.nodeFile and parent_result.edgeFile)
                plans.append({
                    'id': plan.id,
                    'algorithm': plan.algorithm.name,
                    'nodes': parent_result.nodeFile.toJson() if has_input else None,
                    'edges': parent_result.edgeFile.toJson() if has_input else None,
                    'children': [child.id for child in children],
                })
                next_level.extend(children)
            level = next_level
        return plans

    def _start(self, mission):
        """Submit *mission* to the scheduler: resume it if paused, otherwise start it fresh."""
        root_plan = mission.own_plans.get(parent=None)
        if mission.state == 'pause':
            payload = {
                'mission': {'id': mission.id},
                'plans': self._build_resume_plans(mission, root_plan),
            }
            ok, _ = self._post_to_scheduler('/resumeMission', payload)
            if not ok:
                return failed(message="恢复计算任务失败")
            mission.state = 'calculating'
            mission.save()
            return success(message="恢复计算任务成功")

        payload = {
            'mission': {'id': mission.id},
            'plans': self._build_start_plans(mission, root_plan),
        }
        ok, _ = self._post_to_scheduler('/addMission', payload)
        if not ok:
            return failed(message="启动计算任务失败")
        mission.state = 'calculating'
        mission.save()
        return success(message="启动计算任务成功")

    def _pause(self, mission):
        """Pause *mission* and delete the results of every unfinished plan."""
        ok, body = self._post_to_scheduler('/pauseMission', {'mission': {'id': mission.id}})
        if not ok:
            # Pass the parsed body (not the raw Response object, which is not JSON-serializable).
            return failed(message="暂停计算任务失败", data=body)
        mission.state = 'pause'
        mission.save()
        # Unfinished plans revert to their initial state: drop their partial results.
        for plan in mission.own_plans.all():
            result = getattr(plan, 'own_result', None)
            if result is not None and result.progress != 100:
                result.delete()
        return success(message="暂停计算任务成功")

    def _stop(self, mission):
        """Stop *mission* and delete every plan result so a later start begins from scratch."""
        ok, body = self._post_to_scheduler('/stopMission', {'mission': {'id': mission.id}})
        if not ok:
            # Pass the parsed body (not the raw Response object, which is not JSON-serializable).
            return failed(message="停止计算任务失败", data=body)
        mission.state = 'stop'
        mission.save()
        # Reset all progress by deleting results; they are recreated on the next run.
        for plan in mission.own_plans.all():
            result = getattr(plan, 'own_result', None)
            if result is not None:
                result.delete()
        return success(message="停止计算任务成功")