# pylint: disable=duplicate-code
# pylint: disable=E1101
"""
Utility functions for the Smarter framework.
This module provides a collection of helper functions and classes
that would otherwise be duplicated across more than one Smarter base class.
They live here solely to keep the code DRY (Don't Repeat Yourself).
The module is intended for internal use within the Smarter framework and is
designed to be compatible with Python 3, Django, DRF, and Pydantic.
"""
import csv
import hashlib
import logging
import os
import random
import re
import warnings
from typing import Optional, Union
import yaml
from smarter.common.exceptions import SmarterValueError
from smarter.common.helpers.console_helpers import formatted_text
from smarter.lib.logging import WaffleSwitchedLoggerWrapper
# Standard-library logger named after this module's import path.
logger = logging.getLogger(__name__)
# The module name passed through formatted_text(); every log call in this
# module supplies it as the leading "%s" argument of its message.
logger_prefix = formatted_text(__name__)
# pylint: disable=W0613
def should_log_verbose(level):
    """
    Report whether verbose logging is currently switched on.

    :param level: Not consulted by this function. It is presumably required by
        the callback signature of ``WaffleSwitchedLoggerWrapper`` -- confirm
        against that class.
    :return: The current value of ``smarter_settings.verbose_logging``.
    """
    # Deferred import; presumably avoids a circular import at module load time.
    # pylint: disable=C0415
    from smarter.common.conf import smarter_settings as settings

    verbose_enabled = settings.verbose_logging
    return verbose_enabled
# Logger used for the per-call trace messages in this module. It wraps the
# module logger together with the should_log_verbose predicate; presumably
# records are only forwarded while that predicate returns a truthy value --
# confirm against WaffleSwitchedLoggerWrapper.
verbose_logger = WaffleSwitchedLoggerWrapper(logger, should_log_verbose)
[docs]
def hash_factory(length: int = 16) -> str:
    """
    Generates a random hexadecimal string of at most ``length`` characters.

    :param length: The desired number of characters. Intended to be a positive
        integer. Values above 64 (the length of a SHA-256 hex digest) yield a
        64-character result. The value is applied as a slice bound, so ``0``
        yields an empty string.
    :type length: int, optional (default is 16)
    :return: A lowercase hexadecimal string.
    :rtype: str

    .. note::
        256 random bits are drawn from the operating system's entropy source
        (``random.SystemRandom``), hashed with SHA-256 and truncated. Because
        the bits do not come from the module-level Mersenne Twister, the
        output is unaffected by ``random.seed()`` and cannot be reproduced
        by reseeding.

    .. warning::
        Truncating the digest reduces the number of random bits to
        ``4 * length``. Do not use this for password hashing or for deriving
        cryptographic keys; use dedicated libraries for those purposes.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import hash_factory

        token = hash_factory()                 # e.g. 'a3f9c1e2b4d5f6a7'
        long_token = hash_factory(length=32)   # 32 hexadecimal characters
    """
    # SystemRandom reads from os.urandom(), so identifiers handed out as tokens
    # or nonces are not predictable from the shared ``random`` module state.
    entropy = random.SystemRandom().getrandbits(256)
    digest = hashlib.sha256(str(entropy).encode("utf-8")).hexdigest()
    return digest[:length]
[docs]
def get_readonly_yaml_file(file_path) -> dict:
    """
    Loads a YAML document from disk and returns the parsed result.

    :param file_path: Location of the YAML file to read.
    :type file_path: str
    :return: Whatever ``yaml.safe_load`` produces for the file. This is
        normally a dictionary, but an empty document yields ``None``.
    :rtype: dict

    .. note::
        The file is opened for reading as UTF-8 text and parsed with
        ``yaml.safe_load``, so only standard YAML tags are accepted.

    .. warning::
        Errors are not caught here: a missing or unreadable file, or malformed
        YAML, propagates the underlying exception to the caller.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import get_readonly_yaml_file

        config = get_readonly_yaml_file('/path/to/config.yaml')
        print(config)  # {'key': 'value', ...}
    """
    logger.debug("%s.get_readonly_yaml_file()", logger_prefix)
    with open(file_path, mode="r", encoding="utf-8") as yaml_stream:
        parsed = yaml.safe_load(yaml_stream)
    return parsed
[docs]
def get_readonly_csv_file(file_path):
    """
    Reads a CSV file and returns its rows as a list of dictionaries.

    :param file_path: Location of the CSV file to read.
    :type file_path: str
    :return: One dictionary per data row, keyed by the column headers taken
        from the first row of the file.
    :rtype: list[dict]

    .. note::
        The file is opened as UTF-8 text with ``newline=""`` so that the
        ``csv`` module, not the text layer, interprets line endings. This
        keeps line breaks embedded in quoted fields exactly as written.

    .. warning::
        Errors are not caught here: a missing or unreadable file, or content
        the ``csv`` module rejects, propagates the underlying exception.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import get_readonly_csv_file

        rows = get_readonly_csv_file('/path/to/data.csv')
        for row in rows:
            print(row)  # {'column1': 'value1', 'column2': 'value2', ...}
    """
    # newline="" is what the csv documentation asks for; without it,
    # universal-newline translation rewrites "\r\n" inside quoted fields.
    with open(file_path, encoding="utf-8", newline="") as file:
        return list(csv.DictReader(file))
[docs]
def camel_to_snake_dict(dictionary: dict) -> dict:
    """
    Returns a copy of ``dictionary`` with camelCase keys rewritten in snake_case.

    :param dictionary: The dictionary whose keys should be converted.
    :type dictionary: dict
    :return: A new dictionary. Values that are dictionaries are converted
        recursively; all other values are carried over unchanged.
    :rtype: dict

    .. note::
        Only dictionary keys are converted. Dictionaries nested inside lists
        or other containers are not visited.

    .. warning::
        Keys that are not strings are kept exactly as they are. Keys that are
        already lower-case snake_case come out unchanged.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import camel_to_snake_dict

        data = {
            "userName": "alice",
            "userProfile": {
                "firstName": "Alice",
                "lastName": "Smith"
            }
        }
        result = camel_to_snake_dict(data)
        print(result)
        # Output: {'user_name': 'alice', 'user_profile': {'first_name': 'Alice', 'last_name': 'Smith'}}
    """
    verbose_logger.debug("%s.camel_to_snake_dict()", logger_prefix)

    def convert(name: str) -> str:
        # Pass 1 puts "_" in front of a capitalised word ("userName" -> "user_Name").
        s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
        # Pass 2 splits a lower-case letter or digit from a following capital.
        return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()

    retval = {}
    for key, value in dictionary.items():
        if isinstance(value, dict):
            value = camel_to_snake_dict(value)
        # Non-string keys (ints, tuples, ...) cannot go through re.sub; keep
        # them verbatim, as the documented contract promises.
        new_key = convert(key) if isinstance(key, str) else key
        retval[new_key] = value
    return retval
[docs]
def recursive_sort_dict(d):
    """
    Builds a copy of ``d`` whose keys appear in ascending order at every level.

    :param d: The dictionary to sort.
    :type d: dict
    :return: A new dictionary populated in sorted-key order. Values that are
        dictionaries are sorted the same way; every other value is carried
        over untouched.
    :rtype: dict

    .. note::
        Handy when a deterministic key order is needed, for example before
        serialising or comparing dictionaries in tests.

    .. warning::
        Lists, sets and other containers are not inspected, so dictionaries
        stored inside them keep their original key order.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import recursive_sort_dict

        data = {"b": 2, "a": {"d": 4, "c": 3}}
        print(recursive_sort_dict(data))
        # Output: {'a': {'c': 3, 'd': 4}, 'b': 2}
    """
    verbose_logger.debug("%s.recursive_sort_dict()", logger_prefix)
    ordered = {}
    for key, value in sorted(d.items()):
        if isinstance(value, dict):
            value = recursive_sort_dict(value)
        ordered[key] = value
    return ordered
[docs]
def dict_is_contained_in(dict1, dict2):
    """
    Checks whether every key/value pair of ``dict1`` is also present in ``dict2``.

    :param dict1: The dictionary whose contents must be found.
    :type dict1: dict
    :param dict2: The dictionary that is searched. It may contain extra keys.
    :type dict2: dict
    :return: ``True`` when each key of ``dict1`` exists in ``dict2`` with an
        equal value, comparing nested dictionaries recursively. ``False``
        otherwise, including when ``dict1`` holds a dictionary where ``dict2``
        holds a value of another type.
    :rtype: bool

    .. note::
        Values that are not dictionaries (lists included) are compared with
        ``!=``, so they must match exactly.

    .. warning::
        The function is not silent: on the first mismatch it prints a
        diagnostic line to standard output at every recursion level that
        fails. Use ``dict_is_subset`` for a silent check.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import dict_is_contained_in

        model = {"name": "Alice", "profile": {"age": 30, "city": "Wonderland"}}
        test = {
            "name": "Alice",
            "profile": {"age": 30, "city": "Wonderland"},
            "extra": "value"
        }
        print(dict_is_contained_in(model, test))               # True
        print(dict_is_contained_in(model, {"name": "Alice"}))  # False
    """
    verbose_logger.debug("%s.dict_is_contained_in()", logger_prefix)
    for key, value in dict1.items():
        if key not in dict2:
            print(f"the key {key} is not present in the model dict: ")
            return False
        if isinstance(value, dict):
            candidate = dict2[key]
            # Only recurse into real dictionaries: doing so on an int raises
            # TypeError, and on a str the "in" test becomes substring matching.
            if not isinstance(candidate, dict) or not dict_is_contained_in(value, candidate):
                print("dict not in the model dict: ", value)
                return False
        elif dict2[key] != value:
            print(f"value {value} is not present in the model dict: ")
            return False
    return True
[docs]
def dict_is_subset(small, big) -> bool:
    """
    Silently tests whether ``small`` is structurally contained in ``big``.

    :param small: The structure whose content must be found.
    :type small: dict or list
    :param big: The structure that is searched.
    :type big: dict or list
    :return: ``True`` when everything in ``small`` can be found in ``big``,
        ``False`` otherwise.
    :rtype: bool

    .. note::
        - Two dictionaries: every key of ``small`` must exist in ``big`` and
          the paired values must themselves satisfy this check.
        - Two lists: every element of ``small`` must occur in ``big``,
          regardless of position. A dictionary element matches if it is a
          subset of any dictionary in ``big``; any other element must be
          equal to a member of ``big``.
        - Any other combination is compared with ``==``.

    .. warning::
        Nothing is printed. Use ``dict_is_contained_in`` if diagnostic output
        on mismatch is wanted.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import dict_is_subset

        big = {
            "name": "Alice",
            "profile": {"age": 30, "city": "Wonderland"},
            "roles": ["admin", "user"]
        }
        print(dict_is_subset({"profile": {"age": 30}, "roles": ["admin"]}, big))  # True
        print(dict_is_subset({"profile": {"age": 31}}, big))                      # False
    """
    verbose_logger.debug("%s.dict_is_subset()", logger_prefix)

    if isinstance(small, dict) and isinstance(big, dict):
        return all(key in big and dict_is_subset(value, big[key]) for key, value in small.items())

    if isinstance(small, list) and isinstance(big, list):
        # Position is irrelevant: each wanted item only has to occur somewhere.
        for wanted in small:
            if isinstance(wanted, dict):
                found = any(dict_is_subset(wanted, candidate) for candidate in big if isinstance(candidate, dict))
            else:
                found = wanted in big
            if not found:
                return False
        return True

    return small == big
[docs]
def mask_string(string: str, mask_char: str = "*", mask_length: int = 4, string_length: int = 8) -> str:
    """
    Masks a string, leaving only its last ``mask_length`` characters readable.

    .. deprecated:: 0.10.0
        This function is deprecated and will be removed in a future release.
        Use Pydantic's ``SecretStr`` or other secure alternatives for string masking.

    :param string: The text to mask. ``bytes`` input is decoded as UTF-8 first.
    :type string: str or bytes
    :param mask_char: The character written in place of each hidden character.
        Default is ``'*'``.
    :type mask_char: str, optional
    :param mask_length: How many trailing characters stay readable. ``0`` hides
        every character.
    :type mask_length: int, optional
    :param string_length: Maximum length of the result; the rightmost
        characters of the masked text are kept. Capped at the input length.
    :type string_length: int, optional
    :return: The masked text, at most ``string_length`` characters long.
    :rtype: str
    :raises ValueError: If ``mask_length`` or ``string_length`` is negative
        (checked only when the input is longer than ``mask_length``).

    .. note::
        - Input that is neither ``str`` nor ``bytes`` is not rejected: a
          warning is logged and ``str(string)`` is returned unmasked.
        - Input no longer than ``mask_length`` is returned unchanged.
        - If ``mask_length`` is greater than ``string_length``, it is reduced
          to ``string_length``.
        - A ``DeprecationWarning`` is emitted on every call.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import mask_string

        # Defaults: 4 readable characters, 8 characters in total
        print(mask_string("supersecretpassword"))  # Output: ****word

        # Keep the full length
        print(mask_string("supersecretpassword", mask_length=4, string_length=19))
        # Output: ***************word

        # Custom mask character
        print(mask_string("supersecretpassword", mask_char="#", mask_length=3, string_length=8))
        # Output: #####ord

        # Input not longer than mask_length is returned as-is
        print(mask_string("abc", mask_length=4))  # Output: abc
    """
    verbose_logger.debug("%s.mask_string()", logger_prefix)
    warnings.warn(
        "mask_string is deprecated and will be removed in a future release.", DeprecationWarning, stacklevel=2
    )
    if isinstance(string, bytes):
        string = string.decode("utf-8")
    if not isinstance(string, str):
        logger.warning("mask_string() - Input is not a string or bytes: %s", type(string))
        return str(string)
    if len(string) <= mask_length:
        return string
    if mask_length < 0:
        raise ValueError("mask_length must be greater than or equal to 0")
    if string_length < 0:
        raise ValueError("string_length must be greater than or equal to 0")

    string_length = min(string_length, len(string))
    mask_length = min(mask_length, string_length)

    hidden_count = len(string) - mask_length
    # Slice from an explicit start index: with mask_length == 0 the expression
    # string[-0:] would be the WHOLE string and leak the secret.
    visible_tail = string[hidden_count:]
    masked_string = f"{mask_char}" * hidden_count + visible_tail
    if len(masked_string) > string_length:
        # Same pitfall for string_length == 0: masked_string[-0:] keeps everything.
        masked_string = masked_string[len(masked_string) - string_length :]
    return masked_string
[docs]
def snake_to_camel(data: Union[str, dict, list], convert_values: bool = False) -> Optional[Union[str, dict, list]]:
    """
    Rewrites snake_case names as camelCase.

    :param data: A string, a dictionary whose keys are snake_case, or a list
        of strings, dictionaries or further lists.
    :type data: str, dict, or list
    :param convert_values: When ``True``, string values stored in dictionaries
        are converted as well. Default is ``False``.
    :type convert_values: bool, optional
    :return: The converted data, of the same container type as the input.
    :rtype: Optional[Union[str, dict, list]]
    :raises SmarterValueError: If ``data``, or any element of a list, is not a
        string, dictionary or list.

    .. note::
        - A string is split on ``_``; the first piece is kept as-is and every
          following piece is title-cased before the pieces are joined.
        - Every element of a list is converted, so strings in a list are
          always rewritten.
        - In a dictionary, keys are converted and dictionary values are
          processed recursively. Lists stored as dictionary values are left
          untouched.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import snake_to_camel

        print(snake_to_camel("user_name"))  # Output: userName

        data = {
            "user_name": "alice",
            "user_profile": {"first_name": "Alice", "last_name": "Smith"}
        }
        print(snake_to_camel(data))
        # Output: {'userName': 'alice', 'userProfile': {'firstName': 'Alice', 'lastName': 'Smith'}}

        print(snake_to_camel(["first_name", "last_name"]))
        # Output: ['firstName', 'lastName']

        print(snake_to_camel({"user_name": "first_name"}, convert_values=True))
        # Output: {'userName': 'firstName'}
    """
    verbose_logger.debug("%s.snake_to_camel()", logger_prefix)

    def convert(name: str) -> str:
        head, *tail = name.split("_")
        return head + "".join(part.title() for part in tail)

    if isinstance(data, str):
        return convert(data)
    if isinstance(data, list):
        return [snake_to_camel(item, convert_values=convert_values) for item in data]
    if not isinstance(data, dict):
        raise SmarterValueError(f"Expected data to be a dict or list, got: {type(data)}")

    camelized = {}
    for key, value in data.items():
        if isinstance(value, dict):
            value = snake_to_camel(data=value, convert_values=convert_values)
        elif convert_values and isinstance(value, str):
            value = convert(value)
        camelized[convert(key)] = value
    return camelized
[docs]
def snake_case(name: str) -> str:
    """
    Normalises a space- or underscore-separated name to lower-case snake_case.

    :param name: The string to convert.
    :type name: str
    :return: ``name`` with spaces turned into underscores, runs of underscores
        collapsed to one, and all letters lower-cased.
    :rtype: str

    .. note::
        Words are only separated where the input already has a space or an
        underscore. No underscore is inserted at upper-case boundaries, so
        ``"UserProfile"`` becomes ``"userprofile"``.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import snake_case

        print(snake_case("User Profile"))   # Output: user_profile
        print(snake_case("first  Name"))    # Output: first_name
        print(snake_case("UserProfile"))    # Output: userprofile
    """
    verbose_logger.debug("%s.snake_case()", logger_prefix)
    underscored = re.sub("_+", "_", name.replace(" ", "_"))
    return underscored.lower()
[docs]
def pascal_to_snake(name: str) -> str:
    """
    Converts a PascalCase string to snake_case.

    :param name: The PascalCase string to convert.
    :type name: str
    :return: ``name`` with an underscore inserted before every upper-case
        letter except one at the very start, then lower-cased.
    :rtype: str

    .. note::
        - Every upper-case letter starts a new word, so acronyms are split
          letter by letter.
        - Spaces and existing underscores are left in place.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import pascal_to_snake

        print(pascal_to_snake("UserProfile"))  # Output: user_profile
        print(pascal_to_snake("HTTPServer"))   # Output: h_t_t_p_server
    """
    verbose_logger.debug("%s.pascal_to_snake()", logger_prefix)
    # Zero-width match: any position that is not the start of the string and
    # is immediately followed by an upper-case ASCII letter.
    with_separators = re.sub(r"(?<!^)(?=[A-Z])", "_", name)
    return with_separators.lower()
[docs]
def camel_to_snake(data: Union[str, dict, list]) -> Optional[Union[str, dict, list]]:
    """
    Rewrites camelCase names as snake_case.

    :param data: A string, a dictionary whose keys are camelCase, or a list
        of strings, dictionaries or further lists.
    :type data: str, dict, or list
    :return: The converted data, of the same container type as the input.
    :rtype: Optional[Union[str, dict, list]]
    :raises SmarterValueError: If ``data``, or any element of a list, is not a
        string, dictionary or list.

    .. note::
        - In a dictionary, only keys are converted. Dictionary values are
          processed recursively; every other value, lists included, is
          carried over unchanged.
        - Every element of a list is converted, so strings in a list are
          always rewritten.
        - Spaces become underscores and runs of underscores collapse to one.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import camel_to_snake

        print(camel_to_snake("userName"))  # Output: user_name

        data = {
            "userName": "alice",
            "userProfile": {"firstName": "Alice", "lastName": "Smith"}
        }
        print(camel_to_snake(data))
        # Output: {'user_name': 'alice', 'user_profile': {'first_name': 'Alice', 'last_name': 'Smith'}}

        print(camel_to_snake(["firstName", "lastName"]))
        # Output: ['first_name', 'last_name']
    """

    def convert(name: str):
        text = name.replace(" ", "_")
        # Lower the leading capital so PascalCase input does not start with "_".
        if len(text) > 1 and text[0].isupper():
            text = text[0].lower() + text[1:]
        text = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text)
        text = re.sub("([a-z0-9])([A-Z])", r"\1_\2", text).lower()
        return re.sub("_+", "_", text)

    if isinstance(data, str):
        return convert(data)
    if isinstance(data, list):
        return [camel_to_snake(element) for element in data]
    if not isinstance(data, dict):
        raise SmarterValueError(f"Expected data to be a dict or list, got: {type(data)}")

    snake_cased = {}
    for original_key, value in data.items():
        if isinstance(value, dict):
            value = camel_to_snake(value)
        snake_cased[convert(original_key)] = value
    return snake_cased
[docs]
def rfc1034_compliant_str(val) -> str:
    """
    Generates a RFC 1034-compliant name string suitable for use as a DNS label or resource identifier.

    :param val: The input string to convert.
    :type val: str
    :return: A string that is:

        - lower case
        - made up only of ``a-z``, ``0-9`` and hyphens
        - free of leading and trailing hyphens
        - at most 63 characters long
    :rtype: str
    :raises SmarterValueError: If the input is not a string, or nothing is
        left after conversion.

    .. note::
        - Underscores in the input are replaced with hyphens.
        - Every other character outside ``a-z``, ``0-9`` and ``-`` is removed,
          not replaced.
        - Leading and trailing hyphens are stripped, both before and after
          truncation, so a cut that lands on a hyphen never leaves it
          dangling at the end.

    .. warning::
        This function is intended for generating DNS-safe names. It does not
        guarantee uniqueness or suitability for all RFC 1034 use cases.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import rfc1034_compliant_str

        print(rfc1034_compliant_str("My_ChatBot_2025"))  # Output: my-chatbot-2025

        # Special characters are dropped
        print(rfc1034_compliant_str("My@Bot!_Name"))  # Output: mybot-name

        # Long input is cut to 63 characters
        long_name = "ThisIsAReallyLongChatBotNameThatShouldBeTruncatedToSixtyThreeCharacters_Extra"
        print(rfc1034_compliant_str(long_name))
        # Output: thisisareallylongchatbotnamethatshouldbetruncatedtosixtythreech
    """
    if not isinstance(val, str):
        raise SmarterValueError(f"Could not generate RFC 1034 compliant name from {type(val)}")

    # Replace underscores with hyphens
    label = val.lower().replace("_", "-")
    # Remove invalid characters
    label = re.sub(r"[^a-z0-9-]", "", label)
    # Remove leading/trailing hyphens
    label = label.strip("-")
    # Truncate to 63 characters, then strip again: the cut may fall directly
    # after a hyphen, which would leave the label ending in "-".
    label = label[:63].rstrip("-")

    if not label:
        raise SmarterValueError("Could not generate RFC 1034 compliant name from empty string")
    return label
[docs]
def rfc1034_compliant_to_snake(val) -> str:
    """
    Turns a hyphen-separated RFC 1034 style name into a ``snake_case`` name.

    Useful for translating machine-friendly names, which use hyphens as word
    separators, into Pythonic identifiers, which use underscores.

    :param val: The name to convert, typically lower-case letters, digits and
        hyphens.
    :type val: str
    :return: ``val`` with every hyphen replaced by an underscore.
    :rtype: str
    :raises SmarterValueError: If the input is not a string.

    .. note::
        The input is not validated against RFC 1034. Hyphens are the only
        characters that change; everything else passes through unchanged.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import rfc1034_compliant_to_snake

        print(rfc1034_compliant_to_snake("my-chatbot-2025"))       # Output: my_chatbot_2025
        print(rfc1034_compliant_to_snake("simplelabel"))           # Output: simplelabel
        print(rfc1034_compliant_to_snake("this-is-a-test-label"))  # Output: this_is_a_test_label

        try:
            rfc1034_compliant_to_snake(12345)
        except SmarterValueError as e:
            print(e)
            # Output: Could not convert RFC 1034 compliant name from <class 'int'>
    """
    verbose_logger.debug("%s.rfc1034_compliant_to_snake()", logger_prefix)
    if isinstance(val, str):
        return val.replace("-", "_")
    raise SmarterValueError(f"Could not convert RFC 1034 compliant name from {type(val)}")
[docs]
def generate_fernet_encryption_key() -> str:
    """
    Creates a fresh Fernet encryption key and returns it as text.

    :return: The result of ``Fernet.generate_key()`` decoded as UTF-8: a
        URL-safe base64-encoded 32-byte key for Fernet symmetric encryption.
    :rtype: str

    .. note::
        - The ``cryptography`` package is imported inside the function, so it
          is only required when a key is actually generated.
        - Each call returns a new random key. Store it securely: data
          encrypted with a Fernet key can only be decrypted with the same key.

    **Example usage:**

    .. code-block:: python

        from smarter.common.utils import generate_fernet_encryption_key

        key = generate_fernet_encryption_key()
        print(key)  # a 44-character URL-safe base64 string
    """
    verbose_logger.debug("%s.generate_fernet_encryption_key()", logger_prefix)
    # pylint: disable=C0415
    from cryptography.fernet import Fernet

    verbose_logger.debug("%s.generate_fernet_encryption_key() Generating new Fernet encryption key.", logger_prefix)
    raw_key = Fernet.generate_key()
    return raw_key.decode("utf-8")
[docs]
def bool_environment_variable(var_name: str, default: bool) -> bool:
    """
    Reads an environment variable and interprets it as a boolean.

    :param var_name: Name of the variable. If it is unset or empty, the same
        name prefixed with ``SMARTER_`` is tried instead.
    :type var_name: str
    :param default: Returned when neither lookup yields a value other than
        ``None``.
    :type default: bool
    :return: ``True`` if the value, compared case-insensitively, is one of
        ``true``, ``1``, ``t``, ``y`` or ``yes``; ``False`` for any other
        value; ``default`` when no value was found.
    :rtype: bool
    """
    verbose_logger.debug("%s.bool_environment_variable()", logger_prefix)
    raw_value = os.environ.get(var_name) or os.environ.get(f"SMARTER_{var_name}")
    if raw_value is not None:
        return raw_value.lower() in ("true", "1", "t", "y", "yes")
    return default
# Public API of this module, in alphabetical order. Every documented helper
# defined above is listed, including hash_factory, so that
# ``from smarter.common.utils import *`` exposes the same names as the docs.
__all__ = [
    "bool_environment_variable",
    "camel_to_snake",
    "camel_to_snake_dict",
    "dict_is_contained_in",
    "dict_is_subset",
    "generate_fernet_encryption_key",
    "get_readonly_csv_file",
    "get_readonly_yaml_file",
    "hash_factory",
    "mask_string",
    "pascal_to_snake",
    "recursive_sort_dict",
    "rfc1034_compliant_str",
    "rfc1034_compliant_to_snake",
    "snake_case",
    "snake_to_camel",
]