refactor: move all background task from rqworker to celery (#668)

* refactor: move all background task from rqworker to celery

* dev: update background job to take input in parameters rather than a single dict

* dev: update procfile for new worker

* dev: docker updates for new celery worker
This commit is contained in:
pablohashescobar 2023-04-06 22:56:36 +05:30 committed by GitHub
parent 100c431ac3
commit ec818a5523
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 150 additions and 162 deletions

View File

@ -1,2 +1,2 @@
web: gunicorn -w 4 -k uvicorn.workers.UvicornWorker plane.asgi:application --bind 0.0.0.0:$PORT --config gunicorn.config.py --max-requests 10000 --max-requests-jitter 1000 --access-logfile - web: gunicorn -w 4 -k uvicorn.workers.UvicornWorker plane.asgi:application --bind 0.0.0.0:$PORT --config gunicorn.config.py --max-requests 10000 --max-requests-jitter 1000 --access-logfile -
worker: python manage.py rqworker worker: celery -A plane.settings.celery worker -l info

View File

@ -3,4 +3,4 @@ set -e
python manage.py wait_for_db python manage.py wait_for_db
python manage.py migrate python manage.py migrate
python manage.py rqworker celery -A plane.settings.celery worker -l info

View File

@ -317,21 +317,19 @@ class CycleIssueViewSet(BaseViewSet):
# Capture Issue Activity # Capture Issue Activity
issue_activity.delay( issue_activity.delay(
{ type="issue.activity.updated",
"type": "issue.activity", requested_data=json.dumps({"cycles_list": issues}),
"requested_data": json.dumps({"cycles_list": issues}), actor_id=str(self.request.user.id),
"actor_id": str(self.request.user.id), issue_id=str(self.kwargs.get("pk", None)),
"issue_id": str(self.kwargs.get("pk", None)), project_id=str(self.kwargs.get("project_id", None)),
"project_id": str(self.kwargs.get("project_id", None)), current_instance=json.dumps(
"current_instance": json.dumps( {
{ "updated_cycle_issues": update_cycle_issue_activity,
"updated_cycle_issues": update_cycle_issue_activity, "created_cycle_issues": serializers.serialize(
"created_cycle_issues": serializers.serialize( "json", record_to_create
"json", record_to_create ),
), }
} ),
),
},
) )
# Return all Cycle Issues # Return all Cycle Issues

View File

@ -85,16 +85,14 @@ class IssueViewSet(BaseViewSet):
) )
if current_instance is not None: if current_instance is not None:
issue_activity.delay( issue_activity.delay(
{ type="issue.activity.updated",
"type": "issue.activity.updated", requested_data=requested_data,
"requested_data": requested_data, actor_id=str(self.request.user.id),
"actor_id": str(self.request.user.id), issue_id=str(self.kwargs.get("pk", None)),
"issue_id": str(self.kwargs.get("pk", None)), project_id=str(self.kwargs.get("project_id", None)),
"project_id": str(self.kwargs.get("project_id", None)), current_instance=json.dumps(
"current_instance": json.dumps( IssueSerializer(current_instance).data, cls=DjangoJSONEncoder
IssueSerializer(current_instance).data, cls=DjangoJSONEncoder ),
),
},
) )
return super().perform_update(serializer) return super().perform_update(serializer)
@ -105,18 +103,16 @@ class IssueViewSet(BaseViewSet):
) )
if current_instance is not None: if current_instance is not None:
issue_activity.delay( issue_activity.delay(
{ type="issue.activity.deleted",
"type": "issue.activity.deleted", requested_data=json.dumps(
"requested_data": json.dumps( {"issue_id": str(self.kwargs.get("pk", None))}
{"issue_id": str(self.kwargs.get("pk", None))} ),
), actor_id=str(self.request.user.id),
"actor_id": str(self.request.user.id), issue_id=str(self.kwargs.get("pk", None)),
"issue_id": str(self.kwargs.get("pk", None)), project_id=str(self.kwargs.get("project_id", None)),
"project_id": str(self.kwargs.get("project_id", None)), current_instance=json.dumps(
"current_instance": json.dumps( IssueSerializer(current_instance).data, cls=DjangoJSONEncoder
IssueSerializer(current_instance).data, cls=DjangoJSONEncoder ),
),
},
) )
return super().perform_destroy(instance) return super().perform_destroy(instance)
@ -190,16 +186,12 @@ class IssueViewSet(BaseViewSet):
# Track the issue # Track the issue
issue_activity.delay( issue_activity.delay(
{ type="issue.activity.created",
"type": "issue.activity.created", requested_data=json.dumps(self.request.data, cls=DjangoJSONEncoder),
"requested_data": json.dumps( actor_id=str(request.user.id),
self.request.data, cls=DjangoJSONEncoder issue_id=str(serializer.data.get("id", None)),
), project_id=str(project_id),
"actor_id": str(request.user.id), current_instance=None,
"issue_id": str(serializer.data.get("id", None)),
"project_id": str(project_id),
"current_instance": None,
},
) )
return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -331,14 +323,12 @@ class IssueCommentViewSet(BaseViewSet):
actor=self.request.user if self.request.user is not None else None, actor=self.request.user if self.request.user is not None else None,
) )
issue_activity.delay( issue_activity.delay(
{ type="comment.activity.created",
"type": "comment.activity.created", requested_data=json.dumps(serializer.data, cls=DjangoJSONEncoder),
"requested_data": json.dumps(serializer.data, cls=DjangoJSONEncoder), actor_id=str(self.request.user.id),
"actor_id": str(self.request.user.id), issue_id=str(self.kwargs.get("issue_id")),
"issue_id": str(self.kwargs.get("issue_id")), project_id=str(self.kwargs.get("project_id")),
"project_id": str(self.kwargs.get("project_id")), current_instance=None,
"current_instance": None,
},
) )
def perform_update(self, serializer): def perform_update(self, serializer):
@ -348,17 +338,15 @@ class IssueCommentViewSet(BaseViewSet):
) )
if current_instance is not None: if current_instance is not None:
issue_activity.delay( issue_activity.delay(
{ type="comment.activity.updated",
"type": "comment.activity.updated", requested_data=requested_data,
"requested_data": requested_data, actor_id=str(self.request.user.id),
"actor_id": str(self.request.user.id), issue_id=str(self.kwargs.get("issue_id", None)),
"issue_id": str(self.kwargs.get("issue_id", None)), project_id=str(self.kwargs.get("project_id", None)),
"project_id": str(self.kwargs.get("project_id", None)), current_instance=json.dumps(
"current_instance": json.dumps( IssueCommentSerializer(current_instance).data,
IssueCommentSerializer(current_instance).data, cls=DjangoJSONEncoder,
cls=DjangoJSONEncoder, ),
),
},
) )
return super().perform_update(serializer) return super().perform_update(serializer)
@ -369,19 +357,17 @@ class IssueCommentViewSet(BaseViewSet):
) )
if current_instance is not None: if current_instance is not None:
issue_activity.delay( issue_activity.delay(
{ type="comment.activity.deleted",
"type": "comment.activity.deleted", requested_data=json.dumps(
"requested_data": json.dumps( {"comment_id": str(self.kwargs.get("pk", None))}
{"comment_id": str(self.kwargs.get("pk", None))} ),
), actor_id=str(self.request.user.id),
"actor_id": str(self.request.user.id), issue_id=str(self.kwargs.get("issue_id", None)),
"issue_id": str(self.kwargs.get("issue_id", None)), project_id=str(self.kwargs.get("project_id", None)),
"project_id": str(self.kwargs.get("project_id", None)), current_instance=json.dumps(
"current_instance": json.dumps( IssueCommentSerializer(current_instance).data,
IssueCommentSerializer(current_instance).data, cls=DjangoJSONEncoder,
cls=DjangoJSONEncoder, ),
),
},
) )
return super().perform_destroy(instance) return super().perform_destroy(instance)

View File

@ -286,21 +286,19 @@ class ModuleIssueViewSet(BaseViewSet):
# Capture Issue Activity # Capture Issue Activity
issue_activity.delay( issue_activity.delay(
{ type="issue.activity.updated",
"type": "issue.activity", requested_data=json.dumps({"modules_list": issues}),
"requested_data": json.dumps({"modules_list": issues}), actor_id=str(self.request.user.id),
"actor_id": str(self.request.user.id), issue_id=str(self.kwargs.get("pk", None)),
"issue_id": str(self.kwargs.get("pk", None)), project_id=str(self.kwargs.get("project_id", None)),
"project_id": str(self.kwargs.get("project_id", None)), current_instance=json.dumps(
"current_instance": json.dumps( {
{ "updated_module_issues": update_module_issue_activity,
"updated_module_issues": update_module_issue_activity, "created_module_issues": serializers.serialize(
"created_module_issues": serializers.serialize( "json", record_to_create
"json", record_to_create ),
), }
} ),
),
},
) )
return Response( return Response(

View File

@ -4,14 +4,16 @@ from django.template.loader import render_to_string
from django.utils.html import strip_tags from django.utils.html import strip_tags
# Third party imports # Third party imports
from django_rq import job from celery import shared_task
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
# Module imports # Module imports
from plane.db.models import User from plane.db.models import User
@job("default") @shared_task
def email_verification(first_name, email, token, current_site): def email_verification(first_name, email, token, current_site):
try: try:

View File

@ -4,14 +4,14 @@ from django.template.loader import render_to_string
from django.utils.html import strip_tags from django.utils.html import strip_tags
# Third party imports # Third party imports
from django_rq import job from celery import shared_task
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
# Module imports # Module imports
from plane.db.models import User from plane.db.models import User
@job("default") @shared_task
def forgot_password(first_name, email, uidb64, token, current_site): def forgot_password(first_name, email, uidb64, token, current_site):
try: try:

View File

@ -11,7 +11,7 @@ from django.core.serializers.json import DjangoJSONEncoder
from django.contrib.auth.hashers import make_password from django.contrib.auth.hashers import make_password
# Third Party imports # Third Party imports
from django_rq import job from celery import shared_task
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
# Module imports # Module imports
@ -29,7 +29,7 @@ from plane.db.models import (
from .workspace_invitation_task import workspace_invitation from .workspace_invitation_task import workspace_invitation
@job("default") @shared_task
def service_importer(service, importer_id): def service_importer(service, importer_id):
try: try:
importer = Importer.objects.get(pk=importer_id) importer = Importer.objects.get(pk=importer_id)

View File

@ -7,7 +7,7 @@ from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
# Third Party imports # Third Party imports
from django_rq import job from celery import shared_task
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
# Module imports # Module imports
@ -737,27 +737,14 @@ def delete_comment_activity(
# Receive message from room group # Receive message from room group
@job("default") @shared_task
def issue_activity(event): def issue_activity(
type, requested_data, current_instance, issue_id, actor_id, project_id
):
try: try:
issue_activities = [] issue_activities = []
type = event.get("type")
requested_data = (
json.loads(event.get("requested_data"))
if event.get("current_instance") is not None
else None
)
current_instance = (
json.loads(event.get("current_instance"))
if event.get("current_instance") is not None
else None
)
issue_id = event.get("issue_id", None)
actor_id = event.get("actor_id")
project_id = event.get("project_id")
actor = User.objects.get(pk=actor_id) actor = User.objects.get(pk=actor_id)
project = Project.objects.get(pk=project_id) project = Project.objects.get(pk=project_id)
ACTIVITY_MAPPER = { ACTIVITY_MAPPER = {

View File

@ -4,13 +4,12 @@ from django.template.loader import render_to_string
from django.utils.html import strip_tags from django.utils.html import strip_tags
# Third party imports # Third party imports
from django_rq import job from celery import shared_task
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
@job("default") @shared_task
def magic_link(email, key, token, current_site): def magic_link(email, key, token, current_site):
try: try:
realtivelink = f"/magic-sign-in/?password={token}&key={key}" realtivelink = f"/magic-sign-in/?password={token}&key={key}"
abs_url = "http://" + current_site + realtivelink abs_url = "http://" + current_site + realtivelink

View File

@ -4,18 +4,16 @@ from django.template.loader import render_to_string
from django.utils.html import strip_tags from django.utils.html import strip_tags
# Third party imports # Third party imports
from django_rq import job from celery import shared_task
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
# Module imports # Module imports
from plane.db.models import Project, User, ProjectMemberInvite from plane.db.models import Project, User, ProjectMemberInvite
@job("default") @shared_task
def project_invitation(email, project_id, token, current_site): def project_invitation(email, project_id, token, current_site):
try: try:
project = Project.objects.get(pk=project_id) project = Project.objects.get(pk=project_id)
project_member_invite = ProjectMemberInvite.objects.get( project_member_invite = ProjectMemberInvite.objects.get(
token=token, email=email token=token, email=email
@ -35,7 +33,9 @@ def project_invitation(email, project_id, token, current_site):
"invitation_url": abs_url, "invitation_url": abs_url,
} }
html_content = render_to_string("emails/invitations/project_invitation.html", context) html_content = render_to_string(
"emails/invitations/project_invitation.html", context
)
text_content = strip_tags(html_content) text_content = strip_tags(html_content)

View File

@ -5,7 +5,7 @@ from django.utils.html import strip_tags
from django.conf import settings from django.conf import settings
# Third party imports # Third party imports
from django_rq import job from celery import shared_task
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
from slack_sdk import WebClient from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError from slack_sdk.errors import SlackApiError
@ -14,7 +14,7 @@ from slack_sdk.errors import SlackApiError
from plane.db.models import Workspace, User, WorkspaceMemberInvite from plane.db.models import Workspace, User, WorkspaceMemberInvite
@job("default") @shared_task
def workspace_invitation(email, workspace_id, token, current_site, invitor): def workspace_invitation(email, workspace_id, token, current_site, invitor):
try: try:
workspace = Workspace.objects.get(pk=workspace_id) workspace = Workspace.objects.get(pk=workspace_id)

View File

@ -0,0 +1,3 @@
from .celery import app as celery_app
__all__ = ('celery_app',)

View File

@ -0,0 +1,15 @@
import os
from celery import Celery
from django.conf import settings
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "plane.settings.production")
app = Celery("plane")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

View File

@ -35,7 +35,6 @@ INSTALLED_APPS = [
"rest_framework_simplejwt.token_blacklist", "rest_framework_simplejwt.token_blacklist",
"corsheaders", "corsheaders",
"taggit", "taggit",
"django_rq",
] ]
MIDDLEWARE = [ MIDDLEWARE = [

View File

@ -59,16 +59,8 @@ if os.environ.get("SENTRY_DSN", False):
REDIS_HOST = "localhost" REDIS_HOST = "localhost"
REDIS_PORT = 6379 REDIS_PORT = 6379
REDIS_URL = False REDIS_URL = os.environ.get("REDIS_URL")
RQ_QUEUES = {
"default": {
"HOST": "localhost",
"PORT": 6379,
"DB": 0,
"DEFAULT_TIMEOUT": 360,
},
}
MEDIA_URL = "/uploads/" MEDIA_URL = "/uploads/"
MEDIA_ROOT = os.path.join(BASE_DIR, "uploads") MEDIA_ROOT = os.path.join(BASE_DIR, "uploads")
@ -88,3 +80,6 @@ GPT_ENGINE = os.environ.get("GPT_ENGINE", "text-davinci-003")
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False) SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False)
LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False) LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False)
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_BROKER_URL = os.environ.get("REDIS_URL")

View File

@ -236,3 +236,6 @@ GPT_ENGINE = os.environ.get("GPT_ENGINE", "text-davinci-003")
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False) SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False)
LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False) LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False)
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_BROKER_URL = os.environ.get("REDIS_URL")

View File

@ -1,23 +1,23 @@
import os
import redis import redis
from django.conf import settings from django.conf import settings
from urllib.parse import urlparse from urllib.parse import urlparse
def redis_instance(): def redis_instance():
# Run in local redis url is false # connect to redis
if not settings.REDIS_URL: if settings.DOCKERIZED or os.environ.get(
ri = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) "DJANGO_SETTINGS_MODULE", "plane.settings.local"
):
ri = redis.Redis.from_url(settings.REDIS_URL, db=0)
else: else:
# Run in prod redis url is true check with dockerized value url = urlparse(settings.REDIS_URL)
if settings.DOCKERIZED: ri = redis.Redis(
ri = redis.from_url(settings.REDIS_URL, db=0) host=url.hostname,
else: port=url.port,
url = urlparse(settings.REDIS_URL) password=url.password,
ri = redis.Redis( ssl=True,
host=url.hostname, ssl_cert_reqs=None,
port=url.port, )
password=url.password,
ssl=True, return ri
ssl_cert_reqs=None,
)
return ri

View File

@ -197,3 +197,6 @@ GPT_ENGINE = os.environ.get("GPT_ENGINE", "text-davinci-003")
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False) SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False)
LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False) LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False)
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_BROKER_URL = os.environ.get("REDIS_URL")

View File

@ -23,9 +23,9 @@ django-guardian==2.4.0
dj_rest_auth==2.2.5 dj_rest_auth==2.2.5
google-auth==2.16.0 google-auth==2.16.0
google-api-python-client==2.75.0 google-api-python-client==2.75.0
django-rq==2.6.0
django-redis==5.2.0 django-redis==5.2.0
uvicorn==0.20.0 uvicorn==0.20.0
channels==4.0.0 channels==4.0.0
openai==0.27.2 openai==0.27.2
slack-sdk==3.20.2 slack-sdk==3.20.2
celery==5.2.7

View File

@ -21,7 +21,7 @@ services:
POSTGRES_USER: plane POSTGRES_USER: plane
POSTGRES_DB: plane POSTGRES_DB: plane
POSTGRES_PASSWORD: xyzzyspoon POSTGRES_PASSWORD: xyzzyspoon
PGDATA : /var/lib/postgresql/data PGDATA: /var/lib/postgresql/data
command: postgres -c 'max_connections=1000' command: postgres -c 'max_connections=1000'
ports: ports:
- 5432:5432 - 5432:5432
@ -39,7 +39,7 @@ services:
context: . context: .
dockerfile: ./apps/app/Dockerfile.web dockerfile: ./apps/app/Dockerfile.web
restart: always restart: always
command: node apps/app/server.js command: node apps/app/server.js
env_file: env_file:
- ./apps/app/.env - ./apps/app/.env
ports: ports:
@ -62,7 +62,7 @@ services:
- db:db - db:db
- redis:redis - redis:redis
plane-worker: plane-worker:
container_name: planerqworker container_name: planebgworker
build: build:
context: ./apiserver context: ./apiserver
dockerfile: Dockerfile.api dockerfile: Dockerfile.api