Source code for Drivers.SensorBase

from __future__ import annotations

from abc import ABC, abstractmethod
import logging
from threading import RLock

logger = logging.getLogger("root")


[docs]class SensorBase(ABC): """ Abstract base class for all sensors used in this project. :type name: str :param name: Each sensor must have a unique name. """ def __init__(self, name) -> None: self.name = name self._lock = RLock() # Set up sensor-specific logger logger.info('Creating Sensor "{}"'.format(self.name))
[docs] def open(self) -> bool: """ Attempts to connect the sensor and reports success. :return: True fi connected successifully, False otherwise """ logger.info('Connecting Sensor "{}"...'.format(self.name)) answer = self.connect() if answer: logger.info("... connected {} successfully!".format(self.name)) else: logger.info("... could not connect {}! {}".format(self.name, answer)) return answer
[docs] def close(self) -> None: """ Disconnects the sensor if it is currently connected. """ if self.is_connected(): self.disconnect() logger.info('Closing Sensor "{}"'.format(self.name))
@abstractmethod def disconnect(self): raise NotImplementedError( "'disconnect' not implemented for sensor {}".format(self.name) ) @abstractmethod def connect(self): raise NotImplementedError( "'connect' not implemented for sensor {}".format(self.name) ) @abstractmethod def is_connected(self): raise NotImplementedError( "'is_connected' not implemented for sensor {}".format(self.name) ) @abstractmethod def measure(self): raise NotImplementedError( "'measure' not implemented for sensor {}".format(self.name) ) def __enter__(self) -> SensorBase: """ Ensures compatibility with 'with' statement. :return: Returns a reference to the current instance. """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ Called when exiting the context established by the 'with' statement. .. warning: Only use this class inside a 'with' statement to ensure calling the shut down procedure for every connected platform even upon an unscheduled end of the program. """ self.close()