Module libnova.common.filesystem.S3.SeekableStream

#!/usr/bin/env python
# coding: utf-8

import math
import time
import urllib.error
import urllib.request
from   sys import stderr
from   io  import IOBase

from libnova.common.filesystem    import S3
from libnova.common.api           import Storage
from libnova.common.filesystem.S3 import File

class SeekableStream(IOBase):
    """SeekableStream

    This class provides a seekable stream using presigned URLs and the `range` HTTP header.

    It will also take care of the 5GB limit on files, generating as many presigned URLs as needed.
    """
    storage:     Storage = None
    object_name: str     = None

    _pos:        int  = 0
    _length:     int  = -1
    _seekable:   bool = True
    _chunk_size: int  = 4294967296 # 4GB per chunk; the maximum is 5GB, but 4GB keeps a safety margin
    _chunks:     list = []

    _debug:               bool = False
    _request_error_delay: int  = -1

    _buffer:      bytes = None
    _buffer_size: int   = 8 * 1024
    _buffer_init: int   = 0
    _buffer_end:  int   = 0

    def __cleanup(self):
        """Restores the variables to initial values to ensure no remaining data is kept between executions
        """
        self.storage:     Storage = None
        self.object_name: str     = None

        self._pos:        int  = 0
        self._length:     int  = -1
        self._seekable:   bool = True
        self._chunk_size: int  = 4294967296
        self._chunks:     list = []

        self._debug:               bool = False
        self._request_error_delay: int  = -1

        self._buffer:      bytes = None
        self._buffer_size: int   = 8 * 1024
        self._buffer_init: int   = 0
        self._buffer_end:  int   = 0

    def __init__(self, storage: Storage, object_name, repeat_time=-1, debug=False, buffer_size=0):
        """Get a seekable Stream using Presigned URLs and the range HTTP header

        Args:
            object_name (str): The S3 object key
            repeat_time (int): In case of HTTP errors wait `_request_error_delay` seconds before trying again. Negative value or `None` disables retrying and simply passes on the exception (default behaviour).
        """

        """Allow a file accessible via HTTP to be used like a local file by utilities
         that use `seek()` to read arbitrary parts of the file, such as `ZipFile`.
        Seeking is done via the 'range: bytes=xx-yy' HTTP header.

        Parameters
        ----------
        url : str
            A HTTP or HTTPS URL
        name : str, optional
            The filename of the file.
            Will be filled from the Content-Disposition header if not provided.
        _request_error_delay : int, optional
            In case of HTTP errors wait `_request_error_delay` seconds before trying again.
            Negative value or `None` disables retrying and simply passes on the exception (the default).
        """

        self.__cleanup()

        super().__init__()
        self.storage     = storage
        self.object_name = object_name

        self._request_error_delay = repeat_time
        self._debug               = debug

        self.set_buffer(buffer_size)

        object_head = File.head(self.storage.extra_data["bucket"], self.object_name)
        if object_head is not None:
            self._length = object_head['ContentLength']

        if self._length < 1:
            self._seekable = False

        if self._length > self._chunk_size:
            for i in range(0, int(math.ceil(self._length / self._chunk_size))):
                range_init =  i   * self._chunk_size
                range_end  = (i+1)* self._chunk_size
                if range_end > self._length:
                    range_end = self._length

                self._chunks.append(
                    File.presign(
                        self.storage.extra_data["bucket"],
                        self.object_name, 14400,
                        [range_init, range_end]
                    )
                )
        else:
            self._chunks.append(
                File.presign(
                    self.storage.extra_data["bucket"],
                    self.object_name, 14400,
                    [0, self._length]
                )
            )


    def set_buffer(self, buffer_size):
        # The buffer size should normally be at least 8KB, but any value is
        # allowed so reads can be matched to specific file structures, such as
        # the 512-byte block size of TAR archives
        self._buffer_size = buffer_size


    def __buffer_load(self, byte_range):
        range_init = byte_range[0]
        range_end  = byte_range[1]
        if range_end > self._length:
            range_end = self._length

        with self._urlopen([range_init, range_end]) as remote_reader:
            self._buffer = remote_reader.read()


    def __read(self, byte_range=None):
        if byte_range is None:
            byte_range = [0, self._buffer_size]

        reload_buffer = False
        if self._buffer is None:
            reload_buffer = True
        if byte_range[0] < self._buffer_init:
            reload_buffer = True
        if byte_range[1] > self._buffer_end:
            reload_buffer = True

        if reload_buffer:
            if self._debug:
                print(
                    "Reloading buffer: " + "\n" +
                    " [*] Current buffer init: " + str(self._buffer_init) + "\n" +
                    " [*] Current buffer end:  " + str(self._buffer_end)  + "\n" +
                    " [*] Requested read init: " + str(byte_range[0])     + "\n" +
                    " [*] Requested read end:  " + str(byte_range[1])
                )
            # Load at least the whole requested range, so reads larger than
            # the buffer size do not come back short
            load_size = max(self._buffer_size, byte_range[1] - byte_range[0] + 1)
            self.__buffer_load([byte_range[0], byte_range[0] + load_size])

            self._buffer_init = byte_range[0]
            self._buffer_end  = byte_range[0] + load_size

        data_buffer_length = byte_range[1] - byte_range[0] + 1
        data_buffer_init   = byte_range[0] - self._buffer_init
        data_buffer_end    = data_buffer_init + data_buffer_length

        return self._buffer[data_buffer_init:data_buffer_end]


    def seek(self, offset, whence=0):
        if not self.seekable():
            raise OSError("Stream is not seekable")
        if whence == 0:
            self._pos = 0
        elif whence == 1:
            pass
        elif whence == 2:
            self._pos = self._length
        else:
            raise ValueError("Invalid whence value: {}".format(whence))
        self._pos += offset
        return self._pos

    def seekable(self, *args, **kwargs):
        return self._seekable

    def readable(self, *args, **kwargs):
        return not self.closed

    def writable(self, *args, **kwargs):
        return False

    def read(self, amt=-1):
        if self._pos >= self._length:
            return b""
        if amt < 0:
            end = self._length - 1
        else:
            end = min(self._pos + amt - 1, self._length - 1)
        byte_range = (self._pos, end)
        self._pos = end + 1

        return self.__read(byte_range)


    def readall(self):
        return self.read(-1)

    def tell(self):
        return self._pos

    def __getattribute__(self, item):
        attr = object.__getattribute__(self, item)
        if not object.__getattribute__(self, "_debug"):
            return attr

        if hasattr(attr, '__call__'):
            def trace(*args, **kwargs):
                parts = [str(arg) for arg in args]
                parts += ["{}={}".format(k, v) for k, v in kwargs.items()]
                print("Calling: {}({})".format(item, ", ".join(parts)))
                return attr(*args, **kwargs)

            return trace
        else:
            return attr

    def _urlopen(self, byte_range=None):
        # Use the presigned URL of the chunk containing the start of the
        # requested range. The range header is absolute within the object,
        # and reads are assumed not to cross chunk boundaries.
        url = self._chunks[0]
        header = {}
        if byte_range:
            url = self._chunks[min(byte_range[0] // self._chunk_size, len(self._chunks) - 1)]
            header = {"range": "bytes={}-{}".format(*byte_range)}
        while True:
            try:
                r = urllib.request.Request(url, headers=header)
                return urllib.request.urlopen(r)
            except urllib.error.HTTPError as e:
                print("[" + url + "] Server responded with " + str(e), file=stderr)
                if e.code == 404:
                    # Can't open a non-existing file
                    raise
                if e.code == 403:
                    # The presigned URL is malformed or has expired; retrying will not help
                    raise
                if self._request_error_delay is None or self._request_error_delay < 0:
                    raise
                print("Sleeping for {} seconds before trying again".format(self._request_error_delay), file=stderr)
                time.sleep(self._request_error_delay)

Classes

class SeekableStream (storage: libnova.common.api.Storage, object_name, repeat_time=-1, debug=False, buffer_size=0)

SeekableStream

This class provides a seekable stream using presigned URLs and the range HTTP header.

It will also take care of the 5GB limit on files, generating as many presigned URLs as needed.

Get a seekable stream over an S3 object using presigned URLs and the range HTTP header. This allows a remote object to be used like a local file by utilities that use seek() to read arbitrary parts of the file, such as ZipFile.

Args

storage : Storage
The storage the object lives in; its extra_data["bucket"] entry names the S3 bucket.
object_name : str
The S3 object key.
repeat_time : int
In case of HTTP errors, wait this many seconds before trying again. A negative value or None disables retrying and simply passes on the exception (the default).
debug : bool
When True, trace method calls and buffer reloads.
buffer_size : int
Read buffer size in bytes; see set_buffer.
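
A usage sketch (illustrative only: `storage` is assumed to be an already-configured libnova.common.api.Storage whose extra_data["bucket"] entry names the bucket, and the object key is made up). Because ZipFile only seek()s and read()s the parts it needs, the archive can be listed without downloading it:

import zipfile

from libnova.common.filesystem.S3.SeekableStream import SeekableStream

stream = SeekableStream(storage, "path/to/archive.zip",
                        repeat_time=5,          # retry failed requests every 5 seconds
                        buffer_size=64 * 1024)  # 64KB read buffer

with zipfile.ZipFile(stream) as archive:
    for info in archive.infolist():
        print(info.filename, info.file_size)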

Ancestors

  • io.IOBase
  • _io._IOBase

Class variables

var object_name : str
var storage : libnova.common.api.Storage

Methods

def read(self, amt=-1)

Read and return up to amt bytes from the current position, or everything up to the end of the file if amt is negative. Returns b"" once the position is at or past the end of the file.
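
For illustration, a minimal sketch of these semantics, assuming stream is an open SeekableStream:

header = stream.read(4)         # at most 4 bytes from the current position
rest   = stream.read()          # everything from here to the end of the file
assert stream.read() == b""     # at end of file, read() returns empty bytes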
def readable(self, *args, **kwargs)

Return whether object was opened for reading.

If False, read() will raise OSError.

def readall(self)

Read and return all bytes from the current position to the end of the file; equivalent to read(-1).
def seek(self, offset, whence=0)

Change stream position.

Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by whence. Values for whence are:

  • 0 – start of stream (the default); offset should be zero or positive
  • 1 – current stream position; offset may be negative
  • 2 – end of stream; offset is usually negative

Return the new absolute position.

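
For illustration, a short sketch of the whence values, assuming stream is an open SeekableStream:

stream.seek(0)          # whence=0: absolute, jump to the start of the file
stream.seek(10, 1)      # whence=1: relative, skip 10 bytes forward
stream.seek(-22, 2)     # whence=2: from the end, e.g. the minimum size of a
                        # ZIP end-of-central-directory record
pos = stream.tell()     # the new absolute position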
def seekable(self, *args, **kwargs)

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

def set_buffer(self, buffer_size)

Set the read buffer size in bytes. The buffer should normally be at least 8 KB, but any value is allowed so reads can be matched to specific file structures, such as the 512-byte block size of TAR archives.
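
As a sketch of the TAR use case mentioned above (tarfile usage is illustrative; stream is assumed to be an open SeekableStream over an uncompressed TAR archive):

import tarfile

stream.set_buffer(512)  # match the 512-byte TAR block size
with tarfile.open(fileobj=stream, mode="r:") as archive:
    for member in archive:
        print(member.name, member.size)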
def tell(self)

Return current stream position.

def writable(self, *args, **kwargs)

Return whether object was opened for writing.

If False, write() will raise OSError.
