Source code for vidhubcontrol.config

import os
import json
import asyncio
from typing import List, Dict, ClassVar
from loguru import logger

import jsonfactory
from pydispatch import Dispatcher, Property
from pydispatch.properties import ListProperty, DictProperty

from vidhubcontrol.common import ConnectionState, ConnectionManager, SyncronizedConnectionManager
from vidhubcontrol.discovery import BMDDiscovery
from vidhubcontrol.backends import (
    DummyBackend,
    SmartViewDummyBackend,
    SmartScopeDummyBackend,
    TelnetBackend,
    SmartViewTelnetBackend,
    SmartScopeTelnetBackend,
)


BACKENDS = {
    'vidhub':{cls.__name__:cls for cls in [DummyBackend, TelnetBackend]},
    'smartview':{cls.__name__:cls for cls in [SmartViewDummyBackend, SmartViewTelnetBackend]},
    'smartscope':{cls.__name__:cls for cls in [SmartScopeDummyBackend, SmartScopeTelnetBackend]},
}

class ConfigBase(Dispatcher):
    _conf_attrs = []
    _events_ = ['trigger_save']
    def _get_conf_data(self):
        d = {}
        for attr in self._conf_attrs:
            val = getattr(self, attr)
            if isinstance(val, ConfigBase):
                val = val._get_conf_data()
            d[attr] = val
        return d

[docs]class Config(ConfigBase): """Config store for devices Handles storage of device connection information and any user-defined values for the backends defined in the :doc:`backends module <backends>`. Data is stored in JSON format. During :meth:`start`, all previously stored devices will be loaded and begin communication. Devices are also discovered using `Zeroconf`_ through the :doc:`discovery module <discovery>`. Since each device has a unique id, network address changes (due to DHCP, etc) are handled appropriately. The configuration data is stored when: * A device is added or removed * A change is detected for a device's network address * Any user-defined device value changes (device name, presets, etc) The recommended method to start ``Config`` is through the :meth:`load_async` method. Example: .. code-block:: python import asyncio from vidhubcontrol.config import Config loop = asyncio.get_event_loop() conf = loop.run_until_complete(Config.load_async()) Keyword Arguments: filename (:obj:`str`, optional): Filename to load/save config data to. If not given, defaults to :attr:`DEFAULT_FILENAME` Attributes: vidhubs: A :class:`~pydispatch.properties.DictProperty` of :class:`VidhubConfig` instances using :attr:`~DeviceConfigBase.device_id` as keys smartviews: A :class:`~pydispatch.properties.DictProperty` of :class:`SmartViewConfig` instances using :attr:`~DeviceConfigBase.device_id` as keys smartscopes: A :class:`~pydispatch.properties.DictProperty` of :class:`SmartScopeConfig` instances using :attr:`~DeviceConfigBase.device_id` as keys all_devices: A :class:`~pydispatch.properties.DictProperty` containing all devices from :attr:`vidhubs`, :attr:`smartviews` and :attr:`smartscopes` .. autoattribute:: DEFAULT_FILENAME .. _Zeroconf: https://en.wikipedia.org/wiki/Zero-configuration_networking """ DEFAULT_FILENAME = '~/vidhubcontrol.json' USE_DISCOVERY = True connection_manager: ConnectionManager #: Connection manager vidhubs: Dict[str, 'VidhubConfig'] = DictProperty() smartviews: Dict[str, 'SmartViewConfig'] = DictProperty() smartscopes: Dict[str, 'SmartScopeConfig'] = DictProperty() all_devices: Dict[str, 'DeviceConfigBase'] = DictProperty() _conf_attrs = ['vidhubs', 'smartscopes', 'smartviews'] _device_type_map = { 'vidhub':{'prop':'vidhubs'}, 'smartview':{'prop':'smartviews'}, 'smartscope':{'prop':'smartscopes'}, } def __init__(self, **kwargs): self.start_kwargs = kwargs.copy() self.connection_manager = ConnectionManager() self.filename = kwargs.get('filename', self.DEFAULT_FILENAME) self.loop = asyncio.get_event_loop() self.discovery_listener = None self.discovery_lock = asyncio.Lock() @property def connection_state(self) -> ConnectionState: """The current :attr:`~.common.ConnectionManager.state` of the :attr:`connection_manager` """ return self.connection_manager.state def id_for_device(self, device): if not isinstance(device, DeviceConfigBase): prop = getattr(self, self._device_type_map[device.device_type]['prop']) obj = None for _obj in prop.values(): if _obj.backend is device: obj = _obj break if obj is None: raise Exception('Could not find device {!r}'.format(device)) else: device = obj if device.device_id is not None: return device.device_id return str(id(device)) async def _initialize_backends(self, **kwargs): """Creates and initializes device backends Keyword Arguments: vidhubs (dict): A ``dict`` containing the necessary data (as values) to create an instance of :class:`VidhubConfig` smartviews (dict): A ``dict`` containing the necessary data (as values) to create an instance of :class:`SmartViewConfig` smartscopes (dict): A ``dict`` containing the necessary data (as values) to create an instance of :class:`SmartScopeConfig` Note: All config object instances are created using the :meth:`DeviceConfigBase.create` classmethod. """ async def _init_backend(prop, cls, **okwargs): okwargs['config'] = self obj = await cls.create(**okwargs) device_id = obj.device_id if device_id is None: device_id = self.id_for_device(obj) prop[device_id] = obj assert device_id not in self.all_devices self.all_devices[device_id] = obj obj.bind( device_id=self.on_backend_device_id, trigger_save=self.on_device_trigger_save, ) tasks = [] for key, d in self._device_type_map.items(): items = kwargs.get(d['prop'], {}) prop = getattr(self, d['prop']) for item_data in items.values(): okwargs = item_data.copy() task = _init_backend(prop, d['cls'], **okwargs) tasks.append(task) if len(tasks): await asyncio.gather(*tasks)
[docs] async def start(self, **kwargs): """Starts the device backends and discovery routines Keyword arguments passed to the initialization will be used here, but can be overridden in this method. They will also be passed to :meth:`_initialize_backends`. """ manager = self.connection_manager async with manager: if ConnectionState.connecting in manager.state: await manager.wait_for('connected') if manager.state.is_connected: return await manager.set_state('connecting') logger.info('Config starting...') self.start_kwargs.update(kwargs) kwargs = self.start_kwargs await self._initialize_backends(**kwargs) if not self.USE_DISCOVERY: async with manager: await manager.set_state('connected') return assert self.discovery_listener is None self.discovery_listener = BMDDiscovery(self.loop) self.discovery_listener.bind_async( self.loop, bmd_service_added=self.on_discovery_service_added, bmd_service_updated=self.on_discovery_service_updated, ) await self.discovery_listener.start() async with manager: await manager.set_state('connected') logger.debug('Config started')
[docs] async def stop(self): """Stops all device backends and discovery routines """ async with self.connection_manager as manager: if ConnectionState.waiting in manager.state: await manager.wait_for('connected|not_connected') if ConnectionState.not_connected in manager.state: return await manager.set_state('disconnecting') logger.debug('Config stopping...') if self.discovery_listener is not None: await self.discovery_listener.stop() self.discovery_listener = None coros = set() for vidhub in self.vidhubs.values(): coros.add(vidhub.close()) for smartview in self.smartviews.values(): coros.add(smartview.close()) for smartscope in self.smartscopes.values(): coros.add(smartscope.close()) if len(coros): await asyncio.gather(*coros) async with self.connection_manager as manager: await manager.set_state('not_connected') logger.debug('Config stopped')
[docs] async def build_backend(self, device_type, backend_name, **kwargs): """Creates a "backend" instance The supplied keyword arguments are used to create the instance object which will be created using its :meth:`~vidhubcontrol.backends.base.BackendBase.create` classmethod. The appropriate subclass of :class:`DeviceConfigBase` will be created and stored to the config using :meth:`add_device`. Arguments: device_type (str): Device type to create. Choices are "vidhub", "smartview", "smartscope" backend_name (str): The class name of the backend as found in :doc:`backends` Returns: An instance of a :class:`vidhubcontrol.backends.base.BackendBase` subclass """ prop = getattr(self, self._device_type_map[device_type]['prop']) for obj in prop.values(): if obj.backend_name != backend_name: continue if kwargs.get('device_id') is not None and kwargs['device_id'] == obj.device_id: return obj.backend if kwargs.get('hostaddr') is not None and kwargs['hostaddr'] == obj.hostaddr: return obj.backend cls = BACKENDS[device_type][backend_name] kwargs['event_loop'] = self.loop backend = await cls.create_async(**kwargs) await self.add_device(backend) return backend
async def add_vidhub(self, backend): return await self.add_device(backend) async def add_smartview(self, backend): return await self.add_device(backend) async def add_smartscope(self, backend): return await self.add_device(backend)
[docs] async def add_device(self, backend): """Adds a "backend" instance to the config A subclass of :class:`DeviceConfigBase` will be either created or updated from the given backend instance. If the ``device_id`` exists in the config, the :attr:`DeviceConfigBase.backend` value of the matching :class:`DeviceConfigBase` instance will be set to the given ``backend``. Otherwise, a new :class:`DeviceConfigBase` instance will be created using the :meth:`DeviceConfigBase.from_existing` classmethod. Arguments: backend: An instance of one of the subclasses of :class:`vidhubcontrol.backends.base.BackendBase` found in :doc:`backends` """ device_type = backend.device_type cls = self._device_type_map[device_type]['cls'] prop = getattr(self, self._device_type_map[device_type]['prop']) if backend.device_id is not None and backend.device_id in prop: obj = prop[backend.device_id] obj.backend = backend else: obj = await cls.from_existing(backend, config=self) if obj.device_id is None: obj.device_id = self.id_for_device(obj) assert obj.device_id not in self.all_devices self.all_devices[obj.device_id] = obj prop[obj.device_id] = obj obj.bind( trigger_save=self.on_device_trigger_save, device_id=self.on_backend_device_id, ) self.save()
def on_backend_device_id(self, backend, value, **kwargs): if value is None: return old = kwargs.get('old') prop = getattr(self, self._device_type_map[backend.device_type]['prop']) if old in self.all_devices: del self.all_devices[old] if old in prop: del prop[old] if value in prop: self.save() return assert value not in self.all_devices self.all_devices[value] = backend prop[value] = backend self.save() async def add_discovered_device(self, device_type, info, device_id): manager = self.connection_manager async with manager: if manager.state & (ConnectionState.disconnecting | ConnectionState.not_connected): return if ConnectionState.connecting in manager.state: await manager.wait_for('connected') logger.debug(f'add_discovered_device: {device_type}, {info}, {device_id}') async with self.discovery_lock: prop = getattr(self, self._device_type_map[device_type]['prop']) cls = None for key, _cls in BACKENDS[device_type].items(): if 'Telnet' in key: cls = _cls break hostaddr = str(info.address) hostport = int(info.port) if device_id in prop: obj = prop[device_id] logger.debug(f'existing device: {obj!r}') if obj.hostaddr != hostaddr or obj.hostport != hostport: logger.debug('resetting hostaddr') await obj.reset_hostaddr(hostaddr, hostport) elif not obj.connection_state.is_connected: await obj.reconnect() return backend = await cls.create_async( hostaddr=hostaddr, hostport=hostport, event_loop=self.loop, ) if backend is None: return if backend.device_id != device_id: await backend.disconnect() return await self.add_device(backend) async def on_discovery_service_added(self, info, **kwargs): if kwargs.get('class') not in ['Videohub', 'SmartView']: return device_type = kwargs.get('device_type') device_id = kwargs.get('id') if device_id is None: return await self.add_discovered_device(device_type, info, device_id) async def on_discovery_service_updated(self, info, **kwargs): logger.debug(f'update: {info!r}, {kwargs}') await self.on_discovery_service_added(info, **kwargs) def on_device_trigger_save(self, *args, **kwargs): self.save()
[docs] def save(self, filename=None): """Saves the config data to the given filename Arguments: filename (:obj:`str`, optional): The filename to write config data to. If not supplied, the current :attr:`filename` is used. Notes: If the ``filename`` argument is provided, it will replace the existing :attr:`filename` value. """ if filename is not None: self.filename = filename else: filename = self.filename filename = os.path.expanduser(filename) data = self._get_conf_data() if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) s = jsonfactory.dumps(data, indent=4) with open(filename, 'w') as f: f.write(s)
@classmethod def _prepare_load_params(cls, filename=None, **kwargs): if filename is None: filename = cls.DEFAULT_FILENAME kwargs['filename'] = filename filename = os.path.expanduser(filename) if os.path.exists(filename): with open(filename, 'r') as f: s = f.read() kwargs.update(jsonfactory.loads(s)) return kwargs
[docs] @classmethod def load(cls, filename=None, **kwargs): """Creates a Config instance, loading data from the given filename Arguments: filename (:obj:`str`, optional): The filename to read config data from, defaults to :const:`Config.DEFAULT_FILENAME` Returns: A :class:`Config` instance """ kwargs = cls._prepare_load_params(filename, **kwargs) return cls(**kwargs)
[docs] @classmethod async def load_async(cls, filename=None, **kwargs): """Creates a Config instance, loading data from the given filename This coroutine method creates the ``Config`` instance and will ``await`` all start-up coroutines and futures before returning. Arguments: filename (:obj:`str`, optional): The filename to read config data from, defaults to :attr:`DEFAULT_FILENAME` Returns: A :class:`Config` instance """ kwargs = cls._prepare_load_params(filename, **kwargs) config = cls(**kwargs) await config.start() return config
async def __aenter__(self): await self.start() return self async def __aexit__(self, *args): await self.stop()
[docs]class DeviceConfigBase(ConfigBase): """Base class for device config storage Attributes: config: A reference to the parent :class:`Config` instance backend: An instance of :class:`vidhubcontrol.backends.base.BackendBase` backend_name: The class name of the backend, used when loading from saved config data hostaddr: The IPv4 address of the device hostport: The port address of the device device_name: User-defined name to store with the device, defaults to the :attr:`device_id` value device_id: The unique id as reported by the device """ config: Config = Property() backend: 'vidhubcontrol.backends.base.BackendBase' = Property() backend_name: str = Property() hostaddr: str = Property() hostport: int = Property(9990) device_name: str = Property() device_id: str = Property() _conf_attrs = [ 'backend_name', 'hostaddr', 'hostport', 'device_name', 'device_id', ] connection_manager: SyncronizedConnectionManager """A connection manager that syncronizes its state with the :attr:`backend` """ def __init__(self, **kwargs): self.config = kwargs.get('config') self.loop = asyncio.get_event_loop() self.bind_async(self.loop, backend=self.on_backend_set) self.connection_manager = SyncronizedConnectionManager() @property def connection_state(self) -> ConnectionState: """The current :attr:`~.common.SyncronizedConnectionManager.state` of the :attr:`connection_manager` """ return self.connection_manager.state
[docs] @classmethod async def create(cls, **kwargs): """Creates device config and backend instances asynchronously Keyword arguments passed to this classmethod are passed to the init method and will be used to set its attributes. If a "backend" keyword argument is supplied, it should be a running instance of :class:`vidhubcontrol.backends.base.BackendBase`. It will then be used to collect config values from. If "backend" is not present, the appropriate one will be created using :meth:`build_backend`. Returns: An instance of :class:`DeviceConfigBase` """ self = cls(**kwargs) self.unbind(self.on_backend_set) for attr in self._conf_attrs: setattr(self, attr, kwargs.get(attr)) self.backend = kwargs.get('backend') if self.backend is not None: await self.on_backend_set(self, self.backend, old=None) else: self.backend = await self.build_backend(**self._get_conf_data()) await self.on_backend_set(self, self.backend, old=None) self.bind_async(self.loop, backend=self.on_backend_set) return self
[docs] @classmethod async def from_existing(cls, backend, **kwargs): """Creates a device config object from an existing backend Keyword arguments will be passed to the :meth:`create` method Arguments: backend: An instance of :class:`vidhubcontrol.backends.base.BackendBase` Returns: An instance of :class:`DeviceConfigBase` """ d = dict( backend=backend, backend_name=backend.__class__.__name__, hostaddr=getattr(backend, 'hostaddr', None), hostport=getattr(backend, 'hostport', None), device_name=backend.device_name, device_id=backend.device_id, ) for key, val in d.items(): kwargs.setdefault(key, val) kwargs['event_loop'] = backend.event_loop return await cls.create(**kwargs)
async def reconnect(self): async with self.connection_manager as manager: if ConnectionState.waiting in manager.state: await manager.wait_for('connected|failure') await self.backend.disconnect() await self.backend.connect() async def reset_hostaddr(self, hostaddr, hostport=None): if hostport is None: hostport = self.hostport await self.backend.disconnect() self.hostaddr = hostaddr self.hostport = hostport self.backend.hostaddr = hostaddr self.backend.hostport = hostport await self.backend.connect() self.emit('trigger_save') async def close(self): if self.backend is not None: await self.backend.disconnect() await self.connection_manager.set_other(None)
[docs] async def build_backend(self, cls=None, **kwargs): """Creates a backend instance asynchronously Keyword arguments will be passed to the :meth:`vidhubcontrol.backends.base.BackendBase.create_async` method. Arguments: cls (optional): A subclass of :class:`~vidhubcontrol.backends.base.BackendBase`. If not present, the class will be determined from existing values of :attr:`device_type` and :attr:`backend_name` Returns: An instance of :class:`vidhubcontrol.backends.base.BackendBase` """ kwargs.setdefault('event_loop', self.loop) if cls is None: cls = BACKENDS[self.device_type][self.backend_name] await self.connection_manager.set_other(None) async with self.connection_manager as manager: await manager.set_state(ConnectionState.connecting) backend = cls(**kwargs) async def wait_for_connecting(): async with backend.connection_manager as backend_mgr: state = await backend_mgr.wait_for('connecting') await self.connection_manager.set_other(backend_mgr) t = asyncio.ensure_future(backend.connect()) await wait_for_connecting() await t return backend
def on_backend_prop_change(self, instance, value, **kwargs): if instance is not self.backend: return if not instance.connection_state.is_connected: return prop = kwargs.get('property') setattr(self, prop.name, value) self.emit('trigger_save') async def on_backend_set(self, instance, backend, **kwargs): old = kwargs.get('old') if old is not None: old.unbind(self) if backend is None: await self.connection_manager.set_other(None) return if self.connection_manager.other is not backend.connection_manager: await self.connection_manager.set_other(backend.connection_manager) if backend.connection_state.is_connected: if self.backend.device_name != self.device_name: self.device_name = self.backend.device_name if backend.device_id is None: if self.device_id is None: self.device_id = self.config.id_for_device(self) elif backend.connection_state.is_connected: self.device_id = backend.device_id backend.bind( device_name=self.on_backend_prop_change, device_id=self._on_backend_device_id, ) if hasattr(backend, 'hostport'): if backend.connection_state.is_connected: self.hostaddr = backend.hostaddr self.hostport = backend.hostport backend.bind( hostaddr=self.on_backend_prop_change, hostport=self.on_backend_prop_change, ) def _on_backend_device_id(self, backend, value, **kwargs): if backend is not self.backend: return if not backend.connection_state.is_connected: return if backend.device_id is None: if self.device_id is not None: self.device_id = self.config.id_for_device(self) else: self.device_id = backend.device_id
[docs]class VidhubConfig(DeviceConfigBase): """Config container for VideoHub devices Attributes: presets: Preset data collected from the device :class:`presets <vidhubcontrol.backends.base.Preset>`. Will be used on initialization to populate the preset data to the device """ presets: List[Dict] = ListProperty() _conf_attrs = DeviceConfigBase._conf_attrs + [ 'presets', ] device_type: ClassVar[str] = 'vidhub'
[docs] @classmethod async def create(cls, **kwargs): kwargs.setdefault('presets', []) self = await super().create(**kwargs) return self
[docs] @classmethod async def from_existing(cls, backend, **kwargs): kwargs.setdefault('presets', []) for preset in backend.presets: kwargs['presets'].append(dict( name=preset.name, index=preset.index, crosspoints=preset.crosspoints.copy(), )) return await super().from_existing(backend, **kwargs)
[docs] async def build_backend(self, cls=None, **kwargs): kwargs['presets'] = kwargs['presets'][:] return await super().build_backend(cls, **kwargs)
async def on_backend_set(self, instance, backend, **kwargs): if backend is not None: pkwargs = {k:self.on_preset_update for k in ['name', 'crosspoints']} for preset in self.backend.presets: preset.bind(**pkwargs) self.backend.bind(on_preset_added=self.on_preset_added) await super().on_backend_set(instance, backend, **kwargs) def on_preset_added(self, *args, **kwargs): preset = kwargs.get('preset') self.presets.append(dict( name=preset.name, index=preset.index, crosspoints=preset.crosspoints.copy(), )) bkwargs = {k:self.on_preset_update for k in ['name', 'crosspoints']} self.emit('trigger_save') preset.bind(**bkwargs) def on_preset_update(self, instance, value, **kwargs): prop = kwargs.get('property') if prop.name == 'crosspoints': value = value.copy() self.presets[instance.index][prop.name] = value self.emit('trigger_save') def _get_conf_data(self): d = super()._get_conf_data() for pdata in d['presets']: if 'backend' in pdata: del pdata['backend'] return d
[docs]class SmartViewConfig(DeviceConfigBase): """Config container for SmartView devices """ device_type: ClassVar[str] = 'smartview'
[docs]class SmartScopeConfig(DeviceConfigBase): """Config container for SmartScope devices """ device_type: ClassVar[str] = 'smartscope'
Config._device_type_map['vidhub']['cls'] = VidhubConfig Config._device_type_map['smartview']['cls'] = SmartViewConfig Config._device_type_map['smartscope']['cls'] = SmartScopeConfig @jsonfactory.register class JsonHandler(object): def encode(self, o): if isinstance(o, ConfigBase): d = o._get_conf_data() return d def decode(self, d): keys = [key for key in d if key.isdigit()] if len(keys) == len(d.keys()): return {int(key):d[key] for key in d.keys()} return d