Source code for smarter.lib.drf.manifest.brokers.auth_token

# pylint: disable=W0718
"""Smarter API SmarterAuthToken Manifest handler."""

import traceback
from typing import Any, Optional, Type

from django.core import serializers
from django.core.handlers.asgi import ASGIRequest
from pydantic_core import ValidationError as PydanticValidationError
from rest_framework.serializers import ModelSerializer

from smarter.apps.account.models import User
from smarter.common.utils.decorators import camel_case
from smarter.lib import logging
from smarter.lib.drf.manifest.enum import SAMSmarterAuthTokenSpecKeys
from smarter.lib.drf.manifest.models.auth_token.const import MANIFEST_KIND
from smarter.lib.drf.manifest.models.auth_token.metadata import (
    SAMSmarterAuthTokenMetadata,
)
from smarter.lib.drf.manifest.models.auth_token.model import SAMSmarterAuthToken
from smarter.lib.drf.manifest.models.auth_token.spec import (
    SAMSmarterAuthTokenSpec,
    SAMSmarterAuthTokenSpecConfig,
)
from smarter.lib.drf.manifest.models.auth_token.status import SAMSmarterAuthTokenStatus
from smarter.lib.drf.models import SmarterAuthToken
from smarter.lib.journal.enum import SmarterJournalCliCommands
from smarter.lib.journal.http import SmarterJournaledJsonResponse
from smarter.lib.manifest.broker import (
    AbstractBroker,
    SAMBrokerError,
    SAMBrokerErrorNotFound,
    SAMBrokerErrorNotImplemented,
    SAMBrokerErrorNotReady,
)
from smarter.lib.manifest.enum import (
    SAMKeys,
    SAMMetadataKeys,
    SCLIResponseGet,
    SCLIResponseGetData,
)

MAX_RESULTS = 1000
logger = logging.getLogger(__name__)


[docs] class SAMSmarterAuthTokenBrokerError(SAMBrokerError): """Base exception for Smarter API SmarterAuthToken Broker handling.""" @property def get_formatted_err_message(self): return "Smarter API SmarterAuthToken Manifest Broker Error"
[docs] class SmarterAuthTokenMiniSerializer(ModelSerializer): """API key serializer for smarter api that excludes SAM ownership information.""" # pylint: disable=missing-class-docstring class Meta: model = SmarterAuthToken fields = ["key_id", "name", "description", "is_active", "last_used_at", "created_at", "updated_at"]
[docs] class SAMSmarterAuthTokenBroker(AbstractBroker): """ Smarter API SmarterAuthToken Manifest Broker. This class is responsible for - loading, validating and parsing the Smarter Api yaml SmarterAuthToken manifests - using the manifest to initialize the corresponding Pydantic model This Broker class interacts with the collection of Django ORM models that represent the Smarter API SAMSmarterAuthToken manifests. The Broker class is responsible for creating, updating, deleting and querying the Django ORM models, as well as transforming the Django ORM models into Pydantic models for serialization and deserialization. """ # override the base abstract manifest model with the SAMSmarterAuthToken model _manifest: Optional[SAMSmarterAuthToken] _pydantic_model: Type[SAMSmarterAuthToken] = SAMSmarterAuthToken _smarter_auth_token: Optional[SmarterAuthToken] _token_key: Optional[str] _orm_instance: Optional[SmarterAuthToken]
[docs] def __init__(self, *args, **kwargs): """ Initialize the SAMSmarterAuthTokenBroker with the given arguments. The constructor initializes the parent class and sets up the manifest and user attributes. """ self._smarter_auth_token = None self._token_key = None self._created = False super().__init__(*args, **kwargs)
@property def formatted_class_name(self) -> str: """ Returns the formatted class name for logging purposes. This is used to provide a more readable class name in logs. """ class_name = f"{__name__}.{SAMSmarterAuthToken.__name__}()[{id(self)}]" return self.formatted_text(class_name) @property def smarter_auth_token(self) -> Optional[SmarterAuthToken]: """ The SmarterAuthToken object is a Django ORM model subclass from knox.AuthToken. that represents a SmarterAuthToken api key. The SmarterAuthToken object is used to store the authentication hash and Smarter metadata for the Smarter API. The SmarterAuthToken object is retrieved from the database, if it exists, or created from the manifest if it does not. """ if self._smarter_auth_token: return self._smarter_auth_token if not self.manifest: logger.debug( "%s.smarter_auth_token() Manifest not set. Cannot retrieve SmarterAuthToken.", self.formatted_class_name, ) return None username = self.manifest.spec.config.username try: logger.debug( "%s.smarter_auth_token() Retrieving SmarterAuthToken for user %s with name %s", self.formatted_class_name, username, self.name, ) self._smarter_auth_token = SmarterAuthToken.objects.get(user__username=username, name=self.name) except SmarterAuthToken.DoesNotExist: logger.debug( "%s.smarter_auth_token() SmarterAuthToken for user %s with name %s does not exist.", self.formatted_class_name, username, self.name, ) logger.debug( "%s.smarter_auth_token() SmarterAuthTokens: %s", self.formatted_class_name, SmarterAuthToken.objects.all(), ) return self._smarter_auth_token @smarter_auth_token.setter def smarter_auth_token(self, value: SmarterAuthToken) -> None: """Set the SmarterAuthToken object.""" self._smarter_auth_token = value logger.debug( "%s.smarter_auth_token() set to %s", self.formatted_class_name, self._smarter_auth_token, ) @property def token_key(self) -> Optional[str]: """ The token_key is the actual API key that is used to authenticate with the Smarter API. The token_key is generated by the SmarterAuthToken object when it is created and it is only available immediately after the object is created. """ if self.created and self._token_key: return self._token_key
[docs] def manifest_to_django_orm(self) -> dict[str, Any]: """Transform the Smarter API SAMSmarterAuthToken manifest into a Django ORM model.""" logger.debug("%s.manifest_to_django_orm() called", self.formatted_class_name) if not isinstance(self.manifest, SAMSmarterAuthToken): raise SAMSmarterAuthTokenBrokerError( f"Invalid manifest type for {self.kind} broker: {type(self.manifest)}", thing=self.kind, command=SmarterJournalCliCommands.APPLY, ) metadata = super().manifest_to_django_orm() config_dump = self.manifest.spec.config.model_dump() config_dump = self.to_snake_case(config_dump) if not isinstance(config_dump, dict): raise SAMSmarterAuthTokenBrokerError( message=f"Invalid config dump for {self.kind} manifest: {config_dump}. Got type {type(config_dump)}", thing=self.kind, command=SmarterJournalCliCommands.APPLY, ) retval = { **metadata, **config_dump, } logger.debug( "%s.manifest_to_django_orm() converted manifest metadata to Django ORM dict: %s", self.formatted_class_name, logging.formatted_json(retval), ) return retval
[docs] @camel_case() def django_orm_to_manifest_dict(self) -> dict: """ Transform the Django ORM model into a Pydantic readable. Smarter API SAMSmarterAuthToken manifest dict. """ logger.debug("%s.django_orm_to_manifest_dict() called", self.formatted_class_name) if not isinstance(self.smarter_auth_token, SmarterAuthToken): raise SAMSmarterAuthTokenBrokerError( f"smarter_auth_token is not a SmarterAuthToken instance: {type(self.smarter_auth_token)}", thing=self.kind, command=None, ) if self.manifest is None: raise SAMBrokerErrorNotFound( f"Manifest not set for {self.kind} broker. Cannot describe.", thing=self.thing, command=SmarterJournalCliCommands.DESCRIBE, ) data = self.manifest.model_dump() return data
########################################################################### # Smarter abstract property implementations ########################################################################### @property def kind(self) -> str: return MANIFEST_KIND @property def manifest(self) -> Optional[SAMSmarterAuthToken]: """ SAMSmarterAuthToken() is a Pydantic model. that is used to represent the Smarter API SAMSmarterAuthToken manifest. The Pydantic model is initialized with the data from the manifest loader, which is generally passed to the model constructor as **data. However, this top-level manifest model has to be explicitly initialized, whereas its child models are automatically cascade-initialized by the Pydantic model, implicitly passing **data to each child's constructor. """ if self._manifest: if not isinstance(self._manifest, SAMSmarterAuthToken): raise SAMSmarterAuthTokenBrokerError( f"Invalid manifest type for {self.kind} broker: {type(self._manifest)}", thing=self.kind, ) return self._manifest logger.debug( "%s.manifest() - Attempting to initialize from loader %s or smarter_auth_token %s", self.formatted_class_name, self.loader, self._smarter_auth_token, ) if self.loader and self.loader.manifest_kind == self.kind: try: logger.debug( "%s.manifest() initializing SAMSmarterAuthToken() using data from self.loader %s", self.formatted_class_name, self.loader, ) self._manifest = SAMSmarterAuthToken( apiVersion=self.loader.manifest_api_version, kind=self.loader.manifest_kind, metadata=SAMSmarterAuthTokenMetadata(**self.loader.manifest_metadata), spec=SAMSmarterAuthTokenSpec(**self.loader.manifest_spec), ) logger.debug( "%s.manifest() initialized with SAMSmarterAuthToken() using data from self.loader", self.formatted_class_name, ) except PydanticValidationError as e: logger.error( "%s.manifest() could not be initialized with SAMSmarterAuthToken() using data from self.loader: %s", self.formatted_class_name, str(e), ) elif self._smarter_auth_token: status = SAMSmarterAuthTokenStatus( recordLocator=self._smarter_auth_token.record_locator, created=self._smarter_auth_token.created_at, modified=self._smarter_auth_token.updated_at, lastUsedAt=self._smarter_auth_token.last_used_at, ) metadata = SAMSmarterAuthTokenMetadata( name=str(self._smarter_auth_token.name), description=self._smarter_auth_token.description, version=self._smarter_auth_token.version, tags=self._smarter_auth_token.tags_list, annotations=self._smarter_auth_token.annotations, ) spec = SAMSmarterAuthTokenSpec( config=SAMSmarterAuthTokenSpecConfig( isActive=self._smarter_auth_token.is_active, username=self._smarter_auth_token.user.username, ) ) self._manifest = SAMSmarterAuthToken( apiVersion=self.api_version, kind=self.kind, metadata=metadata, spec=spec, status=status, ) logger.debug( "%s.manifest() initialized %s from SmarterAuthToken ORM model %s: %s", self.formatted_class_name, type(self._manifest).__name__, self.smarter_auth_token, serializers.serialize("json", [self._smarter_auth_token]), ) return self._manifest else: logger.warning( "%s.manifest() %s could not be initialized. self.loader is %s.", self.kind, self.formatted_class_name, "initialized" if self.loader is not None else "not initialized", ) return self._manifest ########################################################################### # Smarter manifest abstract method implementations ########################################################################### @property def SerializerClass(self) -> Type[ModelSerializer]: """ Return the Serializer class for the SmarterAuthToken model. This is used to serialize and deserialize the SmarterAuthToken model for API responses and requests. """ return SmarterAuthTokenMiniSerializer @property def ORMMetaModelClass(self) -> Type[SmarterAuthToken]: """ Return the Django ORM meta model class for the broker. :return: The Django ORM meta model class definition for the broker. :rtype: Type[SmarterAuthToken] """ return SmarterAuthToken @property def ORMModelClass(self) -> Type[SmarterAuthToken]: return SmarterAuthToken @property def orm_instance(self) -> Optional[SmarterAuthToken]: """ Return the Django ORM model instance for the broker. :return: The Django ORM model instance for the broker. :rtype: Optional[TimestampedModel] """ if self._orm_instance: return self._orm_instance if self.orm_meta_instance: self._orm_instance = self.orm_meta_instance # type: ignore return self._orm_instance if not self.manifest: logger.debug( "%s.orm_instance() - manifest is not set. Cannot retrieve %s instance.", self.formatted_class_name, SmarterAuthToken.__name__, ) return None try: logger.debug( "%s.orm_instance() - attempting to retrieve %s for %s owned by %s", self.formatted_class_name, SmarterAuthToken.__name__, self.name, self.user_profile, ) self._orm_instance = SmarterAuthToken.objects.get( user__username=self.manifest.spec.config.username, name=self.name ) logger.debug( "%s.orm_instance() - retrieved %s for %s owned by %s", self.formatted_class_name, SmarterAuthToken.__name__, self.name, self.user_profile, ) return self._orm_instance except SmarterAuthToken.DoesNotExist: logger.warning( "%s.orm_instance() - %s instance does not exist for %s owned by %s", self.formatted_class_name, SmarterAuthToken.__name__, self.name, self.user_profile, ) return None
[docs] def orm_meta_instance_setter(self) -> None: """Override the base method to initialize the ORM meta model for the broker.""" if self._orm_instance: logger.debug( "%s.orm_meta_instance_setter() ORM instance is already set. Setting ORM meta instance to ORM instance.", self.formatted_class_name, ) self._orm_meta_instance = self._orm_instance return if not self._manifest: logger.debug( "%s.orm_meta_instance_setter() - manifest is not set. Cannot retrieve ORM meta instance for %s.", self.formatted_class_name, SmarterAuthToken.__name__, ) return if not self.name: logger.debug( "%s.orm_meta_instance_setter() - name is not set. Cannot retrieve ORM meta instance for %s.", self.formatted_class_name, SmarterAuthToken.__name__, ) return self._orm_meta_instance = None try: self._orm_meta_instance = SmarterAuthToken.objects.get( user__username=self._manifest.spec.config.username, name=self.name ) logger.debug( "%s.orm_meta_instance_setter() - initialized ORM meta: %s", self.formatted_class_name, serializers.serialize("json", [self.orm_meta_instance]), # type: ignore ) except SmarterAuthToken.DoesNotExist: logger.warning( "%s.orm_meta_instance_setter() - ORM meta does not exist for account=%s, name=%s", self.formatted_class_name, self.account, self.name, ) except Exception as e: logger.error( "%s.orm_meta_instance_setter() - Error initializing ORM meta for account=%s, name=%s: %s", self.formatted_class_name, self.account, self.name, str(e), )
[docs] def cache_invalidations(self) -> None: """Invalidate any relevant caches when the manifest or SmarterAuthToken data changes.""" logger.debug("%s.cache_invalidations() called.", self.formatted_class_name_cache_invalidations) SmarterAuthToken.get_cached_object(invalidate=True, user=self.user, name=self.name, taggit=False) # type: ignore return super().cache_invalidations()
[docs] def example_manifest(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug("%s.example_manifest() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs) command = self.example_manifest.__name__ command = SmarterJournalCliCommands(command) data = { SAMKeys.APIVERSION.value: self.api_version, SAMKeys.KIND.value: self.kind, SAMKeys.METADATA.value: { SAMMetadataKeys.NAME.value: "snake-case-name", SAMMetadataKeys.DESCRIPTION.value: "An example Smarter API manifest for a SmarterAuthToken", SAMMetadataKeys.VERSION.value: "1.0.0", }, SAMKeys.SPEC.value: { SAMSmarterAuthTokenSpecKeys.CONFIG.value: { "isActive": True, "username": "valid_smarter_username", }, }, } return self.json_response_ok(command=command, data=data)
[docs] def get(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug("%s.get() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs) command = self.get.__name__ command = SmarterJournalCliCommands(command) data = [] name = kwargs.get(SAMMetadataKeys.NAME.value) url = self.smarter_build_absolute_uri(request) or "Unknown URL" name = self.clean_cli_param(param=name, param_name="name", url=url) if name: # if the name is not None, then we are looking for a specific SmarterAuthToken smarter_auth_tokens = SmarterAuthToken.get_cached_objects(user=self.user, name=name) # type: ignore else: smarter_auth_tokens = SmarterAuthToken.get_cached_objects(user=self.user) # type: ignore # iterate over the QuerySet and use the manifest controller to create a Pydantic model dump for each Plugin for smarter_auth_token in smarter_auth_tokens: try: model_dump = SmarterAuthTokenMiniSerializer(smarter_auth_token).data if not model_dump: raise SAMSmarterAuthTokenBrokerError( f"Model dump failed for {self.kind} {smarter_auth_token.name}", thing=self.kind, command=command ) camel_cased_model_dump = self.to_camel_case(model_dump) data.append(camel_cased_model_dump) except Exception as e: raise SAMSmarterAuthTokenBrokerError( f"Model dump failed for {self.kind} {smarter_auth_token.name}", thing=self.kind, command=command ) from e data = { SAMKeys.APIVERSION.value: self.api_version, SAMKeys.KIND.value: self.kind, SAMKeys.METADATA.value: {"count": len(data)}, SCLIResponseGet.KWARGS.value: kwargs, SCLIResponseGet.DATA.value: { SCLIResponseGetData.TITLES.value: self.get_model_titles(serializer=SmarterAuthTokenMiniSerializer()), SCLIResponseGetData.ITEMS.value: data, }, } return self.json_response_ok(command=command, data=data)
[docs] def apply(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: """ Apply the manifest. copy the manifest data to the Django ORM model and save the model to the database. Call super().apply() to ensure that the manifest is loaded and validated before applying the manifest to the Django ORM model. Note that there are fields included in the manifest that are not editable and are therefore removed from the Django ORM model dict prior to attempting the save() command. These fields are defined in the readonly_fields list. """ logger.debug("%s.apply() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs) super().apply(request, kwargs) command = self.apply.__name__ command = SmarterJournalCliCommands(command) readonly_fields = [ "id", "created_at", "updated_at", "last_used_at", "key_id", "user", "username", "digest", "token_key", ] if not isinstance(self.user, User): raise SAMSmarterAuthTokenBrokerError( f"Invalid user type: {type(self.user)}. User must be an instance of User.", thing=self.kind, command=command, ) if not self.user.is_staff: raise SAMSmarterAuthTokenBrokerError( message="Only account admins can apply auth token manifests.", thing=self.kind, command=command, ) if not self.manifest: raise SAMBrokerErrorNotReady( f"Manifest not set for {self.kind} broker. Cannot apply.", thing=self.thing, command=command, ) try: data = self.manifest_to_django_orm() for field in readonly_fields: logger.debug( "%s.apply() Removing readonly field %s from data for %s", self.formatted_class_name, field, self.kind, ) data.pop(field, None) # handle the username try: manifest_spec_config_user = User.objects.get(username=self.manifest.spec.config.username) except User.DoesNotExist as e: raise SAMSmarterAuthTokenBrokerError( f"User {self.manifest.spec.config.username} does not exist for {self.kind} {self.name}", thing=self.kind, command=command, ) from e # ensure that the role of the user is equal to or less than the role of the owner # of this process. if not self.user.is_staff and not self.user.is_superuser: raise SAMSmarterAuthTokenBrokerError( f"User {self.user.username} does not have permission to create or modify API keys.", thing=self.kind, command=command, ) if not self.user.is_superuser: if manifest_spec_config_user.is_superuser: raise SAMSmarterAuthTokenBrokerError( f"User {self.user.username} does not have permission to create or modify API keys for users with higher administrative roles.", thing=self.kind, command=command, ) if self.smarter_auth_token is None: # Set all required fields at instantiation self.smarter_auth_token = SmarterAuthToken( user=manifest_spec_config_user, user_profile=self.user_profile, name=data.get("name"), description=data.get("description"), version=data.get("version"), annotations=data.get("annotations"), is_active=data.get("is_active"), # add any other required fields here ) else: for key, value in data.items(): setattr(self.smarter_auth_token, key, value) self.smarter_auth_token.user = manifest_spec_config_user logger.debug( "%s.apply() Saving %s: %s", self.formatted_class_name, self.smarter_auth_token, serializers.serialize("json", [self.smarter_auth_token]), ) self.smarter_auth_token.save() self.smarter_auth_token.refresh_from_db() except Exception as e: tb = traceback.format_exc() raise SAMBrokerError(message=f"Error in {command}: {e}\n{tb}", thing=self.kind, command=command) from e self.cache_invalidations() return self.json_response_ok(command=command, data=self.to_json())
[docs] def prompt(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug("%s.prompt() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs) command = self.prompt.__name__ command = SmarterJournalCliCommands(command) raise SAMBrokerErrorNotImplemented(message="Prompt not implemented", thing=self.kind, command=command)
[docs] def describe(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug( "%s.describe() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs ) command = self.describe.__name__ command = SmarterJournalCliCommands(command) self._smarter_auth_token = None self.set_and_verify_name_param(command=command) if self.smarter_auth_token: try: data = self.django_orm_to_manifest_dict() return self.json_response_ok(command=command, data=data) except Exception as e: raise SAMSmarterAuthTokenBrokerError( f"{self.kind} {self.smarter_auth_token.name} error: {str(e)}", thing=self.kind, command=command ) from e raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command)
[docs] def delete(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug( "%s.delete() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs ) command = self.delete.__name__ command = SmarterJournalCliCommands(command) self.set_and_verify_name_param(command=command) if not isinstance(self.user, User): raise SAMSmarterAuthTokenBrokerError( f"Invalid user type: {type(self.user)}. User must be an instance of User.", thing=self.kind, command=command, ) if not self.user.is_staff: raise SAMSmarterAuthTokenBrokerError( message="Only account admins can delete auth tokens.", thing=self.kind, command=command, ) if self.smarter_auth_token: try: self.smarter_auth_token.delete() return self.json_response_ok(command=command, data={}) except Exception as e: raise SAMSmarterAuthTokenBrokerError( f"Failed to delete {self.kind} {self.smarter_auth_token.name}", thing=self.kind, command=command ) from e raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command)
[docs] def deploy(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug( "%s.deploy() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs ) command = self.deploy.__name__ command = SmarterJournalCliCommands(command) self.set_and_verify_name_param(command=command) if not self.smarter_auth_token: raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command) if not self.smarter_auth_token.is_active: self.smarter_auth_token.is_active = True self.smarter_auth_token.save() logger.debug( "%s.deploy() Activated %s %s. Saved and refreshed from DB: %s", self.formatted_class_name, self.kind, self.name, serializers.serialize("json", [self.smarter_auth_token]), ) else: logger.debug( "%s.deploy() %s %s is already active. No action taken.", self.formatted_class_name, self.kind, self.name, ) return self.json_response_ok(command=command, data=self.to_json())
[docs] def undeploy(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug( "%s.undeploy() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs ) command = self.undeploy.__name__ command = SmarterJournalCliCommands(command) self.set_and_verify_name_param(command=command) if self.smarter_auth_token: if self.smarter_auth_token.is_active: self.smarter_auth_token.is_active = False self.smarter_auth_token.save() self.smarter_auth_token.refresh_from_db() else: raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command) return self.json_response_ok(command=command, data=self.to_json())
[docs] def logs(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse: logger.debug("%s.logs() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs) command = self.logs.__name__ command = SmarterJournalCliCommands(command) raise SAMBrokerErrorNotImplemented(message="Logs are not implemented", thing=self.kind, command=command)