Source code for GUI.CustomWidgets.Widgets

from abc import ABC

from GUI.CustomWidgets.BaseWidgets import *
from GUI.Utils import resource_path
from setup import Setup
from typing import Callable
import logging
import numpy
import time
from abc import abstractmethod

logger = logging.getLogger("root")

[docs]class FancyPointCounter(QLCDNumber): """ Custom version of the QLCDNumber. """ def __init__(self, setup, *args, **kwargs): super(FancyPointCounter, self).__init__(*args) self.setup = setup self._value = 0 # configure counter self.setFixedHeight(180) self.setFixedWidth(300) # Set up timer for live updating self.timer = QTimer() self.timer.setInterval(300) self.timer.timeout.connect(self._update_counter) def _update_counter(self): delta_t = numpy.asarray( self.setup.measurement_buffer["Temperature_Difference"] ).flatten() target_delta_t = numpy.asarray( self.setup.measurement_buffer["Target_Delta_T"] ).flatten() errors = delta_t - target_delta_t squared_sums = numpy.sum(errors ** 2) self._value = int(squared_sums) self.display(self._value) def start(self): self.timer.start() def stop(self): self.timer.stop() def reset(self): self.display(0) @property def value(self): return self._value
[docs]class CompetitionWidget(FramedWidget): """ The CompetitionWidget allows to start a recording of the current performance and displays the number of points reached. """ def __init__( self, setup: Setup, start_recording_action: Callable, stop_recording_action: Callable, enable_output_action: Callable, *args, **kwargs ) -> None: super(CompetitionWidget, self).__init__(*args, **kwargs) self.setup = setup self.setup_stop_recording = stop_recording_action self.setup_start_recording = start_recording_action self.output = enable_output_action self.fancy_counter = FancyPointCounter(setup=setup) self.start_button = QPushButton( QIcon(resource_path("Icons\\control-record.png")), "", self ) self.start_button.setFixedSize(32, 32) self.start_button.setStatusTip( "Start recording points. Only possible if dT < 0.5!" ) self.start_button.clicked.connect(self._start_recording) # Set up error message self.error_message = QErrorMessage() self.success_message = QErrorMessage() # Set up time progress bar self.progressbar = QProgressBar() self.progressbar.setMinimum(0) # Set up layout self.horizontal_layout = QHBoxLayout() self.vertical_layout = QVBoxLayout() self.button_layout = QVBoxLayout() self.button_layout.addWidget(self.start_button) self.button_layout.addWidget(QLabel(), 1) self.vertical_layout.addLayout(self.horizontal_layout) self.vertical_layout.addWidget(self.progressbar) self.horizontal_layout.addLayout(self.button_layout) self.horizontal_layout.addWidget(QLabel(), 1) self.horizontal_layout.addWidget(self.fancy_counter) self.setLayout(self.vertical_layout) # Set up timer for live updating self.timer = QTimer() self.timer.setInterval(100) self.timer.timeout.connect(self._update_progress) self.initial_time = None
[docs] def _update_progress(self) -> None: """ Update the progressbar to show the current remaining time. If the recording interval has passed the process is stopped. """ running_time_s = time.time() - self.initial_time if running_time_s < self.wait_time_s: self._update_process_values(running_time_s=running_time_s) self.progressbar.setValue(running_time_s) else: # Stop the QTimer updating this widget self.timer.stop() # Stop the QTimer of the counter self.fancy_counter.stop() # Show the final value of the progressbar, process completed self.progressbar.setValue(self.progressbar.maximum()) # Stop recording measurement values to allow the user to inspect the measurement plot self.setup_stop_recording() # Reset anything else if necessary self._stop_recording() # Re-enable the start button of this widget self.start_button.setEnabled(True) # Show the final number of points and declare success self.success_message.showMessage( "Congratulations, you accumulated only {} points!".format( self.fancy_counter.value ) )
[docs] def reset(self) -> None: """ Reset the competition widget upon reloading it. """ self.fancy_counter.reset()
[docs] def _update_process_values(self, running_time_s) -> None: """ Container function for updates that are specific to the inheriting widgets """ pass
@abstractmethod def _start_recording(self) -> None: pass def _stop_recording(self) -> None: # not abstract since not necessary in every case pass
class CompetitionDisturbanceRejectionWidget(CompetitionWidget): def __init__( self, setup: Setup, start_recording_action: Callable, stop_recording_action: Callable, enable_output_action: Callable, set_flow_action: Callable, enable_toggle_setpoint_action: Callable, disable_toggle_setpoint_action: Callable, pid_sliders=None, *args, **kwargs ) -> None: super().__init__( setup=setup, start_recording_action=start_recording_action, stop_recording_action=stop_recording_action, enable_output_action=enable_output_action, *args, **kwargs ) # Add set flow action self.set_flow = set_flow_action self.pid_sliders = pid_sliders # Add toggle setpoint actions self.enable_toggle_setpoint = enable_toggle_setpoint_action self.disable_toggle_setpoint = disable_toggle_setpoint_action # divide by 100 to convert from slm to normalized units self.nominal_flow = self.setup.config["general"]["nominal_mass_flow_rate"] / 100 self.disturbance_high = ( self.nominal_flow + self.setup.config["disturbance_rejection"]["deviation"] / 100 ) self.disturbance_low = ( self.nominal_flow - self.setup.config["disturbance_rejection"]["deviation"] / 100 ) self.disturbance_duration = self.setup.config["disturbance_rejection"][ "duration" ] self.disturbance_delay = self.setup.config["disturbance_rejection"]["delay"] self.process_state = 0 # Configure progressbar and waiting time self.wait_time_s = self.disturbance_delay * 3 + self.disturbance_duration * 2 self.progressbar.setMaximum(self.wait_time_s) def _start_recording(self) -> None: """ Takes control of the whole setup and GUI to start the recording. .. note:: This function can only be called successifully if the current temperature difference is smaller 0.5 degrees. This is necessary to prevent cheating. """ if ( abs( self.setup.state["Temperature_Difference"] - self.setup.temperature_difference_setpoint ) < self.setup.config["anti_cheat"]["pid_setting_threshold"] ) or self.setup.simulation_mode: # Set the initial time of the recording self.initial_time = time.time() # Start the QTimer that controls the update rate of the widget self.timer.start() # Start the internal QTimer of the point counter self.fancy_counter.start() # Set the progress bar to initial value 0 self.progressbar.setValue(0) # Start the buffering of new measurements self.setup_start_recording() # Clear the measurement buffer to get rid of old measurements self.setup.measurement_buffer.clear() # Set the mass flow to the initial value self.set_flow(self.nominal_flow) # Disable the start button for the duration of the recording self.start_button.setDisabled(True) # Disable pid sliders if self.pid_sliders is not None: for pid_slider in self.pid_sliders: pid_slider.setDisabled(True) # Disable toggling of setpoint self.disable_toggle_setpoint() else: self.error_message.showMessage( "Recording a game is only possible if the system is close to the temperature" " difference setpoint (deviation smaller {})!".format( self.setup.config["anti_cheat"]["pid_setting_threshold"] ) ) def _update_process_values(self, running_time_s) -> None: """ Container function for updates that are specific to the inheriting widgets """ if running_time_s > self.disturbance_delay/2 and self.process_state == 0: # Switch to disturbance high self.set_flow(self.disturbance_high) self.process_state = 1 elif ( running_time_s > self.disturbance_delay + self.disturbance_duration and self.process_state == 1 ): self.set_flow(self.nominal_flow) self.process_state = 2 elif ( running_time_s > 2 * self.disturbance_delay + self.disturbance_duration and self.process_state == 2 ): self.set_flow(self.disturbance_low) self.process_state = 3 elif ( running_time_s > 2 * self.disturbance_delay + 2 * self.disturbance_delay and self.process_state == 3 ): self.set_flow(self.nominal_flow) self.process_state = 4 else: pass def _stop_recording(self) -> None: # Enable pid sliders if self.pid_sliders is not None: for pid_slider in self.pid_sliders: pid_slider.setEnabled(True) # Reenable toggle setpoint self.enable_toggle_setpoint()
[docs]class StatusWidget(FramedWidget): """ The StatusWidget displays the current temperatures, flow and temperature difference measured. """ def __init__(self, setup, *args, **kwargs) -> None: super(StatusWidget, self).__init__(*args, **kwargs) self.setup = setup # Widgets grid = QGridLayout() gbox_temp1 = QGroupBox("Temperature Sensor 1") gbox_temp2 = QGroupBox("Temperature Sensor 2") gbox_flow = QGroupBox("Flow Sensor") gbox_delta_t = QGroupBox("Temperature Difference") # LCDs self.lcds = { "T1": LabelledQLCD(signal="Temperature_1", title="Temperature [°C]"), "T2": LabelledQLCD(signal="Temperature_2", title="Temperature [°C]"), "H1": LabelledQLCD(signal="Humidity_1", title="Humidity [%]"), "H2": LabelledQLCD(signal="Humidity_2", title="Humidity [%]"), "FL": LabelledQLCD(signal="Flow", title="Flow [slm]"), "DT": LabelledQLCD(signal="Temperature_Difference", title="Delta T [°C]"), } # Layouts hlayout_temp1 = QHBoxLayout() hlayout_temp2 = QHBoxLayout() hlayout_flow = QHBoxLayout() hlayout_delta_t = QHBoxLayout() # Put stuff together hlayout_temp1.addWidget(self.lcds["T1"]) hlayout_temp1.addWidget(self.lcds["H1"]) hlayout_temp2.addWidget(self.lcds["T2"]) hlayout_temp2.addWidget(self.lcds["H2"]) hlayout_flow.addWidget(self.lcds["FL"]) hlayout_delta_t.addWidget(self.lcds["DT"]) gbox_temp1.setLayout(hlayout_temp1) gbox_temp2.setLayout(hlayout_temp2) gbox_flow.setLayout(hlayout_flow) gbox_delta_t.setLayout(hlayout_delta_t) grid.addWidget(gbox_temp1, 0, 0) grid.addWidget(gbox_temp2, 1, 0) grid.addWidget(gbox_flow, 0, 1) grid.addWidget(gbox_delta_t, 1, 1) self.setLayout(grid) # Set up timer for live updating self.timer = QTimer() self.timer.setInterval(300) self.timer.timeout.connect(self._update_lcds) self.timer.start()
[docs] def _update_lcds(self) -> None: """ Updates the displayed values. """ for key, lcd in self.lcds.items(): if key == "FL": lcd.number.display("{:.1f}".format(self.setup.state[lcd.signal])) else: lcd.number.display("{:.2f}".format(self.setup.state[lcd.signal]))