# pylint: disable=W0613
"""A helper class that provides setters/getters for account and user."""
from typing import Optional, Union
from django.contrib.auth.models import AnonymousUser
from rest_framework.exceptions import AuthenticationFailed
from smarter.common.exceptions import SmarterBusinessRuleViolation
from smarter.common.mixins import SmarterHelperMixin
from smarter.common.utils import mask_string
from smarter.lib import logging
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.drf.token_authentication import (
SmarterAnonymousUser,
SmarterTokenAuthentication,
)
from .models import Account, User, UserProfile
from .serializers import (
AccountMiniSerializer,
UserMiniSerializer,
UserProfileSerializer,
)
from .utils import (
account_number_from_url,
get_cached_account_for_user,
)
# Anything this module treats as a "user": a Django AnonymousUser, a User, or None.
UserType = Union[AnonymousUser, User, None]
# Account numbers are passed around as optional strings.
AccountNumberType = Optional[str]
# API tokens are handled as optional bytes (they are .decode()'d before masking/logging).
ApiTokenType = Optional[bytes]
# Module-level logger. The ACCOUNT_MIXIN_LOGGING waffle switch is handed to
# getSmarterLogger via ``any_switches`` (presumably to gate output — confirm in smarter.lib.logging).
logger = logging.getSmarterLogger(__name__, any_switches=[SmarterWaffleSwitches.ACCOUNT_MIXIN_LOGGING])
[docs]
class AccountMixin(SmarterHelperMixin):
    """
    Provides consistent initialization and short-lived caching of the
    ``account``, ``user``, and ``user_profile`` properties using various sources,
    such as direct arguments, request objects, or API tokens. Also handles
    API token authentication when a request object with an Authorization
    header is provided.

    Initialization priority (as implemented by ``__init__``):

    1. API token authentication, if a token was passed in or found in the
       request's ``Authorization: Token ...`` header.
    2. An explicit ``user_profile``.
    3. A ``user`` together with an ``account`` (the account may come from
       ``account_number``).
    4. A ``user`` on its own (possibly taken from the request object).
    5. An ``account`` on its own.

    Anything not resolved at construction time is loaded lazily by the
    ``account``, ``user`` and ``user_profile`` properties.

    :param args: Positional arguments, may include a request object.
    :param user: Django user instance (optional).
    :type user: AnonymousUser, User, or None
    :param account: Account instance (optional).
    :type account: Account or None
    :param user_profile: UserProfile instance (optional).
    :type user_profile: UserProfile or None
    :param account_number: Unique account identifier (optional).
    :type account_number: str or None
    :param api_token: API token for authentication (optional).
    :type api_token: bytes or None
    :param kwargs: Additional keyword arguments, may include ``request``.

    The constructor attempts to resolve and cache the account and user information,
    logging relevant events and warnings if data cannot be resolved.
    """

    # Private state backing the public properties, plus the cached readiness flag.
    __slots__ = ("_account", "_user", "_user_profile", "_am_ready")
[docs]
def __init__(
    self,
    *args,
    user: UserType = None,
    account: Optional[Account] = None,
    user_profile: Optional[UserProfile] = None,
    account_number: AccountNumberType = None,
    api_token: ApiTokenType = None,
    **kwargs,
):
    """
    Resolve and cache the user, account and user_profile from the given inputs.

    Inputs are gathered from the keyword arguments, from positional arguments
    (matched by type) and from a request object, then applied in this order:
    API token, ``user_profile``, ``user`` + ``account``, ``user``, ``account``.

    :param args: Positional arguments; may contain a request, User, Account or UserProfile.
    :param user: Django user instance (optional).
    :param account: Account instance (optional).
    :param user_profile: UserProfile instance (optional).
    :param account_number: Account number; when a str, the Account is looked up from it.
    :param api_token: API token (bytes) used to authenticate the user.
    :param kwargs: Additional keyword arguments; may include ``request``.
    """
    self._account: Optional[Account] = None
    self._user: UserType = None
    self._user_profile: Optional[UserProfile] = None
    self._am_ready: bool = False
    super().__init__(*args, **kwargs)

    def masked(token) -> Optional[str]:
        # Never log a raw token, and never let a non-bytes token crash a log call.
        return mask_string(token.decode()) if isinstance(token, (bytes, bytearray)) else None

    logger.debug(
        "%s.__init__() called with args=%s, user=%s, account=%s, user_profile=%s, account_number=%s, api_token=%s, kwargs=%s",
        self._am_formatted_class_name,
        args,
        user,
        account,
        user_profile,
        account_number,
        masked(api_token),
        kwargs,
    )

    # ---------------------------------------------------------------------
    # Initial resolution of parameters, taking into consideration that
    # they may be passed in via args or kwargs.
    # ---------------------------------------------------------------------
    # A "request" is recognised by its type name so that any request-like class matches.
    request = kwargs.get("request") or next((arg for arg in args if "request" in str(type(arg)).lower()), None)
    user = user or kwargs.get("user", None) or next((arg for arg in args if isinstance(arg, User)), None)
    account = (
        account or kwargs.get("account", None) or next((arg for arg in args if isinstance(arg, Account)), None)
    )
    user_profile = (
        user_profile
        or kwargs.get("user_profile", None)
        or next((arg for arg in args if isinstance(arg, UserProfile)), None)
    )
    api_token = api_token or kwargs.get("api_token", None)

    if isinstance(account_number, str):
        logger.debug(
            "%s.__init__(): received account_number %s. This will take precedence over other account information",
            self._am_formatted_class_name,
            account_number,
        )
        account = Account.get_cached_object(account_number=account_number)

    # ---------------------------------------------------------------------
    # Process the request object if available. We're looking for any of
    # - account_number in the URL
    # - API token in the Authorization header
    # - user in the request object
    # ---------------------------------------------------------------------
    if request is not None:
        url: Optional[str] = self.smarter_build_absolute_uri(request)
        logger.debug(
            "%s.__init__(): received a request object: %s. This will take precedence over other information.",
            self._am_formatted_class_name,
            url,
        )
        # NOTE(review): an account number taken from the URL is only logged below;
        # it is not used to look up an Account. Confirm whether that is intended.
        account_number = account_number or account_number_from_url(url)  # type: ignore[arg-type]

        # The request was matched by type name only, so do not assume it has headers.
        headers = getattr(request, "headers", None) or {}
        auth_header = headers.get("Authorization", "") or ""
        if auth_header.startswith("Token "):
            # Parse the header once; everything after the first "Token " is the token.
            _, _, token_text = auth_header.partition("Token ")
            if token_text:
                api_token = token_text.encode()
                logger.debug(
                    "%s.__init__(): found API token in Authorization header of request object %s. This will take precedence over other information.",
                    self._am_formatted_class_name,
                    masked(api_token),
                )

        if not api_token and hasattr(request, "user") and not isinstance(request.user, AnonymousUser):
            user = request.user  # type: ignore[union-attr]
            if isinstance(user, User):
                logger.debug(
                    "%s.__init__(): found a user object in the request: %s. This will supersede other user information.",
                    self._am_formatted_class_name,
                    user,
                )
            else:
                # Reuse the URL computed above instead of calling build_absolute_uri()
                # on an object that may not provide it.
                logger.debug(
                    "%s.__init__(): could not resolve user from the request object %s",
                    self._am_formatted_class_name,
                    url,
                )
                user = None

    logger.debug(
        "%s.__init__(): resolved api_token=%s, account_number=%s, account=%s, user=%s, user_profile=%s",
        self._am_formatted_class_name,
        masked(api_token),
        account_number,
        account,
        user,
        user_profile,
    )

    # ---------------------------------------------------------------------
    # Final initialization based on priority order
    # ---------------------------------------------------------------------
    if isinstance(api_token, bytes):
        logger.debug(
            "%s.__init__(): found API token: %s. This will take precedence over other information.",
            self._am_formatted_class_name,
            masked(api_token),
        )
        # Called on the class explicitly so a subclass override of authenticate() is not used here.
        AccountMixin.authenticate(self, api_token)
    else:
        if user_profile:
            logger.debug(
                "%s.__init__(): found a user_profile object: %s. This will take precedence over other information.",
                self._am_formatted_class_name,
                user_profile,
            )
            self.user_profile = user_profile
        elif user and account:
            logger.debug(
                "%s.__init__(): found a user and account: %s, %s. This will take precedence over other information.",
                self._am_formatted_class_name,
                user,
                account,
            )
            self.user_profile = UserProfile.get_cached_object(user=user, account=account)  # type: ignore
        elif user and not self.user:
            logger.debug(
                "%s.__init__(): found a user object: %s. This will take precedence over other information.",
                self._am_formatted_class_name,
                user,
            )
            self.user = user
        elif account and not self.account:
            logger.debug(
                "%s.__init__(): found an account object: %s. This will take precedence over other information.",
                self._am_formatted_class_name,
                account,
            )
            self.account = account

    logger.debug(
        "%s.__init__() - finished %s",
        self._am_formatted_class_name,
        AccountMixin.__repr__(self),
    )
    self._am_log_ready_status()
def __str__(self):
    """
    Build a short human-readable label for this instance.

    The label contains the formatted class name, ``id(self)`` and the
    current ``user_profile``.

    :return: String representation of the class.
    :rtype: str
    """
    label = logging.formatted_text(AccountMixin.__name__)
    return f"{label}[{id(self)}](user_profile={self.user_profile})"
def __repr__(self) -> str:
    """
    Return the same text as ``__str__`` (this is not JSON).

    :return: String representation of the class.
    :rtype: str
    """
    text = self.__str__()
    return text
def __bool__(self) -> bool:
    """
    Truthiness of the mixin mirrors its ``am_ready`` property.

    :return: True if the AccountMixin is am_ready to be used.
    :rtype: bool
    """
    is_ready = self.am_ready
    return is_ready
def __hash__(self) -> int:
    """
    Hash the instance by its ``user_profile``, matching what ``__eq__`` compares.

    :return: Hash of the user_profile.
    :rtype: int
    """
    profile = self.user_profile
    return hash(profile)
def __eq__(self, value: object) -> bool:
    """
    Compare two AccountMixin instances by their ``user_profile``.

    For any other type ``NotImplemented`` is returned, as the ordering
    methods of this class already do, so Python can try the reflected
    comparison. ``mixin == other`` still evaluates to False when the
    other operand does not claim equality.

    :param value: The value to compare to.
    :type value: object
    :return: True if the user_profile is the same.
    :rtype: bool
    """
    if not isinstance(value, AccountMixin):
        return NotImplemented
    return self.user_profile == value.user_profile
def __lt__(self, value: object) -> bool:
    """
    Order two AccountMixin instances by the string form of their user_profile.

    A missing (None) profile sorts before any real profile; two missing
    profiles are not less than each other.

    :param value: The value to compare to.
    :type value: object
    :return: True if the user_profile is less than the other user_profile.
    :rtype: bool
    """
    if not isinstance(value, AccountMixin):
        return NotImplemented
    mine = self.user_profile
    theirs = value.user_profile
    if mine is None:
        # None < profile, but None is not < None
        return theirs is not None
    if theirs is None:
        return False
    # Both present: compare their str() representations.
    return str(mine) < str(theirs)
def __le__(self, value: object) -> bool:
    """
    Less-than-or-equal, built from ``__eq__`` and ``__lt__``.

    :param value: The value to compare to.
    :type value: object
    :return: True if the user_profile is less than or equal to the other user_profile.
    :rtype: bool
    """
    if isinstance(value, AccountMixin):
        return self == value or self < value
    return NotImplemented
def __gt__(self, value: object) -> bool:
    """
    Greater-than, defined as the negation of ``<=``.

    :param value: The value to compare to.
    :type value: object
    :return: True if the user_profile is greater than the other user_profile.
    :rtype: bool
    """
    if isinstance(value, AccountMixin):
        return not (self <= value)
    return NotImplemented
def __ge__(self, value: object) -> bool:
    """
    Greater-than-or-equal, defined as the negation of ``<``.

    :param value: The value to compare to.
    :type value: object
    :return: True if the user_profile is greater than or equal to the other user_profile
    :rtype: bool
    """
    if isinstance(value, AccountMixin):
        return not (self < value)
    return NotImplemented
[docs]
def init(self, *args, **kwargs) -> None:
    """
    Optional second-stage initializer, kept separate from ``__init__``.

    When the mixin is not ready yet, ``user_profile``, ``user`` and
    ``account`` are read from ``kwargs`` and applied with the same precedence
    as the constructor: profile first, then user + account, then user, then
    account. Positional arguments are only logged.

    :param args: Positional arguments passed to the init method.
    :param kwargs: Keyword arguments passed to the init method.
    :return: None
    """
    logger.debug(
        "%s.init() called with args: %s, kwargs: %s",
        self._am_formatted_class_name,
        args,
        kwargs,
    )
    if not self.am_ready:
        given_profile = kwargs.get("user_profile", None)
        given_user = kwargs.get("user", None)
        given_account = kwargs.get("account", None)
        if given_profile:
            self.user_profile = given_profile
        elif given_user and given_account:
            self.user_profile = UserProfile.get_cached_object(user=given_user, account=given_account)  # type: ignore
        elif given_user and not self.user:
            self.user = given_user
        elif given_account and not self.account:
            self.account = given_account
    self._am_log_ready_status()
[docs]
def setup(self, *args, **kwargs) -> None:
    """
    Hook intended to be called by Django views during initialization.

    If the mixin is not ready yet, the arguments are forwarded to
    :meth:`init`. Nothing is returned and no superclass ``setup`` is called
    from here.

    :param args: Positional arguments passed to the view.
    :param kwargs: Keyword arguments passed to the view.
    :return: None
    :rtype: None
    """
    prefix = self._am_formatted_class_name
    logger.debug("%s.setup() called with args: %s, kwargs: %s", prefix, args, kwargs)
    already_ready = self.am_ready
    if not already_ready:
        self.init(*args, **kwargs)
    logger.debug(
        "%s.setup() completed with args: %s, kwargs: %s",
        self._am_formatted_class_name,
        args,
        kwargs,
    )
@property
def _am_formatted_class_name(self) -> str:
    """Logger prefix: module path, class name and ``id(self)``, passed through ``formatted_text``."""
    return self.formatted_text(f"{__name__}.{AccountMixin.__name__}[{id(self)}]")
@property
def account(self) -> Optional[Account]:
    """
    Return the account, resolving it lazily when it has not been set.

    Resolution order:

    1. the cached ``_account``;
    2. ``_user_profile.account`` when a UserProfile is cached;
    3. ``get_cached_account_for_user()`` for the cached ``_user``. On success
       the matching UserProfile is also looked up and cached, and the mixin
       is marked ready.

    NOTE(review): the ``SmarterBusinessRuleViolation`` errors raised below sit
    inside the outer ``try`` and are therefore caught by the final
    ``except Exception`` handler (assuming the class derives from Exception),
    so callers get ``None`` plus an error log, not the exception.
    Confirm that this is intended.

    :return: The account for the current user, or None if it cannot be resolved.
    :rtype: Account or None
    """
    try:
        # Fast path: already resolved.
        if self._account:
            return self._account
        # A cached profile carries its account.
        if isinstance(self._user_profile, UserProfile):
            self._account = self._user_profile.account
            logger.debug(
                "%s.account() set _account to %s based on user_profile %s",
                self._am_formatted_class_name,
                self._account,
                self._user_profile,
            )
            return self._account
        # Otherwise try to derive the account from the user.
        if self._user:
            try:
                self._account = get_cached_account_for_user(invalidate=False, user=self._user)  # type: ignore[assignment]
            except Account.DoesNotExist as e:
                logger.error(
                    "%s.account() could not find an account for user %s during lazy loading. This should not happen.",
                    self._am_formatted_class_name,
                    self._user,
                )
                raise SmarterBusinessRuleViolation(
                    f"Could not find an account for user {self._user} during lazy loading."
                ) from e
            except Account.MultipleObjectsReturned:
                logger.info(
                    "%s.account() found multiple accounts for user %s during lazy loading. Cannot lazily initialize account from UserProfile.",
                    self._am_formatted_class_name,
                    self._user,
                )
                return (
                    self._account
                )  # if there are multiple accounts then we're unable to lazily set from UserProfile
            if self._account:
                logger.debug(
                    "%s.account() set _account to %s based on user %s",
                    self._am_formatted_class_name,
                    self._account,
                    self._user,
                )
                # With both user and account known, the profile can be cached as well.
                try:
                    self._user_profile = UserProfile.get_cached_object(user=self._user, account=self._account)  # type: ignore[assignment]
                    self._am_ready = True
                    logger.debug(
                        "%s.account() lazily set _user_profile to %s",
                        self._am_formatted_class_name,
                        self._user_profile,
                    )
                    self._am_log_ready_status()
                except UserProfile.DoesNotExist as e:
                    logger.error(
                        "%s.account() could not find a user_profile for user %s and account %s during lazy loading. This should not happen.",
                        self._am_formatted_class_name,
                        self._user,
                        self._account,
                    )
                    raise SmarterBusinessRuleViolation(
                        f"Could not find a user_profile for user {self._user} and account {self._account} during lazy loading."
                    ) from e
                return self._account
        # Nothing available to resolve the account from.
        logger.debug(
            "%s.account() could not initialize _account for user: %s, user_profile: %s",
            self._am_formatted_class_name,
            self._user,
            self._user_profile,
        )
        return None
    except AttributeError as e:
        # Typically a private attribute that does not exist yet, i.e. the property
        # was read before __init__ assigned it.
        logger.error(
            "%s.account() AccountMixin appears to be only partially initialized: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return None
    # pylint: disable=broad-except
    except Exception as e:
        # Best-effort getter: any other failure is logged and reported as "no account".
        logger.error(
            "%s.account() encountered an error while trying to resolve account: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return None
@account.setter
def account(self, account: Optional[Account]):
    """
    Set (or unset) the account and keep ``user_profile`` consistent with it.

    Any cached user_profile is dropped together with the readiness flag,
    since both described the previous user/account pairing. If a user is
    already known, the UserProfile for (user, account) is looked up to
    verify that the user belongs to the account.

    :param account: The account to set, or None to unset it.
    :type account: Account or None
    :raises SmarterBusinessRuleViolation: if an account is already set and a
        new non-None account is given, or if the current user is not
        associated with the account.
    """
    if isinstance(self._account, Account) and account is not None:
        raise SmarterBusinessRuleViolation(f"Account is already set to {self._account}. It is now immutable.")
    self._account = account
    logger.debug("%s.account.setter: set _account to %s", self._am_formatted_class_name, self._account)
    self._user_profile = None
    # Without a profile the mixin cannot be ready; do not leave a stale True behind.
    self._am_ready = False
    logger.debug("%s.account.setter: reset _user_profile to None", self._am_formatted_class_name)
    if not account:
        return
    if self.user:
        # If the user is already set, then we need to verify that the user is part of the account
        # by attempting to fetch the user_profile.
        try:
            profile = UserProfile.get_cached_object(invalidate=False, user=self.user, account=account)  # type: ignore[arg-type]
        except UserProfile.DoesNotExist as e:
            raise SmarterBusinessRuleViolation(
                f"User {self._user} is not associated with the account {account.account_number}."
            ) from e
        # This should never happen, but guard against a lookup that yields nothing.
        if not profile:
            # Computed outside the f-string: reusing double quotes inside a
            # double-quoted f-string is a syntax error before Python 3.12.
            account_label = self._account.account_number if isinstance(self._account, Account) else "unknown account"
            raise SmarterBusinessRuleViolation(
                f"User {self._user} is not associated with the account {account_label}."
            )
        self._user_profile = profile
        self._am_ready = True
        logger.debug(
            "%s.account.setter: lazily set _user_profile to %s",
            self._am_formatted_class_name,
            self._user_profile,
        )
        self._am_log_ready_status()
@property
def account_number(self) -> AccountNumberType:
"""
A helper function to get the account number from the account.
:return: The account number for the current account.
:rtype: str or None
"""
return self._account.account_number if self._account else None
@account_number.setter
def account_number(self, account_number: AccountNumberType):
    """
    Set the account by looking it up from its account number.

    A falsy ``account_number`` clears the cached account and user_profile.
    Otherwise the Account is fetched with ``Account.get_cached_object`` and
    stored only if the lookup returned an Account instance.

    :param account_number: The account number to set the account from.
    :type account_number: str or None
    :raises SmarterBusinessRuleViolation: if an account is already set.
    :return: None
    :rtype: None
    """
    if isinstance(self._account, Account):
        raise SmarterBusinessRuleViolation(f"Account is already set to {self._account}. It is now immutable.")
    if account_number:
        found = Account.get_cached_object(account_number=account_number)
        if isinstance(found, Account):
            self._account = found
            logger.debug(
                "%s: set account to %s based on account_number %s",
                self._am_formatted_class_name,
                self._account,
                account_number,
            )
        return
    # Nothing to look up: clear the cached account and profile.
    self._account = None
    logger.debug("%s.account_number.setter: unset _account", self._am_formatted_class_name)
    self._user_profile = None
    logger.debug("%s.account_number.setter: unset _user_profile", self._am_formatted_class_name)
@property
def user(self) -> UserType:
    """
    Return the user, falling back to the cached user_profile's user.

    :return: The user, or None when neither a user nor a user_profile is
        cached or when an error occurs while resolving it.
    :rtype: User or None
    """
    try:
        if self._user:
            return self._user
        if self._user_profile:
            self._user = self._user_profile.user
            logger.debug(
                "%s.user() set _user to %s based on user_profile %s",
                self._am_formatted_class_name,
                self._user,
                self._user_profile,
            )
            return self._user
        # Nothing to resolve the user from.
        return None
    except AttributeError as e:
        # Typically a private attribute that does not exist yet.
        logger.error(
            "%s.user() AccountMixin appears to be only partially initialized: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return None
    # pylint: disable=broad-except
    except Exception as e:
        # Best-effort getter: log and report "no user".
        logger.error(
            "%s.user() encountered an error while trying to resolve user: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return None
@user.setter
def user(self, user: UserType):
    """
    Set the user and, when possible, cache the matching user_profile.

    A falsy ``user`` clears the cached account and user_profile. For a truthy
    user the UserProfile is looked up by user alone; if several profiles
    exist, nothing is cached and the profile is left to be resolved later.

    :param user: The user to set.
    :type user: User or None
    :raises SmarterBusinessRuleViolation: if a User is already set, or if no
        UserProfile exists for the user.
    :return: None
    :rtype: None
    """
    if isinstance(self._user, User):
        raise SmarterBusinessRuleViolation(f"User is already set to {self._user}. It is now immutable.")
    self._user = user
    logger.debug("%s.user.setter: set user to %s", self._am_formatted_class_name, self._user)
    if user:
        try:
            self._user_profile = UserProfile.get_cached_object(user=user)  # type: ignore[assignment]
            self._am_ready = True
            logger.debug(
                "%s.user.setter: lazily set _user_profile to %s based on user %s",
                self._am_formatted_class_name,
                self._user_profile,
                self._user,
            )
            self._am_log_ready_status()
        except UserProfile.DoesNotExist as e:
            raise SmarterBusinessRuleViolation(f"User {self._user} does not belong to any account.") from e
        except UserProfile.MultipleObjectsReturned:
            # Ambiguous: the user has more than one profile, so none is picked here.
            logger.info(
                "%s.user.setter: found multiple user_profiles for user %s during lazy loading. Cannot lazily initialize user_profile from User.",
                self._am_formatted_class_name,
                self._user,
            )
    else:
        # Unsetting the user also drops everything derived from it.
        self._account = None
        logger.debug("%s.user.setter: unset _account", self._am_formatted_class_name)
        self._user_profile = None
        logger.debug("%s.user.setter: unset _user_profile", self._am_formatted_class_name)
@property
def user_profile(self) -> Optional[UserProfile]:
    """
    Return the user_profile, resolving it lazily from the cached user/account.

    With a cached User and account, the profile for that pair is looked up;
    with only a User, the profile is looked up by user alone. The mixin is
    marked ready only when a profile was actually obtained.

    NOTE(review): the ``SmarterBusinessRuleViolation`` raised below is caught
    by the final ``except Exception`` handler (assuming the class derives from
    Exception), so callers get ``None`` plus an error log. Confirm intended.

    :return: The user_profile for the current user, or None if it cannot be resolved.
    :rtype: UserProfile or None
    """
    try:
        if self._user_profile:
            self._am_ready = True
            return self._user_profile

        # Private attributes are read directly: going through the ``account``
        # property would start that property's own lazy lookup.
        if isinstance(self._user, User):
            if self._account:
                try:
                    self._user_profile = UserProfile.get_cached_object(user=self._user, account=self._account)
                except UserProfile.DoesNotExist as e:
                    raise SmarterBusinessRuleViolation(
                        f"User {self._user} does not belong to the account {self._account.account_number}."
                    ) from e
                if self._user_profile:
                    logger.debug(
                        "%s.user_profile() lazily set _user_profile to %s based on user %s and account %s",
                        self._am_formatted_class_name,
                        self._user_profile,
                        self._user,
                        self._account,
                    )
            else:
                self._user_profile = UserProfile.get_cached_object(user=self._user)
                if self._user_profile:
                    logger.debug(
                        "%s.user_profile() lazily set _user_profile to %s",
                        self._am_formatted_class_name,
                        self._user_profile,
                    )

        if self._user_profile:
            # Flag first, then log: the status logger reads am_ready.
            self._am_ready = True
            self._am_log_ready_status()
        else:
            logger.debug(
                "%s: user_profile() could not initialize _user_profile for user: %s, account: %s",
                self._am_formatted_class_name,
                self._user,
                self._account,
            )
        return self._user_profile
    except AttributeError as e:
        # Typically a private attribute that does not exist yet.
        logger.error(
            "%s.user_profile() AccountMixin appears to be only partially initialized: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return None
    # pylint: disable=broad-except
    except Exception as e:
        # Best-effort getter: log and report "no profile".
        logger.error(
            "%s.user_profile() encountered an error while trying to resolve user_profile: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return None
@user_profile.setter
def user_profile(self, user_profile: Optional[UserProfile]):
    """
    Set the user_profile and bring ``user`` and ``account`` in line with it.

    A truthy profile also sets the user and account from the profile and
    marks the mixin ready. A falsy profile clears the cached user and
    account as well.

    :param user_profile: The user_profile to set.
    :type user_profile: UserProfile or None
    :raises SmarterBusinessRuleViolation: if a UserProfile is already set.
    :return: None
    :rtype: None
    """
    if isinstance(self._user_profile, UserProfile):
        raise SmarterBusinessRuleViolation(
            f"UserProfile is already set to {self._user_profile}. It is now immutable."
        )
    self._user_profile = user_profile
    prefix = self._am_formatted_class_name
    logger.debug("%s.user_profile.setter: set _user_profile to %s", prefix, self._user_profile)
    if self._user_profile:
        self._user = self._user_profile.user
        logger.debug("%s.user_profile.setter: set _user to %s", self._am_formatted_class_name, self._user)
        self._account = self._user_profile.account
        logger.debug("%s.user_profile.setter: set _account to %s", self._am_formatted_class_name, self._account)
        self._am_ready = True
        self._am_log_ready_status()
    else:
        self._user = None
        logger.debug("%s.user_profile.setter: unset _user", self._am_formatted_class_name)
        self._account = None
        logger.debug("%s.user_profile.setter: unset _account", self._am_formatted_class_name)
@property
def am_ready(self) -> bool:
    """
    Returns True if the AccountMixin is am_ready to be used.

    Once readiness has been established the cached flag is returned
    directly. Otherwise the mixin is ready when the superclass reports
    ``ready`` and ``user_profile`` resolves to a UserProfile; a missing
    ``user`` or ``account`` is then filled in from the profile's
    ``cached_user`` / ``cached_account``.

    :return: True if the AccountMixin is am_ready to be used.
    :rtype: bool
    """
    if self._am_ready:
        return True
    try:
        if not super().ready:
            logger.debug(
                "%s.am_ready() returning false because superclass is not ready.",
                self._am_formatted_class_name,
            )
            return False
        if not isinstance(self.user_profile, UserProfile):
            logger.debug(
                "%s.am_ready() returning false because user_profile is not initialized.",
                self._am_formatted_class_name,
            )
            return False
        if not isinstance(self.user, User):
            logger.debug(
                "%s.am_ready() had to initialize user from user_profile. This is a bug.",
                self._am_formatted_class_name,
            )
            self._user = self.user_profile.cached_user
        if not isinstance(self.account, Account):
            logger.debug(
                "%s.am_ready() had to initialize account from user_profile. This is a bug.",
                self._am_formatted_class_name,
            )
            self._account = self.user_profile.cached_account
        # Flag first, then log: the status logger reads am_ready.
        self._am_ready = True
        self._am_log_ready_status()
        return self._am_ready
    except AttributeError as e:
        # Typically a private attribute that does not exist yet.
        logger.error(
            "%s.am_ready() AccountMixin appears to be only partially initialized. This is a bug: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return False
    # pylint: disable=broad-except
    except Exception as e:
        logger.error(
            "%s.am_ready() encountered an error while checking am_ready state. This is a bug: %s",
            self._am_formatted_class_name,
            e,
            exc_info=True,
        )
        return False
@property
def _am_ready_state(self) -> str:
    """
    Formatted label for the current ``am_ready`` state.

    :return: ``formatted_state_ready`` when ready, otherwise ``formatted_state_not_ready``.
    :rtype: str
    """
    return self.formatted_state_ready if self.am_ready else self.formatted_state_not_ready
@property
def ready_state(self) -> str:
    """Formatted label for the ``am_ready`` state: the ready label when ready, the not-ready label otherwise."""
    if not self.am_ready:
        return self.formatted_state_not_ready
    return self.formatted_state_ready
@property
def is_authenticated(self) -> bool:
    """
    True when a user is cached and authenticated and both an account and a user_profile are cached.

    Only the private attributes are inspected; no lazy loading is triggered.
    """
    if not self._user:
        return False
    user_ok = self._user.is_authenticated
    if not user_ok:
        # Hand back the falsy value itself, exactly as a short-circuiting ``and`` would.
        return user_ok
    return bool(self._account) and bool(self._user_profile)
[docs]
def to_json(self):
    """
    Serialize the readiness flag, account, user and user_profile.

    Each object is run through its serializer when present and reported as
    None otherwise; the resulting dict is passed through ``sorted_dict``.
    """
    payload = {"am_ready": self.am_ready}
    payload["account"] = AccountMiniSerializer(self.account).data if self.account else None
    payload["user"] = UserMiniSerializer(self.user).data if self.user else None
    payload["userProfile"] = UserProfileSerializer(self.user_profile).data if self.user_profile else None
    return self.sorted_dict(payload)
[docs]
def authenticate(self, api_token: bytes) -> bool:
    """
    Authenticate the user using the provided API token.

    The token is the value a caller sends in the Authorization header, e.g.::

        Authorization: Token <api_token>

    On success the authenticated user replaces the cached user; on failure a
    ``SmarterAnonymousUser`` is stored. In both cases the cached account,
    user_profile and readiness flag are cleared, so they are resolved again
    for the new user.

    :param api_token: The API token to authenticate with.
    :type api_token: bytes
    :return: True if authentication was successful, False otherwise.
    :rtype: bool
    """
    logger.debug(
        "%s.authenticate() called with api_token=%s",
        self._am_formatted_class_name,
        mask_string(api_token.decode()),
    )
    try:
        user, _ = SmarterTokenAuthentication().authenticate_credentials(api_token)
    except AuthenticationFailed:
        self._user = SmarterAnonymousUser()
        authenticated = False
    else:
        self._user = user
        authenticated = True

    # State derived from the previous user no longer applies, including the
    # cached readiness flag, which would otherwise stay True.
    self._account = None
    self._user_profile = None
    self._am_ready = False

    if authenticated:
        logger.debug(
            "%s.authenticate(): successfully authenticated user %s from API token.",
            self._am_formatted_class_name,
            self._user,
        )
    else:
        logger.warning(
            "%s.authenticate(): failed to authenticate user from API token", self._am_formatted_class_name
        )
    return authenticated
[docs]
def log_ready_status(self):
    """Log, at info level, the formatted class name together with its ``ready_state``."""
    status_line = f"{self.formatted_class_name} is {self.ready_state}"
    logger.info(status_line)
def _am_log_ready_status(self):
    """Log, at info level, the AccountMixin prefix together with its ``_am_ready_state``."""
    status_line = f"{self._am_formatted_class_name} is {self._am_ready_state}"
    logger.info(status_line)