scheduler.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # your_app/scheduler.py
  2. import logging
  3. from apscheduler.schedulers.background import BackgroundScheduler
  4. from django_apscheduler.jobstores import DjangoJobStore
  5. from .models import Alert, File
  6. from .utils import SYSTEMPERFORMANCE, TRIGGEREDALERTS, BASE_FILE_PATH
  7. from django.db import DatabaseError
  8. from pathlib import Path
  9. import zipfile
  10. import os
  11. from datetime import datetime, timedelta
  12. logger = logging.getLogger("scheduler")
  13. def getUploadFolderSize():
  14. total_size = 0
  15. for path in Path(BASE_FILE_PATH).rglob('*'):
  16. if path.is_file():
  17. try:
  18. total_size += path.stat().st_size
  19. except (FileNotFoundError, PermissionError):
  20. pass
  21. # 返回值单位是B
  22. return total_size
  23. def checkAlert():
  24. # 计算用户上传文件总大小,存入系统性能信息
  25. SYSTEMPERFORMANCE['disk_used'] = getUploadFolderSize()
  26. # 获取所有开启的告警,存入触发告警列表
  27. TRIGGEREDALERTS.clear()
  28. TRIGGEREDALERTS.extend(Alert.objects.checkAlert())
  29. # 显示当前触发的告警
  30. if TRIGGEREDALERTS:
  31. try:
  32. # 获取所有告警对象
  33. if TRIGGEREDALERTS:
  34. # 生成制表符分隔的表格化输出
  35. alert_table = "\n".join([
  36. f"{alert.name}\t{alert.level}\t{alert.metric}\t{alert.threshold}"
  37. for alert in TRIGGEREDALERTS
  38. ])
  39. logger.warning(f"当前触发告警 (共{len(TRIGGEREDALERTS)}条):\n名称\t级别\t指标\t阈值\n{alert_table}")
  40. except DatabaseError as e:
  41. logger.error(f"获取告警数据失败: {str(e)}")
  42. # 文档归档检查,修改时间超过七天的文件将被压缩
  43. # 对应方法:File.archive / File.unZip 在FileModel中
  44. def checkArchive():
  45. now = datetime.now()
  46. cutoff = now - timedelta(days=7)
  47. expired_files = File.objects.filter(update_time__lt=cutoff)
  48. # for file in File.objects.all():
  49. # file.archived = False
  50. # file.save()
  51. for file in expired_files.all():
  52. file.archive()
  53. # 初始化调度器
  54. scheduler = BackgroundScheduler()
  55. def start_scheduler():
  56. try:
  57. # 使用Django的JobStore
  58. scheduler.add_jobstore(DjangoJobStore(), "default")
  59. # 添加任务(避免重复添加)
  60. # 系统性能监视任务
  61. if not scheduler.get_job("2_seconds_monitor_job"):
  62. scheduler.add_job(
  63. checkAlert,
  64. 'interval',
  65. seconds=2,
  66. id="2_seconds_job",
  67. replace_existing=True,
  68. )
  69. logger.info("定时任务已启动")
  70. # 文件归档计划任务
  71. if not scheduler.get_job("3600_seconds_archive_job"):
  72. scheduler.add_job(
  73. checkArchive,
  74. 'interval',
  75. # 正常应当尽可能久的检查一次,这里每小时检查一次,调试时可改为5秒一次
  76. seconds=3600,
  77. # seconds = 5,
  78. id="3600_seconds_archive_job",
  79. replace_existing=True,
  80. )
  81. # 启动调度器
  82. if not scheduler.running:
  83. scheduler.start()
  84. except Exception as e:
  85. logger.error(f"调度器启动失败: {str(e)}")