"""A utility class for introspecting AWS infrastructure."""
import logging
# python stuff
import time
from typing import Optional
# our stuff
from .aws import AWSBase
from .exceptions import AWSACMVerificationFailed
logger = logging.getLogger(__name__)
[docs]
class AWSCertificateManager(AWSBase):
"""
AWS Certificate Manager helper class. Provides a high-level interface for managing AWS Certificate Manager (ACM) resources.
This helper class encapsulates common operations related to ACM, such as requesting new certificates,
retrieving certificate details, handling DNS validation, and verifying certificate status. It abstracts
the complexities of interacting directly with the AWS SDK, offering a streamlined way to automate
certificate management tasks within AWS environments.
The class also integrates with AWS Route53 to facilitate DNS-based validation by automatically creating
or retrieving the necessary DNS records for certificate verification. It is designed to be used as part
of a broader AWS automation or orchestration workflow, ensuring that certificates are requested,
validated, and managed efficiently.
Logging is provided throughout to assist with debugging and operational visibility. Exceptions are
raised for error conditions, such as failed verification or missing resources, to allow for robust
error handling in consuming code.
"""
_client = None
_route53 = None
_client_type: str = "acm"
@property
def route53(self):
"""
Return the AWS Route53 helper.
:return: AWSRoute53 helper instance
:rtype: AWSRoute53
"""
if self._route53 is None:
# pylint: disable=import-outside-toplevel
from .route53 import AWSRoute53
self._route53 = AWSRoute53()
return self._route53
[docs]
def get_certificate_arn(self, domain_name) -> Optional[str]:
"""
Return the certificate ARN.
:param domain_name: The domain name to search for.
:type domain_name: str
:return: The certificate ARN if found, else None.
:rtype: Optional[str]
"""
response = self.client.list_certificates()
for certificate in response["CertificateSummaryList"]:
if certificate["DomainName"] == domain_name:
return certificate["CertificateArn"]
return None
[docs]
def get_certificate_status(self, certificate_arn: str) -> dict:
"""
Return the certificate status
see example return in ./data/aws/certificate_detail.json
:param certificate_arn: The ARN of the certificate.
:type certificate_arn: str
:return: The certificate details.
:rtype: dict
"""
sleep_interval = 5
max_attempts = int(600 / sleep_interval)
attempts = 0
while True:
try:
certificate_detail = self.client.describe_certificate(CertificateArn=certificate_arn)
# look for a DNS ResourceRecord in the DomainValidationOptions for the Certificate
certificate = certificate_detail.get("Certificate")
if certificate:
domain_validation_options = certificate.get("DomainValidationOptions")
if domain_validation_options:
resource_record = domain_validation_options[0].get("ResourceRecord")
if resource_record:
logger.debug("Found DNS records for ACM certificate ARN: %s", certificate_arn)
return certificate_detail
logger.debug("Waiting for DNS records to be generated for ACM certificate ARN: %s", certificate_arn)
attempts += 1
time.sleep(sleep_interval)
except self.client.exceptions.ResourceNotFoundException as e:
attempts += 1
if attempts >= max_attempts:
raise e(f"Failed to get certificate details for AWS ACM certificate ARN {certificate_arn}") from e
# Wait for a while before describing the certificate
# as it can take a few seconds for ACM to generate the DNS records
time.sleep(sleep_interval)
[docs]
def get_or_create_certificate(self, domain_name) -> str:
"""
Return the certificate ARN.
:param domain_name: The domain name for the certificate.
:type domain_name: str
:return: The certificate ARN.
:rtype: str
"""
# look for existing certificate
certificate_arn = self.get_certificate_arn(domain_name)
if not certificate_arn:
# create a new certificate since we didn't find an existing one
response = self.client.request_certificate(
DomainName=domain_name,
ValidationMethod="DNS",
SubjectAlternativeNames=[f"*.{domain_name}"],
)
certificate_arn = response["CertificateArn"]
return certificate_arn
[docs]
def get_or_create_certificate_dns_record(self, certificate_arn: str) -> dict:
"""
Get or create the DNS verification record for the certificate.
:param certificate_arn: The ARN of the certificate.
:type certificate_arn: str
:return: The DNS record.
:rtype: dict
"""
# get the certificate details
certificate_detail = self.get_certificate_status(certificate_arn=certificate_arn)
dns_records = certificate_detail["Certificate"]["DomainValidationOptions"]
domain_name = dns_records[0]["DomainName"]
resource_record = dns_records[0]["ResourceRecord"]
dns_record_name = resource_record["Name"]
dns_record_type = resource_record["Type"]
dns_record_value = resource_record["Value"]
hosted_zone, _ = self.route53.get_or_create_hosted_zone(domain_name)
hosted_zone_id = self.route53.get_hosted_zone_id(hosted_zone)
dns_record, _ = self.route53.get_or_create_dns_record(
hosted_zone_id=hosted_zone_id,
record_name=dns_record_name,
record_type=dns_record_type,
record_value=dns_record_value,
record_ttl=300,
)
return dns_record
[docs]
def certificate_is_verified(self, certificate_arn: str) -> bool:
"""
Return whether the certificate is verified.
:param certificate_arn: The ARN of the certificate.
:type certificate_arn: str
:return: True if the certificate is verified, else False.
:rtype: bool
"""
certificate_detail = self.get_certificate_status(certificate_arn=certificate_arn)
return certificate_detail["Certificate"]["Status"] == "SUCCESS"
[docs]
def verify_certificate(self, certificate_arn: str) -> bool:
"""
Verify the ACM certificate.
:param certificate_arn: The ARN of the certificate.
:type certificate_arn: str
:return: True if the certificate is verified, else False.
:rtype: bool
"""
sleep_interval = 30
max_attempts = int(600 / sleep_interval)
attempts = 0
if self.certificate_is_verified(certificate_arn):
return True
while not self.certificate_is_verified(certificate_arn=certificate_arn):
attempts += 1
if attempts >= max_attempts:
try:
raise AWSACMVerificationFailed(f"Failed to verify ACM certificate ARN {certificate_arn}")
except AWSACMVerificationFailed as e:
logger.exception(e)
return False
time.sleep(sleep_interval)
return True
[docs]
def delete_certificate(self, certificate_arn: str):
"""
Delete the certificate.
:param certificate_arn: The ARN of the certificate.
:type certificate_arn: str
"""
try:
self.client.delete_certificate(CertificateArn=certificate_arn)
except self.client.exceptions.ResourceNotFoundException:
pass