Source code for vidhubcontrol.discovery

import asyncio
from typing import (
    List, Tuple, Dict, Union, Optional, Any, Callable, Coroutine, Awaitable,
)
import ipaddress
import platform
from loguru import logger

from pydispatch import Dispatcher, Property
from pydispatch.properties import DictProperty


try:
    import zeroconf
    ZEROCONF_AVAILABLE = True
except ImportError: # pragma: no cover
    zeroconf = None
    ZEROCONF_AVAILABLE = False

if ZEROCONF_AVAILABLE:
    from zeroconf.asyncio import AsyncZeroconf
    from zeroconf import IPVersion


from vidhubcontrol.utils import find_ip_addresses

# Default ``ttl`` used by ``Listener.publish_service`` and
# ``Listener.republish_service`` (presumably seconds -- confirm against zeroconf)
PUBLISH_TTL = 60

# Text that may be supplied either as ``str`` or as raw ``bytes``
StrOrBytes = Union[str, bytes]
# Values accepted as an IP address by the pack/unpack helpers in this module
AddressLike = Union[StrOrBytes, ipaddress.IPv4Address]
# Annotation used for the ``coro`` argument of ``run_on_loop``
CoroFunc = Callable[[Any, Any], Coroutine]

def convert_bytes_dict(d: Dict[bytes, bytes]) -> Dict[str, str]:
    """Decode every key and value of ``d`` from UTF-8 ``bytes`` to ``str``

    Arguments:
        d: Mapping whose keys and values are both ``bytes``

    Returns:
        A new ``dict`` with the decoded keys and values
    """
    decoded = {}
    for raw_key, raw_val in d.items():
        key = str(raw_key, 'UTF-8')
        decoded[key] = str(raw_val, 'UTF-8')
    return decoded

def convert_dict_bytes(d: Dict[StrOrBytes, StrOrBytes]) -> Dict[bytes, bytes]:
    """Encode every key and value of ``d`` that is not already ``bytes``

    Items that are already ``bytes`` are kept as they are; everything else
    is converted by calling its ``encode()`` method.

    Arguments:
        d: Mapping with ``str`` or ``bytes`` keys and values

    Returns:
        A new ``dict`` containing only ``bytes`` keys and values
    """
    def as_bytes(item):
        return item if isinstance(item, bytes) else item.encode()

    return {as_bytes(key): as_bytes(val) for key, val in d.items()}

def pack_ip_address(ip: 'AddressLike') -> bytes:
    """Convert an IP address to its packed ``bytes`` representation

    Arguments:
        ip: The address to convert. ``bytes`` are treated as already packed
            and returned unchanged. :class:`ipaddress.IPv4Interface` and
            :class:`ipaddress.IPv4Address` instances are packed directly and
            ``str`` values are parsed with :func:`ipaddress.ip_address`.

    Returns:
        The packed address

    Raises:
        TypeError: If ``ip`` is not one of the supported types
        ValueError: If ``ip`` is a ``str`` that cannot be parsed as an address
    """
    if isinstance(ip, bytes):
        return ip
    # IPv4Interface is a subclass of IPv4Address; use its host address
    if isinstance(ip, ipaddress.IPv4Interface):
        return ip.ip.packed
    if isinstance(ip, ipaddress.IPv4Address):
        return ip.packed
    if isinstance(ip, str):
        return ipaddress.ip_address(ip).packed
    raise TypeError(f'Cannot pack ip address of type {type(ip).__name__}')

def unpack_ip_address(ip: AddressLike) -> ipaddress.IPv4Address:
    """Build an address object from ``ip``

    Arguments:
        ip: An :class:`ipaddress.IPv4Interface` (its host address is
            returned) or any value accepted by :func:`ipaddress.ip_address`

    Returns:
        The address object
    """
    if not isinstance(ip, ipaddress.IPv4Interface):
        return ipaddress.ip_address(ip)
    return ip.ip

def run_on_loop(
    coro: CoroFunc,
    dest_loop: asyncio.BaseEventLoop,
    cur_loop: Optional[asyncio.BaseEventLoop] = None,
    create_task: Optional[bool] = True
) -> Awaitable:
    """Hand ``coro`` over to ``dest_loop``

    Arguments:
        coro: The coroutine object to run
        dest_loop: The event loop the coroutine should run on
        cur_loop: The event loop of the caller. If not given, it is looked up
            with :func:`asyncio.get_event_loop`
        create_task: Only used when ``cur_loop`` is ``dest_loop``. If True,
            ``coro`` is wrapped with :func:`asyncio.ensure_future`, otherwise
            it is returned untouched

    Returns:
        The result of :func:`asyncio.run_coroutine_threadsafe` when
        ``cur_loop`` is not ``dest_loop``. Otherwise the future created by
        :func:`asyncio.ensure_future` or ``coro`` itself, depending on
        ``create_task``
    """
    if cur_loop is None:
        try:
            cur_loop = asyncio.get_event_loop()
        except RuntimeError:
            # No event loop could be obtained here; ``cur_loop`` stays None
            pass

    if cur_loop is not dest_loop:
        return asyncio.run_coroutine_threadsafe(coro, loop=dest_loop)
    if create_task:
        return asyncio.ensure_future(coro)
    return coro

class ServiceInfo(Dispatcher):
    """Container for Zeroconf service information

    Closely related to :class:`zeroconf.ServiceInfo`

    Instances compare equal and hash by :attr:`id`.

    Attributes:
        type (str): Fully qualified service type
        name (str): Fully qualified service name
        server (str): Fully qualified name for service host
            (defaults to :attr:`name`)
        addresses: The service ip addresses
        port (int): The service port
        properties: Custom properties for the service
        ttl (int): The TTL value, or ``None`` if not specified
    """
    properties: Dict[str, str] = DictProperty()

    # Attributes copied by name to/from :class:`zeroconf.ServiceInfo`
    _direct_attrs = ['name', 'server', 'port', 'properties']

    def __init__(
        self,
        type_: str,
        name: str,
        server: Optional[str] = None,
        port: Optional[int] = None,
        addresses: Optional[List[AddressLike]] = None,
        properties: Optional[Dict] = None,
        ttl: Optional[int] = None,
    ) -> None:
        self.type: str = type_
        self.name: str = name
        self.server: Optional[str] = server
        self.port: Optional[int] = port
        if addresses is None:
            addresses = []
        self.addresses: List[AddressLike] = addresses
        if properties is not None:
            self.properties = properties
        self.ttl: Optional[int] = ttl

    @property
    def address(self) -> Optional[ipaddress.IPv4Address]:
        """The first element of :attr:`addresses`

        Raises:
            ValueError: If :attr:`addresses` is empty
        """
        if not self.addresses:
            raise ValueError(f'{self!r} has no addresses')
        return self.addresses[0]

    @classmethod
    def from_zc_info(cls, info: 'zeroconf.ServiceInfo') -> 'ServiceInfo':
        """Creates an instance from a :class:`zeroconf.ServiceInfo` object

        Arguments:
            info (:class:`zeroconf.ServiceInfo`):

        Returns:
            An instance of :class:`ServiceInfo`
        """
        kwargs = {k: getattr(info, k) for k in cls._direct_attrs}
        kwargs['type_'] = info.type
        addresses = info.parsed_addresses(IPVersion.V4Only)
        kwargs['addresses'] = [unpack_ip_address(addr) for addr in addresses]
        # zeroconf stores properties as bytes; this class keeps them as str
        kwargs['properties'] = convert_bytes_dict(info.properties)
        kwargs['ttl'] = info.host_ttl
        return cls(**kwargs)

    @property
    def id(self) -> Tuple[str, str]:
        """Unique id for the service as a ``tuple`` of (:attr:`type`, :attr:`name`)
        """
        return (self.type, self.name)

    def to_zc_info(self) -> 'zeroconf.ServiceInfo':
        """Creates a copy as an instance of :class:`zeroconf.ServiceInfo`
        """
        kwargs = {k: getattr(self, k) for k in self._direct_attrs}
        kwargs['type_'] = self.type
        kwargs['addresses'] = [pack_ip_address(addr) for addr in self.addresses]
        kwargs['properties'] = convert_dict_bytes(self.properties)
        # Only pass a TTL when one was set so zeroconf can apply its default
        if self.ttl is not None:
            kwargs['host_ttl'] = self.ttl
        return zeroconf.ServiceInfo(**kwargs)

    def update(self, other: 'ServiceInfo'):
        """Copies :attr:`properties`, :attr:`addresses`, :attr:`server` and
        :attr:`port` from another :class:`ServiceInfo` instance

        The :attr:`id` of ``other`` must match the :attr:`id` of this instance.
        """
        assert other.id == self.id
        self.properties = other.properties.copy()
        self.addresses = other.addresses.copy()
        self.server = other.server
        self.port = other.port

    def __hash__(self):
        return hash(self.id)

    def __eq__(self, other: object):
        # Let Python fall back to its default handling for unrelated types
        # instead of failing on the missing ``id`` attribute
        if not isinstance(other, ServiceInfo):
            return NotImplemented
        return self.id == other.id

    def __repr__(self):
        return f'<{self.__class__.__name__}> {self}'

    def __str__(self):
        addr = self.addresses[0] if self.addresses else None
        return f'{self.name}: {self.type} ({addr}:{self.port}), properties={self.properties}'
class Message:
    """A message to communicate actions to and from :class:`Listener`

    Attributes:
        info: The :class:`ServiceInfo` related to the message

    Note:
        This class and its subclasses are not meant to be used directly.
        They are used internally in :class:`Listener` methods.
    """
    info: ServiceInfo
    __slots__ = ('info',)

    def __init__(self, info: ServiceInfo):
        self.info: ServiceInfo = info

    def __str__(self):
        return f'{self.__class__.__name__}: {self.info}'

    def __repr__(self):
        # The repr is intentionally the same text as ``str()``
        return str(self)
class BrowserMessage(Message):
    """Base class for :class:`AddedMessage`, :class:`RemovedMessage` and
    :class:`UpdateMessage`
    """
class AddedMessage(BrowserMessage):
    """Message created by :meth:`Listener.add_service` and handled in
    :meth:`Listener.run` by calling :meth:`Listener.add_service_info`
    """
class RemovedMessage(BrowserMessage):
    """Message created by :meth:`Listener.remove_service` and handled in
    :meth:`Listener.run` by calling :meth:`Listener.remove_service_info`
    """
class UpdateMessage(BrowserMessage):
    """Message created by :meth:`Listener.update_service` and handled in
    :meth:`Listener.run` by calling :meth:`Listener.update_service_info`
    """
class RegistrationMessage(Message):
    """Base class for :class:`PublishMessage`, :class:`RepublishMessage` and
    :class:`UnPublishMessage`
    """
class PublishMessage(RegistrationMessage):
    """Message created by :meth:`Listener.publish_service`; handled in
    :meth:`Listener.run` by registering the service
    """
class RepublishMessage(RegistrationMessage):
    """Message created by :meth:`Listener.republish_service`; handled in
    :meth:`Listener.run` by updating the registered service
    """
class UnPublishMessage(RegistrationMessage):
    """Message created by :meth:`Listener.unpublish_service`; handled in
    :meth:`Listener.run` by unregistering the service
    """
class Listener(Dispatcher):
    """An async zeroconf service listener

    The zeroconf browser callbacks (:meth:`add_service`,
    :meth:`remove_service` and :meth:`update_service`) wrap their data in
    :class:`Message` objects and place them on :attr:`message_queue` using
    :func:`run_on_loop`. The queue is consumed by :meth:`run`.

    Arguments:
        mainloop (:class:`asyncio.BaseEventLoop`): asyncio event loop instance
        service_type (str): The fully qualified service type name to subscribe to

    Attributes:
        services: All services currently discovered as instances of
            :class:`ServiceInfo`. Stored using :attr:`ServiceInfo.id` as keys
        message_queue: Used to communicate actions and events with instances
            of :class:`Message`
        published_services: Stores services that have been published using
            :meth:`publish_service` as :class:`ServiceInfo` instances.

    :Events:
        .. function:: service_added(info, **kwargs)

            Emitted when a service is stored in :attr:`services`

        .. function:: service_updated(info, **kwargs)

            Emitted when a service in :attr:`services` has been updated

        .. function:: service_removed(info, **kwargs)

            Emitted when a service is removed from :attr:`services`
    """
    _events_ = ['service_added', 'service_updated', 'service_removed']
    services: Dict[Tuple[str, str], ServiceInfo] = DictProperty()
    message_queue: asyncio.Queue
    published_services: Dict[Tuple[str, str], ServiceInfo]

    def __init__(self, mainloop, service_type):
        self.mainloop = mainloop
        self.service_type = service_type
        self.running = False
        self.stopped = asyncio.Event()
        self.message_queue = asyncio.Queue()
        self._service_info_lock = asyncio.Lock()
        self.zeroconf = None
        self.published_services = {}

    async def start(self):
        """Starts the service listener
        """
        if self.running:
            return
        self.running = True
        logger.debug('Discovery starting...')
        self.run_zeroconf()
        self.run_future = asyncio.ensure_future(self.run())
        logger.debug('Discovery started')

    @logger.catch
    async def run(self):
        """Main loop for communicating with :class:`zeroconf.Zeroconf`

        Waits for messages on the :attr:`message_queue` and processes them.
        The loop will exit if an object placed on the queue is not an instance
        of :class:`Message`.
        """
        async def handle_service_registration(msg: RegistrationMessage):
            if not ZEROCONF_AVAILABLE:
                return
            zc_info = msg.info.to_zc_info()
            if isinstance(msg, PublishMessage):
                coro = self.async_zeroconf.async_register_service(zc_info)
            elif isinstance(msg, RepublishMessage):
                coro = self.async_zeroconf.async_update_service(zc_info)
            else:
                coro = self.async_zeroconf.async_unregister_service(zc_info)
            await coro

        while self.running:
            msg = await self.message_queue.get()
            if not isinstance(msg, Message):
                # Anything else (``stop`` queues ``None``) ends the loop
                self.message_queue.task_done()
                break
            logger.debug(f'Handling "{msg!r}"')
            if isinstance(msg, RegistrationMessage):
                await handle_service_registration(msg)
            elif isinstance(msg, AddedMessage):
                await self.add_service_info(msg.info)
            elif isinstance(msg, RemovedMessage):
                await self.remove_service_info(msg.info)
            elif isinstance(msg, UpdateMessage):
                await self.update_service_info(msg.info)
            logger.debug('Message handled')
            self.message_queue.task_done()

    async def stop(self):
        """Stops the service listener
        """
        if not self.running:
            return
        self.running = False
        logger.debug('Discovery stopping...')
        # A non-Message object wakes up and terminates the loop in ``run``
        await self.message_queue.put(None)
        await self.run_future
        await self.stop_zeroconf()
        self.stopped.set()
        logger.debug('Discovery stopped')

    def run_zeroconf(self):
        """Starts :class:`zeroconf.Zeroconf` and :class:`zeroconf.ServiceBrowser`
        instances
        """
        if not ZEROCONF_AVAILABLE:
            return
        self.async_zeroconf = AsyncZeroconf()
        self.zeroconf = self.async_zeroconf.zeroconf
        self.zeroconf.listener = self
        self.browser = zeroconf.ServiceBrowser(self.zeroconf, self.service_type, self)

    async def stop_zeroconf(self):
        """Closes the :class:`zeroconf.Zeroconf` instance
        """
        if self.zeroconf is None:
            return
        a_zc = self.async_zeroconf
        # Drop the references before awaiting the close
        self.zeroconf = None
        self.async_zeroconf = None
        await a_zc.async_close()

    async def add_message(self, msg: Message):
        """Adds a message to the :attr:`message_queue`

        Arguments:
            msg (:class:`Message`): Message to send
        """
        await self.message_queue.put(msg)

    async def add_service_info(self, info: ServiceInfo, **kwargs):
        """Stores ``info`` in :attr:`services` and emits ``service_added``

        Raises:
            ValueError: If a service with the same id is already stored
        """
        async with self._service_info_lock:
            if info.id in self.services:
                raise ValueError(f'Service "{info}" already discovered')
            self.services[info.id] = info
        self.emit('service_added', info, **kwargs)

    async def update_service_info(self, info: ServiceInfo, **kwargs):
        """Updates the stored service matching ``info`` and emits
        ``service_updated``

        If no matching service is stored, ``info`` is added instead and
        ``service_added`` is emitted.
        """
        async with self._service_info_lock:
            is_new = info.id not in self.services
            if is_new:
                self.services[info.id] = info
            else:
                self.services[info.id].update(info)
        self.emit('service_added' if is_new else 'service_updated', info, **kwargs)

    async def remove_service_info(self, info: ServiceInfo, **kwargs):
        """Removes the stored service matching ``info`` and emits
        ``service_removed``

        Nothing happens if no matching service is stored. The instance that
        was stored in :attr:`services` is the one passed to the event.
        """
        async with self._service_info_lock:
            if info.id not in self.services:
                return
            # ``remove_service`` can only build a placeholder from the type
            # and name, so hand the stored instance (which carries addresses
            # and properties) to the event handlers.
            info = self.services[info.id]
            del self.services[info.id]
        self.emit('service_removed', info, **kwargs)

    def add_service(self, zc: 'zeroconf.Zeroconf', type_: str, name: str):
        """Zeroconf browser callback for a newly discovered service"""
        if not self.running:
            return
        zc_info = zc.get_service_info(type_, name)
        if zc_info is None:
            # Same guard as ``update_service``: without the service details
            # there is nothing to build a ServiceInfo from
            logger.debug(f'No service info available for "{name}"')
            return
        info = ServiceInfo.from_zc_info(zc_info)
        msg = AddedMessage(info)
        run_on_loop(self.add_message(msg), self.mainloop)

    def remove_service(self, zc: 'zeroconf.Zeroconf', type_: str, name: str):
        """Zeroconf browser callback for a service that disappeared"""
        if not self.running:
            return
        # Only the id (type and name) is known at this point
        info = ServiceInfo(type_=type_, name=name)
        msg = RemovedMessage(info)
        run_on_loop(self.add_message(msg), self.mainloop)

    def update_service(self, zc: 'zeroconf.Zeroconf', type_: str, name: str):
        """Zeroconf browser callback for a service that changed

        The service is treated as removed if its details cannot be retrieved.
        """
        if not self.running:
            return
        zc_info = zc.get_service_info(type_, name)
        if zc_info is None:
            self.remove_service(zc, type_, name)
            return
        info = ServiceInfo.from_zc_info(zc_info)
        msg = UpdateMessage(info)
        run_on_loop(self.add_message(msg), self.mainloop)

    async def get_local_ifaces(self, refresh: Optional[bool] = False) -> List[ipaddress.IPv4Interface]:
        """Gets the interfaces reported by ``find_ip_addresses``

        The result is cached on the instance.

        Arguments:
            refresh (bool, optional): If True, the cached result is discarded
        """
        ifaces = getattr(self, '_local_ifaces', None)
        if ifaces is not None and not refresh:
            return ifaces
        ifaces = self._local_ifaces = [iface for iface_name, iface in find_ip_addresses()]
        return ifaces

    async def get_local_hostname(self) -> str:
        """Gets the hostname used to build default service names

        The result is cached on the instance.
        """
        name = getattr(self, '_local_hostname', None)
        if name is not None:
            return name
        name = None
        # NOTE(review): the names looked up here are filtered but never
        # assigned to ``name``, so the result always comes from
        # ``platform.node()`` (or 'localhost'). This looks unintended, but
        # using the looked-up name would change published service names,
        # so the behavior is left as it was -- confirm the intent.
        for iface in await self.get_local_ifaces():
            _name, srv = await self.mainloop.getnameinfo((str(iface.ip), 80))
            if _name is None or _name == 'localhost':
                continue
            if _name == str(iface.ip):
                continue
            if len(_name) > 63:
                continue
        if name is None:
            name = platform.node()
        if not name:
            name = 'localhost'
        self._local_hostname = name
        return name

    async def publish_service(
        self,
        type_: str,
        port: int,
        name: Optional[str] = None,
        addresses: Optional[AddressLike] = None,
        properties: Optional[Dict] = None,
        ttl: Optional[int] = PUBLISH_TTL
    ):
        """Publishes a service on the network

        Arguments:
            type_ (str): Fully qualified service type
            port (int): The service port
            name (str, optional): Fully qualified service name. If not provided,
                this will be generated from the ``type_`` and the hostname
                detected by :meth:`get_local_hostname`
            addresses (optional): If provided, an ``iterable`` of IP addresses
                to publish. Can be :class:`ipaddress.IPv4Address` or any type
                that can be parsed by :func:`ipaddress.ip_address`. Defaults to
                the result of :meth:`get_local_ifaces`
            properties (dict, optional): Custom properties for the service
            ttl (int, optional): The TTL value to publish.
                Defaults to :const:`PUBLISH_TTL`

        Raises:
            ValueError: If a service with the same type and name has already
                been published
        """
        hostname = await self.get_local_hostname()
        if name is None:
            name = '.'.join([hostname, type_])
        if addresses is None:
            addresses = await self.get_local_ifaces()
        if properties is None:
            properties = {}
        addresses = [unpack_ip_address(addr) for addr in addresses]
        info = ServiceInfo(
            type_=type_,
            port=port,
            name=name,
            properties=properties,
            ttl=ttl,
            addresses=addresses,
        )
        if info.id in self.published_services:
            raise ValueError(f'Service already published: {info!r}')
        self.published_services[info.id] = info
        msg = PublishMessage(info)
        await run_on_loop(self.add_message(msg), self.mainloop)

    async def republish_service(
        self,
        type_: str,
        port: int,
        name: Optional[str] = None,
        addresses: Optional[AddressLike] = None,
        properties: Optional[Dict] = None,
        ttl: Optional[int] = PUBLISH_TTL
    ):
        """Update an existing :class:`ServiceInfo` and republish it

        The arguments are the same as for :meth:`publish_service`. The
        ``addresses`` and ``properties`` of the existing service are only
        replaced if they are provided.

        Raises:
            KeyError: If no matching service was published
        """
        hostname = await self.get_local_hostname()
        if name is None:
            name = '.'.join([hostname, type_])
        service_id = (type_, name)
        if service_id not in self.published_services:
            raise KeyError(f'Service "{service_id}" does not exist')
        info = self.published_services[service_id]
        info.port = port
        if addresses is not None:
            addresses = [unpack_ip_address(addr) for addr in addresses]
            info.addresses = addresses
        if properties is not None:
            info.properties = properties
        info.ttl = ttl
        msg = RepublishMessage(info)
        await run_on_loop(self.add_message(msg), self.mainloop)

    async def unpublish_service(self, type_: str, name: Optional[str] = None):
        """Removes a service published through :meth:`publish_service`

        Arguments:
            type_ (str): Fully qualified service type
            name (str, optional): Fully qualified service name. If not provided,
                this will be generated from the ``type_`` and the hostname
                detected by :meth:`get_local_hostname`

        Raises:
            KeyError: If no matching service was published
        """
        hostname = await self.get_local_hostname()
        if name is None:
            name = '.'.join([hostname, type_])
        service_id = (type_, name)
        if service_id not in self.published_services:
            raise KeyError(f'Service "{service_id}" does not exist')
        info = self.published_services[service_id]
        msg = UnPublishMessage(info)
        del self.published_services[service_id]
        await run_on_loop(self.add_message(msg), self.mainloop)
class BMDDiscovery(Listener):
    """Zeroconf listener for Blackmagic devices

    Services whose ``class`` property is ``'Videohub'`` or ``'SmartView'``
    are tracked; a ``'SmartView'`` service whose ``name`` property contains
    ``'SmartScope'`` is tracked as a SmartScope. Other services are ignored.

    Attributes:
        vidhubs: Contains discovered Videohub devices. This
            :class:`~pydispatch.properties.DictProperty` can be used to
            subscribe to changes.
        smart_views: Contains discovered SmartView devices. This
            :class:`~pydispatch.properties.DictProperty` can be used to
            subscribe to changes.
        smart_scopes: Contains discovered SmartScope devices. This
            :class:`~pydispatch.properties.DictProperty` can be used to
            subscribe to changes.

    The keys of all three are the upper-cased ``unique id`` property of the
    service.

    :Events:
        .. function:: bmd_service_added(info, **kwargs)
        .. function:: bmd_service_updated(info, **kwargs)
        .. function:: bmd_service_removed(info, **kwargs)

        For tracked devices the keyword arguments include ``class``, ``id``
        and ``device_type`` (``'vidhub'``, ``'smartview'`` or
        ``'smartscope'``).
    """
    vidhubs: Dict[str, ServiceInfo] = DictProperty()
    smart_views: Dict[str, ServiceInfo] = DictProperty()
    smart_scopes: Dict[str, ServiceInfo] = DictProperty()
    _events_ = ['bmd_service_added', 'bmd_service_updated', 'bmd_service_removed']

    def __init__(self, mainloop, service_type='_blackmagic._tcp.local.'):
        super().__init__(mainloop, service_type)
        self.bind_async(
            mainloop,
            service_added=self._add_bmd_service_info,
            service_updated=self._update_bmd_service_info,
            service_removed=self._remove_bmd_service_info,
        )

    def _find_stored_entry(self, info: ServiceInfo):
        """Find the tracked device with the same service id as ``info``

        Returns:
            A ``(bmd_id, stored_info)`` tuple or ``None`` if nothing matches
        """
        for registry in (self.vidhubs, self.smart_views, self.smart_scopes):
            for bmd_id, stored in registry.items():
                if stored.id == info.id:
                    return bmd_id, stored
        return None

    async def _add_bmd_service_info(self, info: ServiceInfo, **kwargs):
        """Handler for ``service_added``: track the device and emit
        ``bmd_service_added``

        Services with an unsupported ``class`` or without a ``unique id``
        are ignored.
        """
        props = info.properties
        device_cls = props.get('class')
        bmd_id = props.get('unique id', '').upper()
        if device_cls == 'Videohub':
            device_type, registry = 'vidhub', self.vidhubs
        elif device_cls == 'SmartView':
            if 'SmartScope' in props.get('name', ''):
                device_type, registry = 'smartscope', self.smart_scopes
            else:
                device_type, registry = 'smartview', self.smart_views
        else:
            device_type, registry = None, None
        # ``registry`` may be an empty (falsy) dict, so compare with None
        if registry is None or not bmd_id:
            logger.debug(f'Ignoring unsupported service: {info!r}')
            return
        kwargs.update({'class':device_cls, 'id':bmd_id})
        async with self._service_info_lock:
            assert bmd_id not in registry
            registry[bmd_id] = info
        kwargs['device_type'] = device_type
        self.emit('bmd_service_added', info, **kwargs)

    async def _update_bmd_service_info(self, info: ServiceInfo, **kwargs):
        """Handler for ``service_updated``: emit ``bmd_service_updated``

        Raises:
            KeyError: If the device is not tracked, or if the ``class`` or
                ``unique id`` property is missing
        """
        device_cls = info.properties['class']
        bmd_id = info.properties['unique id'].upper()
        kwargs.update({'class':device_cls, 'id':bmd_id})
        async with self._service_info_lock:
            if bmd_id in self.vidhubs:
                device_type = 'vidhub'
                o = self.vidhubs[bmd_id]
            elif bmd_id in self.smart_scopes:
                device_type = 'smartscope'
                o = self.smart_scopes[bmd_id]
            elif bmd_id in self.smart_views:
                device_type = 'smartview'
                o = self.smart_views[bmd_id]
            else:
                raise KeyError(f'Cannot find entry for "{info!r}"')
            assert o.id == info.id
        kwargs['device_type'] = device_type
        self.emit('bmd_service_updated', info, **kwargs)

    async def _remove_bmd_service_info(self, info: ServiceInfo, **kwargs):
        """Handler for ``service_removed``: stop tracking the device and emit
        ``bmd_service_removed``

        The event is emitted even if the device was not tracked. In that
        case the ``class``, ``id`` and ``device_type`` keyword arguments are
        not added.
        """
        device_cls = info.properties.get('class')
        bmd_id = info.properties.get('unique id', '').upper()
        async with self._service_info_lock:
            if device_cls is None or not bmd_id:
                # ``Listener.remove_service`` builds its ServiceInfo from the
                # type and name only, so the properties may be missing.
                # Fall back to the tracked instance with the same service id.
                found = self._find_stored_entry(info)
                if found is not None:
                    bmd_id, info = found
                    device_cls = info.properties.get('class')
            if bmd_id in self.vidhubs and device_cls == 'Videohub':
                del self.vidhubs[bmd_id]
                kwargs.update({'class':device_cls, 'id':bmd_id, 'device_type':'vidhub'})
            elif bmd_id in self.smart_views and device_cls == 'SmartView':
                del self.smart_views[bmd_id]
                kwargs.update({'class':device_cls, 'id':bmd_id, 'device_type':'smartview'})
            elif bmd_id in self.smart_scopes and device_cls == 'SmartView':
                del self.smart_scopes[bmd_id]
                kwargs.update({'class':device_cls, 'id':bmd_id, 'device_type':'smartscope'})
        self.emit('bmd_service_removed', info, **kwargs)
def main():
    """Run a :class:`BMDDiscovery` for five seconds and log what it finds

    Returns:
        The :class:`BMDDiscovery` instance (already stopped)
    """
    # Create the loop explicitly: ``asyncio.get_event_loop()`` is deprecated
    # when no loop is running and fails on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    listener = BMDDiscovery(loop)

    def on_service_added(info, **kwargs):
        logger.info(f'Added: {info}')

    def on_service_removed(info, **kwargs):
        logger.info(f'Removed: {info}')

    listener.bind(service_added=on_service_added, service_removed=on_service_removed)

    async def run():
        await listener.start()
        await asyncio.sleep(5)
        logger.info(str(listener.services))
        await listener.stop()

    loop.run_until_complete(run())
    return listener


if __name__ == '__main__':
    main()