from sensirion_shdlc_driver import ShdlcSerialPort, ShdlcConnection, ShdlcDevice
from sensirion_shdlc_driver.errors import ShdlcTimeoutError
from sensirion_shdlc_driver.command import ShdlcCommand
from struct import pack, unpack
from Drivers.SensorBase import SensorBase
import logging
# Module-level logger; obtained under the shared "root" logger name.
logger = logging.getLogger("root")
# Unit selector byte sent with both the set-setpoint and the read-measured-flow commands.
FLOW_UNIT = 1 # 0 = Normalized (0..1) / 1 = Physical / 2 = User Defined
# Factor SFX5400.set_flow multiplies a normalized setpoint by when FLOW_UNIT == 1.
# NOTE(review): presumably the device's full-scale flow in standard litres per minute - confirm.
MAXIMUM_FLOW_SLM = 100
# Dictionary key under which SFX5400.measure returns the measured flow.
FLOW_MEASUREMENT_NAME = "Flow"
class Sfc5400ShdlcCmdSetSetpoint(ShdlcCommand):
    """
    SHDLC command (id 0x00) that transmits a flow setpoint.

    The payload is the FLOW_UNIT selector byte followed by the setpoint
    encoded as a big-endian 32-bit float.

    :type setpoint_normalized: float
    :param setpoint_normalized: Setpoint value to send. Despite the name,
        SFX5400.set_flow passes a value already scaled by MAXIMUM_FLOW_SLM
        when FLOW_UNIT is 1.
    """

    def __init__(self, setpoint_normalized):
        # ">Bf": one unsigned byte (unit selector) + one big-endian float.
        payload = pack(">Bf", FLOW_UNIT, setpoint_normalized)
        super().__init__(id=0x00, data=payload, max_response_time=5e-3)
class Sfc5400ShdlcCmdGetDeviceInformation(ShdlcCommand):
    """
    SHDLC command (id 0xD0) that requests one device-information entry.

    :param index: Value sent as the single payload byte selecting the entry.
        NOTE(review): the meaning of each index is defined by the device
        documentation, not by this file.
    """

    def __init__(self, index):
        payload = [index]
        super().__init__(id=0xD0, data=payload, max_response_time=10e-3)
class Sfc5400ShdlcCmdReadMeasuredFlow(ShdlcCommand):
    """
    SHDLC command (id 0x08) that reads the measured flow.

    The FLOW_UNIT selector is sent as the only payload byte.
    """

    def __init__(self):
        payload = [FLOW_UNIT]
        super().__init__(id=0x08, data=payload, max_response_time=5e-3)

    def interpret_response(self, data):
        """
        Decode the response payload.

        :param data: Raw response bytes holding one big-endian 32-bit float.
        :return: The decoded flow value as a float.
        """
        (flow,) = unpack(">f", data)
        return flow
class SFX5400(SensorBase):
    """
    SFX5400 represents either a Sensirion Flow Controller (SFC) or a Sensirion Flow Meter (SFM) of type 5400.

    :type serial_port: str
    :param serial_port: Name of the comport the SFX is connected to.
    :type name: str
    :param name: Name of the device.
    """

    def __init__(self, serial_port: str, name="Sfc5400"):
        super(SFX5400, self).__init__(name)
        self.port = serial_port
        # Both handles stay None until connect() succeeds and are reset to
        # None again by disconnect().
        self.ShdlcPort = None
        self.ShdlcDevice = None

    def connect(self) -> bool:
        """
        Attempts to connect to the SFX and reports success.

        On failure the error is logged, a port that was already opened is
        closed again, and the previously stored handles are left untouched.

        :return: True if connected successfully, False otherwise.
        """
        port = None
        try:
            port = ShdlcSerialPort(port=self.port, baudrate=115200)
            device = ShdlcDevice(ShdlcConnection(port), slave_address=0)
        except Exception as e:
            logger.error(
                "Could not connect to the SFX on port {}: {}".format(self.port, e)
            )
            if port is not None:
                # Do not leak a serial port that was opened before the failure.
                port.close()
            # Return a real bool: an exception object would be truthy and
            # read as "connected" by callers testing the result.
            return False
        self.ShdlcPort = port
        self.ShdlcDevice = device
        return True

    def disconnect(self) -> None:
        """
        Disconnects the device.

        Requests a zero flow setpoint first, then closes the serial port and
        clears the stored handles. Does nothing if the device is not connected.
        """
        port = self.ShdlcPort
        if port is None:
            # Never connected (or already disconnected): nothing to close.
            return
        try:
            self.set_flow(0)
        finally:
            # Clear the handles so is_connected() reports False afterwards
            # instead of talking to a closed port.
            self.ShdlcPort = None
            self.ShdlcDevice = None
            port.close()

    def measure(self) -> dict:
        """
        Measures the current mass flow.

        :return: Dictionary mapping FLOW_MEASUREMENT_NAME to the measured flow.
        :raises AttributeError: If called while no device is connected.
        """
        result = self.ShdlcDevice.execute(Sfc5400ShdlcCmdReadMeasuredFlow())
        return {FLOW_MEASUREMENT_NAME: result}

    def set_flow(self, setpoint_normalized: float) -> bool:
        """
        Sets the current desired mass flow if a flow controller is connected.

        :type setpoint_normalized: float
        :param setpoint_normalized: Flow setpoint as normalized input between 0 and 1.
        :return: True if set successfully, False if an exception occurred.
        """
        if FLOW_UNIT == 1:
            # The device expects the configured unit, so scale the normalized
            # input by MAXIMUM_FLOW_SLM.
            setpoint = setpoint_normalized * MAXIMUM_FLOW_SLM
        else:
            setpoint = setpoint_normalized
        try:
            self.ShdlcDevice.execute(Sfc5400ShdlcCmdSetSetpoint(setpoint))
        except Exception as e:
            logger.error(
                "Could not set the mass flow. Make sure there is a mass "
                "flow controller connected: {}".format(e)
            )
            return False
        return True

    def is_connected(self) -> bool:
        """
        Checks if the device is connected by reading its serial number.

        :return: True if connected, False if not.
        """
        try:
            self.ShdlcDevice.get_serial_number()
        except (TimeoutError, ShdlcTimeoutError, AttributeError):
            # AttributeError covers the "no device handle yet" case
            # (self.ShdlcDevice is None).
            return False
        return True
if __name__ == "__main__":
    # Manual smoke test: locate the flow device by serial number, open it and
    # print one measurement.
    from Drivers.DeviceIdentifier import DeviceIdentifier
    from Utility.Logger import setup_custom_logger
    from logging import getLevelName

    # Rebinds the module-level logger so the driver logs at DEBUG level here.
    logger = setup_custom_logger(name="root", level=getLevelName("DEBUG"))

    known_serials = {
        "EKS": "EKS23Z50PQ",
        "Heater": "AM01ZB7J",
        "SFC": "FT1PXV63",
    }
    identifier = DeviceIdentifier(serials=known_serials)
    identifier.open()

    sfc_port = identifier.serial_ports["SFC"]
    # NOTE(review): open() and the context-manager protocol are not defined in
    # this file; presumably they are provided by SensorBase - confirm.
    with SFX5400(serial_port=sfc_port) as sfc:
        sfc.open()
        print(sfc.measure())