| from frozenlist import FrozenList | |
| __version__ = "1.3.2" | |
| __all__ = ("Signal",) | |
| class Signal(FrozenList): | |
| """Coroutine-based signal implementation. | |
| To connect a callback to a signal, use any list method. | |
| Signals are fired using the send() coroutine, which takes named | |
| arguments. | |
| """ | |
| __slots__ = ("_owner",) | |
| def __init__(self, owner): | |
| super().__init__() | |
| self._owner = owner | |
| def __repr__(self): | |
| return "<Signal owner={}, frozen={}, {!r}>".format( | |
| self._owner, self.frozen, list(self) | |
| ) | |
| async def send(self, *args, **kwargs): | |
| """ | |
| Sends data to all registered receivers. | |
| """ | |
| if not self.frozen: | |
| raise RuntimeError("Cannot send non-frozen signal.") | |
| for receiver in self: | |
| await receiver(*args, **kwargs) # type: ignore | |