Source code for smarter.common.helpers.aws.aws

"""AWS helper base class."""

# python stuff
import logging
import os
from functools import cached_property
from typing import Optional
from urllib.parse import urlparse

import boto3  # AWS SDK for Python https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
import botocore.exceptions

from smarter.common.conf import services, smarter_settings

# our stuff
from smarter.common.const import (
    SMARTER_API_SUBDOMAIN,
    SmarterEnvironments,
)
from smarter.common.exceptions import SmarterException
from smarter.common.helpers.console_helpers import (
    formatted_text,
    formatted_text_green,
    formatted_text_red,
)
from smarter.common.mixins import SmarterHelperMixin

# mcdaniel apr-2024: technically we shouldn't import smarter.libe.django into the aws helpers
# but the validators don't depend on django initialization, so we're okay here.
from smarter.lib.django.validators import SmarterValidator, SmarterValueError

# from .exceptions import AWSNotReadyError


logger = logging.getLogger(__name__)


class SmarterAWSException(SmarterException):
    """Raised when the hosted zone is not found."""


# pylint: disable=too-many-instance-attributes,too-many-public-methods
[docs] class AWSBase(SmarterHelperMixin): """ Provides a foundational interface for interacting with AWS services in a secure, consistent, and environment-aware manner. This base class is responsible for initializing and managing AWS connections, ensuring that credentials and sessions are established only when the application is in a ready state. It supports multiple authentication strategies, including IAM role-based security (for environments like AWS Lambda), AWS profiles, and direct access key credentials. The class automatically detects the execution environment and adapts its behavior accordingly, such as recognizing when it is running inside AWS infrastructure. AWSBase also implements logic to determine whether the current environment is a recognized and authorized Smarter environment, which is critical for safely creating or modifying billable AWS resources. It provides mechanisms for reformatting and validating domain names, particularly transforming localhost or development domains into proxy domains compatible with AWS Route53 and Kubernetes, thereby supporting seamless local development and cloud deployment. The class exposes properties and methods for accessing AWS account identity, region, authentication sources, and session objects, as well as utility functions for domain validation and environment-specific configuration. It includes robust logging and error handling to facilitate debugging and operational transparency. By centralizing AWS connection logic and environment checks, AWSBase enables derived helper classes to focus on service-specific functionality while maintaining consistent security and configuration practices across the codebase. """ LOCAL_HOSTS = smarter_settings.local_hosts _aws_access_key_id: Optional[str] = None _aws_secret_access_key: Optional[str] = None _aws_region: Optional[str] = None _aws_profile: Optional[str] = None _aws_access_key_id_source: Optional[str] = None _aws_secret_access_key_source: Optional[str] = None _authentication_credentials_are_initialized: bool = False _domain = None _aws_session = None _client = None _client_type: Optional[str] = None _root_domain: str = smarter_settings.root_domain _environment: str = smarter_settings.environment _environment_domain: Optional[str] = None _shared_resource_identifier: Optional[str] = None _debug_mode: bool = False _connected: bool = False _identity: Optional[dict] = None # pylint: disable=too-many-arguments
[docs] def __init__( self, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_region: Optional[str] = None, aws_profile: Optional[str] = None, shared_resource_identifier: Optional[str] = None, environment: Optional[str] = None, environment_domain: Optional[str] = None, root_domain: Optional[str] = None, debug_mode: bool = False, init_info: Optional[str] = None, ): super().__init__() services.raise_error_on_disabled(services.AWS_CLI) logger.debug("%s.__init__() initializing", self.formatted_class_name) self._shared_resource_identifier = shared_resource_identifier or smarter_settings.shared_resource_identifier self._environment = environment or smarter_settings.environment self._root_domain = root_domain or smarter_settings.root_domain self._environment_domain = environment_domain or smarter_settings.environment_platform_domain self._debug_mode = debug_mode or smarter_settings.debug_mode if init_info: logger.debug(init_info) # ---------------------------------------------------------------------- # AWS authentication. Hereon we only want to initialize whatever is # needed to establish a connection to AWS. # ---------------------------------------------------------------------- # priority 1: AWS IAM role based security if not self.authentication_credentials_are_initialized and self.is_aws_deployed: # If we're running inside AWS Lambda, then we don't need to set the AWS credentials. logger.debug("running inside AWS Lambda") self._aws_access_key_id_source = "overridden by IAM role-based security" self._aws_secret_access_key_source = "overridden by IAM role-based security" self._authentication_credentials_are_initialized = True # initialize creentials from smarter_settings unless any of these were passed as parameters self._aws_access_key_id = ( aws_access_key_id or smarter_settings.aws_access_key_id.get_secret_value() if smarter_settings.aws_access_key_id else None ) self._aws_secret_access_key = ( aws_secret_access_key or smarter_settings.aws_secret_access_key.get_secret_value() if smarter_settings.aws_secret_access_key else None ) self._aws_region = aws_region or smarter_settings.aws_region self._aws_profile = aws_profile or smarter_settings.aws_profile # priority 2: aws_profile if not self.authentication_credentials_are_initialized: if self.aws_profile: self._aws_access_key_id_source = "aws_profile" self._aws_secret_access_key_source = "aws_profile" self._authentication_credentials_are_initialized = True # priority 3: aws_access_key_id and aws_secret_access_key if ( not self.authentication_credentials_are_initialized and self.aws_access_key_id and self.aws_secret_access_key and self.aws_region ): self._authentication_credentials_are_initialized = True self._aws_access_key_id_source = "passed parameter" self._aws_secret_access_key_source = "passed parameter" msg = f"{self.formatted_class_name}.__init__() is {self.authentication_credentials_state}" if self.authentication_credentials_are_initialized: logger.debug(msg) else: logger.error(msg)
@property def formatted_class_name(self) -> str: """ Return formatted class name. :return: formatted class name :rtype: str """ return formatted_text(f"{__name__}.{AWSBase.__name__}") @property def client(self): """ Return the AWS client :return: boto3 client instance :rtype: a child of boto3.client """ if self._client: return self._client if not self.client_type: raise SmarterAWSException("Client type is not specified.") if not self.ready: logger.error("%s.client() AWS session is not ready", self.formatted_class_name) return None try: logger.debug("%s.client() creating AWS %s client", self.formatted_class_name, self.client_type.upper()) if not isinstance(self.aws_session, boto3.Session): logger.error( "%s.client() AWS session is not available. Cannot create client.", self.formatted_class_name ) return None self._client = self.aws_session.client(self.client_type) msg = f"{self.formatted_class_name}.client() {formatted_text_green(f'AWS Boto {type(self._client).__name__} client created')}." logger.debug(msg) except botocore.exceptions.BotoCoreError as e: logger.error( "%s.client() Failed to create AWS %s client: %s", self.formatted_class_name, self.client_type.upper(), str(e), ) return None return self._client @property def client_type(self) -> Optional[str]: """ Return the AWS client type. :return: AWS client type :rtype: Optional[str] """ return self._client_type @property def identity(self) -> Optional[dict]: """ Return the AWS identity for the current session. This property retrieves the identity information associated with the current AWS credentials, including the user ID, AWS account number, and IAM ARN. The identity is fetched using the AWS Security Token Service (STS) and is cached for subsequent calls. :return: A dictionary containing AWS identity details, or None if not available. :rtype: Optional[dict] Example output: .. code-block:: json { "UserId": "AIDARKEXDU3E7KD3L3CRF", "Account": "090511222473", "Arn": "arn:aws:iam::090511222473:user/mcdaniel", "ResponseMetadata": { "RequestId": "4d20b844-7e75-4980-9e92-ca0867b24387", "HTTPStatusCode": 200, "HTTPHeaders": { "x-amzn-requestid": "4d20b844-7e75-4980-9e92-ca0867b24387", "x-amz-sts-extended-request-id": "MTp1cy1lYXN0LTI6UzoxNzYzNTc5NjUyMzA0OlI6MVFkeG8yZ1Q=", "content-type": "text/xml", "content-length": "405", "date": "Wed, 19 Nov 2025 19:14:12 GMT" }, "RetryAttempts": 0 } } """ if self._identity: return self._identity if not self.authentication_credentials_are_initialized: logger.warning("%s.identity requested but AWSBase is not initialized", self.formatted_class_name) return None try: logger.debug("%s.identity fetching AWS IAM identity", self.formatted_class_name) self._identity = self.aws_session.client("sts").get_caller_identity() if self.aws_session else None # pylint: disable=broad-exception-caught except Exception as e: msg = f"{self.formatted_class_name}.identity {formatted_text_red('could not fetch AWS IAM identity due to an error')}: {e}" logger.error(msg) self._identity = None if self._identity: msg = f"{self.formatted_class_name}.identity {formatted_text_green('successfully fetched AWS IAM identity.')}: {self._identity}" logger.debug(msg) else: msg = f"{self.formatted_class_name}.identity {formatted_text_red('could not fetch AWS IAM identity.')}" logger.error(msg) return self._identity
[docs] @cached_property def version(self): """ Return the version. :return: boto3 version :rtype: str """ return boto3.__version__
@property def debug_mode(self): """ Debug mode :return: debug mode :rtype: bool """ return self._debug_mode @property def authentication_credentials_are_initialized(self): """ Are aws authentication settings initialized? True if we have enoug information to try to connect to AWS. False otherwise. :return: True if initialized :rtype: bool """ return self._authentication_credentials_are_initialized @property def is_aws_deployed(self) -> bool: """ Return True if we're running inside of AWS Lambda. :return: True if running inside AWS Lambda :rtype: bool """ return bool(os.environ.get("AWS_DEPLOYED", False)) @property def aws_is_configured(self) -> bool: """ Return True if AWS is configured. :return: True if AWS is configured :rtype: bool """ return smarter_settings.aws_is_configured @property def aws_profile(self): """ AWS profile :return: AWS profile :rtype: Optional[str] """ return self._aws_profile @property def aws_account_id(self): """ AWS account id :return: AWS account id :rtype: Optional[str] """ if not self.ready: return None if not isinstance(self.identity, dict): return None return self.identity.get("Account", None) @property def aws_iam_arn(self): """ AWS IAM ARN :return: AWS IAM ARN (Amazon Resource Name) :rtype: Optional[str] """ if not self.ready: return None if not isinstance(self.identity, dict): return None return self.identity.get("Arn", None) @property def aws_region(self): """ AWS region :return: AWS region (e.g. 'us-west-2') :rtype: Optional[str] """ return self._aws_region @property def aws_access_key_id_source(self): """ AWS access key id source :return: AWS access key id source :rtype: Optional[str] """ return self._aws_access_key_id_source @property def aws_access_key_id(self): """ AWS access key id :return: AWS access key id :rtype: Optional[str] """ return self._aws_access_key_id @property def aws_secret_access_key_source(self): """ AWS secret access key source :return: AWS secret access key source :rtype: Optional[str] """ return self._aws_secret_access_key_source @property def aws_secret_access_key(self): """ AWS secret access key :return: AWS secret access key :rtype: Optional[str] """ return self._aws_secret_access_key @property def aws_auth(self) -> dict: """ AWS authentication :return: AWS authentication details :rtype: dict """ retval = { "aws_profile": self.aws_profile, "aws_access_key_id_source": self.aws_access_key_id_source, "aws_secret_access_key_source": self.aws_secret_access_key_source, "aws_region": self.aws_region, } return retval @property def aws_session(self): """ AWS session :return: boto3 AWS session :rtype: Optional[boto3.Session] """ if self._aws_session: return self._aws_session logger.debug("%s.aws_session() establishing AWS boto session", self.formatted_class_name) if not self.authentication_credentials_are_initialized: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_red('AWSBase is not initialized')}" logger.error(msg) return None if self.aws_profile: try: self._aws_session = boto3.Session(profile_name=self.aws_profile, region_name=self.aws_region) except botocore.exceptions.ProfileNotFound: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_red(f'Unable to establish AWS boto session: aws_profile {self.aws_profile} not found')}" logger.error(msg) if self._aws_session: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_green('established AWS boto session using aws_profile')}: {self.aws_profile}" logger.debug(msg) return self._aws_session else: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_red('Unable to establish AWS boto session using aws_profile')}: {self.aws_profile}" logger.error(msg) if self.aws_access_key_id is not None and self.aws_secret_access_key is not None: try: self._aws_session = boto3.Session( region_name=self.aws_region, aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, ) except Exception as e: # pylint: disable=broad-exception-caught msg = f"{self.formatted_class_name}.aws_session() {formatted_text_red('Encountered an error while attempting to establish AWS boto session using aws key-pair')}: {e}" logger.error(msg) if self._aws_session: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_green('established AWS boto session using aws key-pair')}" logger.debug(msg) return self._aws_session else: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_red('Unable to establish AWS boto session using aws key-pair')}" logger.error(msg) logger.warning("%s.aws_session() creating new aws_session without aws credentials", self.formatted_class_name) try: self._aws_session = boto3.Session(region_name=self.aws_region) except Exception as e: # pylint: disable=broad-exception-caught msg = f"{self.formatted_class_name}.aws_session() {formatted_text_red('Encountered an error while attempting to establish AWS boto session without aws credentials')}: {e}" logger.error(msg) if self._aws_session: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_green('established AWS boto session.')}" logger.debug(msg) else: msg = f"{self.formatted_class_name}.aws_session() {formatted_text_red('Unable to establish AWS boto session.')}" logger.error(msg) return self._aws_session @property def shared_resource_identifier(self): """ Return the shared resource identifier. :return: shared resource identifier :rtype: Optional[str] """ return self._shared_resource_identifier @property def environment(self) -> str: """ Return the environment. :return: environment :rtype: str """ return self._environment @property def environment_domain(self) -> str: """ we need to rebuild these in order to reformat the localhost domain into a proxy domain that will work with AWS Route53 and Kubernetes :return: environment domain :rtype: str """ return smarter_settings.environment_platform_domain @property def environment_api_domain(self) -> str: """ we need to rebuild these in order to reformat the localhost domain into a proxy domain that will work with AWS Route53 and Kubernetes :return: environment API domain :rtype: str """ return f"{self.environment}.{SMARTER_API_SUBDOMAIN}.{self.root_domain}" @property def root_domain(self) -> str: """ Return the root domain. :return: root domain :rtype: str """ return self._root_domain # -------------------------------------------------------------------------- # helper functions # --------------------------------------------------------------------------
[docs] def domain_resolver(self, domain: str) -> str: """ Validate the domain and swap out localhost for the proxy domain. :param domain: domain to validate :type domain: str """ if self.environment == SmarterEnvironments.LOCAL: proxy_domain: Optional[str] = None if smarter_settings.environment_platform_domain in domain: proxy_domain = domain.replace(smarter_settings.environment_platform_domain, self.environment_domain) if smarter_settings.environment_api_domain in domain: proxy_domain = domain.replace(smarter_settings.environment_api_domain, self.environment_api_domain) if proxy_domain: SmarterValidator.validate_domain(domain) logger.debug("replacing %s with proxy domain %s", domain, proxy_domain) return proxy_domain # catch-all to ensure that we don't find ourselves working # with anything boneheaded. parsed_domain = urlparse(f"http://{domain}") root_domain = parsed_domain.netloc if root_domain in self.LOCAL_HOSTS: raise SmarterValueError(f"Domain {root_domain} is prohibited.") # if we're not in a local environment, we don't need to do anything SmarterValidator.validate_domain(domain) return domain
# -------------------------------------------------------------------------- # AWS state functions # -------------------------------------------------------------------------- @property def ready(self) -> bool: """ Return True if we're working with a known Smarter environment, and we consider it safe to create billable resources in AWS. :return: True if ready :rtype: bool """ if not isinstance(self.identity, dict): return False return True @property def authentication_credentials_state(self) -> str: """ Return formatted ready state. :return: formatted ready state :rtype: str """ if self.authentication_credentials_are_initialized: return self.formatted_state_ready else: return self.formatted_state_not_ready