diff --git a/BUILD b/BUILD index e591cd3fd6d5dc9204e745b529cef85d438c2791..4befa73ce734f37d500a8aed72dc2bfadfe150b6 100644 --- a/BUILD +++ b/BUILD @@ -124,5 +124,5 @@ pkg_deb( package = "prometheus-dyson", postrm = "debian/postrm", prerm = "debian/prerm", - version = "0.4.0", + version = "0.4.1", ) diff --git a/connect.py b/connect.py index 4a4716c947dbccb3e3cb690d9c823e344f82b216..8d8ef8b21be52a071d38747b82d137f28d307095 100644 --- a/connect.py +++ b/connect.py @@ -1,7 +1,9 @@ """Wraps libdyson's connections with support for config & retries.""" +import time import functools import logging +import os import threading from typing import Callable, Dict, List, Optional @@ -31,8 +33,8 @@ class DeviceWrapper: def __init__(self, device: config.Device, environment_refresh_secs=30): self._config_device = device self._environment_refresh_secs = environment_refresh_secs - self._environment_timer : Optional[threading.Timer] = None - self._timeout_timer : Optional[threading.Timer] = None + self._environment_timer: Optional[threading.Timer] = None + self._timeout_timer: Optional[threading.Timer] = None self.libdyson = self._create_libdyson_device() @property @@ -61,16 +63,19 @@ class DeviceWrapper: if self.is_connected: logger.info( - 'Already connected to %s (%s); no need to reconnect.', host, self.serial) + "Already connected to %s (%s); no need to reconnect.", host, self.serial + ) else: try: self.libdyson.connect(host) self._refresh_timer() except libdyson.exceptions.DysonConnectTimeout: logger.error( - 'Timeout connecting to %s (%s); will retry', host, self.serial) + "Timeout connecting to %s (%s); will retry", host, self.serial + ) self._timeout_timer = threading.Timer( - retry_on_timeout_secs, self.connect, args=[host]) + retry_on_timeout_secs, self.connect, args=[host] + ) self._timeout_timer.start() def disconnect(self): @@ -80,30 +85,36 @@ class DeviceWrapper: if self._timeout_timer: self._timeout_timer.cancel() - self.libdyson.disconnect() + # libdyson will handle disconnects on its own and will raise if you + # try to disconnect a second time. + if self.libdyson.is_connected: + self.libdyson.disconnect() def _refresh_timer(self): - self._environment_timer = threading.Timer(self._environment_refresh_secs, - self._timer_callback) + self._environment_timer = threading.Timer( + self._environment_refresh_secs, self._timer_callback + ) self._environment_timer.start() def _timer_callback(self): self._environment_timer = None if self.is_connected: - logger.debug( - 'Requesting updated environmental data from %s', self.serial) + logger.debug("Requesting updated environmental data from %s", self.serial) try: - self.libdyson.request_environmental_data() + self.libdyson.request_environmental_data() except AttributeError: - logger.error('Race with a disconnect? Skipping an iteration.') + logger.error("Race with a disconnect? Skipping an iteration.") self._refresh_timer() else: - logger.debug('Device %s is disconnected.', self.serial) + logger.debug("Device %s is disconnected.", self.serial) def _create_libdyson_device(self): - return libdyson.get_device(self.serial, self._config_device.credentials, - self._config_device.product_type) + return libdyson.get_device( + self.serial, + self._config_device.credentials, + self._config_device.product_type, + ) class ConnectionManager: @@ -113,16 +124,30 @@ class ConnectionManager: update_fn: A callable taking a name, serial, devices: a list of config.Device entities hosts: a dict of serial -> IP address, for direct (non-zeroconf) connections. + reconnect: True if we should automatically reconnect, False otherwise + watchdog_secs: Number of seconds to wait before terminating the process (0 for no watchdog) """ - def __init__(self, update_fn: Callable[[str, str, bool, bool], None], - devices: List[config.Device], hosts: Dict[str, str], reconnect: bool = True): + def __init__( + self, + update_fn: Callable[[str, str, bool, bool], None], + devices: List[config.Device], + hosts: Dict[str, str], + reconnect: bool = True, + watchdog_secs: int = 300, + ): self._update_fn = update_fn self._hosts = hosts self._reconnect = reconnect self._devices = [DeviceWrapper(d) for d in devices] - logger.info('Starting discovery...') + self._last_update_time = int(time.time()) + self._watchdog_secs = watchdog_secs + if watchdog_secs: + logger.info("Starting process watchdog with %d sec timeout", watchdog_secs) + self._start_watchdog() + + logger.info("Starting discovery...") self._discovery = libdyson.discovery.DysonDiscovery() self._discovery.start_discovery() @@ -133,10 +158,34 @@ class ConnectionManager: """Disconnects from all devices.""" self._discovery.stop_discovery() + if self._watchdog_timer: + self._watchdog_timer.cancel() + for device in self._devices: - logger.info('Disconnecting from %s (%s)', device.name, device.serial) + logger.info("Disconnecting from %s (%s)", device.name, device.serial) device.disconnect() + def _start_watchdog(self): + self._watchdog_timer = threading.Timer( + self._watchdog_secs, self._watchdog_callback + ) + self._watchdog_timer.start() + + def _watchdog_callback(self): + now = int(time.time()) + delta = now - self._last_update_time + if delta > self._watchdog_secs: + logger.error( + "Watchdog process abort: last update was %d seconds ago -- process hung?", + delta, + ) + + # Use os.abort() here to force a crash. sys.exit will raise a SystemExit exception and + # walk the exception handlers, which might take a while. + os.abort() + else: + self._start_watchdog() + def _add_device(self, device: DeviceWrapper, add_listener=True): """Adds and connects to a device. @@ -154,12 +203,19 @@ class ConnectionManager: manual_ip = self._hosts.get(device.serial.upper()) if manual_ip: - logger.info('Attempting connection to device "%s" (serial=%s) via configured IP %s', - device.name, device.serial, manual_ip) + logger.info( + 'Attempting connection to device "%s" (serial=%s) via configured IP %s', + device.name, + device.serial, + manual_ip, + ) device.connect(manual_ip) else: - logger.info('Attempting to discover device "%s" (serial=%s) via zeroconf', - device.name, device.serial) + logger.info( + 'Attempting to discover device "%s" (serial=%s) via zeroconf', + device.name, + device.serial, + ) callback_fn = functools.partial(self._discovery_callback, device) self._discovery.register_device(device.libdyson, callback_fn) @@ -170,14 +226,16 @@ class ConnectionManager: # When we call connect() on libpurecool or libdyson, that code spawns # a new thread for MQTT and returns. In other words: we don't need to # worry about connect() blocking zeroconf here. - logger.info('Discovered %s on %s', device.serial, address) + logger.info("Discovered %s on %s", device.serial, address) device.connect(address) def _device_callback(self, device, message): - logger.debug('Received update from %s: %s', device.serial, message) + logger.debug("Received update from %s: %s", device.serial, message) if not device.is_connected and self._reconnect: logger.info( - 'Device %s is now disconnected, clearing it and re-adding', device.serial) + "Device %s is now disconnected, clearing it and re-adding", + device.serial, + ) device.disconnect() self._discovery.stop_discovery() self._discovery.start_discovery() @@ -186,6 +244,7 @@ class ConnectionManager: is_state = message == libdyson.MessageType.STATE is_environ = message == libdyson.MessageType.ENVIRONMENTAL - self._update_fn(device.name, device.libdyson, is_state=is_state, - is_environmental=is_environ) - + self._last_update_time = int(time.time()) + self._update_fn( + device.name, device.libdyson, is_state=is_state, is_environmental=is_environ + ) diff --git a/main.py b/main.py index 46175695f8430e6227cbf09c0f0cb0a285fc9483..55fe677a3703bbf04a5df3541858f752cb20a7a1 100755 --- a/main.py +++ b/main.py @@ -28,54 +28,73 @@ def _sleep_forever() -> None: def main(argv): """Main body of the program.""" parser = argparse.ArgumentParser(prog=argv[0]) - parser.add_argument('--port', help='HTTP server port', - type=int, default=8091) + parser.add_argument("--port", help="HTTP server port", type=int, default=8091) parser.add_argument( - '--config', help='Configuration file (INI file)', default='config.ini') + "--config", help="Configuration file (INI file)", default="config.ini" + ) parser.add_argument( - '--log_level', help='Logging level (DEBUG, INFO, WARNING, ERROR)', type=str, default='INFO') + "--log_level", + help="Logging level (DEBUG, INFO, WARNING, ERROR)", + type=str, + default="INFO", + ) parser.add_argument( - '--include_inactive_devices', - help='Do not use; this flag has no effect and remains for compatibility only', - action='store_true') + "--watchdog_timeout_seconds", + help="Timeout to abort the process if exceeded (0 for no watchdog)", + type=int, + default=300, + ) + parser.add_argument( + "--include_inactive_devices", + help="Do not use; this flag has no effect and remains for compatibility only", + action="store_true", + ) + args = parser.parse_args() try: level = getattr(logging, args.log_level) except AttributeError: - print(f'Invalid --log_level: {args.log_level}') + print(f"Invalid --log_level: {args.log_level}") sys.exit(-1) args = parser.parse_args() logging.basicConfig( - format='%(asctime)s [%(name)24s %(thread)d] %(levelname)10s %(message)s', - datefmt='%Y/%m/%d %H:%M:%S', - level=level) + format="%(asctime)s [%(name)24s %(thread)d] %(levelname)10s %(message)s", + datefmt="%Y/%m/%d %H:%M:%S", + level=level, + ) - logger.info('Starting up on port=%s', args.port) + logger.info("Starting up on port=%s", args.port) if args.include_inactive_devices: logger.warning( - '--include_inactive_devices is now inoperative and will be removed in a future release') + "--include_inactive_devices is now inoperative and will be removed in a future release" + ) try: cfg = config.Config(args.config) except: - logger.exception('Could not load configuration: %s', args.config) + logger.exception("Could not load configuration: %s", args.config) sys.exit(-1) devices = cfg.devices if len(devices) == 0: logger.fatal( - 'No devices configured; please re-run this program with --create_device_cache.') + "No devices configured; please re-run this program with --create_device_cache." + ) sys.exit(-2) prometheus_client.start_http_server(args.port) - - connect.ConnectionManager(metrics.Metrics().update, devices, cfg.hosts) + connect.ConnectionManager( + metrics.Metrics().update, + devices, + cfg.hosts, + watchdog_secs=args.watchdog_timeout_seconds, + ) _sleep_forever() -if __name__ == '__main__': +if __name__ == "__main__": main(sys.argv)