api_rawDataTrans.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. from django.contrib import auth
  2. from rest_framework.views import APIView
  3. from rest_framework.response import Response
  4. from rest_framework import status
  5. from rest_framework.authtoken.models import Token
  6. from rest_framework.authentication import BasicAuthentication, TokenAuthentication
  7. from .serializers import UserRegisterSerializer
  8. import logging
  9. from django.middleware.csrf import get_token
  10. from django.contrib.auth import login
  11. from api.utils import *
  12. from api.models import File, Mission, Plan, Result
  13. import requests
  14. import json, csv
  15. PLAN_FAILED = -1
  16. MISSION_FAILED = -2
  17. logger = logging.getLogger("process.manager.reporter")
  18. class RawDataTrans(APIView):
  19. authentication_classes = []
  20. permission_classes = []
  21. def get(self, request):
  22. if request.user:
  23. # 用户从前端发来请求
  24. return success("测试返回图数据")
  25. else:
  26. # 进程管理器发来请求
  27. # mission = Mission.objects.get(id=int(request.data.get('missionId')))
  28. plan = Plan.objects.get(id=int(request.data.get('planId')))
  29. return success("测试返回图数据")
  30. def post(self, request):
  31. # 处理进程反馈计算结果
  32. progress_result = request.data.get('result')
  33. performance = request.data.get('performance')
  34. for progress_data in progress_result:
  35. mission = Mission.objects.get(id=int(progress_data['missionId']))
  36. plan = Plan.objects.get(id=int(progress_data['planId']))
  37. if 'nodes' in progress_data:
  38. nodes = progress_data['nodes']
  39. else:
  40. nodes = None
  41. if 'edges' in progress_data:
  42. edges = progress_data['edges']
  43. else:
  44. edges = None
  45. progress = progress_data['progress']
  46. for param in [mission, plan, progress]:
  47. if param is None:
  48. logger.error(f"传递计算结果缺少参数")
  49. return failed(message="缺少结果参数")
  50. if int(progress) == 100:
  51. if not nodes or not edges:
  52. logger.error(f"传递计算结果进度达到100却没有结果")
  53. return failed(message="缺少结果参数")
  54. try:
  55. result = plan.own_result
  56. if int(progress) == 100:
  57. # 任务完成后需要保存结果文件
  58. # 读取nodes和edges,生成结果文件
  59. nodeFile = File(type='csv', usage='result', content='node', user=plan.user)
  60. nodeFile.save()
  61. if not nodeFile.generate(nodes) == OK:
  62. logger.error("保存计算结果文件失败")
  63. return failed(message="保存节点结果文件失败")
  64. edgeFile = File(type='csv', usage='result', content='edge', user=plan.user)
  65. edgeFile.save()
  66. if not edgeFile.generate(edges) == OK:
  67. logger.error("保存计算结果文件失败")
  68. return failed(message="保存边结果文件失败")
  69. nodeFile.associate = edgeFile
  70. edgeFile.associate = nodeFile
  71. nodeFile.save()
  72. edgeFile.save()
  73. # 将文件与结果绑定
  74. result.nodeFile = nodeFile
  75. result.edgeFile = edgeFile
  76. result.progress = 100
  77. result.save()
  78. else:
  79. # 进度不到百分百,正在执行中,仅更新进度数值
  80. # 注意使用负数进度值表示单个处理失败或整个任务失败
  81. result.progress = int(progress)
  82. result.save()
  83. except Result.DoesNotExist:
  84. # 不存在结果文件,需要新建
  85. result = Result()
  86. result.plan = plan
  87. result.mission = mission
  88. result.user = plan.user
  89. result.progress = int(progress)
  90. result.save()
  91. # 计算系统性能占用信息
  92. system_performance = performance['system']
  93. process_performance = performance['process']
  94. try:
  95. SYSTEMPERFORMANCE['cpu'] = system_performance['cpu']
  96. SYSTEMPERFORMANCE['mem_total'] = system_performance['mem_total']
  97. SYSTEMPERFORMANCE['mem_used'] = system_performance['mem_used']
  98. except Exception as error:
  99. logger.error("进程管理器传递了错误的系统性能信息")
  100. return failed(message="系统性能汇报信息错误")
  101. # 处理进程占用性能
  102. PROCESSPERFORMANCE.clear()
  103. for process in process_performance:
  104. PROCESSPERFORMANCE.append({
  105. 'pid': process['pid'],
  106. 'missionId': process['missionId'],
  107. 'planId': process['planId'],
  108. 'cpu': process['cpu'],
  109. 'mem_used': process['mem_used'],
  110. 'startTime': process['startTime'],
  111. 'algorithm': process['algorithm'],
  112. 'creator': Mission.objects.get(id=int(process['missionId'])).user.username,
  113. })
  114. logger.info(PROCESSPERFORMANCE)
  115. return success(message="汇报结果文件成功")