# -*- coding: utf-8 -*- """ Created on Mon Jan 24 11:26:23 2022 @author: Basics """ import mne import os, sys, traceback, time from PyQt5.QtWidgets import (QMainWindow, QFileDialog, QMessageBox, QCheckBox, QLineEdit, QWidget, QPushButton, QLabel, QHBoxLayout, QGridLayout, QAction, QApplication, QDialog, QDialogButtonBox, QVBoxLayout, QFrame, QTabWidget, QComboBox, QScrollArea, QFormLayout) from PyQt5.QtCore import QTimer, Qt from PyQt5 import QtCore from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar from matplotlib.figure import Figure from matplotlib.widgets import Button as MatplotlibButton import multiprocessing as mp if sys.platform=='darwin': from multiprocessing import Process, Value #from multiprocessing import Queue as StupidNotWorkingQueue else: from multiprocessing import Process, Queue, Value from connection import NeurOne, RDA import pandas as pd from mne.channels.layout import _find_topomap_coords as get_pos import json import numpy as np import ctypes if sys.platform=='darwin': from multiprocessing.queues import Queue as QueueOld class SharedCounter(object): #source: https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9 """A synchronized shared counter. The locking done by multiprocessing.Value ensures that only a single process or thread may read or write the in-memory ctypes object. However, in order to do n += 1, Python performs a read followed by a write, so a second process may read the old value before the new one is written by the first process. The solution is to use a multiprocessing.Lock to guarantee the atomicity of the modifications to Value. This class comes almost entirely from Eli Bendersky's blog: http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/ """ def __init__(self, n=0): self.count = mp.Value('i', n) def increment(self, n=1): """ Increment the counter by n (default = 1) """ with self.count.get_lock(): self.count.value += n @property def value(self): """ Return the value of the counter """ return self.count.value class Queue(QueueOld): #source: https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9 """ A portable implementation of multiprocessing.Queue. Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the NotImplementedError exception on Unix platforms like Mac OS X where sem_getvalue() is not implemented. This subclass addresses this problem by using a synchronized shared counter (initialized to zero) and increasing / decreasing its value every time the put() and get() methods are called, respectively. This not only prevents NotImplementedError from being raised, but also allows us to implement a reliable version of both qsize() and empty(). Note the implementation of __getstate__ and __setstate__ which help to serialize MyQueue when it is passed between processes. If these functions are not defined, MyQueue cannot be serialized, which will lead to the error of "AttributeError: 'MyQueue' object has no attribute 'size'". See the answer provided here: https://stackoverflow.com/a/65513291/9723036 For documentation of using __getstate__ and __setstate__ to serialize objects, refer to here: https://docs.python.org/3/library/pickle.html#pickling-class-instances """ def __init__(self): super().__init__(ctx=mp.get_context()) self.size = SharedCounter(0) def __getstate__(self): """Help to make MyQueue instance serializable. Note that we record the parent class state, which is the state of the actual queue, and the size of the queue, which is the state of MyQueue. self.size is a SharedCounter instance. It is itself serializable. """ return { 'parent_state': super().__getstate__(), 'size': self.size, } def __setstate__(self, state): super().__setstate__(state['parent_state']) self.size = state['size'] def put(self, *args, **kwargs): super().put(*args, **kwargs) self.size.increment(1) def get(self, *args, **kwargs): item = super().get(*args, **kwargs) self.size.increment(-1) return item def qsize(self): """ Reliable implementation of multiprocessing.Queue.qsize() """ return self.size.value def empty(self): """ Reliable implementation of multiprocessing.Queue.empty() """ return not self.qsize() class NeurOneOffline(): def __init__(self): self.data = None def NO(self,ip,port=50000,buffersize=2**15,ringbuffersize=2000, sendqueue=False,ringbuf_factor=2,dump=False,avgPackets=1): self.ringbuffersize = ringbuffersize tmp_path = '/mnt/projects/P_BCT_EEG/DLPFCM1_iTBS/DLPFC/beep/subj_14/X13193_adam.vhdr' num_electr = 18 eeg_chn = np.arange(0,num_electr,1) hdr = mne.io.read_raw_brainvision(tmp_path) # hdr.set_channel_types({'EMGleft': 'emg', 'EOGright': 'eog'}) # hdr.set_montage(mne.channels.read_custom_montage('easycap-M10_63_NO.txt')) mrk_fullpath = tmp_path[:-4]+'vmrk' eeg_fullpath = tmp_path[:-4]+'eeg' #this two are made by hand instead of function. #Maybe there is some func for this #annotations returns all events - stimA, stimB, stopA, stopB, start of experiment etc... We chose only stim stim = hdr.annotations.onset[np.logical_or(hdr.annotations.description=="Stimulus/A", hdr.annotations.description=="Stimulus/B")] #and here we separate stimA and stimB stimA = hdr.annotations.onset[hdr.annotations.description=="Stimulus/A"] stimB = hdr.annotations.onset[hdr.annotations.description=="Stimulus/B"] #divide for stim A and B #stimR = hdr.annotations.onset[hdr.annotations.description=='Response/R 16'] npts = hdr.n_times nfft = int(hdr.info['sfreq']) # Sampling rate [Hz] fs = int(hdr.info['sfreq']) # Sampling rate [Hz] endsample = npts begsample = 0 print('Sampling rate [Hz]: ' + str(fs)) #[stim, resp, segment, timezero] = mne.read_annotations(mrk_fullpath) thinkkkkk eeg_raw = hdr.get_data(start=begsample, stop=endsample); eeg = eeg_raw[eeg_chn, :] # Select all electrodes data_raw = (hdr.get_data()*1e7) data_raw = data_raw[:,:-5]#int(data_raw.shape[1]%5)] stimB_arr = (stimB*fs).astype(int) data_raw_new = np.zeros([data_raw.shape[0], int(data_raw.shape[1]//5)]) for i in range(data_raw.shape[0]): a = data_raw[i] R = 5 data_raw_new[i,:] = a.reshape(-1, R).mean(1) data_raw = data_raw_new fs=1000 stimB_arr = (stimB_arr/5).astype(int) stim_sig = np.zeros(data_raw.shape[1]).reshape([1,data_raw.shape[1]]) for i in stimB_arr: stim_sig[0,i] = 1 self.fs = fs self.data = np.concatenate((data_raw, stim_sig)) self.buffer = self.data[:,:ringbuffersize] def start(self): self.time = time.time() def getBuffer(self): time_now = time.time() time_diff = int((time_now - self.time)*self.fs) return self.data[:,time_diff-self.ringbuffersize:time_diff] def acquire_data(q, size, run, speed, downsample, sleep_time, ip = '192.168.200.201', port = 5000, offline=False): """Acquires data from NeurOne_v3 and pass it to the queue. Function is supposed to work in separated process. Parameters ------------ q: Queue class from multiprocessing library size: number of samples to acquire run: Value class from multiprocessing library. That value can be changed in main process downsample: boolean value. Says if data will be downsampled to 1000 Hz sleep_time: int, set how often function should refresh. Usually it takes a bit more that that""" #import NeurOne_v3 #channel_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15, 16,-1] #just a temp solution to have less channels if offline=="offline": NO = NeurOneOffline() NO.NO(ip=ip,port=50000,buffersize=2**15,ringbuffersize=size,\ sendqueue=False,ringbuf_factor=2,dump=False,avgPackets=downsample) #offline elif offline=="NeurOne": # ip='192.168.200.201' # port = 50000 NO = NeurOne.NO(ip=ip,port=port,buffersize=2**15,ringbuffersize=size,\ sendqueue=False,ringbuf_factor=2, dump=None,avgPackets=downsample) #online elif offline=="BrainProducts": # ip = "169.254.252.66" # port = 51244 NO = RDA.RDA(ip=ip, port=port, buffersize=2**15, ringbuffersize=size,\ sendqueue=False, avgPackets=downsample) NO.start() while True: startt = time.time() if run.value: data = NO.getBuffer()#[channel_list] try: q.put(data) except: print('ERROR') endd=time.time() time.sleep(sleep_time*speed.value-(endd-startt)) class First_window(QMainWindow): """This window opens at the beggining before the main program. It provides a GUI to choose a lot of parameters without modyfying any files.""" def __init__(self, AppForm): self.AppForm = AppForm super(First_window, self).__init__() self.setWindowTitle('EStiMo Configuration') try: cap_loc_file = pd.read_csv('settings/Electrode_selection.txt', sep=':', header=None) self.cap_file_path = cap_loc_file[cap_loc_file[0]=='cap_file_path'].values[0][1].strip() self.full_cap_file_path = cap_loc_file[cap_loc_file[0]=='full_cap_file_path'].values[0][1].strip() #'easycap-M10_63_NO.txt' except: print("CAP FILE EXCEPTION") self.cap_file_path = 'settings/easycap-M10_16_NO.txt' self.full_cap_file_path = 'settings/easycap-M10_63_NO.txt' self.conf_path = 'settings/TMS_protocol.txt' montage = mne.channels.read_custom_montage(self.cap_file_path) montage_file = pd.read_csv(self.cap_file_path, sep='\t') montage_full = mne.channels.read_custom_montage(self.full_cap_file_path) montage_full_file = pd.read_csv(self.full_cap_file_path, sep='\t') self.ch_names_loaded = montage_file.to_numpy()[:,0] self.ch_names_loaded_full = montage_full_file.to_numpy()[:,0] self.montage_file_path = None self.all_names = montage_file.iloc[:,0] self.full_all_names = montage_full_file.iloc[:,0] ch_info = mne.create_info(montage.ch_names, sfreq=1000, ch_types='eeg') ch_info.set_montage(montage) #2d projection of montage (positions) self.pos2d = get_pos(ch_info, None) ch_info_full = mne.create_info(montage_full.ch_names, sfreq=1000, ch_types='eeg') ch_info_full.set_montage(montage_full) #2d projection of montage (positions) self.pos2d_full = get_pos(ch_info_full, None) self.set_values() self.create_figure() def set_values(self, update_plot=False): """ Parameters ---------- update_plot : bool, optional Update plot if needed. The default is False. Returns ------- bool returns False if error occured. """ #Try to load all data from the file. self.ip = '192.168.200.201' self.port = 50000 try: self.included_ch = [] self.included_ch_full = [] settings_file = pd.read_csv(self.conf_path,sep=':', header=None) temp_names = settings_file[settings_file[0]=='names'].values[0][1].strip() self.num_of_ch_loaded = int(settings_file[settings_file[0]=='number_of_channels'].values[0][1]) self.num_of_lines_loaded = int(settings_file[settings_file[0]=='number_of_lines'].values[0][1]) self.time_between_bursts_loaded = int(settings_file[settings_file[0]=='time_between_trains'].values[0][1]) self.breaktime_loaded = int(settings_file[settings_file[0]=='cut_time'].values[0][1]) self.included_ch_loaded = json.loads(settings_file[settings_file[0]=='included_channels'].values[0][1]) self.eog_ch_loaded = int(settings_file[settings_file[0]=='eog_channel'].values[0][1]) self.emg_ch_loaded = int(settings_file[settings_file[0]=='emg_channel'].values[0][1]) self.exp_trig_loaded = int(settings_file[settings_file[0]=='expected_triggers'].values[0][1]) self.exp_time_loaded = int(settings_file[settings_file[0]=='expected_time'].values[0][1]) self.plot_len_loaded = int(settings_file[settings_file[0]=='plot_len'].values[0][1]) self.BoxChecked = False except Exception as e: ex_type, ex_value, ex_traceback = sys.exc_info() # Extract unformatter stack traces as tuples trace_back = traceback.extract_tb(ex_traceback) # Format stacktrace stack_trace = list() for trace in trace_back: stack_trace.append("File : %s , Line : %d, Func.Name : %s, Message : %s" % (trace[0], trace[1], trace[2], trace[3])) self.text_conf_file.setText("No configuration file selected") print('exception') ### Shows error message if error occured. Does not break the program self.msgBox = QMessageBox(self) self.msgBox.setIcon(QMessageBox.Critical) self.msgBox.setDetailedText("Detailed error: \nException type: {} \nMessage: {} \nTrace: {}".format(ex_type.__name__,ex_value, stack_trace)) self.msgBox.setText("Error occured while loading the file. Check the structure of the file and try again.") self.msgBox.setWindowTitle("Protocol file error") self.msgBox.setStandardButtons(QMessageBox.Ok) self.msgBox.show() return False #some code-words for default settings if temp_names not in ['Def', 'def', 'None', 'none', 'default', 'Default']: self.ch_names_loaded = json.loads(temp_names) #else use settings provided else: if self.eog_ch_loaded < self.emg_ch_loaded: self.ch_names_loaded = np.append(self.ch_names_loaded, ["EOG", "EMG"]) self.ch_names_loaded_full = np.append(self.ch_names_loaded_full, ["EOG", "EMG"]) else: self.ch_names_loaded = np.append(self.ch_names_loaded, ["EMG", "EOG"]) self.ch_names_loaded_full = np.append(self.ch_names_loaded_full, ["EMG", "EOG"]) if update_plot: self.color = np.zeros([len(self.pos2d[:,0]),3]) for idx in self.ch_names_loaded: num = np.where(self.all_names.astype(str)==idx) self.color[num] = [0,0,1] #int(self.color[event.ind]!=1) self.ddd.set_color(self.color) self.canvas.draw() lista = [self.time_between_bursts_loaded, self.breaktime_loaded, self.num_of_ch_loaded, self.num_of_lines_loaded, self.eog_ch_loaded, self.emg_ch_loaded] for idx, line in enumerate([self.line_time_between, self.line_cut_time, self.line_num_ch, self.line_num_lines, self.line_eog_ch, self.line_emg_ch]): line.setText(str(lista[idx])) def clickBox(self, state): #change state of the BoxChecked variable if state == QtCore.Qt.Checked: self.BoxChecked = True else: self.BoxChecked = False def systemChanged(self): if self.combobox_system.currentIndex()==0: self.line_ip.setText('192.168.200.201') self.line_port.setText('50000') elif self.combobox_system.currentIndex()==1: self.line_ip.setText("169.254.252.66") self.line_port.setText('51244') def check_connection(self): self.ip = str(self.line_ip.text()) self.port = int(self.line_port.text()) self.run = Value('i', 1) #value that can be changed inside separated process self.speed = Value(ctypes.c_float, 250/1000) #to control data flow speed we can initialize another variable used inside the function self.q = Queue() #queue for data from separated process (data acquire) #Run acquire_data function in separated process so it doesn't freeze when other operations are going if self.combobox_system.currentIndex()==0: self.offline = "NeurOne" #"BrainProducts" #False else: self.offline = "BrainProducts" self.checking_connection = QMessageBox(self) self.checking_connection.setText('Checking connection, please wait!') self.checking_connection.setWindowTitle("Checking connection") # self.checking_connection.setStandardButtons(QMessageBox.Ok) self.checking_connection.show() self.p = Process(target=acquire_data, args=(self.q, 100, self.run, self.speed, True, 1, self.ip, self.port, self.offline)) done=False self.p.start() #start it tries = 0 while tries<50: print(self.q.qsize()) if self.q.qsize()>0: while not done: try: shape = self.q.get().shape print(shape) if self.offline==False or self.offline=="NeurOne": shape=tuple(reversed(shape)) done=True except AttributeError: print('exception') continue if shape[0]>0 and shape[1]>0: self.p.terminate() self.p.join(0.1) self.q.close() self.checking_connection.done(1) self.confirmBoxCheck = QMessageBox(self) self.confirmBoxCheck.setText("{} channels detected. {}/100 samples received.".format( shape[0], shape[1])) self.confirmBoxCheck.setWindowTitle("Signal detected!") self.confirmBoxCheck.setStandardButtons(QMessageBox.Ok) self.confirmBoxCheck.show() return None tries+=1 time.sleep(0.2) if tries>=50: self.p.terminate() self.p.join(0.1) self.q.close() self.confirmBoxCheck = QMessageBox(self) self.confirmBoxCheck.setText("Signal was not detected. Check the connection!") self.confirmBoxCheck.setWindowTitle("No signal detected!") self.confirmBoxCheck.setStandardButtons(QMessageBox.Ok) self.confirmBoxCheck.show() return None def create_figure(self): """ Creates figure, defines layout, plots figures and defines connections between them and events. """ def line_picker(line, mouseevent): """ NOT USED ATM Find the points within a certain distance from the mouseclick in data coords and attach some extra attributes, pickx and picky which are the data points that were picked. """ if mouseevent.xdata is None: return False, dict() xdata = line.get_xdata() ydata = line.get_ydata() maxd = 0.005 d = np.sqrt( (xdata - mouseevent.xdata)**2 + (ydata - mouseevent.ydata)**2) ind, = np.nonzero(d <= maxd) if len(ind): pickx = xdata[ind] picky = ydata[ind] props = dict(ind=ind, pickx=pickx, picky=picky) return True, props else: return False, dict() def reset_included_ch_choice(event): if np.sum(self.color)==0: for i in range(len(self.color)): self.color[i] = [0,0,1] else: for i in range(len(self.color)): self.color[i] = [0,0,0] self.included_ch = [] for idx, col in enumerate(self.color): if list(col) == [0,0,1]: self.included_ch.append(idx) self.ddd.set_color(self.color) print(self.included_ch) self.canvas.draw() def onpick3(event): """ Event handler for electrode selection plot Parameters ---------- event : event type from PyQt5 event handler. Returns ------- int returns 0 if the wrong plot clicked. """ if event.artist!=self.ddd: return 0 ind = event.ind if list(self.color[event.ind][0]) == [0,0,0]: self.color[event.ind] = [0,0,1] else: self.color[event.ind] = [0,0,0] #int(self.color[event.ind]!=1) self.included_ch = [] for idx, col in enumerate(self.color): if list(col) == [0,0,1]: self.included_ch.append(idx) print(self.included_ch) self.ddd.set_color(self.color) self.canvas.draw() print('onpick3 scatter:', ind, self.pos2d[ind,0], self.pos2d[ind,1]) def onpick_full(event): """ Event handler for full cap plot Parameters ---------- event : event type from PyQt5 event handler. Returns ------- None. """ ind = event.ind if list(self.color_full[event.ind][0]) == [0,0,0]: self.color_full[event.ind] = [0,0,1] else: self.color_full[event.ind] = [0,0,0] #int(self.color[event.ind]!=1) self.included_ch_full = [] for idx, col in enumerate(self.color_full): if list(col) == [0,0,1]: self.included_ch_full.append(idx) print(self.included_ch_full) self.ddd_full.set_color(self.color_full) self.pos2d = self.pos2d_full[self.included_ch_full] #self.ddd.set_offsets(np.c_[self.pos2d[:,0], self.pos2d[:,1]]) #Clear the plot before replottig. Computationally very inefficient #but at that stage doesn't matter. self.axes.clear() self.canvas2.draw() self.color = np.zeros([len(self.pos2d[:,0]),3]) self.all_names = self.full_all_names[self.included_ch_full].reset_index(drop=True) draw_main() #self.canvas.draw() print('onpick_full scatter:', ind, self.pos2d_full[ind,0], self.pos2d_full[ind,1]) self.main_frame = QWidget() self.dpi = 100 self.fig = Figure((11, 10), dpi=self.dpi)#, facecolor='whitesmoke') #figure for signal plot self.fig2 = Figure((11, 10), dpi=self.dpi) self.canvas = FigureCanvas(self.fig) self.canvas2 = FigureCanvas(self.fig2) self.axes = self.fig.subplots() self.axes2 = self.fig2.subplots() self.color = np.zeros([len(self.pos2d[:,0]),3]) self.color_full = np.zeros([len(self.pos2d_full[:,0]),3]) for idx in np.array(self.ch_names_loaded)[self.included_ch_loaded]: num = np.where(self.all_names.astype(str)==idx) self.color[num] = [0,0,1] for idx in np.array(self.ch_names_loaded)[self.included_ch_loaded]: num = np.where(self.full_all_names.astype(str)==idx) self.color_full[num] = [0,0,1] for idx, col in enumerate(self.color): if list(col) == [0,0,1]: self.included_ch.append(idx) for idx, col in enumerate(self.color_full): if list(col) == [0,0,1]: self.included_ch_full.append(idx) ### input electrodes plot ### def draw_main(): """ Draws the main plot. Made as a function because it's being redrawn in case of electrode selection change (second plot). """ self.ddd_bck = self.axes.scatter(self.pos2d_full[:,0], self.pos2d_full[:,1], s=600, picker=True, c='silver')#, picker=line_picker) self.ddd = self.axes.scatter(self.pos2d[:,0], self.pos2d[:,1], s=600, picker=True, c=self.color)#, picker=line_picker) self.axes.set_axis_off() self.canvas.mpl_connect('pick_event', onpick3) self.axes.text(0.5, 1.05, 'Anterior', transform=self.axes.transAxes, ha='center', va='center') self.axes.text(0.5, -0.05, 'Posterior', transform=self.axes.transAxes, ha='center', va='center') texts = [] for i in range(len(self.pos2d[:,0])): texts.append(self.axes.text(self.pos2d[i,0], self.pos2d[i,1], self.all_names[i], c='w', ha='center', va='center', fontsize=15)) self.info_text1 = self.axes.text(0,1, "Click on the electrode to select or deselect it", transform=self.axes.transAxes, ha='left', va='center', fontsize=7) self.axnext = self.fig.add_axes([0.81, 0.05, 0.1, 0.075]) self.bnext = MatplotlibButton(self.axnext, 'Reset selection') self.bnext.on_clicked(reset_included_ch_choice) self.canvas.draw() draw_main() ### full cap plot ### self.ddd_full = self.axes2.scatter(self.pos2d_full[:,0], self.pos2d_full[:,1], s=600, picker=True, c=self.color_full)#, picker=line_picker) self.axes2.set_axis_off() self.canvas2.mpl_connect('pick_event', onpick_full) self.axes2.text(0.5, 1.05, 'Anterior', transform=self.axes2.transAxes, ha='center', va='center') self.axes2.text(0.5, -0.05, 'Posterior', transform=self.axes2.transAxes, ha='center', va='center') texts2 = [] for i in range(len(self.pos2d_full[:,0])): texts2.append(self.axes2.text(self.pos2d_full[i,0], self.pos2d_full[i,1], self.full_all_names[i], c='w', ha='center', va='center', fontsize=15)) self.info_text2 = self.axes2.text(0,1, "Select channels you defined in the EEG system's protocol as a real-time output, not\n"\ "channels you want to use for the feature calculation! Number of channels have to match \n"\ "number of the EEG channels from the EEG system", transform=self.axes2.transAxes, ha='left', va='center', fontsize=7) self.canvas2.draw() ### Buttons setup ### self.button_change_electrodes = QPushButton("&Load electrode locations") self.button_change_electrodes.setCheckable(False) self.button_change_electrodes.setMaximumWidth(250) self.button_change_electrodes.clicked.connect(self.get_electrodes_file) self.button1 = QPushButton("&Select montage") self.button1.setCheckable(False) self.button1.setMaximumWidth(370) self.button1.clicked.connect(self.get_file) self.button2 = QPushButton("&Load configuration") self.button2.setCheckable(True) self.button2.setMaximumWidth(370) self.button2.clicked.connect(self.get_configuration) self.button3 = QPushButton("&Check connection") self.button3.setCheckable(False) self.button3.setMaximumWidth(370) self.button3.clicked.connect(self.check_connection) self.button4 = QPushButton("&Run program") self.button4.setCheckable(False) self.button4.clicked.connect(self.run_main_program) def add_thing(self, text, settext, LineMaxWidth = 70): """add single row (label+lineedit) as a horizontal box layout""" layout = QHBoxLayout() lab = QLabel(self) lab.setText(text) lab.setMaximumWidth(300) line = QLineEdit(self) line.setMaximumWidth(LineMaxWidth) line.setAlignment(Qt.AlignRight) layout.addWidget(lab) layout.addWidget(line) if settext!=None: line.setText(str(settext)) return lab, line, layout #Creating label+lineedit as a layouts, to be able to add them to the main #layout later and keep the layout. self.time_between_label, self.line_time_between, time_between_layout = add_thing(self, "Time between trains [s]:", self.time_between_bursts_loaded) self.cut_time_lab, self.line_cut_time, cut_time_layout = add_thing(self, "Cut time [s]:", self.breaktime_loaded) self.num_ch_lab, self.line_num_ch, num_ch_layout = add_thing(self, "Number of channels:", self.num_of_ch_loaded) self.num_lines_lab, self.line_num_lines, num_lines_layout = add_thing(self, "Number of lines:", self.num_of_lines_loaded) self.eog_ch_lab, self.line_eog_ch, eog_ch_layout = add_thing(self, "EOG channel number:", self.eog_ch_loaded) self.emg_ch_lab, self.line_emg_ch, emg_ch_layout = add_thing(self, "EMG channel number:", self.emg_ch_loaded) self.exp_trig_lab, self.line_exp_trig, exp_trig_layout = add_thing(self, "Number of bursts within the train:", self.exp_trig_loaded) self.exp_time_lab, self.line_exp_time, exp_time_layout = add_thing(self, "Expected time of a single train:", self.exp_time_loaded) self.plot_len_lab, self.plot_len_time, plot_len_layout = add_thing(self, "Plot width [s]:", self.plot_len_loaded) # You can add feature name if function was added to the function "features" in the main file features_names = ['None', 'Theta FFT Power', 'Alpha FFT Power', 'Beta FFT Power', 'High Gamma FFT Power', 'Spectral entropy', 'Temporal entropy', 'Line length', 'DWT Power 0-4 Hz', 'DWT 4-8 Hz', 'DWT 8-16 Hz', 'DWT 16-31 Hz','Variance','Correlation'] self.combobox1 = QComboBox() self.combobox1.addItems(features_names) self.combobox2 = QComboBox() self.combobox2.addItems(features_names) self.combobox3 = QComboBox() self.combobox3.addItems(features_names) self.combobox4 = QComboBox() self.combobox4.addItems(features_names) self.combobox5 = QComboBox() self.combobox5.addItems(features_names) self.combobox6 = QComboBox() self.combobox6.addItems(features_names) #Montage file path text self.file_path = QLabel(self) self.file_path.setText("No montage selected") self.file_path.setMaximumWidth(370) self.file_path.setWordWrap(True) #Path for a electrode location file text self.text_right = QLabel(self) self.text_right.setText("Electrodes names and locations file: " + self.cap_file_path) text_left = QLabel(self) text_left.setText("General settings:") self.box = QCheckBox("Slow mode",self) self.box.stateChanged.connect(self.clickBox) self.eye_reg_box = QCheckBox("Eye regression",self) self.notch_box = QCheckBox("Notch filter",self) self.outliers_box = QCheckBox("Remove outliers",self) labels = [] self.line_edits = [] self.checkboxes = [] for i in range(1, 13): label = QLabel(f"Input {i}:") labels.append(label) line_edit = QLineEdit() line_edit.setMaximumWidth(50) line_edit.setText("10") self.line_edits.append(line_edit) checkbox = QCheckBox("%") checkbox.setChecked(True) checkbox.setMaximumWidth(35) self.checkboxes.append(checkbox) box_layout = QHBoxLayout() box_layout.addWidget(self.box) box_layout.addWidget(self.eye_reg_box) box_layout.addWidget(self.notch_box) # box_layout.addWidget(self.outliers_box) #Some description of settings text_last_ch = QLabel(self) text_last_ch.setWordWrap(True) text_last_ch.setFixedWidth(370) text_last_ch.setText("Channels can be set using Python notation (negative numbers as indices from the end). "\ "Number of channels and timings have to correspond to these in EEG system protocol. \n\n"\ "Channels chosen on the right screen will be used for feature calculation. "\ "Visualization will include all channels that are defined in EEG system protocol. "\ "To change used channels: \n"\ "\u2022 change protocol in EEG system \n"\ "\u2022 change TMS_protocol.txt file. Rows \"included channels\" and \"names\". " \ "If \"names\" will be set as \"def\" it will get data automatically from the location file. \n"\ "\u2022 change electrode location file to contain proper number of elements. \n"\ "\n\n"\ "Last channel (-1) is always reserved for TMS pulse marking.\n\n"\ "Slow mode will reduce the refresh rate of the raw signal plot. It will not affect features readout or calibration.") #configuration file path text self.text_conf_file = QLabel(self) self.text_conf_file.setMaximumWidth(370) self.text_conf_file.setWordWrap(True) self.text_conf_file.setText("No configuration file selected") #Tab widget for both plots as tabs tabwidget = QTabWidget() #add every widget and layout to the main layout vbox = QVBoxLayout() vbox.addWidget(self.button3) vbox.addWidget(self.text_conf_file) vbox.addWidget(self.button2) vbox.addStretch(1) vbox.addLayout(time_between_layout) vbox.addLayout(cut_time_layout) vbox.addLayout(num_ch_layout) vbox.addLayout(num_lines_layout) vbox.addLayout(eog_ch_layout) vbox.addLayout(emg_ch_layout) vbox.addLayout(exp_trig_layout) vbox.addLayout(exp_time_layout) vbox.addLayout(plot_len_layout) scroll = QScrollArea() scroll.setWidget(text_last_ch) scroll.setWidgetResizable(True) scroll.setFixedWidth(390) vbox.addWidget(scroll) vbox.addLayout(box_layout) vbox.addWidget(self.outliers_box) vbox.addStretch(5) vbox.addWidget(self.button1) vbox.addWidget(self.file_path) feature_choice_layout = QVBoxLayout() # feature_choice_layout = QFormLayout() text_for_combo = QLabel(self) #text_for_combo.setMaximumWidth(370) text_for_combo.setWordWrap(True) text_for_combo.setText("You can choose up to 6 different measurements that will be "\ "calculated during the intervention. For each of them threshold can be set. If \"%\" option is "\ "used the threshold will be calculated as the maximum registered value +/- given percent of distance between them.") connection_settings_text = QLabel(self) connection_settings_text.setWordWrap(True) connection_settings_text.setText('Set type of EEG system, ip address, and port') self.combobox_system = QComboBox() self.combobox_system.addItems(['NeurOne', 'Brain Products']) self.combobox_system.currentIndexChanged.connect(self.systemChanged) self.ip_box, self.line_ip, ip_layout = add_thing(self, 'IP: ', self.ip, 100) self.port_box, self.line_port, port_layout = add_thing(self, 'Port: ', self.port, 100) comboboxes = [self.combobox1, self.combobox2, self.combobox3, self.combobox4, self.combobox5, self.combobox6] feature_choice_layout.addWidget(text_for_combo) for idx,combobox in enumerate(comboboxes): vbox_temp = QHBoxLayout() vbox_temp.addWidget(combobox) vbox_temp.addWidget(self.checkboxes[idx*2]) vbox_temp.addWidget(self.line_edits[idx*2]) vbox_temp.addWidget(self.checkboxes[idx*2+1]) vbox_temp.addWidget(self.line_edits[idx*2+1]) feature_choice_layout.addLayout(vbox_temp) feature_choice_layout.addStretch(1) feature_choice_layout.addWidget(connection_settings_text) feature_choice_layout.addWidget(self.combobox_system) feature_choice_layout.addLayout(ip_layout) feature_choice_layout.addLayout(port_layout) self.feature_choice_widget = QWidget() self.feature_choice_widget.setLayout(feature_choice_layout) feature_choice_layout.addStretch(2) hbox = QHBoxLayout() hbox.addWidget(self.button_change_electrodes) hbox.addWidget(self.text_right) #set genera layout layout = QGridLayout() # layout.addWidget(self.button2, 0,1,1,1) layout.addLayout(hbox, 0, 1, 1, 1) # layout.addWidget(self.button_change_electrodes, 0, 1, 1, 1) # layout.addWidget(self.text_right, 0, 1, 1, 1) layout.addWidget(text_left, 0, 0, 1, 1) tabwidget.addTab(self.canvas, 'Select electrodes for features') tabwidget.addTab(self.canvas2, 'See the whole cap') tabwidget.addTab(self.feature_choice_widget, 'Features and connection') # tabwidget.addTab(self.threshold_widget, 'Thresholds') #layout.addWidget(self.canvas, 1, 1, 1, 1) layout.addLayout(vbox, 1, 0, 1, 1) layout.addWidget(self.button4, 2,1,1,1) layout.addWidget(tabwidget, 1,1,1,1) self.main_frame.setLayout(layout) #set main layout #Tight layout to remove padding on the sides self.fig.tight_layout(pad=0.95)#, rect=(0.02,-0.02,1.02,1.02)) self.fig2.tight_layout(pad=0.95) self.setCentralWidget(self.main_frame) self.show() def get_file(self): """ Load file path and set it in the main program. """ err = False self.path = QFileDialog.getOpenFileName(self, 'Open File', os.path.dirname(os.getcwd()), 'Text files (*.txt *.csv)') print(self.path) self.montage_file_path = self.path[0] montage_matrix = np.array([]) try: montage_matrix = np.array(pd.read_csv(self.montage_file_path, header=None)) except: err = True if len(montage_matrix.shape)!=2 or montage_matrix.shape[0]!=montage_matrix.shape[1] or err==True: self.montage_file_path = None self.bad_montage_msg = QMessageBox(self) self.bad_montage_msg.setIcon(QMessageBox.Warning) self.bad_montage_msg.setText("The montage file is incorrect. It has to be csv file resulting in (x, x) size. Choose the correct file!") self.bad_montage_msg.setWindowTitle("Montage file error") self.bad_montage_msg.setStandardButtons(QMessageBox.Ok) self.bad_montage_msg.show() else: self.file_path.setText(self.path[0]) def restart_kernel(i): os._exit(00) def get_electrodes_file(self): """ Load file path for electrode placement and set it in the main program. """ temp_path = QFileDialog.getOpenFileName(self, 'Full cap file', os.path.dirname(os.getcwd()), 'Text files (*.txt *.csv)') print(temp_path) if temp_path[0]=="": return 0 temp_path2 = QFileDialog.getOpenFileName(self, 'Selected electrodes', os.path.dirname(os.getcwd()), 'Text files (*.txt *.csv)') print(temp_path2) if temp_path2[0]=="": return 0 with open('Electrode_selection.txt', 'w') as f: f.write('full_cap_file_path: {}'.format(temp_path[0])) f.write('\n') f.write('cap_file_path: {}'.format(temp_path2[0])) self.msg = QMessageBox() self.msg.setIcon(QMessageBox.Information) self.msg.setText("Python Kernel will restart now. Run the program again.") # msg.setInformativeText("This is additional information") self.msg.setWindowTitle("Restart") # msg.setDetailedText("The details are as follows:") self.msg.setStandardButtons(QMessageBox.Ok) self.msg.buttonClicked.connect(self.restart_kernel) self.msg.show() def get_configuration(self): """ Load file path for configuration file and re-set values in the window. Returns ------- bool Used only as a function breaking exception. Returns False when no path was chosen. """ self.conf_path = QFileDialog.getOpenFileName(self, 'Open File', os.path.dirname(os.getcwd()), 'Text files (*.txt *.csv)')[0] print(self.conf_path) if self.conf_path=="": return False self.text_conf_file.setText("Configuration file: " + self.conf_path) self.set_values(True) def run_main_program(self): """ Runs main program and passes all parameters. """ def confirmation(msg): """ If OK option was chosen passes all parameters and runs the main program. Parameters ---------- msg : msg type from QMessageBox """ print(str(msg.text())) if msg.text()=="&OK" or msg.text()=="OK": self.main_app = self.AppForm(self.params_to_pass) self.main_app.show() self.confirmBox.close() self.showMinimized() #self.close() #add EOG and EMG name to make channel names list complete percentages = [] values = [] for i in range(12): percentages.append(self.checkboxes[i].isChecked()) values.append(float(self.line_edits[i].text())) arr_temp = np.arange(int(self.line_num_ch.text())) if self.line_eog_ch.text().strip()=="" and self.line_emg_ch.text().strip()=="": all_names_temp = self.all_names elif self.line_eog_ch.text().strip()=="" and self.line_emg_ch.text().strip()!="": all_names_temp = np.append(self.all_names, ["EMG"]) elif self.line_eog_ch.text().strip()!="" and self.line_emg_ch.text().strip()=="": all_names_temp = np.append(self.all_names, ["EOG"]) elif arr_temp[int(self.line_eog_ch.text())] < arr_temp[int(self.line_emg_ch.text())]: all_names_temp = np.append(self.all_names, ["EOG", "EMG"]) else: all_names_temp = np.append(self.all_names, ["EMG", "EOG"]) print(all_names_temp) chosen_features = [self.combobox1.currentIndex(), self.combobox2.currentIndex(), self.combobox3.currentIndex(), self.combobox4.currentIndex(), self.combobox5.currentIndex(), self.combobox6.currentIndex()] if self.combobox_system.currentIndex()==0: self.offline = "NeurOne" #"BrainProducts" #False else: self.offline = "BrainProducts" #Create a dictionary with structures to pass self.params_to_pass = {'time_between': int(self.line_time_between.text()), 'cut_time': int(self.line_cut_time.text()), 'included_ch': self.included_ch, 'eog_ch': json.loads('[{}]'.format(self.line_eog_ch.text())), #that will turn it into list (also '2,3' --> [2, 3]) 'emg_ch': json.loads('[{}]'.format(self.line_emg_ch.text())), #but don't use more than one channel for now! 'num_of_ch': int(self.line_num_ch.text()), 'num_of_lines': int(self.line_num_lines.text()), 'montage_path': self.montage_file_path, 'ch_names': all_names_temp, 'slow_mode': self.box.isChecked(), 'features': chosen_features, 'notch': self.notch_box.isChecked(), 'eye_reg': self.eye_reg_box.isChecked(), 'remove_outliers': self.outliers_box.isChecked(), 'ip': str(self.line_ip.text()), 'port': int(self.line_port.text()), 'offline': self.offline, 'exp_trig': int(self.line_exp_trig.text()), 'exp_time': int(self.line_exp_time.text()), 'percentages': percentages, 'thr_values': values, 'plot_len': int(self.plot_len_time.text()) } print(self.params_to_pass) #ConfirmationBox to check and inform the user if everything is fine. #If not, pass the comment about what is wrong self.confirmBox = QMessageBox(self) self.confirmBox.setDetailedText("Initial choice\nNames: {}\nIncluded:{}\n\n"\ "Final choice\nNames: {}\nIncluded: {}".format( self.ch_names_loaded, self.included_ch_loaded, self.params_to_pass["ch_names"], self.params_to_pass["included_ch"])) if self.params_to_pass['num_of_ch'] == len(self.params_to_pass["ch_names"]): msg_ending = "Set number of channels is equal with number of chosen electrodes :)\n\n"\ "channels selected (parameter): {}\nchosen channels: {}".format( self.params_to_pass['num_of_ch'], len(self.params_to_pass["ch_names"])) else: msg_ending = "Set number of channels is NOT equal with number of chosen electrodes :(\n\n"\ "channels selected (parameter): {}\nchosen channels: {}".format( self.params_to_pass['num_of_ch'], len(self.params_to_pass["ch_names"])) if np.array_equiv(all_names_temp, self.ch_names_loaded): self.confirmBox.setIcon(QMessageBox.Information) self.confirmBox.setText("Confirm the choice of electrodes. Click details to check it.") else: self.confirmBox.setIcon(QMessageBox.Warning) self.confirmBox.setText("You've changed the electrode selection in the software. Please check and confirm your choice. "\ "Remember that the selection of electrodes has to correspond to the protocol from the EEG system! "\ "If the number of input channels will be the same as the number of labels, but labels will be incorrect, "\ "software will run showing incorrect labels.\n\n{}".format(msg_ending)) self.confirmBox.setWindowTitle("Confirm changes") self.confirmBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel) self.confirmBox.buttonClicked.connect(confirmation) self.confirmBox.show() if __name__ == '__main__': app = QApplication(sys.argv) form = First_window('test') #AppForm() form.show() app.exec_()