Module libnova.common.nuclio.Request
Expand source code
#!/usr/bin/env python
# coding: utf-8
import json
from libnova import common
from libnova.common import api
from libnova.common.api import Driver, Storage, Container, File, Job, User, Trigger, Function, JobMessage
from libnova.common.filesystem import S3
from libnova.common.filesystem.S3 import Storage as S3Storage
# Module-level placeholder, presumably meant to hold the active Request instance.
# NOTE(review): no code visible in this module rebinds this global, so it
# appears to stay None - confirm before relying on it.
__instance = None
class Request:
    """Nuclio Request Helper.

    It has some built-in functions to help during the flow of a function.

    Some examples are:
        * Start the job related to the function
        * Add log messages
        * Automatically retrieve the related files using the API

    Args:
        context (object): The Nuclio context object.
        event (object): The Nuclio event object; its ``body`` is parsed as JSON.
        silent (bool): If silent is True, it will skip the logging of the Nuclio
            request to either console or log system.

    Attributes:
        JSONData (dict): Function request body as a dict.
        Storage (libnova.common.api.Storage): Storage related to the current function execution
        Container (libnova.common.api.Container): Container related to the current function execution
        User (libnova.common.api.User): User that has triggered the current function
        Function (libnova.common.api.Function): The function that is being ran
        Trigger (libnova.common.api.Trigger): The trigger that made this function be called
        Job (libnova.common.api.Job): The job related to the current function execution
        Files (list[libnova.common.api.File]): List of files related to the current function execution
        Parameters (dict): Extra input parameters filled by the user when calling the current function
        Context (object): Nuclio context object
        Event (object): Nuclio event object
    """

    JSONData = None
    Storage = None
    Container = None
    User = None
    Function = None
    Trigger = None
    Job = None
    # NOTE: these two mutable defaults live on the class; __cleanup() rebinds
    # fresh per-instance objects from __init__, so instances do not share them.
    Parameters = {}
    Files = []

    # Nuclio data
    Context = None
    Event = None

    # Function parameters
    FunctionNamespace = "function_data"

    def __str__(self):
        """Return the class followed by a dump of the instance attributes."""
        return str(self.__class__) + ": " + str(self.__dict__)

    def __init__(self, context, event, silent=True):
        # NOTE(review): the original assigned `__instance = self` here, which
        # only created an unused function-local name (the class-level `global`
        # statement does not reach into this scope), so it has been dropped.
        self.Context = context
        self.Event = event

        self.__cleanup()
        self.__process(event.body)

        if not silent:
            # Render the body once, then log once: the body may be bytes or
            # text, and a failure inside log() must not trigger a second call.
            self.log(
                " - Processing event - " + "\n" + self.__body_as_text(event.body),
                JobMessage.JobMessageType.DEBUG
            )

    @staticmethod
    def __body_as_text(body):
        """Return a printable version of the event body (bytes are decoded as UTF-8)."""
        if isinstance(body, (bytes, bytearray)):
            return body.decode("utf-8", errors="replace")
        return str(body)

    def __cleanup(self):
        """Restores the variables to initial values to ensure no remaining data is kept between executions
        """
        self.JSONData = None
        self.Storage : Storage = None
        self.Container : Container = None
        self.User : User = None
        self.Function : Function = None
        self.Trigger : Trigger = None
        self.Job : Job = None
        self.Files : "list[File]" = []
        self.Parameters : dict = {}

    def __process(self, request_body):
        """Retrieves all the information to the main class attributes.

        Any exception raised while parsing or loading is printed (with its
        traceback) and swallowed, so the attributes may be left partially
        filled.

        Args:
            request_body (str or bytes): The raw JSON request body
        """
        try:
            self.JSONData = json.loads(request_body)

            # Get main data as "sudo" to avoid permission issues
            self.sudo()
            try:
                self.__load_entities()
            finally:
                # Always switch back to the user API credentials, even when
                # loading fails, so the driver is never left running as root
                self.unsudo()
        except Exception as e:
            # Input is not a JSON, or some of the related data could not be loaded
            print(e)
            import traceback
            traceback.print_exc()

    def __load_entities(self):
        """Fill Parameters, Job, Container, User, Trigger, Function, Storage and Files from JSONData.

        Raises:
            ValueError: If the referenced job is already FAILED or COMPLETED
        """
        self.Parameters.update(self.JSONData["function_params"].items())

        data = self.JSONData[self.FunctionNamespace]

        if "job" in data:
            self.Job = Job.get(data["job"]["id"])
        if self.Job is not None:
            finished = (Job.JobStatus.FAILED.name, Job.JobStatus.COMPLETED.name)
            if self.Job.status in finished:
                self.Job = None
                raise ValueError('The job has already been completed, skipping...')

        if "container" in data:
            # Fetch once and keep the previous value when the lookup returns None
            container = Container.get(data["container"]["id"])
            if container is not None:
                self.Container = container

        if "user" in data:
            self.User = User.get(data["user"]["id"])

        if "trigger" in data:
            self.Trigger = Trigger.get(data["trigger"]["id"])

        if "function" in data:
            self.Function = Function.get(data["function"]["id"])

        if self.Container is not None:
            self.Storage = Storage.get(self.Container.storage_id)
            if self.Storage is not None:
                # Platform S3 Storage Driver
                S3Storage.initialize_storage(self.Storage)

        files_spec = data["files"]
        if "ids" in files_spec:
            for file_id in files_spec["ids"]:
                file = File.get(file_id)
                if file is not None:
                    self.Files.append(file)

        # Paths are only used as a fallback when no file could be loaded by id
        if not self.Files and self.Container is not None and "paths" in files_spec:
            for file_path in files_spec["paths"]:
                file = File.get_by_path(self.Container.id, file_path)
                # Skip unresolved paths, consistently with the lookup by id
                if file is not None:
                    self.Files.append(file)

    def __function_key(self):
        """Return the function key used as log prefix, or "unknown" when the request carries none."""
        try:
            return str(self.JSONData[self.FunctionNamespace]["function"]["key"])
        except (KeyError, TypeError):
            # JSONData is None (body was not JSON) or has no function key
            return "unknown"

    def parameter(self, parameter_name):
        """Get the value of a user input parameter.

        Args:
            parameter_name (str): The user parameter name whose value we want

        Returns:
            object: The value of the user parameter if exists, None otherwise
        """
        if parameter_name in self.Parameters:
            return self.Parameters[parameter_name]
        return None

    def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
        """This main logging function will log messages by the following criteria:

        * If this is being executed inside Nuclio, it will add a message to the Nuclio function log
        * Else, it will print the message by console
        * If the current Job is not None, it will add the message to the Job Message table

        Args:
            message (str): The message to log
            message_type (JobMessage.JobMessageType): The log level of the message
            file_id (int): It will be used to relate the Job Message entry with a specific file

        Returns:
            object: The result of `JobMessage.add` when there is a current Job, None otherwise
        """
        function_key = self.__function_key()

        if self.Context is not None:
            nuclio_message = "[" + function_key + "] " + message
            logger = self.Context.logger
            if message_type == JobMessage.JobMessageType.ERROR:
                logger.error(nuclio_message)
            elif message_type == JobMessage.JobMessageType.WARNING:
                logger.warn(nuclio_message)
            elif message_type == JobMessage.JobMessageType.INFO:
                logger.info(nuclio_message)
            elif message_type == JobMessage.JobMessageType.DEBUG:
                # NOTE(review): `_debug` is kept from the original; confirm the
                # context logger really exposes it rather than `debug`.
                logger._debug(nuclio_message)
        else:
            console_message = "[" + message_type.name + "] [" + function_key + "] " + message
            print(console_message)

        if self.Job is not None:
            return JobMessage.add(self.Job.id, message, message_type, file_id)
        return None

    def job_init(self, function_id = None, container_id = None):
        """Sets the current job (if any) to status RUNNING.

        If the job does not exist, one job will be created

        Args:
            function_id (int): Function to create the job for; defaults to the current Function id
            container_id (int): Container to create the job for; defaults to the current Container id (if any)

        Returns:
            object: The result of `Job.set_status`, or None when there is no job to initialize

        Raises:
            ValueError: If the status change reports `success` as False
        """
        if self.Job is None:
            # An explicit function_id is enough to create the job, even when
            # the request did not resolve a Function
            if function_id is not None or self.Function is not None:
                self.Job = Job.create(
                    function_id if function_id is not None else self.Function.id,
                    container_id if container_id is not None else (
                        self.Container.id if self.Container is not None else None
                    )
                )
                if self.Job is not None:
                    # Likely to be ran using a command line, print the new job
                    print(f"Created job #{str(self.Job.id)}")

        if self.Job is None:
            return None

        job_init_result = Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
        if job_init_result is not None and 'success' in job_init_result and job_init_result['success'] is False:
            raise ValueError(f'The job #{str(self.Job.id)} cannot be initialized, skipping...')
        return job_init_result

    def job_end(self, success: bool = True):
        """Sets the current job to either COMPLETED or FAILED depending on the `success` value.

        Args:
            success (bool): True if the job has completed successfully, False otherwise
        """
        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)

    def job_asset_add(self, file : File):
        """Adds a `File` to the current `Job`.

        Args:
            file (File): The `File` object to relate to the current `Job`
        """
        if self.Job is not None:
            Job.add_asset(self.Job.id, file)

    def sudo(self):
        """Initializes the Api Driver using `root` credentials.
        """
        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_root"]),
            True
        )

    def unsudo(self):
        """Initializes the Api Driver using `user` credentials.
        """
        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_user"]),
            True
        )

    @staticmethod
    def __pip_install(package):
        """Install `package` by running pip with the current interpreter (best effort).

        `pip.main` is not a supported API, so pip is run as a subprocess. The
        exit status is returned and not raised, keeping the install best effort.
        """
        import subprocess
        import sys
        return subprocess.call([sys.executable, "-m", "pip", "install", package])

    def add_dependencies(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Args:
            packages (list[str]): Names used both as pip requirement and as module name
            check_and_import (bool): If True, only install the packages that fail to
                import and import them afterwards; if False, just run the install

        Raises:
            ImportError: If `check_and_import` is True and a package still cannot be imported
        """
        import importlib

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    importlib.import_module(package)
                    continue
                except ImportError:
                    pass
                self.__pip_install(package)
                # Make the import system notice the freshly installed files
                importlib.invalidate_caches()
                importlib.import_module(package)
            else:
                # Just install the package to ensure it actually is installed
                self.__pip_install(package)

    def add_dependencies_old(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Legacy variant of `add_dependencies` that imports through `__import__`.

        Args:
            packages (list[str]): Names used both as pip requirement and as module name
            check_and_import (bool): If True, only install the packages that fail to
                import and import them afterwards; if False, just run the install

        Raises:
            ImportError: If `check_and_import` is True and a package still cannot be imported
        """
        import importlib

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    __import__(package)
                    continue
                except ImportError:
                    pass
                self.__pip_install(package)
                # Make the import system notice the freshly installed files
                importlib.invalidate_caches()
                __import__(package)
            else:
                # Just install the package to ensure it actually is installed
                self.__pip_install(package)
if __name__ == "__main__":
    # Running this module as a script only prints this notice.
    print('This file cannot be executed directly!')
Classes
class Request (context, event, silent=True)
-
Expand source code
class Request:
    """Nuclio Request Helper.

    It has some built-in functions to help during the flow of a function.

    Some examples are:
        * Start the job related to the function
        * Add log messages
        * Automatically retrieve the related files using the API

    Args:
        context (object): The Nuclio context object.
        event (object): The Nuclio event object; its ``body`` is parsed as JSON.
        silent (bool): If silent is True, it will skip the logging of the Nuclio
            request to either console or log system.

    Attributes:
        JSONData (dict): Function request body as a dict.
        Storage (libnova.common.api.Storage): Storage related to the current function execution
        Container (libnova.common.api.Container): Container related to the current function execution
        User (libnova.common.api.User): User that has triggered the current function
        Function (libnova.common.api.Function): The function that is being ran
        Trigger (libnova.common.api.Trigger): The trigger that made this function be called
        Job (libnova.common.api.Job): The job related to the current function execution
        Files (list[libnova.common.api.File]): List of files related to the current function execution
        Parameters (dict): Extra input parameters filled by the user when calling the current function
        Context (object): Nuclio context object
        Event (object): Nuclio event object
    """

    JSONData = None
    Storage = None
    Container = None
    User = None
    Function = None
    Trigger = None
    Job = None
    # NOTE: these two mutable defaults live on the class; __cleanup() rebinds
    # fresh per-instance objects from __init__, so instances do not share them.
    Parameters = {}
    Files = []

    # Nuclio data
    Context = None
    Event = None

    # Function parameters
    FunctionNamespace = "function_data"

    def __str__(self):
        """Return the class followed by a dump of the instance attributes."""
        return str(self.__class__) + ": " + str(self.__dict__)

    def __init__(self, context, event, silent=True):
        # NOTE(review): the original assigned `__instance = self` here, which
        # only created an unused function-local name (the class-level `global`
        # statement does not reach into this scope), so it has been dropped.
        self.Context = context
        self.Event = event

        self.__cleanup()
        self.__process(event.body)

        if not silent:
            # Render the body once, then log once: the body may be bytes or
            # text, and a failure inside log() must not trigger a second call.
            self.log(
                " - Processing event - " + "\n" + self.__body_as_text(event.body),
                JobMessage.JobMessageType.DEBUG
            )

    @staticmethod
    def __body_as_text(body):
        """Return a printable version of the event body (bytes are decoded as UTF-8)."""
        if isinstance(body, (bytes, bytearray)):
            return body.decode("utf-8", errors="replace")
        return str(body)

    def __cleanup(self):
        """Restores the variables to initial values to ensure no remaining data is kept between executions
        """
        self.JSONData = None
        self.Storage : Storage = None
        self.Container : Container = None
        self.User : User = None
        self.Function : Function = None
        self.Trigger : Trigger = None
        self.Job : Job = None
        self.Files : "list[File]" = []
        self.Parameters : dict = {}

    def __process(self, request_body):
        """Retrieves all the information to the main class attributes.

        Any exception raised while parsing or loading is printed (with its
        traceback) and swallowed, so the attributes may be left partially
        filled.

        Args:
            request_body (str or bytes): The raw JSON request body
        """
        try:
            self.JSONData = json.loads(request_body)

            # Get main data as "sudo" to avoid permission issues
            self.sudo()
            try:
                self.__load_entities()
            finally:
                # Always switch back to the user API credentials, even when
                # loading fails, so the driver is never left running as root
                self.unsudo()
        except Exception as e:
            # Input is not a JSON, or some of the related data could not be loaded
            print(e)
            import traceback
            traceback.print_exc()

    def __load_entities(self):
        """Fill Parameters, Job, Container, User, Trigger, Function, Storage and Files from JSONData.

        Raises:
            ValueError: If the referenced job is already FAILED or COMPLETED
        """
        self.Parameters.update(self.JSONData["function_params"].items())

        data = self.JSONData[self.FunctionNamespace]

        if "job" in data:
            self.Job = Job.get(data["job"]["id"])
        if self.Job is not None:
            finished = (Job.JobStatus.FAILED.name, Job.JobStatus.COMPLETED.name)
            if self.Job.status in finished:
                self.Job = None
                raise ValueError('The job has already been completed, skipping...')

        if "container" in data:
            # Fetch once and keep the previous value when the lookup returns None
            container = Container.get(data["container"]["id"])
            if container is not None:
                self.Container = container

        if "user" in data:
            self.User = User.get(data["user"]["id"])

        if "trigger" in data:
            self.Trigger = Trigger.get(data["trigger"]["id"])

        if "function" in data:
            self.Function = Function.get(data["function"]["id"])

        if self.Container is not None:
            self.Storage = Storage.get(self.Container.storage_id)
            if self.Storage is not None:
                # Platform S3 Storage Driver
                S3Storage.initialize_storage(self.Storage)

        files_spec = data["files"]
        if "ids" in files_spec:
            for file_id in files_spec["ids"]:
                file = File.get(file_id)
                if file is not None:
                    self.Files.append(file)

        # Paths are only used as a fallback when no file could be loaded by id
        if not self.Files and self.Container is not None and "paths" in files_spec:
            for file_path in files_spec["paths"]:
                file = File.get_by_path(self.Container.id, file_path)
                # Skip unresolved paths, consistently with the lookup by id
                if file is not None:
                    self.Files.append(file)

    def __function_key(self):
        """Return the function key used as log prefix, or "unknown" when the request carries none."""
        try:
            return str(self.JSONData[self.FunctionNamespace]["function"]["key"])
        except (KeyError, TypeError):
            # JSONData is None (body was not JSON) or has no function key
            return "unknown"

    def parameter(self, parameter_name):
        """Get the value of a user input parameter.

        Args:
            parameter_name (str): The user parameter name whose value we want

        Returns:
            object: The value of the user parameter if exists, None otherwise
        """
        if parameter_name in self.Parameters:
            return self.Parameters[parameter_name]
        return None

    def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
        """This main logging function will log messages by the following criteria:

        * If this is being executed inside Nuclio, it will add a message to the Nuclio function log
        * Else, it will print the message by console
        * If the current Job is not None, it will add the message to the Job Message table

        Args:
            message (str): The message to log
            message_type (JobMessage.JobMessageType): The log level of the message
            file_id (int): It will be used to relate the Job Message entry with a specific file

        Returns:
            object: The result of `JobMessage.add` when there is a current Job, None otherwise
        """
        function_key = self.__function_key()

        if self.Context is not None:
            nuclio_message = "[" + function_key + "] " + message
            logger = self.Context.logger
            if message_type == JobMessage.JobMessageType.ERROR:
                logger.error(nuclio_message)
            elif message_type == JobMessage.JobMessageType.WARNING:
                logger.warn(nuclio_message)
            elif message_type == JobMessage.JobMessageType.INFO:
                logger.info(nuclio_message)
            elif message_type == JobMessage.JobMessageType.DEBUG:
                # NOTE(review): `_debug` is kept from the original; confirm the
                # context logger really exposes it rather than `debug`.
                logger._debug(nuclio_message)
        else:
            console_message = "[" + message_type.name + "] [" + function_key + "] " + message
            print(console_message)

        if self.Job is not None:
            return JobMessage.add(self.Job.id, message, message_type, file_id)
        return None

    def job_init(self, function_id = None, container_id = None):
        """Sets the current job (if any) to status RUNNING.

        If the job does not exist, one job will be created

        Args:
            function_id (int): Function to create the job for; defaults to the current Function id
            container_id (int): Container to create the job for; defaults to the current Container id (if any)

        Returns:
            object: The result of `Job.set_status`, or None when there is no job to initialize

        Raises:
            ValueError: If the status change reports `success` as False
        """
        if self.Job is None:
            # An explicit function_id is enough to create the job, even when
            # the request did not resolve a Function
            if function_id is not None or self.Function is not None:
                self.Job = Job.create(
                    function_id if function_id is not None else self.Function.id,
                    container_id if container_id is not None else (
                        self.Container.id if self.Container is not None else None
                    )
                )
                if self.Job is not None:
                    # Likely to be ran using a command line, print the new job
                    print(f"Created job #{str(self.Job.id)}")

        if self.Job is None:
            return None

        job_init_result = Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
        if job_init_result is not None and 'success' in job_init_result and job_init_result['success'] is False:
            raise ValueError(f'The job #{str(self.Job.id)} cannot be initialized, skipping...')
        return job_init_result

    def job_end(self, success: bool = True):
        """Sets the current job to either COMPLETED or FAILED depending on the `success` value.

        Args:
            success (bool): True if the job has completed successfully, False otherwise
        """
        if self.Job is not None:
            Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)

    def job_asset_add(self, file : File):
        """Adds a `File` to the current `Job`.

        Args:
            file (File): The `File` object to relate to the current `Job`
        """
        if self.Job is not None:
            Job.add_asset(self.Job.id, file)

    def sudo(self):
        """Initializes the Api Driver using `root` credentials.
        """
        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_root"]),
            True
        )

    def unsudo(self):
        """Initializes the Api Driver using `user` credentials.
        """
        # Initialize API Driver
        api.Driver.initialize(
            str(self.JSONData["api"]["url"]),
            str(self.JSONData["api"]["key_user"]),
            True
        )

    @staticmethod
    def __pip_install(package):
        """Install `package` by running pip with the current interpreter (best effort).

        `pip.main` is not a supported API, so pip is run as a subprocess. The
        exit status is returned and not raised, keeping the install best effort.
        """
        import subprocess
        import sys
        return subprocess.call([sys.executable, "-m", "pip", "install", package])

    def add_dependencies(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Args:
            packages (list[str]): Names used both as pip requirement and as module name
            check_and_import (bool): If True, only install the packages that fail to
                import and import them afterwards; if False, just run the install

        Raises:
            ImportError: If `check_and_import` is True and a package still cannot be imported
        """
        import importlib

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    importlib.import_module(package)
                    continue
                except ImportError:
                    pass
                self.__pip_install(package)
                # Make the import system notice the freshly installed files
                importlib.invalidate_caches()
                importlib.import_module(package)
            else:
                # Just install the package to ensure it actually is installed
                self.__pip_install(package)

    def add_dependencies_old(self, packages, check_and_import = False):
        """Ensure that the given dependencies are correctly installed and imported

        Legacy variant of `add_dependencies` that imports through `__import__`.

        Args:
            packages (list[str]): Names used both as pip requirement and as module name
            check_and_import (bool): If True, only install the packages that fail to
                import and import them afterwards; if False, just run the install

        Raises:
            ImportError: If `check_and_import` is True and a package still cannot be imported
        """
        import importlib

        for package in packages:
            if check_and_import:
                # Import the package to check if it is installed and if not install it
                try:
                    __import__(package)
                    continue
                except ImportError:
                    pass
                self.__pip_install(package)
                # Make the import system notice the freshly installed files
                importlib.invalidate_caches()
                __import__(package)
            else:
                # Just install the package to ensure it actually is installed
                self.__pip_install(package)
Class variables
var Container
var Context
var Event
var Files
var Function
var FunctionNamespace
var JSONData
var Job
var Parameters
var Storage
var Trigger
var User
Methods
def add_dependencies(self, packages, check_and_import=False)
-
Ensure that the given dependencies are correctly installed and imported
Expand source code
def add_dependencies(self, packages, check_and_import = False):
    """Ensure that the given dependencies are correctly installed and imported

    pip is run as a subprocess of the current interpreter because `pip.main`
    is not a supported API. The install is best effort: its exit status is
    not checked.

    Args:
        packages (list[str]): Names used both as pip requirement and as module name
        check_and_import (bool): If True, only install the packages that fail to
            import and import them afterwards; if False, just run the install

    Raises:
        ImportError: If `check_and_import` is True and a package still cannot be imported
    """
    import importlib
    import subprocess
    import sys

    for package in packages:
        if check_and_import:
            # Import the package to check if it is installed and if not install it
            try:
                importlib.import_module(package)
                continue
            except ImportError:
                pass
            subprocess.call([sys.executable, "-m", "pip", "install", package])
            # Make the import system notice the freshly installed files
            importlib.invalidate_caches()
            importlib.import_module(package)
        else:
            # Just install the package to ensure it actually is installed
            subprocess.call([sys.executable, "-m", "pip", "install", package])
def add_dependencies_old(self, packages, check_and_import=False)
-
Ensure that the given dependencies are correctly installed and imported
Expand source code
def add_dependencies_old(self, packages, check_and_import = False):
    """Ensure that the given dependencies are correctly installed and imported

    Legacy variant of `add_dependencies` that imports through `__import__`.
    pip is run as a subprocess of the current interpreter because `pip.main`
    is not a supported API. The install is best effort: its exit status is
    not checked.

    Args:
        packages (list[str]): Names used both as pip requirement and as module name
        check_and_import (bool): If True, only install the packages that fail to
            import and import them afterwards; if False, just run the install

    Raises:
        ImportError: If `check_and_import` is True and a package still cannot be imported
    """
    import importlib
    import subprocess
    import sys

    for package in packages:
        if check_and_import:
            # Import the package to check if it is installed and if not install it
            try:
                __import__(package)
                continue
            except ImportError:
                pass
            subprocess.call([sys.executable, "-m", "pip", "install", package])
            # Make the import system notice the freshly installed files
            importlib.invalidate_caches()
            __import__(package)
        else:
            # Just install the package to ensure it actually is installed
            subprocess.call([sys.executable, "-m", "pip", "install", package])
def job_asset_add(self, file: libnova.common.api.File)
-
Adds a File to the current Job.
Args
file
:File
- The File object to relate to the current Job
Expand source code
def job_asset_add(self, file : File):
    """Relate a `File` to the current `Job`; does nothing when there is no current job.

    Args:
        file (File): The `File` object to attach to the current `Job`
    """
    current_job = self.Job
    if current_job is None:
        return
    Job.add_asset(current_job.id, file)
def job_end(self, success: bool = True)
-
Sets the current job to either COMPLETED or FAILED depending on the success value.
Args
success
:bool
- True if the job has completed successfully, False otherwise
Expand source code
def job_end(self, success: bool = True):
    """Close the current job, marking it COMPLETED on success and FAILED otherwise.

    Does nothing when there is no current job.

    Args:
        success (bool): True if the job has completed successfully, False otherwise
    """
    if self.Job is None:
        return
    if success:
        final_status = Job.JobStatus.COMPLETED
    else:
        final_status = Job.JobStatus.FAILED
    Job.set_status(self.Job.id, final_status)
def job_init(self, function_id=None, container_id=None)
-
Sets the current job (if any) to status RUNNING.
If the job does not exist, one job will be created
Expand source code
def job_init(self, function_id = None, container_id = None):
    """Sets the current job (if any) to status RUNNING.

    If the job does not exist, one job will be created

    Args:
        function_id (int): Function to create the job for; defaults to the current Function id
        container_id (int): Container to create the job for; defaults to the current Container id (if any)

    Returns:
        object: The result of `Job.set_status`, or None when there is no job to initialize

    Raises:
        ValueError: If the status change reports `success` as False
    """
    if self.Job is None:
        # An explicit function_id is enough to create the job, even when
        # the request did not resolve a Function
        if function_id is not None or self.Function is not None:
            self.Job = Job.create(
                function_id if function_id is not None else self.Function.id,
                container_id if container_id is not None else (
                    self.Container.id if self.Container is not None else None
                )
            )
            if self.Job is not None:
                # Likely to be ran using a command line, print the new job
                print(f"Created job #{str(self.Job.id)}")

    if self.Job is None:
        # Nothing to initialize: no job was given and none could be created
        return None

    job_init_result = Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
    if job_init_result is not None and 'success' in job_init_result and job_init_result['success'] is False:
        raise ValueError(f'The job #{str(self.Job.id)} cannot be initialized, skipping...')
    return job_init_result
def log(self, message, message_type=JobMessageType.INFO, file_id=None)
-
This main logging function will log messages by the following criteria:
-
If this is being executed inside Nuclio, it will add a message to the Nuclio function log
-
Else, it will print the message by console
-
If the current Job is not None, it will add the message to the Job Message table
Args
message
:str
- The message to log
message_type
:JobMessage.JobMessageType
- The log level of the message
file_id
:int
- It will be used to relate the Job Message entry with a specific file
Expand source code
def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
    """This main logging function will log messages by the following criteria:

    * If this is being executed inside Nuclio, it will add a message to the Nuclio function log
    * Else, it will print the message by console
    * If the current Job is not None, it will add the message to the Job Message table

    Args:
        message (str): The message to log
        message_type (JobMessage.JobMessageType): The log level of the message
        file_id (int): It will be used to relate the Job Message entry with a specific file

    Returns:
        object: The result of `JobMessage.add` when there is a current Job, None otherwise
    """
    # The request may carry no function key (JSONData is None when the body
    # was not valid JSON); fall back to a placeholder so logging never crashes
    try:
        function_key = str(self.JSONData[self.FunctionNamespace]["function"]["key"])
    except (KeyError, TypeError):
        function_key = "unknown"

    if self.Context is not None:
        nuclio_message = "[" + function_key + "] " + message
        logger = self.Context.logger
        if message_type == JobMessage.JobMessageType.ERROR:
            logger.error(nuclio_message)
        elif message_type == JobMessage.JobMessageType.WARNING:
            logger.warn(nuclio_message)
        elif message_type == JobMessage.JobMessageType.INFO:
            logger.info(nuclio_message)
        elif message_type == JobMessage.JobMessageType.DEBUG:
            # NOTE(review): `_debug` is kept from the original; confirm the
            # context logger really exposes it rather than `debug`.
            logger._debug(nuclio_message)
    else:
        console_message = "[" + message_type.name + "] [" + function_key + "] " + message
        print(console_message)

    if self.Job is not None:
        return JobMessage.add(self.Job.id, message, message_type, file_id)
    return None
-
def parameter(self, parameter_name)
-
Get the value of a user input parameter.
Args
parameter_name
:str
- The user parameter name whose value we want
Returns
object
- The value of the user parameter if exists
Expand source code
def parameter(self, parameter_name):
    """Look up a user-supplied input parameter by name.

    Args:
        parameter_name (str): Name of the user parameter to read

    Returns:
        object: The stored value, or None when the parameter was not supplied
    """
    known_parameters = self.Parameters
    if parameter_name not in known_parameters:
        return None
    return known_parameters[parameter_name]
def sudo(self)
-
Initializes the API Driver using root credentials.
Expand source code
def sudo(self):
    """Re-initialize the API Driver with the `root` key found in the request.

    Reads `url` and `key_root` from the `api` section of `JSONData`.
    """
    api_settings = self.JSONData["api"]
    endpoint = str(api_settings["url"])
    root_key = str(api_settings["key_root"])
    # The third positional argument is passed as True, exactly as before
    api.Driver.initialize(endpoint, root_key, True)
def unsudo(self)
-
Initializes the API Driver using user credentials.
Expand source code
def unsudo(self):
    """Re-initialize the API Driver with the `user` key found in the request.

    Reads `url` and `key_user` from the `api` section of `JSONData`.
    """
    api_settings = self.JSONData["api"]
    endpoint = str(api_settings["url"])
    user_key = str(api_settings["key_user"])
    # The third positional argument is passed as True, exactly as before
    api.Driver.initialize(endpoint, user_key, True)