Module libnova.common.nuclio.Request

Expand source code
#!/usr/bin/env python
# coding: utf-8

import json
from libnova                      import common
from libnova.common               import api
from libnova.common.api           import Driver, Storage, Container, File, Job, User, Trigger, Function, JobMessage
from libnova.common.filesystem    import S3
from libnova.common.filesystem.S3 import Storage as S3Storage

# Module-level slot meant to reference the most recently constructed Request
# (Request.__init__ carries a "Set self as instance" assignment for it).
__instance = None


class Request:
    """Nuclio Request Helper.

    It has some built-in functions to help during the flow of a function.
    Some examples are:

     * Start the job related to the function

     * Add log messages

     * Automatically retrieve the related files using the API

    Args:
        context (object): The Nuclio context object.
        event (object): The Nuclio event object; its `body` is parsed as the JSON request.
        silent (bool): If silent is True, it will skip the logging of the Nuclio request to either console or log system.

    Attributes:
        JSONData (dict): Function request body as a dict.
        Storage (libnova.common.api.Storage): Storage related to the current function execution
        Container (libnova.common.api.Container): Container related to the current function execution
        User (libnova.common.api.User): User that has triggered the current function
        Function (libnova.common.api.Function): The function that is being ran
        Trigger (libnova.common.api.Trigger): The trigger that made this function be called
        Job (libnova.common.api.Job): The job related to the current function execution
        Files (list[libnova.common.api.File]): List of files related to the current function execution
        Parameters (dict): Extra input parameters filled by the user when calling the current function
        Context (object): Nuclio context object
        Event (object): Nuclio event object
        FunctionNamespace (str): Key of the request section that holds the function data
    """

    # Class-level defaults. `__cleanup` rebinds the per-request attributes on
    # every instance before they are used, so the mutable `{}` / `[]` defaults
    # below are not shared between normally constructed instances.
    JSONData   = None

    Storage    = None
    Container  = None
    User       = None
    Function   = None
    Trigger    = None
    Job        = None
    Parameters = {}

    Files      = []

    # Nuclio data
    Context = None
    Event   = None

    # Function parameters
    FunctionNamespace = "function_data"

    def __str__(self):
        """Return the class representation followed by the instance attribute dict."""
        return str(self.__class__) + ": " + str(self.__dict__)

    def __init__(self, context, event, silent=True):
        """Store the Nuclio objects, reset the state and load the request data.

        Args:
            context (object): The Nuclio context object.
            event (object): The Nuclio event object (its `body` is processed).
            silent (bool): When False, the request body is logged at DEBUG level.
        """
        # Set self as instance. A plain `__instance = self` inside a method only
        # binds a (name-mangled) local variable, so the module-level slot has to
        # be reached through `globals()`.
        globals()["__instance"] = self

        self.Context = context
        self.Event   = event

        self.__cleanup()

        self.__process(event.body)

        if not silent:
            body = event.body
            # The body may arrive as bytes or as text; only bytes need decoding.
            if isinstance(body, (bytes, bytearray)):
                body = body.decode("utf-8", errors="replace")
            self.log(" - Processing event - " + "\n" + str(body), JobMessage.JobMessageType.DEBUG)

    def __cleanup(self):
        """Restores the variables to initial values to ensure no remaining data is kept between executions
        """
        self.JSONData   = None
        self.Storage    : Storage    = None
        self.Container  : Container  = None
        self.User       : User       = None
        self.Function   : Function   = None
        self.Trigger    : Trigger    = None
        self.Job        : Job        = None
        self.Files      : list[File] = []
        self.Parameters : dict       = {}

    def __process(self, request_body):
        """Retrieves all the information to the main class attributes

        The request is parsed as JSON, the API driver is switched to `root`
        credentials while the related entities are loaded, and it is always
        switched back to the `user` credentials afterwards. Any error is printed
        (message and traceback) and swallowed, leaving the attributes that could
        not be loaded at their initial values.

        Args:
            request_body (str | bytes): Raw JSON request body.
        """

        try:
            self.JSONData = json.loads(request_body)
            # Get main data as "sudo" to avoid permission issues
            self.sudo()
            try:
                self.__load_entities()
            finally:
                # Always switch back to the user API credentials, even when
                # loading fails, so root credentials never stay active.
                self.unsudo()
        except Exception as e:
            # Best effort: the input may not be JSON or may miss sections.
            print(e)
            import traceback
            traceback.print_exc()

    def __load_entities(self):
        """Load parameters, job, container, user, trigger, function, storage and files from `JSONData`.

        Raises:
            ValueError: If the referenced job is already FAILED or COMPLETED.
            KeyError: If a required section of the request is missing.
        """
        self.Parameters.update(self.JSONData["function_params"])

        function_data = self.JSONData[self.FunctionNamespace]

        if "job" in function_data:
            self.Job = Job.get(function_data["job"]["id"])
        if self.Job is not None:
            if self.Job.status in (Job.JobStatus.FAILED.name, Job.JobStatus.COMPLETED.name):
                self.Job = None
                raise ValueError('The job has already been completed, skipping...')

        if "container" in function_data:
            # Single lookup; only replace the attribute when a container was found.
            container = Container.get(function_data["container"]["id"])
            if container is not None:
                self.Container = container

        if "user" in function_data:
            self.User = User.get(function_data["user"]["id"])

        if "trigger" in function_data:
            self.Trigger = Trigger.get(function_data["trigger"]["id"])

        if "function" in function_data:
            self.Function = Function.get(function_data["function"]["id"])

        if self.Container is not None:
            self.Storage = Storage.get(self.Container.storage_id)

        if self.Storage is not None:
            # Platform S3 Storage Driver
            S3Storage.initialize_storage(self.Storage)

        # The "files" section and its "ids" / "paths" lists are optional.
        files_section = function_data.get("files") or {}

        for file_id in files_section.get("ids") or []:
            file = File.get(file_id)
            if file is not None:
                self.Files.append(file)

        # Paths are only a fallback when no file could be resolved by id.
        if not self.Files and self.Container is not None:
            for file_path in files_section.get("paths") or []:
                file = File.get_by_path(self.Container.id, file_path)
                # Skip unresolved paths, as done for unresolved ids above.
                if file is not None:
                    self.Files.append(file)

    def parameter(self, parameter_name):
        """Get the value of a user input parameter.

        Args:
            parameter_name (str): The user parameter name whose value we want

        Returns:
            object: The value of the user parameter if exists, None otherwise
        """

        if parameter_name in self.Parameters:
            return self.Parameters[parameter_name]
        return None

    def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
        """This main logging function will log messages by the following criteria:

         * If this is being executed inside Nuclio, it will add a message to the Nuclio function log

         * Else, it will print the message by console

         * If the current Job is not None, it will add the message to the Job Message table

        Args:
            message (str): The message to log
            message_type (JobMessage.JobMessageType): The log level of the message
            file_id (int): It will be used to relate the Job Message entry with a specific file

        Returns:
            object: The result of `JobMessage.add` when there is a current Job, None otherwise
        """

        # The request may not have been parsed (or may lack the function key);
        # logging must still work in that case.
        try:
            function_key = str(self.JSONData[self.FunctionNamespace]["function"]["key"])
        except (KeyError, TypeError):
            function_key = "unknown"

        if self.Context is not None:
            nuclio_message = "[" + function_key + "] " + message
            logger = self.Context.logger
            if message_type == JobMessage.JobMessageType.ERROR:
                logger.error(nuclio_message)
            elif message_type == JobMessage.JobMessageType.WARNING:
                logger.warn(nuclio_message)
            elif message_type == JobMessage.JobMessageType.INFO:
                logger.info(nuclio_message)
            elif message_type == JobMessage.JobMessageType.DEBUG:
                # NOTE(review): `_debug` is kept as originally written; confirm
                # whether the public `debug` method of the logger was intended.
                logger._debug(nuclio_message)
        else:
            console_message = "[" + message_type.name + "] [" + function_key + "] " + message
            print(console_message)

        if self.Job is not None:
            return JobMessage.add(self.Job.id, message, message_type, file_id)
        return None

    def job_init(self, function_id = None, container_id = None):
        """Sets the current job (if any) to status RUNNING.

        If the job does not exist, one job will be created

        Args:
            function_id (int): Function id for the new job; defaults to the current Function id
            container_id (int): Container id for the new job; defaults to the current Container id (if any)

        Returns:
            object: The result of `Job.set_status`, or None when there is no job

        Raises:
            ValueError: If the status change reports `success` as False
        """

        if self.Job is None:
            if self.Function is not None:
                self.Job = Job.create(
                    function_id  if function_id  is not None else self.Function.id,
                    container_id if container_id is not None else (
                        self.Container.id if self.Container is not None else None
                    )
                )
                if self.Job is not None:
                    # Likely to be ran using a command line, print the new job
                    print(f"Created job #{str(self.Job.id)}")
        if self.Job is not None:
            job_init_result = Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
            if 'success' in job_init_result and job_init_result['success'] is False:
                raise ValueError(f'The job #{str(self.Job.id)} cannot be initialized, skipping...')
            return job_init_result

    def job_end(self, success: bool = True):
        """Sets the current job to either COMPLETED or FAILED depending on the `success` value.

        Args:
            success (bool): True if the job has completed successfully, False otherwise
        """

        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)

    def job_asset_add(self, file : File):
        """Adds a `File` to the current `Job`.

        Args:
            file (File): The `File` object to relate to the current `Job`
        """

        if self.Job is not None:
            Job.add_asset(self.Job.id, file)

    def sudo(self):
        """Initializes the Api Driver using `root` credentials.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_root"]),
            True
        )

    def unsudo(self):
        """Initializes the Api Driver using `user` credentials.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_user"]),
            True
        )

    def add_dependencies(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Args:
            packages (list[str]): Names of the packages to install (and import)
            check_and_import (bool): If True, each package is imported and only
                installed when the import fails; if False, every package is installed

        Raises:
            ImportError: If a package still cannot be imported after installing it
        """
        import importlib
        import subprocess
        import sys

        def pip_install(package):
            # `pip.main` is no longer available in pip >= 10; running pip as a
            # subprocess of the current interpreter is the supported way.
            return subprocess.call([sys.executable, "-m", "pip", "install", package])

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    importlib.import_module(package)
                except ImportError:
                    pip_install(package)
                    # Make the freshly installed package visible to the import system
                    importlib.invalidate_caches()
                    importlib.import_module(package)
            else:
                # Just install the package to ensure it actually is installed
                pip_install(package)

    def add_dependencies_old(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Legacy variant of `add_dependencies` that imports through `__import__`.

        Args:
            packages (list[str]): Names of the packages to install (and import)
            check_and_import (bool): If True, each package is imported and only
                installed when the import fails; if False, every package is installed

        Raises:
            ImportError: If a package still cannot be imported after installing it
        """
        import importlib
        import subprocess
        import sys

        def pip_install(package):
            # `pip.main` is no longer available in pip >= 10; running pip as a
            # subprocess of the current interpreter is the supported way.
            return subprocess.call([sys.executable, "-m", "pip", "install", package])

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    __import__(package)
                except ImportError:
                    pip_install(package)
                    # Make the freshly installed package visible to the import system
                    importlib.invalidate_caches()
                    __import__(package)
            else:
                # Just install the package to ensure it actually is installed
                pip_install(package)


# Script guard: when this module is run directly it only prints a notice.
if __name__ == "__main__":
    print('This file cannot be executed directly!')

Classes

class Request (context, event, silent=True)
Expand source code
class Request:
    """Nuclio Request Helper.

    It has some built-in functions to help during the flow of a function.
    Some examples are:

     * Start the job related to the function

     * Add log messages

     * Automatically retrieve the related files using the API

    Args:
        context (object): The Nuclio context object.
        event (object): The Nuclio event object; its `body` is parsed as the JSON request.
        silent (bool): If silent is True, it will skip the logging of the Nuclio request to either console or log system.

    Attributes:
        JSONData (dict): Function request body as a dict.
        Storage (libnova.common.api.Storage): Storage related to the current function execution
        Container (libnova.common.api.Container): Container related to the current function execution
        User (libnova.common.api.User): User that has triggered the current function
        Function (libnova.common.api.Function): The function that is being ran
        Trigger (libnova.common.api.Trigger): The trigger that made this function be called
        Job (libnova.common.api.Job): The job related to the current function execution
        Files (list[libnova.common.api.File]): List of files related to the current function execution
        Parameters (dict): Extra input parameters filled by the user when calling the current function
        Context (object): Nuclio context object
        Event (object): Nuclio event object
        FunctionNamespace (str): Key of the request section that holds the function data
    """

    # Class-level defaults. `__cleanup` rebinds the per-request attributes on
    # every instance before they are used, so the mutable `{}` / `[]` defaults
    # below are not shared between normally constructed instances.
    JSONData   = None

    Storage    = None
    Container  = None
    User       = None
    Function   = None
    Trigger    = None
    Job        = None
    Parameters = {}

    Files      = []

    # Nuclio data
    Context = None
    Event   = None

    # Function parameters
    FunctionNamespace = "function_data"

    def __str__(self):
        """Return the class representation followed by the instance attribute dict."""
        return str(self.__class__) + ": " + str(self.__dict__)

    def __init__(self, context, event, silent=True):
        """Store the Nuclio objects, reset the state and load the request data.

        Args:
            context (object): The Nuclio context object.
            event (object): The Nuclio event object (its `body` is processed).
            silent (bool): When False, the request body is logged at DEBUG level.
        """
        # Set self as instance. A plain `__instance = self` inside a method only
        # binds a (name-mangled) local variable, so the module-level slot has to
        # be reached through `globals()`.
        globals()["__instance"] = self

        self.Context = context
        self.Event   = event

        self.__cleanup()

        self.__process(event.body)

        if not silent:
            body = event.body
            # The body may arrive as bytes or as text; only bytes need decoding.
            if isinstance(body, (bytes, bytearray)):
                body = body.decode("utf-8", errors="replace")
            self.log(" - Processing event - " + "\n" + str(body), JobMessage.JobMessageType.DEBUG)

    def __cleanup(self):
        """Restores the variables to initial values to ensure no remaining data is kept between executions
        """
        self.JSONData   = None
        self.Storage    : Storage    = None
        self.Container  : Container  = None
        self.User       : User       = None
        self.Function   : Function   = None
        self.Trigger    : Trigger    = None
        self.Job        : Job        = None
        self.Files      : list[File] = []
        self.Parameters : dict       = {}

    def __process(self, request_body):
        """Retrieves all the information to the main class attributes

        The request is parsed as JSON, the API driver is switched to `root`
        credentials while the related entities are loaded, and it is always
        switched back to the `user` credentials afterwards. Any error is printed
        (message and traceback) and swallowed, leaving the attributes that could
        not be loaded at their initial values.

        Args:
            request_body (str | bytes): Raw JSON request body.
        """

        try:
            self.JSONData = json.loads(request_body)
            # Get main data as "sudo" to avoid permission issues
            self.sudo()
            try:
                self.__load_entities()
            finally:
                # Always switch back to the user API credentials, even when
                # loading fails, so root credentials never stay active.
                self.unsudo()
        except Exception as e:
            # Best effort: the input may not be JSON or may miss sections.
            print(e)
            import traceback
            traceback.print_exc()

    def __load_entities(self):
        """Load parameters, job, container, user, trigger, function, storage and files from `JSONData`.

        Raises:
            ValueError: If the referenced job is already FAILED or COMPLETED.
            KeyError: If a required section of the request is missing.
        """
        self.Parameters.update(self.JSONData["function_params"])

        function_data = self.JSONData[self.FunctionNamespace]

        if "job" in function_data:
            self.Job = Job.get(function_data["job"]["id"])
        if self.Job is not None:
            if self.Job.status in (Job.JobStatus.FAILED.name, Job.JobStatus.COMPLETED.name):
                self.Job = None
                raise ValueError('The job has already been completed, skipping...')

        if "container" in function_data:
            # Single lookup; only replace the attribute when a container was found.
            container = Container.get(function_data["container"]["id"])
            if container is not None:
                self.Container = container

        if "user" in function_data:
            self.User = User.get(function_data["user"]["id"])

        if "trigger" in function_data:
            self.Trigger = Trigger.get(function_data["trigger"]["id"])

        if "function" in function_data:
            self.Function = Function.get(function_data["function"]["id"])

        if self.Container is not None:
            self.Storage = Storage.get(self.Container.storage_id)

        if self.Storage is not None:
            # Platform S3 Storage Driver
            S3Storage.initialize_storage(self.Storage)

        # The "files" section and its "ids" / "paths" lists are optional.
        files_section = function_data.get("files") or {}

        for file_id in files_section.get("ids") or []:
            file = File.get(file_id)
            if file is not None:
                self.Files.append(file)

        # Paths are only a fallback when no file could be resolved by id.
        if not self.Files and self.Container is not None:
            for file_path in files_section.get("paths") or []:
                file = File.get_by_path(self.Container.id, file_path)
                # Skip unresolved paths, as done for unresolved ids above.
                if file is not None:
                    self.Files.append(file)

    def parameter(self, parameter_name):
        """Get the value of a user input parameter.

        Args:
            parameter_name (str): The user parameter name whose value we want

        Returns:
            object: The value of the user parameter if exists, None otherwise
        """

        if parameter_name in self.Parameters:
            return self.Parameters[parameter_name]
        return None

    def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
        """This main logging function will log messages by the following criteria:

         * If this is being executed inside Nuclio, it will add a message to the Nuclio function log

         * Else, it will print the message by console

         * If the current Job is not None, it will add the message to the Job Message table

        Args:
            message (str): The message to log
            message_type (JobMessage.JobMessageType): The log level of the message
            file_id (int): It will be used to relate the Job Message entry with a specific file

        Returns:
            object: The result of `JobMessage.add` when there is a current Job, None otherwise
        """

        # The request may not have been parsed (or may lack the function key);
        # logging must still work in that case.
        try:
            function_key = str(self.JSONData[self.FunctionNamespace]["function"]["key"])
        except (KeyError, TypeError):
            function_key = "unknown"

        if self.Context is not None:
            nuclio_message = "[" + function_key + "] " + message
            logger = self.Context.logger
            if message_type == JobMessage.JobMessageType.ERROR:
                logger.error(nuclio_message)
            elif message_type == JobMessage.JobMessageType.WARNING:
                logger.warn(nuclio_message)
            elif message_type == JobMessage.JobMessageType.INFO:
                logger.info(nuclio_message)
            elif message_type == JobMessage.JobMessageType.DEBUG:
                # NOTE(review): `_debug` is kept as originally written; confirm
                # whether the public `debug` method of the logger was intended.
                logger._debug(nuclio_message)
        else:
            console_message = "[" + message_type.name + "] [" + function_key + "] " + message
            print(console_message)

        if self.Job is not None:
            return JobMessage.add(self.Job.id, message, message_type, file_id)
        return None

    def job_init(self, function_id = None, container_id = None):
        """Sets the current job (if any) to status RUNNING.

        If the job does not exist, one job will be created

        Args:
            function_id (int): Function id for the new job; defaults to the current Function id
            container_id (int): Container id for the new job; defaults to the current Container id (if any)

        Returns:
            object: The result of `Job.set_status`, or None when there is no job

        Raises:
            ValueError: If the status change reports `success` as False
        """

        if self.Job is None:
            if self.Function is not None:
                self.Job = Job.create(
                    function_id  if function_id  is not None else self.Function.id,
                    container_id if container_id is not None else (
                        self.Container.id if self.Container is not None else None
                    )
                )
                if self.Job is not None:
                    # Likely to be ran using a command line, print the new job
                    print(f"Created job #{str(self.Job.id)}")
        if self.Job is not None:
            job_init_result = Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
            if 'success' in job_init_result and job_init_result['success'] is False:
                raise ValueError(f'The job #{str(self.Job.id)} cannot be initialized, skipping...')
            return job_init_result

    def job_end(self, success: bool = True):
        """Sets the current job to either COMPLETED or FAILED depending on the `success` value.

        Args:
            success (bool): True if the job has completed successfully, False otherwise
        """

        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)

    def job_asset_add(self, file : File):
        """Adds a `File` to the current `Job`.

        Args:
            file (File): The `File` object to relate to the current `Job`
        """

        if self.Job is not None:
            Job.add_asset(self.Job.id, file)

    def sudo(self):
        """Initializes the Api Driver using `root` credentials.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_root"]),
            True
        )

    def unsudo(self):
        """Initializes the Api Driver using `user` credentials.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_user"]),
            True
        )

    def add_dependencies(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Args:
            packages (list[str]): Names of the packages to install (and import)
            check_and_import (bool): If True, each package is imported and only
                installed when the import fails; if False, every package is installed

        Raises:
            ImportError: If a package still cannot be imported after installing it
        """
        import importlib
        import subprocess
        import sys

        def pip_install(package):
            # `pip.main` is no longer available in pip >= 10; running pip as a
            # subprocess of the current interpreter is the supported way.
            return subprocess.call([sys.executable, "-m", "pip", "install", package])

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    importlib.import_module(package)
                except ImportError:
                    pip_install(package)
                    # Make the freshly installed package visible to the import system
                    importlib.invalidate_caches()
                    importlib.import_module(package)
            else:
                # Just install the package to ensure it actually is installed
                pip_install(package)

    def add_dependencies_old(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Legacy variant of `add_dependencies` that imports through `__import__`.

        Args:
            packages (list[str]): Names of the packages to install (and import)
            check_and_import (bool): If True, each package is imported and only
                installed when the import fails; if False, every package is installed

        Raises:
            ImportError: If a package still cannot be imported after installing it
        """
        import importlib
        import subprocess
        import sys

        def pip_install(package):
            # `pip.main` is no longer available in pip >= 10; running pip as a
            # subprocess of the current interpreter is the supported way.
            return subprocess.call([sys.executable, "-m", "pip", "install", package])

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    __import__(package)
                except ImportError:
                    pip_install(package)
                    # Make the freshly installed package visible to the import system
                    importlib.invalidate_caches()
                    __import__(package)
            else:
                # Just install the package to ensure it actually is installed
                pip_install(package)

Class variables

var Container
var Context
var Event
var Files
var Function
var FunctionNamespace
var JSONData
var Job
var Parameters
var Storage
var Trigger
var User

Methods

def add_dependencies(self, packages, check_and_import=False)

Ensure that the given dependencies are correctly installed and imported

Expand source code
def add_dependencies(self, packages, check_and_import = False):
    """Ensure that the given dependencies are correctly installed and imported

    Args:
        packages (list[str]): Names of the packages to install (and import)
        check_and_import (bool): If True, each package is imported and only
            installed when the import fails; if False, every package is installed

    Raises:
        ImportError: If a package still cannot be imported after installing it
    """
    import importlib
    import subprocess
    import sys

    def pip_install(package):
        # `pip.main` is no longer available in pip >= 10; running pip as a
        # subprocess of the current interpreter is the supported way.
        return subprocess.call([sys.executable, "-m", "pip", "install", package])

    for package in packages:
        if check_and_import:
            # Import the package to check if it is installed and if not install it
            try:
                importlib.import_module(package)
            except ImportError:
                pip_install(package)
                # Make the freshly installed package visible to the import system
                importlib.invalidate_caches()
                importlib.import_module(package)
        else:
            # Just install the package to ensure it actually is installed
            pip_install(package)
def add_dependencies_old(self, packages, check_and_import=False)

Ensure that the given dependencies are correctly installed and imported

Expand source code
def add_dependencies_old(self, packages, check_and_import = False):
    """Ensure that the given dependencies are correctly installed and imported

    Legacy variant that imports through `__import__`.

    Args:
        packages (list[str]): Names of the packages to install (and import)
        check_and_import (bool): If True, each package is imported and only
            installed when the import fails; if False, every package is installed

    Raises:
        ImportError: If a package still cannot be imported after installing it
    """
    import importlib
    import subprocess
    import sys

    def pip_install(package):
        # `pip.main` is no longer available in pip >= 10; running pip as a
        # subprocess of the current interpreter is the supported way.
        return subprocess.call([sys.executable, "-m", "pip", "install", package])

    for package in packages:
        if check_and_import:
            # Import the package to check if it is installed and if not install it
            try:
                __import__(package)
            except ImportError:
                pip_install(package)
                # Make the freshly installed package visible to the import system
                importlib.invalidate_caches()
                __import__(package)
        else:
            # Just install the package to ensure it actually is installed
            pip_install(package)
def job_asset_add(self, file: libnova.common.api.File)

Adds a File to the current Job.

Args

file : File
The File object to relate to the current Job
Expand source code
def job_asset_add(self, file : File):
    """Relate a `File` to the current `Job`; does nothing when there is no job.

    Args:
        file (File): The `File` object to attach to the current `Job`
    """
    current_job = self.Job
    if current_job is None:
        # No job for this execution, so there is nothing to attach the file to.
        return
    Job.add_asset(current_job.id, file)
def job_end(self, success: bool = True)

Sets the current job to either COMPLETED or FAILED depending on the success value.

Args

success : bool
True if the job has completed successfully, False otherwise
Expand source code
def job_end(self, success: bool = True):
    """Mark the current job as COMPLETED or FAILED according to `success`.

    Nothing happens when there is no current job.

    Args:
        success (bool): True if the job has completed successfully, False otherwise
    """
    if self.Job is None:
        return

    job_id = self.Job.id
    if success:
        final_status = Job.JobStatus.COMPLETED
    else:
        final_status = Job.JobStatus.FAILED
    Job.set_status(job_id, final_status)
def job_init(self, function_id=None, container_id=None)

Sets the current job (if any) to status RUNNING.

If the job does not exist, one job will be created

Expand source code
def job_init(self, function_id = None, container_id = None):
    """Switch the current job to RUNNING, creating the job first when needed.

    A job is only created when there is none yet and the current Function is
    known. Missing ids fall back to the current Function / Container.

    Args:
        function_id (int): Function id for a newly created job
        container_id (int): Container id for a newly created job

    Returns:
        object: The result of `Job.set_status`, or None when there is no job

    Raises:
        ValueError: If the status change reports `success` as False
    """
    if self.Job is None and self.Function is not None:
        if function_id is None:
            function_id = self.Function.id
        if container_id is None and self.Container is not None:
            container_id = self.Container.id

        self.Job = Job.create(function_id, container_id)
        if self.Job is not None:
            # Likely to be ran using a command line, print the new job
            print(f"Created job #{str(self.Job.id)}")

    if self.Job is None:
        return None

    outcome = Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
    rejected = 'success' in outcome and outcome['success'] is False
    if rejected:
        raise ValueError(f'The job #{str(self.Job.id)} cannot be initialized, skipping...')
    return outcome
def log(self, message, message_type=JobMessageType.INFO, file_id=None)

This main logging function will log messages by the following criteria:

  • If this is being executed inside Nuclio, it will add a message to the Nuclio function log

  • Else, it will print the message by console

  • If the current Job is not None, it will add the message to the Job Message table

Args

message : str
The message to log
message_type : JobMessage.JobMessageType
The log level of the message
file_id : int
It will be used to relate the Job Message entry with a specific file
Expand source code
def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
    """This main logging function will log messages by the following criteria:

     * If this is being executed inside Nuclio, it will add a message to the Nuclio function log

     * Else, it will print the message by console

     * If the current Job is not None, it will add the message to the Job Message table

    Args:
        message (str): The message to log
        message_type (JobMessage.JobMessageType): The log level of the message
        file_id (int): It will be used to relate the Job Message entry with a specific file

    Returns:
        object: The result of `JobMessage.add` when there is a current Job, None otherwise
    """

    # The request may not have been parsed (or may lack the function key);
    # logging must still work in that case.
    try:
        function_key = str(self.JSONData[self.FunctionNamespace]["function"]["key"])
    except (KeyError, TypeError):
        function_key = "unknown"

    if self.Context is not None:
        nuclio_message = "[" + function_key + "] " + message
        logger = self.Context.logger
        if message_type == JobMessage.JobMessageType.ERROR:
            logger.error(nuclio_message)
        elif message_type == JobMessage.JobMessageType.WARNING:
            logger.warn(nuclio_message)
        elif message_type == JobMessage.JobMessageType.INFO:
            logger.info(nuclio_message)
        elif message_type == JobMessage.JobMessageType.DEBUG:
            # NOTE(review): `_debug` is kept as originally written; confirm
            # whether the public `debug` method of the logger was intended.
            logger._debug(nuclio_message)
    else:
        console_message = "[" + message_type.name + "] [" + function_key + "] " + message
        print(console_message)

    if self.Job is not None:
        return JobMessage.add(self.Job.id, message, message_type, file_id)
    return None
def parameter(self, parameter_name)

Get the value of a user input parameter.

Args

parameter_name : str
The user parameter name whose value we want

Returns

object
The value of the user parameter if exists
Expand source code
def parameter(self, parameter_name):
    """Look up a user input parameter by name.

    Args:
        parameter_name (str): Name of the user parameter to read

    Returns:
        object: The stored value, or None when the parameter was not provided
    """
    known_parameters = self.Parameters
    if parameter_name not in known_parameters:
        return None
    return known_parameters[parameter_name]
def sudo(self)

Initializes the Api Driver using root credentials.

Expand source code
def sudo(self):
    """Re-initialize the Api Driver with the `root` key taken from the request.
    """
    api_settings = self.JSONData["api"]
    endpoint = str(api_settings["url"])
    root_key = str(api_settings["key_root"])

    # Initialize API Driver
    api.Driver.initialize(endpoint, root_key, True)
def unsudo(self)

Initializes the Api Driver using user credentials.

Expand source code
def unsudo(self):
    """Re-initialize the Api Driver with the `user` key taken from the request.
    """
    api_settings = self.JSONData["api"]
    endpoint = str(api_settings["url"])
    user_key = str(api_settings["key_user"])

    # Initialize API Driver
    api.Driver.initialize(endpoint, user_key, True)