Source code for smarter.lib.drf.models

"""DRF knox authtoken model and manager."""

import uuid
from datetime import datetime, timedelta
from logging import getLogger
from typing import Optional

from django.db import models
from django.urls import reverse
from django.utils import timezone
from knox import crypto
from knox.models import AuthToken
from knox.settings import CONSTANTS

from smarter.apps.account.models import (
    MetaDataWithOwnershipModel,
    MetaDataWithOwnershipModelManager,
    User,
    UserProfile,
)
from smarter.common.exceptions import SmarterBusinessRuleViolation
from smarter.common.helpers.console_helpers import formatted_text
from smarter.lib.cache import cache_results

logger = getLogger(__name__)

###############################################################################
# API Key Management
###############################################################################


[docs] class SmarterAuthTokenManager(MetaDataWithOwnershipModelManager): """ API Key manager. This is a custom manager derived from a combination of Knox's AuthTokenManager and and Smarter's SmarterQuerySetWithPermissions Queryset to provide both knox token management functionality as well as Smarter's permission-based querying behavior. """
[docs] def create( self, user: User, expiry=None, prefix=None, name: Optional[str] = None, description: Optional[str] = None, is_active: bool = True, **kwargs, ) -> tuple["SmarterAuthToken", str]: prefix = prefix or "" token = prefix + crypto.create_token_string() token_key = token[: CONSTANTS.TOKEN_KEY_LENGTH] digest = crypto.hash_token(token) if expiry is not None: expiry = timezone.now() + expiry auth_token = self.model( token_key=token_key, digest=digest, user=user, expiry=expiry, name=name, description=description, is_active=is_active, **kwargs, ) logger.info( "%s Creating API Key for user %s with token %s and expiry %s", formatted_text("lib.drf.models.SmarterAuthTokenManager.create()"), user, token_key, expiry, ) auth_token.save() return auth_token, token
[docs] class SmarterAuthToken(AuthToken, MetaDataWithOwnershipModel): """ Represents a Smarter API Key used for authenticating and authorizing access to the Smarter platform. This model extends Knox's `AuthToken` and includes additional metadata and management features for API keys, such as naming, description, activation status, and usage tracking. **Parameters:** key_id (UUIDField): Unique identifier for the API key. name (str): Human-readable name for the API key. description (str, optional): Optional description of the API key's purpose. last_used_at (datetime, optional): Timestamp of the last usage of the API key. is_active (bool): Indicates whether the API key is currently active. **Usage Example:** .. code-block:: python # Creating an API key for a staff user user = User.objects.get(username="admin") token, key = SmarterAuthToken.objects.create( user=user, name="Production Key", description="Key for production API access" ) # Activating or deactivating the key token.activate() token.deactivate() # Toggling active status token.toggle_active() # Tracking usage token.accessed() .. note:: - API keys can only be created for staff users. Attempting to create a key for a non-staff user will raise a `SmarterBusinessRuleViolation`. - The `identifier` property returns a masked version of the key digest for display purposes. .. warning:: - Ensure that API keys are managed securely. Deactivated keys cannot be used for authentication. Related Models -------------- - ``User``: The owner of the API key. - ``MetaDataModel``: Provides created/modified timestamps and SAM metadata. """ objects = SmarterAuthTokenManager() # pylint: disable=C0115 class Meta: verbose_name = "API Key" verbose_name_plural = "API Keys" key_id = models.UUIDField(default=uuid.uuid4, editable=False, unique=True) last_used_at = models.DateTimeField(blank=True, null=True) is_active = models.BooleanField(default=True) tags = models.JSONField(default=list, blank=True) @property def identifier(self): self.mask_string(self.digest) @property def manifest_url(self) -> Optional[str]: """ Returns the URL to the plugin's manifest. This property constructs the URL to the plugin's manifest based on its kind and RFC 1034-compliant name. The URL follows the pattern: ``/plugins/{kind}/{name}/manifest/``, where ``{kind}`` is the RFC 1034-compliant kind of the plugin, and ``{name}`` is the RFC 1034-compliant name of the plugin. **Example:** .. code-block:: python self.rfc1034_compliant_kind # 'static' self.rfc1034_compliant_name # 'example-plugin self.manifest_url # '/plugins/static/example-plugin/manifest/' """ # pylint: disable=C0415 from smarter.lib.drf.urls import AuthTokenReverseNames return reverse( f"{AuthTokenReverseNames.namespace}:{AuthTokenReverseNames.detailview}", kwargs={"authtoken_id": self.id}, # type: ignore )
[docs] def save(self, *args, **kwargs): if not self.user.is_staff: raise SmarterBusinessRuleViolation("API Keys can only be created for staff users.") if self.created is None: self.created = timezone.now() super().save(*args, **kwargs)
[docs] def activate(self): """Activate the API key.""" self.is_active = True self.save()
[docs] def deactivate(self): """Deactivate the API key.""" self.is_active = False self.save()
[docs] def toggle_active(self): """Toggle the active status of the API key.""" self.is_active = not self.is_active self.save()
[docs] def accessed(self): """Update the last used time.""" if self.last_used_at is None or (datetime.now() - self.last_used_at) > timedelta(minutes=5): self.last_used_at = datetime.now() self.save()
[docs] @classmethod def get_cached_objects( cls, invalidate: Optional[bool] = False, user_profile: Optional[UserProfile] = None, user: Optional[User] = None, name: Optional[str] = None, **kwargs, ) -> models.QuerySet["SmarterAuthToken"]: """ Retrieve API keys with caching based on user profile and optional name filter using caching. :param invalidate: If True, invalidate the cache for this query. :type invalidate: bool, optional :param user_profile: The user profile for which to retrieve API keys. :type user_profile: UserProfile, optional :param user: The user for which to retrieve API keys (used if user_profile is not provided). :type user: User, optional :param name: Optional name filter to retrieve API keys with a specific name. :type name: str, optional :returns: A queryset of SmarterAuthToken objects matching the criteria. :rtype: QuerySet[SmarterAuthToken] """ logger_prefix = formatted_text(f"{__name__}.{cls.__name__}.get_cached_objects()") logger.debug( "%s called with user_profile=%s, user=%s, name=%s, invalidate=%s", logger_prefix, user_profile, user, name, invalidate, ) # pylint: disable=W0613 @cache_results(cls.cache_expiration) def _get_cached_objects_for_user_profile(user_profile_id: int) -> models.QuerySet["SmarterAuthToken"]: if not user_profile: return cls.objects.none() try: queryset = cls.objects.select_related( "user_profile", "user_profile__account", "user_profile__user" ).filter(user=user_profile.cached_user) logger.debug( "%s._get_cached_objects_for_user_profile() fetched and cached objects for user_profile_id: %s", logger_prefix, user_profile_id, ) return queryset # pylint: disable=broad-except except Exception as e: logger.error("Error retrieving cached objects: %s", e) try: queryset = cls.objects.select_related( "user_profile", "user_profile__account", "user_profile__user" ).filter(user=user_profile.cached_user) return queryset except Exception as e2: logger.error("Error retrieving objects without cache: %s", e2) queryset = cls.objects.filter(user=user_profile.cached_user) return queryset # pylint: disable=W0613 @cache_results(cls.cache_expiration) def _get_cached_objects_for_user_profile_and_name( user_profile_id: int, name: str ) -> models.QuerySet["SmarterAuthToken"]: """ Retrieve API keys for a specific user profile and name with caching. :param user_profile_id: The ID of the user profile for which to retrieve API keys. :type user_profile_id: int :param name: The name of the API key to retrieve. :type name: str :returns: A queryset of SmarterAuthToken objects matching the criteria. :rtype: QuerySet[SmarterAuthToken] """ if not user_profile: return cls.objects.none() try: queryset = cls.objects.select_related( "user_profile", "user_profile__account", "user_profile__user" ).filter(user=user_profile.cached_user, name=name) # pylint: disable=broad-except except Exception as e: logger.error("Error retrieving cached objects: %s", e) try: queryset = cls.objects.select_related( "user_profile", "user_profile__account", "user_profile__user" ).filter(user=user_profile.cached_user, name=name) except Exception as e2: logger.error("Error retrieving objects without cache: %s", e2) queryset = cls.objects.filter(user=user_profile.cached_user, name=name) logger.debug( "%s._get_cached_objects_for_user_profile_and_name() fetched and cached objects for user_profile_id: %s, name: %s", logger_prefix, user_profile_id, name, ) return queryset if invalidate: # Invalidate the cache for both functions _get_cached_objects_for_user_profile.invalidate(user_profile_id=user_profile.id if user_profile else None) # type: ignore _get_cached_objects_for_user_profile_and_name.invalidate( user_profile_id=user_profile.id if user_profile else None, name=name # type: ignore ) if not user_profile and user: user_profile = UserProfile.get_cached_object(user=user) if user_profile and name: return _get_cached_objects_for_user_profile_and_name(user_profile.id, name) # type: ignore elif user_profile: return _get_cached_objects_for_user_profile(user_profile.id) # type: ignore else: return super().get_cached_objects(user_profile=user_profile, invalidate=invalidate, taggit=False) # type: ignore
def __str__(self): return str(self.name) + " (" + str(self.user) + ") " + str(self.identifier)