Module libnova.common.nuclio.Request
Expand source code
#!/usr/bin/env python
# coding: utf-8
import os
import json
from libnova import common
from libnova.common import api
from libnova.common.api import Driver, Storage, Container, File, Job, User, Trigger, Function, JobMessage
from libnova.common.filesystem import S3
class Request:
"""Nuclio Request Helper.
It has some built-in functions to help during the flow of a function
Some examples are:
* Start the job related to the function
* Add log messages
* Automatically retrieve the related files using the API
Args:
context (object): The Nuclio context object.
event (str): The Nuclio event object.
silent (bool): If silent is True, it will skip the logging of the Nuclio request to either console or log system.
Attributes:
JSONData (dict): Function request body as a dict.
Storage (libnova.common.api.Storage): Storage related to the current function execution
Container (libnova.common.api.Container): Container related to the current function execution
User (libnova.common.api.User): User that has triggered the current function
Function (libnova.common.api.Function): The function that is being ran
Trigger (libnova.common.api.Trigger): The trigger that made this function be called
Job (libnova.common.api.Job): The job related to the current function execution
Files (list[libnova.common.api.File]): List of files related to the current function execution
Parameters (dict): Extra input parameters filled by the user when calling the current function
Context (object): Nuclio context object
Context (event): Nuclio event object
Returns:
Request: A Request Helper instance
"""
JSONData = None
Storage = None
Container = None
User = None
Function = None
Trigger = None
Job = None
Parameters = {}
Files = []
# Nuclio data
Context = None
Event = None
# Function parameters
FunctionNamespace = "function_data"
def __str__(self):
return str(self.__class__) + ": " + str(self.__dict__)
def __init__(self, context, event, silent=False):
self.Context = context
self.Event = event
self.__cleanup()
self.__process(event.body)
if not silent:
try:
self.log(" - Processing event - " + "\n" + event.body.decode("utf-8"), JobMessage.JobMessageType.DEBUG)
except:
self.log(" - Processing event - " + "\n" + event.body, JobMessage.JobMessageType.DEBUG)
def __cleanup(self):
"""Restores the variables to initial values to ensure no remaining data is kept between executions
"""
self.JSONData = None
self.Storage : Storage = None
self.Container : Container = None
self.User : User = None
self.Function : Function = None
self.Trigger : Trigger = None
self.Job : Job = None
self.Files : list(File) = []
self.Parameters : dict = {}
def __process(self, request_body):
"""Retrieves all the information to the main class attributes
"""
try:
self.JSONData = json.loads(request_body)
# Get main data as "sudo" to avoid permission issues
self.sudo()
self.Parameters.update(self.JSONData["function_params"].items())
if "container" in self.JSONData[self.FunctionNamespace]:
if Container.get(self.JSONData[self.FunctionNamespace]["container"]["id"]) is not None:
self.Container = Container.get(self.JSONData[self.FunctionNamespace]["container"]["id"])
if "user" in self.JSONData[self.FunctionNamespace]:
self.User = User.get(self.JSONData[self.FunctionNamespace]["user"]["id"])
if "job" in self.JSONData[self.FunctionNamespace]:
self.Job = Job.get(self.JSONData[self.FunctionNamespace]["job"]["id"])
if "trigger" in self.JSONData[self.FunctionNamespace]:
self.Trigger = Trigger.get(self.JSONData[self.FunctionNamespace]["trigger"]["id"])
if "function" in self.JSONData[self.FunctionNamespace]:
self.Function = Function.get(self.JSONData[self.FunctionNamespace]["function"]["id"])
if self.Container is not None:
self.Storage = Storage.get(self.Container.storage_id)
if self.Storage is not None:
# Platform S3 Storage Driver
S3.Storage.initialize_storage(self.Storage)
if "ids" in self.JSONData[self.FunctionNamespace]["files"] and len(self.JSONData[self.FunctionNamespace]["files"]["ids"]) > 0:
for file_id in self.JSONData[self.FunctionNamespace]["files"]["ids"]:
file = File.get(file_id)
if file is not None:
self.Files.append(file)
if len(self.Files) == 0:
if self.Container is not None:
if "paths" in self.JSONData[self.FunctionNamespace]["files"] and len(self.JSONData[self.FunctionNamespace]["files"]["paths"]) > 0:
for file_path in self.JSONData[self.FunctionNamespace]["files"]["paths"]:
self.Files.append(File.get_by_path(self.Container.id, file_path))
pass
# But later on switch to the user API credentials
self.unsudo()
except Exception as e:
# Input is not a JSON, Ehe te nandayo!
pass
def parameter(self, parameter_name):
"""Get the value of a user input parameter.
Args:
parameter_name (str): The user parameter name whose value we want
Returns:
object: The value of the user parameter if exists
"""
if parameter_name in self.Parameters:
return self.Parameters[parameter_name]
return None
def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None):
"""This main logging function will log messages by the following cryteria:
* If this is being executed inside Nuclio, it will add a message to the Nuclio function log
* Else, it will print the message by console
* If the current Job is not None, it will add the message to the Job Message table
Args:
message (str): The message to log
message_type (JobMessage.JobMessageType): The log level of the message
file_id (int): It will be used to relate the Job Message entry with an specific file
"""
if self.Context is not None:
nuclio_message = "[" + self.JSONData[self.FunctionNamespace]["function"]["key"] + "] " + message
if message_type == JobMessage.JobMessageType.ERROR:
self.Context.logger.error(nuclio_message)
if message_type == JobMessage.JobMessageType.WARNING:
self.Context.logger.warn(nuclio_message)
if message_type == JobMessage.JobMessageType.INFO:
self.Context.logger.info(nuclio_message)
if message_type == JobMessage.JobMessageType.DEBUG:
self.Context.logger.debug(nuclio_message)
else:
console_message = "[" + message_type.name + "] [" + self.JSONData[self.FunctionNamespace]["function"]["key"] + "] " + message
print(console_message)
if self.Job is not None:
return JobMessage.add(self.Job.id, message, message_type, file_id)
def job_init(self):
"""Sets the current job (if any) to status RUNNING.
If the job does not exist, one job will be created
"""
if self.Job is None:
self.Job = Job.create()
if self.Job is not None:
Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
def job_end(self, success: bool = True):
"""Sets the current job to either COMPLETED or FAILED depending of the `success` value.
Args:
success (bool): True if the job has completed successfully, False otherwise
"""
if self.Job is not None:
Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)
def sudo(self):
"""Initializes the Api Driver using `root` credentials.
"""
# Initialize API Driver
api.Driver.initialize(
str(self.JSONData["api"]["url"]),
str(self.JSONData["api"]["key_root"]),
True
)
def unsudo(self):
"""Initializes the Api Driver using `user` credentials.
"""
# Initialize API Driver
api.Driver.initialize(
str(self.JSONData["api"]["url"]),
str(self.JSONData["api"]["key_user"]),
True
)
if __name__ == "__main__":
print('This file cannot be executed directly!')
Classes
class Request (context, event, silent=False)
-
Nuclio Request Helper.
It has some built-in functions to help during the flow of a function Some examples are:
-
Start the job related to the function
-
Add log messages
-
Automatically retrieve the related files using the API
Args
context
:object
- The Nuclio context object.
event
:str
- The Nuclio event object.
silent
:bool
- If silent is True, it will skip the logging of the Nuclio request to either console or log system.
Attributes
JSONData
:dict
- Function request body as a dict.
Storage
:libnova.common.api.Storage
- Storage related to the current function execution
Container
:libnova.common.api.Container
- Container related to the current function execution
User
:libnova.common.api.User
- User that has triggered the current function
Function
:libnova.common.api.Function
- The function that is being ran
Trigger
:libnova.common.api.Trigger
- The trigger that made this function be called
Job
:libnova.common.api.Job
- The job related to the current function execution
Files
:list[libnova.common.api.File]
- List of files related to the current function execution
Parameters
:dict
- Extra input parameters filled by the user when calling the current function
Context
:object
- Nuclio context object
Context
:event
- Nuclio event object
Returns
Request
- A Request Helper instance
Expand source code
class Request: """Nuclio Request Helper. It has some built-in functions to help during the flow of a function Some examples are: * Start the job related to the function * Add log messages * Automatically retrieve the related files using the API Args: context (object): The Nuclio context object. event (str): The Nuclio event object. silent (bool): If silent is True, it will skip the logging of the Nuclio request to either console or log system. Attributes: JSONData (dict): Function request body as a dict. Storage (libnova.common.api.Storage): Storage related to the current function execution Container (libnova.common.api.Container): Container related to the current function execution User (libnova.common.api.User): User that has triggered the current function Function (libnova.common.api.Function): The function that is being ran Trigger (libnova.common.api.Trigger): The trigger that made this function be called Job (libnova.common.api.Job): The job related to the current function execution Files (list[libnova.common.api.File]): List of files related to the current function execution Parameters (dict): Extra input parameters filled by the user when calling the current function Context (object): Nuclio context object Context (event): Nuclio event object Returns: Request: A Request Helper instance """ JSONData = None Storage = None Container = None User = None Function = None Trigger = None Job = None Parameters = {} Files = [] # Nuclio data Context = None Event = None # Function parameters FunctionNamespace = "function_data" def __str__(self): return str(self.__class__) + ": " + str(self.__dict__) def __init__(self, context, event, silent=False): self.Context = context self.Event = event self.__cleanup() self.__process(event.body) if not silent: try: self.log(" - Processing event - " + "\n" + event.body.decode("utf-8"), JobMessage.JobMessageType.DEBUG) except: self.log(" - Processing event - " + "\n" + event.body, JobMessage.JobMessageType.DEBUG) def __cleanup(self): """Restores the variables to initial values to ensure no remaining data is kept between executions """ self.JSONData = None self.Storage : Storage = None self.Container : Container = None self.User : User = None self.Function : Function = None self.Trigger : Trigger = None self.Job : Job = None self.Files : list(File) = [] self.Parameters : dict = {} def __process(self, request_body): """Retrieves all the information to the main class attributes """ try: self.JSONData = json.loads(request_body) # Get main data as "sudo" to avoid permission issues self.sudo() self.Parameters.update(self.JSONData["function_params"].items()) if "container" in self.JSONData[self.FunctionNamespace]: if Container.get(self.JSONData[self.FunctionNamespace]["container"]["id"]) is not None: self.Container = Container.get(self.JSONData[self.FunctionNamespace]["container"]["id"]) if "user" in self.JSONData[self.FunctionNamespace]: self.User = User.get(self.JSONData[self.FunctionNamespace]["user"]["id"]) if "job" in self.JSONData[self.FunctionNamespace]: self.Job = Job.get(self.JSONData[self.FunctionNamespace]["job"]["id"]) if "trigger" in self.JSONData[self.FunctionNamespace]: self.Trigger = Trigger.get(self.JSONData[self.FunctionNamespace]["trigger"]["id"]) if "function" in self.JSONData[self.FunctionNamespace]: self.Function = Function.get(self.JSONData[self.FunctionNamespace]["function"]["id"]) if self.Container is not None: self.Storage = Storage.get(self.Container.storage_id) if self.Storage is not None: # Platform S3 Storage Driver S3.Storage.initialize_storage(self.Storage) if "ids" in self.JSONData[self.FunctionNamespace]["files"] and len(self.JSONData[self.FunctionNamespace]["files"]["ids"]) > 0: for file_id in self.JSONData[self.FunctionNamespace]["files"]["ids"]: file = File.get(file_id) if file is not None: self.Files.append(file) if len(self.Files) == 0: if self.Container is not None: if "paths" in self.JSONData[self.FunctionNamespace]["files"] and len(self.JSONData[self.FunctionNamespace]["files"]["paths"]) > 0: for file_path in self.JSONData[self.FunctionNamespace]["files"]["paths"]: self.Files.append(File.get_by_path(self.Container.id, file_path)) pass # But later on switch to the user API credentials self.unsudo() except Exception as e: # Input is not a JSON, Ehe te nandayo! pass def parameter(self, parameter_name): """Get the value of a user input parameter. Args: parameter_name (str): The user parameter name whose value we want Returns: object: The value of the user parameter if exists """ if parameter_name in self.Parameters: return self.Parameters[parameter_name] return None def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None): """This main logging function will log messages by the following cryteria: * If this is being executed inside Nuclio, it will add a message to the Nuclio function log * Else, it will print the message by console * If the current Job is not None, it will add the message to the Job Message table Args: message (str): The message to log message_type (JobMessage.JobMessageType): The log level of the message file_id (int): It will be used to relate the Job Message entry with an specific file """ if self.Context is not None: nuclio_message = "[" + self.JSONData[self.FunctionNamespace]["function"]["key"] + "] " + message if message_type == JobMessage.JobMessageType.ERROR: self.Context.logger.error(nuclio_message) if message_type == JobMessage.JobMessageType.WARNING: self.Context.logger.warn(nuclio_message) if message_type == JobMessage.JobMessageType.INFO: self.Context.logger.info(nuclio_message) if message_type == JobMessage.JobMessageType.DEBUG: self.Context.logger.debug(nuclio_message) else: console_message = "[" + message_type.name + "] [" + self.JSONData[self.FunctionNamespace]["function"]["key"] + "] " + message print(console_message) if self.Job is not None: return JobMessage.add(self.Job.id, message, message_type, file_id) def job_init(self): """Sets the current job (if any) to status RUNNING. If the job does not exist, one job will be created """ if self.Job is None: self.Job = Job.create() if self.Job is not None: Job.set_status(self.Job.id, Job.JobStatus.RUNNING) def job_end(self, success: bool = True): """Sets the current job to either COMPLETED or FAILED depending of the `success` value. Args: success (bool): True if the job has completed successfully, False otherwise """ if self.Job is not None: Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED) def sudo(self): """Initializes the Api Driver using `root` credentials. """ # Initialize API Driver api.Driver.initialize( str(self.JSONData["api"]["url"]), str(self.JSONData["api"]["key_root"]), True ) def unsudo(self): """Initializes the Api Driver using `user` credentials. """ # Initialize API Driver api.Driver.initialize( str(self.JSONData["api"]["url"]), str(self.JSONData["api"]["key_user"]), True )
Class variables
var Container
var Context
var Event
var Files
var Function
var FunctionNamespace
var JSONData
var Job
var Parameters
var Storage
var Trigger
var User
Methods
def job_end(self, success: bool = True)
-
Sets the current job to either COMPLETED or FAILED depending of the
success
value.Args
success
:bool
- True if the job has completed successfully, False otherwise
Expand source code
def job_end(self, success: bool = True): """Sets the current job to either COMPLETED or FAILED depending of the `success` value. Args: success (bool): True if the job has completed successfully, False otherwise """ if self.Job is not None: Job.set_status(self.Job.id, Job.JobStatus.COMPLETED if success else Job.JobStatus.FAILED)
def job_init(self)
-
Sets the current job (if any) to status RUNNING.
If the job does not exist, one job will be created
Expand source code
def job_init(self): """Sets the current job (if any) to status RUNNING. If the job does not exist, one job will be created """ if self.Job is None: self.Job = Job.create() if self.Job is not None: Job.set_status(self.Job.id, Job.JobStatus.RUNNING)
def log(self, message, message_type=JobMessageType.INFO, file_id=None)
-
This main logging function will log messages by the following cryteria:
-
If this is being executed inside Nuclio, it will add a message to the Nuclio function log
-
Else, it will print the message by console
-
If the current Job is not None, it will add the message to the Job Message table
Args
message
:str
- The message to log
message_type
:JobMessage.JobMessageType
- The log level of the message
file_id
:int
- It will be used to relate the Job Message entry with an specific file
Expand source code
def log(self, message, message_type = JobMessage.JobMessageType.INFO, file_id = None): """This main logging function will log messages by the following cryteria: * If this is being executed inside Nuclio, it will add a message to the Nuclio function log * Else, it will print the message by console * If the current Job is not None, it will add the message to the Job Message table Args: message (str): The message to log message_type (JobMessage.JobMessageType): The log level of the message file_id (int): It will be used to relate the Job Message entry with an specific file """ if self.Context is not None: nuclio_message = "[" + self.JSONData[self.FunctionNamespace]["function"]["key"] + "] " + message if message_type == JobMessage.JobMessageType.ERROR: self.Context.logger.error(nuclio_message) if message_type == JobMessage.JobMessageType.WARNING: self.Context.logger.warn(nuclio_message) if message_type == JobMessage.JobMessageType.INFO: self.Context.logger.info(nuclio_message) if message_type == JobMessage.JobMessageType.DEBUG: self.Context.logger.debug(nuclio_message) else: console_message = "[" + message_type.name + "] [" + self.JSONData[self.FunctionNamespace]["function"]["key"] + "] " + message print(console_message) if self.Job is not None: return JobMessage.add(self.Job.id, message, message_type, file_id)
-
def parameter(self, parameter_name)
-
Get the value of a user input parameter.
Args
parameter_name
:str
- The user parameter name whose value we want
Returns
object
- The value of the user parameter if exists
Expand source code
def parameter(self, parameter_name): """Get the value of a user input parameter. Args: parameter_name (str): The user parameter name whose value we want Returns: object: The value of the user parameter if exists """ if parameter_name in self.Parameters: return self.Parameters[parameter_name] return None
def sudo(self)
-
Initializes the Api Driver using
root
credentials.Expand source code
def sudo(self): """Initializes the Api Driver using `root` credentials. """ # Initialize API Driver api.Driver.initialize( str(self.JSONData["api"]["url"]), str(self.JSONData["api"]["key_root"]), True )
def unsudo(self)
-
Initializes the Api Driver using
user
credentials.Expand source code
def unsudo(self): """Initializes the Api Driver using `user` credentials. """ # Initialize API Driver api.Driver.initialize( str(self.JSONData["api"]["url"]), str(self.JSONData["api"]["key_user"]), True )
-