Redis Log Handler
smarter.lib.logging.redis_log_handler
This module provides a custom logging handler for publishing log records to Redis channels, enabling real-time log streaming for distributed systems. It is designed to work with context-aware job IDs, allowing logs to be associated with specific jobs or tasks (such as Celery tasks), as well as to a global log channel for system-wide log aggregation.
This handler is used for the Terminal Emulator real-time log view in the Smarter web console.
Main Components
smarter.lib.logging.redis_log_handler.RedisLogHandler: A custom logging handler that publishes log records to Redis channels, supporting both job-specific and global log streams.smarter.lib.logging.redis_log_handler.job_id_factory: Utility function to generate unique job IDs for associating logs with specific jobs or tasks.smarter.lib.logging.redis_log_handler.user_id_context: Context variable for tracking the current job ID within the logging context.smarter.lib.logging.redis_log_handler.GLOBAL_LOG_CHANNEL: The Redis channel name used for publishing all logs globally.
Features
Asynchronous log publishing to Redis using a background worker thread and batching for efficiency.
Support for both job-specific and global log channels.
Graceful shutdown and log flushing on process exit.
Handles dropped logs if the internal queue is full, with periodic reporting.
Example Usage
#
# configure the Django logging to use the RedisLogHandler
#
LOGGING = {
"handlers": {
"default": {
"level": smarter_settings.log_level_name,
"class": "logging.StreamHandler",
"formatter": "timestamped",
},
"redis": {
"level": smarter_settings.log_level_name,
"class": "smarter.lib.logging.RedisLogHandler", # <--- Use the RedisLogHandler
"formatter": "timestamped",
},
}
Attention
The following is technically possible but not a recommended practice. It demonstrates how to manually configure logging to use the RedisLogHandler outside of a Django settings context, such as in a standalone script or a Celery task. In most cases, it’s better to configure logging through the Django settings for consistency and maintainability.
#
# manually configure logging to use RedisLogHandler
# NOTE: this is technically possible but not a great idea.
#
import logging
from smarter.lib.logging.redis_log_handler import RedisLogHandler, user_id_context, job_id_factory
# Set the current job ID (typically in a Celery task or similar context)
user_id_context.set(job_id_factory("task"))
# Configure the logger
logger = logging.getLogger("my_logger")
logger.setLevel(logging.INFO)
logger.addHandler(RedisLogHandler())
logger.info("This log will be published to Redis.")
- class smarter.lib.logging.redis_log_handler.RedisLogHandler(level=0)[source]
Bases:
HandlerCustom logging handler that publishes log records to Redis channels for real-time log streaming.
This handler supports both job-specific and global log channels:
- If a job ID is present in the
user_id_contextcontext variable, the log record is published to the Redis channel
logs:{user_id}, where{user_id}is the unique identifier for the job or task.
- If a job ID is present in the
- All log records are also published to the global channel defined by
GLOBAL_LOG_CHANNEL(logs:global), which can be used for system-wide log aggregation or UI log feeds.
- All log records are also published to the global channel defined by
The handler is designed to be used in distributed or asynchronous environments (e.g., Celery tasks), where each job or task can have its own log stream. Log records are enqueued and published asynchronously by a background worker thread for efficiency and non-blocking behavior.
If the internal log queue is full, log records may be dropped. The number of dropped logs is tracked by the class variable dropped_logs, and a message is printed every 100 dropped logs.
See also
job_id_factoryFunction to generate unique job IDs.
user_id_contextContext variable for the current job ID.
GLOBAL_LOG_CHANNELName of the global Redis log channel.
- __init__(level=0)
Initializes the instance - basically setting the formatter to None and the filter list to empty.
- _at_fork_reinit()
- acquire()
Acquire the I/O thread lock.
- addFilter(filter)
Add the specified filter to this handler.
- close()
Tidy up any resources used by the handler.
This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.
- createLock()
Acquire a thread lock for serializing access to the underlying I/O.
- dropped_logs = 0
- emit(record)[source]
Emit a log record.
This method formats the log record and publishes it to the appropriate Redis channels. If a job ID is present in the current context, the log record is published to the job-specific channel. All log records are also published to the global log channel.
- Parameters:
record (logging.LogRecord) – The log record to be emitted.
- filter(record)
Determine if a record is loggable by consulting all the filters.
The default is to allow the record to be logged; any filter can veto this by returning a false value. If a filter attached to a handler returns a log record instance, then that instance is used in place of the original log record in any further processing of the event by that handler. If a filter returns any other true value, the original log record is used in any further processing of the event by that handler.
If none of the filters return false values, this method returns a log record. If any of the filters return a false value, this method returns a false value.
Changed in version 3.2: Allow filters to be just callables.
Changed in version 3.12: Allow filters to return a LogRecord instead of modifying it in place.
- flush()
Ensure all logging output has been flushed.
This version does nothing and is intended to be implemented by subclasses.
- format(record)
Format the specified record.
If a formatter is set, use it. Otherwise, use the default formatter for the module.
- get_name()
- handle(record)
Conditionally emit the specified logging record.
Emission depends on filters which may have been added to the handler. Wrap the actual emission of the record with acquisition/release of the I/O thread lock.
Returns an instance of the log record that was emitted if it passed all filters, otherwise a false value is returned.
- handleError(record)
Handle errors which occur during an emit() call.
This method should be called from handlers when an exception is encountered during an emit() call. If raiseExceptions is false, exceptions get silently ignored. This is what is mostly wanted for a logging system - most users will not care about errors in the logging system, they are more interested in application errors. You could, however, replace this with a custom handler if you wish. The record which was being processed is passed in to this method.
- property name
- release()
Release the I/O thread lock.
- removeFilter(filter)
Remove the specified filter from this handler.
- setFormatter(fmt)
Set the formatter for this handler.
- setLevel(level)
Set the logging level of this handler. level must be an int or a str.
- set_name(name)
- smarter.lib.logging.redis_log_handler.get_user_context(user)[source]
Retrieves the current user context for logging.
This function accesses the
user_id_contextcontext variable to get the current user ID or job ID associated with the logging context. This is used by theRedisLogHandlerto determine which Redis channel to publish log records to, allowing for both job-specific and global log streams.- Returns:
The current user context (user ID or job ID) for logging.
- Return type:
- smarter.lib.logging.redis_log_handler.job_id_factory(prefix='job')[source]
Factory method to generate a unique job ID.
This method creates a unique identifier for jobs or tasks, using a specified prefix and a random UUID. The resulting ID is formatted as “{prefix}_{uuid}”. This is used primarily for managing subscriptions to Server-Sent Events (SSE) channels, for ensuring that each subscription has a unique identifier.