123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- # your_app/scheduler.py
- import logging
- from apscheduler.schedulers.background import BackgroundScheduler
- from django_apscheduler.jobstores import DjangoJobStore
- from .models import Alert, File
- from .utils import SYSTEMPERFORMANCE, TRIGGEREDALERTS, BASE_FILE_PATH
- from django.db import DatabaseError
- from pathlib import Path
- import zipfile
- import os
- from datetime import datetime, timedelta
- logger = logging.getLogger("scheduler")
- def getUploadFolderSize():
- total_size = 0
- for path in Path(BASE_FILE_PATH).rglob('*'):
- if path.is_file():
- try:
- total_size += path.stat().st_size
- except (FileNotFoundError, PermissionError):
- pass
- # 返回值单位是B
- return total_size
- def checkAlert():
- # 计算用户上传文件总大小,存入系统性能信息
- SYSTEMPERFORMANCE['disk_used'] = getUploadFolderSize()
- # 获取所有开启的告警,存入触发告警列表
- TRIGGEREDALERTS.clear()
- TRIGGEREDALERTS.extend(Alert.objects.checkAlert())
- # 显示当前触发的告警
- if TRIGGEREDALERTS:
- try:
- # 获取所有告警对象
- if TRIGGEREDALERTS:
- # 生成制表符分隔的表格化输出
- alert_table = "\n".join([
- f"{alert.name}\t{alert.level}\t{alert.metric}\t{alert.threshold}"
- for alert in TRIGGEREDALERTS
- ])
- logger.warning(f"当前触发告警 (共{len(TRIGGEREDALERTS)}条):\n名称\t级别\t指标\t阈值\n{alert_table}")
-
- except DatabaseError as e:
- logger.error(f"获取告警数据失败: {str(e)}")
- # 文档归档检查,修改时间超过七天的文件将被压缩
- # 对应方法:File.archive / File.unZip 在FileModel中
- def checkArchive():
- now = datetime.now()
- cutoff = now - timedelta(days=7)
- expired_files = File.objects.filter(update_time__lt=cutoff)
- # for file in File.objects.all():
- # file.archived = False
- # file.save()
- for file in expired_files.all():
- file.archive()
- # 初始化调度器
- scheduler = BackgroundScheduler()
- def start_scheduler():
- try:
- # 使用Django的JobStore
- scheduler.add_jobstore(DjangoJobStore(), "default")
- # 添加任务(避免重复添加)
- # 系统性能监视任务
- if not scheduler.get_job("2_seconds_monitor_job"):
- scheduler.add_job(
- checkAlert,
- 'interval',
- seconds=2,
- id="2_seconds_job",
- replace_existing=True,
- )
- logger.info("定时任务已启动")
- # 文件归档计划任务
- if not scheduler.get_job("3600_seconds_archive_job"):
- scheduler.add_job(
- checkArchive,
- 'interval',
- # 正常应当尽可能久的检查一次,这里每小时检查一次,调试时可改为5秒一次
- seconds=3600,
- # seconds = 5,
- id="3600_seconds_archive_job",
- replace_existing=True,
- )
- # 启动调度器
- if not scheduler.running:
- scheduler.start()
- except Exception as e:
- logger.error(f"调度器启动失败: {str(e)}")
|