From ec818a55234f122984d093ef694e88b26ee8fa67 Mon Sep 17 00:00:00 2001 From: pablohashescobar <118773738+pablohashescobar@users.noreply.github.com> Date: Thu, 6 Apr 2023 22:56:36 +0530 Subject: [PATCH] refactor: move all background task from rqworker to celery (#668) * refactor: move all background task from rqworker to celery * dev: update background job to take input in parameters rather than a single dict * dev: update procfile for new worker * dev: docker updates for new celery worker --- apiserver/Procfile | 2 +- apiserver/bin/worker | 2 +- apiserver/plane/api/views/cycle.py | 28 ++--- apiserver/plane/api/views/issue.py | 114 ++++++++---------- apiserver/plane/api/views/module.py | 28 ++--- apiserver/plane/bgtasks/celery.py | 0 .../plane/bgtasks/email_verification_task.py | 6 +- .../plane/bgtasks/forgot_password_task.py | 4 +- apiserver/plane/bgtasks/importer_task.py | 4 +- .../plane/bgtasks/issue_activites_task.py | 23 +--- .../plane/bgtasks/magic_link_code_task.py | 5 +- .../plane/bgtasks/project_invitation_task.py | 10 +- .../bgtasks/workspace_invitation_task.py | 4 +- apiserver/plane/settings/__init__.py | 3 + apiserver/plane/settings/celery.py | 15 +++ apiserver/plane/settings/common.py | 1 - apiserver/plane/settings/local.py | 13 +- apiserver/plane/settings/production.py | 3 + apiserver/plane/settings/redis.py | 34 +++--- apiserver/plane/settings/staging.py | 3 + apiserver/requirements/base.txt | 4 +- docker-compose.yml | 6 +- 22 files changed, 150 insertions(+), 162 deletions(-) delete mode 100644 apiserver/plane/bgtasks/celery.py create mode 100644 apiserver/plane/settings/celery.py diff --git a/apiserver/Procfile b/apiserver/Procfile index 35f6e9aa8..6cf9ed25b 100644 --- a/apiserver/Procfile +++ b/apiserver/Procfile @@ -1,2 +1,2 @@ web: gunicorn -w 4 -k uvicorn.workers.UvicornWorker plane.asgi:application --bind 0.0.0.0:$PORT --config gunicorn.config.py --max-requests 10000 --max-requests-jitter 1000 --access-logfile - -worker: python manage.py rqworker \ No newline at end of file +worker: celery -A plane.settings.celery worker -l info \ No newline at end of file diff --git a/apiserver/bin/worker b/apiserver/bin/worker index 25a947613..281236246 100755 --- a/apiserver/bin/worker +++ b/apiserver/bin/worker @@ -3,4 +3,4 @@ set -e python manage.py wait_for_db python manage.py migrate -python manage.py rqworker \ No newline at end of file +celery -A plane.settings.celery worker -l info \ No newline at end of file diff --git a/apiserver/plane/api/views/cycle.py b/apiserver/plane/api/views/cycle.py index 73edb7d1e..4139c63d9 100644 --- a/apiserver/plane/api/views/cycle.py +++ b/apiserver/plane/api/views/cycle.py @@ -317,21 +317,19 @@ class CycleIssueViewSet(BaseViewSet): # Capture Issue Activity issue_activity.delay( - { - "type": "issue.activity", - "requested_data": json.dumps({"cycles_list": issues}), - "actor_id": str(self.request.user.id), - "issue_id": str(self.kwargs.get("pk", None)), - "project_id": str(self.kwargs.get("project_id", None)), - "current_instance": json.dumps( - { - "updated_cycle_issues": update_cycle_issue_activity, - "created_cycle_issues": serializers.serialize( - "json", record_to_create - ), - } - ), - }, + type="issue.activity.updated", + requested_data=json.dumps({"cycles_list": issues}), + actor_id=str(self.request.user.id), + issue_id=str(self.kwargs.get("pk", None)), + project_id=str(self.kwargs.get("project_id", None)), + current_instance=json.dumps( + { + "updated_cycle_issues": update_cycle_issue_activity, + "created_cycle_issues": serializers.serialize( + "json", record_to_create + ), + } + ), ) # Return all Cycle Issues diff --git a/apiserver/plane/api/views/issue.py b/apiserver/plane/api/views/issue.py index dfe3b51cc..938633ab4 100644 --- a/apiserver/plane/api/views/issue.py +++ b/apiserver/plane/api/views/issue.py @@ -85,16 +85,14 @@ class IssueViewSet(BaseViewSet): ) if current_instance is not None: issue_activity.delay( - { - "type": "issue.activity.updated", - "requested_data": requested_data, - "actor_id": str(self.request.user.id), - "issue_id": str(self.kwargs.get("pk", None)), - "project_id": str(self.kwargs.get("project_id", None)), - "current_instance": json.dumps( - IssueSerializer(current_instance).data, cls=DjangoJSONEncoder - ), - }, + type="issue.activity.updated", + requested_data=requested_data, + actor_id=str(self.request.user.id), + issue_id=str(self.kwargs.get("pk", None)), + project_id=str(self.kwargs.get("project_id", None)), + current_instance=json.dumps( + IssueSerializer(current_instance).data, cls=DjangoJSONEncoder + ), ) return super().perform_update(serializer) @@ -105,18 +103,16 @@ class IssueViewSet(BaseViewSet): ) if current_instance is not None: issue_activity.delay( - { - "type": "issue.activity.deleted", - "requested_data": json.dumps( - {"issue_id": str(self.kwargs.get("pk", None))} - ), - "actor_id": str(self.request.user.id), - "issue_id": str(self.kwargs.get("pk", None)), - "project_id": str(self.kwargs.get("project_id", None)), - "current_instance": json.dumps( - IssueSerializer(current_instance).data, cls=DjangoJSONEncoder - ), - }, + type="issue.activity.deleted", + requested_data=json.dumps( + {"issue_id": str(self.kwargs.get("pk", None))} + ), + actor_id=str(self.request.user.id), + issue_id=str(self.kwargs.get("pk", None)), + project_id=str(self.kwargs.get("project_id", None)), + current_instance=json.dumps( + IssueSerializer(current_instance).data, cls=DjangoJSONEncoder + ), ) return super().perform_destroy(instance) @@ -190,16 +186,12 @@ class IssueViewSet(BaseViewSet): # Track the issue issue_activity.delay( - { - "type": "issue.activity.created", - "requested_data": json.dumps( - self.request.data, cls=DjangoJSONEncoder - ), - "actor_id": str(request.user.id), - "issue_id": str(serializer.data.get("id", None)), - "project_id": str(project_id), - "current_instance": None, - }, + type="issue.activity.created", + requested_data=json.dumps(self.request.data, cls=DjangoJSONEncoder), + actor_id=str(request.user.id), + issue_id=str(serializer.data.get("id", None)), + project_id=str(project_id), + current_instance=None, ) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @@ -331,14 +323,12 @@ class IssueCommentViewSet(BaseViewSet): actor=self.request.user if self.request.user is not None else None, ) issue_activity.delay( - { - "type": "comment.activity.created", - "requested_data": json.dumps(serializer.data, cls=DjangoJSONEncoder), - "actor_id": str(self.request.user.id), - "issue_id": str(self.kwargs.get("issue_id")), - "project_id": str(self.kwargs.get("project_id")), - "current_instance": None, - }, + type="comment.activity.created", + requested_data=json.dumps(serializer.data, cls=DjangoJSONEncoder), + actor_id=str(self.request.user.id), + issue_id=str(self.kwargs.get("issue_id")), + project_id=str(self.kwargs.get("project_id")), + current_instance=None, ) def perform_update(self, serializer): @@ -348,17 +338,15 @@ class IssueCommentViewSet(BaseViewSet): ) if current_instance is not None: issue_activity.delay( - { - "type": "comment.activity.updated", - "requested_data": requested_data, - "actor_id": str(self.request.user.id), - "issue_id": str(self.kwargs.get("issue_id", None)), - "project_id": str(self.kwargs.get("project_id", None)), - "current_instance": json.dumps( - IssueCommentSerializer(current_instance).data, - cls=DjangoJSONEncoder, - ), - }, + type="comment.activity.updated", + requested_data=requested_data, + actor_id=str(self.request.user.id), + issue_id=str(self.kwargs.get("issue_id", None)), + project_id=str(self.kwargs.get("project_id", None)), + current_instance=json.dumps( + IssueCommentSerializer(current_instance).data, + cls=DjangoJSONEncoder, + ), ) return super().perform_update(serializer) @@ -369,19 +357,17 @@ class IssueCommentViewSet(BaseViewSet): ) if current_instance is not None: issue_activity.delay( - { - "type": "comment.activity.deleted", - "requested_data": json.dumps( - {"comment_id": str(self.kwargs.get("pk", None))} - ), - "actor_id": str(self.request.user.id), - "issue_id": str(self.kwargs.get("issue_id", None)), - "project_id": str(self.kwargs.get("project_id", None)), - "current_instance": json.dumps( - IssueCommentSerializer(current_instance).data, - cls=DjangoJSONEncoder, - ), - }, + type="comment.activity.deleted", + requested_data=json.dumps( + {"comment_id": str(self.kwargs.get("pk", None))} + ), + actor_id=str(self.request.user.id), + issue_id=str(self.kwargs.get("issue_id", None)), + project_id=str(self.kwargs.get("project_id", None)), + current_instance=json.dumps( + IssueCommentSerializer(current_instance).data, + cls=DjangoJSONEncoder, + ), ) return super().perform_destroy(instance) diff --git a/apiserver/plane/api/views/module.py b/apiserver/plane/api/views/module.py index 3cdb54f70..ec393f47a 100644 --- a/apiserver/plane/api/views/module.py +++ b/apiserver/plane/api/views/module.py @@ -286,21 +286,19 @@ class ModuleIssueViewSet(BaseViewSet): # Capture Issue Activity issue_activity.delay( - { - "type": "issue.activity", - "requested_data": json.dumps({"modules_list": issues}), - "actor_id": str(self.request.user.id), - "issue_id": str(self.kwargs.get("pk", None)), - "project_id": str(self.kwargs.get("project_id", None)), - "current_instance": json.dumps( - { - "updated_module_issues": update_module_issue_activity, - "created_module_issues": serializers.serialize( - "json", record_to_create - ), - } - ), - }, + type="issue.activity.updated", + requested_data=json.dumps({"modules_list": issues}), + actor_id=str(self.request.user.id), + issue_id=str(self.kwargs.get("pk", None)), + project_id=str(self.kwargs.get("project_id", None)), + current_instance=json.dumps( + { + "updated_module_issues": update_module_issue_activity, + "created_module_issues": serializers.serialize( + "json", record_to_create + ), + } + ), ) return Response( diff --git a/apiserver/plane/bgtasks/celery.py b/apiserver/plane/bgtasks/celery.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/apiserver/plane/bgtasks/email_verification_task.py b/apiserver/plane/bgtasks/email_verification_task.py index cf233c531..ee4680e53 100644 --- a/apiserver/plane/bgtasks/email_verification_task.py +++ b/apiserver/plane/bgtasks/email_verification_task.py @@ -4,14 +4,16 @@ from django.template.loader import render_to_string from django.utils.html import strip_tags # Third party imports -from django_rq import job +from celery import shared_task + + from sentry_sdk import capture_exception # Module imports from plane.db.models import User -@job("default") +@shared_task def email_verification(first_name, email, token, current_site): try: diff --git a/apiserver/plane/bgtasks/forgot_password_task.py b/apiserver/plane/bgtasks/forgot_password_task.py index 7d169e8cf..4598e5f2f 100644 --- a/apiserver/plane/bgtasks/forgot_password_task.py +++ b/apiserver/plane/bgtasks/forgot_password_task.py @@ -4,14 +4,14 @@ from django.template.loader import render_to_string from django.utils.html import strip_tags # Third party imports -from django_rq import job +from celery import shared_task from sentry_sdk import capture_exception # Module imports from plane.db.models import User -@job("default") +@shared_task def forgot_password(first_name, email, uidb64, token, current_site): try: diff --git a/apiserver/plane/bgtasks/importer_task.py b/apiserver/plane/bgtasks/importer_task.py index efdee3a17..fba43f6e4 100644 --- a/apiserver/plane/bgtasks/importer_task.py +++ b/apiserver/plane/bgtasks/importer_task.py @@ -11,7 +11,7 @@ from django.core.serializers.json import DjangoJSONEncoder from django.contrib.auth.hashers import make_password # Third Party imports -from django_rq import job +from celery import shared_task from sentry_sdk import capture_exception # Module imports @@ -29,7 +29,7 @@ from plane.db.models import ( from .workspace_invitation_task import workspace_invitation -@job("default") +@shared_task def service_importer(service, importer_id): try: importer = Importer.objects.get(pk=importer_id) diff --git a/apiserver/plane/bgtasks/issue_activites_task.py b/apiserver/plane/bgtasks/issue_activites_task.py index efc7f196e..809b09226 100644 --- a/apiserver/plane/bgtasks/issue_activites_task.py +++ b/apiserver/plane/bgtasks/issue_activites_task.py @@ -7,7 +7,7 @@ from django.conf import settings from django.core.serializers.json import DjangoJSONEncoder # Third Party imports -from django_rq import job +from celery import shared_task from sentry_sdk import capture_exception # Module imports @@ -737,27 +737,14 @@ def delete_comment_activity( # Receive message from room group -@job("default") -def issue_activity(event): +@shared_task +def issue_activity( + type, requested_data, current_instance, issue_id, actor_id, project_id +): try: issue_activities = [] - type = event.get("type") - requested_data = ( - json.loads(event.get("requested_data")) - if event.get("current_instance") is not None - else None - ) - current_instance = ( - json.loads(event.get("current_instance")) - if event.get("current_instance") is not None - else None - ) - issue_id = event.get("issue_id", None) - actor_id = event.get("actor_id") - project_id = event.get("project_id") actor = User.objects.get(pk=actor_id) - project = Project.objects.get(pk=project_id) ACTIVITY_MAPPER = { diff --git a/apiserver/plane/bgtasks/magic_link_code_task.py b/apiserver/plane/bgtasks/magic_link_code_task.py index 89239e87d..89554dcca 100644 --- a/apiserver/plane/bgtasks/magic_link_code_task.py +++ b/apiserver/plane/bgtasks/magic_link_code_task.py @@ -4,13 +4,12 @@ from django.template.loader import render_to_string from django.utils.html import strip_tags # Third party imports -from django_rq import job +from celery import shared_task from sentry_sdk import capture_exception -@job("default") +@shared_task def magic_link(email, key, token, current_site): - try: realtivelink = f"/magic-sign-in/?password={token}&key={key}" abs_url = "http://" + current_site + realtivelink diff --git a/apiserver/plane/bgtasks/project_invitation_task.py b/apiserver/plane/bgtasks/project_invitation_task.py index 681438851..18e539970 100644 --- a/apiserver/plane/bgtasks/project_invitation_task.py +++ b/apiserver/plane/bgtasks/project_invitation_task.py @@ -4,18 +4,16 @@ from django.template.loader import render_to_string from django.utils.html import strip_tags # Third party imports -from django_rq import job +from celery import shared_task from sentry_sdk import capture_exception # Module imports from plane.db.models import Project, User, ProjectMemberInvite -@job("default") +@shared_task def project_invitation(email, project_id, token, current_site): - try: - project = Project.objects.get(pk=project_id) project_member_invite = ProjectMemberInvite.objects.get( token=token, email=email @@ -35,7 +33,9 @@ def project_invitation(email, project_id, token, current_site): "invitation_url": abs_url, } - html_content = render_to_string("emails/invitations/project_invitation.html", context) + html_content = render_to_string( + "emails/invitations/project_invitation.html", context + ) text_content = strip_tags(html_content) diff --git a/apiserver/plane/bgtasks/workspace_invitation_task.py b/apiserver/plane/bgtasks/workspace_invitation_task.py index 0ed807171..c6e69689b 100644 --- a/apiserver/plane/bgtasks/workspace_invitation_task.py +++ b/apiserver/plane/bgtasks/workspace_invitation_task.py @@ -5,7 +5,7 @@ from django.utils.html import strip_tags from django.conf import settings # Third party imports -from django_rq import job +from celery import shared_task from sentry_sdk import capture_exception from slack_sdk import WebClient from slack_sdk.errors import SlackApiError @@ -14,7 +14,7 @@ from slack_sdk.errors import SlackApiError from plane.db.models import Workspace, User, WorkspaceMemberInvite -@job("default") +@shared_task def workspace_invitation(email, workspace_id, token, current_site, invitor): try: workspace = Workspace.objects.get(pk=workspace_id) diff --git a/apiserver/plane/settings/__init__.py b/apiserver/plane/settings/__init__.py index e69de29bb..fb989c4e6 100644 --- a/apiserver/plane/settings/__init__.py +++ b/apiserver/plane/settings/__init__.py @@ -0,0 +1,3 @@ +from .celery import app as celery_app + +__all__ = ('celery_app',) diff --git a/apiserver/plane/settings/celery.py b/apiserver/plane/settings/celery.py new file mode 100644 index 000000000..a01333797 --- /dev/null +++ b/apiserver/plane/settings/celery.py @@ -0,0 +1,15 @@ +import os +from celery import Celery +from django.conf import settings + +# Set the default Django settings module for the 'celery' program. +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "plane.settings.production") + +app = Celery("plane") + +# Using a string here means the worker will not have to +# pickle the object when using Windows. +app.config_from_object("django.conf:settings", namespace="CELERY") + +# Load task modules from all registered Django app configs. +app.autodiscover_tasks() diff --git a/apiserver/plane/settings/common.py b/apiserver/plane/settings/common.py index 73c3c4be5..aa61fab56 100644 --- a/apiserver/plane/settings/common.py +++ b/apiserver/plane/settings/common.py @@ -35,7 +35,6 @@ INSTALLED_APPS = [ "rest_framework_simplejwt.token_blacklist", "corsheaders", "taggit", - "django_rq", ] MIDDLEWARE = [ diff --git a/apiserver/plane/settings/local.py b/apiserver/plane/settings/local.py index bf161568b..c3bf65588 100644 --- a/apiserver/plane/settings/local.py +++ b/apiserver/plane/settings/local.py @@ -59,16 +59,8 @@ if os.environ.get("SENTRY_DSN", False): REDIS_HOST = "localhost" REDIS_PORT = 6379 -REDIS_URL = False +REDIS_URL = os.environ.get("REDIS_URL") -RQ_QUEUES = { - "default": { - "HOST": "localhost", - "PORT": 6379, - "DB": 0, - "DEFAULT_TIMEOUT": 360, - }, -} MEDIA_URL = "/uploads/" MEDIA_ROOT = os.path.join(BASE_DIR, "uploads") @@ -88,3 +80,6 @@ GPT_ENGINE = os.environ.get("GPT_ENGINE", "text-davinci-003") SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False) LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False) + +CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL") +CELERY_BROKER_URL = os.environ.get("REDIS_URL") diff --git a/apiserver/plane/settings/production.py b/apiserver/plane/settings/production.py index 5569e1c09..05fdf6485 100644 --- a/apiserver/plane/settings/production.py +++ b/apiserver/plane/settings/production.py @@ -236,3 +236,6 @@ GPT_ENGINE = os.environ.get("GPT_ENGINE", "text-davinci-003") SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False) LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False) + +CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL") +CELERY_BROKER_URL = os.environ.get("REDIS_URL") \ No newline at end of file diff --git a/apiserver/plane/settings/redis.py b/apiserver/plane/settings/redis.py index b32cf8c80..085f9fea4 100644 --- a/apiserver/plane/settings/redis.py +++ b/apiserver/plane/settings/redis.py @@ -1,23 +1,23 @@ +import os import redis from django.conf import settings from urllib.parse import urlparse + def redis_instance(): - # Run in local redis url is false - if not settings.REDIS_URL: - ri = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) + # connect to redis + if settings.DOCKERIZED or os.environ.get( + "DJANGO_SETTINGS_MODULE", "plane.settings.local" + ): + ri = redis.Redis.from_url(settings.REDIS_URL, db=0) else: - # Run in prod redis url is true check with dockerized value - if settings.DOCKERIZED: - ri = redis.from_url(settings.REDIS_URL, db=0) - else: - url = urlparse(settings.REDIS_URL) - ri = redis.Redis( - host=url.hostname, - port=url.port, - password=url.password, - ssl=True, - ssl_cert_reqs=None, - ) - - return ri \ No newline at end of file + url = urlparse(settings.REDIS_URL) + ri = redis.Redis( + host=url.hostname, + port=url.port, + password=url.password, + ssl=True, + ssl_cert_reqs=None, + ) + + return ri diff --git a/apiserver/plane/settings/staging.py b/apiserver/plane/settings/staging.py index 9015ce03f..1de2cfcbe 100644 --- a/apiserver/plane/settings/staging.py +++ b/apiserver/plane/settings/staging.py @@ -197,3 +197,6 @@ GPT_ENGINE = os.environ.get("GPT_ENGINE", "text-davinci-003") SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", False) LOGGER_BASE_URL = os.environ.get("LOGGER_BASE_URL", False) + +CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL") +CELERY_BROKER_URL = os.environ.get("REDIS_URL") diff --git a/apiserver/requirements/base.txt b/apiserver/requirements/base.txt index a1e6c0b71..e3e58450c 100644 --- a/apiserver/requirements/base.txt +++ b/apiserver/requirements/base.txt @@ -23,9 +23,9 @@ django-guardian==2.4.0 dj_rest_auth==2.2.5 google-auth==2.16.0 google-api-python-client==2.75.0 -django-rq==2.6.0 django-redis==5.2.0 uvicorn==0.20.0 channels==4.0.0 openai==0.27.2 -slack-sdk==3.20.2 \ No newline at end of file +slack-sdk==3.20.2 +celery==5.2.7 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2fe1a945f..8d05e03cd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,7 +21,7 @@ services: POSTGRES_USER: plane POSTGRES_DB: plane POSTGRES_PASSWORD: xyzzyspoon - PGDATA : /var/lib/postgresql/data + PGDATA: /var/lib/postgresql/data command: postgres -c 'max_connections=1000' ports: - 5432:5432 @@ -39,7 +39,7 @@ services: context: . dockerfile: ./apps/app/Dockerfile.web restart: always - command: node apps/app/server.js + command: node apps/app/server.js env_file: - ./apps/app/.env ports: @@ -62,7 +62,7 @@ services: - db:db - redis:redis plane-worker: - container_name: planerqworker + container_name: planebgworker build: context: ./apiserver dockerfile: Dockerfile.api