Module libnova.common.filesystem.S3.SeekableStream
Expand source code
#!/usr/bin/env python
# coding: utf-8
import math
import time
import urllib.error
import urllib.request
from sys import stderr
from io import IOBase
from libnova.common.filesystem import S3
from libnova.common.api import Storage
from libnova.common.filesystem.S3 import File


class SeekableStream(IOBase):
    """SeekableStream

    This class provides a seekable stream over an S3 object using presigned URLs
    and the `Range` HTTP header, so that utilities that rely on `seek()` to read
    arbitrary parts of a file (such as `zipfile.ZipFile`) can use a remote object
    like a local file.

    It also takes care of the 5GB per-request limit, generating as many
    presigned URLs (one per chunk) as needed.
    """
    storage: Storage = None
    object_name: str = None
    _pos: int = 0
    _length: int = -1
    _seekable: bool = True
    _chunk_size: int = 4294967296  # 4GB per chunk; the maximum is 5GB, but 4GB keeps a safety margin
    _chunks: list = []
    _debug: bool = False
    _request_error_delay: int = -1
    _buffer: bytes = None
    _buffer_size: int = 8 * 1024
    _buffer_init: int = 0
    _buffer_end: int = 0

    def __cleanup(self):
        """Restore all attributes to their initial values to ensure no data
        is kept between executions (the defaults above are class attributes).
        """
        self.storage: Storage = None
        self.object_name: str = None
        self._pos: int = 0
        self._length: int = -1
        self._seekable: bool = True
        self._chunk_size: int = 4294967296
        self._chunks: list = []
        self._debug: bool = False
        self._request_error_delay: int = -1
        self._buffer: bytes = None
        self._buffer_size: int = 8 * 1024
        self._buffer_init: int = 0
        self._buffer_end: int = 0

    def __init__(self, storage: Storage, object_name, repeat_time=-1, debug=False, buffer_size=0):
        """Get a seekable stream using presigned URLs and the `Range` HTTP header.

        Seeking is done via the 'Range: bytes=xx-yy' HTTP header.

        Args:
            storage (Storage): The storage holding the object
            object_name (str): The S3 object key
            repeat_time (int): In case of HTTP errors, wait `repeat_time` seconds
                before trying again. A negative value or `None` disables retrying
                and simply passes on the exception (default behaviour).
            debug (bool): Trace every method call and buffer reload to stdout
            buffer_size (int): Size of the read-ahead buffer, in bytes
        """
        self.__cleanup()
        super().__init__()
        self.storage = storage
        self.object_name = object_name
        self._request_error_delay = repeat_time
        self._debug = debug
        self.set_buffer(buffer_size)
        object_head = File.head(self.storage.extra_data["bucket"], self.object_name)
        if object_head is not None:
            self._length = object_head['ContentLength']
            if self._length < 1:
                self._seekable = False
            if self._length > self._chunk_size:
                # Generate one presigned URL per 4GB chunk, e.g. a 10GB object
                # yields the ranges [0, 4GB], [4GB, 8GB] and [8GB, 10GB]
                for i in range(0, int(math.ceil(self._length / self._chunk_size))):
                    range_init = i * self._chunk_size
                    range_end = (i + 1) * self._chunk_size
                    if range_end > self._length:
                        range_end = self._length
                    self._chunks.append(
                        File.presign(
                            self.storage.extra_data["bucket"],
                            self.object_name, 14400,
                            [range_init, range_end]
                        )
                    )
            else:
                self._chunks.append(
                    File.presign(
                        self.storage.extra_data["bucket"],
                        self.object_name, 14400,
                        [0, self._length]
                    )
                )

    def set_buffer(self, buffer_size):
        # The buffer size should be at least 8KB, but any value is allowed so the
        # buffer can be matched to specific file structures and optimize reading
        # them, e.g. TAR files, which are made of 512-byte records
        self._buffer_size = buffer_size

    def __buffer_load(self, byte_range):
        range_init = byte_range[0]
        range_end = byte_range[1]
        if range_end > self._length:
            range_end = self._length
        with self._urlopen([range_init, range_end]) as remote_reader:
            self._buffer = remote_reader.read()

    def __read(self, byte_range=None):
        if byte_range is None:
            byte_range = [0, self._buffer_size]
        reload_buffer = False
        if self._buffer is None:
            reload_buffer = True
        if byte_range[0] < self._buffer_init:
            reload_buffer = True
        if byte_range[1] > self._buffer_end:
            reload_buffer = True
        if reload_buffer:
            if self._debug:
                print(
                    "Reloading buffer: " + "\n" +
                    " [*] Current buffer init: " + str(self._buffer_init) + "\n" +
                    " [*] Current buffer end:  " + str(self._buffer_end) + "\n" +
                    " [*] Requested read init: " + str(byte_range[0]) + "\n" +
                    " [*] Requested read end:  " + str(byte_range[1])
                )
            # Load at least a full buffer, but never less than the requested
            # range, so reads larger than the buffer size are not truncated
            load_size = max(self._buffer_size, byte_range[1] - byte_range[0])
            self.__buffer_load([byte_range[0], byte_range[0] + load_size])
            self._buffer_init = byte_range[0]
            self._buffer_end = byte_range[0] + load_size
        data_buffer_length = byte_range[1] - byte_range[0] + 1
        data_buffer_init = byte_range[0] - self._buffer_init
        data_buffer_end = data_buffer_init + data_buffer_length
        return self._buffer[data_buffer_init:data_buffer_end]

    def seek(self, offset, whence=0):
        if not self.seekable():
            raise OSError("Stream is not seekable")
        if whence == 0:
            self._pos = 0
        elif whence == 1:
            pass
        elif whence == 2:
            self._pos = self._length
        self._pos += offset
        return self._pos

    def seekable(self, *args, **kwargs):
        return self._seekable

    def readable(self, *args, **kwargs):
        return not self.closed

    def writable(self, *args, **kwargs):
        return False

    def read(self, amt=-1):
        if self._pos >= self._length:
            return b""
        if amt < 0:
            end = self._length - 1
        else:
            end = min(self._pos + amt - 1, self._length - 1)
        byte_range = (self._pos, end)
        self._pos = end + 1
        return self.__read(byte_range)

    def readall(self):
        return self.read(-1)

    def tell(self):
        return self._pos

    def __getattribute__(self, item):
        attr = object.__getattribute__(self, item)
        if not object.__getattribute__(self, "_debug"):
            return attr
        if hasattr(attr, '__call__'):
            # In debug mode, wrap every method so each call is traced to stdout
            def trace(*args, **kwargs):
                a = ", ".join(map(str, args))
                if kwargs:
                    a += ", ".join(["{}={}".format(k, v) for k, v in kwargs.items()])
                print("Calling: {}({})".format(item, a))
                return attr(*args, **kwargs)
            return trace
        else:
            return attr

    def _urlopen(self, byte_range=None):
        header = {}
        if byte_range:
            header = {"range": "bytes={}-{}".format(*byte_range)}
        # Note: every ranged request is issued against the first presigned URL;
        # the offsets in the Range header are absolute within the object
        while True:
            try:
                r = urllib.request.Request(self._chunks[0], headers=header)
                return urllib.request.urlopen(r)
            except urllib.error.HTTPError as e:
                print("[" + self._chunks[0] + "] Server responded with " + str(e), file=stderr)
                if e.code == 404:
                    # Can't open a non-existing file
                    raise
                if e.code == 403:
                    # Access denied, most likely an expired or wrongly signed URL
                    raise
                if self._request_error_delay is None or self._request_error_delay < 0:
                    raise
                print("Sleeping for {} seconds before trying again".format(self._request_error_delay), file=stderr)
                time.sleep(self._request_error_delay)
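For reference, each buffered read is served by a plain ranged GET against one of the presigned URLs, roughly equivalent to the following sketch (`presigned_url` is a placeholder for an entry of `_chunks`):

req = urllib.request.Request(presigned_url, headers={"range": "bytes=100-199"})
with urllib.request.urlopen(req) as resp:
    data = resp.read()  # the 100 bytes at offsets 100-199 of the object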
Classes
class SeekableStream (storage: libnova.common.api.Storage, object_name, repeat_time=-1, debug=False, buffer_size=0)
-
SeekableStream

This class provides a seekable stream over an S3 object using presigned URLs and the `Range` HTTP header, so that utilities that rely on `seek()` to read arbitrary parts of a file (such as `zipfile.ZipFile`) can use a remote object like a local file. It also takes care of the 5GB per-request limit, generating as many presigned URLs as needed.

Get a seekable stream using presigned URLs and the `Range` HTTP header.

Args

object_name : str
    The S3 object key
repeat_time : int
    In case of HTTP errors, wait `repeat_time` seconds before trying again. A negative value or `None` disables retrying and simply passes on the exception (default behaviour).
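A minimal usage sketch (assuming a configured `Storage` instance named `storage` whose `extra_data["bucket"]` points at the bucket holding the object; the object key is hypothetical):

import zipfile
from libnova.common.filesystem.S3.SeekableStream import SeekableStream

stream = SeekableStream(storage, "backups/archive.zip", repeat_time=5, buffer_size=64 * 1024)
with zipfile.ZipFile(stream) as archive:
    # ZipFile seeks to the central directory at the end of the archive,
    # so only the byte ranges it actually touches are downloaded
    print(archive.namelist())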
Ancestors
- io.IOBase
- _io._IOBase
Class variables
var object_name : str
var storage : libnova.common.api.Storage
Methods
def read(self, amt=-1)
-
Read and return up to `amt` bytes from the current position, advancing the position accordingly. With a negative `amt` (the default), read from the current position to the end of the object. Returns `b""` once the position is at or past the end.
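A short sketch of the read semantics (assuming `stream` is an open SeekableStream over an object of at least 16 bytes):

stream.seek(0)
head = stream.read(16)         # returns the first 16 bytes and advances the position
assert stream.tell() == 16
rest = stream.read()           # negative default amt: read from byte 16 to the end
assert stream.read() == b""    # past the end, read() returns an empty bytes object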
def readable(self, *args, **kwargs)
-
Return whether object was opened for reading.
If False, read() will raise OSError.
def readall(self)
-
Read and return all bytes from the current position to the end of the object; equivalent to `read(-1)`.
def seek(self, offset, whence=0)
-
Change stream position.
Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by whence. Values for whence are:
- 0 – start of stream (the default); offset should be zero or positive
- 1 – current stream position; offset may be negative
- 2 – end of stream; offset is usually negative
Return the new absolute position.
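For example (a sketch, assuming a 1000-byte object):

stream.seek(10)       # from the start: position 10
stream.seek(5, 1)     # from the current position: 15
stream.seek(-100, 2)  # from the end: 900
print(stream.tell())  # 900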
def seekable(self, *args, **kwargs)
-
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
def set_buffer(self, buffer_size)
-
Set the size of the read-ahead buffer. It should normally be at least 8KB, but any value is allowed so the buffer can be matched to specific file structures, such as the 512-byte records of TAR files.
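For instance, when walking a TAR archive record by record, matching the buffer to the record size keeps each buffered range small (a sketch; the offsets follow the TAR header layout):

stream.set_buffer(512)              # one 512-byte TAR record per buffer reload
stream.seek(0)
header = stream.read(512)           # the first file header record
name = header[0:100].rstrip(b"\0")  # the entry name occupies bytes 0-99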
def tell(self)
-
Return current stream position.
def writable(self, *args, **kwargs)
-
Return whether object was opened for writing.
If False, write() will raise OSError.