Module libnova.common.nuclio.Request

Expand source code
#!/usr/bin/env python
# coding: utf-8

import os
import json
from libnova                     import common
from libnova.common            import api
from libnova.common.api        import Driver, Storage, Container, File, Job, User, Trigger, Function, JobMessage
from libnova.common.filesystem import S3

class Request:
    """Nuclio Request Helper.

    It has some built-in functions to help during the flow of a function.
    Some examples are:

     * Start the job related to the function

     * Add log messages

     * Automatically retrieve the related files using the API

    Args:
        context (object): The Nuclio context object.
        event (object):  The Nuclio event object; its `body` attribute is parsed as JSON.
        silent (bool):   If silent is True, it will skip the logging of the Nuclio request to either console or log system.

    Attributes:
        JSONData (dict): Function request body as a dict (None when the body is not valid JSON).
        Storage (libnova.common.api.Storage): Storage related to the current function execution
        Container (libnova.common.api.Container): Container related to the current function execution
        User (libnova.common.api.User): User that has triggered the current function
        Function (libnova.common.api.Function): The function that is being run
        Trigger (libnova.common.api.Trigger): The trigger that made this function be called
        Job (libnova.common.api.Job): The job related to the current function execution
        Files (list[libnova.common.api.File]): List of files related to the current function execution
        Parameters (dict): Extra input parameters filled by the user when calling the current function
        Context (object): Nuclio context object
        Event (object): Nuclio event object

    Returns:
        Request: A Request Helper instance

    """

    # Class-level defaults. `__cleanup` rebinds every one of these on the
    # instance, so the mutable `Parameters` / `Files` objects below are never
    # the ones an instance actually fills.
    JSONData   = None

    Storage    = None
    Container  = None
    User       = None
    Function   = None
    Trigger    = None
    Job        = None
    Parameters = {}

    Files      = []

    # Nuclio data
    Context = None
    Event   = None

    # Key of the request body section that holds the function related data
    FunctionNamespace = "function_data"

    def __str__(self):
        """Return the class representation followed by the instance attribute dict."""
        return str(self.__class__) + ": " + str(self.__dict__)

    def __init__(self, context, event, silent=False):
        """Store the Nuclio objects, load the request data and optionally log the raw body.

        Args:
            context (object): The Nuclio context object.
            event (object): The Nuclio event object; `event.body` is read.
            silent (bool): When True the incoming request body is not logged.
        """
        self.Context = context
        self.Event   = event

        self.__cleanup()

        self.__process(event.body)

        if not silent:
            body = event.body
            # Decode explicitly instead of relying on a bare `except`: the old
            # fallback concatenated bytes with str and raised on undecodable input.
            if isinstance(body, (bytes, bytearray)):
                body = body.decode("utf-8", errors="replace")
            self.log(" - Processing event - " + "\n" + str(body), JobMessage.JobMessageType.DEBUG)

    def __cleanup(self):
        """Restores the variables to initial values to ensure no remaining data is kept between executions
        """
        self.JSONData   = None
        self.Storage    : Storage    = None
        self.Container  : Container  = None
        self.User       : User       = None
        self.Function   : Function   = None
        self.Trigger    : Trigger    = None
        self.Job        : Job        = None
        self.Files      : list[File] = []
        self.Parameters : dict       = {}

    def __report(self, message):
        """Emit a diagnostic without using the API, which may be the component that failed.

        Args:
            message (str): The text to emit through the Nuclio logger, or the console
                when there is no Nuclio context.
        """
        if self.Context is not None:
            self.Context.logger.warn(message)
        else:
            print(message)

    def __function_key(self):
        """Return the function key found in the request data, or "unknown" when it is not available.
        """
        try:
            return str(self.JSONData[self.FunctionNamespace]["function"]["key"])
        except (KeyError, IndexError, TypeError):
            # JSONData is None (body was not JSON) or has no function key
            return "unknown"

    def __process(self, request_body):
        """Retrieves all the information to the main class attributes

        This is best-effort: a body that is not valid JSON leaves every attribute at
        its default value, and any failure while loading the related entities is
        reported but never raised.

        Args:
            request_body (str|bytes): The raw request body, expected to be a JSON document.
        """
        try:
            self.JSONData = json.loads(request_body)
        except (TypeError, ValueError):
            # Input is not a JSON document: nothing else can be loaded
            self.JSONData = None
            return

        try:
            # Get main data as "sudo" to avoid permission issues
            self.sudo()
            self.__load_entities()
        except Exception as error:
            self.__report("Unable to load the request data: " + repr(error))
        finally:
            # Always switch back to the user API credentials, even when loading
            # failed half way, so the driver is never left with root credentials.
            try:
                self.unsudo()
            except Exception as error:
                self.__report("Unable to switch to the user API credentials: " + repr(error))

    def __load_entities(self):
        """Fill the instance attributes from `JSONData` using the API.

        It expects `sudo` to have been called; errors are propagated to the caller.
        """
        self.Parameters.update(self.JSONData.get("function_params") or {})

        function_data = self.JSONData.get(self.FunctionNamespace) or {}

        if "container" in function_data:
            # Fetch once and keep the result only when the container exists
            container = Container.get(function_data["container"]["id"])
            if container is not None:
                self.Container = container

        if "user" in function_data:
            self.User = User.get(function_data["user"]["id"])

        if "job" in function_data:
            self.Job = Job.get(function_data["job"]["id"])

        if "trigger" in function_data:
            self.Trigger = Trigger.get(function_data["trigger"]["id"])

        if "function" in function_data:
            self.Function = Function.get(function_data["function"]["id"])

        if self.Container is not None:
            self.Storage = Storage.get(self.Container.storage_id)

        if self.Storage is not None:
            # Platform S3 Storage Driver
            S3.Storage.initialize_storage(self.Storage)

        files = function_data.get("files") or {}

        for file_id in files.get("ids") or []:
            file = File.get(file_id)
            if file is not None:
                self.Files.append(file)

        # The paths are only used when no file could be retrieved by id
        if not self.Files and self.Container is not None:
            for file_path in files.get("paths") or []:
                file = File.get_by_path(self.Container.id, file_path)
                # Skip unresolved paths the same way unresolved ids are skipped
                if file is not None:
                    self.Files.append(file)

    def parameter(self, parameter_name):
        """Get the value of a user input parameter.

        Args:
            parameter_name (str): The user parameter name whose value we want

        Returns:
            object: The value of the user parameter if it exists, None otherwise
        """

        if parameter_name in self.Parameters:
            return self.Parameters[parameter_name]
        return None

    def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
        """This main logging function will log messages by the following criteria:

         * If this is being executed inside Nuclio, it will add a message to the Nuclio function log

         * Else, it will print the message by console

         * If the current Job is not None, it will add the message to the Job Message table

        Args:
            message (str): The message to log
            message_type (JobMessage.JobMessageType): The log level of the message
            file_id (int): It will be used to relate the Job Message entry with a specific file

        Returns:
            object: The result of `JobMessage.add` when there is a current Job, None otherwise
        """

        # The request data may be missing (non JSON body), so the key is looked up safely
        prefix = "[" + self.__function_key() + "] "

        if self.Context is not None:
            nuclio_message = prefix + message
            if message_type == JobMessage.JobMessageType.ERROR:
                self.Context.logger.error(nuclio_message)
            elif message_type == JobMessage.JobMessageType.WARNING:
                self.Context.logger.warn(nuclio_message)
            elif message_type == JobMessage.JobMessageType.INFO:
                self.Context.logger.info(nuclio_message)
            elif message_type == JobMessage.JobMessageType.DEBUG:
                self.Context.logger.debug(nuclio_message)
        else:
            console_message = "[" + message_type.name + "] " + prefix + message
            print(console_message)

        if self.Job is not None:
            return JobMessage.add(self.Job.id, message, message_type, file_id)
        return None

    def job_init(self):
        """Sets the current job (if any) to status RUNNING.

        If the job does not exist, one job will be created
        """

        if self.Job is None:
            self.Job = Job.create()
        # `Job.create` may not have produced a job, so check again
        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.RUNNING)

    def job_end(self, success: bool = True):
        """Sets the current job to either COMPLETED or FAILED depending on the `success` value.

        Args:
            success (bool): True if the job has completed successfully, False otherwise
        """

        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)

    def sudo(self):
        """Initializes the Api Driver using `root` credentials.

        Reads the `url` and `key_root` entries of the `api` section of the request data.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_root"]),
            True
        )

    def unsudo(self):
        """Initializes the Api Driver using `user` credentials.

        Reads the `url` and `key_user` entries of the `api` section of the request data.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_user"]),
            True
        )


# This module is meant to be imported; running it directly only prints a notice.
if __name__ == "__main__":
    print("This file cannot be executed directly!")

Classes

class Request (context, event, silent=False)

Nuclio Request Helper.

It has some built-in functions to help during the flow of a function. Some examples are:

  • Start the job related to the function

  • Add log messages

  • Automatically retrieve the related files using the API

Args

context : object
The Nuclio context object.
event : object
The Nuclio event object.
silent : bool
If silent is True, it will skip the logging of the Nuclio request to either console or log system.

Attributes

JSONData : dict
Function request body as a dict.
Storage : libnova.common.api.Storage
Storage related to the current function execution
Container : libnova.common.api.Container
Container related to the current function execution
User : libnova.common.api.User
User that has triggered the current function
Function : libnova.common.api.Function
The function that is being run
Trigger : libnova.common.api.Trigger
The trigger that made this function be called
Job : libnova.common.api.Job
The job related to the current function execution
Files : list[libnova.common.api.File]
List of files related to the current function execution
Parameters : dict
Extra input parameters filled by the user when calling the current function
Context : object
Nuclio context object
Event : object
Nuclio event object

Returns

Request
A Request Helper instance
Expand source code
class Request:
    """Nuclio Request Helper.

    It has some built-in functions to help during the flow of a function.
    Some examples are:

     * Start the job related to the function

     * Add log messages

     * Automatically retrieve the related files using the API

    Args:
        context (object): The Nuclio context object.
        event (object):  The Nuclio event object; its `body` attribute is parsed as JSON.
        silent (bool):   If silent is True, it will skip the logging of the Nuclio request to either console or log system.

    Attributes:
        JSONData (dict): Function request body as a dict (None when the body is not valid JSON).
        Storage (libnova.common.api.Storage): Storage related to the current function execution
        Container (libnova.common.api.Container): Container related to the current function execution
        User (libnova.common.api.User): User that has triggered the current function
        Function (libnova.common.api.Function): The function that is being run
        Trigger (libnova.common.api.Trigger): The trigger that made this function be called
        Job (libnova.common.api.Job): The job related to the current function execution
        Files (list[libnova.common.api.File]): List of files related to the current function execution
        Parameters (dict): Extra input parameters filled by the user when calling the current function
        Context (object): Nuclio context object
        Event (object): Nuclio event object

    Returns:
        Request: A Request Helper instance

    """

    # Class-level defaults. `__cleanup` rebinds every one of these on the
    # instance, so the mutable `Parameters` / `Files` objects below are never
    # the ones an instance actually fills.
    JSONData   = None

    Storage    = None
    Container  = None
    User       = None
    Function   = None
    Trigger    = None
    Job        = None
    Parameters = {}

    Files      = []

    # Nuclio data
    Context = None
    Event   = None

    # Key of the request body section that holds the function related data
    FunctionNamespace = "function_data"

    def __str__(self):
        """Return the class representation followed by the instance attribute dict."""
        return str(self.__class__) + ": " + str(self.__dict__)

    def __init__(self, context, event, silent=False):
        """Store the Nuclio objects, load the request data and optionally log the raw body.

        Args:
            context (object): The Nuclio context object.
            event (object): The Nuclio event object; `event.body` is read.
            silent (bool): When True the incoming request body is not logged.
        """
        self.Context = context
        self.Event   = event

        self.__cleanup()

        self.__process(event.body)

        if not silent:
            body = event.body
            # Decode explicitly instead of relying on a bare `except`: the old
            # fallback concatenated bytes with str and raised on undecodable input.
            if isinstance(body, (bytes, bytearray)):
                body = body.decode("utf-8", errors="replace")
            self.log(" - Processing event - " + "\n" + str(body), JobMessage.JobMessageType.DEBUG)

    def __cleanup(self):
        """Restores the variables to initial values to ensure no remaining data is kept between executions
        """
        self.JSONData   = None
        self.Storage    : Storage    = None
        self.Container  : Container  = None
        self.User       : User       = None
        self.Function   : Function   = None
        self.Trigger    : Trigger    = None
        self.Job        : Job        = None
        self.Files      : list[File] = []
        self.Parameters : dict       = {}

    def __report(self, message):
        """Emit a diagnostic without using the API, which may be the component that failed.

        Args:
            message (str): The text to emit through the Nuclio logger, or the console
                when there is no Nuclio context.
        """
        if self.Context is not None:
            self.Context.logger.warn(message)
        else:
            print(message)

    def __function_key(self):
        """Return the function key found in the request data, or "unknown" when it is not available.
        """
        try:
            return str(self.JSONData[self.FunctionNamespace]["function"]["key"])
        except (KeyError, IndexError, TypeError):
            # JSONData is None (body was not JSON) or has no function key
            return "unknown"

    def __process(self, request_body):
        """Retrieves all the information to the main class attributes

        This is best-effort: a body that is not valid JSON leaves every attribute at
        its default value, and any failure while loading the related entities is
        reported but never raised.

        Args:
            request_body (str|bytes): The raw request body, expected to be a JSON document.
        """
        try:
            self.JSONData = json.loads(request_body)
        except (TypeError, ValueError):
            # Input is not a JSON document: nothing else can be loaded
            self.JSONData = None
            return

        try:
            # Get main data as "sudo" to avoid permission issues
            self.sudo()
            self.__load_entities()
        except Exception as error:
            self.__report("Unable to load the request data: " + repr(error))
        finally:
            # Always switch back to the user API credentials, even when loading
            # failed half way, so the driver is never left with root credentials.
            try:
                self.unsudo()
            except Exception as error:
                self.__report("Unable to switch to the user API credentials: " + repr(error))

    def __load_entities(self):
        """Fill the instance attributes from `JSONData` using the API.

        It expects `sudo` to have been called; errors are propagated to the caller.
        """
        self.Parameters.update(self.JSONData.get("function_params") or {})

        function_data = self.JSONData.get(self.FunctionNamespace) or {}

        if "container" in function_data:
            # Fetch once and keep the result only when the container exists
            container = Container.get(function_data["container"]["id"])
            if container is not None:
                self.Container = container

        if "user" in function_data:
            self.User = User.get(function_data["user"]["id"])

        if "job" in function_data:
            self.Job = Job.get(function_data["job"]["id"])

        if "trigger" in function_data:
            self.Trigger = Trigger.get(function_data["trigger"]["id"])

        if "function" in function_data:
            self.Function = Function.get(function_data["function"]["id"])

        if self.Container is not None:
            self.Storage = Storage.get(self.Container.storage_id)

        if self.Storage is not None:
            # Platform S3 Storage Driver
            S3.Storage.initialize_storage(self.Storage)

        files = function_data.get("files") or {}

        for file_id in files.get("ids") or []:
            file = File.get(file_id)
            if file is not None:
                self.Files.append(file)

        # The paths are only used when no file could be retrieved by id
        if not self.Files and self.Container is not None:
            for file_path in files.get("paths") or []:
                file = File.get_by_path(self.Container.id, file_path)
                # Skip unresolved paths the same way unresolved ids are skipped
                if file is not None:
                    self.Files.append(file)

    def parameter(self, parameter_name):
        """Get the value of a user input parameter.

        Args:
            parameter_name (str): The user parameter name whose value we want

        Returns:
            object: The value of the user parameter if it exists, None otherwise
        """

        if parameter_name in self.Parameters:
            return self.Parameters[parameter_name]
        return None

    def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
        """This main logging function will log messages by the following criteria:

         * If this is being executed inside Nuclio, it will add a message to the Nuclio function log

         * Else, it will print the message by console

         * If the current Job is not None, it will add the message to the Job Message table

        Args:
            message (str): The message to log
            message_type (JobMessage.JobMessageType): The log level of the message
            file_id (int): It will be used to relate the Job Message entry with a specific file

        Returns:
            object: The result of `JobMessage.add` when there is a current Job, None otherwise
        """

        # The request data may be missing (non JSON body), so the key is looked up safely
        prefix = "[" + self.__function_key() + "] "

        if self.Context is not None:
            nuclio_message = prefix + message
            if message_type == JobMessage.JobMessageType.ERROR:
                self.Context.logger.error(nuclio_message)
            elif message_type == JobMessage.JobMessageType.WARNING:
                self.Context.logger.warn(nuclio_message)
            elif message_type == JobMessage.JobMessageType.INFO:
                self.Context.logger.info(nuclio_message)
            elif message_type == JobMessage.JobMessageType.DEBUG:
                self.Context.logger.debug(nuclio_message)
        else:
            console_message = "[" + message_type.name + "] " + prefix + message
            print(console_message)

        if self.Job is not None:
            return JobMessage.add(self.Job.id, message, message_type, file_id)
        return None

    def job_init(self):
        """Sets the current job (if any) to status RUNNING.

        If the job does not exist, one job will be created
        """

        if self.Job is None:
            self.Job = Job.create()
        # `Job.create` may not have produced a job, so check again
        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.RUNNING)

    def job_end(self, success: bool = True):
        """Sets the current job to either COMPLETED or FAILED depending on the `success` value.

        Args:
            success (bool): True if the job has completed successfully, False otherwise
        """

        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)

    def sudo(self):
        """Initializes the Api Driver using `root` credentials.

        Reads the `url` and `key_root` entries of the `api` section of the request data.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_root"]),
            True
        )

    def unsudo(self):
        """Initializes the Api Driver using `user` credentials.

        Reads the `url` and `key_user` entries of the `api` section of the request data.
        """

        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_user"]),
            True
        )

Class variables

var Container
var Context
var Event
var Files
var Function
var FunctionNamespace
var JSONData
var Job
var Parameters
var Storage
var Trigger
var User

Methods

def job_end(self, success: bool = True)

Sets the current job to either COMPLETED or FAILED depending on the success value.

Args

success : bool
True if the job has completed successfully, False otherwise
Expand source code
def job_end(self, success: bool = True):
    """Close the current job, marking it as COMPLETED or FAILED.

    Nothing happens when there is no current job.

    Args:
        success (bool): True marks the job as COMPLETED, False marks it as FAILED
    """

    # Guard clause: without a job there is no status to update
    if self.Job is None:
        return

    if success:
        final_status = Job.JobStatus.COMPLETED
    else:
        final_status = Job.JobStatus.FAILED

    Job.set_status(self.Job.id, final_status)
def job_init(self)

Sets the current job (if any) to status RUNNING.

If the job does not exist, one job will be created

Expand source code
def job_init(self):
    """Mark the current job as RUNNING, creating a job first when there is none.

    The status is only updated when a job is available after the creation attempt.
    """

    if self.Job is None:
        self.Job = Job.create()

    # The creation above may still leave no job; in that case there is nothing to update
    if self.Job is None:
        return

    Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
def log(self, message, message_type=JobMessageType.INFO, file_id=None)

This main logging function will log messages by the following criteria:

  • If this is being executed inside Nuclio, it will add a message to the Nuclio function log

  • Else, it will print the message by console

  • If the current Job is not None, it will add the message to the Job Message table

Args

message : str
The message to log
message_type : JobMessage.JobMessageType
The log level of the message
file_id : int
It will be used to relate the Job Message entry with a specific file
Expand source code
def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
    """This main logging function will log messages by the following criteria:

     * If this is being executed inside Nuclio, it will add a message to the Nuclio function log

     * Else, it will print the message by console

     * If the current Job is not None, it will add the message to the Job Message table

    Args:
        message (str): The message to log
        message_type (JobMessage.JobMessageType): The log level of the message
        file_id (int): It will be used to relate the Job Message entry with a specific file

    Returns:
        object: The result of `JobMessage.add` when there is a current Job, None otherwise
    """

    # The request data is None when the body was not JSON and may lack the
    # function key; fall back to a placeholder instead of raising while logging.
    try:
        function_key = str(self.JSONData[self.FunctionNamespace]["function"]["key"])
    except (KeyError, IndexError, TypeError):
        function_key = "unknown"

    prefix = "[" + function_key + "] "

    if self.Context is not None:
        nuclio_message = prefix + message
        if message_type == JobMessage.JobMessageType.ERROR:
            self.Context.logger.error(nuclio_message)
        elif message_type == JobMessage.JobMessageType.WARNING:
            self.Context.logger.warn(nuclio_message)
        elif message_type == JobMessage.JobMessageType.INFO:
            self.Context.logger.info(nuclio_message)
        elif message_type == JobMessage.JobMessageType.DEBUG:
            self.Context.logger.debug(nuclio_message)
    else:
        console_message = "[" + message_type.name + "] " + prefix + message
        print(console_message)

    if self.Job is not None:
        return JobMessage.add(self.Job.id, message, message_type, file_id)
    return None
def parameter(self, parameter_name)

Get the value of a user input parameter.

Args

parameter_name : str
The user parameter name whose value we want

Returns

object
The value of the user parameter if it exists, otherwise None
Expand source code
def parameter(self, parameter_name):
    """Look up a user supplied input parameter by name.

    Args:
        parameter_name (str): Name of the user parameter to read

    Returns:
        object: The stored value, or None when the parameter was not supplied
    """

    # A single lookup that yields None for a missing name
    return self.Parameters.get(parameter_name)
def sudo(self)

Initializes the Api Driver using root credentials.

Expand source code
def sudo(self):
    """Point the Api Driver at the API using the `root` key of the request.

    The url and the key are read from the `api` section of the request data.
    """

    api_url  = str(self.JSONData["api"]["url"])
    root_key = str(self.JSONData["api"]["key_root"])

    # Initialize API Driver
    api.Driver.initialize(api_url, root_key, True)
def unsudo(self)

Initializes the Api Driver using user credentials.

Expand source code
def unsudo(self):
    """Point the Api Driver at the API using the `user` key of the request.

    The url and the key are read from the `api` section of the request data.
    """

    api_url  = str(self.JSONData["api"]["url"])
    user_key = str(self.JSONData["api"]["key_user"])

    # Initialize API Driver
    api.Driver.initialize(api_url, user_key, True)