|
- from django.db import models
- from datetime import datetime
- from random import randint
- from api.utils import *
- import numpy as np
- from scipy.spatial import SphericalVoronoi
- from sklearn.preprocessing import MinMaxScaler #scikit-learn
- import networkx as nx
- from community import community_louvain #python-louvain
# Algorithm categories a graph can be generated for, expressed as
# (value, label) pairs; both halves of each pair are the same string.
graphForAlgo = [(name, name) for name in ('optimize', 'group', 'predict')]
class GraphManager(models.Manager):
    """Manager used by ``GraphToken`` for token look-ups.

    NOTE(review): another class named ``GraphManager`` is defined further
    down this module and rebinds the name. ``GraphToken.objects`` still uses
    this class because it is instantiated before that rebinding happens;
    renaming one of the two would make this much less fragile.
    """

    def checkDuplicate(self, token):
        """Return whether a token row with this value is already stored.

        Args:
            token: Token string to look for.

        Returns:
            True if at least one row has ``token == token``, else False.
        """
        # exists() issues a cheap existence query and, unlike get(), does not
        # raise MultipleObjectsReturned when several rows share the value.
        return self.filter(token=token).exists()
class GraphToken(models.Model):
    """Verification code used to access a graph."""

    create_time = models.DateTimeField(auto_now_add=True)
    graph = models.ForeignKey(to="api.Graph", on_delete=models.CASCADE, related_name="own_tokens")
    token = models.CharField(max_length=8)
    objects = GraphManager()

    def checkExpire(self):
        """Return True once the token is more than five minutes old."""
        # Imported here so the method is self-contained with respect to the
        # module's import block.
        from django.utils import timezone

        # timezone.now() is timezone-aware when USE_TZ is enabled and naive
        # otherwise, so it always matches the awareness of the stored
        # create_time. A plain datetime.now() raises TypeError against an
        # aware create_time.
        age = timezone.now() - self.create_time
        # Tokens expire after five minutes (300 seconds).
        return age.total_seconds() > 300

    class Meta:
        app_label = 'api'
class GraphManager(models.Manager):
    """Manager used by ``Graph``: per-user statistics and 3D layout creation.

    NOTE(review): this class reuses the name of the token manager defined
    earlier in the module. ``GraphToken.objects`` was already instantiated
    from the earlier class, so both keep working, but the shared name is easy
    to trip over.
    """

    def statistic(self, user):
        """Return summary figures about the graphs owned by ``user``.

        Returns:
            A dict with key ``'amount'``: the number of graphs reachable
            through ``user.user_own_graphs``.
        """
        return {
            # count() lets the database count instead of loading every row.
            'amount': user.user_own_graphs.count(),
        }

    def _createWithCoordinates(self, result, nodeJson, edgeJson, coords):
        """Attach ``coords`` to every node and persist the graph.

        Args:
            result: Result object the graph is generated from.
            nodeJson: Node dicts; each gets a ``'coordinates'`` entry.
            edgeJson: Edge dicts, stored unchanged.
            coords: Mapping of node id to an ``(x, y, z)`` triple.

        Returns:
            The newly created graph instance.
        """
        for node in nodeJson:
            x, y, z = coords[node['id']]
            node['coordinates'] = {
                # Plain floats rounded to 4 decimal places.
                'x': round(float(x), 4),
                'y': round(float(y), 4),
                'z': round(float(z), 4),
            }
        return self.create(
            result=result,
            user=result.user,
            nodes=nodeJson,
            edges=edgeJson,
            type=result.plan.algorithm.type,
        )

    # ---- 3D coordinate generation for functional-body (group) detection ----
    def createFromResultGroupAlgo(self, result):
        """Create a graph for a 'group' result, clustering each group in 3D.

        Nodes are grouped by the ``'group'`` value found in their ``'meta'``
        entries. Group centres are spread over a sphere, ``'D'`` nodes are
        placed in a small core around their group centre and all other nodes
        on a shell around it. A repulsion/attraction pass then runs and the
        coordinates are rescaled to the 0-10 range on every axis.

        Args:
            result: Result object exposing ``nodeFile``, ``edgeFile``,
                ``user`` and ``plan.algorithm.type``.

        Returns:
            The newly created graph instance.
        """
        # Layout parameters.
        GROUP_SPHERE_RADIUS = 10.0  # radius of the sphere group centres sit on
        D_CORE_RADIUS = 1.5         # radius of the core region for D nodes
        SI_SHELL_RADIUS = 4.0       # base radius of the shell for S/I nodes
        MIN_GROUP_DIST = 8.0        # minimum distance between group centres

        nodeJson = result.nodeFile.toJson()
        edgeJson = result.edgeFile.toJson()
        if not nodeJson:
            # Nothing to lay out; the numeric steps below need >= 1 node.
            return self._createWithCoordinates(result, nodeJson, edgeJson, {})

        def _uniform_sphere_sampling(n, radius=1.0):
            """Return ``n`` points spread over a sphere (golden-angle spiral)."""
            golden_angle = np.pi * (3.0 - np.sqrt(5.0))
            points = []
            for i in range(n):
                # For a single point (n - 1) is zero; place it where the
                # first point of any longer sequence goes (y = 1).
                y = 1 - (i / (n - 1)) * 2 if n > 1 else 1.0
                ring_radius = np.sqrt(1 - y * y)
                theta = golden_angle * i
                points.append(radius * np.array([
                    np.cos(theta) * ring_radius,
                    y,
                    np.sin(theta) * ring_radius,
                ]))
            return np.array(points)

        def _optimize_layout(node_coords, groups, group_centers):
            """Relax the layout with node repulsion and a pull toward group centres."""
            NODE_REPULSION = 0.1     # repulsion strength between nodes
            GROUP_ATTRACTION = 0.05  # attraction strength toward the group centre
            ITERATIONS = 200

            ids = list(node_coords.keys())
            positions = np.array([node_coords[node_id] for node_id in ids])
            group_of = {
                node_id: gid
                for gid, members in groups.items()
                for node_id in members['D'] + members['SI']
            }

            for _ in range(ITERATIONS):
                # Pairwise difference vectors and distances.
                diffs = positions[:, None] - positions[None, :]
                dists = np.linalg.norm(diffs, axis=-1)
                np.fill_diagonal(dists, np.inf)  # no self-interaction

                repulsion = NODE_REPULSION / (dists**2 + 1e-6)
                repulsion[dists > 5.0] = 0  # only nearby nodes repel

                # Pull nodes that drifted far away back toward their centre.
                attraction = np.zeros_like(positions)
                for i, node_id in enumerate(ids):
                    to_center = group_centers[group_of[node_id]] - positions[i]
                    if np.linalg.norm(to_center) > 2 * SI_SHELL_RADIUS:
                        attraction[i] = GROUP_ATTRACTION * to_center

                movement = np.sum(repulsion[:, :, None] * diffs, axis=1) + attraction
                positions += 0.1 * movement

            # Rescale every axis to the 0-10 range.
            scaler = MinMaxScaler(feature_range=(0, 10))
            positions = scaler.fit_transform(positions)

            return {node_id: tuple(pos) for node_id, pos in zip(ids, positions)}

        def _generate_si_coordinates(num_points, radius):
            """Return ``num_points`` random points on a sphere of ``radius``."""
            # Normalised Gaussian samples give random directions.
            directions = np.random.randn(num_points, 3)
            norms = np.linalg.norm(directions, axis=1, keepdims=True)
            return directions / norms * radius

        # Bucket node ids by group, split into D and non-D (S/I) members.
        groups = {}
        for node in nodeJson:
            group_id = None
            for meta in node['meta']:
                if 'group' in meta:
                    group_id = meta['group']
            # Compare against None so a legitimate group id of 0 is accepted.
            if group_id is None:
                print(node, group_id, "非Group优化结果被用于进行Group图形布局生成")
            members = groups.setdefault(group_id, {'D': [], 'SI': []})
            members['D' if node['type'] == 'D' else 'SI'].append(node['id'])

        # === Step 1: assign a centre to every group ===
        centers = _uniform_sphere_sampling(len(groups), radius=GROUP_SPHERE_RADIUS)

        # Push centres apart that are closer than the minimum spacing.
        for i in range(len(centers)):
            for j in range(i + 1, len(centers)):
                dist = np.linalg.norm(centers[i] - centers[j])
                if dist < MIN_GROUP_DIST:
                    direction = (centers[j] - centers[i]) / dist
                    centers[j] = centers[i] + direction * MIN_GROUP_DIST

        group_centers = {
            group_id: centers[idx] for idx, group_id in enumerate(groups)
        }

        # === Step 2: place the nodes of each group around its centre ===
        node_coords = {}
        for group_id, members in groups.items():
            center = group_centers[group_id]

            # D nodes: random direction inside the core sphere.
            # NOTE(review): the original comment said the density concentrates
            # toward the centre, but the square root of a uniform sample skews
            # radii outward (a cube root would fill the ball uniformly).
            # The formula is left unchanged.
            for node_id in members['D']:
                r = D_CORE_RADIUS * np.random.rand() ** 0.5
                theta = np.random.uniform(0, 2 * np.pi)
                phi = np.arccos(2 * np.random.rand() - 1)
                node_coords[node_id] = center + np.array([
                    r * np.sin(phi) * np.cos(theta),
                    r * np.sin(phi) * np.sin(theta),
                    r * np.cos(phi),
                ])

            # S/I nodes: random points on a shell with a jittered radius.
            # The points are already scaled to shell_radius and are used
            # as-is. The earlier SphericalVoronoi pass only handed the same
            # points back, raised for groups with fewer than 4 S/I nodes, and
            # its output was scaled by shell_radius a second time.
            shell_radius = SI_SHELL_RADIUS + 0.5 * np.abs(np.random.randn())
            shell_points = _generate_si_coordinates(len(members['SI']), shell_radius)
            for node_id, point in zip(members['SI'], shell_points):
                node_coords[node_id] = center + point

        # === Step 3: global relaxation and persistence ===
        final_coords = _optimize_layout(node_coords, groups, group_centers)
        return self._createWithCoordinates(result, nodeJson, edgeJson, final_coords)

    def createFromResult(self, result):
        """Create a graph for ``result`` with a 3D coordinate on every node.

        'group' results are delegated to ``createFromResultGroupAlgo``. For
        all other types, communities are detected with the Louvain method,
        each community gets its own region, nodes are scattered around their
        community centre by type, overlaps are reduced with a repulsion pass
        and the coordinates are rescaled to the 0-10 range on every axis.

        Args:
            result: Result object exposing ``nodeFile``, ``edgeFile``,
                ``user`` and ``plan.algorithm.type``.

        Returns:
            The newly created graph instance.
        """
        # Functional-body detection must keep each group together, so it has
        # its own layout. Dispatch before parsing: that path reads the files
        # itself.
        if result.plan.algorithm.type == 'group':
            return self.createFromResultGroupAlgo(result)

        nodeJson = result.nodeFile.toJson()
        edgeJson = result.edgeFile.toJson()
        if not nodeJson:
            # Nothing to lay out; the numeric steps below need >= 1 node.
            return self._createWithCoordinates(result, nodeJson, edgeJson, {})

        # Only the 3D and VR views need a generated graph; every node must be
        # given a 3D coordinate within a bounded scale.
        G = nx.Graph()
        G.add_nodes_from([(n['id'], {'type': n['type']}) for n in nodeJson])
        G.add_edges_from([(e['from'], e['to']) for e in edgeJson])

        # Detect communities with the Louvain algorithm.
        partition = community_louvain.best_partition(G)
        communities = {}
        for node, comm_id in partition.items():
            communities.setdefault(comm_id, []).append(node)

        def generate_3d_coordinates(nodes, communities):
            """Return an initial ``{node id: (x, y, z)}`` layout."""
            D_LAYER_RADIUS = 1.0     # D nodes stay within this radius of the centre
            I_S_LAYER_RADIUS = 3.0   # I/S nodes sit on this larger radius
            COMMUNITY_SPACING = 8.0  # spacing between community centres

            # Community centres on a circle, alternating between two z levels.
            comm_centers = {}
            for i, comm_id in enumerate(communities):
                angle = i * 2 * np.pi / len(communities)
                comm_centers[comm_id] = (
                    COMMUNITY_SPACING * np.cos(angle),
                    COMMUNITY_SPACING * np.sin(angle),
                    COMMUNITY_SPACING * (i % 2),
                )

            coords = {}
            for node in nodes:
                node_id = node['id']
                base_x, base_y, base_z = comm_centers[partition[node_id]]

                # D nodes: random radius close to the centre.
                # I/S nodes: fixed, larger radius.
                if node['type'] == 'D':
                    r = D_LAYER_RADIUS * np.random.rand()
                else:
                    r = I_S_LAYER_RADIUS
                theta = np.random.uniform(0, 2 * np.pi)
                phi = np.random.uniform(0, np.pi)

                coords[node_id] = (
                    base_x + r * np.sin(phi) * np.cos(theta),
                    base_y + r * np.sin(phi) * np.sin(theta),
                    base_z + r * np.cos(phi),
                )
            return coords

        def optimize_overlap(coords, iterations=100):
            """Reduce node overlap with short-range repulsion, then rescale."""
            node_ids = list(coords.keys())
            positions = np.array(list(coords.values()))

            repulsion = 0.1  # repulsion strength
            min_dist = 0.5   # only nodes closer than this repel each other

            for _ in range(iterations):
                diffs = positions[:, np.newaxis, :] - positions[np.newaxis, :, :]
                dists = np.linalg.norm(diffs, axis=2)
                np.fill_diagonal(dists, np.inf)  # no self-interaction

                # The epsilon keeps coincident nodes from producing inf * 0
                # (NaN) forces, matching the group layout's repulsion term.
                forces = repulsion / (dists**2 + 1e-6)
                forces[dists > min_dist] = 0

                positions += np.sum(forces[:, :, np.newaxis] * diffs, axis=1)

            # Rescale every axis to the 0-10 range.
            scaler = MinMaxScaler(feature_range=(0, 10))
            positions = scaler.fit_transform(positions)

            return {node_id: tuple(pos) for node_id, pos in zip(node_ids, positions)}

        initial_coords = generate_3d_coordinates(nodeJson, communities)
        final_coords = optimize_overlap(initial_coords)
        return self._createWithCoordinates(result, nodeJson, edgeJson, final_coords)
class Graph(models.Model):
    """Stored graph (node and edge JSON) generated from an algorithm result."""

    create_time = models.DateTimeField(auto_now_add=True)
    update_time = models.DateTimeField(auto_now=True)
    type = models.CharField(choices=graphForAlgo, default='optimize', max_length=16)
    # The structure of the graph data differs depending on the algorithm.
    nodes = models.JSONField()
    edges = models.JSONField()
    result = models.ForeignKey(to="api.Result", on_delete=models.CASCADE, related_name="own_graphs")
    user = models.ForeignKey(to="api.User", on_delete=models.CASCADE, related_name="user_own_graphs")
    objects = GraphManager()

    def generateToken(self):
        """Replace this graph's verification codes with a fresh one.

        All existing ``GraphToken`` rows of this graph are deleted, then a
        new 6-digit code that is not already stored is generated and saved.

        Returns:
            The new 6-digit code as a string.
        """
        # Imported here so the method is self-contained with respect to the
        # module's import block.
        import secrets

        # Delete the old verification codes in a single bulk query.
        self.own_tokens.all().delete()

        # The code gates access to the graph, so draw the digits from the
        # OS-backed CSPRNG rather than the `random` module.
        while True:
            code = ''.join(str(secrets.randbelow(10)) for _ in range(6))
            if not GraphToken.objects.checkDuplicate(code):
                break

        token = GraphToken()
        token.graph = self
        token.token = code
        token.save()
        return token.token

    class Meta:
        app_label = 'api'
|