from Drivers.SHT import EKS
from Drivers.SFX5400 import SFX5400
from Drivers.Shdlc_IO import ShdlcIoModule
from Drivers.DeviceIdentifier import DeviceIdentifier
from Utility.MeasurementBuffer import MeasurementBuffer
from Utility.Timer import RepeatTimer
from Utility.ConfigurationHandler import ConfigurationHandler
from simple_pid import PID
import logging
import time
import numpy as np
from enum import Enum
from copy import deepcopy
from scipy.io import savemat
import os
logger = logging.getLogger("root")
[docs]class Mode(Enum):
"""
Defines a set of system modes.
1. IDLE: Before any of the experiment modes has been loaded the system is idle.
2. FORCE_PWM_OFF: In this mode the pwm can be set directly, but the output is currently turned off.
3. FORCE_PWM_ON: In this mode the pwm can be set directly.
4. PID_OFF: In this mode the pid parameters can be set, but the output is currently turned off.
5. PID_ON: In this mode the pid parameters can be set and the controller is allowed to set pwm values.
"""
IDLE = 0 # Not started yet
FORCE_PWM_OFF = 1 # User defines PWM
FORCE_PWM_ON = 2
PID_OFF = 3 # PID defines PWM, but is inactive
PID_ON = 4 # PID defines PWM, is active
[docs]class Setup(object):
"""
The Setup handles all interaction with the hardware of the experiment.
:type serials: dict
:param serials: Dictionary of device names and corresponding USB serials.
:type t_sampling_s: float
:param t_sampling_s: Measurement sampling time in seconds.
:type interval_s: float
:param interval_s: Total buffered time interval in seconds, which in combination with the sampling time defines
the number of stored measurements.
"""
def __init__(self, config: ConfigurationHandler):
# allocate private member variables
self._serials = deepcopy(config["serials"])
self._t_sampling_s = config["general"]["t_sampling"]
self._devices = None
self._buffering = True
self._measurement_timer = None
self._eks = None
self._sfc = None
self._heater = None
self._sdp = None
self._current_pwm_value = 0
self._current_flow_value = 0
self._current_mode = Mode.IDLE
self.simulation_mode = False
self.temperature_difference_setpoint = config["general"][
"temperature_difference_set_point_low"
] # Temperature difference setpoint
self._delta_T = 0 # Static state temperature difference for calibration
self.config = config
self.massflow_estimator = MassflowEstimator(config=config)
# allocate public member variables
self.interval_s = config["general"]["interval"]
self.measurement_buffer = self._setup_measurement_buffer() # Measurement buffer
self.state = None # Storage for current measurement frame
self.controller = PID(
Kp=0.0,
Ki=0.0,
Kd=0.0,
setpoint=self.temperature_difference_setpoint,
sample_time=self.config["pid_controller"]["sample_time"],
output_limits=(0, 1),
)
# allocate frequently accessed configuration constants
self.safety_upper_temperature_limit = self.config["safety"][
"upper_temperature_limit"
]
self.safety_lower_flow_limit = self.config["safety"]["lower_flow_limit"]
self.nominal_massflow = self.config["general"]["nominal_mass_flow_rate"] / 100
# allocate error flags
self.error_high_temperature = False
self.error_low_flow = False
[docs] def save_measurement_buffer(self, folder, name, type='mat'):
"""
Saves the current measurement buffer to a file.
:param folder: Destination folder.
:param name: Name of the file. A time tag will be appended for uniqueness.
:param type: To allow different export filetypes.
"""
if type == 'mat':
# Save as matlab .mat file
file_name = "{}_{}.mat".format(name, time.strftime('%Y-%m-%d_%H-%M-%S'))
file_name = os.path.join(folder, file_name)
# Check if the folder exists
if os.path.exists(folder):
pass
else:
os.mkdir(path=folder)
savemat(file_name=file_name, mdict=self.measurement_buffer.data)
else:
raise NotImplementedError("File type {} not implemented yet".format(type))
[docs] def _setup_measurement_buffer(self) -> MeasurementBuffer:
"""
Defines the set of recorded signals and creates a corresponding MeasurementBuffer.
:return: An instance of MeasurementBuffer containing a deque instance for every signal.
.. seealso::
Module :mod:`Utility.MeasurementBuffer.MeasurementBuffer`
"""
signals = [
"Temperature_1",
"Temperature_2",
"Humidity_1",
"Humidity_2",
"Flow",
"Time",
"Temperature_Difference",
"PWM",
"Flow_Estimate",
"Target_Delta_T",
"Controller_Output_P",
"Controller_Output_I",
"Controller_Output_D",
"Controller_Output",
]
return MeasurementBuffer(
signals=signals,
buffer_interval_s=self.interval_s,
sampling_time_s=self._t_sampling_s,
)
[docs] def open(self) -> None:
"""
Finds and opens all the USB devices previously defined within `self.serials` by their serial number.
If one of the devices is not responsive or cannot be found, the setup is switching to simulation mode
in which all measurements are simulated. This allows to test the GUI without any attached devices.
.. seealso::
Module :mod:`Drivers.DeviceIdentifier.DeviceIdentifier`
"""
self._devices = DeviceIdentifier(serials=self._serials)
if self._devices.open():
# Connect all sensors / actuators
self._eks = EKS(serial_port=self._devices.serial_ports["EKS_ONE"])
self._eks.open()
self._sfc = SFX5400(serial_port=self._devices.serial_ports["SFC"])
self._sfc.open()
self._heater = ShdlcIoModule(
serial_port=self._devices.serial_ports["Heater"]
)
self._heater.open()
eks_online = self._eks.is_connected()
sfc_online = self._sfc.is_connected()
heater_online = self._heater.is_connected()
if not all([eks_online, sfc_online, heater_online]):
self.simulation_mode = True
logger.warning("Entering simulation mode.")
else:
self.simulation_mode = True
logger.warning("Entering simulation mode.")
# switch the temperature sensors if necessary:
if not self.simulation_mode and self.config["general"]["temp_sensors_switched"]:
self.reverse_temp_sensors(update=False)
[docs] def close(self) -> None:
"""
Closes all connected devices.
"""
self.stop_measurement_thread()
if self.simulation_mode:
pass
else:
self._eks.close()
self._sfc.close()
self._heater.close()
def __enter__(self):
"""
Ensures compatibility with 'with' statement.
:return: Returns a reference to the current instance.
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Called when exiting the context established by the 'with' statement.
.. warning: Only use this class inside a 'with' statement to ensure calling the shut down procedure for every
connected platform even upon an unscheduled end of the program.
"""
if isinstance(exc_val, KeyboardInterrupt):
logger.info("Experiment aborted with Keyboard-Interrupt!")
self.close()
return True
elif isinstance(exc_val, BaseException) or isinstance(exc_val, Exception):
logger.info(
"An unhandled error of type '{}' occured. "
"Please check console output!".format(exc_type.__name__)
)
self.close()
else:
logger.info("Experiment ended regularly.")
self.close()
[docs] def measure(self) -> None:
"""
Handles measuring and storing signals depending on the system mode and handles updating the PID
controller output.
.. seealso::
:meth:`_measure_simulation_mode`
:meth:`_measure_normal_mode`
:mod:`Utility.MeasurementBuffer.MeasurementBuffer`
"""
# Retrieve all recorded signals depending on system mode
if self.simulation_mode:
results = self._measure_simulation_mode()
else:
results = self._measure_normal_mode()
# Calculate control related signals depending on whether the controller is active
if self._current_mode is Mode.PID_ON:
desired_pwm = self.controller(input_=results["Temperature_Difference"])
(
results["Controller_Output_P"],
results["Controller_Output_I"],
results["Controller_Output_D"],
) = self.controller.components
results["Controller_Output"] = self.controller._last_output
else:
desired_pwm = 0
(
results["Controller_Output_P"],
results["Controller_Output_I"],
results["Controller_Output_D"],
results["Controller_Output"],
) = (0, 0, 0, 0)
# Decide whether to set a new pwm value:
# If the flow is too low or the temperatures too high stop heating immediately:
if (
results["Flow"] < self.safety_lower_flow_limit
and self._current_flow_value > 0
and self._current_pwm_value > 0
):
self.set_pwm(0)
self.error_low_flow = True
if (
results["Temperature_1"] > self.safety_upper_temperature_limit
or results["Temperature_2"] > self.safety_upper_temperature_limit
) and self._current_pwm_value > 0:
self.set_pwm(0)
self.error_high_temperature = True
# If we're in PID mode set the previously calculated value
elif self._current_mode is Mode.PID_ON:
self._current_pwm_value = desired_pwm
self.set_pwm(desired_pwm)
# If we're not in PID mode the pwm setting is handled directly via the slider
else:
pass
# Store the current measurement
self.state = results
if self._buffering:
# Buffer multiple measurements in the measurement buffer
self.measurement_buffer.update(results)
[docs] def _measure_simulation_mode(self) -> dict:
"""
When no devices are connected random values are generated instead of actual measurements.
:return: A dictionary with all signals
"""
T_1 = 25 + 0.1 * np.random.rand()
T_2 = 30 + 0.1 * np.random.rand()
delta_T = T_2 - T_1
results_timestamp = time.time()
results = {
"Temperature_1": T_1,
"Temperature_2": T_2,
"Humidity_1": 50 + np.random.rand(),
"Humidity_2": 30 + np.random.rand(),
"Flow": 50 + np.random.rand(),
"Time": results_timestamp,
"Temperature_Difference": delta_T,
"PWM": self._current_pwm_value,
"Flow_Estimate": self.massflow_estimator.calculate(
delta_t=delta_T, pwm=self._current_pwm_value
),
"Target_Delta_T": self.temperature_difference_setpoint,
}
return results
[docs] def _measure_normal_mode(self) -> dict:
"""
Measures all devices.
:return: A dictionary with all measured signals.
"""
results_eks = self._eks.measure()
results_sfc = self._sfc.measure()
results_timestamp = time.time()
delta_T = (
results_eks[1]["Temperature"]
- results_eks[0]["Temperature"]
- self._delta_T
)
results = {
"Temperature_1": results_eks[0]["Temperature"],
"Temperature_2": results_eks[1]["Temperature"] - self._delta_T,
"Humidity_1": results_eks[0]["Humidity"],
"Humidity_2": results_eks[1]["Humidity"],
"Flow": results_sfc["Flow"],
"Time": results_timestamp,
"Temperature_Difference": delta_T,
"PWM": self._current_pwm_value,
"Flow_Estimate": self.massflow_estimator.calculate(
delta_t=delta_T, pwm=self._current_pwm_value
),
"Target_Delta_T": self.temperature_difference_setpoint,
}
return results
[docs] def start_buffering(self) -> None:
"""
Start recording measurements in the MeasurementBuffer and delete previously recorded measurements.
"""
if self._buffering:
logger.error("Cannot start buffering, already recording to buffer!")
else:
self._buffering = True
self.measurement_buffer.clear()
[docs] def stop_buffering(self) -> None:
"""
Stop recording measurements in the MeasurementBuffer.
"""
if not self._buffering:
logger.error("Cannot stop buffering, not currenly recording to buffer!")
else:
self._buffering = False
[docs] def start_measurement_thread(self) -> None:
"""
Creates a thread.Timer that schedules future measurements at the desired sampling time.
.. seealso::
:mod:`Utility.Timer.RepeatTimer`
"""
if self._measurement_timer is None:
self.measurement_buffer.clear()
self._measurement_timer = RepeatTimer(
interval=self._t_sampling_s, function=self.measure
)
self._measurement_timer.start()
logger.info(
"Started measurement thread running at t_s={} s".format(
self._t_sampling_s
)
)
else:
logger.error("Measurement thread already running!")
[docs] def stop_measurement_thread(self) -> None:
"""
Cancels the current measurement thread.
"""
if self._measurement_timer is not None:
self.set_pwm(0)
self.controller.reset()
self._measurement_timer.cancel()
self._measurement_timer = None
logger.info("Stopped measurement thread.")
else:
logger.error("Measurement thread not started yet!")
[docs] def set_pwm(self, value: float) -> None:
"""
Safely sets the desired PWM value depending on the current system mode.
:type value: float
:param value: Desired PWM value as a normalized value between 0 and 1.
.. seealso::
:mod:`setup.Mode`
"""
if self.simulation_mode:
self._current_pwm_value = 0
elif self._current_mode in [Mode.IDLE, Mode.FORCE_PWM_OFF, Mode.PID_OFF]:
self._heater.set_pwm(pwm_bit=0, dc=0)
self._current_pwm_value = 0
elif self._current_mode in [Mode.FORCE_PWM_ON, Mode.PID_ON]:
value = float(value)
if not 0.0 <= value <= 1.0:
raise ValueError(
"PWM value: {} has to be between 0 and 1".format(value)
)
# Safety check: If the flow is smaller than x slm, heating will not be allowed
if value != 0:
if (
self.state["Flow"] < self.safety_lower_flow_limit
or self.state["Temperature_1"] > self.safety_upper_temperature_limit
or self.state["Temperature_2"] > self.safety_upper_temperature_limit
):
value = 0
# Register the newly set pwm value for later recording
self._current_pwm_value = value
# convert to heater units:
value = int(value * 65535.0)
self._heater.set_pwm(pwm_bit=0, dc=value)
[docs] def set_setpoint(self, value: float) -> None:
"""
Allows to define the temperature difference setpoint.
:type value: float
:param value: Positive value smaller 20 degrees.
"""
value = float(value)
if value < 0:
raise RuntimeError("This is a heating setup. Not a fridge, dummy!")
if value > self.config["safety"]["upper_temperature_limit"] - 25:
raise RuntimeError("This a test setup. Not an oven, du Löli!")
self.temperature_difference_setpoint = value
self.controller.setpoint = value
[docs] def set_kp(self, kp: float) -> None:
"""
Allows setting the Kp gain of the controller.
:type kp: float
:param kp: Kp gain of the controller.
"""
self.set_pid_parameters(kp=float(kp))
[docs] def set_ki(self, ki: float) -> None:
"""
Allows setting the Ki gain of the controller.
:type ki: float
:param ki: Ki gain of the controller
"""
self.set_pid_parameters(ki=float(ki))
[docs] def set_kd(self, kd: float) -> None:
"""
Allows setting the Kd gain of the controller.
:type kd: float
:param kd: Kd gain of the controller
"""
self.set_pid_parameters(kd=float(kd))
[docs] def set_pid_parameters(self, kp=None, ki=None, kd=None) -> None:
"""
Interface to the pid-setting functionality of simple_pid.
:type kp: float
:param kp: Kp gain of the controller
:type ki: float
:param ki: Ki gain of the controller
:type kd: float
:param kd: Kd gain of the controller
"""
if self.simulation_mode:
pass
else:
if kp is None:
kp = self.controller.Kp
if ki is None:
ki = self.controller.Ki
if kd is None:
kd = self.controller.Kd
self.controller.tunings = (kp, ki, kd)
[docs] def set_flow(self, value):
"""
Interface to the SFC5xxx drive for defining the current flow
setpoint
:type flow: float
:param flow: The desired massflow in normalized units, in [0, 1].
"""
if self.simulation_mode:
pass
else:
if 0.0 <= value <= 1:
self._sfc.set_flow(setpoint_normalized=value)
self._current_flow_value = value
[docs] def get_current_flow_value(self):
"""
Getter for last set target flow value.
:return: Last set target flow value in normalized units.
"""
return self._current_flow_value
[docs] def start_pid_controller(self, setpoint=None) -> None:
"""
Start pid mode with the output set to off.
:type setpoint: float
:param setpoint: Can be used to define a new temperature difference setpoint.
"""
self._current_mode = Mode.PID_OFF
if setpoint is not None:
self.temperature_difference_setpoint = setpoint
[docs] def start_direct_power_setting(self) -> None:
"""
Start pwm mode with the output set to off.
"""
self._current_mode = Mode.FORCE_PWM_OFF
[docs] def enable_output(self, desired_pwm_output=0) -> None:
"""
Enables the output in either pwn or pid mode.
:type desired_pwm_output: float
:param desired_pwm_output: Optionally enable pwm mode with a predefined nonzero output.
"""
self.controller.reset()
if self._current_mode is Mode.PID_OFF:
self._current_mode = Mode.PID_ON
elif self._current_mode is Mode.FORCE_PWM_OFF:
self._current_mode = Mode.FORCE_PWM_ON
self.set_pwm(desired_pwm_output)
[docs] def disable_output(self) -> None:
"""
Disable the output for either pwm or pid mode.
"""
if self._current_mode is Mode.PID_ON:
self._current_mode = Mode.PID_OFF
elif self._current_mode is Mode.FORCE_PWM_ON:
self._current_mode = Mode.FORCE_PWM_OFF
self.set_pwm(0)
[docs] def set_temperature_calibration(self) -> None:
"""
Record the current temperature offset, assuming steady state.
"""
# Reset first
self.reset_temperature_calibration()
delta_T = self.state["Temperature_Difference"]
threshold = self.config["measurement"]["temperature"][
"maximum_calibration_offset"
]
if delta_T > threshold:
logger.warning(
"Calibration attempted, delta T: {} larger than threshold: {}".format(
delta_T, threshold
)
)
else:
self._delta_T = delta_T
[docs] def reset_temperature_calibration(self) -> None:
"""
Reset the current temperature offset to zero.
"""
self._delta_T = 0
[docs] def reverse_temp_sensors(self, update=True) -> None:
"""
Reverse the order of the temperature sensors if the have been set up wrongly.
"""
self._eks.reverse_sensor_order()
if update:
# switch the corresponding entry in the configuration file and save it
if self.config["general"]["temp_sensors_switched"] == 1:
self.config["general"]["temp_sensors_switched"] = 0
else:
self.config["general"]["temp_sensors_switched"] = 1
self.config.write()
class MassflowEstimator(object):
def __init__(self, config):
self.c_p = config["measurement"]["massflow_estimate"]["c_p"]
self.resistance = config["measurement"]["massflow_estimate"]["resistance"]
self.voltage = config["measurement"]["massflow_estimate"]["voltage"]
self.massflow_SI2SLM = config["measurement"]["massflow_estimate"][
"massflow_SI2SLM"
]
def calculate(self, delta_t: float, pwm: float) -> float:
"""
Calculate the massflow estimate depending on the measured temperature difference and the current output power
:type delta_t: float
:param delta_t: Measured temperature difference
:type pwm: float
:param pwm: Current output power
"""
if delta_t == 0:
return 0
else:
return self.massflow_SI2SLM * (
pwm * self.voltage ** 2 / (self.resistance * self.c_p * delta_t)
)