plane/apiserver/plane/api/views/base.py
Nikhil f1fda4ae4a
[WEB - 1122] fix: webhook for issues, issue comments, projects, cycles and modules. (#4330)
* dev: update webhook logic for issues

* dev: update issue webhooks for cycle and module

* dev: webhook for comment

* dev: issue attachment webhooks

* dev: add logging

* dev: add inbox issue webhooks

* dev: update the webhook send task

* dev: project webhooks for api

* dev: webhooks update for projects, cycles and modules

* dev: fix webhook on cycle and module create from external apis
2024-05-06 14:13:49 +05:30

158 lines
4.8 KiB
Python

# Python imports
from urllib.parse import urlparse
import zoneinfo
# Django imports
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import IntegrityError
from django.urls import resolve
from django.utils import timezone
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
# Third party imports
from rest_framework.views import APIView
# Module imports
from plane.api.middleware.api_authentication import APIKeyAuthentication
from plane.api.rate_limit import ApiKeyRateThrottle
from plane.utils.exception_logger import log_exception
from plane.utils.paginator import BasePaginator
class TimezoneMixin:
"""
This enables timezone conversion according
to the user set timezone
"""
def initial(self, request, *args, **kwargs):
super().initial(request, *args, **kwargs)
if request.user.is_authenticated:
timezone.activate(zoneinfo.ZoneInfo(request.user.user_timezone))
else:
timezone.deactivate()
class BaseAPIView(TimezoneMixin, APIView, BasePaginator):
authentication_classes = [
APIKeyAuthentication,
]
permission_classes = [
IsAuthenticated,
]
throttle_classes = [
ApiKeyRateThrottle,
]
def filter_queryset(self, queryset):
for backend in list(self.filter_backends):
queryset = backend().filter_queryset(self.request, queryset, self)
return queryset
def handle_exception(self, exc):
"""
Handle any exception that occurs, by returning an appropriate response,
or re-raising the error.
"""
try:
response = super().handle_exception(exc)
return response
except Exception as e:
if isinstance(e, IntegrityError):
return Response(
{"error": "The payload is not valid"},
status=status.HTTP_400_BAD_REQUEST,
)
if isinstance(e, ValidationError):
return Response(
{"error": "Please provide valid detail"},
status=status.HTTP_400_BAD_REQUEST,
)
if isinstance(e, ObjectDoesNotExist):
return Response(
{"error": "The requested resource does not exist."},
status=status.HTTP_404_NOT_FOUND,
)
if isinstance(e, KeyError):
return Response(
{"error": "The required key does not exist."},
status=status.HTTP_400_BAD_REQUEST,
)
log_exception(e)
return Response(
{"error": "Something went wrong please try again later"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
def dispatch(self, request, *args, **kwargs):
try:
response = super().dispatch(request, *args, **kwargs)
if settings.DEBUG:
from django.db import connection
print(
f"{request.method} - {request.get_full_path()} of Queries: {len(connection.queries)}"
)
return response
except Exception as exc:
response = self.handle_exception(exc)
return exc
def finalize_response(self, request, response, *args, **kwargs):
# Call super to get the default response
response = super().finalize_response(
request, response, *args, **kwargs
)
# Add custom headers if they exist in the request META
ratelimit_remaining = request.META.get("X-RateLimit-Remaining")
if ratelimit_remaining is not None:
response["X-RateLimit-Remaining"] = ratelimit_remaining
ratelimit_reset = request.META.get("X-RateLimit-Reset")
if ratelimit_reset is not None:
response["X-RateLimit-Reset"] = ratelimit_reset
return response
@property
def workspace_slug(self):
return self.kwargs.get("slug", None)
@property
def project_id(self):
project_id = self.kwargs.get("project_id", None)
if project_id:
return project_id
if resolve(self.request.path_info).url_name == "project":
return self.kwargs.get("pk", None)
@property
def fields(self):
fields = [
field
for field in self.request.GET.get("fields", "").split(",")
if field
]
return fields if fields else None
@property
def expand(self):
expand = [
expand
for expand in self.request.GET.get("expand", "").split(",")
if expand
]
return expand if expand else None