plane/apiserver/plane/celery.py
Nikhil 7249f84e18
dev: code improvements and minor performance upgrades (#2201)
* dev: remove len for empty comparison

* dev: using in instead of multiple ors

* dev: assign expression to empty variables

* dev: use f-string

* dev: remove list comprehension and use generators

* dev: remove assert from paginator

* dev: use is for identity comparison with singleton

* dev: remove unnecessary else statements

* dev: fix does not exists error for both project and workspace

* dev: remove reimports

* dev: iterate a dictionary

* dev: remove unused commented code

* dev: remove redefinition

* dev: remove unused imports

* dev: remove unused imports

* dev: remove unnecessary f strings

* dev: remove unused variables

* dev: use literal structure to create the data structure

* dev: add empty lines at the end of the file

* dev: remove user middleware

* dev: remove unnecessary default None
2023-11-01 20:35:06 +05:30

33 lines
1.0 KiB
Python

import os
from celery import Celery
from plane.settings.redis import redis_instance
from celery.schedules import crontab
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "plane.settings.production")
ri = redis_instance()
app = Celery("plane")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.beat_schedule = {
# Executes every day at 12 AM
"check-every-day-to-archive-and-close": {
"task": "plane.bgtasks.issue_automation_task.archive_and_close_old_issues",
"schedule": crontab(hour=0, minute=0),
},
"check-every-day-to-delete_exporter_history": {
"task": "plane.bgtasks.exporter_expired_task.delete_old_s3_link",
"schedule": crontab(hour=0, minute=0),
},
}
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
app.conf.beat_scheduler = 'django_celery_beat.schedulers.DatabaseScheduler'