api_rawDataTrans.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. from django.contrib import auth
  2. from rest_framework.views import APIView
  3. from rest_framework.response import Response
  4. from rest_framework import status
  5. from rest_framework.authtoken.models import Token
  6. from rest_framework.authentication import BasicAuthentication, TokenAuthentication
  7. from .serializers import UserRegisterSerializer
  8. import logging
  9. from django.middleware.csrf import get_token
  10. from django.contrib.auth import login
  11. from api.utils import *
  12. from api.models import File, Mission, Plan, Result
  13. import requests
  14. import json, csv
  15. PLAN_FAILED = -1
  16. MISSION_FAILED = -2
  17. logger = logging.getLogger("process.manager.reporter")
  18. class RawDataTrans(APIView):
  19. authentication_classes = []
  20. permission_classes = []
  21. def get(self, request):
  22. if request.user:
  23. # 用户从前端发来请求
  24. return success("测试返回图数据")
  25. else:
  26. # 进程管理器发来请求
  27. # mission = Mission.objects.get(id=int(request.data.get('missionId')))
  28. plan = Plan.objects.get(id=int(request.data.get('planId')))
  29. return success("测试返回图数据")
  30. def post(self, request):
  31. # 处理进程反馈计算结果
  32. progress_result = request.data.get('result')
  33. logger.info(progress_result)
  34. performance = request.data.get('performance')
  35. for progress_data in progress_result:
  36. mission = Mission.objects.get(id=int(progress_data['missionId']))
  37. plan = Plan.objects.get(id=int(progress_data['planId']))
  38. if 'nodes' in progress_data:
  39. nodes = progress_data['nodes']
  40. else:
  41. nodes = None
  42. if 'edges' in progress_data:
  43. edges = progress_data['edges']
  44. else:
  45. edges = None
  46. progress = progress_data['progress']
  47. for param in [mission, plan, progress]:
  48. if param is None:
  49. logger.error(f"传递计算结果缺少参数")
  50. return failed(message="缺少结果参数")
  51. if int(progress) == 100:
  52. if not nodes or not edges:
  53. logger.error(f"传递计算结果进度达到100却没有结果")
  54. return failed(message="缺少结果参数")
  55. try:
  56. result = plan.own_result
  57. if int(progress) == 100:
  58. # 任务完成后需要保存结果文件
  59. # 读取nodes和edges,生成结果文件
  60. nodeFile = File(type='csv', usage='result', content='node', user=plan.user)
  61. nodeFile.save()
  62. if not nodeFile.generate(nodes) == OK:
  63. logger.error("保存计算结果文件失败")
  64. return failed(message="保存节点结果文件失败")
  65. edgeFile = File(type='csv', usage='result', content='edge', user=plan.user)
  66. edgeFile.save()
  67. if not edgeFile.generate(edges) == OK:
  68. logger.error("保存计算结果文件失败")
  69. return failed(message="保存边结果文件失败")
  70. nodeFile.associate = edgeFile
  71. edgeFile.associate = nodeFile
  72. nodeFile.save()
  73. edgeFile.save()
  74. # 将文件与结果绑定
  75. result.nodeFile = nodeFile
  76. result.edgeFile = edgeFile
  77. result.progress = 100
  78. result.save()
  79. else:
  80. # 进度不到百分百,正在执行中,仅更新进度数值
  81. # 注意使用负数进度值表示单个处理失败或整个任务失败
  82. result.progress = int(progress)
  83. result.save()
  84. except Result.DoesNotExist:
  85. # 不存在结果文件,需要新建
  86. result = Result()
  87. result.plan = plan
  88. result.mission = mission
  89. result.user = plan.user
  90. result.progress = int(progress)
  91. result.save()
  92. # 计算系统性能占用信息
  93. system_performance = performance['system']
  94. process_performance = performance['process']
  95. try:
  96. SYSTEMPERFORMANCE['cpu'] = system_performance['cpu']
  97. SYSTEMPERFORMANCE['mem_total'] = system_performance['mem_total']
  98. SYSTEMPERFORMANCE['mem_used'] = system_performance['mem_used']
  99. except Exception as error:
  100. logger.error("进程管理器传递了错误的系统性能信息")
  101. return failed(message="系统性能汇报信息错误")
  102. # 处理进程占用性能
  103. PROCESSPERFORMANCE.clear()
  104. for process in process_performance:
  105. PROCESSPERFORMANCE.append({
  106. 'pid': process['pid'],
  107. 'missionId': process['missionId'],
  108. 'planId': process['planId'],
  109. 'cpu': process['cpu'],
  110. 'mem_used': process['mem_used'],
  111. 'startTime': process['startTime'],
  112. 'algorithm': process['algorithm'],
  113. 'creator': Mission.objects.get(id=int(process['missionId'])).user.username,
  114. })
  115. return success(message="汇报结果文件成功")