Source code for pak.io.streams

"""Asynchronous data streams."""

import asyncio

from .. import util

__all__ = [
    "ByteStreamReader",
    "ByteStreamWriter",
]

[docs]class ByteStreamReader: """An :class:`asyncio.StreamReader` which reads from predetermined data. .. note:: While this technically does not inherit from :class:`asyncio.StreamReader`, it has the same API and semantics. Thus it is perfectly usable for e.g. :class:`io.Connection <.Connection>`. Parameters ---------- data : bytes-like The data to read from. """ def __init__(self, data=b""): self._buffer = bytearray(data)
[docs] async def read(self, n=-1): """Reads up to ``n`` bytes. Parameters ---------- n : :class:`int` The number of bytes to read. If ``-1``, then read until EOF. Returns ------- :class:`bytes` The data read from the stream. """ await util.yield_exec() if n < 0: n = len(self._buffer) extracted_data = self._buffer[:n] self._buffer = self._buffer[n:] return bytes(extracted_data)
[docs] async def readline(self): """Reads until the next newline. If EOF is reached before the next newline, then partial data is returned. Returns ------- :class:`bytes` The data read from the stream. The newline will be included in the data. """ try: return await self.readuntil(b"\n") except asyncio.IncompleteReadError as e: return e.partial
[docs] async def readexactly(self, n): """Reads exactly ``n`` bytes. Parameters ---------- n : :class:`int` The exact number of bytes to read. Returns ------- :class:`bytes` The data read from the stream. Raises ------ :exc:`asyncio.IncompleteReadError` If ``n`` bytes cannot be read. The ``partial`` attribute will contain the partially read data. """ if n > len(self._buffer): raise asyncio.IncompleteReadError(partial=await self.read(), expected=n) return await self.read(n)
def _find_separator_end(self, separator): # NOTE: Support for a tuple of multiple # separators was added in Python 3.13. if not isinstance(separator, tuple): separator = [separator] match_end = None for to_find in separator: if len(to_find) <= 0: raise ValueError("Separator must contain at least one byte") pos = self._buffer.find(to_find) if pos >= 0: possible_end = pos + len(to_find) if match_end is None: match_end = possible_end else: match_end = min(match_end, possible_end) # NOTE: This will return 'None' instead of '-1' # to signify that we did not find any separators. return match_end
[docs] async def readuntil(self, separator=b"\n"): """Reads until a separator is found. Parameters ---------- separator : :class:`bytes` or :class:`tuple` of :class:`bytes` If :class:`bytes`, then the separator to read until. If a :class:`tuple`, then the collection of possible separators to read until. The separator which results in the least amount of data being read will be the one utilized. Returns ------- :class:`bytes` The data read from the stream. The appropriate separator will be included in the data. Raises ------ :exc:`ValueError` If the separators don't all contain at least one byte. :exc:`asyncio.IncompleteReadError` If no separator can be found. The ``partial`` attribute will contain the partially read data, potentially including part of a separator. """ pos = self._find_separator_end(separator) if pos is None: raise asyncio.IncompleteReadError(partial=await self.read(), expected=None) return await self.readexactly(pos)
[docs] def at_eof(self): """Gets whether the stream has ended. Returns ------- :class:`bool` Whether the stream has ended. """ return len(self._buffer) == 0
[docs]class ByteStreamWriter: """An :class:`asyncio.StreamWriter` which writes to an internal buffer. .. note:: While this technically does not inherit from :class:`asyncio.StreamWriter`, it has the same API and semantics. Thus it is perfectly usable for e.g. :class:`io.Connection <.Connection>`. """ def __init__(self): self._buffer = bytearray() self._close_event = asyncio.Event() @property def written_data(self): """The data that has been written. Returns ------- :class:`bytes` """ return bytes(self._buffer)
[docs] def write(self, data): """Writes data to the internal buffer. This method should be used along with the :meth:`drain` method. Parameters ---------- data : bytes-like The data to write. """ self._buffer.extend(data)
[docs] def writelines(self, data): """Writes an iterable of bytes to the internal buffer. This method should be used along with the :meth:`drain` method. Parameters ---------- data : iterable of bytes-like The iterable of bytes to write. """ self.write(b"".join(data))
[docs] async def drain(self): """Waits until it is appropriate to resume writing to the :class:`ByteStreamWriter`.""" await util.yield_exec()
[docs] def close(self): """Closes the :class:`ByteStreamWriter`. This method should be used along with the :meth:`wait_closed` method. """ self._close_event.set()
[docs] def is_closing(self): """Gets whether the :class:`ByteStreamWriter` is closed or in the process of closing. Returns ------- :class:`bool` Whether the :class:`ByteStreamWriter` is closed or in the process of closing. """ return self._close_event.is_set()
[docs] async def wait_closed(self): """Waits until the :class:`ByteStreamWriter` is closed.""" await self._close_event.wait()