# NOTE(review): import order is intentionally kept as in the original file,
# because the wildcard import from ``api.utils`` may shadow (or be shadowed by)
# neighbouring names depending on its position.
from django.contrib import auth
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.authtoken.models import Token
from rest_framework.authentication import BasicAuthentication, TokenAuthentication
from .serializers import UserRegisterSerializer
import logging
from django.middleware.csrf import get_token
from django.contrib.auth import login
from api.utils import *
from api.models import File, Mission, Plan, Result
import requests
import json
import csv

# Negative progress values are used by reporters to signal failure (see
# ``RawDataTrans.post``). Presumably -1 marks a failed plan and -2 a failed
# mission -- TODO confirm against the process manager.
PLAN_FAILED = -1
MISSION_FAILED = -2

logger = logging.getLogger("process.manager.reporter")


class RawDataTrans(APIView):
    """Endpoint through which computation progress/results are reported.

    Authentication and permission checks are disabled for this view
    (both class lists are empty).
    """

    authentication_classes = []
    permission_classes = []

    def get(self, request):
        """Return a placeholder graph-data response.

        Both branches currently answer with the same test payload.
        """
        # NOTE(review): with DRF's default ``UNAUTHENTICATED_USER`` setting,
        # ``request.user`` is an ``AnonymousUser`` instance (truthy) when no
        # authenticator runs, so the second branch may never execute -- confirm
        # the intended way to tell front-end users from the process manager.
        if request.user:
            # Request sent by a user from the front end.
            return success("测试返回图数据")

        # Request sent by the process manager. The looked-up plan is not used;
        # the query only raises when the plan id is missing or unknown.
        Plan.objects.get(id=int(request.data.get('planId')))
        return success("测试返回图数据")

    def post(self, request):
        """Store reported computation progress and performance figures.

        Reads from ``request.data``:

        * ``result``: iterable of dicts with ``missionId``, ``planId``,
          ``progress`` and, once ``progress`` is 100, non-empty ``nodes`` and
          ``edges``. A missing/``None`` value is treated as "no results".
        * ``performance``: dict with ``system`` (``cpu``, ``mem_total``,
          ``mem_used``) and ``process`` (iterable of dicts with ``pid``,
          ``missionId``, ``planId``, ``cpu``, ``mem_used``, ``startTime``,
          ``algorithm``).

        Returns the value of ``failed(...)`` for the first invalid entry,
        otherwise the value of ``success(...)``. Entries processed before a
        failing one stay persisted.
        """
        progress_result = request.data.get('result')
        logger.info(progress_result)
        performance = request.data.get('performance')

        for progress_data in progress_result or []:
            error = self._store_progress(progress_data)
            if error is not None:
                return error

        error = self._update_performance(performance)
        if error is not None:
            return error
        return success(message="汇报结果文件成功")

    def _store_progress(self, progress_data):
        """Persist one reported progress entry; return an error response or None."""
        # Validate the raw fields before touching the database: ``.get()``
        # raises instead of returning None, so a later None-check cannot
        # detect missing parameters.
        try:
            mission_id = int(progress_data['missionId'])
            plan_id = int(progress_data['planId'])
            progress = int(progress_data['progress'])
        except (KeyError, TypeError, ValueError):
            logger.error("传递计算结果缺少参数")
            return failed(message="缺少结果参数")

        try:
            mission = Mission.objects.get(id=mission_id)
            plan = Plan.objects.get(id=plan_id)
        except (Mission.DoesNotExist, Plan.DoesNotExist):
            logger.error("传递计算结果对应的任务或方案不存在")
            return failed(message="任务或方案不存在")

        nodes = progress_data.get('nodes')
        edges = progress_data.get('edges')
        finished = progress == 100
        if finished and (not nodes or not edges):
            logger.error("传递计算结果进度达到100却没有结果")
            return failed(message="缺少结果参数")

        try:
            result = plan.own_result
        except Result.DoesNotExist:
            # No result record yet for this plan: create one. It then goes
            # through the same update path as an existing record, so a first
            # report that is already at 100 still gets its result files.
            result = Result()
            result.plan = plan
            result.mission = mission
            result.user = plan.user

        if finished:
            # A finished computation must have its result files stored.
            error = self._attach_result_files(result, plan, nodes, edges)
            if error is not None:
                return error

        # Below 100 only the progress value changes. Negative values denote a
        # failed single run or a failed whole mission.
        result.progress = progress
        result.save()
        return None

    def _attach_result_files(self, result, plan, nodes, edges):
        """Create node/edge result files and bind them to ``result``.

        Returns an error response when a file cannot be generated, else None.
        ``result`` itself is not saved here.
        """
        node_file = File(type='csv', usage='result', content='node', user=plan.user)
        node_file.save()
        if node_file.generate(nodes) != OK:
            logger.error("保存计算结果文件失败")
            return failed(message="保存节点结果文件失败")

        edge_file = File(type='csv', usage='result', content='edge', user=plan.user)
        edge_file.save()
        if edge_file.generate(edges) != OK:
            logger.error("保存计算结果文件失败")
            return failed(message="保存边结果文件失败")

        # Cross-link the two files, then bind both to the result.
        node_file.associate = edge_file
        edge_file.associate = node_file
        node_file.save()
        edge_file.save()

        result.nodeFile = node_file
        result.edgeFile = edge_file
        return None

    def _update_performance(self, performance):
        """Refresh the shared system/process performance snapshots.

        The new snapshot is fully built and validated first; the module-level
        ``SYSTEMPERFORMANCE`` / ``PROCESSPERFORMANCE`` objects are only
        modified once nothing can fail any more. Returns an error response
        for malformed input, else None.
        """
        try:
            system_performance = performance['system']
            process_performance = performance['process']
            system_snapshot = {
                key: system_performance[key]
                for key in ('cpu', 'mem_total', 'mem_used')
            }
        except (LookupError, TypeError):
            logger.error("进程管理器传递了错误的系统性能信息")
            return failed(message="系统性能汇报信息错误")

        process_snapshot = []
        try:
            for process in process_performance:
                creator = Mission.objects.get(id=int(process['missionId'])).user.username
                process_snapshot.append({
                    'pid': process['pid'],
                    'missionId': process['missionId'],
                    'planId': process['planId'],
                    'cpu': process['cpu'],
                    'mem_used': process['mem_used'],
                    'startTime': process['startTime'],
                    'algorithm': process['algorithm'],
                    'creator': creator,
                })
        except (LookupError, TypeError, ValueError, Mission.DoesNotExist):
            logger.error("进程管理器传递了错误的进程性能信息")
            return failed(message="进程性能汇报信息错误")

        for key, value in system_snapshot.items():
            SYSTEMPERFORMANCE[key] = value
        PROCESSPERFORMANCE.clear()
        PROCESSPERFORMANCE.extend(process_snapshot)
        return None