Source code for smarter.lib.drf.middleware

"""
authenticate requests via SmarterTokenAuthentication, a subclass of
knox.auth TokenAuthentication tokens.
"""

import logging
import traceback
from datetime import timedelta
from http import HTTPStatus
from typing import Optional

from django.contrib.auth import login
from django.http import HttpRequest
from django.utils import timezone
from django.utils.deprecation import MiddlewareMixin
from knox.settings import knox_settings
from rest_framework.authentication import get_authorization_header
from rest_framework.exceptions import AuthenticationFailed

from smarter.apps.api.v1.manifests.enum import SAMKinds
from smarter.common.conf import smarter_settings
from smarter.common.helpers.console_helpers import formatted_text
from smarter.common.mixins import SmarterHelperMixin
from smarter.common.utils import mask_string
from smarter.lib.django import waffle
from smarter.lib.django.validators import SmarterValidator
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.journal.enum import SmarterJournalCliCommands
from smarter.lib.journal.http import SmarterJournaledJsonErrorResponse
from smarter.lib.logging import WaffleSwitchedLoggerWrapper

from .models import SmarterAuthToken
from .signals import (
    smarter_token_authentication_failure,
    smarter_token_authentication_request,
    smarter_token_authentication_success,
)
from .token_authentication import (
    SmarterAnonymousUser,
    SmarterTokenAuthentication,
    SmarterTokenAuthenticationError,
)


def should_log(level):
    """Check if logging should be done based on the waffle switch."""
    return waffle.switch_is_active(SmarterWaffleSwitches.MIDDLEWARE_LOGGING)


base_logger = logging.getLogger(__name__)
logger = WaffleSwitchedLoggerWrapper(base_logger, should_log)


[docs] class SmarterTokenAuthenticationMiddleware(MiddlewareMixin, SmarterHelperMixin): """Middleware to authenticate requests via SmarterTokenAuthentication, a subclass of knox.auth TokenAuthentication tokens. Provides seamless token authentication for incoming requests. Does the following: - Checks for the presence of an Authorization header with a valid token. - Uses SmarterTokenAuthentication to authenticate the token. - Logs authentication attempts and outcomes. - adds Django signals for token authentication events. - verifies token activity. - adds timestamp update on token use. Raises: AuthenticationFailed: Raised when authentication fails. SmarterTokenAuthenticationError: Raised for errors specific to SmarterTokenAuthentication. """ authorization_header: Optional[str] = None request: Optional[HttpRequest] = None masked_token: Optional[str] = None @property def formatted_class_name(self) -> str: """Return the formatted class name for logging purposes.""" return formatted_text(f"{__name__}.{SmarterTokenAuthenticationMiddleware.__name__}") # pylint: disable=unused-argument
[docs] def is_token_auth(self, request) -> bool: """Check if the request is for knox token authentication. Args: request (HttpRequest): The incoming HTTP request. Returns: bool: True if the request uses token authentication, False otherwise. """ # auth=[b'Token', b'd9d56ff4-- A 64-CHARACTER TOKEN --c8176'] auth = self.authorization_header.split() if isinstance(self.authorization_header, str) else [] auth = [a.decode() if isinstance(a, bytes) else a for a in auth] prefix = knox_settings.AUTH_HEADER_PREFIX if isinstance(prefix, bytes): logger.debug( "%s.is_token_auth() Decoding knox_settings.AUTH_HEADER_PREFIX from bytes to string", self.formatted_class_name, ) prefix = prefix.decode() if not auth: logger.debug( "%s.is_token_auth() No Authorization header present - returning False", self.formatted_class_name, ) return False # Ensure auth[0] is a string for comparison # prefix=Token # auth=['Token', 'd9d56ff4-- A 64-CHARACTER TOKEN --c8176'] auth_prefix = auth[0] if isinstance(auth_prefix, bytes): logger.debug( "%s.is_token_auth() Decoding auth_prefix from bytes to string", self.formatted_class_name, ) auth_prefix = auth_prefix.decode() if auth_prefix.lower() != prefix.lower(): # Authorization header is possibly for another backend logger.debug( "%s.is_token_auth() Authorization header prefix '%s' does not match expected prefix '%s'", self.formatted_class_name, auth_prefix, prefix, ) return False token = auth[1] self.masked_token = mask_string(string=token) logger.debug( "%s.is_token_auth() Detected Token authentication with token %s - returning True", self.formatted_class_name, self.masked_token, ) return True
[docs] def url(self) -> Optional[str]: """Return the full URL from the request object. Returns: Optional[str]: the complete url from the request object. """ if self.request: return self.smarter_build_absolute_uri(self.request) return None
[docs] def get_request_with_verified_user(self, request: HttpRequest) -> HttpRequest: """Ensure the request has a user set, defaulting to SmarterAnonymousUser if not. Args: request (HttpRequest): The incoming HTTP request. """ if not hasattr(request, "user") or request.user is None: logger.warning("%s.__call__() setting anonymous user on request", self.formatted_class_name) request.user = SmarterAnonymousUser() return request
[docs] def __init__(self, get_response): super().__init__(get_response) self.get_response = get_response logger.debug("%s.__init__() initialized", self.formatted_class_name)
def __call__(self, request, *args, **kwargs): """Try to authenticate the request using SmarterTokenAuthentication. Args: request (HttpRequest): The incoming HTTP request. Raises: AuthenticationFailed: Raised when authentication fails. SmarterTokenAuthenticationError: Raised for errors specific to SmarterTokenAuthentication. Returns: HttpResponse: The response generated by the next middleware or view. """ # this is the earliest point at which we can evaluate the request URL. url = self.smarter_build_absolute_uri(request) logger.debug( "%s.__call__() processing request for URL: %s", self.formatted_class_name, url, ) if not SmarterValidator.is_api_endpoint(url): # skip token authentication if we're not a Smarter API request logger.info("%s skipping token authentication for non-Smarter API request", self.formatted_class_name) request = self.get_request_with_verified_user(request) return self.get_response(request) self.authorization_header = get_authorization_header(request) # type: ignore[assignment] self.request = request if not self.is_token_auth(request): # we're not using token authentication, no need to do anything logger.debug("%s.__call__() skipping non-token authentication", self.formatted_class_name) request = self.get_request_with_verified_user(request) return self.get_response(request) if getattr(request, "auth", None) is not None: # we've already authenticated the request # with some other middleware, no need to do anything logger.debug( "%s.__call__() skipping already authenticated request for user %s", self.formatted_class_name, getattr(request, "user", None), ) request = self.get_request_with_verified_user(request) return self.get_response(request) smarter_token_authentication_request.send( sender=self.__class__, token=self.masked_token, url=url, ) request.auth = SmarterTokenAuthentication() # type: ignore[assignment] try: user, auth_obj = request.auth.authenticate(request) # type: ignore[assignment] if not user: raise AuthenticationFailed( "SmarterTokenAuthentication.authenticate() did not return" " a user object. This can happen if the Authorization header" " is malformed or is for another backend." ) # --- BEGIN: check token creation date --- token = None if hasattr(auth_obj, "digest"): logger.debug( "%s.__call__() digest found. Checking token creation date for user %s", self.formatted_class_name, user, ) try: token = SmarterAuthToken.objects.get(digest=auth_obj.digest) if token.created < timezone.now() - timedelta( days=smarter_settings.smarter_api_key_max_lifetime_days ): logger.warning( "%s token for user %s has exceeded max lifetime of %d days", self.formatted_class_name, user, smarter_settings.smarter_api_key_max_lifetime_days, ) except SmarterAuthToken.DoesNotExist: logger.warning("%s token digest not found in AuthToken table", self.formatted_class_name) # --- END: Get token creation date --- request.user = user or (SmarterAnonymousUser() if user is None else user) login(request, user, backend="django.contrib.auth.backends.ModelBackend") smarter_token_authentication_success.send( sender=self.__class__, user=user, token=self.masked_token, ) logger.info("%s authenticated user %s", self.formatted_class_name, user) except AuthenticationFailed as auth_failed: smarter_token_authentication_failure.send( sender=self.__class__, user=SmarterAnonymousUser(), token=self.masked_token, ) try: raise SmarterTokenAuthenticationError("Authentication failed.") from auth_failed except SmarterTokenAuthenticationError as e: auth = self.authorization_header.split() if isinstance(self.authorization_header, str) else [] auth_token = auth[1] if len(auth) > 1 else None logger.warning( "%s failed token authentication attempt using token %s", self.formatted_class_name, auth_token ) thing = SAMKinds.from_url(self.url()) command = SmarterJournalCliCommands.from_url(self.url()) return SmarterJournaledJsonErrorResponse( request=request, e=e, thing=thing, command=command, # type: ignore[arg-type] status=HTTPStatus.UNAUTHORIZED, stack_trace=traceback.format_exc(), ) logger.debug("%s.__call__() completed token authentication", self.formatted_class_name) request = self.get_request_with_verified_user(request) return self.get_response(request)