Source code for smarter.apps.connection.receivers

# pylint: disable=W0613
"""Django signal receivers for connection app."""

from typing import Optional

from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from django.forms.models import model_to_dict
from requests import Response

from smarter.common.conf import smarter_settings
from smarter.common.exceptions import SmarterConfigurationError
from smarter.common.helpers.console_helpers import formatted_json, formatted_text
from smarter.lib import logging
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.manifest.broker import AbstractBroker

from .models import (
    ApiConnection,
    SqlConnection,
)
from .signals import (
    api_connection_attempted,
    api_connection_failed,
    api_connection_query_attempted,
    api_connection_query_failed,
    api_connection_query_success,
    api_connection_success,
    broker_ready,
    sql_connection_attempted,
    sql_connection_failed,
    sql_connection_query_attempted,
    sql_connection_query_failed,
    sql_connection_query_success,
    sql_connection_success,
    sql_connection_validated,
)

logger = logging.getSmarterLogger(__name__, any_switches=[SmarterWaffleSwitches.RECEIVER_LOGGING])

prefix = formatted_text(__name__)


# ------------------------------------------------------------------------------
# Django model receivers.
# ------------------------------------------------------------------------------
[docs] @receiver(post_save, sender=ApiConnection) def handle_api_connection_saved(sender, instance, created, **kwargs): """Handle API connection saved signal.""" if created: logger.info( "%s - %s", formatted_text(prefix + "post_save() ApiConnection() created"), formatted_json(model_to_dict(instance)), ) else: logger.info( "%s - %s", formatted_text(prefix + "post_save() ApiConnection() updated"), formatted_json(model_to_dict(instance)), )
[docs] @receiver(post_save, sender=SqlConnection) def handle_sql_connection_saved(sender, instance: SqlConnection, created, **kwargs): """Handle SQL connection saved signal.""" user_profile = str(instance.user_profile) if instance.user_profile else "(user_profile is missing)" if created: logger.info( "%s - %s %s", formatted_text(prefix + "post_save() SqlConnection() created"), user_profile, formatted_json(model_to_dict(instance)), ) else: logger.info( "%s - %s %s", formatted_text(prefix + "post_save() SqlConnection() updated"), user_profile, formatted_json(model_to_dict(instance)), )
[docs] def masked_dict(dic: dict) -> dict: """Mask sensitive data in a dictionary.""" masked = dic.copy() if "PASSWORD" in masked: masked["PASSWORD"] = "********" return masked
[docs] @receiver(sql_connection_attempted, dispatch_uid="sql_connection_attempted") def handle_sql_connection_attempted(sender, connection: SqlConnection, **kwargs): """Handle SQL connection attempted signal.""" logger.info( "%s - %s", formatted_text(prefix + "sql_connection_attempted()"), connection.get_connection_string(), )
[docs] @receiver(sql_connection_success, dispatch_uid="sql_connection_success") def handle_sql_connection_success(sender, connection: SqlConnection, **kwargs): """Handle SQL connection success signal.""" logger.info( "%s - %s", formatted_text(prefix + "sql_connection_success()"), connection.get_connection_string(), )
[docs] @receiver(sql_connection_validated, dispatch_uid="sql_connection_validated") def handle_sql_connection_validated(sender, connection: SqlConnection, **kwargs): """Handle SQL connection validated signal.""" logger.info( "%s - %s", formatted_text(prefix + "sql_connection_validated()"), connection.get_connection_string(), )
[docs] @receiver(sql_connection_failed, dispatch_uid="sql_connection_failed") def handle_sql_connection_failed(sender, connection: SqlConnection, error: str, **kwargs): """Handle SQL connection failed signal.""" logger.error( "%s - %s - error: %s", formatted_text(prefix + "sql_connection_failed()"), connection.get_connection_string(masked=not smarter_settings.debug_mode), error, ) raise SmarterConfigurationError( f"Remote SQL Connection {connection.get_connection_string(masked=not smarter_settings.debug_mode)} failed: {error}" ) from None
[docs] @receiver(sql_connection_query_attempted, dispatch_uid="sql_connection_query_attempted") def handle_sql_connection_query_attempted(sender, connection: SqlConnection, sql: str, limit: int, **kwargs): """Handle SQL connection query attempted signal.""" logger.info( "%s - %s - sql: %s - limit: %s", formatted_text(prefix + "sql_connection_query_attempted()"), connection.get_connection_string(), sql, limit, )
[docs] @receiver(sql_connection_query_success, dispatch_uid="sql_connection_query_success") def handle_sql_connection_query_success(sender, connection: SqlConnection, sql: str, limit: int, **kwargs): """Handle SQL connection query success signal.""" logger.info( "%s - %s - sql: %s - limit: %s", formatted_text(prefix + "sql_connection_query_success()"), connection.get_connection_string(), sql, limit, )
[docs] @receiver(sql_connection_query_failed, dispatch_uid="sql_connection_query_failed") def handle_sql_connection_query_failed(sender, connection: SqlConnection, sql: str, limit: int, error: str, **kwargs): """Handle SQL connection query failed signal.""" logger.info( "%s - %s - sql: %s - limit: %s - error: %s", formatted_text(prefix + "sql_connection_query_failed()"), connection.get_connection_string(), sql, limit, error, ) raise SmarterConfigurationError( f"Remote SQL {connection.get_connection_string()} query execution failed {sql}: {error}" )
[docs] @receiver(api_connection_attempted, dispatch_uid="api_connection_attempted") def handle_api_connection_attempted(sender, connection: ApiConnection, **kwargs): """Handle API connection attempted signal.""" logger.info( "%s - %s", formatted_text(prefix + "api_connection_attempted()"), connection.get_connection_string(), )
[docs] @receiver(api_connection_success, dispatch_uid="api_connection_success") def handle_api_connection_success(sender, connection: ApiConnection, **kwargs): """Handle API connection success signal.""" logger.info( "%s - %s", formatted_text(prefix + "api_connection_success()"), connection.get_connection_string(), )
[docs] @receiver(api_connection_failed, dispatch_uid="api_connection_failed") def handle_api_connection_failed(sender, connection: ApiConnection, error: Optional[Exception] = None, **kwargs): """Handle API connection failed signal.""" logger.info( "%s - %s", formatted_text(prefix + "api_connection_failed()"), connection.get_connection_string(), )
[docs] @receiver(api_connection_query_attempted, dispatch_uid="api_connection_query_attempted") def handle_api_connection_query_attempted(sender, connection: ApiConnection, **kwargs): """Handle API connection query attempted signal.""" logger.info( "%s - %s", formatted_text(prefix + "api_connection_query_attempted()"), connection.get_connection_string(), )
[docs] @receiver(api_connection_query_success, dispatch_uid="api_connection_query_success") def handle_api_connection_query_success( sender, connection: ApiConnection, response: Optional[Response] = None, **kwargs ): """Handle API connection query success signal.""" logger.info( "%s - %s - response: %s", formatted_text(prefix + "api_connection_query_success()"), connection.get_connection_string(), formatted_json(response.json()) if response else None, )
[docs] @receiver(api_connection_query_failed, dispatch_uid="api_connection_query_failed") def handle_api_connection_query_failed( sender, connection: ApiConnection, response: Optional[Response] = None, error: Optional[Exception] = None, **kwargs ): """Handle API connection query failed signal.""" logger.info( "%s - %s - response: %s - error: %s", formatted_text(prefix + "api_connection_query_failed()"), connection.get_connection_string(), formatted_json(response.json()) if response else None, error, )
# ------------------------------------------------------------------------------ # pre_delete signals for # ApiConnection, # SqlConnection, # ------------------------------------------------------------------------------
[docs] @receiver(pre_delete, sender=ApiConnection) def handle_api_connection_pre_delete(sender, instance, **kwargs): """Handle pre-delete signal for ApiConnection.""" logger.info( "%s - %s deleting.", formatted_text(prefix + "ApiConnection().pre_delete()"), instance, )
[docs] @receiver(pre_delete, sender=SqlConnection) def handle_sql_connection_pre_delete(sender, instance, **kwargs): """Handle pre-delete signal for SqlConnection.""" logger.info( "%s - %s deleting.", formatted_text(prefix + "SqlConnection().pre_delete()"), instance, )
[docs] @receiver(broker_ready, dispatch_uid="broker_ready") def handle_broker_ready(sender, broker: AbstractBroker, **kwargs): """Handle broker ready signal.""" logger.info( "%s %s %s for %s is ready.", formatted_text(f"{prefix}broker_ready()"), broker.kind, str(broker), broker.name, )