mirror of
https://github.com/makeplane/plane
synced 2024-06-14 14:31:34 +00:00
131 lines
4.2 KiB
Python
131 lines
4.2 KiB
Python
# Django imports
|
|
from django.db import IntegrityError
|
|
|
|
# Third party imports
|
|
from rest_framework import status
|
|
from rest_framework.response import Response
|
|
|
|
# Module imports
|
|
from plane.db.models import Webhook, WebhookLog, Workspace
|
|
from plane.db.models.webhook import generate_token
|
|
from .base import BaseAPIView
|
|
from plane.app.permissions import WorkspaceOwnerPermission
|
|
from plane.app.serializers import WebhookSerializer, WebhookLogSerializer
|
|
|
|
|
|
class WebhookEndpoint(BaseAPIView):
|
|
permission_classes = [
|
|
WorkspaceOwnerPermission,
|
|
]
|
|
|
|
def post(self, request, slug):
|
|
workspace = Workspace.objects.get(slug=slug)
|
|
|
|
try:
|
|
serializer = WebhookSerializer(data=request.data)
|
|
if serializer.is_valid():
|
|
serializer.save(workspace_id=workspace.id)
|
|
return Response(serializer.data, status=status.HTTP_201_CREATED)
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
except IntegrityError as e:
|
|
if "already exists" in str(e):
|
|
return Response(
|
|
{"error": "URL already exists for the workspace"},
|
|
status=status.HTTP_410_GONE,
|
|
)
|
|
raise IntegrityError
|
|
|
|
def get(self, request, slug, pk=None):
|
|
if pk == None:
|
|
webhooks = Webhook.objects.filter(workspace__slug=slug)
|
|
serializer = WebhookSerializer(
|
|
webhooks,
|
|
fields=(
|
|
"id",
|
|
"url",
|
|
"is_active",
|
|
"created_at",
|
|
"updated_at",
|
|
"project",
|
|
"issue",
|
|
"cycle",
|
|
"module",
|
|
"issue_comment",
|
|
),
|
|
many=True,
|
|
)
|
|
return Response(serializer.data, status=status.HTTP_200_OK)
|
|
else:
|
|
webhook = Webhook.objects.get(workspace__slug=slug, pk=pk)
|
|
serializer = WebhookSerializer(
|
|
webhook,
|
|
fields=(
|
|
"id",
|
|
"url",
|
|
"is_active",
|
|
"created_at",
|
|
"updated_at",
|
|
"project",
|
|
"issue",
|
|
"cycle",
|
|
"module",
|
|
"issue_comment",
|
|
),
|
|
)
|
|
return Response(serializer.data, status=status.HTTP_200_OK)
|
|
|
|
def patch(self, request, slug, pk):
|
|
webhook = Webhook.objects.get(workspace__slug=slug, pk=pk)
|
|
serializer = WebhookSerializer(
|
|
webhook,
|
|
data=request.data,
|
|
partial=True,
|
|
fields=(
|
|
"id",
|
|
"url",
|
|
"is_active",
|
|
"created_at",
|
|
"updated_at",
|
|
"project",
|
|
"issue",
|
|
"cycle",
|
|
"module",
|
|
"issue_comment",
|
|
),
|
|
)
|
|
if serializer.is_valid():
|
|
serializer.save()
|
|
return Response(serializer.data, status=status.HTTP_200_OK)
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
def delete(self, request, slug, pk):
|
|
webhook = Webhook.objects.get(pk=pk, workspace__slug=slug)
|
|
webhook.delete()
|
|
return Response(status=status.HTTP_204_NO_CONTENT)
|
|
|
|
|
|
class WebhookSecretRegenerateEndpoint(BaseAPIView):
|
|
permission_classes = [
|
|
WorkspaceOwnerPermission,
|
|
]
|
|
|
|
def post(self, request, slug, pk):
|
|
webhook = Webhook.objects.get(workspace__slug=slug, pk=pk)
|
|
webhook.secret_key = generate_token()
|
|
webhook.save()
|
|
serializer = WebhookSerializer(webhook)
|
|
return Response(serializer.data, status=status.HTTP_200_OK)
|
|
|
|
|
|
class WebhookLogsEndpoint(BaseAPIView):
|
|
permission_classes = [
|
|
WorkspaceOwnerPermission,
|
|
]
|
|
|
|
def get(self, request, slug, webhook_id):
|
|
webhook_logs = WebhookLog.objects.filter(
|
|
workspace__slug=slug, webhook_id=webhook_id
|
|
)
|
|
serializer = WebhookLogSerializer(webhook_logs, many=True)
|
|
return Response(serializer.data, status=status.HTTP_200_OK)
|