Source code for vidhubcontrol.common

import enum
import asyncio
from typing import Optional, Any, Union
from loguru import logger

from pydispatch import Dispatcher

[docs]class ConnectionState(enum.IntFlag): r"""Enum to describe various connection states Members may be combined using bitwise operators (&, \|, ^, ~) """ not_connected = 1 """Indicates there is no connection and no connection attempts are being made """ connecting = 2 """Indicates an attempt to connect is being made """ disconnecting = 4 """Indicates the connection is being closed """ connected = 8 """Indicates the connection is active """ failure = 16 """Indicates an error occured """ waiting = connecting | disconnecting """Indicates the connection is either :attr:`connecting` or :attr:`disconnecting` """ @property def is_compound(self) -> bool: """This will evaluate to True for states combined using bitwise operators """ return self.name is None @property def is_connected(self) -> bool: """Convenience property evaluating as True if ``self == ConnectionState.connected`` """ return self == ConnectionState.connected
[docs] @classmethod def from_str(cls, s: str) -> 'ConnectionState': r"""Create a :class:`ConnectionState` member by name(s) Combined states can be created by separating their names with a "\|" >>> from vidhubcontrol.common import ConnectionState >>> ConnectionState.connected | ConnectionState.not_connected <ConnectionState.connected|not_connected: 9> >>> ConnectionState.disconnecting | ConnectionState.failure <ConnectionState.failure|disconnecting: 20> >>> # This combination is already defined as "waiting" >>> ConnectionState.connecting | ConnectionState.disconnecting <ConnectionState.waiting: 6> """ if '|' in s: result = None for name in s.split('|'): if result is None: result = cls.from_str(name) else: result |= cls.from_str(name) return result s = s.lower() return getattr(cls, s)
def __str__(self): if self.is_compound: return '|'.join((obj.name for obj in self)) return self.name def __format__(self, format_spec): if format_spec == '': return str(self) return format(self.value, format_spec) def __iter__(self): for member in ConnectionState: if member in self: yield member
StrOrState = Union[ConnectionState, str]
[docs]class ConnectionManager(Dispatcher): """A manager for tracking and waiting for :class:`connection states <ConnectionState>` A :class:`asyncio.Condition` is used to to notify any waiting tasks of changes to :attr:`state`. This requires the underlying lock to be :meth:`acquired <acquire>` before calling any of the waiter or setter methods and :meth:`released <release>` afterwards. This class supports the asynchronous context manager protocol for use in :keyword:`async with` statements. :Events: .. function:: state_changed(self: ConnectionManager, state: ConnectionState) Emitted when the value of :attr:`state` has changed """ failure_reason: Optional[str] """A message describing errors (if encountered)""" failure_exception: Optional[Exception] """The :class:`Exception` raised if an error occured""" _events_ = ['state_changed'] def __init__(self, initial: Optional[ConnectionState] = ConnectionState.not_connected): self.__state = initial self._lock = asyncio.Lock() self._condition = asyncio.Condition(self._lock) self.failure_reason = None self.failure_exception = None @property def state(self) -> ConnectionState: """The current state """ return self.__state
[docs] async def set_state(self, state: StrOrState): """Set the :attr:`state` to the given value The *state* argument may be either a :class:`ConnectionState` member or a string. (see :meth:`ConnectionState.from_str`) Raises: RuntimeError: If the lock is not :meth:`acquired <acquire>` before calling this method """ await self._set_state(state)
async def _set_state(self, state: StrOrState): if isinstance(state, str): state = ConnectionState.from_str(state) changed = False if ConnectionState.failure in self.state: if state & (ConnectionState.connecting | ConnectionState.connected): state &= ~ConnectionState.failure self.failure_reason = None self.failure_exception = None else: state |= ConnectionState.failure if state != self.state: changed = True self.__state = state self._condition.notify_all() if changed: self.emit('state_changed', self, self.state)
[docs] async def set_failure( self, reason: Any, exc: Optional[Exception] = None, state: Optional[StrOrState] = ConnectionState.disconnecting | ConnectionState.failure ): """Set :attr:`state` to indicate a failure Arguments: reason: A description of the failure exc: The Exception that caused the failure (if available) state: The new state to set. Must include :attr:`ConnectionState.failure` Raises: RuntimeError: If the lock is not :meth:`acquired <acquire>` before calling this method """ await self._set_failure(reason, exc, state)
async def _set_failure( self, reason: Any, exc: Optional[Exception] = None, state: Optional[StrOrState] = ConnectionState.disconnecting | ConnectionState.failure ): if isinstance(state, str): state = ConnectionState.from_str(state) assert ConnectionState.failure in state self.__state = state self.failure_reason = reason self.failure_exception = exc self._condition.notify_all() self.emit('state_changed', self, self.state)
[docs] async def wait(self, timeout: Optional[float] = None) -> ConnectionState: """Block until the next time :attr:`state` changes and return the value Arguments: timeout: If given, the number of seconds to wait. Otherwise, this will wait indefinitely Raises: asyncio.TimeoutError: If *timeout* is given and no state changes occured RuntimeError: If the lock is not :meth:`acquired <acquire>` before calling this method """ return await self._wait(timeout)
async def _wait(self, timeout: Optional[float] = None) -> ConnectionState: coro = self._condition.wait() if timeout is not None: await asyncio.wait_for(coro, timeout) else: await coro return self.state
[docs] async def wait_for( self, state: StrOrState, timeout: Optional[float] = None ) -> ConnectionState: """Wait for a specific state The *state* argument may be a :class:`ConnectionState` member or string as described in :meth:`ConnectionState.from_str`. If the given state is :attr:`compound <ConnectionState.is_compound>` or the :attr:`state` is set as compound, this will wait until all members from the *state* argument are contained within the :attr:`state` value. Arguments: state: The state to wait for timeout: If given, the number of seconds to wait. Otherwise, this will wait indefinitely Raises: asyncio.TimeoutError: If *timeout* is given and no matching state changes were found RuntimeError: If the lock is not :meth:`acquired <acquire>` before calling this method """ return await self._wait_for(state, timeout)
async def _wait_for( self, state: StrOrState, timeout: Optional[float] = None ) -> ConnectionState: if isinstance(state, str): state = ConnectionState.from_str(state) def predicate(): if state.is_compound or self.state.is_compound: return state & self.state != 0 return self.state == state if predicate(): return self.state coro = self._condition.wait_for(predicate) if timeout is not None: coro = asyncio.wait_for(coro, timeout) await coro result = self.state if state.is_compound: return state & result return result
[docs] async def wait_for_established( self, timeout: Optional[float] = None ) -> ConnectionState: """Wait for either a success (:attr:`ConnectionState.connected`) or failure (:attr:`ConnectionState.failure`) Arguments: timeout: If given, the number of seconds to wait. Otherwise, this will wait indefinitely Raises: asyncio.TimeoutError: If *timeout* is given and no matching state changes were found RuntimeError: If the lock is not :meth:`acquired <acquire>` before calling this method """ return await self._wait_for_established(timeout)
async def _wait_for_established( self, timeout: Optional[float] = None ) -> ConnectionState: state = ConnectionState.connected | ConnectionState.disconnecting result = await self._wait_for(state, timeout) if result & (ConnectionState.failure | ConnectionState.disconnecting): result = await self._wait_for(ConnectionState.not_connected, timeout) return result
[docs] async def wait_for_disconnected( self, timeout: Optional[float] = None ) -> ConnectionState: """Wait for :attr:`ConnectionState.not_connected` Arguments: timeout: If given, the number of seconds to wait. Otherwise, this will wait indefinitely Raises: asyncio.TimeoutError: If *timeout* is given and no matching state changes were found RuntimeError: If the lock is not :meth:`acquired <acquire>` before calling this method """ return await self.wait_for(ConnectionState.not_connected, timeout)
[docs] async def syncronize(self, other: 'ConnectionManager'): """Copy the :attr:`state` and failure values of another :class:`ConnectionManager` Note: The lock must **not** be acquired before calling this method. """ async with other: async with self: self._syncronize(other)
def _syncronize(self, other: 'ConnectionManager'): changed = False for attr in ('failure_reason', 'failure_exception', 'state'): if getattr(self, attr) != getattr(other, attr): changed = True break if not changed: return self.failure_reason = other.failure_reason self.failure_exception = other.failure_exception self.__state = other.state self._condition.notify_all() self.emit('state_changed', self, self.state)
[docs] def locked(self) -> bool: """True if the lock is acquired """ return self._lock.locked()
[docs] async def acquire(self): """Acquire the lock This method blocks until the lock is unlocked, then sets it to locked and returns True. """ return await self._lock.acquire()
[docs] def release(self): """Release the lock Raises: RuntimeError: if called on an unlocked lock """ self._lock.release()
async def __aenter__(self): await self.acquire() return self async def __aexit__(self, *args): self.release() def __repr__(self): return f'<{self.__class__} at 0x{id(self):x}: {self}>' def __str__(self): return str(self.state)
[docs]class SyncronizedConnectionManager(ConnectionManager): """A connection manager that syncronizes itself with another """ def __init__(self, initial: Optional[ConnectionState] = ConnectionState.not_connected): super().__init__(initial) self.__other = None self._instance_lock = asyncio.Lock() @property def other(self) -> Optional[ConnectionManager]: """The manager currently being syncronized to """ return self.__other
[docs] async def set_other(self, other: Optional[ConnectionManager]): """Set the manager to syncronize with This binds to the :func:`state_changed` event of *other* and calls the :meth:`~ConnectionManager.syncronize` method whenever the state of the other manager changes. If ``None`` is given, :attr:`~ConnectionManager.state` is set to :attr:`~ConnectionState.not_connected` Note: The lock must *not* be acquired before calling this method """ async with self._instance_lock: cur = self.other if cur is other: return self.__other = other if cur is not None: cur.unbind(self) async with self: if other is None: await self.set_state(ConnectionState.not_connected) else: async with other: self._syncronize(other) loop = asyncio.get_event_loop() other.bind_async(loop, state_changed=self._on_other_state_changed)
async def _on_other_state_changed(self, instance, state, **kwargs): if self._instance_lock.locked() or self.other is not instance: return async with self._instance_lock: if self.other is not instance: return await self.syncronize(instance)