Source code for pak.io.connection
"""Contains :class:`io.Connection <.Connection>`."""
import abc
import asyncio
__all__ = [
"Connection",
]
[docs]class Connection(abc.ABC):
r"""A connection between two :class:`.Packet` sources.
This class models a protocol structure that is relatively common,
where there is a stream of incoming :class:`.Packet`\s that
aren't expected to be any specific type of :class:`.Packet`.
This may not model your protocol structure adequately.
This in particular may be the case if you are not able
to read and send data asynchronously. In such a case,
you should not use this class.
Parameters
----------
reader : :class:`asyncio.StreamReader` or ``None``
The stream for incoming data.
writer : :class:`asyncio.StreamWriter` or ``None``
The stream for outgoing data.
ctx : :class:`.Packet.Context`
The context for incoming and outgoing :class:`.Packet`\s.
Attributes
----------
reader : :class:`asyncio.StreamReader` or ``None``
The stream for incoming data.
writer : :class:`asyncio.StreamWriter` or ``None``
The stream for outgoing data.
ctx : :class:`.Packet.Context`
The context for incoming and outgoing :class:`.Packet`\s.
This should **always** be passed to :class:`.Packet`
operations, such as :meth:`.Packet.unpack` and
:meth:`.Packet.pack`.
Examples
--------
A :class:`Connection` can be used in an ``async with`` statement, like so::
connection = ...
async with connection:
...
This will make sure that ``connection`` is closed by the end
of the of the ``async with`` statement.
"""
def __init__(self, *, reader=None, writer=None, ctx):
self.reader = reader
self.writer = writer
self.ctx = ctx
self._packet_watch_info = {}
[docs] def is_closing(self):
"""Gets whether the :class:`Connection` is closed or in the process of closing.
Returns
-------
:class:`bool`
Whether the :class:`Connection` is closed or in the process of closing.
"""
# 'StreamReader' cannot be closed.
return self.writer is None or self.writer.is_closing()
[docs] def close(self):
"""Closes the :class:`Connection`.
This method should be used along with the :meth:`wait_closed` method.
"""
self._end_packet_watches()
if self.writer is None:
return
self.writer.close()
[docs] async def wait_closed(self):
"""Waits until the :class:`Connection` is closed."""
if self.writer is None:
return
await self.writer.wait_closed()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
self.close()
await self.wait_closed()
[docs] def create_packet(self, packet_cls, /, **fields):
"""Creates a :class:`.Packet` for the :class:`Connection`.
The :attr:`ctx` attribute is used to create the :class:`.Packet`.
Parameters
----------
packet_cls : subclass of :class:`.Packet`
The :class:`.Packet` to create.
**fields
The names and corresponding values of the
:class:`.Packet` to create.
Returns
-------
:class:`.Packet`
The created :class:`.Packet`.
"""
return packet_cls(**fields, ctx=self.ctx)
[docs] async def read_data(self, size):
"""Reads incoming data out of the :attr:`reader` attribute.
Parameters
----------
size : :class:`int`
How many bytes to read.
Returns
-------
:class:`bytes` or ``None``
The incoming data.
If EOF is reached on the :attr:`reader` attribute and
``size`` bytes cannot be read, then ``None`` is returned.
"""
try:
return await self.reader.readexactly(size)
except asyncio.IncompleteReadError:
# We return 'None' instead of letting the exception
# propagate because reaching EOF is something that
# should be explicitly handled by the user at the
# call site.
return None
[docs] @abc.abstractmethod
async def _read_next_packet(self):
"""Reads the next incoming :class:`.Packet`.
.. note::
In your implementation, you do not need to ensure
that reading is atomic.
.. seealso::
:meth:`read_data`
Returns
-------
:class:`.Packet` or ``None``
The next incoming :class:`.Packet`.
If ``None``, then that means that there is no
next :class:`.Packet` and that :meth:`continuously_read_packets`
should end. This should be when EOF is reached
on the :attr:`reader` attribute, which will be
when :meth:`read_data` returns ``None``.
"""
raise NotImplementedError
def _dispatch_to_packet_watches(self, packet):
# Make a copy of the items so we may modify
# 'self._packet_watch_info' within the same loop.
for packet_cls, packet_future in list(self._packet_watch_info.items()):
if isinstance(packet, packet_cls):
packet_future.set_result(packet)
self._packet_watch_info.pop(packet_cls)
# Don't break here since there could be other
# packet watches that requested a more derived
# subclass of 'packet_cls'.
def _end_packet_watches(self):
# Make a copy of the items so we may modify
# 'self._packet_watch_info' within the same loop.
for packet_cls, packet_future in list(self._packet_watch_info.items()):
# We could attempt to end the watches
# right as they receive their values.
if not packet_future.done():
packet_future.set_result(None)
self._packet_watch_info.pop(packet_cls)
[docs] async def continuously_read_packets(self):
r"""Continuously reads and yields all incoming :class:`.Packet`\s.
.. note::
This must be iterated over for :meth:`watch_for_packet` to function.
This will continue to yield :class:`.Packet`\s until the
:class:`Connection` is closed or EOF is reached.
.. warning::
This method should **not** be called twice concurrently.
Doing so may cause data to be read incorrectly.
Yields
------
:class:`.Packet`
An incoming :class:`.Packet`.
Examples
--------
::
connection = ...
async for packet in connection.continuously_read_packets():
...
"""
while not self.is_closing():
packet = await self._read_next_packet()
if packet is None:
self.close()
await self.wait_closed()
return
self._dispatch_to_packet_watches(packet)
yield packet
[docs] async def watch_for_packet(self, packet_cls):
r"""Watches for a specific type of :class:`.Packet` from the incoming stream of :class:`.Packet`\s.
Requires :meth:`continuously_read_packets` to be iterated over.
Parameters
----------
packet_cls : subclass of :class:`.Packet`
The type of :class:`.Packet` to watch for.
Returns
-------
:class:`.Packet` or ``None``
The specified incoming :class:`.Packet`.
Returns ``None`` when the :class:`Connection` is closed
or EOF is reached.
"""
if self.is_closing():
# If we're already closing, then
# don't bother waiting for a packet.
#
# This would appear important to do
# so that we don't end up creating
# and awaiting a future that is
# never completed, and so deadlocking.
return None
packet_future = self._packet_watch_info.get(packet_cls)
if packet_future is None:
loop = asyncio.get_running_loop()
packet_future = loop.create_future()
self._packet_watch_info[packet_cls] = packet_future
# NOTE: We shield the future so that
# if it gets canceled, then we can still
# reuse it for other packet watches.
return await asyncio.shield(packet_future)
[docs] def is_watching_for_packet(self, packet_cls):
"""Gets whether a specific type of :class:`.Packet` is being watched for.
.. seealso::
:meth:`watch_for_packet`
Parameters
----------
packet_cls : subclass of :class:`.Packet`
The type of :class:`.Packet` to check.
Returns
-------
:class:`bool`
Whether ``packet_cls`` is being watched for.
"""
for watch_cls in self._packet_watch_info.keys():
if issubclass(packet_cls, watch_cls):
return True
return False
[docs] async def write_data(self, data):
"""Writes outgoing data to the :attr:`writer` attribute.
Parameters
----------
data : :class:`bytes`
The data to write.
"""
self.writer.write(data)
await self.writer.drain()
[docs] async def write_packet(self, packet_cls, /, **fields):
"""Writes an outgoing :class:`.Packet`.
This method uses :meth:`create_packet` to create the
:class:`.Packet` to write. It then passes it to
:meth:`write_packet_instance`.
If you have an already created :class:`.Packet` you
wish to write, then you should use :meth:`write_packet_instance`.
Parameters
----------
packet_cls : subclass of :class:`.Packet`
The type of :class:`.Packet` to write.
**fields
The names and corresponding values of the
:class:`.Packet` to write.
"""
await self.write_packet_instance(self.create_packet(packet_cls, **fields))
[docs] @abc.abstractmethod
async def write_packet_instance(self, packet):
"""Writes an outgoing :class:`.Packet` instance.
.. warning::
In most cases, the :meth:`write_packet` method should be used instead.
This method should only be used if you have a pre-existing :class:`.Packet`
instance.
.. note::
In your implementation, writes should be atomic.
It is thus recommended to only write data in one fell swoop.
.. seealso::
:meth:`write_data`
Parameters
----------
packet : :class:`.Packet`
The :class:`.Packet` to write.
"""
raise NotImplementedError