# your_app/scheduler.py import logging from apscheduler.schedulers.background import BackgroundScheduler from django_apscheduler.jobstores import DjangoJobStore from .models import Alert from .utils import SYSTEMPERFORMANCE, TRIGGEREDALERTS, BASE_FILE_PATH from django.db import DatabaseError from pathlib import Path logger = logging.getLogger("scheduler") def getUploadFolderSize(): total_size = 0 for path in Path(BASE_FILE_PATH).rglob('*'): if path.is_file(): try: total_size += path.stat().st_size except (FileNotFoundError, PermissionError): pass # 返回值单位是B return total_size def checkAlert(): # 计算用户上传文件总大小,存入系统性能信息 SYSTEMPERFORMANCE['disk_used'] = getUploadFolderSize() # 获取所有开启的告警,存入触发告警列表 TRIGGEREDALERTS.clear() TRIGGEREDALERTS.extend(Alert.objects.checkAlert()) # 显示当前触发的告警 if TRIGGEREDALERTS: try: # 获取所有告警对象 if TRIGGEREDALERTS: # 生成制表符分隔的表格化输出 alert_table = "\n".join([ f"{alert.name}\t{alert.level}\t{alert.metric}\t{alert.threshold}" for alert in TRIGGEREDALERTS ]) logger.warning(f"当前触发告警 (共{len(TRIGGEREDALERTS)}条):\n名称\t级别\t指标\t阈值\n{alert_table}") except DatabaseError as e: logger.error(f"获取告警数据失败: {str(e)}") # 初始化调度器 scheduler = BackgroundScheduler() def start_scheduler(): try: # 使用Django的JobStore scheduler.add_jobstore(DjangoJobStore(), "default") # 添加任务(避免重复添加) if not scheduler.get_job("5_seconds_job"): scheduler.add_job( checkAlert, 'interval', seconds=2, id="5_seconds_job", replace_existing=True, ) logger.info("定时任务已启动") # 启动调度器 if not scheduler.running: scheduler.start() except Exception as e: logger.error(f"调度器启动失败: {str(e)}")