plane/apiserver/plane/utils/paginator.py
2022-11-30 02:47:42 +05:30

228 lines
6.5 KiB
Python

from rest_framework.response import Response
from rest_framework.exceptions import ParseError
from collections.abc import Sequence
import math
class Cursor:
    """Pagination cursor serialized as ``value:offset:is_prev``.

    Attributes:
        value: Numeric cursor value (``int`` or ``float``).
        offset: Integer offset; always coerced with ``int()``.
        is_prev: Whether the cursor points backwards; coerced with ``bool()``.
        has_results: Optional flag telling whether the page this cursor
            points to has results. It also drives the truthiness of the
            cursor and is *not* part of the serialized string form.
    """

    def __init__(self, value, offset=0, is_prev=False, has_results=None):
        self.value = value
        self.offset = int(offset)
        self.is_prev = bool(is_prev)
        self.has_results = has_results

    def __str__(self):
        return f"{self.value}:{self.offset}:{int(self.is_prev)}"

    def __eq__(self, other):
        # Comparing against an object that lacks the cursor attributes
        # (None, a string, ...) must yield "not equal" rather than blowing
        # up with AttributeError; NotImplemented lets Python fall back to
        # its default comparison. Duck-typed objects that do carry all four
        # attributes still compare by value.
        try:
            return all(
                getattr(self, attr) == getattr(other, attr)
                for attr in ("value", "offset", "is_prev", "has_results")
            )
        except AttributeError:
            return NotImplemented

    # Instances are mutable and define __eq__, so they stay unhashable
    # (this is what Python does implicitly; spelled out for clarity).
    __hash__ = None

    def __repr__(self):
        return "<{}: value={} offset={} is_prev={}>".format(
            type(self).__name__,
            self.value,
            self.offset,
            int(self.is_prev),
        )

    def __bool__(self):
        return bool(self.has_results)

    @classmethod
    def from_string(cls, value):
        """Parse a ``value:offset:is_prev`` string into a cursor.

        The first field is parsed as ``float`` when it contains a ``.`` and
        as ``int`` otherwise; the other two fields must be integers.

        Raises:
            ValueError: If the string does not have exactly three
                colon-separated fields or a field is not numeric.
        """
        bits = value.split(":")
        if len(bits) != 3:
            raise ValueError("Cursor must have the form 'value:offset:is_prev'")
        try:
            value = float(bits[0]) if "." in bits[0] else int(bits[0])
            bits = value, int(bits[1]), int(bits[2])
        except (TypeError, ValueError) as exc:
            raise ValueError("Cursor fields must be numeric") from exc
        return cls(*bits)
class CursorResult(Sequence):
    """Sequence view over one page of results plus its paging metadata.

    Length, iteration and indexing are delegated to ``results``; ``next``
    and ``prev`` hold the neighbouring cursors, while ``hits`` and
    ``max_hits`` carry optional counters supplied by the paginator.
    """

    def __init__(self, results, next, prev, hits=None, max_hits=None):
        self.results, self.next, self.prev = results, next, prev
        self.hits, self.max_hits = hits, max_hits

    def __repr__(self):
        cls_name = type(self).__name__
        return "<{}: results={}>".format(cls_name, len(self.results))

    def __getitem__(self, key):
        # Indexing and slicing both go straight to the wrapped results.
        return self.results[key]

    def __iter__(self):
        return iter(self.results)

    def __len__(self):
        return len(self.results)
# Upper bound on the page size; used as the default ``max_limit`` of
# OffsetPaginator.
MAX_LIMIT = 100
class BadPaginationError(Exception):
    """Raised when a pagination request cannot be served (e.g. a bad offset)."""
class OffsetPaginator:
    """
    Offset/limit paginator driven by a ``Cursor``.

    The cursor is serialized as ``value:offset:is_prev`` where ``value`` is
    the page size and ``offset`` is the zero-based page number, e.g.
    http://example.com/api/users/?cursor=10:0:0&per_page=10
    """

    def __init__(
        self,
        queryset,
        order_by=None,
        max_limit=MAX_LIMIT,
        max_offset=None,
        on_results=None,
    ):
        """
        Args:
            queryset: Object supporting ``order_by(*fields)``, slicing and
                ``count()``.
            order_by: ``None``, a single ordering field, or a
                list/tuple/set of fields.
            max_limit: Largest page size ``get_result`` will honour.
            max_offset: Optional exclusive upper bound on the row offset.
            on_results: Optional callable applied to the page of results
                before it is wrapped in a ``CursorResult``.
        """
        # A single ordering field is wrapped into a 1-tuple so it can always
        # be splatted into ``order_by(*self.key)``.
        self.key = (
            order_by
            if order_by is None or isinstance(order_by, (list, tuple, set))
            else (order_by,)
        )
        self.queryset = queryset
        self.max_limit = max_limit
        self.max_offset = max_offset
        self.on_results = on_results

    def get_result(self, limit=100, cursor=None):
        """Return one page of results as a ``CursorResult``.

        Args:
            limit: Requested page size; capped at ``self.max_limit``.
            cursor: ``Cursor`` whose ``value`` is the page size and whose
                ``offset`` is the page number. ``None`` means first page.

        Returns:
            CursorResult with the page rows, ``next``/``prev`` cursors and
            ``max_hits`` set to the total number of pages at this limit.

        Raises:
            BadPaginationError: If the effective limit is not positive, the
                computed row offset is negative, or it reaches
                ``max_offset``.
        """
        # offset is page #
        # value is page limit
        if cursor is None:
            cursor = Cursor(0, 0, 0)

        limit = min(limit, self.max_limit)
        # A zero limit would divide by zero when computing the page count
        # below, and a negative one produces a meaningless slice and a
        # negative page count, so reject both up front.
        if limit < 1:
            raise BadPaginationError("Pagination limit must be a positive integer")

        queryset = self.queryset
        if self.key:
            queryset = queryset.order_by(*self.key)

        page = cursor.offset
        offset = cursor.offset * cursor.value
        # Fetch one row beyond the page so we can tell whether a next page
        # exists without a second query.
        stop = offset + (cursor.value or limit) + 1

        if self.max_offset is not None and offset >= self.max_offset:
            raise BadPaginationError("Pagination offset too large")
        if offset < 0:
            raise BadPaginationError("Pagination offset cannot be negative")

        results = list(queryset[offset:stop])
        if cursor.value != limit:
            # The cursor's page size differs from the effective limit: keep
            # only the trailing ``limit + 1`` rows of the fetched window.
            results = results[-(limit + 1) :]

        next_cursor = Cursor(limit, page + 1, False, len(results) > limit)
        prev_cursor = Cursor(limit, page - 1, True, page > 0)

        # Drop the look-ahead row before handing the page to the caller.
        results = list(results[:limit])
        if self.on_results:
            results = self.on_results(results)

        # Total number of pages at the current limit.
        max_hits = math.ceil(queryset.count() / limit)

        return CursorResult(
            results=results,
            next=next_cursor,
            prev=prev_cursor,
            hits=None,
            max_hits=max_hits,
        )
class BasePaginator:
    """BasePaginator class can be inherited by any View to return a paginated view"""

    # cursor query parameter name
    cursor_name = "cursor"

    def get_per_page(self, request, default_per_page=100, max_per_page=100):
        """Read and validate the ``per_page`` query parameter.

        Args:
            request: Request whose ``GET`` mapping is consulted.
            default_per_page: Value used when ``per_page`` is absent.
            max_per_page: Upper bound; raised to ``default_per_page`` when
                the default is larger.

        Returns:
            The page size as an ``int``.

        Raises:
            ParseError: If ``per_page`` is not an integer, is smaller than
                1, or exceeds the maximum.
        """
        try:
            per_page = int(request.GET.get("per_page", default_per_page))
        except ValueError as exc:
            raise ParseError(detail="Invalid per_page parameter.") from exc

        # A zero or negative page size cannot be paginated (it ends up as a
        # division by zero / negative slice in the paginator), so report it
        # as a client error instead of letting it fail further down.
        if per_page < 1:
            raise ParseError(detail="Invalid per_page value. Must be at least 1.")

        max_per_page = max(max_per_page, default_per_page)
        if per_page > max_per_page:
            raise ParseError(
                detail=f"Invalid per_page value. Cannot exceed {max_per_page}."
            )

        return per_page

    def paginate(
        self,
        request,
        on_results=None,
        paginator=None,
        paginator_cls=OffsetPaginator,
        default_per_page=100,
        max_per_page=100,
        cursor_cls=Cursor,
        extra_stats=None,
        controller=None,
        **paginator_kwargs,
    ):
        """Paginate the request and build the response payload.

        Args:
            request: Request providing the ``per_page`` and cursor query
                parameters.
            on_results: Optional callable applied to the page of results
                (e.g. to serialize them).
            paginator: Ready-made paginator; mutually exclusive with
                ``paginator_kwargs``.
            paginator_cls: Class instantiated with ``paginator_kwargs`` when
                no ``paginator`` is given.
            default_per_page: Page size used when the request has none.
            max_per_page: Largest page size the request may ask for.
            cursor_cls: Class whose ``from_string`` parses the cursor
                parameter.
            extra_stats: Value passed through unchanged under the
                ``extra_stats`` key.
            controller: Optional callable applied to the results after
                ``on_results``.
            **paginator_kwargs: Arguments for ``paginator_cls``.

        Returns:
            A ``Response`` carrying the cursors, paging flags, counts and
            results.

        Raises:
            ParseError: If ``per_page`` or the cursor is invalid, or the
                paginator raises ``BadPaginationError``.
        """
        assert (paginator and not paginator_kwargs) or (
            paginator_cls and paginator_kwargs
        ), "Pass either a paginator instance or paginator_cls with its kwargs"

        per_page = self.get_per_page(request, default_per_page, max_per_page)

        # Convert the cursor value to integer and float from string
        input_cursor = None
        raw_cursor = request.GET.get(self.cursor_name)
        if raw_cursor:
            try:
                input_cursor = cursor_cls.from_string(raw_cursor)
            except ValueError as exc:
                raise ParseError(detail="Invalid cursor parameter.") from exc

        if not paginator:
            paginator = paginator_cls(**paginator_kwargs)

        try:
            cursor_result = paginator.get_result(limit=per_page, cursor=input_cursor)
        except BadPaginationError as e:
            raise ParseError(detail=str(e)) from e

        # Serialize result according to the on_result function
        results = cursor_result.results
        if on_results:
            results = on_results(results)

        # Add Manipulation functions to the response
        if controller is not None:
            results = controller(results)

        # Return the response
        return Response(
            {
                "next_cursor": str(cursor_result.next),
                "prev_cursor": str(cursor_result.prev),
                "next_page_results": cursor_result.next.has_results,
                "prev_page_results": cursor_result.prev.has_results,
                "count": len(cursor_result),
                "total_pages": cursor_result.max_hits,
                "extra_stats": extra_stats,
                "results": results,
            }
        )