123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- from django.contrib import auth
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from rest_framework.authtoken.models import Token
- from rest_framework.authentication import BasicAuthentication, TokenAuthentication
- from .serializers import UserRegisterSerializer
- import logging
- from django.middleware.csrf import get_token
- from django.contrib.auth import login
- from api.utils import *
- from api.models import File, Mission, Plan, Result
- import requests
- import json, csv
- PLAN_FAILED = -1
- MISSION_FAILED = -2
- logger = logging.getLogger("process.manager.reporter")
- class RawDataTrans(APIView):
- authentication_classes = []
- permission_classes = []
- def get(self, request):
- if request.user:
- # 用户从前端发来请求
- return success("测试返回图数据")
- else:
- # 进程管理器发来请求
- # mission = Mission.objects.get(id=int(request.data.get('missionId')))
- plan = Plan.objects.get(id=int(request.data.get('planId')))
- return success("测试返回图数据")
-
- def post(self, request):
- # 处理进程反馈计算结果
- progress_result = request.data.get('result')
- performance = request.data.get('performance')
- for progress_data in progress_result:
- mission = Mission.objects.get(id=int(progress_data['missionId']))
- plan = Plan.objects.get(id=int(progress_data['planId']))
- if 'nodes' in progress_data:
- nodes = progress_data['nodes']
- else:
- nodes = None
- if 'edges' in progress_data:
- edges = progress_data['edges']
- else:
- edges = None
- progress = progress_data['progress']
- for param in [mission, plan, progress]:
- if param is None:
- logger.error(f"传递计算结果缺少参数")
- return failed(message="缺少结果参数")
-
- if int(progress) == 100:
- if not nodes or not edges:
- logger.error(f"传递计算结果进度达到100却没有结果")
- return failed(message="缺少结果参数")
- try:
- result = plan.own_result
- if int(progress) == 100:
- # 任务完成后需要保存结果文件
- # 读取nodes和edges,生成结果文件
- nodeFile = File(type='csv', usage='result', content='node', user=plan.user)
- nodeFile.save()
- if not nodeFile.generate(nodes) == OK:
- logger.error("保存计算结果文件失败")
- return failed(message="保存节点结果文件失败")
- edgeFile = File(type='csv', usage='result', content='edge', user=plan.user)
- edgeFile.save()
- if not edgeFile.generate(edges) == OK:
- logger.error("保存计算结果文件失败")
- return failed(message="保存边结果文件失败")
- nodeFile.associate = edgeFile
- edgeFile.associate = nodeFile
- nodeFile.save()
- edgeFile.save()
- # 将文件与结果绑定
- result.nodeFile = nodeFile
- result.edgeFile = edgeFile
- result.progress = 100
- result.save()
- else:
- # 进度不到百分百,正在执行中,仅更新进度数值
- # 注意使用负数进度值表示单个处理失败或整个任务失败
- result.progress = int(progress)
- result.save()
- except Result.DoesNotExist:
- # 不存在结果文件,需要新建
- result = Result()
- result.plan = plan
- result.mission = mission
- result.user = plan.user
- result.progress = int(progress)
- result.save()
-
- # 计算系统性能占用信息
- system_performance = performance['system']
- process_performance = performance['process']
- try:
- SYSTEMPERFORMANCE['cpu'] = system_performance['cpu']
- SYSTEMPERFORMANCE['mem_total'] = system_performance['mem_total']
- SYSTEMPERFORMANCE['mem_used'] = system_performance['mem_used']
- except Exception as error:
- logger.error("进程管理器传递了错误的系统性能信息")
- return failed(message="系统性能汇报信息错误")
-
- # 处理进程占用性能
- PROCESSPERFORMANCE.clear()
- for process in process_performance:
- PROCESSPERFORMANCE.append({
- 'pid': process['pid'],
- 'missionId': process['missionId'],
- 'planId': process['planId'],
- 'cpu': process['cpu'],
- 'mem_used': process['mem_used'],
- 'startTime': process['startTime'],
- 'algorithm': process['algorithm'],
- 'creator': Mission.objects.get(id=int(process['missionId'])).user.username,
- })
- logger.info(PROCESSPERFORMANCE)
- return success(message="汇报结果文件成功")
|