# pylint: disable=W0707,W0718,W0613
"""User views for smarter api."""
from http import HTTPStatus
from typing import Optional
from django.core.exceptions import ValidationError
from django.db import transaction
from django.http import Http404, HttpResponseRedirect, JsonResponse
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from smarter.apps.account.models import User, UserProfile, get_resolved_user
from smarter.apps.account.serializers import UserSerializer
from smarter.apps.api.signals import api_request_completed
from smarter.common.utils import is_authenticated_request, smarter_build_absolute_uri
from smarter.lib import json, logging
from smarter.lib.django.waffle import SmarterWaffleSwitches
from .base import AccountListViewBase, AccountViewBase
logger = logging.getSmarterLogger(__name__, any_switches=[SmarterWaffleSwitches.API_LOGGING])
# -----------------------------------------------------------------------
# handlers for users
# -----------------------------------------------------------------------
[docs]
def validate_request_body(request: Request):
# do a cursory check of the request data
logger.debug("%s.validate_request_body() - request body: %s", __name__, request.body)
try:
data = json.loads(request.body)
if not isinstance(data, dict):
raise ValidationError(
f"Invalid request data. Was expecting a dictionary but received {type(request.data) if hasattr(request, 'data') else 'unknown'}."
)
if "username" not in data:
raise ValidationError("Invalid request data. Missing 'username' field.")
if "password" not in data:
raise ValidationError("Invalid request data. Missing 'password' field.")
except json.JSONDecodeError:
return JsonResponse({"error": "Invalid JSON format in request body."}, status=HTTPStatus.BAD_REQUEST.value)
except ValidationError as e:
return JsonResponse({"error": "Invalid request data", "exception": str(e)}, status=HTTPStatus.BAD_REQUEST.value)
except Exception as e:
return JsonResponse({"error": "Invalid request data", "exception": str(e)}, status=HTTPStatus.BAD_REQUEST.value)
return None
[docs]
def eval_permissions(request, user_to_update: User, user_to_update_profile: Optional[UserProfile] = None):
logger.debug("%s.eval_permissions() - request: %s", __name__, request)
user = get_resolved_user(request.user)
if user is None:
return JsonResponse({"error": "User not found"}, status=HTTPStatus.UNAUTHORIZED.value)
if not user.is_superuser:
# if the user is not a superuser then they need to have a UserProfile
try:
request_user_account = UserProfile.objects.get(user=request.user).account
except UserProfile.DoesNotExist:
return JsonResponse(
{"error": "You are not authorized to modify Smarter user accounts."},
status=HTTPStatus.UNAUTHORIZED.value,
)
# if the user is not a superuser then at most they can update users within their own account
if user_to_update_profile and user_to_update_profile.account != request_user_account:
return JsonResponse(
{"error": "You are not authorized to modify this user account."}, status=HTTPStatus.UNAUTHORIZED.value
)
# if the user is neither a superuser nor a staff member then they can only update their own account
if not request.user.is_staff and user_to_update != request.user:
return JsonResponse(
{"error": "You are not authorized to modify this user account."}, status=HTTPStatus.UNAUTHORIZED.value
)
return None
[docs]
def get_user_for_operation(request, user_id: int) -> tuple[User, UserProfile] | JsonResponse:
if not isinstance(request.user, User):
return JsonResponse({"error": "Unauthorized"}, status=HTTPStatus.UNAUTHORIZED.value)
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
return JsonResponse({"error": "User not found"}, status=HTTPStatus.BAD_REQUEST.value)
try:
user_profile = UserProfile.objects.get(user=user)
except UserProfile.DoesNotExist:
return JsonResponse({"error": "User profile not found"}, status=HTTPStatus.BAD_REQUEST.value)
logger.debug("%s.get_user_for_operation() - user: %s", request, user)
return user, user_profile
# pylint: disable=too-many-return-statements
[docs]
def get_user(request, user_id: Optional[int] = None):
"""Get an account json representation by id."""
logger.debug("%s.get_user() - user_id: %s, request: %s", request, user_id, request)
if user_id is None:
serializer = UserSerializer(request.user)
logger.debug("UserListView.get_queryset() - returning current user: %s", request.user)
return Response(serializer.data, status=HTTPStatus.OK.value)
request_user = get_resolved_user(request.user)
if request_user is None:
logger.debug("UserListView.get_queryset() - user not found for request: %s", request)
return JsonResponse({"error": "User not found"}, status=HTTPStatus.UNAUTHORIZED.value)
# if the user is a superuser, they can get any user
if request_user.is_superuser:
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
logger.debug("UserListView.get_queryset() - user not found for id: %s", user_id)
return JsonResponse({"error": "User not found"}, status=HTTPStatus.NOT_FOUND.value)
serializer = UserSerializer(user)
logger.debug("UserListView.get_queryset() - returning user for id %s: %s", user_id, user)
return Response(serializer.data, status=HTTPStatus.OK.value)
# if the user is a staff member, they can get users within their own account
if request_user.is_staff:
try:
account = UserProfile.objects.get(user=request.user).account
user = UserProfile.objects.get(account=account, user_id=user_id).user
except User.DoesNotExist:
logger.debug("UserListView.get_queryset() - user not found for id: %s", user_id)
return JsonResponse({"error": "User not found"}, status=HTTPStatus.NOT_FOUND.value)
serializer = UserSerializer(user)
return Response(serializer.data, status=HTTPStatus.OK)
# mere mortals can only get their own account
if user_id != request.user.id:
logger.debug("UserListView.get_queryset() - unauthorized access attempt for user id: %s", user_id)
return JsonResponse({"error": "Unauthorized"}, status=HTTPStatus.UNAUTHORIZED.value)
serializer = UserSerializer(request.user)
logger.debug("UserListView.get_queryset() - returning current user: %s", request.user)
return Response(serializer.data, status=HTTPStatus.OK.value)
[docs]
def create_user(request: Request):
"""Create an account from a json representation in the body of the request."""
logger.debug("%s.create_user() - request: %s", request, request)
data: dict
user = get_resolved_user(request.user)
if user is None:
logger.debug("UserListView.get_queryset() - user not found for request: %s", request)
return JsonResponse({"error": "User not found"}, status=HTTPStatus.UNAUTHORIZED.value)
if not user.is_superuser and not user.is_staff:
logger.debug("UserListView.get_queryset() - unauthorized access attempt for user: %s", user)
return JsonResponse({"error": "Unauthorized"}, status=HTTPStatus.UNAUTHORIZED.value)
validate_request_body(request)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.debug("UserListView.get_queryset() - invalid JSON format in request body: %s", request.body)
return JsonResponse({"error": "Invalid JSON format in request body."}, status=HTTPStatus.BAD_REQUEST.value)
# the new user will be associated with the account of the current user
try:
account = UserProfile.objects.get(user=request.user).account
except UserProfile.DoesNotExist:
logger.debug("UserListView.get_queryset() - user profile not found for user: %s", user)
return JsonResponse({"error": "User is not associated with any account."}, status=HTTPStatus.UNAUTHORIZED.value)
try:
with transaction.atomic():
user = User.objects.create_user(**data)
UserProfile.objects.create(name=user.username, user=user, account=account)
except Exception as e:
logger.debug("UserListView.get_queryset() - error creating user: %s", e)
return JsonResponse({"error": "Invalid request data", "exception": str(e)}, status=HTTPStatus.BAD_REQUEST.value)
logger.debug("UserListView.get_queryset() - user created successfully: %s", user)
return HttpResponseRedirect(request.path_info + str(user.id) + "/") # type: ignore[return-value]
[docs]
def update_user(request: Request, user_id: int):
"""update an account from a json representation in the body of the request."""
logger.debug("%s.update_user() - user_id: %s, request: %s", request, user_id, request)
data: dict
validate_request_body(request)
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.debug("UserListView.get_queryset() - invalid JSON format in request body: %s", request.body)
return JsonResponse({"error": "Invalid JSON format in request body."}, status=HTTPStatus.BAD_REQUEST.value)
user_to_update, user_to_update_profile = get_user_for_operation(request, user_id)
if isinstance(user_to_update, JsonResponse):
return user_to_update
if isinstance(user_to_update_profile, JsonResponse):
return user_to_update_profile
if not isinstance(user_to_update, User):
logger.debug("UserListView.get_queryset() - user not found for request: %s", request)
return JsonResponse({"error": "User not found"}, status=HTTPStatus.BAD_REQUEST.value)
if not isinstance(user_to_update_profile, UserProfile):
logger.debug("UserListView.get_queryset() - user profile not found for user: %s", user_to_update)
return JsonResponse({"error": "User profile not found"}, status=HTTPStatus.BAD_REQUEST.value)
eval_permissions(request, user_to_update, user_to_update_profile)
try:
for key, value in data.items():
if hasattr(user_to_update, key):
setattr(user_to_update, key, value)
user_to_update.save()
logger.debug("UserListView.get_queryset() - user updated successfully: %s", user_to_update)
except ValidationError as e:
logger.debug("UserListView.get_queryset() - validation error: %s", e)
return JsonResponse({"error": e.message}, status=HTTPStatus.BAD_REQUEST.value)
except Exception as e:
logger.debug("UserListView.get_queryset() - internal error: %s", e)
return JsonResponse(
{"error": "Internal error", "exception": str(e)}, status=HTTPStatus.INTERNAL_SERVER_ERROR.value
)
return HttpResponseRedirect(request.path_info)
[docs]
def delete_user(request: Request, user_id: Optional[int] = None):
"""delete a user by id."""
logger.debug("%s.delete_user() - user_id: %s, request: %s", __name__, user_id, request)
try:
if user_id:
user = User.objects.get(id=user_id)
else:
user = request.user
except User.DoesNotExist:
logger.debug("%s.delete_user() - user not found for request: %s", __name__, request)
return JsonResponse({"error": "User not found"}, status=HTTPStatus.NOT_FOUND.value)
try:
with transaction.atomic():
UserProfile.objects.get(user=user).delete()
user.delete()
logger.debug("%s.delete_user() - user deleted successfully: %s", __name__, user)
except Exception as e:
logger.debug("%s.delete_user() - error deleting user: %s", __name__, e)
return JsonResponse(
{"error": "Internal error", "exception": str(e)}, status=HTTPStatus.INTERNAL_SERVER_ERROR.value
)
plugins_path = request.path_info.rsplit("/", 2)[0]
return HttpResponseRedirect(plugins_path)
[docs]
class UserView(AccountViewBase):
"""User view for smarter api."""
[docs]
def get(self, request, user_id: int):
logger.debug("%s.get() - user_id: %s, request: %s", self.formatted_class_name, user_id, request)
response = get_user(request, user_id)
api_request_completed.send(sender=self.__class__, instance=self, request=request, response=response)
return response
[docs]
def post(self, request, user_id: Optional[int] = None):
logger.debug("%s.post() - request: %s", self.formatted_class_name, request)
response = create_user(request)
api_request_completed.send(sender=self.__class__, instance=self, request=request, response=response)
return response
[docs]
def patch(self, request, user_id: int):
logger.debug("%s.patch() - user_id: %s, request: %s", self.formatted_class_name, user_id, request)
response = update_user(request, user_id)
api_request_completed.send(sender=self.__class__, instance=self, request=request, response=response)
return response
[docs]
def delete(self, request, user_id):
logger.debug("%s.delete() - user_id: %s, request: %s", self.formatted_class_name, user_id, request)
response = delete_user(request, user_id)
api_request_completed.send(sender=self.__class__, instance=self, request=request, response=response)
return response
[docs]
def handle_exception(self, exc):
if isinstance(exc, Http404):
logger.debug("%s.handle_exception() - Http404: %s", self.formatted_class_name, exc)
return Response({"error": "Not found"}, status=status.HTTP_404_NOT_FOUND)
logger.error("%s.handle_exception() - unhandled exception: %s", self.formatted_class_name, exc)
return super().handle_exception(exc)
[docs]
class UserListView(AccountListViewBase):
"""User list view for smarter api."""
serializer_class = UserSerializer
[docs]
def setup(self, request: Request, *args, **kwargs):
"""
Setup the view. This is called by Django before dispatch() and is used to
initialize attributes that require the request object.
"""
super().setup(request, *args, **kwargs)
logger.debug(
"%s.setup() - request: %s, user: %s, user_profile: %s is_authenticated: %s",
self.formatted_class_name,
smarter_build_absolute_uri(self.request),
self.request.user.username if self.request.user else "Anonymous", # type: ignore[assignment]
self.user_profile,
is_authenticated_request(self.request),
)
[docs]
def get_queryset(self):
logger.debug("%s.get_queryset() - request: %s", self.formatted_class_name, self.request)
user = get_resolved_user(self.request.user)
if user is None:
logger.debug("%s.get_queryset() - user not found for request: %s", self.formatted_class_name, self.request)
return Response({"error": "User not found"}, status=HTTPStatus.UNAUTHORIZED.value)
if user.is_superuser:
logger.debug("%s.get_queryset() - user is superuser: %s", self.formatted_class_name, user)
return User.objects.all()
try:
account_users = UserProfile.objects.filter(account__user=self.request.user).values_list("user", flat=True)
logger.debug(
"%s.get_queryset() - account users for %s: %s", self.formatted_class_name, user, list(account_users)
)
return User.objects.filter(id__in=account_users)
except UserProfile.DoesNotExist:
logger.debug("%s.get_queryset() - user profile not found for user: %s", self.formatted_class_name, user)
return Response({"error": "User not found"}, status=HTTPStatus.NOT_FOUND.value)
[docs]
def get_list(self, request: Request):
"""
Get a list of all users the requesting user has access to.
"""
logger.debug("%s.get() - request: %s", self.formatted_class_name, request)
queryset = self.get_queryset()
if isinstance(queryset, Response):
return queryset
serializer = UserSerializer(queryset, many=True)
response = Response(serializer.data, status=HTTPStatus.OK.value)
return response
[docs]
def get(self, request: Request, *args, **kwargs):
"""
Handle GET requests to retrieve the list of users.
"""
logger.debug("%s.get() - request: %s", self.formatted_class_name, request)
return self.get_list(request)
[docs]
def post(self, request, *args, **kwargs):
"""
Handle POST requests to create a new user.
"""
logger.debug("%s.post() - request: %s", self.formatted_class_name, request)
return self.get_list(request)