from django.db import models from datetime import datetime from random import randint from api.utils import * import numpy as np from scipy.spatial import SphericalVoronoi from sklearn.preprocessing import MinMaxScaler #scikit-learn import networkx as nx from community import community_louvain #python-louvain graphForAlgo = [ ('optimize', 'optimize'), ('group', 'group'), ('predict', 'predict'), ] class GraphManager(models.Manager): def checkDuplicate(self, token): try: self.get(token=token) return True except GraphToken.DoesNotExist: return False return True class GraphToken(models.Model): # 用于访问图的验证码 create_time = models.DateTimeField(auto_now_add=True) graph = models.ForeignKey(to="api.Graph", on_delete=models.CASCADE, related_name="own_tokens") token = models.CharField(max_length=8) objects = GraphManager() def checkExpire(self): now = datetime.now() diff = now - self.create_time # 超过五分钟过期 return diff.total_seconds() > 300 class Meta: app_label = 'api' class GraphManager(models.Manager): def statistic(self, user): graphs = user.user_own_graphs.all() return { 'amount': len(graphs), } '''功能体探测功能的三维坐标生成 start''' def createFromResultGroupAlgo(self, result): print("Group3D") # 参数配置 GROUP_SPHERE_RADIUS = 10.0 # 社团分布球体半径 D_CORE_RADIUS = 1.5 # D类节点核心区半径 SI_SHELL_RADIUS = 4.0 # S/I类节点分布半径 MIN_GROUP_DIST = 8.0 # 社团间最小间距 nodeJson = result.nodeFile.toJson() edgeJson = result.edgeFile.toJson() # 内部函数 def _uniform_sphere_sampling(n, radius=1.0): """均匀球面采样""" points = [] phi = np.pi * (3.0 - np.sqrt(5.0)) # 黄金角度 for i in range(n): y = 1 - (i / (n-1)) * 2 radius_at_y = np.sqrt(1 - y*y) theta = phi * i x = np.cos(theta) * radius_at_y z = np.sin(theta) * radius_at_y points.append(radius * np.array([x, y, z])) return np.array(points) # 内部函数 def _optimize_layout(node_coords, groups, group_coords): """带社团约束的优化""" # 参数设置 NODE_REPULSION = 0.1 # 节点间斥力 GROUP_ATTRACTION = 0.05 # 社团内引力 ITERATIONS = 200 ids = list(node_coords.keys()) positions = np.array([node_coords[id] for id in ids]) group_map = {id: gid for gid, data in groups.items() for id in data['D']+data['SI']} for _ in range(ITERATIONS): # 节点间斥力 diffs = positions[:, None] - positions[None, :] # 3D差分矩阵 dists = np.linalg.norm(diffs, axis=-1) np.fill_diagonal(dists, np.inf) repulsion = NODE_REPULSION / (dists**2 + 1e-6) repulsion[dists > 5.0] = 0 # 仅处理近距离节点 # 社团内引力 attraction = np.zeros_like(positions) for i, id in enumerate(ids): gid = group_map[id] center = group_coords[gid]['center'] dir_to_center = center - positions[i] dist_to_center = np.linalg.norm(dir_to_center) if dist_to_center > 2*SI_SHELL_RADIUS: attraction[i] = GROUP_ATTRACTION * dir_to_center # 更新位置 movement = np.sum(repulsion[:, :, None] * diffs, axis=1) + attraction positions += 0.1 * movement # 归一化到0-10范围 scaler = MinMaxScaler(feature_range=(0, 10)) positions = scaler.fit_transform(positions) return {id: tuple(pos) for id, pos in zip(ids, positions)} # 内部函数 def _generate_si_coordinates(num_points, radius): # 生成随机方向 points = np.random.randn(num_points, 3) # 标准正态分布采样 # 归一化到单位球面 norms = np.linalg.norm(points, axis=1, keepdims=True) points_normalized = points / norms # 缩放到目标半径 points_scaled = points_normalized * radius return points_scaled # 按group分组 groups = {} for node in nodeJson: group_id = None for meta in node['meta']: if 'group' in meta: group_id = meta['group'] if not group_id: print(node, group_id, "非Group优化结果被用于进行Group图形布局生成") groups.setdefault(group_id, {'D': [], 'SI': []}) if node['type'] == 'D': groups[group_id]['D'].append(node['id']) else: groups[group_id]['SI'].append(node['id']) # === 步骤1: 为每个group分配空间位置 === group_coords = {} num_groups = len(groups) points = _uniform_sphere_sampling(num_groups, radius=GROUP_SPHERE_RADIUS) # 确保最小间距 for i in range(len(points)): for j in range(i+1, len(points)): dist = np.linalg.norm(points[i]-points[j]) if dist < MIN_GROUP_DIST: direction = (points[j] - points[i]) / dist points[j] = points[i] + direction * MIN_GROUP_DIST # 分配group中心坐标 for idx, (group_id, members) in enumerate(groups.items()): group_coords[group_id] = { 'center': points[idx], 'D_count': len(members['D']), 'SI_count': len(members['SI']) } # === 步骤2: 生成各group内部坐标 === node_coords = {} for group_id, data in groups.items(): center = group_coords[group_id]['center'] # D类节点:均匀分布在核心球体内 for node_id in data['D']: r = D_CORE_RADIUS * np.random.rand()**0.5 # 密度向中心聚集 theta = np.random.uniform(0, 2*np.pi) phi = np.arccos(2*np.random.rand() - 1) dx = r * np.sin(phi) * np.cos(theta) dy = r * np.sin(phi) * np.sin(theta) dz = r * np.cos(phi) node_coords[node_id] = center + np.array([dx, dy, dz]) # SI类节点:分布在球壳层 shell_radius = SI_SHELL_RADIUS + 0.5*np.abs(np.random.randn()) # 添加随机扰动 points = _generate_si_coordinates(len(data['SI']), shell_radius) # 使用球形Voronoi分布避免重叠 sv = SphericalVoronoi(points, radius=shell_radius) sv.sort_vertices_of_regions() for i, node_id in enumerate(data['SI']): point = sv.points[i] * shell_radius node_coords[node_id] = center + point # === 步骤3: 全局优化 === # return _optimize_layout(node_coords, groups, group_coords) final_coords = _optimize_layout(node_coords, groups, group_coords) # 将坐标添加到每个节点字典 for node in nodeJson: node_id = node['id'] x, y, z = final_coords[node_id] # 添加三维坐标字段 node['coordinates'] = { 'x': round(x, 4), # 保留4位小数 'y': round(y, 4), 'z': round(z, 4) } print(nodeJson) '''结果示例输出,并用三维视图显示START''' # for node_id, (x, y, z) in final_coords.items(): # print(f"Node {node_id}: ({x:.2f}, {y:.2f}, {z:.2f})") # import matplotlib.pyplot as plt # from mpl_toolkits.mplot3d import Axes3D # fig = plt.figure(figsize=(10, 8)) # ax = fig.add_subplot(111, projection='3d') # # 按类型绘制节点 # # types = {n['id']: n['meta']['group'] for n in nodeJson} # types = {} # for n in nodeJson: # for meta in n['meta']: # if 'group' in meta: # types[n['id']] = str(meta['group']) # colors = {'1': 'red', '2': 'green', '3': 'blue', '4': 'yellow', '5': 'black'} # for node_id, (x, y, z) in final_coords.items(): # ax.scatter(x, y, z, # c=colors[types[node_id]], # s=50 if types[node_id] == 'D' else 30, # marker='o' if types[node_id] == 'D' else '^') # # 绘制边 # for edge in edgeJson: # x = [final_coords[edge['from']][0], final_coords[edge['to']][0]] # y = [final_coords[edge['from']][1], final_coords[edge['to']][1]] # z = [final_coords[edge['from']][2], final_coords[edge['to']][2]] # ax.plot(x, y, z, c='gray', alpha=0.3) # ax.set_xlabel('X') # ax.set_ylabel('Y') # ax.set_zlabel('Z') # plt.show() '''结果示例输出,并用三维视图显示END''' return self.create( result=result, user=result.user, nodes=nodeJson, edges=edgeJson, type=result.plan.algorithm.type, ) '''功能体探测功能的三维坐标生成 end''' def createFromResult(self, result): nodeJson = result.nodeFile.toJson() edgeJson = result.edgeFile.toJson() # 功能体探测算法需要额外将同一功能体聚集,单独处理 if result.plan.algorithm.type == 'group': return self.createFromResultGroupAlgo(result) # 只有3d和VR视图需要生成图,需要给每个节点赋值一个三维坐标,该坐标需要满足一定尺度 '''START''' # 创建NetworkX图对象 G = nx.Graph() G.add_nodes_from([(n['id'], {'type': n['type']}) for n in nodeJson]) G.add_edges_from([(e['from'], e['to']) for e in edgeJson]) # 使用Louvain算法检测社团 partition = community_louvain.best_partition(G) communities = {} for node, comm_id in partition.items(): communities.setdefault(comm_id, []).append(node) '''定义函数''' def generate_3d_coordinates(nodes, communities): """ 计算三维空间坐标布局 """ # 参数设置 D_LAYER_RADIUS = 1.0 # D类型节点分布半径 I_S_LAYER_RADIUS = 3.0 # I/S类型节点分布半径, 因为D类靠中心,I/S类靠外围 COMMUNITY_SPACING = 8.0 # 社团间距 # 初始化坐标字典 coords = {} # 为每个社团分配空间区域 comm_centers = {} for i, (comm_id, members) in enumerate(communities.items()): # 在三维空间分配不同象限 angle = i * 2*np.pi / len(communities) comm_centers[comm_id] = ( COMMUNITY_SPACING * np.cos(angle), COMMUNITY_SPACING * np.sin(angle), COMMUNITY_SPACING * (i % 2) # Z轴分层 ) # 为每个节点生成坐标 for node in nodes: node_id = node['id'] comm_id = partition[node_id] base_x, base_y, base_z = comm_centers[comm_id] # 根据节点类型确定分布层 if node['type'] == 'D': # D类型节点紧密分布在社团中心附近 r = D_LAYER_RADIUS * np.random.rand() theta = np.random.uniform(0, 2*np.pi) phi = np.random.uniform(0, np.pi) x = base_x + r * np.sin(phi) * np.cos(theta) y = base_y + r * np.sin(phi) * np.sin(theta) z = base_z + r * np.cos(phi) else: # I/S类型节点分布在更大半径的球面上 r = I_S_LAYER_RADIUS theta = np.random.uniform(0, 2*np.pi) phi = np.random.uniform(0, np.pi) x = base_x + r * np.sin(phi) * np.cos(theta) y = base_y + r * np.sin(phi) * np.sin(theta) z = base_z + r * np.cos(phi) coords[node_id] = (x, y, z) return coords def optimize_overlap(coords, iterations=100): """ 使用斥力优化减少节点重叠 """ nodes = list(coords.keys()) positions = np.array(list(coords.values())) # 设置优化参数 repulsion = 0.1 # 斥力强度 min_dist = 0.5 # 最小间距 for _ in range(iterations): # 计算所有节点间距 diffs = positions[:, np.newaxis, :] - positions[np.newaxis, :, :] dists = np.linalg.norm(diffs, axis=2) # 避免自比较 np.fill_diagonal(dists, np.inf) # 计算斥力 with np.errstate(divide='ignore'): forces = repulsion / (dists**2) forces[dists > min_dist] = 0 # 更新位置 movement = np.sum(forces[:, :, np.newaxis] * diffs, axis=1) positions += movement # 归一化到0-10范围 scaler = MinMaxScaler(feature_range=(0, 10)) positions = scaler.fit_transform(positions) return {node: tuple(pos) for node, pos in zip(nodes, positions)} '''结束定义''' # 生成初始坐标 initial_coords = generate_3d_coordinates(nodeJson, communities) # 优化防止重叠 final_coords = optimize_overlap(initial_coords) '''结果示例输出,并用三维视图显示START''' # for node_id, (x, y, z) in final_coords.items(): # print(f"Node {node_id}: ({x:.2f}, {y:.2f}, {z:.2f})") # import matplotlib.pyplot as plt # from mpl_toolkits.mplot3d import Axes3D # fig = plt.figure(figsize=(10, 8)) # ax = fig.add_subplot(111, projection='3d') # # 按类型绘制节点 # types = {n['id']: n['type'] for n in nodeJson} # colors = {'D': 'red', 'I': 'green', 'S': 'blue'} # for node_id, (x, y, z) in final_coords.items(): # ax.scatter(x, y, z, # c=colors[types[node_id]], # s=50 if types[node_id] == 'D' else 30, # marker='o' if types[node_id] == 'D' else '^') # # 绘制边 # for edge in edgeJson: # x = [final_coords[edge['from']][0], final_coords[edge['to']][0]] # y = [final_coords[edge['from']][1], final_coords[edge['to']][1]] # z = [final_coords[edge['from']][2], final_coords[edge['to']][2]] # ax.plot(x, y, z, c='gray', alpha=0.3) # ax.set_xlabel('X') # ax.set_ylabel('Y') # ax.set_zlabel('Z') # plt.show() '''结果示例输出,并用三维视图显示END''' '''END''' # 将坐标添加到每个节点字典 for node in nodeJson: node_id = node['id'] x, y, z = final_coords[node_id] # 添加三维坐标字段 node['coordinates'] = { 'x': round(x, 4), # 保留4位小数 'y': round(y, 4), 'z': round(z, 4) } return self.create( result=result, user=result.user, nodes=nodeJson, edges=edgeJson, type=result.plan.algorithm.type, ) class Graph(models.Model): create_time = models.DateTimeField(auto_now_add=True) update_time = models.DateTimeField(auto_now=True) type = models.CharField(choices=graphForAlgo, default='optimize', max_length=16) # 根据算法不同,生成的图数据结构不同 nodes = models.JSONField() edges = models.JSONField() result = models.ForeignKey(to="api.Result", on_delete=models.CASCADE, related_name="own_graphs") user = models.ForeignKey(to="api.User", on_delete=models.CASCADE, related_name="user_own_graphs") objects = GraphManager() def generateToken(self): # 删除旧验证码 if self.own_tokens.exists(): for token in self.own_tokens.all(): token.delete() # 生成验证码 token = GraphToken() token.graph = self token.token = ''.join([str(randint(0,9)) for n in range(6)]) while(GraphToken.objects.checkDuplicate(token.token)): token.token = ''.join([str(randint(0,9)) for n in range(6)]) token.save() return token.token class Meta: app_label = 'api'