Source code for smarter.apps.vectorstore.tasks

"""Celery tasks for the vectorstore app."""

import logging
import os

from smarter.apps.vectorstore.models import VectorestoreMeta
from smarter.apps.vectorstore.service import VectorstoreService
from smarter.common.conf import smarter_settings
from smarter.common.helpers.console_helpers import formatted_text
from smarter.lib.django import waffle
from smarter.lib.django.waffle import SmarterWaffleSwitches
from smarter.lib.logging import WaffleSwitchedLoggerWrapper, user_id_context
from smarter.workers.celery import app


# pylint: disable=W0613
def should_log(level):
    """Tell the logger wrapper whether log output is currently enabled.

    :param level: Log level supplied by the caller; it is not consulted here.
    :return: The result of the TASK_LOGGING switch check when it is truthy,
        otherwise the result of the VECTORSTORE_LOGGING switch check.
    """
    task_logging_on = waffle.switch_is_active(SmarterWaffleSwitches.TASK_LOGGING)
    if task_logging_on:
        return task_logging_on
    # Only consult the app-specific switch when the task-wide one is off.
    return waffle.switch_is_active(SmarterWaffleSwitches.VECTORSTORE_LOGGING)
base_logger = logging.getLogger(__name__)
# All logging in this module goes through the wrapper, which is handed
# should_log as its predicate (presumably to gate output on the waffle
# switches -- confirm against WaffleSwitchedLoggerWrapper).
logger = WaffleSwitchedLoggerWrapper(base_logger, should_log)
logger_prefix = formatted_text(__name__)
HERE = os.path.abspath(os.path.dirname(__file__))


@app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=smarter_settings.llm_client_tasks_celery_retry_backoff,
    max_retries=smarter_settings.llm_client_tasks_celery_max_retries,
    queue=smarter_settings.llm_client_tasks_celery_task_queue,
)
def embed_and_load_pdf(self) -> bool:
    """Celery task to load pdf documents into a vectorstore.

    Fetches the first ``VectorestoreMeta`` row, builds a
    ``VectorstoreService`` for it and invokes its ``pdf_loader``.

    :param self: The bound task instance (``bind=True``); its request id is
        stored in ``user_id_context`` for the duration of the task.
    :return: ``True`` when the loader call completes; ``False`` when no
        ``VectorestoreMeta`` row exists or when any exception is raised
        (the error is logged, not propagated).
    """
    job_id = self.request.id
    # Keep the token so the previous context value can be restored in `finally`.
    token = user_id_context.set(job_id)
    try:
        db = VectorestoreMeta.objects.first()
        if not db:
            logger.error(f"{logger_prefix} No vector database found.")
            return False

        service = VectorstoreService(db=db)
        # NOTE(review): pdf_loader is called with an empty string -- confirm
        # whether a real path/argument is expected here.
        service.pdf_loader("")
        return True
    # NOTE(review): every Exception is caught and turned into a False return,
    # so nothing propagates out of the task body; the autoretry_for /
    # retry_backoff / max_retries settings above therefore appear never to
    # take effect. Confirm whether failures should be re-raised instead.
    # pylint: disable=broad-except
    except Exception as e:
        logger.error(f"{logger_prefix} Error occurred: {e}")
        return False
    finally:
        user_id_context.reset(token)