# pylint: disable=W0613
"""
This module contains views to implement the React.
AuthToken list view in the Smarter Dashboard.
"""
from http import HTTPStatus
from typing import Union
from django.core.handlers.asgi import ASGIRequest
from django.core.paginator import Paginator
from django.db import models
from django.http import HttpRequest, JsonResponse
from smarter.apps.account.serializers import UserProfileSerializer
from smarter.apps.account.utils import smarter_cached_objects
from smarter.common.enum import SmarterResourceOwnershipFilterEnum
from smarter.lib import logging
from smarter.lib.django.http.shortcuts import (
SmarterHttpResponseNotFound,
)
from smarter.lib.django.views import SmarterAuthenticatedNeverCachedWebView
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.drf.caching import (
get_cached_authtokens_available_to_user_profile,
get_cached_authtokens_owned_by_user_profile,
get_cached_authtokens_shared_with_user_profile,
invalidate_all_cached_authtokens_for_user_profile,
)
from smarter.lib.drf.models import SmarterAuthToken as AuthToken
from smarter.lib.drf.serializers.authtoken import (
SmarterAuthTokenSerializer as AuthTokenSerializer,
)
DEFAULT_PAGE_SIZE = 25
logger = logging.getSmarterLogger(__name__, any_switches=[SmarterWaffleSwitches.SECRET_LOGGING])
[docs]
class AuthTokenListApiView(SmarterAuthenticatedNeverCachedWebView):
"""
Render the authtoken list view for the Smarter Workbench web console.
This view displays all authtokens available to the authenticated user as cards, providing a quick overview and access to authtoken details.
:param request: Django HTTP request object.
:type request: ASGIRequest
:param args: Additional positional arguments.
:type args: tuple
:param kwargs: Additional keyword arguments.
:type kwargs: dict
:returns: Rendered HTML page with a card for each authtoken, or a 404 error page if the user is not authenticated.
:rtype: HttpResponse
"""
@property
def formatted_class_name(self) -> str:
"""Returns a formatted string of the class name for logging purposes."""
class_name = f"{__name__}.{AuthTokenListApiView.__name__}[{id(self)}]"
return self.formatted_text(class_name)
[docs]
def post(self, request: ASGIRequest, *args, **kwargs) -> Union[JsonResponse, SmarterHttpResponseNotFound]:
qs: models.QuerySet[AuthToken]
ownership_filter = kwargs.get("ownership_filter", SmarterResourceOwnershipFilterEnum.ALL)
page = request.GET.get("page", 1)
page_size = request.GET.get("page_size", DEFAULT_PAGE_SIZE)
invalidate_cache = request.GET.get("invalidate_cache", "false").lower() == "true"
logger.debug(
"%s.get() Rendering authtoken list view for user %s with args=%s, kwargs=%s.",
self.formatted_class_name,
request.user.username if request.user else "None", # type: ignore[union-attr]
args,
kwargs,
)
if invalidate_cache:
invalidate_all_cached_authtokens_for_user_profile(user_profile=self.user_profile) # type: ignore
if ownership_filter == SmarterResourceOwnershipFilterEnum.OWNED:
qs = get_cached_authtokens_owned_by_user_profile(user_profile=self.user_profile) # type: ignore
elif ownership_filter == SmarterResourceOwnershipFilterEnum.SHARED:
qs = get_cached_authtokens_shared_with_user_profile(user_profile=self.user_profile) # type: ignore
elif ownership_filter == SmarterResourceOwnershipFilterEnum.ALL:
qs = get_cached_authtokens_available_to_user_profile(user_profile=self.user_profile) # type: ignore
else:
logger.warning(
"%s.post() Received an invalid ownership_filter value: %s. Must be one of 'owned', 'shared', or 'all'. Defaulting to 'all'.",
self.formatted_class_name,
ownership_filter,
)
return JsonResponse(
{"error": "Invalid ownership_filter. Must be one of 'owned', 'shared', or 'all'."},
status=HTTPStatus.BAD_REQUEST,
)
paginator = Paginator(qs.order_by("-updated_at"), page_size)
authtokens = paginator.get_page(page)
smarter_admin = smarter_cached_objects.smarter_admin_user_profile
retval = {
"user": UserProfileSerializer(self.user_profile).data,
"admin": UserProfileSerializer(smarter_admin).data,
"objects": AuthTokenSerializer(authtokens, many=True).data,
}
return JsonResponse(retval)
[docs]
class AuthTokenListApiCloneView(SmarterAuthenticatedNeverCachedWebView):
"""Clone a authtoken for the authenticated user."""
@property
def formatted_class_name(self) -> str:
"""Returns a formatted string of the class name for logging purposes."""
class_name = f"{__name__}.{AuthTokenListApiCloneView.__name__}[{id(self)}]"
return self.formatted_text(class_name)
[docs]
def post(self, request: HttpRequest, *args, **kwargs) -> JsonResponse:
"""
Handle POST requests to clone an existing AuthToken.
Validates input
parameters, checks for the existence of the AuthToken to be cloned, and
creates a new AuthToken with the specified name. Invalidates the cache
for the user's LLMClients after cloning.
:param request: The HTTP request object containing the parameters for cloning.
:type request: HttpRequest
:param args: Additional positional arguments (not used).
:param kwargs: Additional keyword arguments, including:
- authtoken_id (str): The ID of the AuthToken to be cloned.
- new_name (str): The new name for the cloned AuthToken.
:returns: A JsonResponse containing the serialized data of the newly cloned AuthToken if successful, or an error message if the cloning fails.
:rtype: JsonResponse
"""
authtoken_id = kwargs.get("authtoken_id")
new_name = kwargs.get("new_name")
authtoken: AuthToken
if not authtoken_id or not new_name:
logger.warning(
"%s.post() Missing required parameters. authtoken_id: %s, new_name: %s",
self.formatted_class_name,
authtoken_id,
new_name,
)
return JsonResponse({"error": "authtoken_id and new_name are required."}, status=HTTPStatus.BAD_REQUEST)
try:
authtoken = AuthToken.objects.with_read_permission_for(self.user_profile.user).get(id=authtoken_id) # type: ignore
except AuthToken.DoesNotExist:
logger.warning(
"%s.post() AuthToken with id %s not found for cloning.", self.formatted_class_name, authtoken_id
)
return JsonResponse({"error": f"AuthToken with id {authtoken_id} not found."}, status=HTTPStatus.NOT_FOUND)
try:
new_name = self.to_snake_case(new_name.strip())
cloned_authtoken = authtoken.clone(new_name=new_name, user_profile=self.user_profile) # type: ignore
invalidate_all_cached_authtokens_for_user_profile(user_profile=self.user_profile) # type: ignore
data = AuthTokenSerializer(cloned_authtoken).data
return JsonResponse(data, status=HTTPStatus.OK) # type: ignore
# pylint: disable=broad-except
except Exception as e:
logger.error(
"%s.post() Error cloning AuthToken with id %s: %s",
self.formatted_class_name,
authtoken_id,
str(e),
exc_info=True,
)
return JsonResponse(
{"error": f"An error occurred while cloning the AuthToken: {str(e)}"}, status=HTTPStatus.BAD_REQUEST
)
[docs]
class AuthTokenListApiDeleteView(SmarterAuthenticatedNeverCachedWebView):
"""Delete a authtoken for the authenticated user."""
@property
def formatted_class_name(self) -> str:
"""Returns a formatted string of the class name for logging purposes."""
class_name = f"{__name__}.{AuthTokenListApiDeleteView.__name__}[{id(self)}]"
return self.formatted_text(class_name)
[docs]
def post(self, request: HttpRequest, *args, **kwargs) -> JsonResponse:
"""
Handle POST requests to delete an existing AuthToken.
Validates input
parameters, checks for the existence of the AuthToken to be deleted, and
deletes the AuthToken if it exists. Invalidates the cache for the user's
LLMClients after deletion.
:param request: The HTTP request object containing the parameters for deletion.
:type request: HttpRequest
:param args: Additional positional arguments (not used).
:param kwargs: Additional keyword arguments, including:
- authtoken_id (str): The ID of the AuthToken to be deleted.
:returns: A JsonResponse indicating the success or failure of the deletion.
:rtype: JsonResponse
"""
authtoken_id = kwargs.get("authtoken_id")
if not authtoken_id:
logger.warning("%s.post() Missing required parameter authtoken_id for deletion.", self.formatted_class_name)
return JsonResponse({"error": "authtoken_id is required."}, status=HTTPStatus.BAD_REQUEST)
try:
authtoken = AuthToken.objects.with_ownership_permission_for(self.user_profile.user).get(id=authtoken_id) # type: ignore
except AuthToken.DoesNotExist:
logger.warning(
"%s.post() AuthToken with id %s not found for deletion.", self.formatted_class_name, authtoken_id
)
return JsonResponse({"error": f"AuthToken with id {authtoken_id} not found."}, status=HTTPStatus.NOT_FOUND)
try:
authtoken.delete()
invalidate_all_cached_authtokens_for_user_profile(user_profile=self.user_profile) # type: ignore
return JsonResponse(
{"message": f"AuthToken with id {authtoken_id} deleted successfully."}, status=HTTPStatus.OK
)
# pylint: disable=broad-except
except Exception as e:
logger.error(
"%s.post() Error deleting AuthToken with id %s: %s",
self.formatted_class_name,
authtoken_id,
str(e),
exc_info=True,
)
return JsonResponse(
{"error": f"An error occurred while deleting the AuthToken: {str(e)}"}, status=HTTPStatus.BAD_REQUEST
)
[docs]
class AuthTokenListApiRenameView(SmarterAuthenticatedNeverCachedWebView):
"""Rename a authtoken for the authenticated user."""
@property
def formatted_class_name(self) -> str:
"""Returns a formatted string of the class name for logging purposes."""
class_name = f"{__name__}.{AuthTokenListApiRenameView.__name__}[{id(self)}]"
return self.formatted_text(class_name)
[docs]
def post(self, request: HttpRequest, *args, **kwargs) -> JsonResponse:
"""
Handle POST requests to rename an existing AuthToken.
Validates input
parameters, checks for the existence of the AuthToken to be renamed, and
renames the AuthToken if it exists. Invalidates the cache for the user's
LLMClients after renaming.
:param request: The HTTP request object containing the parameters for renaming.
:type request: HttpRequest
:param args: Additional positional arguments (not used).
:param kwargs: Additional keyword arguments, including:
- authtoken_id (str): The ID of the AuthToken to be renamed.
- new_name (str): The new name for the AuthToken.
:returns: A JsonResponse indicating the success or failure of the renaming.
:rtype: JsonResponse
"""
authtoken_id = kwargs.get("authtoken_id")
new_name = kwargs.get("new_name")
if not authtoken_id or not new_name:
logger.warning(
"%s.post() Missing required parameters for renaming. authtoken_id: %s, new_name: %s",
self.formatted_class_name,
authtoken_id,
new_name,
)
return JsonResponse({"error": "authtoken_id and new_name are required."}, status=HTTPStatus.BAD_REQUEST)
try:
authtoken = AuthToken.objects.with_ownership_permission_for(self.user_profile.user).get(id=authtoken_id) # type: ignore
except AuthToken.DoesNotExist:
logger.warning(
"%s.post() AuthToken with id %s not found for renaming.", self.formatted_class_name, authtoken_id
)
return JsonResponse({"error": f"AuthToken with id {authtoken_id} not found."}, status=HTTPStatus.NOT_FOUND)
try:
new_name = self.to_snake_case(new_name.strip())
authtoken.rename(new_name=new_name)
invalidate_all_cached_authtokens_for_user_profile(user_profile=self.user_profile) # type: ignore
data = AuthTokenSerializer(authtoken).data
return JsonResponse(data, status=HTTPStatus.OK) # type: ignore
# pylint: disable=broad-except
except Exception as e:
logger.error(
"%s.post() Error renaming AuthToken with id %s: %s",
self.formatted_class_name,
authtoken_id,
str(e),
exc_info=True,
)
return JsonResponse(
{"error": f"An error occurred while renaming the AuthToken: {str(e)}"}, status=HTTPStatus.BAD_REQUEST
)