from django.db import models
import os, errno
import csv
from api.utils import *
import json
from random import randint
import logging
from django.http import FileResponse
from Crypto.Cipher import ARC4
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA512
from django.contrib.auth.hashers import make_password
from io import TextIOWrapper, BytesIO
from ast import literal_eval
from typing import Any
import zipfile

# Choice lists for the File model fields.
types = [
    ('csv', 'csv'),
]
usages = [
    ('input', 'input'),
    ('show', 'show'),
    ('result', 'result'),
    ('output', 'output'),
]
contents = [
    ('node', 'node'),
    ('edge', 'edge'),
]

logger = logging.getLogger("file-model")

# Fixed PBKDF2 salt.
# NOTE(review): a per-file random salt would be cryptographically stronger,
# but changing it would invalidate keys for already-encrypted files.
salt = "vrServer"


def rc4_encrypt(key: bytes, data: bytes) -> bytes:
    """Encrypt ``data`` with RC4-drop: the first 1024 keystream bytes are
    discarded to avoid RC4's weak initial keystream."""
    cipher = ARC4.new(key)
    cipher.encrypt(b'\x00' * 1024)  # drop the first 1024 keystream bytes
    return cipher.encrypt(data)


def rc4_decrypt(key: bytes, data: bytes) -> bytes:
    """Decrypt ``data``; RC4 is symmetric, so this is encryption again."""
    return rc4_encrypt(key, data)


def derive_key(password: str, salt: bytes, iterations: int) -> bytes:
    """Derive a 256-bit key from ``password`` via PBKDF2-HMAC-SHA512."""
    return PBKDF2(
        password.encode('utf-8'),
        salt,
        dkLen=32,  # 256-bit key
        count=iterations,
        hmac_hash_module=SHA512
    )


def safe_json_parse(json_str: str, default: Any = None) -> Any:
    """Parse a JSON string without raising.

    Returns ``default`` (or ``[]`` when ``default`` is None) for empty input,
    invalid JSON, and for empty / nested-empty lists such as ``[[], []]``.
    """
    fallback = default if default is not None else []
    stripped_str = json_str.strip()
    if not stripped_str:
        return fallback
    try:
        data = json.loads(stripped_str)
    except json.JSONDecodeError:
        return fallback

    def is_empty_nested_list(obj):
        # True for [], [[]], [[], []], ...: lists containing only (nested) empty lists.
        if isinstance(obj, list):
            return all(is_empty_nested_list(item) for item in obj)
        return False

    if data == [] or is_empty_nested_list(data):
        return fallback
    return data


class FileManager(models.Manager):
    def getHistory(self, user):
        """Build the upload history for ``user``'s 'input' files.

        For every input file: drops orphaned records (no linked mission, or no
        bytes on disk nor in archive.zip), computes a human-readable size, and
        collects the linked missions plus node/edge statistics.

        Returns a list of dicts, or FAILED on an unexpected error.
        """
        files = user.own_files.filter(usage="input").all()
        history = []
        for file in files:
            fileId = file.id
            if file.content == "node" and not file.own_missions_node.exists():
                # Input node file with no mission: remove it.
                file.delete()
                continue
            if file.content == "edge" and not file.own_missions_edge.exists():
                # BUG FIX: edge files with no mission were skipped but never
                # deleted, leaking orphaned records (the node branch deletes).
                file.delete()
                continue
            directory = os.path.join(BASE_FILE_PATH, str(user.id))
            path = os.path.join(directory, str(fileId))
            # Check the archive first: archived files have no loose copy on disk.
            archivePath = os.path.join(BASE_FILE_PATH, "archive.zip")
            with zipfile.ZipFile(archivePath, 'a') as zipf:
                fileArchPath = os.path.normpath(f"{file.user.id}/{file.id}").replace("\\", "/")
                if fileArchPath in zipf.namelist():
                    # Archived: take the uncompressed size from the zip entry.
                    size = zipf.getinfo(fileArchPath).file_size
                else:
                    # Not archived: a loose file must exist, otherwise the
                    # database record is stale and gets cleaned up.
                    try:
                        size = os.path.getsize(path)
                    except FileNotFoundError:
                        print("未找到对应文件,现将记录删除", fileId, file.name)
                        self.get(id=fileId).delete()
                        continue
                    except Exception as error:
                        print("读取历史记录时出现未知错误")
                        return FAILED
            if size >= 1024 * 1024:
                size = size / (1024 * 1024)
                size = f"{size:.2f} MB"
            else:
                size = size / 1024
                size = f"{size:.2f} KB"
            if file.content == 'node':
                missions = file.own_missions_node.all()
                fileInfo = {
                    '节点总数': file.own_file_info.nodes,
                    'S节点数': file.own_file_info.sNodes,
                    'D节点数': file.own_file_info.dNodes,
                    'I节点数': file.own_file_info.iNodes,
                }
            elif file.content == 'edge':
                missions = file.own_missions_edge.all()
                fileInfo = {
                    '边总数': file.own_file_info.edges,
                }
            else:
                logger.error(f"获取历史文件出错,文件格式错误 content: {file.content}")
                return FAILED
            history.append({
                'id': file.id,
                'name': file.name,
                'uploadTime': file.update_time,
                'size': size,
                'encrypted': file.encrypted,
                'content': file.content,
                'missions': [{'id': mission.id, 'name': mission.name} for mission in missions],
                'fileInfo': fileInfo,
            })
        return history


# Create your models here.
class File(models.Model):
    """A user-owned CSV file record.

    The actual bytes live on disk at ``BASE_FILE_PATH/<user id>/<file id>``
    and may be RC4-encrypted in place and/or moved into
    ``BASE_FILE_PATH/archive.zip`` (member name ``<user id>/<file id>``).
    """
    name = models.CharField(default="untitled", max_length=64)
    type = models.CharField(choices=types, max_length=5)
    usage = models.CharField(choices=usages, max_length=20)
    create_time = models.DateTimeField(auto_now_add=True)
    update_time = models.DateTimeField(auto_now=True)
    content = models.CharField(choices=contents, max_length=10)
    # Whether the on-disk bytes are currently RC4-encrypted.
    encrypted = models.BooleanField(default=False)
    # Whether the bytes have been moved into archive.zip (loose file removed).
    archived = models.BooleanField(default=False)
    # PBKDF2-SHA256 hash of the encryption password, used only for verification.
    key = models.CharField(blank=True, null=True, max_length=128)
    # For edge files: the node file they belong to (and vice versa).
    associate = models.ForeignKey('self', on_delete=models.CASCADE, blank=True, null=True)
    user = models.ForeignKey(to="api.User", on_delete=models.CASCADE, related_name='own_files')

    objects = FileManager()

    def _local_path(self):
        # Absolute path of this file's loose bytes on disk.
        return os.path.join(BASE_FILE_PATH, str(self.user.id), str(self.id))

    def _archive_member(self):
        # Member name used inside archive.zip: "<user id>/<file id>".
        return os.path.normpath(f"{self.user.id}/{self.id}").replace("\\", "/")

    def encrypt(self, password):
        """Encrypt the on-disk bytes in place with a key derived from ``password``.

        Stores a PBKDF2-SHA256 hash of the password in ``self.key`` for later
        verification (never used as the cipher key). Returns True on success,
        False when the file is already encrypted.
        """
        if self.encrypted:
            logger.error(f"文件{self.id}已经过加密,无法再次加密")
            return False
        # Verification hash only; the cipher key is derived separately below.
        self.key = make_password(
            password,
            salt='vrviewer',
            hasher='pbkdf2_sha256'
        )
        cipher_key = derive_key(
            password=password,
            salt=salt,
            iterations=4,
        )
        path = self._local_path()
        with open(path, 'rb') as f:
            original_data = f.read()
        with open(path, 'wb') as f:
            f.write(rc4_encrypt(cipher_key, original_data))
        self.encrypted = True
        self.save()
        return True

    def decrypted(self, password):
        """Decrypt the on-disk bytes in place.

        Returns True on success; False on a wrong password or when the file
        is not encrypted.
        """
        verifyPassword = make_password(
            password,
            salt='vrviewer',
            hasher='pbkdf2_sha256'
        )
        if not verifyPassword == self.key:
            logger.error(f"文件{self.id}解密密钥错误")
            return False
        if not self.encrypted:
            logger.error(f"文件{self.id}未经过加密,无法进行解密")
            return False
        cipher_key = derive_key(
            password=password,
            salt=salt,
            iterations=4,
        )
        path = self._local_path()
        with open(path, 'rb') as f:
            original_data = f.read()
        with open(path, 'wb') as f:
            f.write(rc4_decrypt(cipher_key, original_data))
        self.encrypted = False
        self.save()
        return True

    def decryptToData(self, password):
        """Decrypt to an in-memory text stream; the file on disk is untouched.

        Returns a ``TextIOWrapper`` over the decrypted bytes, or False on a
        wrong password / unencrypted file.
        """
        verifyPassword = make_password(
            password,
            salt='vrviewer',
            hasher='pbkdf2_sha256'
        )
        if not verifyPassword == self.key:
            logger.error(f"文件{self.id}解密密钥错误")
            return False
        if not self.encrypted:
            logger.error(f"文件{self.id}未经过加密,无法进行解密")
            return False
        cipher_key = derive_key(
            password=password,
            salt=salt,
            iterations=4,
        )
        with open(self._local_path(), 'rb') as f:
            original_data = f.read()
        return TextIOWrapper(
            BytesIO(rc4_decrypt(cipher_key, original_data)),
            encoding='utf-8',
            newline='')

    def verify(self, password):
        """Check ``password`` against the stored verification hash.

        Returns True only when the file is encrypted and the hash matches.
        """
        verifyPassword = make_password(
            password,
            salt='vrviewer',
            hasher='pbkdf2_sha256'
        )
        if not self.encrypted:
            logger.error(f"文件{self.id}未经过加密,无法进行解密验证")
            return False
        if not verifyPassword == self.key:
            logger.error(f"文件{self.id}验证密钥错误")
            return False
        return True

    def archive(self):
        """Move the loose file into archive.zip and delete the original.

        No-op when already archived. If the member already exists in the zip
        (a previous interrupted run), only the loose file is removed. Errors
        are logged, not raised.
        """
        if self.archived:
            return
        filePath = self._local_path()
        archivePath = os.path.join(BASE_FILE_PATH, "archive.zip")
        try:
            with zipfile.ZipFile(archivePath, 'a') as zipf:
                fileArchPath = self._archive_member()
                # Don't add a duplicate entry if the member is already stored.
                if fileArchPath not in zipf.namelist():
                    zipf.write(filePath, fileArchPath)
                self.archived = True
                self.save()
                os.remove(filePath)
        except Exception as error:
            logger.error(f"压缩文件{self.id} {self.name}失败: {error}")

    def unzip(self):
        """Restore the loose file from archive.zip.

        Clears the ``archived`` flag on success; errors are logged, not raised.
        """
        if not self.archived:
            # BUG FIX: the original called the nonexistent self.error(...),
            # which raised AttributeError instead of logging.
            logger.error(f"解压文件{self.id} {self.name}失败,文件并未压缩")
            return
        filePath = self._local_path()
        archivePath = os.path.join(BASE_FILE_PATH, "archive.zip")
        try:
            with zipfile.ZipFile(archivePath, 'r') as zipf:
                fileArchPath = self._archive_member()
                if fileArchPath not in zipf.namelist():
                    raise ValueError(f"该文件不存在压缩文件中")
                with zipf.open(fileArchPath) as zipd:
                    content = zipd.read()
                with open(filePath, 'wb') as f:
                    f.write(content)
                # Clear the archived flag now that the loose copy is back.
                self.archived = False
                self.save()
        except Exception as error:
            logger.error(f"解压缩文件{self.id} {self.name}失败:{error}")

    def download(self):
        """Build a FileResponse for this file.

        Archived files are streamed straight out of archive.zip (no unzip to
        disk needed). Returns False for missing or encrypted files; raises
        ValueError when the archive flag is set but the member is missing.
        """
        path = self._local_path()
        archivePath = os.path.join(BASE_FILE_PATH, "archive.zip")
        if self.archived:
            with zipfile.ZipFile(archivePath, 'r') as zipf:
                fileArchPath = self._archive_member()
                if fileArchPath not in zipf.namelist():
                    logger.info(f"文件{self.id} {self.name}具有压缩标记,但未在压缩文件中找到")
                    raise ValueError(f"文件{self.id} {self.name}具有压缩标记,但未在压缩文件中找到")
                with zipf.open(fileArchPath) as zfile:
                    logger.info("从压缩包中下载")
                    content = zfile.read()
                response = FileResponse(BytesIO(content))
                response['Content-Disposition'] = f'attachment; filename="{self.name}"'
                # BUG FIX: the original returned FileResponse(open(path, 'rb'))
                # here, but the loose file was removed when it was archived.
                return response
        if not os.path.exists(path):
            return False
        # Encrypted files may not be downloaded either.
        if self.encrypted:
            return False
        # BUG FIX: the original passed 'rb' to FileResponse instead of open(),
        # then returned a fresh response without the Content-Disposition header.
        response = FileResponse(open(path, 'rb'))
        response['Content-Disposition'] = f'attachment; filename="{self.name}"'
        return response

    def saveWithInfo(self):
        """Save this record together with a FileInfo row of basic statistics.

        Node files get per-type node counts; edge files get an edge count.
        """
        path = self._local_path()
        if self.content in ['node', 'nodes']:
            sCount = dCount = iCount = 0
            with open(path, 'r') as f:
                for line in csv.reader(f):
                    if line[1] == 'S':
                        sCount += 1
                    if line[1] == 'D':
                        dCount += 1
                    if line[1] == 'I':
                        iCount += 1
            fileInfo = FileInfo()
            fileInfo.file = self
            fileInfo.nodes = sCount + dCount + iCount
            fileInfo.sNodes = sCount
            fileInfo.dNodes = dCount
            fileInfo.iNodes = iCount
            fileInfo.save()
        if self.content in ['edge', 'edges']:
            with open(path, 'r') as f:
                edges = sum(1 for line in csv.reader(f) if line)
            fileInfo = FileInfo()
            fileInfo.file = self
            fileInfo.edges = edges
            fileInfo.save()
        self.save()

    def generate(self, data):
        """Write ``data`` (an iterable of rows) to storage as a validated CSV.

        Node rows need a numeric unique id and a type in {'S','D','I'};
        edge rows need two numeric endpoints. Extra columns are written
        through unchanged (meta data). Returns OK or an error constant.
        """
        directory = os.path.join(BASE_FILE_PATH, str(self.user.id))
        path = os.path.join(directory, str(self.id))
        if os.path.exists(path):
            self.delete()
            return FILE_ALREADY_EXIST
        try:
            # Tolerate an already-existing user directory (was errno 17 check).
            os.makedirs(directory, exist_ok=True)
        except Exception as error:
            print(error)
            return FILE_FAILED_CREATE_DIR
        if self.content == 'node':
            nodes = []
            # Context manager closes the file even on validation failure
            # (the original leaked the handle on early returns).
            with open(path, 'w', newline='') as handle:
                csvFile = csv.writer(handle)
                for line in data:
                    if not str(line[0]).isdigit():
                        logger.error("check file illegal failed node id wrong")
                        return FAILED
                    if not line[1] in ['S', 'D', 'I']:
                        logger.error("check file illegal failed node type wrong")
                        return FAILED
                    if line[0] in nodes:
                        logger.error("check file illegal failed node duplicate id")
                        return FAILED
                    nodes.append(line[0])
                    # Everything beyond id/type stays in the row (JSON-ish meta).
                    csvFile.writerow(line)
            return OK
        if self.content == 'edge':
            edges = []
            with open(path, 'w', newline='') as handle:
                csvFile = csv.writer(handle)
                for line in data:
                    if not str(line[0]).isdigit() or not str(line[1]).isdigit():
                        logger.error("check file illegal failed edge len =2")
                        return FAILED
                    # Edges are undirected: remember each pair once.
                    if [line[0], line[1]] not in edges and [line[1], line[0]] not in edges:
                        edges.append([line[0], line[1]])
                    # Extra columns follow the two endpoints.
                    csvFile.writerow(line)
            return OK
        return UNKNOWN_CONTENT

    def storage(self, file):
        """Save an uploaded file's chunks to this record's path, unmodified.

        Returns OK, FILE_ALREADY_EXIST, FILE_FAILED_CREATE_DIR or FAILED.
        """
        try:
            directory = os.path.join(BASE_FILE_PATH, str(self.user.id))
            file_path = os.path.join(directory, str(self.id))
            if os.path.exists(file_path):
                self.delete()
                return FILE_ALREADY_EXIST
            try:
                # Tolerate an already-existing user directory (was errno 17 check).
                os.makedirs(directory, exist_ok=True)
            except Exception as error:
                print(error)
                return FILE_FAILED_CREATE_DIR
            with open(file_path, 'wb') as f:
                for chunk in file:
                    f.write(chunk)
            return OK
        except Exception as error:
            logger.error(error)
            return FAILED

    def checkIllegal(self):
        """Validate the stored CSV.

        Node files: every row has >= 2 columns, a numeric unique id and a type
        in {'S','D','I'}. Edge files: exactly 2 columns, both endpoints exist
        in the associated node file, no (undirected) duplicate edges.
        Unzips the file first when archived. Returns True/False.
        """
        if self.archived:
            # Validation needs the loose file back on disk.
            self.unzip()
        path = self._local_path()
        if self.content == 'node':
            if self.type == 'csv':
                nodes = []
                with open(path, 'r') as handle:
                    for line in csv.reader(handle):
                        if not len(line) >= 2:
                            logger.error("check file illegal failed node len >= 2")
                            return False
                        if not line[0].isdigit():
                            logger.error("check file illegal failed node id wrong")
                            return False
                        if not line[1] in ['S', 'D', 'I']:
                            logger.error(f"check file illegal failed node type wrong:{line}")
                            return False
                        if line[0] in nodes:
                            logger.error("check file illegal failed node duplicate id")
                            return False
                        nodes.append(line[0])
                return True
        if self.content == 'edge':
            # BUG FIX: the original computed the associate path before the
            # content check, raising AttributeError for node files whose
            # `associate` is None. Only edge validation needs it.
            node_path = os.path.join(BASE_FILE_PATH, str(self.user.id), str(self.associate.id))
            if self.type == 'csv':
                nodes = []
                edges = []
                with open(node_path, 'r') as handle:
                    for line in csv.reader(handle):
                        if not len(line) >= 2:
                            logger.error("check file illegal failed node len >= 2")
                            return False
                        if not line[0].isdigit():
                            logger.error("check file illegal failed node id wrong")
                            return False
                        nodes.append(line[0])
                with open(path, 'r') as handle:
                    for line in csv.reader(handle):
                        if not len(line) == 2:
                            logger.error("check file illegal failed edge len =2")
                            return False
                        if line[0] not in nodes or line[1] not in nodes:
                            logger.error("check file illegal failed edge id not exist")
                            return False
                        if [line[0], line[1]] not in edges and [line[1], line[0]] not in edges:
                            edges.append([line[0], line[1]])
                        else:
                            # Undirected graph: a reversed pair is a duplicate.
                            logger.error("check file illegal failed edge duplicate edge")
                            return False
                return True

    @staticmethod
    def _apply_meta(record, raw):
        """Merge one extra CSV column into ``record['meta']``.

        The column is either JSON (single quotes tolerated), which replaces
        the meta value, or a bare "key:value" string, which appends one dict.
        Unparseable values are logged and skipped.
        """
        meta = safe_json_parse(raw.replace('\'', '\"'))
        # Unwrap accidental extra list nesting such as [[{...}]].
        while isinstance(meta, list) and meta and isinstance(meta[0], list):
            meta = meta[0]
        if meta:
            record['meta'] = meta
            return
        # BUG FIX: the original split the integer loop index (el.split(':'))
        # instead of the column text, so the colon-separated fallback never
        # worked; it also never ran because safe_json_parse doesn't raise.
        logger.info(f"尝试以json格式解析文件meta内容{raw}失败,尝试以非标准格式解析")
        parts = raw.split(':')
        if len(parts) != 2:
            logger.info(f"尝试以非标准格式解析文件meta内容{raw}失败,放弃解析")
            return
        record['meta'].append({parts[0]: parts[1]})

    def toJson(self, request=None):
        """Read the stored CSV into a list of dicts for the frontend.

        Node files yield {'id', 'type', 'meta'}; edge files yield
        {'from', 'to', 'meta'}. Archived files are unzipped first; encrypted
        input files are decrypted in memory with the key kept in the session
        (raises KeyError when the key is absent).
        """
        if self.archived:
            self.unzip()
        # Only 'input' files can carry the encrypted attribute.
        if self.usage == 'input' and self.encrypted:
            key = request.session.get('encrypt-keys', {}).get(str(self.id), '')
            if not key:
                raise KeyError(f"解密文件{self.id}所需密钥不存在")
            file = csv.reader(self.decryptToData(key))
        else:
            file = csv.reader(open(self._local_path(), 'r'))
        if self.content == 'node':
            if self.type == 'csv':
                nodes = []
                for line in file:
                    # Columns beyond id/type are meta data.
                    node = {'id': line[0], 'type': line[1], 'meta': []}
                    for el in range(2, len(line)):
                        self._apply_meta(node, line[el])
                    nodes.append(node)
                return nodes
        if self.content == 'edge':
            if self.type == 'csv':
                edges = []
                for line in file:
                    # Columns beyond the two endpoints are meta data.
                    edge = {'from': line[0], 'to': line[1], 'meta': []}
                    for el in range(2, len(line)):
                        self._apply_meta(edge, line[el])
                    edges.append(edge)
                return edges

    def deleteStorage(self):
        """Delete the on-disk file(s) and the database record(s).

        The records are removed even when a file deletion fails (leftover
        files are swept up later). Returns OK or FAILED.
        """
        paths = [self._local_path()]
        if self.associate:
            paths.append(os.path.join(BASE_FILE_PATH, str(self.user.id), str(self.associate.id)))
        failedFlag = False
        for p in paths:
            if os.path.exists(p):
                try:
                    os.remove(p)
                except Exception as error:
                    # Usually means the file is still held open elsewhere.
                    logger.error(f"删除文件{self.id} {self.name}失败:{error}")
                    failedFlag = True
        if self.associate:
            self.associate.delete()
        self.delete()
        if failedFlag:
            return FAILED
        return OK

    class Meta:
        app_label = 'api'


class FileInfo(models.Model):
    """Per-file statistics: node counts by type and edge count."""
    file = models.OneToOneField(File, on_delete=models.CASCADE, related_name='own_file_info')
    nodes = models.IntegerField(default=0)
    sNodes = models.IntegerField(default=0)
    dNodes = models.IntegerField(default=0)
    iNodes = models.IntegerField(default=0)
    edges = models.IntegerField(default=0)
    # TODO: add edge metrics such as concentration/centrality.

    class Meta:
        app_label = 'api'