# pylint: disable=C0302
"""Account models."""
# pylint: disable=missing-class-docstring
import logging
import os
import random
from typing import TYPE_CHECKING, Optional, Union
# 3rd party stuff
from cryptography.fernet import Fernet
# django stuff
from django.conf import settings
from django.contrib.auth.models import User
from django.core.validators import RegexValidator
from django.db import models
from django.template.loader import render_to_string
from django.test.client import RequestFactory
from django.utils import timezone
from django.utils.functional import SimpleLazyObject
# our stuff
from smarter.common.conf import smarter_settings
from smarter.common.const import SMARTER_ADMIN_USERNAME
from smarter.common.exceptions import SmarterConfigurationError, SmarterValueError
from smarter.common.helpers.console_helpers import formatted_text
from smarter.common.helpers.email_helpers import email_helper
from smarter.lib.cache import cache_results
from smarter.lib.django import waffle
from smarter.lib.django.models import MetaDataModel, TimestampedModel
from smarter.lib.django.validators import SmarterValidator
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.logging import WaffleSwitchedLoggerWrapper
from .signals import (
new_charge_created,
new_user_created,
secret_accessed,
secret_created,
secret_edited,
)
# These names are only needed for static type analysis; they are referenced
# below (and in function signatures) as string forward references.
if TYPE_CHECKING:
    try:
        from django.contrib.auth.models import AbstractUser, AnonymousUser, _AnyUser
        from django.core.handlers.wsgi import WSGIRequest
    except ImportError:
        _AnyUser = Union[object]  # fallback for Sphinx/type checkers

# Absolute path of the directory containing this module.
HERE = os.path.abspath(os.path.dirname(__file__))

# Return type of get_resolved_user(): a concrete Django user object, or None.
ResolvedUserType = Optional[Union["User", "AbstractUser", "AnonymousUser"]]
# pylint: disable=W0613
def should_log(level):
    """
    Tell the logger wrapper whether account logging is currently enabled.

    :param level: The log level of the pending record. It is accepted for
        interface compatibility but does not influence the result.
    :returns: The state of the ``ACCOUNT_LOGGING`` waffle switch.
    """
    account_logging_switch = SmarterWaffleSwitches.ACCOUNT_LOGGING
    return waffle.switch_is_active(account_logging_switch)
# Module logger. Records are routed through a wrapper that consults
# should_log(), so output is governed by the ACCOUNT_LOGGING waffle switch.
base_logger = logging.getLogger(__name__)
logger = WaffleSwitchedLoggerWrapper(base_logger, should_log)

# Charge type identifiers and their (value, label) pairs.
CHARGE_TYPE_PROMPT_COMPLETION = "completion"
CHARGE_TYPE_PLUGIN = "plugin"
CHARGE_TYPE_TOOL = "tool"
CHARGE_TYPES = [
    (CHARGE_TYPE_PROMPT_COMPLETION, "Prompt Completion"),
    (CHARGE_TYPE_PLUGIN, "Plugin"),
    (CHARGE_TYPE_TOOL, "Tool"),
]

# Provider identifiers and their (value, label) pairs.
PROVIDER_OPENAI = "openai"
PROVIDER_METAAI = "metaai"
PROVIDER_GOOGLEAI = "googleai"
PROVIDERS = [
    (PROVIDER_OPENAI, "OpenAI"),
    (PROVIDER_METAAI, "Meta AI"),
    (PROVIDER_GOOGLEAI, "Google AI"),
]
def welcome_email_context(first_name: str) -> dict:
    """
    Build the template context used to render the welcome email.

    The context starts from the dashboard ``branding`` context processor,
    evaluated against a synthetic GET request for the platform domain, and
    is then extended with the recipient's capitalized first name, the
    environment URL (without trailing slashes) and the state of the
    ``ENABLE_NEW_USER_PASSWORD_EMAIL`` waffle switch.

    Template: ``templates/account/email/welcome.html``

    :param first_name: Recipient's first name.
    :returns: Context dictionary for the welcome email template.
    """
    # pylint: disable=import-outside-toplevel
    from smarter.apps.dashboard.context_processors import branding

    display_name = first_name.capitalize()
    platform_request = RequestFactory().get("/", HTTP_HOST=smarter_settings.environment_platform_domain)
    context = branding(request=platform_request)

    context["first_name"] = display_name
    context["environment_platform_domain"] = smarter_settings.environment_url.rstrip("/")
    context["send_password_email"] = waffle.switch_is_active(SmarterWaffleSwitches.ENABLE_NEW_USER_PASSWORD_EMAIL)
    return context
def get_resolved_user(
    user: Union[User, "AbstractUser", "AnonymousUser", SimpleLazyObject, "_AnyUser"],
) -> ResolvedUserType:
    """
    Resolve and return a Django user object from a user-like instance.

    Accepts the user representations commonly encountered in Django code
    (``User``, ``AbstractUser``, ``AnonymousUser``, ``SimpleLazyObject``)
    and returns an object that can be treated as a user.

    :param user: The user-like object to resolve.
    :returns: ``None`` if ``user`` is ``None``; the object itself if it is an
        instance of ``User``, ``AnonymousUser`` or ``AbstractUser``; the wrapped
        object for any other ``SimpleLazyObject``; the object itself for
        ``unittest.mock`` ``Mock`` / ``MagicMock`` instances.
    :raises SmarterConfigurationError: If the input type is none of the above.

    **Example usage**::

        from smarter.apps.account.models import get_resolved_user

        resolved_user = get_resolved_user(request.user)
        if resolved_user and resolved_user.is_authenticated:
            # Safe to access user fields
            ...

    .. seealso::

        :class:`django.contrib.auth.models.User`
        :class:`django.utils.functional.SimpleLazyObject`
    """
    prefix = formatted_text(__name__) + ".get_resolved_user()"
    logger.debug("%s called for user type: %s", prefix, type(user))
    if user is None:
        return None

    # pylint: disable=import-outside-toplevel
    from django.contrib.auth.models import AbstractUser, AnonymousUser

    # This is the expected case. A tuple of classes is used instead of
    # typing.Union[...], which isinstance() rejects with TypeError on
    # Python versions older than 3.10.
    if isinstance(user, (User, AnonymousUser, AbstractUser)):
        logger.debug(
            "%s - user is instance of expected type: %s",
            prefix,
            type(user),
        )
        return user

    # these are manageable edge cases
    # --------------------------------
    # pylint: disable=W0212
    if isinstance(user, SimpleLazyObject):
        return user._wrapped

    # Allow unittest.mock.MagicMock or Mock for testing. Matching on the class
    # name avoids importing unittest.mock in production code.
    if hasattr(user, "__class__") and user.__class__.__name__ in ("MagicMock", "Mock"):
        return user  # type: ignore[return-value]

    raise SmarterConfigurationError(
        f"Unexpected user type: {type(user)}. Expected Django User, AnonymousUser, SimpleLazyObject, or a test mock."
    )
class Account(MetaDataModel):
    """
    Model representing a Smarter account.

    Stores company and billing information for an account. Other models in
    this module (:class:`AccountContact`, :class:`UserProfile`) reference it
    by foreign key.

    :param account_number: String. Unique account identifier in the format '9999-9999-9999'.
    :param is_default_account: Boolean. Indicates if this is the default account.
    :param company_name: String. Name of the company.
    :param phone_number: String. Company phone number.
    :param address1: String. Primary address line.
    :param address2: String. Secondary address line.
    :param city: String. City.
    :param state: String. State or region.
    :param postal_code: String. Postal code.
    :param country: String. Country (default: 'USA').
    :param language: String. Language code (default: 'EN').
    :param timezone: String. Timezone.
    :param currency: String. Currency code (default: 'USD').
    :param is_active: Boolean. If False, account is disabled for billing and resource management.

    **Example usage**::

        from smarter.apps.account.models import Account

        account = Account.objects.create(company_name="Acme Corp")
        print(account.account_number)

    .. seealso::

        Related models: :class:`UserProfile`, :class:`AccountContact`, :class:`Charge`
    """

    account_number_format = RegexValidator(
        regex=SmarterValidator.VALID_ACCOUNT_NUMBER_PATTERN,
        message="Account number must be entered in the format: '9999-9999-9999'.",
    )
    # The default "9999-9999-9999" is a placeholder: save() swaps it for a
    # randomly generated, unused account number.
    account_number = models.CharField(
        validators=[account_number_format], max_length=255, unique=True, default="9999-9999-9999", blank=True, null=True
    )
    is_default_account = models.BooleanField(
        default=False,
        help_text="Indicates if this is the default account for the organization. Only one account should be marked as default.",
    )
    company_name = models.CharField(max_length=255)
    phone_number = models.CharField(max_length=50, blank=True, null=True)
    address1 = models.CharField(max_length=255, blank=True, null=True)
    address2 = models.CharField(max_length=255, blank=True, null=True)
    city = models.CharField(max_length=255, blank=True, null=True)
    state = models.CharField(max_length=255, blank=True, null=True)
    postal_code = models.CharField(max_length=20, blank=True, null=True)
    country = models.CharField(max_length=255, default="USA", blank=True, null=True, help_text="ISO 3166 country code.")
    language = models.CharField(
        max_length=255, default="EN", blank=True, null=True, help_text="BCP 47 language tag, e.g., 'en-US'."
    )
    timezone = models.CharField(
        max_length=255, blank=True, null=True, help_text=" IANA timezone name, e.g., 'America/New_York'."
    )
    currency = models.CharField(
        max_length=255, default="USD", blank=True, null=True, help_text="ISO 4217 currency code."
    )
    is_active = models.BooleanField(
        default=True,
        help_text="Indicates whether the account is active. Inactive accounts cannot be used for billing or resource management, nor hosting Provider apis.",
    )

    @classmethod
    def randomized_account_number(cls):
        """
        Generate a random account number in the format ####-####-####.

        Candidates are drawn until one is found that no existing account
        already uses.

        :returns: str
            An account number string that was unused at the time of the check.

        .. note::

            Each segment is zero-padded to four digits.

        **Example usage**::

            from smarter.apps.account.models import Account

            account_number = Account.randomized_account_number()
            print(account_number)  # e.g., '1234-5678-9012'
        """
        ACCOUNT_NUMBER_SEGMENTS = 3
        ACCOUNT_NUMBER_SEGMENT_LENGTH = 4

        def account_number_generator():
            parts = [
                str(random.randint(0, 9999)).zfill(ACCOUNT_NUMBER_SEGMENT_LENGTH)
                for _ in range(ACCOUNT_NUMBER_SEGMENTS)
            ]
            return "-".join(parts)

        account_number = account_number_generator()
        # Re-draw on collision with an existing account number.
        while cls.objects.filter(account_number=account_number).exists():
            account_number = account_number_generator()
        return account_number

    def save(self, *args, **kwargs):
        """
        Save the Account instance, assigning and validating its account number.

        If the account number still holds the placeholder default
        ('9999-9999-9999'), a new random unused number is generated first.
        The account number is then passed to
        ``SmarterValidator.validate_account_number`` before the row is written.

        :param args: Positional arguments passed to the parent save method.
        :param kwargs: Keyword arguments passed to the parent save method.
        :raises SmarterValueError: If the account number is invalid.

        **Example usage**::

            account = Account(company_name="Acme Corp")
            account.save()  # Ensures account_number is unique and valid
        """
        if self.account_number == "9999-9999-9999":
            self.account_number = self.randomized_account_number()
        SmarterValidator.validate_account_number(self.account_number)
        super().save(*args, **kwargs)

    @classmethod
    def get_by_account_number(cls, account_number):
        """
        Retrieve an Account instance by its account number.

        :param account_number: String. The account number to search for.
        :returns: Optional[Account]
            The Account instance if found, otherwise None.
        """
        try:
            return cls.objects.get(account_number=account_number)
        except cls.DoesNotExist:
            return None

    @classmethod
    def get_cached_object(
        cls,
        invalidate: Optional[bool] = False,
        pk: Optional[int] = None,
        name: Optional[str] = None,
        account_number: Optional[str] = None,
        company_name: Optional[str] = None,
    ) -> Optional["Account"]:
        """
        Retrieve an Account instance through the ``cache_results`` cache.

        Lookup precedence: ``account_number`` first, then ``company_name``;
        when neither is given the call is delegated to the parent class
        implementation with ``pk`` and ``name``.

        :param invalidate: If True, invalidate the cached entries for the
            given ``account_number`` and ``company_name`` before looking up.
        :type invalidate: bool, optional
        :param pk: Optional primary key to search for (only used when neither
            account_number nor company_name is provided).
        :type pk: int, optional
        :param name: Optional name to search for (only used when neither
            account_number nor company_name is provided).
        :type name: str, optional
        :param account_number: String. The account number to search for.
        :type account_number: str, optional
        :param company_name: String. The company name to search for (used if account_number is not provided).
        :type company_name: str, optional
        :returns: Optional[Account]
            The Account instance if found, otherwise None.

        **Example usage**::

            account = Account.get_cached_object(account_number="1234-5678-9012")
            if account:
                print(account.company_name)
        """
        logger_prefix = formatted_text(f"{__name__}.{cls.__name__}.get_cached_object()")
        logger.debug(
            "%s called with pk=%s, name=%s, account_number=%s, company_name=%s, invalidate=%s",
            logger_prefix,
            pk,
            name,
            account_number,
            company_name,
            invalidate,
        )

        # NOTE(review): ``class_name`` is unused in the helper bodies; it
        # presumably takes part in the cache key built by cache_results - confirm.
        @cache_results(cls.cache_expiration)
        def _get_account_by_number(account_number: str, class_name: str) -> Optional["Account"]:
            try:
                logger.debug(
                    "%s._get_account_by_number() cache miss for account_number=%s", logger_prefix, account_number
                )
                return cls.objects.get(account_number=account_number)
            except cls.DoesNotExist:
                logger.debug(
                    "%s._get_account_by_number() no Account found for account_number=%s", logger_prefix, account_number
                )
                return None

        @cache_results(cls.cache_expiration)
        def _get_account_by_company_name(company_name: str, class_name: str) -> Optional["Account"]:
            try:
                logger.debug(
                    "%s._get_account_by_company_name() cache miss for company_name=%s", logger_prefix, company_name
                )
                return cls.objects.get(company_name=company_name)
            except cls.DoesNotExist:
                logger.debug(
                    "%s._get_account_by_company_name() no Account found for company_name=%s",
                    logger_prefix,
                    company_name,
                )
                return None

        if invalidate:
            _get_account_by_number.invalidate(account_number=account_number, class_name=Account.__name__)
            _get_account_by_company_name.invalidate(company_name=company_name, class_name=Account.__name__)

        if account_number:
            return _get_account_by_number(account_number=account_number, class_name=Account.__name__)

        if company_name:
            return _get_account_by_company_name(company_name=company_name, class_name=Account.__name__)

        return super().get_cached_object(invalidate=invalidate, pk=pk, name=name)  # type: ignore[return-value]

    # pylint: disable=missing-class-docstring
    class Meta:
        verbose_name = "Account"
        verbose_name_plural = "Accounts"

    def __str__(self):
        return str(self.account_number) + " - " + str(self.company_name)

    def __repr__(self) -> str:
        # Delegates to the parent class's string form rather than this
        # class's "<number> - <company>" format.
        return super().__str__()
class AccountContact(TimestampedModel):
    """
    Contact record attached to an :class:`Account`.

    Contacts are kept separately from Django users, so an account can hold
    email recipients that are not registered users.

    :param account: ForeignKey to :class:`Account`. The related account.
    :param first_name: String. Contact's first name.
    :param last_name: String. Contact's last name.
    :param email: String. Contact's email address.
    :param phone: String. Contact's phone number (optional).
    :param is_primary: Boolean. Marks this contact as the primary contact for the account.
    :param is_test: Boolean. Indicates if this contact is for testing purposes.
    :param welcomed: Boolean. Indicates if a welcome email has been sent.

    .. attention::

        :meth:`save` rejects a second primary contact for the same account,
        and the (account, email) pair must be unique.

    **Example usage**::

        from smarter.apps.account.models import AccountContact

        contact = AccountContact.objects.create(
            account=account,
            first_name="Jane",
            last_name="Doe",
            email="jane@example.com",
            is_primary=True
        )

    .. seealso::

        :class:`Account`, :class:`UserProfile`
    """

    # pylint: disable=missing-class-docstring
    class Meta:
        verbose_name = "Account Contact"
        verbose_name_plural = "Account Contacts"
        unique_together = ("account", "email")

    account = models.ForeignKey(Account, on_delete=models.CASCADE, related_name="contacts")
    first_name = models.CharField(max_length=255)
    last_name = models.CharField(max_length=255)
    email = models.EmailField()
    phone = models.CharField(max_length=20, blank=True, null=True)
    is_primary = models.BooleanField(
        default=False,
        help_text="Indicates if this contact is the primary contact for the account. Only one contact can be primary per account.",
    )
    is_test = models.BooleanField(
        default=False, help_text="Indicates if this contact is used for unit testing purposes."
    )
    welcomed = models.BooleanField(
        default=False, help_text="Indicates if a welcome email has been sent to this contact."
    )

    def send_email(self, subject: str, body: str, html: bool = False, from_email: Optional[str] = None):
        """
        Email this contact through the Smarter email helper.

        :param subject: String. The email subject line.
        :param body: String. The email body content.
        :param html: Boolean. If True, sends the email as HTML. Defaults to False.
        :param from_email: String or None. Optional sender email address.

        .. note::

            The helper's ``quiet`` flag is set from this contact's ``is_test`` value.

        **Example usage**::

            contact.send_email(
                subject="Welcome!",
                body="Hello and welcome to Smarter.",
                html=True,
                from_email="support@smarter.com"
            )
        """
        quiet_mode = self.is_test
        recipient = self.email
        email_helper.send_email(
            subject=subject, to=recipient, body=body, html=html, from_email=from_email, quiet=quiet_mode
        )

    def send_welcome_email(self) -> None:
        """
        Render ``account/email/welcome.html`` for this contact and send it as HTML.

        The template context comes from :func:`welcome_email_context`, built
        with the contact's first name.

        :returns: None

        **Example usage**::

            contact.send_welcome_email()
        """
        template_context = welcome_email_context(first_name=self.first_name)
        rendered_html = render_to_string("account/email/welcome.html", template_context)
        logger.debug(
            "%s.send_welcome_email() Sending welcome email to %s with content: %s",
            formatted_text(__name__ + ".AccountContact.send_welcome_email()"),
            self.email,
            rendered_html,
        )
        self.send_email(subject="Welcome to Smarter!", body=rendered_html, html=True)

    @classmethod
    def get_primary_contact(cls, account: Account) -> Optional["AccountContact"]:
        """
        Return the first contact of ``account`` flagged as primary.

        :param account: Instance of :class:`Account`. The account to search for a primary contact.
        :returns: Optional[AccountContact]
            The primary contact instance, or None if not found.

        **Example usage**::

            primary_contact = AccountContact.get_primary_contact(account)
            if primary_contact:
                print(primary_contact.email)
        """
        primary_contacts = cls.objects.filter(account=account, is_primary=True)
        return primary_contacts.first()

    # pylint: disable=too-many-arguments
    @classmethod
    def send_email_to_account(
        cls, account: Account, subject: str, body: str, html: bool = False, from_email: Optional[str] = None
    ) -> None:
        """
        Send the same email to every contact of ``account``.

        :param account: Instance of :class:`Account`. The account whose contacts will receive the email.
        :param subject: String. The email subject line.
        :param body: String. The email body content.
        :param html: Boolean. If True, sends the email as HTML. Defaults to False.
        :param from_email: String or None. Optional sender email address.

        **Example usage**::

            AccountContact.send_email_to_account(
                account=account,
                subject="System Update",
                body="We have updated our terms of service.",
                html=False
            )
        """
        for recipient in cls.objects.filter(account=account):
            recipient.send_email(subject=subject, body=body, html=html, from_email=from_email)

    # pylint: disable=too-many-arguments
    @classmethod
    def send_email_to_primary_contact(
        cls, account: Account, subject: str, body: str, html: bool = False, from_email: Optional[str] = None
    ) -> None:
        """
        Send an email to the primary contact of ``account``.

        :param account: Instance of :class:`Account`. The account whose primary contact will receive the email.
        :param subject: String. The email subject line.
        :param body: String. The email body content.
        :param html: Boolean. If True, sends the email as HTML. Defaults to False.
        :param from_email: String or None. Optional sender email address.

        .. attention::

            If no primary contact is found, no email is sent and an error is logged.

        **Example usage**::

            AccountContact.send_email_to_primary_contact(
                account=account,
                subject="Urgent: Action Required",
                body="Please review your account settings.",
                html=True
            )
        """
        prefix = formatted_text(__name__ + ".AccountContact.send_email_to_primary_contact()")
        primary = cls.get_primary_contact(account)
        logger.debug(
            "%s.send_email_to_primary_contact() Attempting to send email to primary contact for account %s. Found contact: %s, subject: %s, body: %s, html: %s, from_email: %s",
            prefix,
            account,
            primary,
            subject,
            body,
            html,
            from_email,
        )
        if not primary:
            logger.error(
                "%s.send_email_to_primary_contact() No primary contact found for account %s",
                prefix,
                account,
            )
            return
        primary.send_email(subject=subject, body=body, html=html, from_email=from_email)

    def save(self, *args, **kwargs):
        """
        Save the contact, enforcing a single primary contact per account.

        After the row is written, if ``welcomed`` is still False the welcome
        email is sent, ``welcomed`` is set to True and the instance is saved
        a second time to persist the flag.

        :param args: Positional arguments passed to the parent save method.
        :param kwargs: Keyword arguments passed to the parent save method.
        :raises SmarterValueError: If another primary contact already exists for the account.

        **Example usage**::

            contact = AccountContact(account=account, email="jane@example.com", is_primary=True)
            contact.save()  # Ensures uniqueness and sends welcome email if needed
        """
        prefix = formatted_text(__name__ + ".AccountContact.save()")
        logger.debug("%s called with args: %s, kwargs: %s", prefix, args, kwargs)

        if self.is_primary:
            # Look for a different primary contact on the same account; when
            # updating an existing row, this row itself is not a conflict.
            other_primaries = AccountContact.objects.filter(account=self.account, is_primary=True)
            if self.pk:
                other_primaries = other_primaries.exclude(pk=self.pk)
            if other_primaries.exists():
                raise SmarterValueError("There is already a primary contact for this account.")

        super().save(*args, **kwargs)

        if self.welcomed:
            return
        self.send_welcome_email()
        self.welcomed = True
        # The second pass sees welcomed=True, so it stops here.
        self.save()

    def __str__(self):
        return " ".join((self.first_name, self.last_name))
[docs]
class UserProfile(MetaDataModel):
"""
UserProfile model for associating Django users with Smarter accounts.
Establishes a link between a Django User and an Account, enabling centralized management of billing, identity, and resource ownership.
:param user: ForeignKey to :class:`django.contrib.auth.models.User`. The user associated with this profile.
:param account: ForeignKey to :class:`Account`. The related Smarter account.
:param is_test: Boolean. Indicates if this profile is for testing purposes.
.. important::
The combination of `user` and `account` must be unique. Duplicate profiles for the same user and account are not allowed.
**Example usage**::
from smarter.apps.account.models import UserProfile
profile = UserProfile.objects.create(user=user, account=account)
profile.add_to_account_contacts(is_primary=True)
"""
# pylint: disable=missing-class-docstring
class Meta:
unique_together = (
"user",
"account",
)
# Add more fields here as needed
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name="user_profile",
)
account = models.ForeignKey(Account, on_delete=models.CASCADE, related_name="user_profiles")
profile_image_url = models.URLField(
blank=True, null=True, help_text="URL to the user's profile image, provided via oauth."
)
is_test = models.BooleanField(
default=False, help_text="Indicates if this profile is used for unit testing purposes."
)
@property
def cached_user(self) -> Optional[User]:
    """
    Return the User linked to this profile.

    This is a thin accessor around the ``user`` foreign key.

    NOTE(review): no caching logic is visible in this property itself -
    confirm whether callers rely on caching provided elsewhere.

    :returns: Optional[User]
        The associated User instance.

    **Example usage**::

        user = profile.cached_user
        if user:
            print(user.email)
    """
    linked_user = self.user
    return linked_user
@property
def cached_account(self) -> Optional[Account]:
    """
    Return the Account linked to this profile.

    This is a thin accessor around the ``account`` foreign key.

    NOTE(review): no caching logic is visible in this property itself -
    confirm whether callers rely on caching provided elsewhere.

    :returns: Optional[Account]
        The associated Account instance.

    **Example usage**::

        account = user_profile.cached_account
        if account:
            print(account.company_name)
    """
    linked_account = self.account
    return linked_account
def save(self, *args, **kwargs):
    """
    Save the UserProfile instance and, on creation, register the user as an account contact.

    Validates that both ``user`` and ``account`` are set and saves the
    profile. When the profile is new (it had no primary key before saving),
    the user is added to the account's contacts - as primary if the account
    has no primary contact yet - and the ``new_user_created`` signal is sent.

    :param args: Positional arguments passed to the parent save method.
    :param kwargs: Keyword arguments passed to the parent save method.
    :raises SmarterValueError: If ``user`` or ``account`` is not set.

    **Example usage**::

        profile.save()
    """
    is_new = self.pk is None

    # Reading an unset, non-nullable foreign key raises the related model's
    # DoesNotExist instead of yielding None, so a plain ``is None`` test
    # never fires for a missing relation. Treat both cases as "missing".
    try:
        missing_relation = self.user is None or self.account is None
    except (User.DoesNotExist, Account.DoesNotExist):
        missing_relation = True
    if missing_relation:
        raise SmarterValueError("User and Account cannot be null")

    super().save(*args, **kwargs)

    if is_new:
        # ensure that at least one person is on the account contact list
        is_primary = not AccountContact.objects.filter(account=self.account, is_primary=True).exists()
        self.add_to_account_contacts(is_primary=is_primary)

        logger.debug(
            "%s.save() New user profile created for %s %s. Sending signal.",
            formatted_text(__name__ + ".UserProfile()"),
            self.account.company_name,
            self.user.email,
        )
        new_user_created.send(sender=self.__class__, user_profile=self)
@classmethod
def admin_for_account(cls, account: Account) -> User:
    """
    Return the designated user for the given account.

    Resolution order:

    1. the staff user (``is_staff=True``) with the lowest user id that has a
       profile on the account;
    2. otherwise the user with the lowest user id that has a profile on the
       account;
    3. otherwise the user named ``SMARTER_ADMIN_USERNAME`` is fetched (or
       created) and given a profile on the account.

    The lookup is wrapped in ``cache_results`` and keyed by the account id.

    :param account: Instance of :class:`Account`. The account for which to find the designated user.
    :returns: :class:`django.contrib.auth.models.User`
        The designated user for the account.

    .. attention::

        A user created in step 3 has no password set here. You must set
        the password manually (e.g. via manage.py).

    **Example usage**::

        user = UserProfile.admin_for_account(account)
    """
    prefix = formatted_text(__name__ + ".UserProfile()")

    @cache_results(cls.cache_expiration)
    def _get_admin_for_account(account_id: int, class_name: str) -> Optional[User]:
        staff_profile = (
            cls.objects.select_related("user")
            .filter(account_id=account_id, user__is_staff=True)
            .order_by("user__id")
            .first()
        )
        if staff_profile is not None:
            return staff_profile.user
        logger.error(
            "%s.admin_for_account() No admin found for account %s",
            prefix,
            account,
        )

        any_profile = cls.objects.select_related("user").filter(account_id=account_id).order_by("user__id").first()
        if any_profile is not None:
            return any_profile.user
        logger.error("%s.admin_for_account() No user for account %s", prefix, account)

        # The fallback admin is a Django User, so it must be looked up on the
        # User manager (UserProfile has no ``username`` field), and
        # get_or_create() returns an ``(object, created)`` tuple.
        # NOTE(review): the fallback user is created without staff or
        # superuser flags - confirm whether it should receive them.
        admin_user, _ = User.objects.get_or_create(username=SMARTER_ADMIN_USERNAME)
        user_profile = cls.objects.create(user=admin_user, account=account)
        logger.warning(
            "%s.admin_for_account() Created admin user for account %s. Use manage.py to set the password",
            prefix,
            account,
        )
        return user_profile.user

    return _get_admin_for_account(account_id=account.id, class_name=UserProfile.__name__)  # type: ignore[return-value]
[docs]
@classmethod
def get_cached_object(
cls,
invalidate: Optional[bool] = False,
pk: Optional[int] = None,
name: Optional[str] = None,
user: Optional[User] = None,
username: Optional[str] = None,
account: Optional[Account] = None,
) -> "UserProfile":
"""
Retrieve a model instance by primary key or name, using caching to
optimize performance. This method is selectively overridden in
models that inherit from MetaDataModel to provide class-specific
function parameters.
Example usage:
.. code-block:: python
# Retrieve by primary key
instance = MyModel.get_cached_object(pk=1)
# Retrieve by name
instance = MyModel.get_cached_object(name="exampleName")
:param pk: The primary key of the model instance to retrieve.
:param name: The name of the model instance to retrieve.
:returns: The model instance if found, otherwise None.
:rtype: Optional["UserProfile"]
"""
logger_prefix = formatted_text(__name__ + ".UserProfile.get_cached_object()")
logger.debug(
"%s called with pk: %s, name: %s, user: %s, username: %s, account: %s, invalidate: %s",
logger_prefix,
pk,
name,
user,
username,
account,
invalidate,
)
@cache_results(cls.cache_expiration)
def _get_object_by_user_and_account(user: User, account: Account, class_name: str) -> "UserProfile":
try:
retval = (
UserProfile.objects.prefetch_related("tags")
.select_related("user", "account")
.get(user=user, account=account)
)
logger.debug(
"%s._get_object_by_user_and_account() fetched %s for user: %s and account: %s",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
cls.__name__,
user.email,
account,
)
_ = retval.user
_ = retval.account
return retval
except UserProfile.DoesNotExist as e:
logger.debug(
"%s._get_object_by_user_and_account() no %s found for user: %s, account: %s",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
UserProfile.__name__,
user.email,
account,
)
raise UserProfile.DoesNotExist(f"No UserProfile found for user {user} and account {account}") from e
@cache_results(cls.cache_expiration)
def _get_object_by_user(user: User, class_name: str) -> "UserProfile":
try:
retval = UserProfile.objects.prefetch_related("tags").select_related("user", "account").get(user=user)
logger.debug(
"%s._get_object_by_user() fetched %s for user: %s",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
cls.__name__,
user.email,
)
_ = retval.user
_ = retval.account
return retval
except UserProfile.DoesNotExist as e:
logger.debug(
"%s._get_object_by_user() no %s found for user: %s",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
UserProfile.__name__,
user.email,
)
raise UserProfile.DoesNotExist(f"No UserProfile found for user {user} and account {account}") from e
except UserProfile.MultipleObjectsReturned as e:
logger.error(
"%s.get_cached_object() Multiple UserProfiles found for user %s. Defaulting to first result.",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
user.email,
)
retval = (
UserProfile.objects.prefetch_related("tags")
.select_related("user", "account")
.filter(user=user)
.first()
)
if not retval:
raise UserProfile.DoesNotExist(
f"No UserProfile found for user {user} and account {account} after MultipleObjectsReturned exception."
) from e
return retval
@cache_results(cls.cache_expiration)
def _get_object_by_account(account: Account, class_name: str) -> Optional["UserProfile"]:
try:
user = UserProfile.admin_for_account(account)
retval = (
UserProfile.objects.prefetch_related("tags")
.select_related("user", "account")
.get(account=account, user=user)
)
logger.debug(
"%s._get_object_by_account() fetched %s for account admin %s",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
UserProfile.__name__,
retval,
)
_ = retval.user
_ = retval.account
return retval
except UserProfile.DoesNotExist:
logger.debug(
"%s._get_object_by_account() no %s found for account admin %s",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
UserProfile.__name__,
user,
)
return None
except UserProfile.MultipleObjectsReturned:
logger.error(
"%s.get_cached_object() Multiple UserProfiles found for account %s. Defaulting to first result.",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
account,
)
return (
UserProfile.objects.prefetch_related("tags")
.select_related("user", "account")
.filter(account=account)
.first()
)
if username and not user:
try:
user = User.objects.get(username=username)
logger.debug(
"%s.get_cached_object() fetched user by username: %s",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
username,
)
except User.DoesNotExist as e:
logger.error(
"%s.get_cached_object() No user found with username %s.",
formatted_text(__name__ + ".UserProfile.get_cached_object()"),
username,
)
raise User.DoesNotExist(f"No user found with username {username}") from e
if invalidate:
_get_object_by_user_and_account.invalidate(user=user, account=account, class_name=UserProfile.__name__)
_get_object_by_user.invalidate(user=user, class_name=UserProfile.__name__)
_get_object_by_account.invalidate(account=account, class_name=UserProfile.__name__)
if user or account:
if user and account:
return _get_object_by_user_and_account(user, account, UserProfile.__name__)
if user:
return _get_object_by_user(user=user, class_name=UserProfile.__name__)
if account:
return _get_object_by_account(account=account, class_name=UserProfile.__name__)
return super().get_cached_object(invalidate=invalidate, pk=pk, name=name) # type: ignore[return-value]
def __str__(self):
    """
    Return a display label of the form ``<company_name>-<user_identifier>``.

    The user identifier is the user's email when it is set, otherwise the
    username. ``"NoUser"`` and ``"NoAccount"`` are substituted when the
    corresponding relation is empty or when resolving it raises
    ``DoesNotExist``.

    :returns: The formatted label.
    :rtype: str
    """
    # Bind both fallbacks first so that every path reaches the return
    # statement with both names defined. With a single try/except, a
    # User.DoesNotExist left ``company_name`` unassigned and __str__ itself
    # failed with UnboundLocalError.
    user_identifier = "NoUser"
    company_name = "NoAccount"

    try:
        if self.user:
            user_identifier = self.user.email or self.user.username
    except User.DoesNotExist:
        # Deliberate best effort: keep the "NoUser" fallback.
        pass

    try:
        if self.account:
            company_name = self.account.company_name
    except Account.DoesNotExist:
        # Deliberate best effort: keep the "NoAccount" fallback.
        pass

    return f"{company_name}-{user_identifier}"
def __repr__(self):
    """Return the same text as :meth:`__str__` (delegates to it directly)."""
    return self.__str__()
class MetaDataWithOwnershipModel(MetaDataModel):
    """
    Abstract Django ORM base model that adds Account and
    User ownership to a SAM Metadata model.

    This model extends `MetaDataModel` with a foreign key to the
    `UserProfile` model, establishing ownership of a resource by a specific
    user profile. It also declares a uniqueness constraint on the
    combination of the `user_profile` and `name` fields.

    :param user_profile: ForeignKey to :class:`UserProfile`. The user profile that owns this resource.

    .. note::

        This is an abstract base class and should not be instantiated directly.
    """

    # pylint: disable=missing-class-docstring
    class Meta:
        abstract = True
        unique_together = (
            "user_profile",
            "name",
        )

    # "%(class)ss" expands per concrete subclass, e.g. ``Secret`` -> ``secrets``.
    user_profile = models.ForeignKey(UserProfile, on_delete=models.CASCADE, related_name="%(class)ss")

    # pylint: disable=W0221
    @classmethod
    def get_cached_object(
        cls,
        invalidate: Optional[bool] = False,
        pk: Optional[int] = None,
        name: Optional[str] = None,
        user: Optional[User] = None,
        user_profile: Optional[UserProfile] = None,
        username: Optional[str] = None,
        account: Optional[Account] = None,
    ) -> Optional[models.Model]:
        """
        Retrieve a model instance using caching to optimize performance.

        Lookup precedence, as implemented below:

        1. ``pk`` when truthy.
        2. ``name`` + ``user_profile``. The profile is taken from the
           argument, or resolved from ``username`` or from ``user`` / ``account``.
        3. ``name`` + ``account`` when no user profile could be resolved.
        4. Otherwise the lookup is delegated to the parent class.

        Examples of retrieval patterns:

        .. code-block:: python

            # By primary key
            instance = MyModel.get_cached_object(pk=123)

            # By name and user profile
            instance = MyModel.get_cached_object(name="Resource Name", user_profile=user_profile)

            # By name and account
            instance = MyModel.get_cached_object(name="Resource Name", account=account)

        :param invalidate: Whether to invalidate the cache for this retrieval.
        :param pk: The primary key of the model instance to retrieve.
        :param name: The name of the model instance to retrieve.
        :param user: The user associated with the model instance.
        :param user_profile: The user profile associated with the model instance.
        :param username: Username used to resolve the user profile when neither
            ``user`` nor ``user_profile`` is given.
        :param account: The account associated with the model instance.
        :returns: The model instance if found, otherwise None.
        :rtype: Optional[models.Model]
        """
        logger_prefix = formatted_text(cls.__name__ + ".get_cached_object()")
        logger.debug(
            "%s called with pk: %s, name: %s, user: %s, user_profile: %s, username: %s, account: %s",
            logger_prefix,
            pk,
            name,
            user,
            user_profile,
            username,
            account,
        )

        if username and not user and not user_profile:
            logger.debug("%s Resolving user_profile from username: %s", logger_prefix, username)
            user_profile = UserProfile.get_cached_object(invalidate=invalidate, username=username)
            user = user_profile.cached_user if user_profile else None

        # Fill in whichever of user / account the caller left out.
        if user_profile is not None and (not user or not account):
            logger.debug("%s Resolving user and account from user_profile: %s", logger_prefix, user_profile)
            user = user or user_profile.cached_user
            account = account or user_profile.cached_account

        @cache_results(cls.cache_expiration)
        def _get_object_by_pk(pk: int, class_name: str = cls.__name__) -> Optional["MetaDataWithOwnershipModel"]:
            """
            Internal method to retrieve a model instance by primary key with caching.

            Prefetches related tags and selects the related user profile,
            account, and user.

            :param pk: The primary key of the model instance to retrieve.
            :param class_name: The name of the class for cache key purposes.
            :returns: The model instance if found, otherwise None.
            :rtype: Optional["MetaDataWithOwnershipModel"]
            """
            try:
                retval = (
                    cls.objects.prefetch_related("tags")
                    .select_related("user_profile", "user_profile__account", "user_profile__user")
                    .get(pk=pk)
                )
                logger.debug(
                    "%s._get_object_by_pk() fetched %s - %s",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    type(retval).__name__,
                    str(retval),
                )
                return retval
            except cls.DoesNotExist:
                logger.debug(
                    "%s._get_object_by_pk() no %s object found for pk: %s",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    cls.__name__,
                    pk,
                )
                return None

        @cache_results(cls.cache_expiration)
        def _get_object_by_name_and_user_profile(
            name: str, user_profile: UserProfile, class_name: str = cls.__name__
        ) -> Optional["MetaDataWithOwnershipModel"]:
            """
            Internal method to retrieve a model instance by name and user
            profile with caching.

            Prefetches related tags and selects the related user profile,
            account, and user. If several rows match, the first one is returned.

            :param name: The name of the model instance to retrieve.
            :param user_profile: The user profile associated with the model instance.
            :param class_name: The name of the class for cache key purposes.
            :returns: The model instance if found, otherwise None.
            :rtype: Optional["MetaDataWithOwnershipModel"]
            """
            try:
                retval = (
                    cls.objects.prefetch_related("tags")
                    .select_related("user_profile", "user_profile__account", "user_profile__user")
                    .get(name=name, user_profile=user_profile)
                )
                logger.debug(
                    "%s._get_object_by_name_and_user_profile() fetched %s for name: %s and user_profile: %s",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    # type(retval).__name__ is the model class name; going through
                    # .__class__ would log the metaclass name instead.
                    type(retval).__name__,
                    name,
                    user_profile,
                )
                return retval
            except cls.DoesNotExist:
                logger.debug(
                    "%s._get_object_by_name_and_user_profile() no %s found for name: %s and user_profile: %s",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    cls.__name__,
                    name,
                    user_profile,
                )
                return None
            except cls.MultipleObjectsReturned:
                logger.error(
                    "%s.get_cached_object() Multiple %s objects found with name '%s' and user profile '%s'. Defaulting to first result.",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    cls.__name__,
                    name,
                    user_profile,
                )
                return cls.objects.prefetch_related("tags").filter(name=name, user_profile=user_profile).first()

        @cache_results(cls.cache_expiration)
        def _get_object_by_name_and_account(
            name: str, account: Account, class_name: str = cls.__name__
        ) -> Optional["MetaDataWithOwnershipModel"]:
            """
            Internal method to retrieve a model instance by name and account
            with caching.

            Prefetches related tags and selects the related user profile,
            account, and user. If several rows match, the first one is returned.

            :param name: The name of the model instance to retrieve.
            :param account: The account associated with the model instance.
            :param class_name: The name of the class for cache key purposes.
            :returns: The model instance if found, otherwise None.
            :rtype: Optional["MetaDataWithOwnershipModel"]
            """
            try:
                retval = (
                    cls.objects.prefetch_related("tags")
                    .select_related("user_profile", "user_profile__account", "user_profile__user")
                    .get(name=name, user_profile__account=account)
                )
                logger.debug(
                    "%s._get_object_by_name_and_account() fetched %s for name: %s and account: %s",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    # Model class name, not the metaclass name.
                    type(retval).__name__,
                    name,
                    account,
                )
                return retval
            except cls.DoesNotExist:
                logger.debug(
                    "%s._get_object_by_name_and_account() no %s found for name: %s and account: %s",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    cls.__name__,
                    name,
                    account,
                )
                return None
            except cls.MultipleObjectsReturned:
                logger.error(
                    "%s.get_cached_object() Multiple %s objects found with name '%s' and account '%s'. Defaulting to first result.",
                    formatted_text(MetaDataWithOwnershipModel.__name__ + ".get_cached_object()"),
                    cls.__name__,
                    name,
                    account,
                )
                return cls.objects.prefetch_related("tags").filter(name=name, user_profile__account=account).first()

        if invalidate:
            # Drop every cache entry this call could be served from before reading.
            _get_object_by_pk.invalidate(pk=pk, class_name=cls.__name__)
            _get_object_by_name_and_user_profile.invalidate(
                name=name, user_profile=user_profile, class_name=cls.__name__
            )
            _get_object_by_name_and_account.invalidate(name=name, account=account, class_name=cls.__name__)

        if pk:
            return _get_object_by_pk(pk=pk, class_name=cls.__name__)

        try:
            user_profile = user_profile or UserProfile.get_cached_object(user=user, account=account)
        except UserProfile.DoesNotExist:
            user_profile = None
        except UserProfile.MultipleObjectsReturned:
            logger.error(
                "%s.get_cached_object() Multiple UserProfiles found for user %s and account %s. Defaulting to first result.",
                formatted_text(cls.__name__ + ".get_cached_object()"),
                user,
                account,
            )
            # UserProfile's own relations are "user" and "account"; the
            # "user_profile__*" paths belong to this model, not to UserProfile,
            # and are not valid select_related() targets on this queryset.
            user_profile = (
                UserProfile.objects.select_related("user", "account")
                .prefetch_related("tags")
                .filter(user=user, account=account)
                .first()
            )

        if user_profile:
            # call this regardless of whether name is provided.
            return _get_object_by_name_and_user_profile(name=name, user_profile=user_profile, class_name=cls.__name__)
        elif account:
            return _get_object_by_name_and_account(name=name, account=account, class_name=cls.__name__)

        # no ownership info provided, so fall back to the super().
        return super().get_cached_object(invalidate=invalidate, pk=pk, name=name)  # type: ignore[return-value]

    @classmethod
    def get_cached_objects(
        cls, invalidate: Optional[bool] = False, user_profile: Optional[UserProfile] = None
    ) -> models.QuerySet["MetaDataWithOwnershipModel"]:
        """
        Retrieve the instances of this model owned by a user profile, using caching.

        When ``user_profile`` is not given, the call is delegated to the
        parent class implementation.

        Example usage:

        .. code-block:: python

            # Retrieve instances for a user profile with caching
            models = MyModel.get_cached_objects(user_profile=my_user_profile, invalidate=invalidate)

        :param invalidate: Whether to invalidate the cache for this retrieval.
        :type invalidate: bool, optional
        :param user_profile: The user profile for which to retrieve instances.
        :type user_profile: UserProfile, optional
        :returns: A queryset of instances associated with the user profile.
        :rtype: models.QuerySet["MetaDataWithOwnershipModel"]
        """
        logger_prefix = formatted_text(__name__ + f".{MetaDataWithOwnershipModel.__name__}.get_cached_objects()")
        logger.debug(
            "%s called for %s with user_profile: %s invalidate: %s",
            logger_prefix,
            cls.__name__,
            user_profile,
            invalidate,
        )

        @cache_results(cls.cache_expiration)
        def _get_objects_for_user_profile_id(
            user_profile_id: int, class_name: str = cls.__name__
        ) -> models.QuerySet["MetaDataWithOwnershipModel"]:
            """
            Internal method to retrieve instances for a given user profile ID
            with caching.

            :param user_profile_id: The ID of the user profile that owns the instances.
            :param class_name: The name of the class for cache key purposes.
            :returns: A queryset of instances associated with the user profile ID.
            :rtype: models.QuerySet["MetaDataWithOwnershipModel"]
            """
            return (
                cls.objects.prefetch_related("tags")
                .select_related("user_profile", "user_profile__account", "user_profile__user")
                .filter(user_profile_id=user_profile_id)
            )

        if invalidate and user_profile:
            _get_objects_for_user_profile_id.invalidate(user_profile_id=user_profile.id, class_name=cls.__name__)  # type: ignore

        if user_profile:
            return _get_objects_for_user_profile_id(user_profile_id=user_profile.id, class_name=cls.__name__)  # type: ignore

        return super().get_cached_objects(invalidate=invalidate)  # type: ignore[return-value]
class PaymentMethod(TimestampedModel):
    """
    Card-style payment method attached to an :class:`Account`.

    .. attention::

        This model is not in use.
    """

    # Deleting the account cascades to its payment methods.
    account = models.ForeignKey(
        Account,
        on_delete=models.CASCADE,
        related_name="payment_methods",
    )
    name = models.CharField(max_length=255)
    stripe_id = models.CharField(max_length=255)
    card_type = models.CharField(max_length=255)
    card_last_4 = models.CharField(max_length=4)
    card_exp_month = models.CharField(max_length=2)
    card_exp_year = models.CharField(max_length=4)
    is_default = models.BooleanField(default=False)

    def __str__(self):
        """Return ``"<card_type> <card_last_4>"``."""
        label_parts = (str(self.card_type), str(self.card_last_4))
        return " ".join(label_parts)
class LLMPrices(TimestampedModel):
    """
    Price markup table used for account billing.

    Each row holds the markup price for one combination of charge type,
    provider and model; that combination is declared unique.

    :param charge_type: String. The type of charge (e.g., completion, plugin, tool).
    :param provider: String. The LLM provider (e.g., OpenAI, Meta AI).
    :param model: String. The model name.
    :param price: Decimal. The markup price to apply.

    .. note::

        Markup factors are used to calculate each account's share of provider costs.

    **Example usage**::

        # Calculate account charge for provider/model usage
        markup = LLMPrices.objects.get(provider="openai", model="gpt-4").price
        account_charge = provider_cost * markup * account_usage_ratio

    :TODO: create a Choice or FK to charge_type field.
    :TODO: create some form of referential integrity for model and provider fields.
    :TODO: establish reasonable boundaries on price field.

    .. seealso::

        :class:`Account`, :class:`Charge`
    """

    charge_type = models.CharField(max_length=20)
    provider = models.CharField(max_length=255)
    model = models.CharField(max_length=255)
    price = models.DecimalField(
        max_digits=10,
        decimal_places=6,
    )

    class Meta:
        unique_together = ("charge_type", "provider", "model")

    def __str__(self):
        """Return ``"<charge_type> - <provider> - <model> - <price>"``."""
        charge_type = self.charge_type
        provider = self.provider
        model = self.model
        price = self.price
        return f"{charge_type} - {provider} - {model} - {price}"
class Charge(TimestampedModel):
    """
    A single billing event recorded against an account and a user.

    Stores the provider, charge type, token counts, model name and a
    reference string for the event.

    :param account: ForeignKey to :class:`Account`. The account being billed.
    :param user: ForeignKey to :class:`django.contrib.auth.models.User`. The user associated with the charge.
    :param session_key: String. Optional session identifier for the charge.
    :param provider: String. The LLM provider (e.g., OpenAI).
    :param charge_type: String. The type of charge (e.g., completion, plugin, tool).
    :param prompt_tokens: Integer. Number of prompt tokens used.
    :param completion_tokens: Integer. Number of completion tokens used.
    :param total_tokens: Integer. Total tokens used.
    :param model: String. The model name.
    :param reference: String. Reference identifier for the charge.

    .. note::

        ``new_charge_created`` is sent the first time an instance is saved.

    **Example usage**::

        charge = Charge.objects.create(
            account=account,
            user=user,
            provider="openai",
            charge_type="completion",
            prompt_tokens=100,
            completion_tokens=200,
            total_tokens=300,
            model="gpt-4",
            reference="invoice-123"
        )

    .. seealso::

        :class:`Account`, :class:`LLMPrices`
    """

    account = models.ForeignKey(
        Account,
        on_delete=models.CASCADE,
        related_name="charge",
        null=False,
        blank=False,
    )
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name="charge",
        null=False,
        blank=False,
    )
    session_key = models.CharField(max_length=255, null=True, blank=True)
    provider = models.CharField(max_length=255, choices=PROVIDERS, default=PROVIDER_OPENAI)
    charge_type = models.CharField(max_length=20, choices=CHARGE_TYPES, default=CHARGE_TYPE_PROMPT_COMPLETION)
    prompt_tokens = models.IntegerField()
    completion_tokens = models.IntegerField()
    total_tokens = models.IntegerField()
    model = models.CharField(max_length=255)
    reference = models.CharField(max_length=255)

    def save(self, *args, **kwargs):
        """
        Persist the charge and, on first save only, send ``new_charge_created``.

        :param args: Positional arguments passed to the parent save method.
        :param kwargs: Keyword arguments passed to the parent save method.
        """
        # Must be read before the parent save assigns a primary key.
        created = self.pk is None
        super().save(*args, **kwargs)
        if not created:
            return

        logger.debug(
            "%s.save() New user charge created for %s %s. Sending signal.",
            formatted_text(__name__ + ".Charge()"),
            self.account.company_name,
            self.user.email,
        )
        new_charge_created.send(sender=self.__class__, charge=self)

    def __str__(self):
        """Return account number, user email, provider, charge type and total tokens joined by ``" - "``."""
        account_number = self.account.account_number
        email = self.user.email
        return f"{account_number} - {email} - {self.provider} - {self.charge_type} - {self.total_tokens}"
class DailyBillingRecord(TimestampedModel):
    """
    Per-day aggregate of token usage for billing.

    One row stores the prompt, completion and total token counts for an
    account, user, provider, date and charge type.

    :param account: ForeignKey to :class:`Account`. The account being billed.
    :param user: ForeignKey to :class:`django.contrib.auth.models.User`. The user associated with the record.
    :param provider: String. The LLM provider (e.g., OpenAI).
    :param date: Date. The billing date for the record.
    :param charge_type: String. The type of charge (e.g., completion, plugin, tool).
    :param prompt_tokens: Integer. Number of prompt tokens used.
    :param completion_tokens: Integer. Number of completion tokens used.
    :param total_tokens: Integer. Total tokens used.

    **Example usage**::

        record = DailyBillingRecord.objects.create(
            account=account,
            user=user,
            provider="openai",
            date=date.today(),
            charge_type="completion",
            prompt_tokens=100,
            completion_tokens=200,
            total_tokens=300
        )

    .. seealso::

        :class:`Charge`, :class:`Account`
    """

    account = models.ForeignKey(
        Account,
        on_delete=models.CASCADE,
        related_name="daily_billing_records",
    )
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name="daily_billing_records",
    )
    provider = models.CharField(max_length=255, choices=PROVIDERS)
    date = models.DateField()
    charge_type = models.CharField(max_length=20, choices=CHARGE_TYPES)
    prompt_tokens = models.IntegerField()
    completion_tokens = models.IntegerField()
    total_tokens = models.IntegerField()

    class Meta:
        # NOTE(review): charge_type is not part of this uniqueness key, so only
        # one row per (account, user, provider, date) can exist -- confirm intended.
        unique_together = ("account", "user", "provider", "date")

    def __str__(self):
        """Return account number, user email, provider, date and total tokens joined by ``" - "``."""
        account_number = self.account.account_number
        email = self.user.email
        return f"{account_number} - {email} - {self.provider} - {self.date} - {self.total_tokens}"
class Secret(MetaDataWithOwnershipModel):
    """
    Secret model for securely storing and managing sensitive account-level information.

    Usage::

        # Encrypt a secret value before saving it
        secret_value = Secret.encrypt("my-sensitive-api-key")

        # Create a new secret
        secret = Secret(
            name="API Key",
            user_profile=user_profile_instance,
            encrypted_value=secret_value
        )
        secret.save()

        # Retrieve and decrypt a secret
        retrieved_secret = Secret.objects.get(id=secret.id)
        decrypted_value = retrieved_secret.get_secret()

    .. note::

        Only ``encrypted_value`` is a model field. The plaintext is never
        stored on the model; callers encrypt it with :meth:`encrypt` before
        saving and recover it with :meth:`get_secret`.
    """

    class Meta:
        verbose_name = "Secret"
        verbose_name_plural = "Secrets"
        unique_together = ("user_profile", "name")

    last_accessed = models.DateTimeField(
        blank=True, editable=False, null=True, help_text="Timestamp of the last time the secret was accessed."
    )
    expires_at = models.DateTimeField(
        blank=True,
        null=True,
        help_text="Timestamp indicating when the secret expires. If null, the secret does not expire.",
    )
    user_profile = models.ForeignKey(
        UserProfile,
        on_delete=models.CASCADE,
        related_name="secrets",
        help_text="Reference to the UserProfile associated with this secret.",
    )
    encrypted_value = models.BinaryField(help_text="Read-only encrypted representation of the secret's value.")

    def save(self, *args, **kwargs):
        """
        Validate and persist this secret, then emit a created/edited signal.

        This method does not encrypt anything itself: ``encrypted_value`` must
        already hold the output of :meth:`encrypt`. It only checks that
        ``name`` and ``encrypted_value`` are both present.

        :param args: Positional arguments passed to the parent save method.
        :param kwargs: Keyword arguments passed to the parent save method.
        :raises: :class:`SmarterValueError` if `name` or `encrypted_value` is missing.

        .. note::

            Sends ``secret_created`` on the first save and ``secret_edited``
            on every later save. This includes the ``last_accessed`` update
            that :meth:`get_secret` performs through this method.

        **Example usage**::

            secret = Secret(
                name="apiKey",
                user_profile=user_profile,
                encrypted_value=Secret.encrypt("my-api-key")
            )
            secret.save()
        """
        # Must be read before the parent save assigns a primary key.
        is_new = self.pk is None
        if not self.name or not self.encrypted_value:
            raise SmarterValueError(
                f"Name and encrypted_value are required fields. Got name: {self.name}, encrypted_value: {self.encrypted_value}"
            )
        # NOTE(review): self-assignment kept as-is. It reads the related object
        # and writes it back through the FK descriptor -- presumably to surface
        # a missing user_profile early or to re-sync user_profile_id; confirm
        # before removing.
        self.user_profile = self.user_profile
        super().save(*args, **kwargs)
        if is_new:
            secret_created.send(sender=self.__class__, secret=self)
        else:
            secret_edited.send(sender=self.__class__, secret=self)

    def get_secret(self, update_last_accessed=True) -> Optional[str]:
        """
        Decrypt and return the original secret value.

        Optionally updates the `last_accessed` timestamp, then sends the
        ``secret_accessed`` signal and decrypts ``encrypted_value``. Any
        exception raised along the way is re-raised as :class:`SmarterValueError`.

        :param update_last_accessed: Boolean. If True, updates and saves the
            `last_accessed` timestamp. Defaults to True.
        :returns: Optional[str]
            The decrypted secret value, or None if ``encrypted_value`` is empty.
        :raises: :class:`SmarterValueError` if any step, including decryption, fails.

        **Example usage**::

            secret_value = secret.get_secret(update_last_accessed=True)
        """
        try:
            if update_last_accessed:
                self.last_accessed = timezone.now()
                self.save(update_fields=["last_accessed"])
            secret_accessed.send(sender=self.__class__, secret=self, user_profile=self.user_profile)
            fernet = self.get_fernet()
            token = self.encrypted_value
            if not token:
                return None
            # A BinaryField value may be a memoryview or bytearray rather than
            # bytes, while Fernet.decrypt() only accepts bytes or str.
            if isinstance(token, (memoryview, bytearray)):
                token = bytes(token)
            return fernet.decrypt(token).decode()
        except Exception as e:
            raise SmarterValueError(f"Failed to decrypt the secret: {str(e)}") from e

    def is_expired(self) -> bool:
        """
        Determine whether the secret has expired based on its `expires_at` timestamp.

        :returns: bool
            True if the current time is past the expiration timestamp; False otherwise.

        .. note::

            If `expires_at` is not set, the secret is considered non-expiring.
            A naive `expires_at` is made timezone-aware before comparison.

        **Example usage**::

            if secret.is_expired():
                print("This secret is no longer valid.")
        """
        if not self.expires_at:
            return False
        expiration = timezone.make_aware(self.expires_at) if timezone.is_naive(self.expires_at) else self.expires_at
        return timezone.now() > expiration

    def has_permissions(self, request: "WSGIRequest") -> bool:
        """
        Check if the authenticated user in the given request has permission to manage this secret.

        :param request: :class:`django.core.handlers.wsgi.WSGIRequest`
            The HTTP request containing the user to check.
        :returns: bool
            True if the user is authenticated and is either staff or superuser; False otherwise.

        .. attention::

            Only users with staff or superuser status are permitted to manage secrets.

        .. warning::

            If the request has no ``user`` attribute, the resolved user is not
            a ``User`` instance, or the user lacks the required privileges,
            permission is denied.

        **Example usage**::

            if secret.has_permissions(request):
                # Allow secret management
                pass

        .. seealso::

            :meth:`get_resolved_user` -- Resolves the user from the request.
        """
        if not hasattr(request, "user"):
            return False
        user = get_resolved_user(request.user)
        if not isinstance(user, User):
            return False
        if not hasattr(user, "is_authenticated") or not user.is_authenticated:
            return False
        if not hasattr(user, "is_staff") or not hasattr(user, "is_superuser"):
            return False
        return user.is_staff or user.is_superuser

    def __str__(self):
        """
        Return ``"<name> - <user_profile>"``.

        ``"no name"`` and ``"no user profile"`` are substituted for a missing
        name or user profile.
        """
        # Each fallback is resolved on its own. In the former one-liner,
        # ``a or "no name" + " - " + str(b) or "..."`` parsed as
        # ``a or (...) or ...`` because ``+`` binds tighter than ``or``, so
        # the user profile never appeared in the result.
        name = str(self.name) if self.name else "no name"
        try:
            owner = str(self.user_profile) if self.user_profile else "no user profile"
        except UserProfile.DoesNotExist:
            # Raised when the relation is accessed on an instance with no profile set.
            owner = "no user profile"
        return f"{name} - {owner}"

    @classmethod
    def encrypt(cls, value: str) -> bytes:
        """
        Encrypt a string value using Fernet symmetric encryption.

        :param value: str
            The plaintext string to encrypt.
        :returns: bytes
            The encrypted value as bytes.
        :raises: :class:`SmarterValueError`
            If the input value is not a non-empty string.

        .. attention::

            The original plaintext value is not stored or persisted; only the encrypted bytes are returned.

        .. caution::

            Always clear or avoid storing the plaintext value after encryption to prevent accidental exposure.

        **Example usage**::

            encrypted = Secret.encrypt("my-api-key")
            # Store `encrypted` in the database, never the plaintext

        .. seealso::

            :meth:`get_fernet` -- Returns the Fernet encryption object.
        """
        if not value or not isinstance(value, str):
            raise SmarterValueError("Value must be a non-empty string")
        fernet = cls.get_fernet()
        retval = fernet.encrypt(value.encode())
        return retval

    @classmethod
    def get_fernet(cls) -> Fernet:
        """
        Return a Fernet encryption object for secure value encryption and decryption.

        :returns: :class:`cryptography.fernet.Fernet`
            A Fernet instance initialized with the configured encryption key.
        :raises: :class:`SmarterConfigurationError`
            If the encryption key is missing from settings.

        .. important::

            The encryption key must be set in ``smarter.common.conf.settings.fernet_encryption_key``.
            Without a valid key, secrets cannot be encrypted or decrypted.

        **Example usage**::

            fernet = Secret.get_fernet()
            encrypted = fernet.encrypt(b"my-value")
            decrypted = fernet.decrypt(encrypted)

        .. seealso::

            :meth:`encrypt` -- Uses the Fernet object to encrypt values.
        """
        encryption_key = smarter_settings.fernet_encryption_key.get_secret_value()
        if not encryption_key:
            raise SmarterConfigurationError(
                "Encryption key not found in settings. Please set smarter.common.conf.settings.fernet_encryption_key"
            )
        fernet = Fernet(encryption_key)
        return fernet

    # pylint: disable=W0221
    @classmethod
    def get_cached_object(
        cls,
        invalidate: Optional[bool] = False,
        pk: Optional[int] = None,
        name: Optional[str] = None,
        user: Optional[User] = None,
        user_profile: Optional[UserProfile] = None,
        account: Optional[Account] = None,
    ) -> Optional["Secret"]:
        """
        Retrieve a Secret using caching to optimize performance.

        Delegates to the parent implementation and returns its result only
        when it is a :class:`Secret`; any other result is logged and replaced
        with None.

        Examples of retrieval patterns:

        .. code-block:: python

            # By primary key
            instance = Secret.get_cached_object(pk=123)

            # By name and user profile
            instance = Secret.get_cached_object(name="Resource Name", user_profile=user_profile)

            # By name and account
            instance = Secret.get_cached_object(name="Resource Name", account=account)

        :param invalidate: Whether to invalidate the cache for this retrieval.
        :param pk: The primary key of the model instance to retrieve.
        :param name: The name of the model instance to retrieve.
        :param user: The user associated with the model instance.
        :param user_profile: The user profile associated with the model instance.
        :param account: The account associated with the model instance.
        :returns: The model instance if found, otherwise None.
        :rtype: Optional[Secret]
        """
        logger_prefix = formatted_text(__name__ + "." + Secret.__name__ + ".get_cached_object()")
        logger.debug(
            "%s called with pk: %s, name: %s, user: %s, user_profile: %s, account: %s, invalidate: %s",
            logger_prefix,
            pk,
            name,
            user,
            user_profile,
            account,
            invalidate,
        )
        retval = super().get_cached_object(
            invalidate=invalidate, pk=pk, name=name, user=user, user_profile=user_profile, account=account
        )
        if isinstance(retval, Secret):
            return retval
        logger.debug(
            "%s super().get_cached_object() did not return a Secret instance. Got: %s. Returning None.",
            logger_prefix,
            type(retval),
        )
        return None