Source code for smarter.common.helpers.k8s_helpers

"""A module for interacting with Kubernetes clusters."""

# pylint: disable=W0613

import logging
import os
import subprocess
import time
from typing import Optional, Tuple, Union

from smarter.common.conf import smarter_settings
from smarter.common.exceptions import SmarterException
from smarter.common.helpers.aws_helpers import aws_helper
from smarter.common.helpers.console_helpers import (
    formatted_text,
    formatted_text_green,
    formatted_text_red,
)
from smarter.common.mixins import Singleton, SmarterHelperMixin
from smarter.common.utils import get_readonly_yaml_file
from smarter.lib import json

logger = logging.getLogger(__name__)
module_prefix = formatted_text(f"{__name__}.KubernetesHelper")


class KubernetesHelperException(SmarterException):
    """Base class for Kubernetes helper exceptions."""


[docs] class KubernetesHelper(SmarterHelperMixin, metaclass=Singleton): """ Helper class for interacting with Kubernetes clusters. This class provides a set of utility methods to manage and verify Kubernetes resources such as Ingresses, Certificates, and Secrets, as well as to apply manifests and update kubeconfig files for EKS clusters. It is designed to be used as a singleton and integrates with AWS EKS and cert-manager. Attributes ---------- _kubeconfig : dict The loaded kubeconfig as a dictionary. _configured : bool Indicates whether the kubeconfig has been configured. Parameters ---------- kubeconfig : dict, optional The kubeconfig dictionary to use. If not provided, a default is used. configured : bool, optional Whether the helper is already configured. """ _kubeconfig: Optional[dict] = None _configured: bool = False _namespace_verified: bool = False
[docs] def __init__(self, kubeconfig: Optional[dict] = None, configured: bool = False, **kwargs): super().__init__() default_kubeconfig = {"apiVersion": "v1"} self._configured = configured self._kubeconfig = kubeconfig or default_kubeconfig logger.debug( "%s initialized with kubeconfig: %s, configured: %s", module_prefix, self._kubeconfig, self._configured, )
@property def ready(self) -> bool: """ Return whether the Kubernetes helper is ready for use. Returns True if Kubernetes is configured, AND, the Kubernetes namespace exists. :return: True if ready, False otherwise. :rtype: bool smarter_settings.environment_namespace """ if not aws_helper.ready: msg = f"{module_prefix}.ready() {formatted_text_red('AWS not ready, cannot configure KubernetesHelper')}" logger.warning(msg) return False if not self.configured: msg = f"{module_prefix}.ready() {formatted_text_red('KubernetesHelper not configured')}" logger.warning(msg) return False if not self.namespace_verified: msg = f"{module_prefix}.ready() {formatted_text_red(f'KubernetesHelper namespace {smarter_settings.environment_namespace} does not exist')}" logger.warning(msg) return False is_ready = self.configured and self.namespace_verified if is_ready: msg = f"{module_prefix}.ready() - {self.formatted_state_ready}." logger.info(msg) return True msg = f"{module_prefix}.ready() - {self.formatted_state_not_ready}" logger.error(msg) return False @property def namespace_verified(self) -> bool: """ Return whether the Kubernetes namespace has been verified. :return: True if the namespace has been verified, False otherwise. :rtype: bool """ if self._namespace_verified: return True self._namespace_verified = self.verify_namespace(smarter_settings.environment_namespace) return self._namespace_verified @property def configured(self) -> bool: """ Return whether the kubeconfig has been configured. :return: True if configured, False otherwise. :rtype: bool """ if self._configured: return True self._configured = self.update_kubeconfig() return self._configured @property def kubeconfig_path(self) -> str: """ Return the path to the kubeconfig file. :return: The kubeconfig file path. :rtype: str """ return os.path.join(smarter_settings.data_directory, ".kube", "config") @property def kubeconfig(self) -> dict: """ Return the kubeconfig file as a dictionary. :return: The kubeconfig dictionary. :rtype: dict """ if self._kubeconfig: return self._kubeconfig self._kubeconfig = get_readonly_yaml_file(self.kubeconfig_path) logger.info("%s loaded kubeconfig from path %s", module_prefix, self.kubeconfig_path) return self._kubeconfig
[docs] def update_kubeconfig(self) -> bool: """ Generate a fresh kubeconfig file for the EKS cluster. This method uses the AWS CLI to update the kubeconfig file for the specified EKS cluster. :raises KubernetesHelperException: If the kubeconfig update fails. :return: True if the kubeconfig was updated successfully, False otherwise. :rtype: bool """ logger.info( "%s.update_kubeconfig() updating kubeconfig for Kubernetes cluster %s", module_prefix, smarter_settings.aws_eks_cluster_name, ) command = [ "aws", "eks", "update-kubeconfig", "--region", smarter_settings.aws_region, "--name", smarter_settings.aws_eks_cluster_name, ] try: subprocess.check_call(command) self._configured = True logger.info("%s.update_kubeconfig() kubeconfig updated successfully", module_prefix) return True except subprocess.CalledProcessError as e: self._configured = False msg = f"{module_prefix}.update_kubeconfig() {formatted_text_red('Failed to update kubeconfig')}: {e}" logger.error(msg) return False
[docs] def apply_manifest(self, manifest: str): """ Apply a Kubernetes manifest to the cluster. :param manifest: The Kubernetes manifest as a string. :type manifest: str :raises KubernetesHelperException: If applying the manifest fails. :return: None :rtype: None """ logger.info( "%s.apply_manifest() applying Kubernetes manifest to cluster %s:\n%s", module_prefix, smarter_settings.aws_eks_cluster_name, manifest, ) if not self.ready: return None with subprocess.Popen( ["kubectl", "apply", "-f", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE ) as process: _, stderr = process.communicate(input=manifest.encode()) if process.returncode != 0: msg = f"{module_prefix}.apply_manifest() {formatted_text_red('Failed to apply manifest')}: {stderr.decode()}" logger.error(msg) raise KubernetesHelperException(f"Failed to apply manifest: {stderr.decode()}")
[docs] def verify_namespace(self, namespace: str) -> bool: """ Verify that a namespace exists in the cluster. :param namespace: The name of the namespace. :type namespace: str :return: True if the namespace exists, False otherwise. :rtype: bool """ logger.info( "%s.verify_namespace() verifying namespace in cluster %s, name %s", module_prefix, smarter_settings.aws_eks_cluster_name, namespace, ) if not self.configured: return False command = ["kubectl", "get", "namespace", namespace, "-o", "json"] try: output = subprocess.check_output(command) json.loads(output) logger.info("%s found namespace resource %s", module_prefix, namespace) except subprocess.CalledProcessError: logger.warning("%s did not find namespace resource %s", module_prefix, namespace) return False except json.JSONDecodeError as e: logger.exception("%s failed to parse namespace resource: %s", module_prefix, e) return False return True
[docs] def verify_ingress_resources(self, hostname: str, namespace: str) -> Tuple[bool, bool, bool]: """ Verify that an ingress and all child resources exist in the cluster. commands: - kubectl get ingress education.3141-5926-5359.api.example.com -n smarter-platform-prod -o json - kubectl get certificate education.3141-5926-5359.api.example.com-tls -n smarter-platform-prod -o json - kubectl get secret education.3141-5926-5359.api.example.com-tls -n smarter-platform-prod -o json :param hostname: The hostname of the ingress. :type hostname: str :param namespace: The namespace of the ingress. :type namespace: str :return: A tuple of booleans indicating whether the ingress, certificate, and secret were verified. :rtype: Tuple[bool, bool, bool] """ logger.debug( "%s.verify_ingress_resources() verifying ingress resources in cluster %s, hostname %s, namespace %s", module_prefix, smarter_settings.aws_eks_cluster_name, hostname, namespace, ) ingress_name = hostname ingress_verified = self.verify_ingress(ingress_name, namespace) secret_name = f"{hostname}-tls" secret_verified = self.verify_secret(secret_name, namespace) certificate_name = secret_name max_attempts = 30 sleep_time = 60 # attempt to verify the certificate once per minute for up to a half hour. for _ in range(max_attempts): certificate_verified = self.verify_certificate(certificate_name, namespace) if certificate_verified: break logger.debug( "%s.verify_ingress_resources() certificate %s %s not ready, sleeping for %s seconds", module_prefix, hostname, namespace, sleep_time, ) time.sleep(sleep_time) else: logger.error( "%s.verify_ingress_resources() certificate not ready after %s attempts", module_prefix, max_attempts, ) return ingress_verified, certificate_verified, secret_verified
[docs] def verify_ingress(self, name: str, namespace: str) -> bool: """ Verify that an Ingress resource exists in the cluster. commands: - kubectl get ingress smarter.3141-5926-5359.api.example.com -n smarter-platform-prod -o json :param name: The name of the ingress. :type name: str :param namespace: The namespace of the ingress. :type namespace: str :return: True if the ingress exists, False otherwise. :rtype: bool """ logger.debug( "%s verifying ingress in cluster %s, name %s, namespace %s", module_prefix, smarter_settings.aws_eks_cluster_name, name, namespace, ) if not self.ready: return False command = ["kubectl", "get", "ingress", name, "-n", namespace, "-o", "json"] try: output = subprocess.check_output(command) json.loads(output) logger.debug("%s found ingress resource %s %s", module_prefix, name, namespace) except subprocess.CalledProcessError: logger.warning("%s did not find ingress resource %s %s", module_prefix, name, namespace) return False except json.JSONDecodeError as e: logger.exception("%s failed to parse ingress resource: %s", module_prefix, e) return False return True
[docs] def verify_certificate(self, name: str, namespace: str) -> bool: """ Verify that a cert-manager certificate resource exists in the cluster and is in a ready state. command: - kubectl get certificate smarter.3141-5926-5359.api.example.com-tls -n smarter-platform-prod -o json parse json response and check for the following: - status.conditions.type == Ready :param name: The name of the certificate. :type name: str :param namespace: The namespace of the certificate. :type namespace: str :return: True if the certificate exists and is ready, False otherwise. :rtype: bool """ logger.debug( "%s verifying certificate in cluster %s, name %s, namespace %s", module_prefix, smarter_settings.aws_eks_cluster_name, name, namespace, ) if not self.ready: return False command = ["kubectl", "get", "certificate", name, "-n", namespace, "-o", "json"] # if the certificate is found, the output will be the certificate data in json format. try: output = subprocess.check_output(command, text=True) logger.debug("%s found certificate resource for %s %s", module_prefix, name, namespace) certificate_info: dict try: certificate_info = json.loads(output) logger.debug("%s parsed json certificate data %s %s", module_prefix, name, namespace) except json.JSONDecodeError as e: logger.exception("%s Failed to parse certificate resource: %s", module_prefix, e) return False # try to parse the json data and check if the certificate is ready. # status.conditions.status == True and status.conditions.type == Ready try: ready_status = next( ( condition["status"] for condition in certificate_info["status"]["conditions"] if condition["type"] == "Ready" ), None, ) certificate_issued = str(ready_status).lower() == "true" if certificate_issued: logger.debug( "%s Certificate %s in namespace %s is issued and in a ready state.", module_prefix, name, namespace, ) else: logger.warning( "%s Certificate %s in namespace %s is not ready. Status: %s", module_prefix, name, namespace, ready_status, ) return False except KeyError as e: logger.exception( "%s Could not parse certificate json data for %s %s: %s", module_prefix, name, namespace, e ) return False except subprocess.CalledProcessError as e: logger.warning("%s Failed to retrieve certificate %s %s", module_prefix, name, namespace) return False return True
[docs] def verify_secret(self, name: str, namespace: str) -> bool: """ Verify that a secret resource exists in the cluster. command: - kubectl get secret smarter.3141-5926-5359.api.example.com-tls -n smarter-platform-prod -o json :param name: The name of the secret. :type name: str :param namespace: The namespace of the secret. :type namespace: str :return: True if the secret exists, False otherwise. :rtype: bool """ logger.debug( "%s verifying secret in cluster %s, name %s, namespace %s", module_prefix, smarter_settings.aws_eks_cluster_name, name, namespace, ) if not self.ready: return False command = ["kubectl", "get", "secret", name, "-n", namespace, "-o", "json"] # if the secret is found, the output will be the secret data in json format. try: output = subprocess.check_output(command) json.loads(output) logger.debug("%s secret %s in namespace %s is ready", module_prefix, name, namespace) return True except subprocess.CalledProcessError: logger.error("%s Failed to verify secret resource %s %s", module_prefix, name, namespace) return False except json.JSONDecodeError as e: logger.exception("%s Failed to parse secret resource: %s", module_prefix, e) return False return True
[docs] def delete_ingress_resources(self, hostname: str, namespace: str) -> Tuple[bool, bool, bool]: """ Delete an ingress and all child resources from the cluster. commands: - kubectl delete ingress education.3141-5926-5359.api.example.com -n smarter-platform-prod - kubectl delete certificate education.3141-5926-5359.api.example.com-tls -n smarter-platform-prod - kubectl delete secret education.3141-5926-5359.api.example.com-tls -n smarter-platform-prod :param hostname: The hostname of the ingress. :type hostname: str :param namespace: The namespace of the ingress. :type namespace: str :return: A tuple of booleans indicating whether the ingress, certificate, and secret were deleted. :rtype: Tuple[bool, bool, bool] """ logger.debug( "%s.delete_ingress_resources() deleting ingress resources from cluster %s, hostname %s, namespace %s", self.formatted_class_name, smarter_settings.aws_eks_cluster_name, hostname, namespace, ) ingress_name = hostname ingress_deleted = self.delete_ingress(ingress_name, namespace) certificate_name = f"{hostname}-tls" certificate_deleted = self.delete_certificate(certificate_name, namespace) secret_name = certificate_name secret_deleted = self.delete_secret(secret_name, namespace) return ingress_deleted, certificate_deleted, secret_deleted
[docs] def delete_ingress(self, ingress_name: str, namespace: str) -> bool: """ Delete an Ingress resource from the cluster. command: - kubectl delete ingress education.3141-5926-5359.api.example.com -n smarter-platform-prod :param ingress_name: The name of the ingress. :type ingress_name: str :param namespace: The namespace of the ingress. :type namespace: str :return: True if the ingress was deleted, False otherwise. :rtype: bool """ logger.debug( "%s.delete_ingress() deleting ingress from cluster %s, name %s, namespace %s", self.formatted_class_name, smarter_settings.aws_eks_cluster_name, ingress_name, namespace, ) if not self.ready: return False command = ["kubectl", "delete", "ingress", ingress_name, "-n", namespace] try: subprocess.check_call(command) except subprocess.CalledProcessError as error: logger.error("Failed to delete ingress resource: %s", error) return False return True
[docs] def delete_certificate(self, certificate_name: str, namespace: str) -> bool: """ Delete a cert-manager certificate resource from the cluster. command: - kubectl delete certificate education.3141-5926-5359.api.example.com-tls -n smarter-platform-prod :param certificate_name: The name of the certificate. :type certificate_name: str :param namespace: The namespace of the certificate. :type namespace: str :return: True if the certificate was deleted, False otherwise. :rtype: bool """ logger.debug( "%s.delete_ingress() deleting certificate from cluster %s, certificate_name %s, namespace %s", self.formatted_class_name, smarter_settings.aws_eks_cluster_name, certificate_name, namespace, ) if not self.ready: return False command = ["kubectl", "delete", "certificate", certificate_name, "-n", namespace] try: subprocess.check_call(command) except subprocess.CalledProcessError as error: logger.error("Failed to delete certificate resource: %s", error) return False return True
[docs] def delete_secret(self, secret_name: str, namespace: str) -> bool: """ Delete a secret resource from the cluster. commands: - kubectl delete secret education.3141-5926-5359.api.example.com-tls -n smarter-platform-prod :param secret_name: The name of the secret. :type secret_name: str :param namespace: The namespace of the secret. :type namespace: str :return: True if the secret was deleted, False otherwise. :rtype: bool """ logger.debug( "%s.delete_ingress() deleting secret from cluster %s, secret_name %s, namespace %s", self.formatted_class_name, smarter_settings.aws_eks_cluster_name, secret_name, namespace, ) if not self.ready: return False command = ["kubectl", "delete", "secret", secret_name, "-n", namespace] try: subprocess.check_call(command) except subprocess.CalledProcessError as error: logger.error("Failed to delete secret resource: %s", error) return False return True
[docs] def get_namespaces(self) -> Union[dict, None]: """ Get all namespaces in the Kubernetes cluster. :return: A dictionary of namespaces. :rtype: dict """ logger.debug("retrieving namespaces from Kubernetes cluster %s", smarter_settings.aws_eks_cluster_name) if not self.ready: return None output = subprocess.check_output(["kubectl", "get", "pods", "-n", "kube-system", "-o", "json"]) output_dict = json.loads(output) return output_dict
kubernetes_helper = KubernetesHelper()