api_rawDataTrans.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from django.contrib import auth
  2. from rest_framework.views import APIView
  3. from rest_framework.response import Response
  4. from rest_framework import status
  5. from rest_framework.authtoken.models import Token
  6. from rest_framework.authentication import BasicAuthentication, TokenAuthentication
  7. from .serializers import UserRegisterSerializer
  8. import logging
  9. from django.middleware.csrf import get_token
  10. from django.contrib.auth import login
  11. from api.utils import *
  12. from api.models import File, Mission, Plan, Result
  13. import requests
  14. import json, csv
  15. PLAN_FAILED = -1
  16. MISSION_FAILED = -2
  17. logger = logging.getLogger("process.manager.reporter")
  18. class RawDataTrans(APIView):
  19. authentication_classes = []
  20. permission_classes = []
  21. def get(self, request):
  22. if request.user:
  23. # 用户从前端发来请求
  24. return success("测试返回图数据")
  25. else:
  26. # 进程管理器发来请求
  27. # mission = Mission.objects.get(id=int(request.data.get('missionId')))
  28. plan = Plan.objects.get(id=int(request.data.get('planId')))
  29. return success("测试返回图数据")
  30. def post(self, request):
  31. # 处理进程反馈计算结果
  32. progress_result = request.data.get('result')
  33. performance = request.data.get('performance')
  34. # logger.info(progress_result)
  35. for progress_data in progress_result:
  36. mission = Mission.objects.get(id=int(progress_data['missionId']))
  37. # 收到结果反馈,表示该mission一定开始计算了
  38. # 注意当missin的状态不是init时,一定在其他地方先被修改,所以不能在此处修改
  39. if mission.state == 'init':
  40. mission.state = 'calculating'
  41. mission.save()
  42. plan = Plan.objects.get(id=int(progress_data['planId']))
  43. if 'nodes' in progress_data:
  44. nodes = []
  45. for node in progress_data['nodes']:
  46. nodes.append([node['id'], node['type'], node['meta']])
  47. else:
  48. nodes = None
  49. if 'edges' in progress_data:
  50. edges = []
  51. for edge in progress_data['edges']:
  52. edges.append([edge['from'], edge['to'], edge['meta']])
  53. else:
  54. edges = None
  55. progress = progress_data['progress']
  56. for param in [mission, plan, progress]:
  57. if param is None:
  58. logger.error(f"传递计算结果缺少参数")
  59. return failed(message="缺少结果参数")
  60. if int(progress) == 100:
  61. if not nodes or not edges:
  62. logger.error(f"传递计算结果进度达到100却没有结果")
  63. return failed(message="缺少结果参数")
  64. try:
  65. result = plan.own_result
  66. except Result.DoesNotExist:
  67. # 不存在结果文件,需要新建
  68. result = Result()
  69. result.plan = plan
  70. result.mission = mission
  71. result.user = plan.user
  72. result.progress = int(progress)
  73. result.save()
  74. if int(progress) == 100:
  75. # 任务完成后需要保存结果文件
  76. # 读取nodes和edges,生成结果文件
  77. nodeFile = File(type='csv', usage='result', content='node', user=plan.user)
  78. nodeFile.save()
  79. if not nodeFile.generate(nodes) == OK:
  80. logger.error("保存计算结果文件失败")
  81. return failed(message="保存节点结果文件失败")
  82. edgeFile = File(type='csv', usage='result', content='edge', user=plan.user)
  83. edgeFile.save()
  84. if not edgeFile.generate(edges) == OK:
  85. logger.error("保存计算结果文件失败")
  86. return failed(message="保存边结果文件失败")
  87. nodeFile.associate = edgeFile
  88. edgeFile.associate = nodeFile
  89. nodeFile.save()
  90. edgeFile.save()
  91. # 将文件与结果绑定
  92. result.nodeFile = nodeFile
  93. result.edgeFile = edgeFile
  94. result.progress = 100
  95. result.save()
  96. # 检查该任务是否下属Plan全部完成
  97. missionCompleted = True
  98. for p in mission.own_plans.all():
  99. try:
  100. r = p.own_result
  101. if not r.progress == 100:
  102. missionCompleted = False
  103. except Result.DoesNotExist:
  104. # 存在一个没有结果的plan,mission肯定没有计算完毕
  105. missionCompleted = False
  106. break
  107. if missionCompleted:
  108. logger.error("检测认为mission已结束")
  109. mission.state = 'done'
  110. mission.save()
  111. else:
  112. # 进度不到百分百,正在执行中,仅更新进度数值
  113. # 注意使用负数进度值表示单个处理失败或整个任务失败
  114. result.progress = int(progress)
  115. result.save()
  116. # 计算系统性能占用信息
  117. system_performance = performance['system']
  118. process_performance = performance['process']
  119. try:
  120. SYSTEMPERFORMANCE['cpu'] = system_performance['cpu']
  121. SYSTEMPERFORMANCE['mem_total'] = system_performance['mem_total']
  122. SYSTEMPERFORMANCE['mem_used'] = system_performance['mem_used']
  123. except Exception as error:
  124. logger.error("进程管理器传递了错误的系统性能信息")
  125. return failed(message="系统性能汇报信息错误")
  126. # 处理进程占用性能
  127. PROCESSPERFORMANCE.clear()
  128. for process in process_performance:
  129. PROCESSPERFORMANCE.append({
  130. 'pid': process['pid'],
  131. 'missionId': process['missionId'],
  132. 'planId': process['planId'],
  133. 'cpu': process['cpu'],
  134. 'mem_used': process['mem_used'],
  135. 'startTime': process['startTime'],
  136. 'algorithm': process['algorithm'],
  137. 'creator': Mission.objects.get(id=int(process['missionId'])).user.username,
  138. })
  139. return success(message="汇报结果文件成功")