| """ |
| QMP Events and EventListeners |
| |
| Asynchronous QMP uses `EventListener` objects to listen for events. An |
| `EventListener` is a FIFO event queue that can be pre-filtered to listen |
| for only specific events. Each `EventListener` instance receives its own |
| copy of events that it hears, so events may be consumed without fear or |
| worry for depriving other listeners of events they need to hear. |
| |
| |
| EventListener Tutorial |
| ---------------------- |
| |
| In all of the following examples, we assume that we have a `QMPClient` |
| instantiated named ``qmp`` that is already connected. |
| |
| |
| `listener()` context blocks with one name |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| The most basic usage is by using the `listener()` context manager to |
| construct them: |
| |
| .. code:: python |
| |
| with qmp.listener('STOP') as listener: |
| await qmp.execute('stop') |
| await listener.get() |
| |
| The listener is active only for the duration of the ‘with’ block. This |
| instance listens only for ‘STOP’ events. |
| |
| |
| `listener()` context blocks with two or more names |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| Multiple events can be selected for by providing any ``Iterable[str]``: |
| |
| .. code:: python |
| |
| with qmp.listener(('STOP', 'RESUME')) as listener: |
| await qmp.execute('stop') |
| event = await listener.get() |
| assert event['event'] == 'STOP' |
| |
| await qmp.execute('cont') |
| event = await listener.get() |
| assert event['event'] == 'RESUME' |
| |
| |
| `listener()` context blocks with no names |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| By omitting names entirely, you can listen to ALL events. |
| |
| .. code:: python |
| |
| with qmp.listener() as listener: |
| await qmp.execute('stop') |
| event = await listener.get() |
| assert event['event'] == 'STOP' |
| |
| This isn’t a very good use case for this feature: In a non-trivial |
| running system, we may not know what event will arrive next. Grabbing |
| the top of a FIFO queue returning multiple kinds of events may be prone |
| to error. |
| |
| |
| Using async iterators to retrieve events |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| If you’d like to simply watch what events happen to arrive, you can use |
| the listener as an async iterator: |
| |
| .. code:: python |
| |
| with qmp.listener() as listener: |
| async for event in listener: |
| print(f"Event arrived: {event['event']}") |
| |
| This is analogous to the following code: |
| |
| .. code:: python |
| |
| with qmp.listener() as listener: |
| while True: |
| event = listener.get() |
| print(f"Event arrived: {event['event']}") |
| |
| This event stream will never end, so these blocks will never terminate. |
| |
| |
| Using asyncio.Task to concurrently retrieve events |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| Since a listener’s event stream will never terminate, it is not likely |
| useful to use that form in a script. For longer-running clients, we can |
| create event handlers by using `asyncio.Task` to create concurrent |
| coroutines: |
| |
| .. code:: python |
| |
| async def print_events(listener): |
| try: |
| async for event in listener: |
| print(f"Event arrived: {event['event']}") |
| except asyncio.CancelledError: |
| return |
| |
| with qmp.listener() as listener: |
| task = asyncio.Task(print_events(listener)) |
| await qmp.execute('stop') |
| await qmp.execute('cont') |
| task.cancel() |
| await task |
| |
| However, there is no guarantee that these events will be received by the |
| time we leave this context block. Once the context block is exited, the |
| listener will cease to hear any new events, and becomes inert. |
| |
| Be mindful of the timing: the above example will *probably*– but does |
| not *guarantee*– that both STOP/RESUMED events will be printed. The |
| example below outlines how to use listeners outside of a context block. |
| |
| |
| Using `register_listener()` and `remove_listener()` |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| To create a listener with a longer lifetime, beyond the scope of a |
| single block, create a listener and then call `register_listener()`: |
| |
| .. code:: python |
| |
| class MyClient: |
| def __init__(self, qmp): |
| self.qmp = qmp |
| self.listener = EventListener() |
| |
| async def print_events(self): |
| try: |
| async for event in self.listener: |
| print(f"Event arrived: {event['event']}") |
| except asyncio.CancelledError: |
| return |
| |
| async def run(self): |
| self.task = asyncio.Task(self.print_events) |
| self.qmp.register_listener(self.listener) |
| await qmp.execute('stop') |
| await qmp.execute('cont') |
| |
| async def stop(self): |
| self.task.cancel() |
| await self.task |
| self.qmp.remove_listener(self.listener) |
| |
| The listener can be deactivated by using `remove_listener()`. When it is |
| removed, any possible pending events are cleared and it can be |
| re-registered at a later time. |
| |
| |
| Using the built-in all events listener |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| The `QMPClient` object creates its own default listener named |
| :py:obj:`~Events.events` that can be used for the same purpose without |
| having to create your own: |
| |
| .. code:: python |
| |
| async def print_events(listener): |
| try: |
| async for event in listener: |
| print(f"Event arrived: {event['event']}") |
| except asyncio.CancelledError: |
| return |
| |
| task = asyncio.Task(print_events(qmp.events)) |
| |
| await qmp.execute('stop') |
| await qmp.execute('cont') |
| |
| task.cancel() |
| await task |
| |
| |
| Using both .get() and async iterators |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| The async iterator and `get()` methods pull events from the same FIFO |
| queue. If you mix the usage of both, be aware: Events are emitted |
| precisely once per listener. |
| |
| If multiple contexts try to pull events from the same listener instance, |
| events are still emitted only precisely once. |
| |
| This restriction can be lifted by creating additional listeners. |
| |
| |
| Creating multiple listeners |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| Additional `EventListener` objects can be created at-will. Each one |
| receives its own copy of events, with separate FIFO event queues. |
| |
| .. code:: python |
| |
| my_listener = EventListener() |
| qmp.register_listener(my_listener) |
| |
| await qmp.execute('stop') |
| copy1 = await my_listener.get() |
| copy2 = await qmp.events.get() |
| |
| assert copy1 == copy2 |
| |
| In this example, we await an event from both a user-created |
| `EventListener` and the built-in events listener. Both receive the same |
| event. |
| |
| |
| Clearing listeners |
| ~~~~~~~~~~~~~~~~~~ |
| |
| `EventListener` objects can be cleared, clearing all events seen thus far: |
| |
| .. code:: python |
| |
| await qmp.execute('stop') |
| qmp.events.clear() |
| await qmp.execute('cont') |
| event = await qmp.events.get() |
| assert event['event'] == 'RESUME' |
| |
| `EventListener` objects are FIFO queues. If events are not consumed, |
| they will remain in the queue until they are witnessed or discarded via |
| `clear()`. FIFO queues will be drained automatically upon leaving a |
| context block, or when calling `remove_listener()`. |
| |
| |
| Accessing listener history |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| `EventListener` objects record their history. Even after being cleared, |
| you can obtain a record of all events seen so far: |
| |
| .. code:: python |
| |
| await qmp.execute('stop') |
| await qmp.execute('cont') |
| qmp.events.clear() |
| |
| assert len(qmp.events.history) == 2 |
| assert qmp.events.history[0]['event'] == 'STOP' |
| assert qmp.events.history[1]['event'] == 'RESUME' |
| |
| The history is updated immediately and does not require the event to be |
| witnessed first. |
| |
| |
| Using event filters |
| ~~~~~~~~~~~~~~~~~~~ |
| |
| `EventListener` objects can be given complex filtering criteria if names |
| are not sufficient: |
| |
| .. code:: python |
| |
| def job1_filter(event) -> bool: |
| event_data = event.get('data', {}) |
| event_job_id = event_data.get('id') |
| return event_job_id == "job1" |
| |
| with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener: |
| await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...}) |
| async for event in listener: |
| if event['data']['status'] == 'concluded': |
| break |
| |
| These filters might be most useful when parameterized. `EventListener` |
| objects expect a function that takes only a single argument (the raw |
| event, as a `Message`) and returns a bool; True if the event should be |
| accepted into the stream. You can create a function that adapts this |
| signature to accept configuration parameters: |
| |
| .. code:: python |
| |
| def job_filter(job_id: str) -> EventFilter: |
| def filter(event: Message) -> bool: |
| return event['data']['id'] == job_id |
| return filter |
| |
| with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener: |
| await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...}) |
| async for event in listener: |
| if event['data']['status'] == 'concluded': |
| break |
| |
| |
| Activating an existing listener with `listen()` |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| Listeners with complex, long configurations can also be created manually |
| and activated temporarily by using `listen()` instead of `listener()`: |
| |
| .. code:: python |
| |
| listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', |
| 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', |
| 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) |
| |
| with qmp.listen(listener): |
| await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...}) |
| async for event in listener: |
| print(event) |
| if event['event'] == 'BLOCK_JOB_COMPLETED': |
| break |
| |
| Any events that are not witnessed by the time the block is left will be |
| cleared from the queue; entering the block is an implicit |
| `register_listener()` and leaving the block is an implicit |
| `remove_listener()`. |
| |
| |
| Activating multiple existing listeners with `listen()` |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| While `listener()` is only capable of creating a single listener, |
| `listen()` is capable of activating multiple listeners simultaneously: |
| |
| .. code:: python |
| |
| def job_filter(job_id: str) -> EventFilter: |
| def filter(event: Message) -> bool: |
| return event['data']['id'] == job_id |
| return filter |
| |
| jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA')) |
| jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB')) |
| |
| with qmp.listen(jobA, jobB): |
| qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...}) |
| qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...}) |
| |
| async for event in jobA.get(): |
| if event['data']['status'] == 'concluded': |
| break |
| async for event in jobB.get(): |
| if event['data']['status'] == 'concluded': |
| break |
| |
| |
| Extending the `EventListener` class |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| In the case that a more specialized `EventListener` is desired to |
| provide either more functionality or more compact syntax for specialized |
| cases, it can be extended. |
| |
| One of the key methods to extend or override is |
| :py:meth:`~EventListener.accept()`. The default implementation checks an |
| incoming message for: |
| |
| 1. A qualifying name, if any :py:obj:`~EventListener.names` were |
| specified at initialization time |
| 2. That :py:obj:`~EventListener.event_filter()` returns True. |
| |
| This can be modified however you see fit to change the criteria for |
| inclusion in the stream. |
| |
| For convenience, a ``JobListener`` class could be created that simply |
| bakes in configuration so it does not need to be repeated: |
| |
| .. code:: python |
| |
| class JobListener(EventListener): |
| def __init__(self, job_id: str): |
| super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', |
| 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', |
| 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) |
| self.job_id = job_id |
| |
| def accept(self, event) -> bool: |
| if not super().accept(event): |
| return False |
| if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'): |
| return event['data']['id'] == job_id |
| return event['data']['device'] == job_id |
| |
| From here on out, you can conjure up a custom-purpose listener that |
| listens only for job-related events for a specific job-id easily: |
| |
| .. code:: python |
| |
| listener = JobListener('job4') |
| with qmp.listener(listener): |
| await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...}) |
| async for event in listener: |
| print(event) |
| if event['event'] == 'BLOCK_JOB_COMPLETED': |
| break |
| |
| |
| Experimental Interfaces & Design Issues |
| --------------------------------------- |
| |
| These interfaces are not ones I am sure I will keep or otherwise modify |
| heavily. |
| |
| qmp.listener()’s type signature |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| `listener()` does not return anything, because it was assumed the caller |
| already had a handle to the listener. However, for |
| ``qmp.listener(EventListener())`` forms, the caller will not have saved |
| a handle to the listener. |
| |
| Because this function can accept *many* listeners, I found it hard to |
| accurately type in a way where it could be used in both “one” or “many” |
| forms conveniently and in a statically type-safe manner. |
| |
| Ultimately, I removed the return altogether, but perhaps with more time |
| I can work out a way to re-add it. |
| |
| |
| API Reference |
| ------------- |
| |
| """ |
| |
| import asyncio |
| from contextlib import contextmanager |
| import logging |
| from typing import ( |
| AsyncIterator, |
| Callable, |
| Iterable, |
| Iterator, |
| List, |
| Optional, |
| Set, |
| Tuple, |
| Union, |
| ) |
| |
| from .error import QMPError |
| from .message import Message |
| |
| |
| EventNames = Union[str, Iterable[str], None] |
| EventFilter = Callable[[Message], bool] |
| |
| |
| class ListenerError(QMPError): |
| """ |
| Generic error class for `EventListener`-related problems. |
| """ |
| |
| |
| class EventListener: |
| """ |
| Selectively listens for events with runtime configurable filtering. |
| |
| This class is designed to be directly usable for the most common cases, |
| but it can be extended to provide more rigorous control. |
| |
| :param names: |
| One or more names of events to listen for. |
| When not provided, listen for ALL events. |
| :param event_filter: |
| An optional event filtering function. |
| When names are also provided, this acts as a secondary filter. |
| |
| When ``names`` and ``event_filter`` are both provided, the names |
| will be filtered first, and then the filter function will be called |
| second. The event filter function can assume that the format of the |
| event is a known format. |
| """ |
| def __init__( |
| self, |
| names: EventNames = None, |
| event_filter: Optional[EventFilter] = None, |
| ): |
| # Queue of 'heard' events yet to be witnessed by a caller. |
| self._queue: 'asyncio.Queue[Message]' = asyncio.Queue() |
| |
| # Intended as a historical record, NOT a processing queue or backlog. |
| self._history: List[Message] = [] |
| |
| #: Primary event filter, based on one or more event names. |
| self.names: Set[str] = set() |
| if isinstance(names, str): |
| self.names.add(names) |
| elif names is not None: |
| self.names.update(names) |
| |
| #: Optional, secondary event filter. |
| self.event_filter: Optional[EventFilter] = event_filter |
| |
| @property |
| def history(self) -> Tuple[Message, ...]: |
| """ |
| A read-only history of all events seen so far. |
| |
| This represents *every* event, including those not yet witnessed |
| via `get()` or ``async for``. It persists between `clear()` |
| calls and is immutable. |
| """ |
| return tuple(self._history) |
| |
| def accept(self, event: Message) -> bool: |
| """ |
| Determine if this listener accepts this event. |
| |
| This method determines which events will appear in the stream. |
| The default implementation simply checks the event against the |
| list of names and the event_filter to decide if this |
| `EventListener` accepts a given event. It can be |
| overridden/extended to provide custom listener behavior. |
| |
| User code is not expected to need to invoke this method. |
| |
| :param event: The event under consideration. |
| :return: `True`, if this listener accepts this event. |
| """ |
| name_ok = (not self.names) or (event['event'] in self.names) |
| return name_ok and ( |
| (not self.event_filter) or self.event_filter(event) |
| ) |
| |
| async def put(self, event: Message) -> None: |
| """ |
| Conditionally put a new event into the FIFO queue. |
| |
| This method is not designed to be invoked from user code, and it |
| should not need to be overridden. It is a public interface so |
| that `QMPClient` has an interface by which it can inform |
| registered listeners of new events. |
| |
| The event will be put into the queue if |
| :py:meth:`~EventListener.accept()` returns `True`. |
| |
| :param event: The new event to put into the FIFO queue. |
| """ |
| if not self.accept(event): |
| return |
| |
| self._history.append(event) |
| await self._queue.put(event) |
| |
| async def get(self) -> Message: |
| """ |
| Wait for the very next event in this stream. |
| |
| If one is already available, return that one. |
| """ |
| return await self._queue.get() |
| |
| def empty(self) -> bool: |
| """ |
| Return `True` if there are no pending events. |
| """ |
| return self._queue.empty() |
| |
| def clear(self) -> List[Message]: |
| """ |
| Clear this listener of all pending events. |
| |
| Called when an `EventListener` is being unregistered, this clears the |
| pending FIFO queue synchronously. It can be also be used to |
| manually clear any pending events, if desired. |
| |
| :return: The cleared events, if any. |
| |
| .. warning:: |
| Take care when discarding events. Cleared events will be |
| silently tossed on the floor. All events that were ever |
| accepted by this listener are visible in `history()`. |
| """ |
| events = [] |
| while True: |
| try: |
| events.append(self._queue.get_nowait()) |
| except asyncio.QueueEmpty: |
| break |
| |
| return events |
| |
| def __aiter__(self) -> AsyncIterator[Message]: |
| return self |
| |
| async def __anext__(self) -> Message: |
| """ |
| Enables the `EventListener` to function as an async iterator. |
| |
| It may be used like this: |
| |
| .. code:: python |
| |
| async for event in listener: |
| print(event) |
| |
| These iterators will never terminate of their own accord; you |
| must provide break conditions or otherwise prepare to run them |
| in an `asyncio.Task` that can be cancelled. |
| """ |
| return await self.get() |
| |
| |
| class Events: |
| """ |
| Events is a mix-in class that adds event functionality to the QMP class. |
| |
| It's designed specifically as a mix-in for `QMPClient`, and it |
| relies upon the class it is being mixed into having a 'logger' |
| property. |
| """ |
| def __init__(self) -> None: |
| self._listeners: List[EventListener] = [] |
| |
| #: Default, all-events `EventListener`. |
| self.events: EventListener = EventListener() |
| self.register_listener(self.events) |
| |
| # Parent class needs to have a logger |
| self.logger: logging.Logger |
| |
| async def _event_dispatch(self, msg: Message) -> None: |
| """ |
| Given a new event, propagate it to all of the active listeners. |
| |
| :param msg: The event to propagate. |
| """ |
| for listener in self._listeners: |
| await listener.put(msg) |
| |
| def register_listener(self, listener: EventListener) -> None: |
| """ |
| Register and activate an `EventListener`. |
| |
| :param listener: The listener to activate. |
| :raise ListenerError: If the given listener is already registered. |
| """ |
| if listener in self._listeners: |
| raise ListenerError("Attempted to re-register existing listener") |
| self.logger.debug("Registering %s.", str(listener)) |
| self._listeners.append(listener) |
| |
| def remove_listener(self, listener: EventListener) -> None: |
| """ |
| Unregister and deactivate an `EventListener`. |
| |
| The removed listener will have its pending events cleared via |
| `clear()`. The listener can be re-registered later when |
| desired. |
| |
| :param listener: The listener to deactivate. |
| :raise ListenerError: If the given listener is not registered. |
| """ |
| if listener == self.events: |
| raise ListenerError("Cannot remove the default listener.") |
| self.logger.debug("Removing %s.", str(listener)) |
| listener.clear() |
| self._listeners.remove(listener) |
| |
| @contextmanager |
| def listen(self, *listeners: EventListener) -> Iterator[None]: |
| r""" |
| Context manager: Temporarily listen with an `EventListener`. |
| |
| Accepts one or more `EventListener` objects and registers them, |
| activating them for the duration of the context block. |
| |
| `EventListener` objects will have any pending events in their |
| FIFO queue cleared upon exiting the context block, when they are |
| deactivated. |
| |
| :param \*listeners: One or more EventListeners to activate. |
| :raise ListenerError: If the given listener(s) are already active. |
| """ |
| _added = [] |
| |
| try: |
| for listener in listeners: |
| self.register_listener(listener) |
| _added.append(listener) |
| |
| yield |
| |
| finally: |
| for listener in _added: |
| self.remove_listener(listener) |
| |
| @contextmanager |
| def listener( |
| self, |
| names: EventNames = (), |
| event_filter: Optional[EventFilter] = None |
| ) -> Iterator[EventListener]: |
| """ |
| Context manager: Temporarily listen with a new `EventListener`. |
| |
| Creates an `EventListener` object and registers it, activating |
| it for the duration of the context block. |
| |
| :param names: |
| One or more names of events to listen for. |
| When not provided, listen for ALL events. |
| :param event_filter: |
| An optional event filtering function. |
| When names are also provided, this acts as a secondary filter. |
| |
| :return: The newly created and active `EventListener`. |
| """ |
| listener = EventListener(names, event_filter) |
| with self.listen(listener): |
| yield listener |