# pylint: disable=W0718
"""Smarter API SmarterAuthToken Manifest handler."""
import traceback
from typing import Any, Optional, Type
from django.core import serializers
from django.core.handlers.asgi import ASGIRequest
from pydantic_core import ValidationError as PydanticValidationError
from rest_framework.serializers import ModelSerializer
from smarter.apps.account.models import User
from smarter.common.utils.decorators import camel_case
from smarter.lib import logging
from smarter.lib.drf.manifest.enum import SAMSmarterAuthTokenSpecKeys
from smarter.lib.drf.manifest.models.auth_token.const import MANIFEST_KIND
from smarter.lib.drf.manifest.models.auth_token.metadata import (
SAMSmarterAuthTokenMetadata,
)
from smarter.lib.drf.manifest.models.auth_token.model import SAMSmarterAuthToken
from smarter.lib.drf.manifest.models.auth_token.spec import (
SAMSmarterAuthTokenSpec,
SAMSmarterAuthTokenSpecConfig,
)
from smarter.lib.drf.manifest.models.auth_token.status import SAMSmarterAuthTokenStatus
from smarter.lib.drf.models import SmarterAuthToken
from smarter.lib.journal.enum import SmarterJournalCliCommands
from smarter.lib.journal.http import SmarterJournaledJsonResponse
from smarter.lib.manifest.broker import (
AbstractBroker,
SAMBrokerError,
SAMBrokerErrorNotFound,
SAMBrokerErrorNotImplemented,
SAMBrokerErrorNotReady,
)
from smarter.lib.manifest.enum import (
SAMKeys,
SAMMetadataKeys,
SCLIResponseGet,
SCLIResponseGetData,
)
MAX_RESULTS = 1000
logger = logging.getLogger(__name__)
[docs]
class SAMSmarterAuthTokenBrokerError(SAMBrokerError):
"""Base exception for Smarter API SmarterAuthToken Broker handling."""
@property
def get_formatted_err_message(self):
return "Smarter API SmarterAuthToken Manifest Broker Error"
[docs]
class SmarterAuthTokenMiniSerializer(ModelSerializer):
"""API key serializer for smarter api that excludes SAM ownership information."""
# pylint: disable=missing-class-docstring
class Meta:
model = SmarterAuthToken
fields = ["key_id", "name", "description", "is_active", "last_used_at", "created_at", "updated_at"]
[docs]
class SAMSmarterAuthTokenBroker(AbstractBroker):
"""
Smarter API SmarterAuthToken Manifest Broker.
This class is responsible for
- loading, validating and parsing the Smarter Api yaml SmarterAuthToken manifests
- using the manifest to initialize the corresponding Pydantic model
This Broker class interacts with the collection of Django ORM models that
represent the Smarter API SAMSmarterAuthToken manifests. The Broker class
is responsible for creating, updating, deleting and querying the Django ORM
models, as well as transforming the Django ORM models into Pydantic models
for serialization and deserialization.
"""
# override the base abstract manifest model with the SAMSmarterAuthToken model
_manifest: Optional[SAMSmarterAuthToken]
_pydantic_model: Type[SAMSmarterAuthToken] = SAMSmarterAuthToken
_smarter_auth_token: Optional[SmarterAuthToken]
_token_key: Optional[str]
_orm_instance: Optional[SmarterAuthToken]
[docs]
def __init__(self, *args, **kwargs):
"""
Initialize the SAMSmarterAuthTokenBroker with the given arguments.
The constructor initializes the parent class and sets up the manifest
and user attributes.
"""
self._smarter_auth_token = None
self._token_key = None
self._created = False
super().__init__(*args, **kwargs)
@property
def formatted_class_name(self) -> str:
"""
Returns the formatted class name for logging purposes.
This is used to provide a more readable class name in logs.
"""
class_name = f"{__name__}.{SAMSmarterAuthToken.__name__}()[{id(self)}]"
return self.formatted_text(class_name)
@property
def smarter_auth_token(self) -> Optional[SmarterAuthToken]:
"""
The SmarterAuthToken object is a Django ORM model subclass from knox.AuthToken.
that represents a SmarterAuthToken api key. The SmarterAuthToken object is
used to store the authentication hash and Smarter metadata for the Smarter API.
The SmarterAuthToken object is retrieved from the database, if it exists,
or created from the manifest if it does not.
"""
if self._smarter_auth_token:
return self._smarter_auth_token
if not self.manifest:
logger.debug(
"%s.smarter_auth_token() Manifest not set. Cannot retrieve SmarterAuthToken.",
self.formatted_class_name,
)
return None
username = self.manifest.spec.config.username
try:
logger.debug(
"%s.smarter_auth_token() Retrieving SmarterAuthToken for user %s with name %s",
self.formatted_class_name,
username,
self.name,
)
self._smarter_auth_token = SmarterAuthToken.objects.get(user__username=username, name=self.name)
except SmarterAuthToken.DoesNotExist:
logger.debug(
"%s.smarter_auth_token() SmarterAuthToken for user %s with name %s does not exist.",
self.formatted_class_name,
username,
self.name,
)
logger.debug(
"%s.smarter_auth_token() SmarterAuthTokens: %s",
self.formatted_class_name,
SmarterAuthToken.objects.all(),
)
return self._smarter_auth_token
@smarter_auth_token.setter
def smarter_auth_token(self, value: SmarterAuthToken) -> None:
"""Set the SmarterAuthToken object."""
self._smarter_auth_token = value
logger.debug(
"%s.smarter_auth_token() set to %s",
self.formatted_class_name,
self._smarter_auth_token,
)
@property
def token_key(self) -> Optional[str]:
"""
The token_key is the actual API key that is used to authenticate with the Smarter API.
The token_key is generated by the SmarterAuthToken object when it is created and
it is only available immediately after the object is created.
"""
if self.created and self._token_key:
return self._token_key
[docs]
def manifest_to_django_orm(self) -> dict[str, Any]:
"""Transform the Smarter API SAMSmarterAuthToken manifest into a Django ORM model."""
logger.debug("%s.manifest_to_django_orm() called", self.formatted_class_name)
if not isinstance(self.manifest, SAMSmarterAuthToken):
raise SAMSmarterAuthTokenBrokerError(
f"Invalid manifest type for {self.kind} broker: {type(self.manifest)}",
thing=self.kind,
command=SmarterJournalCliCommands.APPLY,
)
metadata = super().manifest_to_django_orm()
config_dump = self.manifest.spec.config.model_dump()
config_dump = self.to_snake_case(config_dump)
if not isinstance(config_dump, dict):
raise SAMSmarterAuthTokenBrokerError(
message=f"Invalid config dump for {self.kind} manifest: {config_dump}. Got type {type(config_dump)}",
thing=self.kind,
command=SmarterJournalCliCommands.APPLY,
)
retval = {
**metadata,
**config_dump,
}
logger.debug(
"%s.manifest_to_django_orm() converted manifest metadata to Django ORM dict: %s",
self.formatted_class_name,
logging.formatted_json(retval),
)
return retval
[docs]
@camel_case()
def django_orm_to_manifest_dict(self) -> dict:
"""
Transform the Django ORM model into a Pydantic readable.
Smarter API SAMSmarterAuthToken manifest dict.
"""
logger.debug("%s.django_orm_to_manifest_dict() called", self.formatted_class_name)
if not isinstance(self.smarter_auth_token, SmarterAuthToken):
raise SAMSmarterAuthTokenBrokerError(
f"smarter_auth_token is not a SmarterAuthToken instance: {type(self.smarter_auth_token)}",
thing=self.kind,
command=None,
)
if self.manifest is None:
raise SAMBrokerErrorNotFound(
f"Manifest not set for {self.kind} broker. Cannot describe.",
thing=self.thing,
command=SmarterJournalCliCommands.DESCRIBE,
)
data = self.manifest.model_dump()
return data
###########################################################################
# Smarter abstract property implementations
###########################################################################
@property
def kind(self) -> str:
return MANIFEST_KIND
@property
def manifest(self) -> Optional[SAMSmarterAuthToken]:
"""
SAMSmarterAuthToken() is a Pydantic model.
that is used to represent the Smarter API SAMSmarterAuthToken manifest. The Pydantic
model is initialized with the data from the manifest loader, which is
generally passed to the model constructor as **data. However, this top-level
manifest model has to be explicitly initialized, whereas its child models
are automatically cascade-initialized by the Pydantic model, implicitly
passing **data to each child's constructor.
"""
if self._manifest:
if not isinstance(self._manifest, SAMSmarterAuthToken):
raise SAMSmarterAuthTokenBrokerError(
f"Invalid manifest type for {self.kind} broker: {type(self._manifest)}",
thing=self.kind,
)
return self._manifest
logger.debug(
"%s.manifest() - Attempting to initialize from loader %s or smarter_auth_token %s",
self.formatted_class_name,
self.loader,
self._smarter_auth_token,
)
if self.loader and self.loader.manifest_kind == self.kind:
try:
logger.debug(
"%s.manifest() initializing SAMSmarterAuthToken() using data from self.loader %s",
self.formatted_class_name,
self.loader,
)
self._manifest = SAMSmarterAuthToken(
apiVersion=self.loader.manifest_api_version,
kind=self.loader.manifest_kind,
metadata=SAMSmarterAuthTokenMetadata(**self.loader.manifest_metadata),
spec=SAMSmarterAuthTokenSpec(**self.loader.manifest_spec),
)
logger.debug(
"%s.manifest() initialized with SAMSmarterAuthToken() using data from self.loader",
self.formatted_class_name,
)
except PydanticValidationError as e:
logger.error(
"%s.manifest() could not be initialized with SAMSmarterAuthToken() using data from self.loader: %s",
self.formatted_class_name,
str(e),
)
elif self._smarter_auth_token:
status = SAMSmarterAuthTokenStatus(
recordLocator=self._smarter_auth_token.record_locator,
created=self._smarter_auth_token.created_at,
modified=self._smarter_auth_token.updated_at,
lastUsedAt=self._smarter_auth_token.last_used_at,
)
metadata = SAMSmarterAuthTokenMetadata(
name=str(self._smarter_auth_token.name),
description=self._smarter_auth_token.description,
version=self._smarter_auth_token.version,
tags=self._smarter_auth_token.tags_list,
annotations=self._smarter_auth_token.annotations,
)
spec = SAMSmarterAuthTokenSpec(
config=SAMSmarterAuthTokenSpecConfig(
isActive=self._smarter_auth_token.is_active,
username=self._smarter_auth_token.user.username,
)
)
self._manifest = SAMSmarterAuthToken(
apiVersion=self.api_version,
kind=self.kind,
metadata=metadata,
spec=spec,
status=status,
)
logger.debug(
"%s.manifest() initialized %s from SmarterAuthToken ORM model %s: %s",
self.formatted_class_name,
type(self._manifest).__name__,
self.smarter_auth_token,
serializers.serialize("json", [self._smarter_auth_token]),
)
return self._manifest
else:
logger.warning(
"%s.manifest() %s could not be initialized. self.loader is %s.",
self.kind,
self.formatted_class_name,
"initialized" if self.loader is not None else "not initialized",
)
return self._manifest
###########################################################################
# Smarter manifest abstract method implementations
###########################################################################
@property
def SerializerClass(self) -> Type[ModelSerializer]:
"""
Return the Serializer class for the SmarterAuthToken model.
This is used to serialize and deserialize the SmarterAuthToken
model for API responses and requests.
"""
return SmarterAuthTokenMiniSerializer
@property
def ORMMetaModelClass(self) -> Type[SmarterAuthToken]:
"""
Return the Django ORM meta model class for the broker.
:return: The Django ORM meta model class definition for the broker.
:rtype: Type[SmarterAuthToken]
"""
return SmarterAuthToken
@property
def ORMModelClass(self) -> Type[SmarterAuthToken]:
return SmarterAuthToken
@property
def orm_instance(self) -> Optional[SmarterAuthToken]:
"""
Return the Django ORM model instance for the broker.
:return: The Django ORM model instance for the broker.
:rtype: Optional[TimestampedModel]
"""
if self._orm_instance:
return self._orm_instance
if self.orm_meta_instance:
self._orm_instance = self.orm_meta_instance # type: ignore
return self._orm_instance
if not self.manifest:
logger.debug(
"%s.orm_instance() - manifest is not set. Cannot retrieve %s instance.",
self.formatted_class_name,
SmarterAuthToken.__name__,
)
return None
try:
logger.debug(
"%s.orm_instance() - attempting to retrieve %s for %s owned by %s",
self.formatted_class_name,
SmarterAuthToken.__name__,
self.name,
self.user_profile,
)
self._orm_instance = SmarterAuthToken.objects.get(
user__username=self.manifest.spec.config.username, name=self.name
)
logger.debug(
"%s.orm_instance() - retrieved %s for %s owned by %s",
self.formatted_class_name,
SmarterAuthToken.__name__,
self.name,
self.user_profile,
)
return self._orm_instance
except SmarterAuthToken.DoesNotExist:
logger.warning(
"%s.orm_instance() - %s instance does not exist for %s owned by %s",
self.formatted_class_name,
SmarterAuthToken.__name__,
self.name,
self.user_profile,
)
return None
[docs]
def cache_invalidations(self) -> None:
"""Invalidate any relevant caches when the manifest or SmarterAuthToken data changes."""
logger.debug("%s.cache_invalidations() called.", self.formatted_class_name_cache_invalidations)
SmarterAuthToken.get_cached_object(invalidate=True, user=self.user, name=self.name, taggit=False) # type: ignore
return super().cache_invalidations()
[docs]
def example_manifest(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug("%s.example_manifest() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs)
command = self.example_manifest.__name__
command = SmarterJournalCliCommands(command)
data = {
SAMKeys.APIVERSION.value: self.api_version,
SAMKeys.KIND.value: self.kind,
SAMKeys.METADATA.value: {
SAMMetadataKeys.NAME.value: "snake-case-name",
SAMMetadataKeys.DESCRIPTION.value: "An example Smarter API manifest for a SmarterAuthToken",
SAMMetadataKeys.VERSION.value: "1.0.0",
},
SAMKeys.SPEC.value: {
SAMSmarterAuthTokenSpecKeys.CONFIG.value: {
"isActive": True,
"username": "valid_smarter_username",
},
},
}
return self.json_response_ok(command=command, data=data)
[docs]
def get(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug("%s.get() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs)
command = self.get.__name__
command = SmarterJournalCliCommands(command)
data = []
name = kwargs.get(SAMMetadataKeys.NAME.value)
url = self.smarter_build_absolute_uri(request) or "Unknown URL"
name = self.clean_cli_param(param=name, param_name="name", url=url)
if name:
# if the name is not None, then we are looking for a specific SmarterAuthToken
smarter_auth_tokens = SmarterAuthToken.get_cached_objects(user=self.user, name=name) # type: ignore
else:
smarter_auth_tokens = SmarterAuthToken.get_cached_objects(user=self.user) # type: ignore
# iterate over the QuerySet and use the manifest controller to create a Pydantic model dump for each Plugin
for smarter_auth_token in smarter_auth_tokens:
try:
model_dump = SmarterAuthTokenMiniSerializer(smarter_auth_token).data
if not model_dump:
raise SAMSmarterAuthTokenBrokerError(
f"Model dump failed for {self.kind} {smarter_auth_token.name}", thing=self.kind, command=command
)
camel_cased_model_dump = self.to_camel_case(model_dump)
data.append(camel_cased_model_dump)
except Exception as e:
raise SAMSmarterAuthTokenBrokerError(
f"Model dump failed for {self.kind} {smarter_auth_token.name}", thing=self.kind, command=command
) from e
data = {
SAMKeys.APIVERSION.value: self.api_version,
SAMKeys.KIND.value: self.kind,
SAMKeys.METADATA.value: {"count": len(data)},
SCLIResponseGet.KWARGS.value: kwargs,
SCLIResponseGet.DATA.value: {
SCLIResponseGetData.TITLES.value: self.get_model_titles(serializer=SmarterAuthTokenMiniSerializer()),
SCLIResponseGetData.ITEMS.value: data,
},
}
return self.json_response_ok(command=command, data=data)
[docs]
def apply(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
"""
Apply the manifest.
copy the manifest data to the Django ORM model and
save the model to the database. Call super().apply() to ensure that the
manifest is loaded and validated before applying the manifest to the
Django ORM model.
Note that there are fields included in the manifest that are not editable
and are therefore removed from the Django ORM model dict prior to attempting
the save() command. These fields are defined in the readonly_fields list.
"""
logger.debug("%s.apply() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs)
super().apply(request, kwargs)
command = self.apply.__name__
command = SmarterJournalCliCommands(command)
readonly_fields = [
"id",
"created_at",
"updated_at",
"last_used_at",
"key_id",
"user",
"username",
"digest",
"token_key",
]
if not isinstance(self.user, User):
raise SAMSmarterAuthTokenBrokerError(
f"Invalid user type: {type(self.user)}. User must be an instance of User.",
thing=self.kind,
command=command,
)
if not self.user.is_staff:
raise SAMSmarterAuthTokenBrokerError(
message="Only account admins can apply auth token manifests.",
thing=self.kind,
command=command,
)
if not self.manifest:
raise SAMBrokerErrorNotReady(
f"Manifest not set for {self.kind} broker. Cannot apply.",
thing=self.thing,
command=command,
)
try:
data = self.manifest_to_django_orm()
for field in readonly_fields:
logger.debug(
"%s.apply() Removing readonly field %s from data for %s",
self.formatted_class_name,
field,
self.kind,
)
data.pop(field, None)
# handle the username
try:
manifest_spec_config_user = User.objects.get(username=self.manifest.spec.config.username)
except User.DoesNotExist as e:
raise SAMSmarterAuthTokenBrokerError(
f"User {self.manifest.spec.config.username} does not exist for {self.kind} {self.name}",
thing=self.kind,
command=command,
) from e
# ensure that the role of the user is equal to or less than the role of the owner
# of this process.
if not self.user.is_staff and not self.user.is_superuser:
raise SAMSmarterAuthTokenBrokerError(
f"User {self.user.username} does not have permission to create or modify API keys.",
thing=self.kind,
command=command,
)
if not self.user.is_superuser:
if manifest_spec_config_user.is_superuser:
raise SAMSmarterAuthTokenBrokerError(
f"User {self.user.username} does not have permission to create or modify API keys for users with higher administrative roles.",
thing=self.kind,
command=command,
)
if self.smarter_auth_token is None:
# Set all required fields at instantiation
self.smarter_auth_token = SmarterAuthToken(
user=manifest_spec_config_user,
user_profile=self.user_profile,
name=data.get("name"),
description=data.get("description"),
version=data.get("version"),
annotations=data.get("annotations"),
is_active=data.get("is_active"),
# add any other required fields here
)
else:
for key, value in data.items():
setattr(self.smarter_auth_token, key, value)
self.smarter_auth_token.user = manifest_spec_config_user
logger.debug(
"%s.apply() Saving %s: %s",
self.formatted_class_name,
self.smarter_auth_token,
serializers.serialize("json", [self.smarter_auth_token]),
)
self.smarter_auth_token.save()
self.smarter_auth_token.refresh_from_db()
except Exception as e:
tb = traceback.format_exc()
raise SAMBrokerError(message=f"Error in {command}: {e}\n{tb}", thing=self.kind, command=command) from e
self.cache_invalidations()
return self.json_response_ok(command=command, data=self.to_json())
[docs]
def prompt(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug("%s.prompt() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs)
command = self.prompt.__name__
command = SmarterJournalCliCommands(command)
raise SAMBrokerErrorNotImplemented(message="Prompt not implemented", thing=self.kind, command=command)
[docs]
def describe(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug(
"%s.describe() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs
)
command = self.describe.__name__
command = SmarterJournalCliCommands(command)
self._smarter_auth_token = None
self.set_and_verify_name_param(command=command)
if self.smarter_auth_token:
try:
data = self.django_orm_to_manifest_dict()
return self.json_response_ok(command=command, data=data)
except Exception as e:
raise SAMSmarterAuthTokenBrokerError(
f"{self.kind} {self.smarter_auth_token.name} error: {str(e)}", thing=self.kind, command=command
) from e
raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command)
[docs]
def delete(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug(
"%s.delete() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs
)
command = self.delete.__name__
command = SmarterJournalCliCommands(command)
self.set_and_verify_name_param(command=command)
if not isinstance(self.user, User):
raise SAMSmarterAuthTokenBrokerError(
f"Invalid user type: {type(self.user)}. User must be an instance of User.",
thing=self.kind,
command=command,
)
if not self.user.is_staff:
raise SAMSmarterAuthTokenBrokerError(
message="Only account admins can delete auth tokens.",
thing=self.kind,
command=command,
)
if self.smarter_auth_token:
try:
self.smarter_auth_token.delete()
return self.json_response_ok(command=command, data={})
except Exception as e:
raise SAMSmarterAuthTokenBrokerError(
f"Failed to delete {self.kind} {self.smarter_auth_token.name}", thing=self.kind, command=command
) from e
raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command)
[docs]
def deploy(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug(
"%s.deploy() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs
)
command = self.deploy.__name__
command = SmarterJournalCliCommands(command)
self.set_and_verify_name_param(command=command)
if not self.smarter_auth_token:
raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command)
if not self.smarter_auth_token.is_active:
self.smarter_auth_token.is_active = True
self.smarter_auth_token.save()
logger.debug(
"%s.deploy() Activated %s %s. Saved and refreshed from DB: %s",
self.formatted_class_name,
self.kind,
self.name,
serializers.serialize("json", [self.smarter_auth_token]),
)
else:
logger.debug(
"%s.deploy() %s %s is already active. No action taken.",
self.formatted_class_name,
self.kind,
self.name,
)
return self.json_response_ok(command=command, data=self.to_json())
[docs]
def undeploy(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug(
"%s.undeploy() called for %s with args: %s, kwargs: %s", self.formatted_class_name, self.name, args, kwargs
)
command = self.undeploy.__name__
command = SmarterJournalCliCommands(command)
self.set_and_verify_name_param(command=command)
if self.smarter_auth_token:
if self.smarter_auth_token.is_active:
self.smarter_auth_token.is_active = False
self.smarter_auth_token.save()
self.smarter_auth_token.refresh_from_db()
else:
raise SAMBrokerErrorNotReady(f"{self.kind} {self.name} is not ready", thing=self.kind, command=command)
return self.json_response_ok(command=command, data=self.to_json())
[docs]
def logs(self, request: ASGIRequest, *args, **kwargs) -> SmarterJournaledJsonResponse:
logger.debug("%s.logs() called with args: %s, kwargs: %s", self.formatted_class_name, args, kwargs)
command = self.logs.__name__
command = SmarterJournalCliCommands(command)
raise SAMBrokerErrorNotImplemented(message="Logs are not implemented", thing=self.kind, command=command)