"""
authenticate requests via SmarterTokenAuthentication, a subclass of
knox.auth TokenAuthentication tokens.
"""
import logging
import traceback
from datetime import timedelta
from http import HTTPStatus
from typing import Optional
from django.contrib.auth import login
from django.http import HttpRequest
from django.utils import timezone
from django.utils.deprecation import MiddlewareMixin
from knox.settings import knox_settings
from rest_framework.authentication import get_authorization_header
from rest_framework.exceptions import AuthenticationFailed
from smarter.apps.api.v1.manifests.enum import SAMKinds
from smarter.common.conf import smarter_settings
from smarter.common.helpers.console_helpers import formatted_text
from smarter.common.mixins import SmarterHelperMixin
from smarter.common.utils import mask_string
from smarter.lib.django import waffle
from smarter.lib.django.validators import SmarterValidator
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.journal.enum import SmarterJournalCliCommands
from smarter.lib.journal.http import SmarterJournaledJsonErrorResponse
from smarter.lib.logging import WaffleSwitchedLoggerWrapper
from .models import SmarterAuthToken
from .signals import (
smarter_token_authentication_failure,
smarter_token_authentication_request,
smarter_token_authentication_success,
)
from .token_authentication import (
SmarterAnonymousUser,
SmarterTokenAuthentication,
SmarterTokenAuthenticationError,
)
def should_log(level):
"""Check if logging should be done based on the waffle switch."""
return waffle.switch_is_active(SmarterWaffleSwitches.MIDDLEWARE_LOGGING)
base_logger = logging.getLogger(__name__)
logger = WaffleSwitchedLoggerWrapper(base_logger, should_log)
[docs]
class SmarterTokenAuthenticationMiddleware(MiddlewareMixin, SmarterHelperMixin):
"""Middleware to authenticate requests via SmarterTokenAuthentication, a subclass of
knox.auth TokenAuthentication tokens. Provides seamless token authentication for
incoming requests.
Does the following:
- Checks for the presence of an Authorization header with a valid token.
- Uses SmarterTokenAuthentication to authenticate the token.
- Logs authentication attempts and outcomes.
- adds Django signals for token authentication events.
- verifies token activity.
- adds timestamp update on token use.
Raises:
AuthenticationFailed: Raised when authentication fails.
SmarterTokenAuthenticationError: Raised for errors specific to SmarterTokenAuthentication.
"""
authorization_header: Optional[str] = None
request: Optional[HttpRequest] = None
masked_token: Optional[str] = None
@property
def formatted_class_name(self) -> str:
"""Return the formatted class name for logging purposes."""
return formatted_text(f"{__name__}.{SmarterTokenAuthenticationMiddleware.__name__}")
# pylint: disable=unused-argument
[docs]
def is_token_auth(self, request) -> bool:
"""Check if the request is for knox token authentication.
Args:
request (HttpRequest): The incoming HTTP request.
Returns:
bool: True if the request uses token authentication, False otherwise.
"""
# auth=[b'Token', b'd9d56ff4-- A 64-CHARACTER TOKEN --c8176']
auth = self.authorization_header.split() if isinstance(self.authorization_header, str) else []
auth = [a.decode() if isinstance(a, bytes) else a for a in auth]
prefix = knox_settings.AUTH_HEADER_PREFIX
if isinstance(prefix, bytes):
logger.debug(
"%s.is_token_auth() Decoding knox_settings.AUTH_HEADER_PREFIX from bytes to string",
self.formatted_class_name,
)
prefix = prefix.decode()
if not auth:
logger.debug(
"%s.is_token_auth() No Authorization header present - returning False",
self.formatted_class_name,
)
return False
# Ensure auth[0] is a string for comparison
# prefix=Token
# auth=['Token', 'd9d56ff4-- A 64-CHARACTER TOKEN --c8176']
auth_prefix = auth[0]
if isinstance(auth_prefix, bytes):
logger.debug(
"%s.is_token_auth() Decoding auth_prefix from bytes to string",
self.formatted_class_name,
)
auth_prefix = auth_prefix.decode()
if auth_prefix.lower() != prefix.lower():
# Authorization header is possibly for another backend
logger.debug(
"%s.is_token_auth() Authorization header prefix '%s' does not match expected prefix '%s'",
self.formatted_class_name,
auth_prefix,
prefix,
)
return False
token = auth[1]
self.masked_token = mask_string(string=token)
logger.debug(
"%s.is_token_auth() Detected Token authentication with token %s - returning True",
self.formatted_class_name,
self.masked_token,
)
return True
[docs]
def url(self) -> Optional[str]:
"""Return the full URL from the request object.
Returns:
Optional[str]: the complete url from the request object.
"""
if self.request:
return self.smarter_build_absolute_uri(self.request)
return None
[docs]
def get_request_with_verified_user(self, request: HttpRequest) -> HttpRequest:
"""Ensure the request has a user set, defaulting to SmarterAnonymousUser if not.
Args:
request (HttpRequest): The incoming HTTP request.
"""
if not hasattr(request, "user") or request.user is None:
logger.warning("%s.__call__() setting anonymous user on request", self.formatted_class_name)
request.user = SmarterAnonymousUser()
return request
[docs]
def __init__(self, get_response):
super().__init__(get_response)
self.get_response = get_response
logger.debug("%s.__init__() initialized", self.formatted_class_name)
def __call__(self, request, *args, **kwargs):
"""Try to authenticate the request using SmarterTokenAuthentication.
Args:
request (HttpRequest): The incoming HTTP request.
Raises:
AuthenticationFailed: Raised when authentication fails.
SmarterTokenAuthenticationError: Raised for errors specific to SmarterTokenAuthentication.
Returns:
HttpResponse: The response generated by the next middleware or view.
"""
# this is the earliest point at which we can evaluate the request URL.
url = self.smarter_build_absolute_uri(request)
logger.debug(
"%s.__call__() processing request for URL: %s",
self.formatted_class_name,
url,
)
if not SmarterValidator.is_api_endpoint(url):
# skip token authentication if we're not a Smarter API request
logger.info("%s skipping token authentication for non-Smarter API request", self.formatted_class_name)
request = self.get_request_with_verified_user(request)
return self.get_response(request)
self.authorization_header = get_authorization_header(request) # type: ignore[assignment]
self.request = request
if not self.is_token_auth(request):
# we're not using token authentication, no need to do anything
logger.debug("%s.__call__() skipping non-token authentication", self.formatted_class_name)
request = self.get_request_with_verified_user(request)
return self.get_response(request)
if getattr(request, "auth", None) is not None:
# we've already authenticated the request
# with some other middleware, no need to do anything
logger.debug(
"%s.__call__() skipping already authenticated request for user %s",
self.formatted_class_name,
getattr(request, "user", None),
)
request = self.get_request_with_verified_user(request)
return self.get_response(request)
smarter_token_authentication_request.send(
sender=self.__class__,
token=self.masked_token,
url=url,
)
request.auth = SmarterTokenAuthentication() # type: ignore[assignment]
try:
user, auth_obj = request.auth.authenticate(request) # type: ignore[assignment]
if not user:
raise AuthenticationFailed(
"SmarterTokenAuthentication.authenticate() did not return"
" a user object. This can happen if the Authorization header"
" is malformed or is for another backend."
)
# --- BEGIN: check token creation date ---
token = None
if hasattr(auth_obj, "digest"):
logger.debug(
"%s.__call__() digest found. Checking token creation date for user %s",
self.formatted_class_name,
user,
)
try:
token = SmarterAuthToken.objects.get(digest=auth_obj.digest)
if token.created < timezone.now() - timedelta(
days=smarter_settings.smarter_api_key_max_lifetime_days
):
logger.warning(
"%s token for user %s has exceeded max lifetime of %d days",
self.formatted_class_name,
user,
smarter_settings.smarter_api_key_max_lifetime_days,
)
except SmarterAuthToken.DoesNotExist:
logger.warning("%s token digest not found in AuthToken table", self.formatted_class_name)
# --- END: Get token creation date ---
request.user = user or (SmarterAnonymousUser() if user is None else user)
login(request, user, backend="django.contrib.auth.backends.ModelBackend")
smarter_token_authentication_success.send(
sender=self.__class__,
user=user,
token=self.masked_token,
)
logger.info("%s authenticated user %s", self.formatted_class_name, user)
except AuthenticationFailed as auth_failed:
smarter_token_authentication_failure.send(
sender=self.__class__,
user=SmarterAnonymousUser(),
token=self.masked_token,
)
try:
raise SmarterTokenAuthenticationError("Authentication failed.") from auth_failed
except SmarterTokenAuthenticationError as e:
auth = self.authorization_header.split() if isinstance(self.authorization_header, str) else []
auth_token = auth[1] if len(auth) > 1 else None
logger.warning(
"%s failed token authentication attempt using token %s", self.formatted_class_name, auth_token
)
thing = SAMKinds.from_url(self.url())
command = SmarterJournalCliCommands.from_url(self.url())
return SmarterJournaledJsonErrorResponse(
request=request,
e=e,
thing=thing,
command=command, # type: ignore[arg-type]
status=HTTPStatus.UNAUTHORIZED,
stack_trace=traceback.format_exc(),
)
logger.debug("%s.__call__() completed token authentication", self.formatted_class_name)
request = self.get_request_with_verified_user(request)
return self.get_response(request)