Source code for smarter.apps.chatbot.management.commands.load_from_github

"""
This module is used to deploy a collection of customer API's from a GitHub repository containing plugin YAML files
organized in directories by customer API name.
"""

import logging
import os
import re
import subprocess
import sys
from typing import Optional

from smarter.apps.account.models import Account, User, UserProfile
from smarter.apps.account.utils import (
    get_cached_admin_user_for_account,
)
from smarter.apps.api.utils import apply_manifest
from smarter.apps.chatbot.models import ChatBot, ChatBotPlugin
from smarter.apps.plugin.manifest.controller import SAM_MAP, PluginController
from smarter.common.api import SmarterApiVersions
from smarter.common.conf import smarter_settings
from smarter.common.exceptions import SmarterValueError
from smarter.common.helpers.console_helpers import formatted_text
from smarter.lib.django.management.base import SmarterCommand
from smarter.lib.django.validators import SmarterValidator
from smarter.lib.manifest.loader import SAMLoader

logger = logging.getLogger(__name__)
logger_prefix = formatted_text(f"{__name__}")


# pylint: disable=E1101,too-many-instance-attributes
[docs] class Command(SmarterCommand): """ Deploy customer APIs and plugins from a GitHub repository containing YAML manifest files. This management command automates the deployment of chatbots and plugins for Smarter accounts by processing a public GitHub repository. The repository should contain YAML manifest files organized either by directories representing customer APIs (subdomains) or by separate 'plugins' and 'chatbots' folders, depending on the selected repo version. The command supports two repository layouts: - **Version 1:** Each folder in the repository represents a customer API (subdomain), and contains YAML plugin manifests. The command creates a chatbot for each folder and attaches plugins found within. - **Version 2:** The repository contains 'plugins' and 'chatbots' directories. All plugin manifests are processed first (to satisfy dependencies), followed by chatbot manifests. The deployment process includes: - Cloning the specified GitHub repository to a local directory. - Iterating through manifest files and applying them to the Smarter platform. - Creating chatbots and associating plugins as defined by the manifests. - Deploying chatbots asynchronously using Celery tasks. - Outputting progress, error, and completion messages to the console. Administrators can specify the target account by account number and optionally a username. The command ensures that all required account, user, and profile objects are available before processing manifests. It validates URLs, manages local repository cleanup, and handles errors encountered during manifest application or plugin loading. This command is useful for bulk provisioning, migration, or onboarding scenarios where multiple chatbots and plugins need to be deployed from a structured repository. It streamlines the process of setting up complex environments and ensures consistent deployment across accounts. """ _url: Optional[str] = None user: User account: Account user_profile: UserProfile
[docs] def __init__(self, stdout=None, stderr=None, no_color=False, force_color=False): super().__init__(stdout, stderr, no_color, force_color) self._url = None
@property def url(self) -> str: if not self._url: raise SmarterValueError("URL is required.") return self._url @url.setter def url(self, value): SmarterValidator.validate_url(value) self._url = value @property def local_path(self): return os.path.join(smarter_settings.data_directory, self.get_url_filename(self.url))
[docs] def get_url_filename(self, url) -> str: """ Get the filename from a URL. example: https://github.com/smarter-sh/smarter_demo/blob/main/hr/shrm_fmla.yaml returns "shrm_fmla.yaml" """ return url.split("/")[-1]
[docs] def clone_repo(self): """Synchronously clone a GitHub repository to the local file system.""" self.delete_repo() result = subprocess.call(["git", "clone", self.url, self.local_path]) if result != 0: raise subprocess.CalledProcessError( returncode=result, cmd=f"git clone {self.url} {self.local_path}", output="Failed to clone repository" ) else: logger.debug("%s: Cloned %s to %s", logger_prefix, self.url, self.local_path)
[docs] def delete_repo(self): """Delete a cloned GitHub repository from the local file system.""" if os.path.exists(self.local_path): result = subprocess.call(["rm", "-rf", self.local_path]) if result != 0: raise subprocess.CalledProcessError( returncode=result, cmd=f"rm -rf {self.local_path}", output="Failed to delete repository" ) else: logger.debug("%s: Deleted %s", logger_prefix, self.local_path)
[docs] def load_plugin(self, filespec: str): """Load a plugin from a file on the local file system.""" if not self.user_profile: raise SmarterValueError("User profile is required.") loader = SAMLoader( api_version=SmarterApiVersions.V1, file_path=filespec, ) if not loader.ready: logger.error("%s: manage.py create_plugin. SAMLoader is not ready.", logger_prefix) sys.exit(1) plugin_class = SAM_MAP[loader.manifest_kind] manifest = plugin_class(**loader.pydantic_model_dump()) logger.debug( "%s: Creating %s %s for account %s...", logger_prefix, plugin_class.__name__, manifest.metadata.name, self.user_profile, ) controller = PluginController(account=self.account, user=self.user, user_profile=self.user_profile, manifest=manifest) # type: ignore plugin = controller.obj return plugin
[docs] def process_repo_v2(self): """ Process a GitHub repository containing yaml manifest files. Folders are optional and can be used to organize the manifest files, but otherwise do not contain any special meaning. """ def process_directory(directory) -> None: directory_path = os.path.join(root, directory) for _, _, files in os.walk(directory_path): for file in files: if file.endswith(".yaml") or file.endswith(".yml"): filespec = os.path.join(directory_path, file) apply_manifest(filespec=filespec, username=self.user.username, verbose=True) if not self.user_profile: raise SmarterValueError("User profile is required.") self.clone_repo() # pylint: disable=too-many-nested-blocks for root, directory_names, _ in os.walk(self.local_path): if "plugins" in directory_names: # we need to process plugins first as these can be dependencies for chatbots logger.debug("%s: Processing plugins for account %s...", logger_prefix, self.account.account_number) process_directory(directory="plugins") if "chatbots" in directory_names: logger.debug("%s: Processing chatbots for account %s...", logger_prefix, self.account.account_number) process_directory(directory="chatbots")
[docs] def process_repo_v1(self): """ Process a GitHub repository containing yaml plugin files organized into folders, where each folder name is the subdomain for a customer API. """ def is_demo_folder(directory) -> bool: """returns true if the folder contains yaml or yml files""" VALID_HOST_PATTERN = r"(?!-)[A-Z\d-]{1,63}(?<!-)$" folder_name = os.path.basename(directory_path) if not re.fullmatch(VALID_HOST_PATTERN, folder_name, re.IGNORECASE): logger.debug("%s: Skipping folder: %s", logger_prefix, folder_name) return False for _, _, files in os.walk(directory): for file in files: if isinstance(file, str) and file.endswith(".yaml") or file.endswith(".yml"): return True return False if not self.user_profile: raise SmarterValueError("User profile is required.") self.clone_repo() # pylint: disable=too-many-nested-blocks for root, directory_names, _ in os.walk(self.local_path): for directory in [d for d in directory_names if not d.startswith(".")]: # yaml plugins are separated by directories # representing different kinds of demo plugins # (e.g. "hr", "sales-support", "government", "university-admissions", etc.) # and each directory contains a collection of yaml files. # # We're not currently doing anything with the directory names, # but we could use them to create a customer api of the same name. directory_path = os.path.join(root, directory) api_name = os.path.basename(directory_path) if is_demo_folder(directory=directory_path): logger.debug("%s: Processing API: %s", logger_prefix, api_name) chatbot, _ = ChatBot.objects.get_or_create(name=api_name, user_profile=self.user_profile) for _, _, files in os.walk(directory_path): for file in files: if file.endswith(".yaml") or file.endswith(".yml"): try: filespec = os.path.join(directory_path, file) filename = os.path.basename(filespec) logger.debug("%s: Loading plugin: %s", logger_prefix, filespec) plugin = self.load_plugin(filespec=filespec) if not plugin: logger.error( "%s: Error loading plugin: %s for user_profile %s", logger_prefix, filename, self.user_profile, ) continue ChatBotPlugin.objects.get_or_create(chatbot=chatbot, plugin_meta=plugin.plugin_meta) except Exception as e: logger.error( "%s: Error loading plugin: %s for user_profile %s: %s", logger_prefix, filename, self.user_profile, e, ) raise e chatbot.deployed = True chatbot.save(asynchronous=True)
[docs] def add_arguments(self, parser): """Add arguments to the command.""" parser.add_argument("-u", "--url", type=str, help="A url for a public GitHub repository.") parser.add_argument( "-a", "--account_number", type=str, nargs="?", default=None, help="Account number that will own the new plugin.", ) parser.add_argument("--username", type=str, nargs="?", default=None, help="A user associated with the account.") parser.add_argument( "--repo_version", type=str, nargs="?", default="1", help="The version of the Github repo reader." )
[docs] def handle(self, *args, **options): self.handle_begin() self.url = options["url"] account_number = options["account_number"] username = options["username"] repo_version = int(options["repo_version"]) logger.debug("%s: Deploying plugins from %s for account %s.", logger_prefix, self.url, account_number) if not account_number and not username: self.handle_completed_failure(msg="username and/or account_number is required.") raise SmarterValueError("username and/or account_number is required.") if account_number: self.account = Account.get_cached_object(invalidate=False, account_number=account_number) # type: ignore[assignment] if username: self.user = User.objects.get(username=username) else: self.user = get_cached_admin_user_for_account(account=self.account) if self.user is not None: self.user_profile = UserProfile.objects.get(user=self.user, account=self.account) try: if repo_version == 2: # iterate repo and apply manifests self.process_repo_v2() else: # iterate repo, assume that folders refer to chatbots, and load plugins self.process_repo_v1() # pylint: disable=broad-except except Exception as e: self.handle_completed_failure(e) return self.handle_completed_success()