Source code for bohicalog.handlers.telegram.buffer

"""
This module contains the buffer class.
"""

from threading import RLock


[docs]class Buffer: """ Buffer class for the telegram handler Usage: .. code-block:: python from bohicalog.handlers.telegram.buffer import Buffer buffer = Buffer(max_size=1000) """ def __init__(self, max_size=None): """ :param max_size: max size of the buffer """ self._lock = RLock() self._buffer = "" self._max_size = max_size
[docs] def write(self, data): """ Write data to the buffer :param data: data to write """ with self._lock: self._buffer = f"{self._buffer}\n{data}"[: self._max_size]
[docs] def read(self, count): """ Read data from the buffer :param count: count of data to read :rtype: str :return: data read """ result = "" with self._lock: result, self._buffer = self._buffer[:count], self._buffer[count:] return result