Source code for smarter.apps.account.manifest.transformers.secret

"""A class for working with Secret manifests and the Secret Django ORM."""

# python stuff
import logging
from datetime import datetime
from typing import Any, Optional, Union

import yaml

# 3rd party stuff
from rest_framework import serializers

from smarter.apps.account.manifest.enum import SAMSecretSpecKeys
from smarter.apps.account.manifest.models.secret.const import MANIFEST_KIND
from smarter.apps.account.manifest.models.secret.metadata import SAMSecretMetadata

# smarter stuff
from smarter.apps.account.manifest.models.secret.model import SAMSecret
from smarter.apps.account.manifest.models.secret.spec import (
    SAMSecretSpec,
    SAMSecretSpecConfig,
)
from smarter.apps.account.manifest.models.secret.status import SAMSecretStatus
from smarter.apps.account.models import Secret, UserProfile
from smarter.apps.account.signals import (
    secret_created,
    secret_deleted,
    secret_inializing,
    secret_ready,
    secret_saved,
    secret_updated,
)
from smarter.apps.account.utils import (
    get_user_profiles_for_account,
    smarter_cached_objects,
)
from smarter.common.api import SmarterApiVersions
from smarter.common.exceptions import SmarterException
from smarter.common.mixins import SmarterHelperMixin
from smarter.lib import json
from smarter.lib.django import waffle
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.logging import WaffleSwitchedLoggerWrapper
from smarter.lib.manifest.enum import SAMKeys, SAMMetadataKeys
from smarter.lib.manifest.exceptions import SAMValidationError
from smarter.lib.manifest.loader import SAMLoader


# pylint: disable=W0613
[docs] def should_log(level): """Check if logging should be done based on the waffle switch.""" return waffle.switch_is_active(SmarterWaffleSwitches.ACCOUNT_LOGGING)
base_logger = logging.getLogger(__name__) logger = WaffleSwitchedLoggerWrapper(base_logger, should_log) SMARTER_API_MANIFEST_COMPATIBILITY = [SmarterApiVersions.V1] SMARTER_API_MANIFEST_DEFAULT_VERSION = SmarterApiVersions.V1 READ_ONLY_FIELDS = ["id", "user_profile", "last_accessed", "created_at", "modified_at"]
[docs] class SmarterSecretTransformerError(SmarterException): """Base exception for Smarter API Secret handling."""
[docs] class SecretSerializer(serializers.ModelSerializer): """Secret serializer for Smarter api.""" # pylint: disable=missing-class-docstring class Meta: model = Secret fields = "__all__" read_only_fields = ("user_profile", "last_accessed", "created_at", "modified_at")
[docs] class SecretTransformer(SmarterHelperMixin): """A class for working with secrets.""" _name: Optional[str] = None _api_version: str = SMARTER_API_MANIFEST_DEFAULT_VERSION _manifest: Optional[SAMSecret] = None _secret: Optional[Secret] = None _secret_serializer: Optional[SecretSerializer] = None _user_profile: Optional[UserProfile] = None
[docs] def __init__( self, *args, user_profile: UserProfile, name: Optional[str] = None, api_version: Optional[str] = None, manifest: Optional[SAMSecret] = None, secret_id: Optional[int] = None, secret: Optional[Secret] = None, data: Optional[Union[dict, str]] = None, **kwargs, ): """ Options for initialization are: - name: name of the secret, for initializing the Django ORM model. - Pydantic model created by a manifest broker (preferred method). - django model secret id. - yaml manifest or json representation of a yaml manifest see ./tests/data/secret-good.yaml for an example. """ logger.debug( "%s.__init__() called with args=%s, user_profile=%s, name=%s, api_version=%s, manifest=%s, secret_id=%s, secret=%s, data=%s, kwargs=%s", self.formatted_class_name, args, user_profile, name, api_version, manifest, secret_id, secret, data, kwargs, ) super().__init__(*args, **kwargs) if sum([bool(name), bool(data), bool(manifest), bool(secret_id), bool(secret)]) == 0: raise SmarterSecretTransformerError( f"Must specify at least one of: name, manifest, data, secret_id, or secret. " f"Received name: {bool(name)} data: {bool(data)}, manifest: {bool(manifest)}, " f"secret_id: {bool(secret_id)}, secret: {bool(secret)}." ) self._secret = secret if secret else None self._secret_serializer = None self._user_profile = user_profile secret_inializing.send(sender=self.__class__, secret_name=name, user_profile=user_profile) self._user_profile = user_profile if not self._user_profile: raise SmarterSecretTransformerError("User profile is not set.") if name: self._name = name if api_version: self._api_version = api_version ####################################################################### # identifiers for existing secrets ####################################################################### if secret_id: logger.debug( "%s.__init__() Initializing secret transformer with secret_id: %s", self.formatted_class_name, secret_id ) self.id = secret_id if secret: logger.debug( "%s.__init__() Initializing secret transformer with secret: %s", self.formatted_class_name, secret ) self.id = secret.id # type: ignore[union-attr] ####################################################################### # Smarter API Manifest based initialization ####################################################################### if api_version and api_version not in SMARTER_API_MANIFEST_COMPATIBILITY: raise SmarterSecretTransformerError(f"API version {api_version} is not compatible.") if api_version: logger.debug( "%s.__init__() Initializing secret transformer with api_version: %s", self.formatted_class_name, api_version, ) self._api_version = api_version if manifest: logger.debug( "%s.__init__() Initializing secret transformer with manifest: %s", self.formatted_class_name, manifest ) if not isinstance(manifest, SAMSecret): raise SAMValidationError(f"Expected SAMSecret, but got {type(manifest)}.") # we received a Pydantic model from a manifest broker. self._manifest = manifest self.api_version = manifest.apiVersion if isinstance(data, dict): logger.debug("%s.__init__() Initializing secret transformer with data: %s", self.formatted_class_name, data) # we received a yaml or json string representation of a manifest. self.api_version = data.get(SAMKeys.APIVERSION.value, self.api_version) if data.get(SAMKeys.KIND.value) != self.kind: raise SAMValidationError(f"Expected kind of {self.kind}, but got {data.get(SAMKeys.KIND.value)}.") loader = SAMLoader( api_version=self.api_version, kind=self.kind, manifest=json.dumps(data), ) if not loader.ready: raise SAMValidationError("SAMLoader is not ready.") self._manifest = SAMSecret(**loader.pydantic_model_dump()) self.create() if self.ready: secret_ready.send(sender=self.__class__, secret=self)
def __str__(self) -> str: """Return the name of the secret.""" return f"{SecretTransformer.__name__}[{id(self)}](name={self.name}, user_profile={self.user_profile})" def __repr__(self) -> str: """Return the name of the secret.""" return self.__str__() ########################################################################### # class methods ########################################################################### # pylint: disable=W0613
[docs] @classmethod def example_manifest(cls, kwargs: Optional[dict[str, Any]] = None) -> dict: return { SAMKeys.APIVERSION.value: SMARTER_API_MANIFEST_DEFAULT_VERSION, SAMKeys.KIND.value: MANIFEST_KIND, SAMKeys.METADATA.value: { SAMMetadataKeys.DESCRIPTION.value: "A secret for testing purposes", SAMMetadataKeys.NAME.value: "TestSecret", SAMMetadataKeys.TAGS.value: [], SAMMetadataKeys.VERSION.value: "0.1.0", }, SAMKeys.SPEC.value: { SAMSecretSpecKeys.CONFIG.value: { SAMSecretSpecKeys.VALUE.value: "test-password", SAMSecretSpecKeys.EXPIRATION_DATE.value: "2026-12-31", } }, }
########################################################################### # class instance properties ########################################################################### @property def api_version(self) -> str: """Return the api version of the secret.""" if not self._api_version: self._api_version = self._manifest.apiVersion if self._manifest else SMARTER_API_MANIFEST_DEFAULT_VERSION return self._api_version @api_version.setter def api_version(self, value: str): """Set the api version of the secret.""" if value not in SMARTER_API_MANIFEST_COMPATIBILITY: raise SAMValidationError( f"Invalid api version: {value}. Must be one of: {SMARTER_API_MANIFEST_COMPATIBILITY}" ) self._api_version = value @property def kind(self) -> str: """Return the kind of manifest.""" return MANIFEST_KIND @property def manifest(self) -> Optional[SAMSecret]: """Return the Pydandic model of the secret.""" if self._manifest: if not isinstance(self._manifest, SAMSecret): raise SAMValidationError(f"Expected SAMSecret, but got {type(self._manifest)}.") return self._manifest if self.secret: # if we don't have a manifest but we do have Django ORM data then # we can work backwards to the Pydantic model metadata = SAMSecretMetadata( name=self.secret.name, description=self.secret.description, version=self.secret.version, tags=self.secret.tags_list, annotations=self.secret.annotations if self.secret.annotations else [], ) spec_config = SAMSecretSpecConfig( value=self.secret.get_secret(update_last_accessed=False) or "", expiration_date=self.secret.expires_at if self.secret.expires_at else None, ) status = SAMSecretStatus( accountNumber=self.secret.account.account_number if self.secret.account else "missing", username=self.secret.user_profile.cached_user.username if self.secret.user_profile else "missing", recordLocator=self.secret.record_locator, created=self.secret.created_at, modified=self.secret.updated_at, last_accessed=self.secret.last_accessed, ) self._manifest = SAMSecret( apiVersion=self.api_version, kind=self.kind, metadata=metadata, spec=SAMSecretSpec(config=spec_config), status=status, ) return self._manifest @property def value(self) -> Optional[str]: """Return the secret value.""" if self._manifest: return self._manifest.spec.config.value if self._manifest.spec and self._manifest.spec.config else None if self.secret: return self.secret.get_secret(update_last_accessed=False) return None @property def encrypted_value(self) -> Optional[bytes]: """Return the encrypted secret value.""" if self._manifest: return Secret.encrypt(value=self.value) # type: ignore[return-value] if self.secret: return self.secret.encrypted_value return None @property def description(self) -> Optional[str]: """Return the secret description.""" if self._manifest and self._manifest.metadata: return self._manifest.metadata.description if self.secret: return self.secret.description return None @property def version(self) -> str: """Return the secret version.""" if self._manifest and self._manifest.metadata and self._manifest.metadata.version: return self._manifest.metadata.version return "1.0.0" @property def tags(self) -> set[str]: """Return the secret tags.""" if self._manifest and self._manifest.metadata and self._manifest.metadata.tags: # Convert tags (list[str]) to set for TaggableManager compatibility tags = self._manifest.metadata.tags tags = set(tags) if tags else set() return tags return set() @property def annotations(self) -> list[dict[str, Any]]: """Return the secret annotations.""" if self._manifest and self._manifest.metadata and self._manifest.metadata.annotations: return self._manifest.metadata.annotations return [] @property def created_at(self) -> Optional[str]: """Return the created date.""" if self.secret: return self.secret.created_at.isoformat() if self.secret.created_at else None return None @property def updated_at(self) -> Optional[str]: """Return the updated date.""" if self.secret: return self.secret.updated_at.isoformat() if self.secret.updated_at else None return None @property def last_accessed(self) -> Optional[str]: """Return the last accessed date.""" retval = None if self._manifest and self._manifest.status and self._manifest.status.last_accessed: retval = ( self._manifest.status.last_accessed.isoformat() if self._manifest.status and self._manifest.status.last_accessed else None ) if self.secret: retval = self.secret.last_accessed.isoformat() if self.secret.last_accessed else None return retval @property def expires_at(self) -> Optional[datetime]: """Return the expiration date in the format, YYYY-MM-DD""" if ( self._manifest and self._manifest.spec and self._manifest.spec.config and self._manifest.spec.config.expiration_date ): return self._manifest.spec.config.expiration_date if self._manifest.spec.config.expiration_date else None if self.secret: return self.secret.expires_at if self.secret.expires_at else None return None @property def id(self) -> Optional[int]: """Return the id of the secret.""" if self.secret: return self.secret.id # type: ignore[return-value] return None @id.setter def id(self, value: int): """Set the id of the secret.""" self._name = None self._secret_serializer = None if not value: self._secret = None return try: self._secret = Secret.objects.get(pk=value) except Secret.DoesNotExist as e: raise SmarterSecretTransformerError(f"Secret.DoesNotExist: pk={value}") from e @property def secret(self) -> Optional[Secret]: """Return the secret meta.""" if self._secret: return self._secret if not self.name: logger.warning("%s.secret() Secret name is not set.", self.formatted_class_name) return None if not self.user_profile: logger.warning("%s.secret() User profile is not set.", self.formatted_class_name) return None try: self._secret = Secret.objects.get(name=self.name, user_profile=self.user_profile) except Secret.DoesNotExist: admin_user = UserProfile.admin_for_account(self.user_profile.account) admin_user_profile = UserProfile.get_cached_object(user=admin_user) # type: ignore try: self._secret = Secret.objects.get(name=self.name, user_profile=admin_user_profile) except Secret.DoesNotExist: try: self._secret = Secret.objects.get( name=self.name, user_profile=smarter_cached_objects.smarter_admin_user_profile ) except Secret.DoesNotExist: pass if self._secret: logger.debug( "%s.secret() initialized Django ORM Secret %s for user profile %s.", self.formatted_class_name, self._secret.name, self._secret.user_profile, ) return self._secret logger.warning( "%s.secret() Django ORM Secret %s does not exist for user profile %s.", self.formatted_class_name, self.name, self.user_profile, ) # if the secret does not exist for the user profile, then we still need to check # if the secret exists for the account, and if so, whether self.user_profile # is at least a staff user. otherwise, we raise an error. other_user_profiles = ( get_user_profiles_for_account(account=self.user_profile.account) if self.user_profile else None ) secret = Secret.objects.filter(user_profile__in=other_user_profiles, name=self.name).first() if secret: if self.user_profile and not self.user_profile.user.is_staff and not self.user_profile.user.is_superuser: raise SmarterSecretTransformerError( f"Secret {self.name} exists for user profile {secret.user_profile.cached_user.username} " f"but not for user profile {self.user_profile.user.username}." ) self._secret = secret logger.debug( "%s.secret() initialized Django ORM Secret %s for user profile %s from account-level access.", self.formatted_class_name, self.name, self.user_profile, ) if not self._secret: logger.warning( "%s.secret() could not initialize Django ORM Secret %s for user_profile %s.", self.formatted_class_name, self.name, self.user_profile, ) return self._secret @secret.setter def secret(self, value: Secret): """Set the secret meta.""" self._secret = value self._secret_serializer = None if self._secret: self._name = self._secret.name # Only set _user_profile if it exists. This will be missing on new secrets. if hasattr(self._secret, "user_profile") and self._secret.user_profile.id is not None: self._user_profile = self._secret.user_profile @property def secret_serializer(self) -> Optional[SecretSerializer]: """Return the secret meta serializer.""" if self.secret and not self._secret_serializer: self._secret_serializer = SecretSerializer(self.secret) return self._secret_serializer
[docs] def manifest_to_django_orm(self) -> Optional[dict[str, Any]]: """Return a dict for loading the secret Django ORM model.""" if not self.manifest: return None return { "id": self.id, "user_profile": self.user_profile, "name": self.name, "description": self.description, "version": self.version, "annotations": self.annotations, "last_accessed": self.last_accessed, "expires_at": self.expires_at, "encrypted_value": self.encrypted_value, }
@property def user_profile(self) -> Optional[UserProfile]: """Return the user profile.""" return self._user_profile @property def name(self) -> Optional[str]: """ Return the name of the secret. The manifest takes precedence over the secret ORM """ if self._name: return self._name if self._manifest: self._name = self._manifest.metadata.name self._secret = None else: if self._secret: self._name = self._secret.name return self._name @name.setter def name(self, value: str): """Set the name of the secret.""" if not value: self._name = None self._secret = None return if self._manifest: if self._manifest.metadata.name != value: raise SmarterSecretTransformerError( f"Cannot set name of secret to {value} when manifest is set to {self._manifest.metadata.name}." ) if self._secret: if self._secret.name != value: raise SmarterSecretTransformerError( f"Cannot set name of secret to {value} when secret is set to {self._secret.name}." ) self._name = value @property # pylint: disable=too-many-return-statements def ready(self) -> bool: """Return whether SecretTransformer is ready.""" if not self.user_profile: logger.warning("%s.ready() User profile is not set.", self.formatted_class_name) return False # --------------------------------------------------------------------- # validate whether we have either a manifest or a secret instance # --------------------------------------------------------------------- if self._manifest: if not self._manifest.model_validate(self._manifest.model_dump()): logger.warning("%s.ready() Pydantic model is not valid.", self.formatted_class_name) return False return True else: if self.secret: return True logger.warning( "%s.ready() Not in a ready state: No manifest nor secret instance found. ", self.formatted_class_name ) return False @property def data(self) -> Optional[dict]: """Return the secret as a dictionary.""" if self.ready: return self.to_json() return None @property def yaml(self) -> Optional[str]: """Return the secret as a yaml string.""" if self.ready: return yaml.dump(self.to_json()) return None
[docs] def refresh(self) -> bool: """Refresh the secret.""" if self.ready: self.id = self.id # type: ignore[assignment] return self.ready return False
[docs] def yaml_to_json(self, yaml_string: str) -> dict: """Convert a yaml string to a dictionary.""" if self.is_valid_yaml(yaml_string): return yaml.safe_load(yaml_string) raise SmarterSecretTransformerError("Invalid data: must be a dictionary or valid YAML.")
[docs] def is_valid_yaml(self, data) -> bool: """Validate a yaml string.""" try: yaml.safe_load(data) return True except yaml.YAMLError: return False
[docs] def create(self) -> bool: """Create a secret from either yaml or a dictionary.""" if not self._manifest: logger.warning("%s.create() Secret manifest is not set. Cannot create secret.", self.formatted_class_name) return False if self._secret and self._secret.id: self.id = self.secret.id # type: ignore[assignment] logger.debug( "%s.create() Secret %s already exists. Updating secret %s instead.", self.formatted_class_name, self.name, self.secret.id, # type: ignore[union-attr] ) return self.update() secret_data = self.manifest_to_django_orm() if not secret_data: raise SmarterSecretTransformerError( f"{self.formatted_class_name}.create() self.manifest_to_django_orm() returned None." ) secret = Secret.objects.create(**secret_data) self.id = secret.id # type: ignore[assignment] secret_created.send(sender=self.__class__, secret=self) return True
[docs] def update(self) -> bool: """Update a secret.""" if not self._manifest: logger.warning("%s.update() Secret manifest is not set.", self.formatted_class_name) return False if not self.secret: logger.warning("%s.update() Secret does not exist.", self.formatted_class_name) return False manifest_to_django_orm = self.manifest_to_django_orm() if not manifest_to_django_orm: logger.warning("%s.update() Secret Django model is not set.", self.formatted_class_name) return False for attr, value in manifest_to_django_orm.items(): if attr not in READ_ONLY_FIELDS: setattr(self._secret, attr, value) self.secret.save() self.secret.tags.set(self.tags) logger.debug("%s.update() secret %s: %s.", self.formatted_class_name, self.name, self.id) secret_updated.send(sender=self.__class__, secret=self, user_profile=self.user_profile) return True
[docs] def save(self) -> bool: """Save a secret.""" if not self.ready: return False if isinstance(self.secret, Secret): self.secret.save() self.secret.tags.set(self.tags) self.id = self.secret.id # type: ignore[assignment] secret_saved.send(sender=self.__class__, secret=self, user_profile=self.user_profile) return True
[docs] def delete(self) -> bool: """Delete a secret.""" if not self.ready: return False secret_id = self.id secret_name = self.name if isinstance(self.secret, Secret): self.secret.delete() self._secret = None self._secret_serializer = None secret_deleted.send(sender=self.__class__, secret_id=secret_id, secret_name=secret_name) logger.debug("%s.delete() secret %s: %s.", self.formatted_class_name, secret_id, secret_name) return True
[docs] def to_json(self, version: str = "v1") -> Optional[dict[str, Any]]: """ Serialize a secret in JSON format that is importable by Pydantic. """ if not self.ready: return None if version == "v1": return self.manifest.model_dump() raise SmarterSecretTransformerError(f"Invalid version: {version}")