dev: segway intgegration (#3132)

* feat: implemented rabbitmq

* dev: initialize segway with queue setup

* dev: import refactors

* dev: create communication with the segway server

* dev: create new workers

* dev: create celery node queue for consuming messages from django

* dev: node to celery connection

* dev: setup segway and django connection

* dev: refactor the structure and add database integration to the app

* dev: add external id and source added

---------

Co-authored-by: NarayanBavisetti <narayan3119@gmail.com>
This commit is contained in:
Nikhil 2023-12-20 19:24:22 +05:30 committed by GitHub
parent a04ad4c4e2
commit 1cc18a0915
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 3331 additions and 230 deletions

3
.gitignore vendored
View File

@ -80,3 +80,6 @@ tmp/
## packages ## packages
dist dist
.temp/ .temp/
# logs
combined.log

View File

@ -1,3 +1,3 @@
web: gunicorn -w 4 -k uvicorn.workers.UvicornWorker plane.asgi:application --bind 0.0.0.0:$PORT --max-requests 10000 --max-requests-jitter 1000 --access-logfile - web: gunicorn -w 4 -k uvicorn.workers.UvicornWorker plane.asgi:application --bind 0.0.0.0:$PORT --max-requests 10000 --max-requests-jitter 1000 --access-logfile -
worker: celery -A plane worker -l info worker: celery -A plane worker -l info -Q internal_tasks,external_tasks
beat: celery -A plane beat -l INFO beat: celery -A plane beat -l INFO

View File

@ -281,20 +281,22 @@ class CycleAPIEndpoint(WebhookMixin, BaseAPIView):
) )
cycle = Cycle.objects.get(workspace__slug=slug, project_id=project_id, pk=pk) cycle = Cycle.objects.get(workspace__slug=slug, project_id=project_id, pk=pk)
issue_activity.delay( issue_activity.apply_async(
type="cycle.activity.deleted", args=[],
requested_data=json.dumps( kwargs={
{ 'type': "cycle.activity.deleted",
'requested_data': json.dumps({
"cycle_id": str(pk), "cycle_id": str(pk),
"cycle_name": str(cycle.name), "cycle_name": str(cycle.name),
"issues": [str(issue_id) for issue_id in cycle_issues], "issues": [str(issue_id) for issue_id in cycle_issues],
} }),
), 'actor_id': str(request.user.id),
actor_id=str(request.user.id), 'issue_id': None,
issue_id=None, 'project_id': str(project_id),
project_id=str(project_id), 'current_instance': None,
current_instance=None, 'epoch': int(timezone.now().timestamp()),
epoch=int(timezone.now().timestamp()), },
routing_key='external',
) )
# Delete the cycle # Delete the cycle
cycle.delete() cycle.delete()
@ -454,21 +456,21 @@ class CycleIssueAPIEndpoint(WebhookMixin, BaseAPIView):
) )
# Capture Issue Activity # Capture Issue Activity
issue_activity.delay( issue_activity.apply_async(
type="cycle.activity.created", args=[],
requested_data=json.dumps({"cycles_list": str(issues)}), kwargs={
actor_id=str(self.request.user.id), 'type': "cycle.activity.created",
issue_id=None, 'requested_data': json.dumps({"cycles_list": str(issues)}),
project_id=str(self.kwargs.get("project_id", None)), 'actor_id': str(self.request.user.id),
current_instance=json.dumps( 'issue_id': None,
{ 'project_id': str(self.kwargs.get("project_id", None)),
'current_instance': json.dumps({
"updated_cycle_issues": update_cycle_issue_activity, "updated_cycle_issues": update_cycle_issue_activity,
"created_cycle_issues": serializers.serialize( "created_cycle_issues": serializers.serialize("json", record_to_create),
"json", record_to_create }),
), 'epoch': int(timezone.now().timestamp()),
} },
), routing_key='external',
epoch=int(timezone.now().timestamp()),
) )
# Return all Cycle Issues # Return all Cycle Issues
@ -483,19 +485,21 @@ class CycleIssueAPIEndpoint(WebhookMixin, BaseAPIView):
) )
issue_id = cycle_issue.issue_id issue_id = cycle_issue.issue_id
cycle_issue.delete() cycle_issue.delete()
issue_activity.delay( issue_activity.apply_async(
type="cycle.activity.deleted", args=[],
requested_data=json.dumps( kwargs={
{ 'type': "cycle.activity.deleted",
'requested_data': json.dumps({
"cycle_id": str(self.kwargs.get("cycle_id")), "cycle_id": str(self.kwargs.get("cycle_id")),
"issues": [str(issue_id)], "issues": [str(issue_id)],
} }),
), 'actor_id': str(self.request.user.id),
actor_id=str(self.request.user.id), 'issue_id': str(issue_id),
issue_id=str(issue_id), 'project_id': str(self.kwargs.get("project_id", None)),
project_id=str(self.kwargs.get("project_id", None)), 'current_instance': None,
current_instance=None, 'epoch': int(timezone.now().timestamp()),
epoch=int(timezone.now().timestamp()), },
routing_key='external',
) )
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -142,14 +142,18 @@ class InboxIssueAPIEndpoint(BaseAPIView):
) )
# Create an Issue Activity # Create an Issue Activity
issue_activity.delay( issue_activity.apply_async(
type="issue.activity.created", args=[], # If no positional arguments are required
requested_data=json.dumps(request.data, cls=DjangoJSONEncoder), kwargs={
actor_id=str(request.user.id), "type": "issue.activity.created",
issue_id=str(issue.id), "requested_data": json.dumps(request.data, cls=DjangoJSONEncoder),
project_id=str(project_id), "actor_id": str(request.user.id),
current_instance=None, "issue_id": str(issue.id),
epoch=int(timezone.now().timestamp()), "project_id": str(project_id),
"current_instance": None,
"epoch": int(timezone.now().timestamp()),
},
routing_key="external",
) )
# create an inbox issue # create an inbox issue
@ -232,17 +236,21 @@ class InboxIssueAPIEndpoint(BaseAPIView):
# Log all the updates # Log all the updates
requested_data = json.dumps(issue_data, cls=DjangoJSONEncoder) requested_data = json.dumps(issue_data, cls=DjangoJSONEncoder)
if issue is not None: if issue is not None:
issue_activity.delay( issue_activity.apply_async(
type="issue.activity.updated", args=[],
requested_data=requested_data, kwargs={
actor_id=str(request.user.id), "type": "issue.activity.updated",
issue_id=str(issue_id), "requested_data": requested_data,
project_id=str(project_id), "actor_id": str(request.user.id),
current_instance=json.dumps( "issue_id": str(issue_id),
"project_id": str(project_id),
"current_instance": json.dumps(
IssueSerializer(current_instance).data, IssueSerializer(current_instance).data,
cls=DjangoJSONEncoder, cls=DjangoJSONEncoder,
), ),
epoch=int(timezone.now().timestamp()), "epoch": int(timezone.now().timestamp()),
},
routing_key="external",
) )
issue_serializer.save() issue_serializer.save()
else: else:

View File

@ -207,14 +207,18 @@ class IssueAPIEndpoint(WebhookMixin, BaseAPIView):
serializer.save() serializer.save()
# Track the issue # Track the issue
issue_activity.delay( issue_activity.apply_async(
type="issue.activity.created", args=[], # If no positional arguments are required
requested_data=json.dumps(self.request.data, cls=DjangoJSONEncoder), kwargs={
actor_id=str(request.user.id), 'type': "issue.activity.created",
issue_id=str(serializer.data.get("id", None)), 'requested_data': json.dumps(self.request.data, cls=DjangoJSONEncoder),
project_id=str(project_id), 'actor_id': str(request.user.id),
current_instance=None, 'issue_id': str(serializer.data.get("id", None)),
epoch=int(timezone.now().timestamp()), 'project_id': str(project_id),
'current_instance': None,
'epoch': int(timezone.now().timestamp()),
},
routing_key='external',
) )
return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -228,14 +232,18 @@ class IssueAPIEndpoint(WebhookMixin, BaseAPIView):
serializer = IssueSerializer(issue, data=request.data, partial=True) serializer = IssueSerializer(issue, data=request.data, partial=True)
if serializer.is_valid(): if serializer.is_valid():
serializer.save() serializer.save()
issue_activity.delay( issue_activity.apply_async(
type="issue.activity.updated", args=[],
requested_data=requested_data, kwargs={
actor_id=str(request.user.id), 'type': "issue.activity.updated",
issue_id=str(pk), 'requested_data': requested_data,
project_id=str(project_id), 'actor_id': str(request.user.id),
current_instance=current_instance, 'issue_id': str(pk),
epoch=int(timezone.now().timestamp()), 'project_id': str(project_id),
'current_instance': current_instance,
'epoch': int(timezone.now().timestamp()),
},
routing_key='external',
) )
return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -246,14 +254,19 @@ class IssueAPIEndpoint(WebhookMixin, BaseAPIView):
IssueSerializer(issue).data, cls=DjangoJSONEncoder IssueSerializer(issue).data, cls=DjangoJSONEncoder
) )
issue.delete() issue.delete()
issue_activity.delay( issue_activity.apply_async(
type="issue.activity.deleted", args=[],
requested_data=json.dumps({"issue_id": str(pk)}), kwargs={
actor_id=str(request.user.id), 'type': "issue.activity.deleted",
issue_id=str(pk), 'requested_data': json.dumps({"issue_id": str(pk)}),
project_id=str(project_id), 'actor_id': str(request.user.id),
current_instance=current_instance, 'issue_id': str(pk),
epoch=int(timezone.now().timestamp()), 'project_id': str(project_id),
'current_instance': current_instance,
'epoch': int(timezone.now().timestamp()),
},
routing_key='your_routing_key',
queue='your_queue_name'
) )
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)
@ -309,7 +322,11 @@ class LabelAPIEndpoint(BaseAPIView):
).data, ).data,
) )
label = self.get_queryset().get(pk=pk) label = self.get_queryset().get(pk=pk)
serializer = LabelSerializer(label, fields=self.fields, expand=self.expand,) serializer = LabelSerializer(
label,
fields=self.fields,
expand=self.expand,
)
return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.data, status=status.HTTP_200_OK)
def patch(self, request, slug, project_id, pk=None): def patch(self, request, slug, project_id, pk=None):
@ -320,7 +337,6 @@ class LabelAPIEndpoint(BaseAPIView):
return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def delete(self, request, slug, project_id, pk=None): def delete(self, request, slug, project_id, pk=None):
label = self.get_queryset().get(pk=pk) label = self.get_queryset().get(pk=pk)
label.delete() label.delete()
@ -384,14 +400,18 @@ class IssueLinkAPIEndpoint(BaseAPIView):
project_id=project_id, project_id=project_id,
issue_id=issue_id, issue_id=issue_id,
) )
issue_activity.delay( issue_activity.apply_async(
type="link.activity.created", args=[], # If no positional arguments are required
requested_data=json.dumps(serializer.data, cls=DjangoJSONEncoder), kwargs={
actor_id=str(self.request.user.id), 'type': "link.activity.created",
issue_id=str(self.kwargs.get("issue_id")), 'requested_data': json.dumps(serializer.data, cls=DjangoJSONEncoder),
project_id=str(self.kwargs.get("project_id")), 'actor_id': str(self.request.user.id),
current_instance=None, 'issue_id': str(self.kwargs.get("issue_id")),
epoch=int(timezone.now().timestamp()), 'project_id': str(self.kwargs.get("project_id")),
'current_instance': None,
'epoch': int(timezone.now().timestamp()),
},
routing_key='external',
) )
return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -408,14 +428,18 @@ class IssueLinkAPIEndpoint(BaseAPIView):
serializer = IssueLinkSerializer(issue_link, data=request.data, partial=True) serializer = IssueLinkSerializer(issue_link, data=request.data, partial=True)
if serializer.is_valid(): if serializer.is_valid():
serializer.save() serializer.save()
issue_activity.delay( issue_activity.apply_async(
type="link.activity.updated", args=[], # If no positional arguments are required
requested_data=requested_data, kwargs={
actor_id=str(request.user.id), 'type': "link.activity.updated",
issue_id=str(issue_id), 'requested_data': requested_data,
project_id=str(project_id), 'actor_id': str(request.user.id),
current_instance=current_instance, 'issue_id': str(issue_id),
epoch=int(timezone.now().timestamp()), 'project_id': str(project_id),
'current_instance': current_instance,
'epoch': int(timezone.now().timestamp()),
},
routing_key='external',
) )
return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -428,14 +452,18 @@ class IssueLinkAPIEndpoint(BaseAPIView):
IssueLinkSerializer(issue_link).data, IssueLinkSerializer(issue_link).data,
cls=DjangoJSONEncoder, cls=DjangoJSONEncoder,
) )
issue_activity.delay( issue_activity.apply_async(
type="link.activity.deleted", args=[], # If no positional arguments are required
requested_data=json.dumps({"link_id": str(pk)}), kwargs={
actor_id=str(request.user.id), 'type': "link.activity.deleted",
issue_id=str(issue_id), 'requested_data': json.dumps({"link_id": str(pk)}),
project_id=str(project_id), 'actor_id': str(request.user.id),
current_instance=current_instance, 'issue_id': str(issue_id),
epoch=int(timezone.now().timestamp()), 'project_id': str(project_id),
'current_instance': current_instance,
'epoch': int(timezone.now().timestamp()),
},
routing_key='external',
) )
issue_link.delete() issue_link.delete()
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)
@ -507,14 +535,20 @@ class IssueCommentAPIEndpoint(WebhookMixin, BaseAPIView):
issue_id=issue_id, issue_id=issue_id,
actor=request.user, actor=request.user,
) )
issue_activity.delay( issue_activity.apply_async(
type="comment.activity.created", args=[],
requested_data=json.dumps(serializer.data, cls=DjangoJSONEncoder), kwargs={
actor_id=str(self.request.user.id), "type": "comment.activity.created",
issue_id=str(self.kwargs.get("issue_id")), "requested_data": json.dumps(
project_id=str(self.kwargs.get("project_id")), serializer.data, cls=DjangoJSONEncoder
current_instance=None, ),
epoch=int(timezone.now().timestamp()), "actor_id": str(self.request.user.id),
"issue_id": str(self.kwargs.get("issue_id")),
"project_id": str(self.kwargs.get("project_id")),
"current_instance": None,
"epoch": int(timezone.now().timestamp()),
},
routing_key="external",
) )
return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -533,14 +567,18 @@ class IssueCommentAPIEndpoint(WebhookMixin, BaseAPIView):
) )
if serializer.is_valid(): if serializer.is_valid():
serializer.save() serializer.save()
issue_activity.delay( issue_activity.apply_async(
type="comment.activity.updated", args=[],
requested_data=requested_data, kwargs={
actor_id=str(request.user.id), "type": "comment.activity.updated",
issue_id=str(issue_id), "requested_data": requested_data,
project_id=str(project_id), "actor_id": str(request.user.id),
current_instance=current_instance, "issue_id": str(issue_id),
epoch=int(timezone.now().timestamp()), "project_id": str(project_id),
"current_instance": current_instance,
"epoch": int(timezone.now().timestamp()),
},
routing_key="external",
) )
return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -554,14 +592,18 @@ class IssueCommentAPIEndpoint(WebhookMixin, BaseAPIView):
cls=DjangoJSONEncoder, cls=DjangoJSONEncoder,
) )
issue_comment.delete() issue_comment.delete()
issue_activity.delay( issue_activity.apply_async(
type="comment.activity.deleted", args=[], # If no positional arguments are required
requested_data=json.dumps({"comment_id": str(pk)}), kwargs={
actor_id=str(request.user.id), "type": "comment.activity.deleted",
issue_id=str(issue_id), "requested_data": json.dumps({"comment_id": str(pk)}),
project_id=str(project_id), "actor_id": str(request.user.id),
current_instance=current_instance, "issue_id": str(issue_id),
epoch=int(timezone.now().timestamp()), "project_id": str(project_id),
"current_instance": current_instance,
"epoch": int(timezone.now().timestamp()),
},
routing_key="external",
) )
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -166,20 +166,22 @@ class ModuleAPIEndpoint(WebhookMixin, BaseAPIView):
module_issues = list( module_issues = list(
ModuleIssue.objects.filter(module_id=pk).values_list("issue", flat=True) ModuleIssue.objects.filter(module_id=pk).values_list("issue", flat=True)
) )
issue_activity.delay( issue_activity.apply_async(
type="module.activity.deleted", args=[],
requested_data=json.dumps( kwargs={
{ 'type': "module.activity.deleted",
'requested_data': json.dumps({
"module_id": str(pk), "module_id": str(pk),
"module_name": str(module.name), "module_name": str(module.name),
"issues": [str(issue_id) for issue_id in module_issues], "issues": [str(issue_id) for issue_id in module_issues],
} }),
), 'actor_id': str(request.user.id),
actor_id=str(request.user.id), 'issue_id': None,
issue_id=None, 'project_id': str(project_id),
project_id=str(project_id), 'current_instance': None,
current_instance=None, 'epoch': int(timezone.now().timestamp()),
epoch=int(timezone.now().timestamp()), },
routing_key='external',
) )
module.delete() module.delete()
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)
@ -330,21 +332,21 @@ class ModuleIssueAPIEndpoint(WebhookMixin, BaseAPIView):
) )
# Capture Issue Activity # Capture Issue Activity
issue_activity.delay( issue_activity.apply_async(
type="module.activity.created", args=[],
requested_data=json.dumps({"modules_list": str(issues)}), kwargs={
actor_id=str(self.request.user.id), 'type': "module.activity.created",
issue_id=None, 'requested_data': json.dumps({"modules_list": str(issues)}),
project_id=str(self.kwargs.get("project_id", None)), 'actor_id': str(self.request.user.id),
current_instance=json.dumps( 'issue_id': None,
{ 'project_id': str(self.kwargs.get("project_id", None)),
'current_instance': json.dumps({
"updated_module_issues": update_module_issue_activity, "updated_module_issues": update_module_issue_activity,
"created_module_issues": serializers.serialize( "created_module_issues": serializers.serialize("json", record_to_create),
"json", record_to_create }),
), 'epoch': int(timezone.now().timestamp()),
} },
), routing_key='external',
epoch=int(timezone.now().timestamp()),
) )
return Response( return Response(
@ -357,18 +359,20 @@ class ModuleIssueAPIEndpoint(WebhookMixin, BaseAPIView):
workspace__slug=slug, project_id=project_id, module_id=module_id, issue_id=issue_id workspace__slug=slug, project_id=project_id, module_id=module_id, issue_id=issue_id
) )
module_issue.delete() module_issue.delete()
issue_activity.delay( issue_activity.apply_async(
type="module.activity.deleted", args=[], # If no positional arguments are required
requested_data=json.dumps( kwargs={
{ 'type': "module.activity.deleted",
'requested_data': json.dumps({
"module_id": str(module_id), "module_id": str(module_id),
"issues": [str(module_issue.issue_id)], "issues": [str(module_issue.issue_id)],
} }),
), 'actor_id': str(request.user.id),
actor_id=str(request.user.id), 'issue_id': str(issue_id),
issue_id=str(issue_id), 'project_id': str(project_id),
project_id=str(project_id), 'current_instance': None,
current_instance=None, 'epoch': int(timezone.now().timestamp()),
epoch=int(timezone.now().timestamp()), },
routing_key='external',
) )
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -42,7 +42,6 @@ from plane.app.permissions import WorkSpaceAdminPermission
class ServiceIssueImportSummaryEndpoint(BaseAPIView): class ServiceIssueImportSummaryEndpoint(BaseAPIView):
def get(self, request, slug, service): def get(self, request, slug, service):
if service == "github": if service == "github":
owner = request.GET.get("owner", False) owner = request.GET.get("owner", False)
@ -122,6 +121,7 @@ class ImportServiceEndpoint(BaseAPIView):
permission_classes = [ permission_classes = [
WorkSpaceAdminPermission, WorkSpaceAdminPermission,
] ]
def post(self, request, slug, service): def post(self, request, slug, service):
project_id = request.data.get("project_id", False) project_id = request.data.get("project_id", False)
@ -202,7 +202,11 @@ class ImportServiceEndpoint(BaseAPIView):
updated_by=request.user, updated_by=request.user,
) )
service_importer.delay(service, importer.id) service_importer.apply_async(
args=[],
kwargs={"service": service, "importer_id": importer.id},
routing_key="internal",
)
serializer = ImporterSerializer(importer) serializer = ImporterSerializer(importer)
return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.data, status=status.HTTP_201_CREATED)
@ -221,9 +225,7 @@ class ImportServiceEndpoint(BaseAPIView):
return Response(serializer.data) return Response(serializer.data)
def delete(self, request, slug, service, pk): def delete(self, request, slug, service, pk):
importer = Importer.objects.get( importer = Importer.objects.get(pk=pk, service=service, workspace__slug=slug)
pk=pk, service=service, workspace__slug=slug
)
if importer.imported_data is not None: if importer.imported_data is not None:
# Delete all imported Issues # Delete all imported Issues
@ -241,9 +243,7 @@ class ImportServiceEndpoint(BaseAPIView):
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)
def patch(self, request, slug, service, pk): def patch(self, request, slug, service, pk):
importer = Importer.objects.get( importer = Importer.objects.get(pk=pk, service=service, workspace__slug=slug)
pk=pk, service=service, workspace__slug=slug
)
serializer = ImporterSerializer(importer, data=request.data, partial=True) serializer = ImporterSerializer(importer, data=request.data, partial=True)
if serializer.is_valid(): if serializer.is_valid():
serializer.save() serializer.save()
@ -479,9 +479,7 @@ class BulkImportModulesEndpoint(BaseAPIView):
[ [
ModuleLink( ModuleLink(
module=module, module=module,
url=module_data.get("link", {}).get( url=module_data.get("link", {}).get("url", "https://plane.so"),
"url", "https://plane.so"
),
title=module_data.get("link", {}).get( title=module_data.get("link", {}).get(
"title", "Original Issue" "title", "Original Issue"
), ),

View File

@ -0,0 +1 @@
from .issue_sync_task import issue_sync

View File

@ -373,7 +373,7 @@ def generate_non_segmented_rows(
return [tuple(row_zero)] + rows return [tuple(row_zero)] + rows
@shared_task @shared_task(queue='internal_tasks')
def analytic_export_task(email, data, slug): def analytic_export_task(email, data, slug):
try: try:
filters = issue_filters(data, "POST") filters = issue_filters(data, "POST")

View File

@ -29,7 +29,7 @@ def posthogConfiguration():
return None, None return None, None
@shared_task @shared_task(queue='internal_tasks')
def auth_events(user, email, user_agent, ip, event_name, medium, first_time): def auth_events(user, email, user_agent, ip, event_name, medium, first_time):
try: try:
POSTHOG_API_KEY, POSTHOG_HOST = posthogConfiguration() POSTHOG_API_KEY, POSTHOG_HOST = posthogConfiguration()
@ -54,7 +54,7 @@ def auth_events(user, email, user_agent, ip, event_name, medium, first_time):
capture_exception(e) capture_exception(e)
@shared_task @shared_task(queue='internal_tasks')
def workspace_invite_event(user, email, user_agent, ip, event_name, accepted_from): def workspace_invite_event(user, email, user_agent, ip, event_name, accepted_from):
try: try:
POSTHOG_API_KEY, POSTHOG_HOST = posthogConfiguration() POSTHOG_API_KEY, POSTHOG_HOST = posthogConfiguration()

View File

@ -259,7 +259,7 @@ def generate_xlsx(header, project_id, issues, files):
files.append((f"{project_id}.xlsx", xlsx_file)) files.append((f"{project_id}.xlsx", xlsx_file))
@shared_task @shared_task(queue='internal_tasks')
def issue_export_task(provider, workspace_id, project_ids, token_id, multiple, slug): def issue_export_task(provider, workspace_id, project_ids, token_id, multiple, slug):
try: try:
exporter_instance = ExporterHistory.objects.get(token=token_id) exporter_instance = ExporterHistory.objects.get(token=token_id)

View File

@ -15,7 +15,7 @@ from botocore.client import Config
from plane.db.models import ExporterHistory from plane.db.models import ExporterHistory
@shared_task @shared_task(queue='internal_tasks')
def delete_old_s3_link(): def delete_old_s3_link():
# Get a list of keys and IDs to process # Get a list of keys and IDs to process
expired_exporter_history = ExporterHistory.objects.filter( expired_exporter_history = ExporterHistory.objects.filter(

View File

@ -12,7 +12,7 @@ from celery import shared_task
from plane.db.models import FileAsset from plane.db.models import FileAsset
@shared_task @shared_task(queue='internal_tasks')
def delete_file_asset(): def delete_file_asset():
# file assets to delete # file assets to delete

View File

@ -17,7 +17,7 @@ from sentry_sdk import capture_exception
from plane.license.utils.instance_value import get_email_configuration from plane.license.utils.instance_value import get_email_configuration
@shared_task @shared_task(queue='internal_tasks')
def forgot_password(first_name, email, uidb64, token, current_site): def forgot_password(first_name, email, uidb64, token, current_site):
try: try:
relative_link = ( relative_link = (

View File

@ -28,7 +28,7 @@ from plane.db.models import (
from plane.bgtasks.user_welcome_task import send_welcome_slack from plane.bgtasks.user_welcome_task import send_welcome_slack
@shared_task @shared_task(queue="internal_tasks")
def service_importer(service, importer_id): def service_importer(service, importer_id):
try: try:
importer = Importer.objects.get(pk=importer_id) importer = Importer.objects.get(pk=importer_id)
@ -176,20 +176,24 @@ def service_importer(service, importer_id):
project_id=importer.project_id, project_id=importer.project_id,
) )
if settings.PROXY_BASE_URL: import_data = ImporterSerializer(importer).data
headers = {"Content-Type": "application/json"}
import_data_json = json.dumps( import_data_json = json.dumps(import_data, cls=DjangoJSONEncoder)
ImporterSerializer(importer).data,
cls=DjangoJSONEncoder, if settings.SEGWAY_BASE_URL:
) headers = {
_ = requests.post( "Content-Type": "application/json",
f"{settings.PROXY_BASE_URL}/hooks/workspaces/{str(importer.workspace_id)}/projects/{str(importer.project_id)}/importers/{str(service)}/", "x-api-key": settings.SEGWAY_KEY,
json=import_data_json, }
res = requests.post(
f"{settings.SEGWAY_BASE_URL}/api/jira",
data=import_data_json,
headers=headers, headers=headers,
) )
print(res.json())
return return
except Exception as e: except Exception as e:
print(e)
importer = Importer.objects.get(pk=importer_id) importer = Importer.objects.get(pk=importer_id)
importer.status = "failed" importer.status = "failed"
importer.save() importer.save()

View File

@ -1460,7 +1460,7 @@ def delete_draft_issue_activity(
# Receive message from room group # Receive message from room group
@shared_task @shared_task(queue='internal_tasks')
def issue_activity( def issue_activity(
type, type,
requested_data, requested_data,

View File

@ -16,7 +16,7 @@ from plane.db.models import Issue, Project, State
from plane.bgtasks.issue_activites_task import issue_activity from plane.bgtasks.issue_activites_task import issue_activity
@shared_task @shared_task(queue='internal_tasks')
def archive_and_close_old_issues(): def archive_and_close_old_issues():
archive_old_issues() archive_old_issues()
close_old_issues() close_old_issues()

View File

@ -0,0 +1,5 @@
from celery import shared_task
@shared_task(queue="segway_tasks")
def issue_sync(data):
print(f"Received data from Segway: {data}")

View File

@ -17,7 +17,7 @@ from sentry_sdk import capture_exception
from plane.license.utils.instance_value import get_email_configuration from plane.license.utils.instance_value import get_email_configuration
@shared_task @shared_task(queue='internal_tasks')
def magic_link(email, key, token, current_site): def magic_link(email, key, token, current_site):
try: try:
( (

View File

@ -183,7 +183,7 @@ def createMentionNotification(project, notification_comment, issue, actor_id, me
) )
@shared_task @shared_task(queue='internal_tasks')
def notifications(type, issue_id, project_id, actor_id, subscriber, issue_activities_created, requested_data, current_instance): def notifications(type, issue_id, project_id, actor_id, subscriber, issue_activities_created, requested_data, current_instance):
issue_activities_created = ( issue_activities_created = (
json.loads( json.loads(

View File

@ -15,7 +15,7 @@ from sentry_sdk import capture_exception
from plane.db.models import Project, User, ProjectMemberInvite from plane.db.models import Project, User, ProjectMemberInvite
from plane.license.utils.instance_value import get_email_configuration from plane.license.utils.instance_value import get_email_configuration
@shared_task @shared_task(queue='internal_tasks')
def project_invitation(email, project_id, token, current_site, invitor): def project_invitation(email, project_id, token, current_site, invitor):
try: try:
user = User.objects.get(email=invitor) user = User.objects.get(email=invitor)

View File

@ -11,7 +11,7 @@ from slack_sdk.errors import SlackApiError
from plane.db.models import User from plane.db.models import User
@shared_task @shared_task(queue='internal_tasks')
def send_welcome_slack(user_id, created, message): def send_welcome_slack(user_id, created, message):
try: try:
instance = User.objects.get(pk=user_id) instance = User.objects.get(pk=user_id)

View File

@ -71,6 +71,7 @@ def get_model_data(event, event_id, many=False):
retry_backoff=600, retry_backoff=600,
max_retries=5, max_retries=5,
retry_jitter=True, retry_jitter=True,
queue='internal_tasks'
) )
def webhook_task(self, webhook, slug, event, event_data, action): def webhook_task(self, webhook, slug, event, event_data, action):
try: try:
@ -161,7 +162,7 @@ def webhook_task(self, webhook, slug, event, event_data, action):
return return
@shared_task() @shared_task(queue='internal_tasks')
def send_webhook(event, payload, kw, action, slug, bulk): def send_webhook(event, payload, kw, action, slug, bulk):
try: try:
webhooks = Webhook.objects.filter(workspace__slug=slug, is_active=True) webhooks = Webhook.objects.filter(workspace__slug=slug, is_active=True)

View File

@ -20,7 +20,7 @@ from plane.db.models import Workspace, WorkspaceMemberInvite, User
from plane.license.utils.instance_value import get_email_configuration from plane.license.utils.instance_value import get_email_configuration
@shared_task @shared_task(queue='internal_tasks')
def workspace_invitation(email, workspace_id, token, current_site, invitor): def workspace_invitation(email, workspace_id, token, current_site, invitor):
try: try:
user = User.objects.get(email=invitor) user = User.objects.get(email=invitor)

View File

@ -8,7 +8,7 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "plane.settings.production")
ri = redis_instance() ri = redis_instance()
app = Celery("plane") app = Celery('tasks', broker='pyamqp://guest:guest@localhost:5672//')
# Using a string here means the worker will not have to # Using a string here means the worker will not have to
# pickle the object when using Windows. # pickle the object when using Windows.

View File

@ -18,6 +18,8 @@ class Cycle(ProjectBaseModel):
) )
view_props = models.JSONField(default=dict) view_props = models.JSONField(default=dict)
sort_order = models.FloatField(default=65535) sort_order = models.FloatField(default=65535)
external_source = models.CharField(null=True, blank=True)
external_id = models.CharField(max_length=255, blank=True, null=True)
class Meta: class Meta:
verbose_name = "Cycle" verbose_name = "Cycle"
@ -27,9 +29,9 @@ class Cycle(ProjectBaseModel):
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
if self._state.adding: if self._state.adding:
smallest_sort_order = Cycle.objects.filter( smallest_sort_order = Cycle.objects.filter(project=self.project).aggregate(
project=self.project smallest=models.Min("sort_order")
).aggregate(smallest=models.Min("sort_order"))["smallest"] )["smallest"]
if smallest_sort_order is not None: if smallest_sort_order is not None:
self.sort_order = smallest_sort_order - 10000 self.sort_order = smallest_sort_order - 10000

View File

@ -102,6 +102,8 @@ class Issue(ProjectBaseModel):
completed_at = models.DateTimeField(null=True) completed_at = models.DateTimeField(null=True)
archived_at = models.DateField(null=True) archived_at = models.DateField(null=True)
is_draft = models.BooleanField(default=False) is_draft = models.BooleanField(default=False)
external_source = models.CharField(null=True, blank=True)
external_id = models.CharField(max_length=255, blank=True, null=True)
objects = models.Manager() objects = models.Manager()
issue_objects = IssueManager() issue_objects = IssueManager()
@ -132,7 +134,6 @@ class Issue(ProjectBaseModel):
except ImportError: except ImportError:
pass pass
if self._state.adding: if self._state.adding:
# Get the maximum display_id value from the database # Get the maximum display_id value from the database
last_id = IssueSequence.objects.filter(project=self.project).aggregate( last_id = IssueSequence.objects.filter(project=self.project).aggregate(
@ -212,6 +213,7 @@ class IssueRelation(ProjectBaseModel):
def __str__(self): def __str__(self):
return f"{self.issue.name} {self.related_issue.name}" return f"{self.issue.name} {self.related_issue.name}"
class IssueMention(ProjectBaseModel): class IssueMention(ProjectBaseModel):
issue = models.ForeignKey( issue = models.ForeignKey(
Issue, on_delete=models.CASCADE, related_name="issue_mention" Issue, on_delete=models.CASCADE, related_name="issue_mention"
@ -221,6 +223,7 @@ class IssueMention(ProjectBaseModel):
on_delete=models.CASCADE, on_delete=models.CASCADE,
related_name="issue_mention", related_name="issue_mention",
) )
class Meta: class Meta:
unique_together = ["issue", "mention"] unique_together = ["issue", "mention"]
verbose_name = "Issue Mention" verbose_name = "Issue Mention"
@ -366,6 +369,8 @@ class IssueComment(ProjectBaseModel):
default="INTERNAL", default="INTERNAL",
max_length=100, max_length=100,
) )
external_source = models.CharField(null=True, blank=True)
external_id = models.CharField(max_length=255, blank=True, null=True)
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
self.comment_stripped = ( self.comment_stripped = (
@ -416,6 +421,8 @@ class Label(ProjectBaseModel):
description = models.TextField(blank=True) description = models.TextField(blank=True)
color = models.CharField(max_length=255, blank=True) color = models.CharField(max_length=255, blank=True)
sort_order = models.FloatField(default=65535) sort_order = models.FloatField(default=65535)
external_source = models.CharField(null=True, blank=True)
external_id = models.CharField(max_length=255, blank=True, null=True)
class Meta: class Meta:
unique_together = ["name", "project"] unique_together = ["name", "project"]

View File

@ -41,6 +41,8 @@ class Module(ProjectBaseModel):
) )
view_props = models.JSONField(default=dict) view_props = models.JSONField(default=dict)
sort_order = models.FloatField(default=65535) sort_order = models.FloatField(default=65535)
external_source = models.CharField(null=True, blank=True)
external_id = models.CharField(max_length=255, blank=True, null=True)
class Meta: class Meta:
unique_together = ["name", "project"] unique_together = ["name", "project"]

View File

@ -24,6 +24,8 @@ class State(ProjectBaseModel):
max_length=20, max_length=20,
) )
default = models.BooleanField(default=False) default = models.BooleanField(default=False)
external_source = models.CharField(null=True, blank=True)
external_id = models.CharField(max_length=255, blank=True, null=True)
def __str__(self): def __str__(self):
"""Return name of the state""" """Return name of the state"""

View File

@ -5,6 +5,7 @@ import ssl
import certifi import certifi
from datetime import timedelta from datetime import timedelta
from urllib.parse import urlparse from urllib.parse import urlparse
from kombu import Exchange, Queue
# Django imports # Django imports
from django.core.management.utils import get_random_secret_key from django.core.management.utils import get_random_secret_key
@ -148,6 +149,9 @@ else:
REDIS_URL = os.environ.get("REDIS_URL") REDIS_URL = os.environ.get("REDIS_URL")
REDIS_SSL = REDIS_URL and "rediss" in REDIS_URL REDIS_SSL = REDIS_URL and "rediss" in REDIS_URL
# RabbitMq Config
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
if REDIS_SSL: if REDIS_SSL:
CACHES = { CACHES = {
"default": { "default": {
@ -270,18 +274,28 @@ SIMPLE_JWT = {
# Celery Configuration # Celery Configuration
CELERY_TIMEZONE = TIME_ZONE CELERY_TIMEZONE = TIME_ZONE
CELERY_TASK_SERIALIZER = "json" CELERY_TASK_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["application/json"] CELERY_ACCEPT_CONTENT = ["json"]
if REDIS_SSL: CELERY_BROKER_URL = RABBITMQ_URL
redis_url = os.environ.get("REDIS_URL") CELERY_RESULT_BACKEND = REDIS_URL
broker_url = (
f"{redis_url}?ssl_cert_reqs={ssl.CERT_NONE.name}&ssl_ca_certs={certifi.where()}" CELERY_QUEUES = (
) Queue(
CELERY_BROKER_URL = broker_url "internal_tasks",
CELERY_RESULT_BACKEND = broker_url Exchange("internal_exchange", type="direct"),
else: routing_key="internal",
CELERY_BROKER_URL = REDIS_URL ),
CELERY_RESULT_BACKEND = REDIS_URL Queue(
"external_tasks",
Exchange("external_exchange", type="direct"),
routing_key="external",
),
Queue(
"segway_tasks",
Exchange("segway_exchange", type="direct"),
routing_key="segway",
),
)
CELERY_IMPORTS = ( CELERY_IMPORTS = (
"plane.bgtasks.issue_automation_task", "plane.bgtasks.issue_automation_task",
@ -291,7 +305,9 @@ CELERY_IMPORTS = (
# Sentry Settings # Sentry Settings
# Enable Sentry Settings # Enable Sentry Settings
if bool(os.environ.get("SENTRY_DSN", False)) and os.environ.get("SENTRY_DSN").startswith("https://"): if bool(os.environ.get("SENTRY_DSN", False)) and os.environ.get(
"SENTRY_DSN"
).startswith("https://"):
sentry_sdk.init( sentry_sdk.init(
dsn=os.environ.get("SENTRY_DSN", ""), dsn=os.environ.get("SENTRY_DSN", ""),
integrations=[ integrations=[
@ -327,10 +343,9 @@ USE_MINIO = int(os.environ.get("USE_MINIO", 0)) == 1
POSTHOG_API_KEY = os.environ.get("POSTHOG_API_KEY", False) POSTHOG_API_KEY = os.environ.get("POSTHOG_API_KEY", False)
POSTHOG_HOST = os.environ.get("POSTHOG_HOST", False) POSTHOG_HOST = os.environ.get("POSTHOG_HOST", False)
# instance key
INSTANCE_KEY = os.environ.get(
"INSTANCE_KEY", "ae6517d563dfc13d8270bd45cf17b08f70b37d989128a9dab46ff687603333c3"
)
# Skip environment variable configuration # Skip environment variable configuration
SKIP_ENV_VAR = os.environ.get("SKIP_ENV_VAR", "1") == "1" SKIP_ENV_VAR = os.environ.get("SKIP_ENV_VAR", "1") == "1"
# Segway
SEGWAY_BASE_URL = os.environ.get("SEGWAY_BASE_URL", "http://localhost:9000")
SEGWAY_KEY = os.environ.get("SEGWAY_KEY", False)

View File

@ -0,0 +1,17 @@
{
"compilerOptions": {
"target": "es2016",
"module": "commonjs",
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipLibCheck": true,
"outDir": "./dist",
"baseUrl": "./src",
"moduleResolution": "node",
"experimentalDecorators": true,
"emitDecoratorMetadata": true
},
"include": ["src/**/*.ts"],
"exclude": ["node_modules"]
}

5
segway/.env.example Normal file
View File

@ -0,0 +1,5 @@
APP_ENV=local
SERVER_PORT=9000
DATABASE_URL=""
RABBITMQ_URL=""
SENTRY_DSN=""

20
segway/Dockerfile.segway Normal file
View File

@ -0,0 +1,20 @@
# Use the official Node.js 18-alpine image as the base image
FROM node:18-alpine
# Set the working directory inside the container
WORKDIR /usr/src/app
# Copy package.json and package-lock.json to the working directory
COPY package*.json ./
# Install dependencies
RUN npm install
# Copy the rest of the application code to the working directory
COPY . .
# Build the TypeScript code
RUN npm run build
# Expose the port that your application will run on
EXPOSE 9000

3
segway/README.md Normal file
View File

@ -0,0 +1,3 @@
# Plane Segway
A node process that takes care of external integrations with Plane.

38
segway/package.json Normal file
View File

@ -0,0 +1,38 @@
{
"name": "segway",
"version": "0.0.1",
"description": "An integration service syncs plane data with external sources.",
"author": "plane team",
"license": "ISC",
"private": true,
"scripts": {
"build": "npx tsc",
"start": "node dist/start.js",
"dev": "concurrently \"npx tsc --watch\" \"nodemon -q dist/start.js\""
},
"dependencies": {
"@overnightjs/core": "^1.7.6",
"@sentry/node": "^7.73.0",
"@sentry/tracing": "^7.73.0",
"amqplib": "^0.10.3",
"cors": "^2.8.5",
"dotenv": "^16.3.1",
"drizzle-orm": "^0.29.1",
"express": "^4.18.2",
"postgres": "^3.4.1",
"uuid": "^9.0.1",
"winston": "^3.10.0",
"zod": "^3.22.4"
},
"devDependencies": {
"@types/amqplib": "^0.10.4",
"@types/cors": "^2.8.14",
"@types/express": "^4.17.18",
"@types/node": "^20.8.3",
"@types/pg": "^8.10.9",
"concurrently": "^8.2.1",
"drizzle-kit": "^0.20.6",
"nodemon": "^3.0.1",
"typescript": "^5.2.2"
}
}

View File

@ -0,0 +1,10 @@
import { Request, Response } from "express";
import { Controller, Get } from "@overnightjs/core";
@Controller("/api/github")
export class GithubController {
/**
* This controller houses all routes for the Github Importer/Integration
*/
}

View File

@ -0,0 +1,3 @@
export * from "./jira.controller";
export * from "./slack.controller"
export * from "./github.controller"

View File

@ -0,0 +1,50 @@
// overnight js
import { Request, Response } from "express";
import { Controller, Post, Middleware } from "@overnightjs/core";
import { PostgresJsDatabase } from "drizzle-orm/postgres-js";
// mq
import { MQSingleton } from "../mq/singleton";
// middleware
import AuthKeyMiddlware from "../middleware/authkey.middleware";
@Controller("api/jira")
export class JiraController {
/**
* This controller houses all routes for the Jira Importer
*/
// Initialize database and mq
db: PostgresJsDatabase;
mq: MQSingleton;
constructor(db: PostgresJsDatabase, mq: MQSingleton) {
this.db = db;
this.mq = mq;
}
@Post("")
@Middleware([AuthKeyMiddlware])
private home(req: Request, res: Response) {
try {
res.status(200).json({ message: "Hello, Plane Users" });
// Process Jira message
const body = {
args: [], // args
kwargs: {
data: {
type: "issue.create",
data: {
message: "Segway say's Hi",
},
},
}, // kwargs
other_data: {}, // other data
};
this.mq?.publish(body, "plane.bgtasks.issue_sync_task.issue_sync");
return;
} catch (error) {
return res.json({ message: "Server error" });
}
}
}

View File

@ -0,0 +1,10 @@
import { Request, Response } from "express";
import { Controller, Get } from "@overnightjs/core";
@Controller("/api/slack")
export class SlackController {
/**
* This controller houses all routes for the Slack Integration
*/
}

View File

@ -0,0 +1,33 @@
import { drizzle, PostgresJsDatabase } from "drizzle-orm/postgres-js";
import postgres from "postgres";
// logger
import { logger } from "../utils/logger"
/**
 * Lazily-constructed singleton wrapping the drizzle/postgres connection.
 * Construction fails fast when DATABASE_URL is missing or the client
 * cannot be created.
 */
export class DatabaseSingleton {
  private static instance: DatabaseSingleton;
  public db: PostgresJsDatabase | null = null;

  private constructor() {
    try {
      // Ensure the DATABASE_URL is provided
      const url = process.env.DATABASE_URL;
      if (!url) {
        throw new Error("DATABASE_URL environment variable is not set.");
      }
      this.db = drizzle(postgres(url));
      logger.info("🛢️ Connected to Database");
    } catch (error) {
      logger.error("Failed to initialize database connection:", error);
      throw new Error("Could not connect to Database");
    }
  }

  /** Return the shared instance, creating it on first use. */
  public static getInstance(): DatabaseSingleton {
    if (DatabaseSingleton.instance === undefined) {
      DatabaseSingleton.instance = new DatabaseSingleton();
    }
    return DatabaseSingleton.instance;
  }
}

View File

@ -0,0 +1,22 @@
import { RequestHandler } from "express";
import { logger } from "../utils/logger";
/**
 * API-key auth middleware: requests must present an "x-api-key" header
 * matching the SEGWAY_KEY environment variable; otherwise reply 401.
 *
 * NOTE(review): exported name misspells "Middleware"; renaming would
 * break importers, so it is kept as-is.
 */
const AuthKeyMiddlware: RequestHandler = (req, res, next) => {
  // Retrieve the API key from the request header
  const apiKey = req.headers["x-api-key"];

  // Define the expected API key
  const expectedApiKey = process.env.SEGWAY_KEY;

  // Bug fix: if SEGWAY_KEY is not configured, `expectedApiKey` is
  // undefined, and a request WITHOUT an "x-api-key" header would satisfy
  // `apiKey === expectedApiKey` (undefined === undefined) — bypassing
  // auth entirely. Require the key to be configured before comparing.
  if (expectedApiKey && apiKey === expectedApiKey) {
    // If the key is valid, proceed with the next middleware or route handler
    next();
  } else {
    // If the key is invalid, log the error and send an appropriate response
    logger.error("Invalid API key");
    res.status(401).json({ message: "Invalid API key" });
  }
};

export default AuthKeyMiddlware;

View File

@ -0,0 +1,8 @@
import { RequestHandler } from "express";
import { logger } from "../utils/logger";
/**
 * Request logging middleware: records the HTTP method and path of every
 * incoming request, then hands control to the next handler.
 */
const loggerMiddleware: RequestHandler = (req, res, next) => {
  const entry = req.method + ": " + req.path;
  logger.info(entry);
  next();
};

export default loggerMiddleware;

View File

@ -0,0 +1,99 @@
//uuid
import { v4 as uuidv4 } from "uuid"
// mq
import { Connection, Channel, connect, ConsumeMessage } from "amqplib";
// utils
import { logger } from "../utils/logger";
/**
 * Singleton wrapper around a single RabbitMQ connection + channel.
 *
 * Used both to publish Celery-style tasks toward the Django worker
 * (segway_exchange -> segway_tasks) and to consume events Django
 * publishes for node (django_exchange).
 */
export class MQSingleton {
  private static instance: MQSingleton;
  private connection: Connection | null = null;
  public channel: Channel | null = null;

  private constructor() {}

  // Get the current instance
  public static getInstance(): MQSingleton {
    if (!this.instance) {
      this.instance = new MQSingleton();
    }
    return this.instance;
  }

  // Initialize instance (idempotent: only connects when not yet connected)
  public async initialize(): Promise<void> {
    if (!this.connection || !this.channel) {
      await this.init();
    }
  }

  // Open the connection and create the channel.
  private async init(): Promise<void> {
    const rabbitMqUrl = process.env.RABBITMQ_URL || "";
    try {
      this.connection = await connect(rabbitMqUrl);
      logger.info(`✅ Rabbit MQ Connection is ready`);
      this.channel = await this.connection.createChannel();
      logger.info(`🛸 Created RabbitMQ Channel successfully`);
    } catch (error) {
      // Consistency fix: route through the shared winston logger
      // (the original used console.error here).
      logger.error("Error in initializing RabbitMQ:", error);
    }
  }

  /**
   * Publish a Celery-compatible message to the segway queue.
   *
   * @param body     task payload ({ args, kwargs, ... })
   * @param taskName fully-qualified Celery task name, carried in the
   *                 "task" header so the Django worker can route it
   * @throws Error when the channel has not been initialized yet
   */
  public async publish(body: object, taskName: string): Promise<void> {
    // Check if the channel exists
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    // Initialize the queue variables
    const queue = "segway_tasks";
    const exchange = "segway_exchange";
    const routingKey = "segway";

    // Create this message
    const msg = {
      contentType: "application/json",
      contentEncoding: "utf-8",
      headers: {
        id: uuidv4(),
        task: taskName,
      },
      body: JSON.stringify(body),
    };

    // Assert the exchange/queue and bind them so publishes are not lost
    // when the consumer side has not declared them yet.
    await this.channel.assertExchange(exchange, "direct", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, routingKey);

    // Try publishing the message
    try {
      this.channel.publish(exchange, routingKey, Buffer.from(msg.body), {
        contentType: msg.contentType,
        contentEncoding: msg.contentEncoding,
        headers: msg.headers,
      });
    } catch (error) {
      // Consistency fix: route through the shared winston logger.
      logger.error("Error publishing message:", error);
    }
  }

  /**
   * Consume messages from `queue`, bound to django_exchange with the
   * "django.node" routing key.
   *
   * NOTE(review): noAck is true, so the broker drops messages as soon as
   * they are delivered — a crash mid-callback loses the message. Confirm
   * at-most-once delivery is acceptable here.
   */
  public async consume(
    queue: string,
    callback: (msg: ConsumeMessage | null) => void
  ): Promise<void> {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }
    logger.info("👂 Listening for incoming events");
    const exchange = "django_exchange";
    const routingKey = "django.node";
    await this.channel.assertExchange(exchange, "direct", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, routingKey);
    await this.channel.consume(queue, callback, { noAck: true });
  }
}

161
segway/src/server.ts Normal file
View File

@ -0,0 +1,161 @@
import dotenv from "dotenv";
import path from "path";
import express from "express";
import { Server } from "@overnightjs/core";
import cors from "cors";
import * as Sentry from "@sentry/node";
import * as Tracing from "@sentry/tracing";
import { PostgresJsDatabase } from "drizzle-orm/postgres-js";
// controllers
import * as controllers from "./controller";
// middlewares
import loggerMiddleware from "./middleware/logger.middleware";
// utils
import { logger } from "./utils/logger";
// db
import { DatabaseSingleton } from "./db/singleton";
// mq
import { MQSingleton } from "./mq/singleton";
/**
 * Express API server for segway, built on OvernightJS.
 *
 * Constructor wiring order: request logging, static files, body parsing,
 * view engine, CORS, RabbitMQ setup, optional Sentry tracing
 * (staging/production only), database setup, controller registration,
 * 404 handler, Sentry error handler.
 */
class ApiServer extends Server {
  private readonly SERVER_STARTED = "🚀 Api server started on port: ";
  SERVER_PORT: number;
  db: PostgresJsDatabase | null = null;
  mq: MQSingleton | null = null; // Declare the channel property

  constructor() {
    super(true);
    // disabling overnight logs
    this.showLogs = false;
    // enabling env variable from .env file
    dotenv.config();
    // assigning port (falls back to 8080 when SERVER_PORT is unset)
    this.SERVER_PORT = process.env.SERVER_PORT
      ? parseInt(process.env.SERVER_PORT, 10)
      : 8080;
    // logger
    this.app.use(loggerMiddleware);
    // exposing public folder for static files.
    this.app.use(express.static("public"));
    // body parser
    this.app.use(express.json());
    this.app.use(express.urlencoded({ extended: true }));
    // views engine
    this.app.set("views", path.join(__dirname, "views"));
    this.app.set("view engine", "hbs");
    // cors
    this.app.use(cors());
    // setup mq
    this.setupMQ();
    // sentry setup
    if (
      process.env.APP_ENV === "staging" ||
      process.env.APP_ENV === "production"
    ) {
      // setting up error logging and tracing.
      this.setupSentryInit();
    }
    // setting up db
    this.setupDatabase();
    // setting up controllers
    this.setupControllers();
    // not found page
    this.setupNotFoundHandler();
    // setting up sentry error handling
    this.sentryErrorHandling();
  }

  // get the current app instance
  public getAppInstance() {
    return this.app;
  }

  // Setup the database (shares the DatabaseSingleton connection)
  private setupDatabase(): void {
    this.db = DatabaseSingleton.getInstance().db;
  }

  // Setup MQ and initialize channel
  private setupMQ(): void {
    this.mq = MQSingleton.getInstance();
    this.startMQAndWorkers().catch((error) =>
      logger.error("Error in startMQAndWorkers:", error)
    );
  }

  // Start mq and workers
  private async startMQAndWorkers(): Promise<void> {
    try {
      await this.mq?.initialize();
    } catch (error) {
      logger.error("Failed to initialize MQ:", error);
    }
  }

  // setup all the controllers exported from ./controller, each receiving
  // the shared db handle and MQ singleton
  private setupControllers(): void {
    const controllerInstances = [];
    for (const name in controllers) {
      if (Object.prototype.hasOwnProperty.call(controllers, name)) {
        const Controller = (controllers as any)[name];
        controllerInstances.push(new Controller(this.db, this.mq));
      }
    }
    super.addControllers(controllerInstances);
  }

  // This controller will return 404 for not found pages
  private setupNotFoundHandler(): void {
    this.app.use((req, res) => {
      res.status(404).json({
        status: "error",
        message: "Not Found",
        path: req.path,
      });
    });
  }

  private setupSentryInit() {
    Sentry.init({
      dsn: process.env.SENTRY_DSN,
      integrations: [
        // enable HTTP calls tracing
        new Sentry.Integrations.Http({ tracing: true }),
        // enable Express.js middleware tracing
        new Tracing.Integrations.Express({ app: this.app }),
      ],
      // Set tracesSampleRate to 1.0 to capture 100%
      // of transactions for performance monitoring.
      // We recommend adjusting this value in production
      tracesSampleRate: 1.0,
    });
    // RequestHandler creates a separate execution context using domains, so that every
    // transaction/span/breadcrumb is attached to its own Hub instance
    this.app.use(Sentry.Handlers.requestHandler());
    // TracingHandler creates a trace for every incoming request
    this.app.use(Sentry.Handlers.tracingHandler());
  }

  private sentryErrorHandling() {
    // The error handler must be before any other error middleware and after all controllers
    this.app.use(Sentry.Handlers.errorHandler());
    // Bug fix: Express only treats a middleware as an ERROR handler when
    // it declares exactly four parameters (err, req, res, next). The
    // original two-argument version was registered as a regular
    // middleware, so it never received errors and `res.sentry` was never
    // returned to the client.
    this.app.use(function onError(err: any, req: any, res: any, next: any) {
      // The error id is attached to `res.sentry` to be returned
      // and optionally displayed to the user for support.
      res.statusCode = 500;
      res.end(res.sentry + "\n");
    });
  }

  public start(port: number): void {
    this.app.listen(port, () => {
      logger.info(this.SERVER_STARTED + port);
    });
  }
}

export default ApiServer;

5
segway/src/start.ts Normal file
View File

@ -0,0 +1,5 @@
import ApiServer from "./server";
// Entry point: construct the API server (which wires up middleware, MQ,
// database and controllers in its constructor) …
const apiServer = new ApiServer();

// starting server — listen on the port resolved from SERVER_PORT (or 8080)
apiServer.start(apiServer.SERVER_PORT);

View File

@ -0,0 +1,8 @@
import winston from "winston";
// Shared application logger: writes to stdout and to "combined.log"
// (the file is git-ignored at the repo root).
// NOTE(review): no level or format is configured, so winston defaults
// apply — confirm that is intended.
export const logger = winston.createLogger({
  transports: [
    new winston.transports.Console(),
    new winston.transports.File({ filename: "combined.log" }),
  ],
});

View File

@ -0,0 +1,42 @@
// mq
import { ConsumeMessage } from "amqplib";
// mq singleton
import { MQSingleton } from "../mq/singleton";
// logger
import { logger } from "../utils/logger";
/**
 * Base class for queue consumers: subclasses supply the queue name,
 * routing key and an onMessage handler; this class wires them to the
 * MQSingleton channel.
 */
export abstract class BaseWorker {
  mq: MQSingleton | null = null;
  protected routingKey: string;

  constructor(
    protected queueName: string,
    routingKey: string
  ) {
    this.mq = MQSingleton.getInstance();
    this.routingKey = routingKey;
    // Bind so `this` is preserved when the MQ library invokes the handler.
    this.onMessage = this.onMessage.bind(this);
  }

  // Start the consumer
  public async start(): Promise<void> {
    try {
      // Bug fix: await the consume() promise — without the await a
      // rejection escaped this try/catch as an unhandled promise
      // rejection instead of reaching the log line below.
      await this.mq?.consume(this.queueName, this.onMessage);
    } catch (error) {
      logger.error("Error starting workers");
    }
  }

  // Publish this to queue
  protected async publish(body: object, taskName: string): Promise<void> {
    try {
      // Bug fix: await so publish failures are caught and logged here.
      await this.mq?.publish(body, taskName);
    } catch (error) {
      logger.error("Error sending to queue");
    }
  }

  // Implemented by subclasses to process each delivered message.
  protected abstract onMessage(msg: ConsumeMessage | null): void;
}

9
segway/tsconfig.json Normal file
View File

@ -0,0 +1,9 @@
{
"extends": "tsconfig/express.json",
"include": ["src/**/*.ts"],
"exclude": ["node_modules"],
"compilerOptions": {
"baseUrl": "src/",
"outDir": "./dist"
}
}

2460
segway/yarn.lock Normal file

File diff suppressed because it is too large Load Diff

View File

@ -2790,7 +2790,7 @@
dependencies: dependencies:
"@types/react" "*" "@types/react" "*"
"@types/react@*", "@types/react@^18.2.39", "@types/react@^18.2.42": "@types/react@*", "@types/react@18.2.42", "@types/react@^18.2.39", "@types/react@^18.2.42":
version "18.2.42" version "18.2.42"
resolved "https://registry.yarnpkg.com/@types/react/-/react-18.2.42.tgz#6f6b11a904f6d96dda3c2920328a97011a00aba7" resolved "https://registry.yarnpkg.com/@types/react/-/react-18.2.42.tgz#6f6b11a904f6d96dda3c2920328a97011a00aba7"
integrity sha512-c1zEr96MjakLYus/wPnuWDo1/zErfdU9rNsIGmE+NV71nx88FG9Ttgo5dqorXTu/LImX2f63WBP986gJkMPNbA== integrity sha512-c1zEr96MjakLYus/wPnuWDo1/zErfdU9rNsIGmE+NV71nx88FG9Ttgo5dqorXTu/LImX2f63WBP986gJkMPNbA==