#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Tue Nov 23 11:11:41 2021 @author: adamr """ import sys, os sys.path.append(os.path.dirname(os.path.abspath(__file__))) from connection import NeurOne, RDA # import RDA import time import matplotlib import mne #import inspect import sys, os import numpy as np import scipy.signal as ss import matplotlib.pyplot as plt import matplotlib as mpl import matplotlib.cm as cm import PyQt5.uic as uic import pandas as pd import json import scipy.stats as st import csv import datetime import ctypes import scipy import pywt # import queue from cycler import cycler # from matplotlib.backend_bases import MouseButton from PyQt5.QtCore import QTimer, Qt from PyQt5.QtGui import QImage, QPixmap, QIcon, QFont from PyQt5.QtWidgets import (QMainWindow, QFileDialog, QMessageBox, QCheckBox, QLineEdit, QWidget, QPushButton, QLabel, QHBoxLayout, QGridLayout, QAction, QApplication, QDialog, QDialogButtonBox, QVBoxLayout, QFrame, QFormLayout) from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar from matplotlib.figure import Figure from mne.channels.layout import _find_topomap_coords as get_pos if sys.platform=='darwin': import multiprocessing as mp from multiprocessing import Process, Value #from multiprocessing import Queue as QueueOld #Queue doesn't work on MacOS else: from multiprocessing import Process, Queue, Value from utils.Waiting import Waiting_window from utils.FirstWindow import First_window from utils.Functions import (apply_montage, eye_reg, most_frequent, connect_sig, set_to_gray, update_stem, adjust_hist, min_zero, pentropy, entropy, check_integrity, doubleMADsfromMedian) if sys.platform=='darwin': from multiprocessing.queues import Queue as QueueOld class SharedCounter(object): #source: https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9 """A synchronized shared counter. The locking done by multiprocessing.Value ensures that only a single process or thread may read or write the in-memory ctypes object. However, in order to do n += 1, Python performs a read followed by a write, so a second process may read the old value before the new one is written by the first process. The solution is to use a multiprocessing.Lock to guarantee the atomicity of the modifications to Value. This class comes almost entirely from Eli Bendersky's blog: http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/ """ def __init__(self, n=0): self.count = mp.Value('i', n) def increment(self, n=1): """ Increment the counter by n (default = 1) """ with self.count.get_lock(): self.count.value += n @property def value(self): """ Return the value of the counter """ return self.count.value class Queue(QueueOld): #source: https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9 """ A portable implementation of multiprocessing.Queue. Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the NotImplementedError exception on Unix platforms like Mac OS X where sem_getvalue() is not implemented. This subclass addresses this problem by using a synchronized shared counter (initialized to zero) and increasing / decreasing its value every time the put() and get() methods are called, respectively. This not only prevents NotImplementedError from being raised, but also allows us to implement a reliable version of both qsize() and empty(). Note the implementation of __getstate__ and __setstate__ which help to serialize MyQueue when it is passed between processes. If these functions are not defined, MyQueue cannot be serialized, which will lead to the error of "AttributeError: 'MyQueue' object has no attribute 'size'". See the answer provided here: https://stackoverflow.com/a/65513291/9723036 For documentation of using __getstate__ and __setstate__ to serialize objects, refer to here: https://docs.python.org/3/library/pickle.html#pickling-class-instances """ def __init__(self): super().__init__(ctx=mp.get_context()) self.size = SharedCounter(0) def __getstate__(self): """Help to make MyQueue instance serializable. Note that we record the parent class state, which is the state of the actual queue, and the size of the queue, which is the state of MyQueue. self.size is a SharedCounter instance. It is itself serializable. """ return { 'parent_state': super().__getstate__(), 'size': self.size, } def __setstate__(self, state): super().__setstate__(state['parent_state']) self.size = state['size'] def put(self, *args, **kwargs): super().put(*args, **kwargs) self.size.increment(1) def get(self, *args, **kwargs): item = super().get(*args, **kwargs) self.size.increment(-1) return item def qsize(self): """ Reliable implementation of multiprocessing.Queue.qsize() """ return self.size.value def empty(self): """ Reliable implementation of multiprocessing.Queue.empty() """ return not self.qsize() class NeurOneOffline(): def __init__(self): self.data = None def NO(self,ip,port=50000,buffersize=2**15,ringbuffersize=2000, sendqueue=False,ringbuf_factor=2,dump=False,avgPackets=1): self.ringbuffersize = ringbuffersize tmp_path = '/mnt/projects/P_BCT_EEG/DLPFCM1_iTBS/DLPFC/nobeep/subj_8/X47851_iTBS.vhdr'# #'/mnt/projects/P_BCT_EEG/DLPFCM1_iTBS/DLPFC/beep/subj_14/X13193_adam.vhdr' #'/mnt/projects/P_BCT_EEG/DLPFCM1_iTBS/DLPFC/beep/subj_6/X7738_iTBS.vhdr' num_electr = 18 eeg_chn = np.arange(0,num_electr,1) hdr = mne.io.read_raw_brainvision(tmp_path) # hdr.set_channel_types({'EMGleft': 'emg', 'EOGright': 'eog'}) # hdr.set_montage(mne.channels.read_custom_montage('easycap-M10_63_NO.txt')) # mrk_fullpath = tmp_path[:-4]+'vmrk' # eeg_fullpath = tmp_path[:-4]+'eeg' #this two are made by hand instead of function. #Maybe there is some func for this # Annotations returns all events - stimA, stimB, stopA, stopB, start of experiment etc... We chose only stim stim = hdr.annotations.onset[np.logical_or(hdr.annotations.description=="Stimulus/A", hdr.annotations.description=="Stimulus/B")] # Separate stimA and stimB stimA = hdr.annotations.onset[hdr.annotations.description=="Stimulus/A"] stimB = hdr.annotations.onset[hdr.annotations.description=="Stimulus/B"] #divide for stim A and B #stimR = hdr.annotations.onset[hdr.annotations.description=='Response/R 16'] npts = hdr.n_times nfft = int(hdr.info['sfreq']) # Sampling rate [Hz] fs = int(hdr.info['sfreq']) # Sampling rate [Hz] endsample = npts begsample = 0 print('Sampling rate [Hz]: ' + str(fs)) #[stim, resp, segment, timezero] = mne.read_annotations(mrk_fullpath) thinkkkkk eeg_raw = hdr.get_data(start=begsample, stop=endsample); eeg = eeg_raw[eeg_chn, :] # Select all electrodes data_raw = (hdr.get_data()*1e7) # data_raw = data_raw[:,:]#int(data_raw.shape[1]%5)] print(data_raw.shape) stimB_arr = (stimB*fs).astype(int) dividing_factor = int(fs/1000) data_raw_new = np.zeros([data_raw.shape[0], int(data_raw.shape[1]//dividing_factor)-1]) for i in range(data_raw.shape[0]): a = data_raw[i, :-int(data_raw.shape[1]%dividing_factor)-dividing_factor] R = dividing_factor data_raw_new[i,:] = a.reshape(-1, R).mean(1) data_raw = data_raw_new fs=1000 stimB_arr = (stimB_arr/dividing_factor).astype(int) stim_sig = np.zeros(data_raw.shape[1]).reshape([1,data_raw.shape[1]]) for i in stimB_arr: stim_sig[0,i] = 1 self.fs = fs self.data = np.concatenate((data_raw, stim_sig)) self.buffer = self.data[:,:ringbuffersize] def start(self): self.time = time.time() def getBuffer(self): time_now = time.time() time_diff = int((time_now - self.time)*self.fs) return self.data[:,time_diff-self.ringbuffersize:time_diff] def acquire_data(q, size, run, speed, downsample, sleep_time, ip = '192.168.200.201', port = 5000, offline=False): """Acquires data from NeurOne_v3 and pass it to the queue. Function is supposed to work in separated process. Parameters ------------ q: Queue class from multiprocessing library size: number of samples to acquire run: Value class from multiprocessing library. That value can be changed in main process downsample: boolean value. Says if data will be downsampled to 1000 Hz sleep_time: int, set how often function should refresh. Usually it takes a bit more that that""" # offline = 'offline' #import NeurOne_v3 if offline=="offline": NO = NeurOneOffline() NO.NO(ip=ip,port=50000,buffersize=2**15,ringbuffersize=size,\ sendqueue=False,ringbuf_factor=2,dump=False,avgPackets=downsample) #offline elif offline=="NeurOne": # ip='192.168.200.201' # port = 50000 NO = NeurOne.NO(ip=ip,port=port,buffersize=2**15,ringbuffersize=size,\ sendqueue=False,ringbuf_factor=2, dump=None,avgPackets=downsample) #online elif offline=="BrainProducts": # ip = "169.254.252.66" # port = 51244 NO = RDA.RDA(ip=ip, port=port, buffersize=2**15, ringbuffersize=size,\ sendqueue=False, avgPackets=downsample) NO.start() while True: startt = time.time() if run.value: data = NO.getBuffer()#[channel_list] try: q.put(data) except: print('ERROR') endd=time.time() time.sleep(sleep_time*speed.value-(endd-startt)) class AppForm(QMainWindow): """TMS GUI script.""" def __init__(self, passed_params = None, parent=None): super().__init__() #self.setStyleSheet("background: whitesmoke") self.offline = None #"offline" #'NeurOne' #"BrainProducts"#False self.time_start = time.time() QMainWindow.__init__(self, parent) self.montage_file_path = 'montage_18ch.csv' self.features() self.speed_general = 250 # fs has to be divisible by this value self.all_params(passed_params) self.run = Value('i', 0) # Value that can be changed inside separated process # To control data flow speed we can initialize another variable used inside the process self.speed = Value(ctypes.c_float, self.speed_general/1000) self.q = Queue() # Queue for data from separated process (data acquire) # Run acquire_data function in separated process so it doesn't freeze when other operations are going self.p = Process(target=acquire_data, args=(self.q, self.size_of_up, self.run, self.speed, True, self.update_time, self.ip, self.port, self.offline)) self.p.start() #start it self.timer = QTimer(timerType = Qt.PreciseTimer) #preparing timer self.timer.setInterval(self.speed_general) #every 1000 ms = 1 sec self.timer.timeout.connect(self.update) #use update fuction every 1 sec #self.timer.start() #set name and icon (for some reason the icon doesn't appear on the taskbar) self.setWindowTitle('EStiMo') self.setWindowIcon(QIcon('icon.jpg')) def features(self): """Collection of implemented features. Should be moved to separated .py file in the future, to make adding new features easier. For now it's part of the class, what also optimizes it slightly, because some of parameters are shared within a class TO ADD NEW FEATURES: check the bottom of this function""" def calculate_theta(): #1 """Theta band calculation based on previously calculated FFT/Welch""" theta = self.S[:, self.theta_band[0]:self.theta_band[1]].mean(1) return theta.mean() def calculate_alpha(): #2 """Alpha band calculation based on previously calculated FFT/Welch""" alpha = self.S[:, self.alpha_band[0]:self.alpha_band[1]].mean(1) return alpha.mean() def calculate_beta(): #3 """Beta band calculation based on previously calculated FFT/Welch""" beta = self.S[:,self.beta_band[0]:self.beta_band[1]].mean(1) return beta.mean() def calculate_h_gamma(): #4 """High Gamma band calculation based on previously calculated FFT/Welch""" high = self.S[:,80:120].mean(1) return high.mean() def calculate_temp_entropy(): #5 """Temporal Entropy calculation""" return np.mean(np.mean([entropy(self.second_to_analyze[e_ch]) for e_ch in range(self.second_to_analyze.shape[0])])) def calculate_spect_entropy(): #6 """Spectral Entropy calculation based on previously calculated FFT/Welch""" return np.mean(np.mean([entropy(self.S[e_ch]) for e_ch in range(self.S.shape[0])])) def calculate_line_length(): #7 """Line length calculation. The function scipy.spatial.distance.cdist makes it very fast compared to other methods""" ch_sum = 0 for i in range(self.second_to_analyze.shape[0]): a_seg = np.zeros([2, self.second_to_analyze.shape[1]]) a_seg[0,:] = self.second_to_analyze[0]#*1e6 a_seg[1,:] = np.linspace(0,1,self.Fs) matrix_dist = scipy.spatial.distance.cdist(a_seg, a_seg, 'euclidean') ch_sum += sum(np.diagonal(matrix_dist,1)) return ch_sum def wavelets(band): #8,9,10,11 """Power in given band based on DWT calculated before. Works correctly only for fs=1000 Hz!!! With different Fs it will give power for different band. Ranges for 1000 Hz are: 1000, 500, 250, 125, 62.5, 31.25, 15.625, 7.8125, 3.9062, 0. We are interested in lasts of them: 8: 0 - 3.906 Hz 9: 3.906 - 7.812 Hz 10: 7.812 - 15.625 Hz 11: 15.625 - 31.25 Hz """ band = band - 9 #weird way, but then call from function works well for particular band #but works correctly only for this set of features! dwt_pw1, dwt_pw2, dwt_pw3, dwt_pw4 = self.dwt[:4] #sum of squares dwt_pw1 = np.sum(dwt_pw1**2) dwt_pw2 = np.sum(dwt_pw2**2) dwt_pw3 = np.sum(dwt_pw3**2) dwt_pw4 = np.sum(dwt_pw4**2) dwts = dwt_pw1, dwt_pw2, dwt_pw3, dwt_pw4 return dwts[band] def calculate_variance(): #12 """Sum of Variance calculation""" return sum(np.var(self.second_to_analyze,1)) def calculate_corr(): #13 """Temporal Correlation between channels calculation. Returnes mean of absoulte values of correlations between channels""" R = np.corrcoef(self.second_to_analyze) if type(R)==np.ndarray: R = abs(np.tril(R, -1)) #tril zeroes one triangle of array R_sum = R[R!=0] #we take only what is left if type(R)==np.ndarray: #in case it is just only one value (that means we analyze only one channels) R_sum = R_sum[~np.isnan(R_sum)] return np.mean(np.abs(R_sum)) ############################################### # You can add new functions now: ############################################### # Add all new functions to this list self.functions = [None, calculate_theta, calculate_alpha, calculate_beta, calculate_h_gamma, calculate_spect_entropy, calculate_temp_entropy, calculate_line_length, wavelets, wavelets, wavelets, wavelets, calculate_variance, calculate_corr] # Set the unit for the output of each function (they correspond to functions based on index) self.unit_label = ['None', "Power", "Power", "Power", "Power", "Entropy", "Entropy", "Length", "Power", "Power", "Power", "Power", "Variance", "Correlation"] # Add name of the added feature to the end of this list self.features_names = ['None', 'Theta FFT Power', 'Alpha FFT Power', 'Beta FFT Power', 'High Gamma FFT Power', 'Spectral entropy', 'Temporal entropy', 'Line length', 'DWT Power 0-4 Hz', 'DWT 4-8 Hz', 'DWT 8-16 Hz', 'DWT 16-31 Hz','Variance','Correlation'] # One last thing to use your new function is to add string with its name # to the other First_window.py: variable features_names in class First_window def all_params(self, passed_params = None, restarted=''): #reads values from TMS_protocol.txt file #montage matrix is a matrix that is multiplied with the signal #identity matrix multiply all channels by 1, so we have the same signal as an output settings_file = pd.read_csv('settings/TMS_protocol.txt',sep=':', header=None) #read file with settings print(restarted) #open the file to save values during the experiment self.log_file = open('logs//TMS_log_'+str(f"{datetime.datetime.now():%Y-%m-%d-%H-%M}"+restarted+'.csv'), 'w') self.log_file_writer = csv.writer(self.log_file) self.log_file.flush() #flushing is applying changes we made to the file self.Fs = 1000 #5000 self.size_of_up = 2*self.Fs #5000 how much data we get from NeurOne function #file with settings and assigning variables to them # settings_file = pd.read_csv('settings/TMS_protocol.txt',sep=':', header=None) #params can be set in GUI, so then no need for using ones from the file if passed_params is not None: self.montage_file_path = passed_params['montage_path'] self.time_between_bursts = int(passed_params['time_between']) self.breaktime = int(passed_params['cut_time']) self.included_ch = passed_params['included_ch'] self.eog_ch_l = list(passed_params['eog_ch']) self.emg_ch_l = list(passed_params['emg_ch']) self.num_of_ch = int(passed_params['num_of_ch']) self.num_of_lines = int(passed_params['num_of_lines']) self.ch_names = passed_params['ch_names'] self.slow_mode = passed_params['slow_mode'] self.used_features = passed_params['features'] self.use_notch = passed_params['notch'] self.use_regression = passed_params['eye_reg'] self.remove_outliers = passed_params['remove_outliers'] self.ip = passed_params['ip'] self.port = passed_params['port'] self.offline = self.offline if not self.offline==None else passed_params['offline'] self.exp_trig = passed_params['exp_trig'] self.exp_time = passed_params['exp_time'] self.if_percentage = passed_params['percentages'] self.received_thr_values = passed_params['thr_values'] self.plot_len = passed_params['plot_len'] print(self.offline) else: self.montage_file_path = 'montage_18ch.csv' self.time_between_bursts = int(settings_file[settings_file[0]=='time_between_trains'].values[0][1]) self.breaktime = int(settings_file[settings_file[0]=='cut_time'].values[0][1]) self.included_ch = settings_file[settings_file[0]=='included_channels'].values[0][1].strip() self.eog_ch_l = [int(settings_file[settings_file[0]=='eog_channel'].values[0][1])] self.emg_ch_l = [int(settings_file[settings_file[0]=='emg_channel'].values[0][1])] self.num_of_ch = int(settings_file[settings_file[0]=='number_of_channels'].values[0][1]) self.num_of_lines = int(settings_file[settings_file[0]=='number_of_lines'].values[0][1]) self.exp_trig_loaded = int(settings_file[settings_file[0]=='expected_triggers'].values[0][1]) self.exp_time_loaded = int(settings_file[settings_file[0]=='expected_time'].values[0][1]) self.if_percentage = np.ones(12) self.received_thr_values = [10]*12 self.slow_mode = False self.used_features = [0,1,2,3,4,5] self.use_notch = True self.use_regression = True self.remove_outliers = True self.ip = '192.168.200.201' self.port = 50000 self.offline = False self.plot_len = 4 #length of data to plot (last seconds of data array) self.unit_label = np.array(self.unit_label)[self.used_features] self.log_file_writer.writerow(['time', 'state', self.used_features]) if len(self.eog_ch_l)==0: self.eog_ch = '' if len(self.emg_ch_l)==0: self.emg_ch = '' self.eog_ch = self.eog_ch_l[0] self.emg_ch = self.emg_ch_l[0] if self.slow_mode: self.speed_general = 1000 else: self.speed_general = 250 if self.included_ch in ['all', 'All']: self.included_ch = np.arange(len(self.ch_names)) elif type(self.included_ch)==list: pass else: self.included_ch = json.loads(self.included_ch) print(self.included_ch) # self.montage_matrix = np.identity(self.num_of_ch) if self.montage_file_path in [None, '']: self.montage_file_path = 'settings/montage_18ch.csv' self.montage_matrix = np.array(pd.read_csv(self.montage_file_path, header=None)) #initialization of variables. Some of them are loaded from file and cannot be changed in the GUI self.no_eeg = len(self.eog_ch_l) + len(self.emg_ch_l) # print(self.no_eeg) # self.ch_names = json.loads(settings_file[settings_file[0]=='names'].values[0][1]) self.update_time = 1 #how many seconds between updates of everything #prepare empty arrays for data self.feature1 = np.full([1000, self.time_between_bursts-self.breaktime], None) self.feature2 = np.full([1000, self.time_between_bursts-self.breaktime], None) self.feature3 = np.full([1000, self.time_between_bursts-self.breaktime], None) self.feature4 = np.full([1000, self.time_between_bursts-self.breaktime], None) self.feature5 = np.full([1000, self.time_between_bursts-self.breaktime], None) self.feature6 = np.full([1000, self.time_between_bursts-self.breaktime], None) #json.loads takes string and returns list, so we can read list from txt file self.alpha_band = json.loads(settings_file[settings_file[0]=='alpha_range'].values[0][1]) self.beta_band = json.loads(settings_file[settings_file[0]=='beta_range'].values[0][1]) self.theta_band= json.loads(settings_file[settings_file[0]=='theta_range'].values[0][1]) self.colors = ['b', 'm', 'r', 'k', 'c', ] #colors used for lines if less that 6 of them self.data_len = 30*self.Fs #length of the data array in seconds self.plot_dividing_factor = 100 self.previous_state = np.zeros(6) if self.num_of_lines>5: #if more than 5 lines then colors of them from colormap self.colors = cm.plasma(np.linspace(0,1,self.num_of_lines)) #Example of thresholds for each measurment. Changes after first second of calibration. self.thr_parameter = int(settings_file[settings_file[0]=='threshold_parameter'].values[0][1]) self.thr_2 = [20,100] self.thr_1 = [20,100] self.thr_3 = [20,100] self.thr_4 = [0, 0.5] self.thr_4 = [0, 0.5] self.thr_5 = [0.3, 0.8] self.thr_6 = [1,3] self.linetemps = np.zeros(6, dtype=object) self.labels_size = 7 self.lab_x_dist = 1. self.lab_y_dist = 0.5 self.measures_x_lims = np.array([[-1,1],[0,350],[0,350],[0,350],[0,2],[3,8]], dtype=float) self.measures_y_lims = np.array([[0,70],[0,70],[0,70],[0,70],[0,70],[0,70]], dtype=float) self.create_main_frame() #create plots, buttons, figures etc... #create data array self.loaded = np.full([self.num_of_ch,self.data_len], None) self.loaded_full = np.full([self.num_of_ch+1,self.data_len], None) self.data = np.full((self.num_of_ch,self.data_len), None) self.trigg_data = np.zeros(self.data_len) #array to keep trigger data in self.num=0 self.doit=0 #to count number of seconds after last stimuli in train self.plot_story = np.zeros([self.num_of_lines, 6], dtype=object) #thigs that are being plot self.trial_num = 0 #how many train there were self.auto_readout_lim = True self.disp_max = True self.do_calibration = False #if True then calibration process is going on self.calibration_counter = 0 self.calibration_counter_max = int(1000/self.speed_general) #Lists to keep calibration values for each measurment self.f1_cal = [] self.f2_cal = [] self.f3_cal = [] self.f4_cal = [] self.f5_cal = [] self.f6_cal = [] self.qmbx = None self.red_dots = [] def data_processing(self, calibration=False): """Pre-processing of the data used for extracting features""" #in case of readout period times have to be adjusted to the last pulse, during calibration it's just a last second. if calibration: self.last_sec = self.loaded_noeye[:,-self.Fs:].copy() if self.use_regression: eog = self.loaded_noeye[self.eog_ch, -self.Fs:] else: self.last_sec = self.loaded_noeye[:,self.id1:self.id2].copy() if self.use_regression: eog = self.loaded_noeye[self.eog_ch ,self.id1:self.id2] [A,B] = ss.butter(2, 0.1/(self.Fs/2), 'highpass') self.last_sec = ss.filtfilt(A, B, self.last_sec) if self.use_notch: [A,B] = ss.butter(2, [49/(self.Fs/2), 51/(self.Fs/2)], 'bandstop') self.last_sec = ss.filtfilt(A, B, self.last_sec) self.last_sec = ss.detrend(self.last_sec, axis=1) # plt.figure() # plt.plot(self.last_sec[self.included_ch].T) # plt.plot(eog) #that's stupid, move channel selection before!!!!!!!!!!!!! if self.use_regression: self.last_sec = eye_reg(self.last_sec[self.included_ch], eog) return self.last_sec def update_methods(self): """Calculates all features and plot their values""" times = time.time() self.second_to_analyze = self.data_processing() #Take data #calculate fft self.S = abs(np.fft.rfft(self.second_to_analyze)) #do dwt only if any features requires it if any(feature_num in self.used_features for feature_num in [9,10,11,12]): self.dwt = pywt.wavedec(self.second_to_analyze, 'db1', level=8) #db1, 8 levels. Works well for 1000 Hz only self.results = np.zeros(len(self.used_features)) #Put all results to one array for idx, feature in enumerate(self.used_features): if feature==0: self.results[idx] = None elif feature in [8,9,10,11]: #to specify which band self.results[idx] = self.functions[feature](feature) else: self.results[idx] = self.functions[feature]() #Checks how many fields were filled already, so we know what stage are we on #and where to save the data x = np.where(self.feature1==None)[0][0] y = np.where(self.feature1==None)[1][0] if y==0: self.previous_state = np.zeros(6) self.feature1[x,y] = self.results[0] self.feature2[x,y] = self.results[1] self.feature3[x,y] = self.results[2] self.feature4[x,y] = self.results[3] self.feature5[x,y] = self.results[4] self.feature6[x,y] = self.results[5] #save in the log self.log_file_writer.writerow([time.time() - self.time_start, 'between bursts', self.results]) self.log_file.flush() times1 = time.time() print("Calculation of features: {}".format(times1-times)) #if value is not within threshold values then background color is red (salmon), otherwise green #checks if there is a need to change a color of the background old_prv_state = self.previous_state.copy() if not all(np.isnan(self.thr_1)): if self.results[0]>=self.thr_1[0] and self.results[0]<=self.thr_1[1]: if self.previous_state[0]==1: self.axesMap[0,0].set_facecolor('whitesmoke') self.previous_state[0]=0 elif self.previous_state[0]==0: self.red_dots.append(self.axesMap[0,0].plot(y+1, self.results[0], 'r*')) self.axesMap[0,0].set_facecolor('xkcd:salmon') self.previous_state[0]=1 if not all(np.isnan(self.thr_2)): if self.results[1]>=self.thr_2[0] and self.results[1]<=self.thr_2[1]: if self.previous_state[1]==1: self.axesMap[0,1].set_facecolor('whitesmoke') self.previous_state[1]=0 elif self.previous_state[1]==0: self.red_dots.append(self.axesMap[0,1].plot(y+1, self.results[1], 'r*')) self.axesMap[0,1].set_facecolor('xkcd:salmon') self.previous_state[1]=1 if not all(np.isnan(self.thr_3)): if self.results[2]>=self.thr_3[0] and self.results[2]<=self.thr_3[1]: if self.previous_state[2]==1: self.axesMap[1,0].set_facecolor('whitesmoke') self.previous_state[2]=0 elif self.previous_state[2]==0: self.red_dots.append(self.axesMap[1,0].plot(y+1, self.results[2], 'r*')) self.axesMap[1,0].set_facecolor('xkcd:salmon') self.previous_state[2]=1 if not all(np.isnan(self.thr_4)): if self.results[3]>=self.thr_4[0] and self.results[3]<=self.thr_4[1]: if self.previous_state[3]==1: self.axesMap[1,1].set_facecolor('whitesmoke') self.previous_state[3]=0 elif self.previous_state[3]==0: self.red_dots.append(self.axesMap[0,0].plot(y+1, self.results[3], 'r*')) self.axesMap[1,1].set_facecolor('xkcd:salmon') self.previous_state[3]=1 if not all(np.isnan(self.thr_5)): if self.results[4]>=self.thr_5[0] and self.results[4]<=self.thr_5[1]: if self.previous_state[4]==1: self.axesMap[2,0].set_facecolor('whitesmoke') self.previous_state[4]=0 elif self.previous_state[4]==0: self.red_dots.append(self.axesMap[2,0].plot(y+1, self.results[4], 'r*')) self.axesMap[2,0].set_facecolor('xkcd:salmon') self.previous_state[4]=1 if not all(np.isnan(self.thr_6)): if self.results[5]>=self.thr_6[0] and self.results[5]<=self.thr_6[1]: if self.previous_state[5]==1: self.axesMap[2,1].set_facecolor('whitesmoke') self.previous_state[5]=0 elif self.previous_state[5]==0: self.red_dots.append(self.axesMap[2,1].plot(y+1, self.results[5], 'r*')) self.axesMap[2,1].set_facecolor('xkcd:salmon') self.previous_state[5]=1 #If there was no change in a color, not need to redraw figure if all(self.previous_state==old_prv_state): need_redraw = False else: #but otherwise we need to redraw the figure need_redraw = True alfa = 1 color = 'b' #if first plot if self.trial_num0: self.timer.setInterval(int(1*self.speed_general*0.97)) if self.q.qsize()<1 and self.timer.interval()!= int(self.speed_general*1.1): self.timer.setInterval(int(self.speed_general*1.03)) times = time.time() # self.offline='offline' #remove this! if self.offline=="offline": incl = [0,2,6,7,8,10,13,16,18,22,25,28,31,34,41,43,-3,-2,-1] # For offline only loaded_temp = self.q.get()[incl]/10 # Load data try: self.loaded, most_fr = connect_sig(self.loaded_full, loaded_temp, self.speed_general) except ValueError: print("ValueError, wait...") return 0 else: data_divide = 1 if self.offline=="NeurOne": data_divide = 10 loaded_temp = self.q.get()/data_divide try: self.loaded, most_fr = connect_sig(self.loaded_full, loaded_temp.T, self.speed_general) except ValueError: print("ValueError, wait...") return 0 self.loaded_noeye = self.loaded.copy() step1 = time.time()-time_start #if connection point is too far or too close it modifies the acquisition rate. try: if most_fr>900: self.speed.value=1.01*self.speed_general/1000 elif most_fr<500: self.speed.value=0.8*self.speed_general/1000 elif most_fr<800 and most_fr>500: self.speed.value = 0.98*self.speed_general/1000 print('speed:',self.speed.value) #TODO: check if that is still needed except UnboundLocalError: pass self.trigg = self.loaded[-1].copy() #keeps trigger in a different array self.loaded_full = self.loaded.copy() #keeps it for the next second step2 = time.time()-time_start # ZeroDivisionError means that data is not yet loaded try: # [A,B] = ss.butter(2, 0.1/(self.Fs/2), 'highpass') # self.loaded[:self.num_of_ch,-4*self.Fs:] = ss.filtfilt(A, B, self.loaded[:self.num_of_ch, -4*self.Fs:]) # self.loaded[:self.num_of_ch,-4*self.Fs:] = self.loaded[:self.num_of_ch,-4*self.Fs:] - np.mean(self.loaded[:self.num_of_ch,-4*self.Fs:],1, keepdims=True) self.loaded[:self.num_of_ch,-self.plot_len*self.Fs:] = ss.detrend(self.loaded[:self.num_of_ch,-self.plot_len*self.Fs:]) except ZeroDivisionError: # This error means that buffer is still not full if self.qmbx == None: self.qmbx = Waiting_window() # Small window with a message to wait print('Waiting for data (should take up to few seconds)') return(0) # Stop function here, because in this case rest of update is not needed # Getting to this point of the code means there were no exception before # So the waiting window can be destroyed if self.qmbx!=None: self.qmbx.setText('Data received!') self.qmbx=None #then it dies step3 = time.time()-time_start # Eye regression self.trigg_data = self.trigg.copy() od = self.data_len-self.plot_len*self.Fs # First point do = self.data_len # Last point # For interpolation of TMS artifact int_from = 0.3 #first point for interpolation int_to = 0.3 #last point for interpolation step4 = time.time()-time_start # If two stimuli (A and B) are sent together (shouldn't be) then it removes one to not create chaos stim = check_integrity(np.where(self.trigg_data[od:do]>0)[0]) step5 = time.time()-time_start # If there are some pulses detected then it interpolates the data used for the visualisation if len(stim)>0: for ind in stim[::-1]: size = self.data_len - (od+ind-int(int_from*self.Fs)) # print('size:', size) # print(len(self.loaded)) # Interpolation - pretty long line, but basically it chooses ranges and # assign boundary value as a baseline and does that in (I guess) more optimal way than using loops # self.loaded[:, od+ind-int(int_from*self.Fs):od+ind+int(int_to*self.Fs)] = np.outer( # self.loaded[:,min(od+ind+int(int_to*self.Fs), 30000-1)], np.ones(min(size, int((int_from+int_to)*self.Fs)))) # this way is even easier... self.loaded[:, od+ind-int(int_from*self.Fs):od+ind+int(int_to*self.Fs)] = self.loaded[:,min(od+ind+int(int_to*self.Fs), 30000-1)].reshape(-1, 1) # for i in range(self.loaded.shape[0]): # self.loaded[i, od+ind-int(int_from*self.Fs):od+ind+int(int_to*self.Fs)] = np.linspace( # self.loaded[i,min(od+ind-int(int_from*self.Fs),30000-1)], # self.loaded[i,max(od+ind+int(int_to*self.Fs),30000-1)], # od+ind+int(int_to*self.Fs)-(od+ind-int(int_from*self.Fs))) step6 = time.time()-time_start # Eye regression for visualisation #plt.figure() #plt.plot(self.loaded[3, -4*self.Fs:]) #plt.plot(self.loaded[self.eog_ch,-4*self.Fs:], '--b') if self.use_regression: self.loaded[:(self.num_of_ch-self.no_eeg),-4*self.Fs:] = eye_reg( self.loaded[:self.num_of_ch-self.no_eeg, -4*self.Fs:], self.loaded[self.eog_ch,-4*self.Fs:]) #plt.plot(self.loaded[3,-4*self.Fs:], '--r') #plt.show() step7 = time.time()-time_start # Applies montage for both types of data (visualisation and processing) self.loaded = apply_montage(self.loaded, self.montage_matrix) self.loaded_noeye = apply_montage(self.loaded_noeye, self.montage_matrix) step8 = time.time()-time_start self.update_plot() #updates main plot with data stim_where = check_integrity(np.where(self.trigg_data>0)[0]) #indexes of triggers step9 = time.time()-time_start # If do_calibration is True and there were trigger recently then stop calibration # TODO: THAT'S CONDITION REQUIRED FOR STARTING MEASURMENTS. PROBABLY IT WON'T WORK FOR SOME MORE EXTREME SETTINGS # if self.do_calibration and len(stim_where[stim_where>(self.data_len-max( # 2000, round(self.Fs*self.time_between_bursts*0.7, -3)))])>0: if self.do_calibration and len(stim_where[stim_where>(self.data_len-max(2000, self.exp_time+1500))])>0: self.calibration() if self.do_calibration: # If do_calibration is True then continue calibration process and exit this function if self.calibration_counter%self.calibration_counter_max==0: self.calibration_process() self.calibration_counter+=1 return 0 #ends run of this function so nothing else happens step10 = time.time()-time_start # If set number of stimuli is detected # doit --> set length of the measurment between bursts # if self.doit==0 and sum(stim_where>self.data_len-max( # 2000, round(self.Fs*self.time_between_bursts*0.7, -3)))==10: print(sum(stim_where>self.data_len-max(2000, self.exp_time+1500))) if self.doit==0 and sum(stim_where>self.data_len- max(2000, self.exp_time+1500))==self.exp_trig: self.axesMap[0,0].set_facecolor('whitesmoke') self.axesMap[0,1].set_facecolor('whitesmoke') self.axesMap[1,0].set_facecolor('whitesmoke') self.axesMap[1,1].set_facecolor('whitesmoke') self.axesMap[2,0].set_facecolor('whitesmoke') self.axesMap[2,1].set_facecolor('whitesmoke') self.doit=self.time_between_bursts-self.breaktime self.last_stim = stim_where[-1] set_to_gray(self.plot_story) for dot in self.red_dots: for line in dot: line.remove() self.red_dots = [] self.canvasMap.draw() # If doit is more than 0 --> readout phase elif self.doit>0: print(self.doit) self.last_stim = stim_where[-1] # In some situations last stimuli might be already from another train, while # First 100ms of loaded signal belong to previous one. That is why this exception exists # EDIT: not sure if it's still needed after other changes I made if any(np.diff(stim_where[stim_where>self.data_len-max( 2000, round(self.Fs*self.time_between_bursts*1.2, -3))])>1000): print('UWAGA NA TO') self.last_stim = stim_where[np.where(np.diff(stim_where)>1000)[0]][-1] #index of last stim + half of breaktime + seconds that were already read before self.id1 = int(self.last_stim+0.5*(self.breaktime*self.Fs)+( self.time_between_bursts-self.breaktime-self.doit)*self.Fs) self.id2 = int(self.id1 + self.Fs) #one second later print('id1',self.id1) if self.id20)[0]) # dif = np.max(loaded[:,-self.plot_len*self.Fs:])-np.min( # loaded[:,-self.plot_len*self.Fs:]) #difference between max and min #include only plot for i in range(self.num_of_ch): self.data[i] = loaded[i].copy() #dif*i+dif #to fit into one plot #set data, last plot_len seconds plot_data = self.data[i,self.data_len-self.plot_len* self.Fs:self.data_len] plot_data = plot_data[::self.plot_len] #lets speed up plotting by downsampling #EMG has higher amplitude usually. A special case to make it smaller #self.emg_ch+1 because self.num_of_ch doesn't include trigger if i==np.arange(self.num_of_ch)[self.emg_ch+1] and self.emg_ch!='': plot_data = plot_data/(self.plot_dividing_factor*0.1) + self.num_of_ch - i if i==np.arange(self.num_of_ch)[self.eog_ch+1]: plot_data = plot_data/(self.plot_dividing_factor*2) + self.num_of_ch - i else: plot_data = plot_data/self.plot_dividing_factor + self.num_of_ch - i # plot_data = plot_data/(3.5*np.max(np.abs(plot_data))) + self.num_of_ch - i self.line[i].set_data(np.arange(0,self.Fs), plot_data) #self.plot_len* self.axes.set_ylim(0, self.num_of_ch+1) #self.axes.set_ylim(0,np.max(self.data[:,-self.plot_len*self.Fs:])+dif) #set ylim to fit everything on the plot if len(stim)>0: for ind in stim: self.axes.axvline(int(ind/self.plot_len)) #plot vertical line for each trigger self.num = len(stim) # plt.figure() # plt.plot(ss.detrend(self.data[10,self.data_len-self.plot_len* # self.Fs:self.data_len])) self.canvas.draw() #update canvas for i in range(self.num): self.axes.lines[-1].remove() #if trigger outside of plot range then remove vertical line self.num=0 print('main plot time:', time.time()-startt) def start_stop(self): """Starts and stops timer and changes value of run value for data acquiring process""" if self.timer.isActive(): self.timer.stop() else: self.timer.start() self.run.value = not self.run.value def calibration(self): """Prepares the process of calibration, changes a button's text and color""" self.do_calibration = not self.do_calibration if self.do_calibration: #This prepares the software for calibration phase self.axesMap[0,0].set_facecolor('whitesmoke') self.axesMap[0,1].set_facecolor('whitesmoke') self.axesMap[1,0].set_facecolor('whitesmoke') self.axesMap[1,1].set_facecolor('whitesmoke') self.axesMap[2,0].set_facecolor('whitesmoke') self.axesMap[2,1].set_facecolor('whitesmoke') self.button4.setText("Stop calibration") self.button4.setStyleSheet('background-color : lawngreen') #Need to clean the figure to prepare is for a different type of plot for i in range(6): self.axesMap[i//2,i%2].cla() self.f1_cal = [] self.f2_cal = [] self.f3_cal = [] self.f4_cal = [] self.f5_cal = [] self.f6_cal = [] #Change labels and make sure labels are ok for i in range(6): #if not all(np.isnan(self.thrs[i])): self.axesMap[i//2,i%2].set_ylabel("Counts", fontsize=self.labels_size) self.axesMap[i//2,i%2].set_xlabel(self.unit_label[i], fontsize=self.labels_size) self.axesMap[i//2,i%2].set_title(self.Titles[i], fontsize=9) else: self.trial_num = 0 self.plot_story = np.zeros([self.num_of_lines, 6], dtype=object) #This is post-calibration phase self.button4.setText("Start calibration") self.button4.setStyleSheet('') #par = self.thr_parameter #Need to clean the figure to prepare is for a different type of plot for i in range(6): self.axesMap[i//2,i%2].cla() #Draw horizontal lines - out calculated thresholds self.axhlines = [None for _ in range(12)] num_temp = 0 cals = [self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal, self.f6_cal] print(len(self.f1_cal)) #remove outliers! if self.remove_outliers: for cal_idx, cal in enumerate(cals): cals[cal_idx] = np.array(cal)[~doubleMADsfromMedian(cal)] self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal, self.f6_cal = cals print(len(self.f1_cal)) rThr = self.received_thr_values self.thrs = np.zeros([6,2]) cals_temp = [self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal, self.f6_cal] print(self.if_percentage) if all(np.array(self.if_percentage) == 0): self.thrs = [[rThr[0], rThr[1]], [rThr[2], rThr[3]], [rThr[4], rThr[5]], [rThr[6], rThr[7]], [rThr[8], rThr[9]], [rThr[10], rThr[11]]] else: for idx in range(len(cals_temp)): cal = cals_temp[idx] if self.if_percentage[2*idx]: self.thrs[idx, 0] = np.min(cal)-0.01*rThr[2*idx]*(np.max(cal) - np.min(cal)) print(np.min(cal), np.max(cal), rThr[2*idx]) else: self.thrs[idx, 0] = rThr[2*idx] if self.if_percentage[2*idx+1]: self.thrs[idx, 1] = np.max(cal)+0.01*rThr[2*idx+1]*(np.max(cal) - np.min(cal)) else: self.thrs[idx, 1] = rThr[2*idx+1] print('ddddd', rThr[2*idx+1]) self.thr_1, self.thr_2, self.thr_3, self.thr_4, self.thr_5, self.thr_6 = self.thrs #put axhlines for i in range(6): self.axhlines[num_temp] = self.axesMap[i//2,i%2].axhline(self.thrs[i][0], color = 'r') self.axhlines[num_temp+1] = self.axesMap[i//2,i%2].axhline(self.thrs[i][1], color = 'r') num_temp+=2 #Auto_readout_lim - automatic adjustment of the figure limits: if self.auto_readout_lim: for i in range(6): if not all(np.isnan(self.thrs[i])): self.axesMap[i//2,i%2].set_ylim(min_zero(np.array(self.thrs[i]) + np.diff(np.array(self.thrs[i]))*np.array([-2,2]))) else: for i in range(6): if not all(np.isnan(self.thrs[i])): self.axesMap[i//2,i%2].set_ylim(self.measures_x_lims[i,0],self.measures_x_lims[i,1]) #set xlim, titles and ylabels for i in range(6): if not all(np.isnan(self.thrs[i])): self.axesMap[i//2,i%2].set_xlim(0.5,self.time_between_bursts-self.breaktime+0.5) self.axesMap[i//2,i%2].set_title(self.Titles[i], fontsize=9) self.axesMap[i//2,i%2].set_ylabel(self.unit_label[i], fontsize=self.labels_size, rotation=270) self.axesMap[i//2,i%2].set_xlabel("Time [s]", fontsize=self.labels_size) texts = [] #print on the figure whgat was the max of each feature for ind,val in enumerate([np.max(self.f1_cal), np.max(self.f2_cal), np.max(self.f3_cal), np.max(self.f4_cal), np.max(self.f5_cal), np.max(self.f6_cal)]): if not all(np.isnan(self.thrs[ind])): texts.append(self.axesMap[ind//2,ind%2].text(0.8,0.9, round(val,2), transform=self.axesMap[ind//2,ind%2].transAxes, color='r', fontsize=7)) self.canvasMap.draw() def settings(self): self.options = Ui(self) def update_plot_lim(self): if not self.do_calibration: if self.auto_readout_lim: for i in range(6): self.axesMap[i//2,i%2].set_ylim(min_zero(np.array(self.thrs[i]) + np.diff(np.array(self.thrs[i]))*np.array([-2,2]))) else: for i in range(6): self.axesMap[i//2,i%2].set_ylim(self.measures_x_lims[i,0],self.measures_x_lims[i,1]) else: for i in range(6): self.axesMap[i//2,i%2].set_xlim(self.measures_x_lims[i,0],self.measures_x_lims[i,1]) self.canvasMap.draw() def calibration_process(self): """Calibrate thresholds values""" #same things as in update_methods time_start = time.time() self.second_to_analyze = self.data_processing(True) #calculate fft self.S = abs(np.fft.rfft(self.second_to_analyze)) #this is a bit shady, but should work. check it out if doesn't! #do dwt only if any features requires it if any(feature_num in self.used_features for feature_num in [8,9,10,11]): self.dwt = pywt.wavedec(self.second_to_analyze, 'db1', level=8) self.results = np.zeros(len(self.used_features)) #print(self.used_features) for idx, feature in enumerate(self.used_features): if feature==0: self.results[idx] = None continue elif feature in [8,9,10,11]: #to specify which band self.results[idx] = self.functions[feature](feature) else: self.results[idx] = self.functions[feature]() self.log_file_writer.writerow([time.time() - self.time_start, 'Calibration', self.results]) self.log_file.flush() #add mean to the list self.f1_cal.append(self.results[0]) self.f2_cal.append(self.results[1]) self.f3_cal.append(self.results[2]) self.f4_cal.append(self.results[3]) self.f5_cal.append(self.results[4]) self.f6_cal.append(self.results[5]) self.cals = [self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal, self.f6_cal] #!!! Temporary, it's wrong but it's overwritten later. It's needed to check if all features are used, #but there could be more optimal solution. Remove it at some point! self.thr_1 = [np.min(self.f1_cal)-0.1*np.min(self.f1_cal), np.max(self.f1_cal)+0.1*np.max(self.f1_cal)] self.thr_2 = [np.min(self.f2_cal)-0.1*np.min(self.f2_cal), np.max(self.f2_cal)+0.1*np.max(self.f2_cal)] self.thr_3 = [np.min(self.f3_cal)-0.1*np.min(self.f3_cal), np.max(self.f3_cal)+0.1*np.max(self.f3_cal)] self.thr_4 = [np.min(self.f4_cal)-0.1*np.min(self.f4_cal), np.max(self.f4_cal)+0.1*np.max(self.f4_cal)] self.thr_5 = [np.min(self.f5_cal)-0.1*np.min(self.f5_cal), np.max(self.f5_cal)+0.1*np.max(self.f5_cal)] self.thr_6 = [np.min(self.f6_cal)-0.1*np.min(self.f6_cal), np.max(self.f6_cal)+0.1*np.max(self.f6_cal)] self.thrs = [self.thr_1, self.thr_2, self.thr_3, self.thr_4, self.thr_5, self.thr_6] times1 = time.time() print("Thr features calculation: {}".format(times1-time_start)) num_bins=35 if self.auto_readout_lim: if len(self.f4_cal)==1: for line_idx in range(len(self.linetemps)): if not all(np.isnan(self.thrs[line_idx])): y,x = np.histogram(self.cals[line_idx], num_bins, self.thrs[line_idx]) self.linetemps[line_idx] = self.axesMap[line_idx//2,line_idx%2].plot(adjust_hist(x), y, '.-') else: for line_idx in range(len(self.linetemps)): if not all(np.isnan(self.thrs[line_idx])): data = np.histogram(self.cals[line_idx], num_bins, self.thrs[line_idx]) update_stem(self.linetemps[line_idx], data, self.axesMap[line_idx//2,line_idx%2]) for i in range(6): if not all(np.isnan(self.thrs[i])): self.axesMap[i//2,i%2].set_xlim(self.thrs[i][0], self.thrs[i][1]) elif len(self.f4_cal)==1: for line_idx in range(len(self.linetemps)): if not all(np.isnan(self.thrs[line_idx])): y,x = np.histogram(self.cals[line_idx], num_bins, self.measures_x_lims[line_idx]) self.linetemps[line_idx] = self.axesMap[line_idx//2,line_idx%2].plot(adjust_hist(x), y, '.-') else: for line_idx in range(len(self.linetemps)): if not all(np.isnan(self.thrs[line_idx])): data = np.histogram(self.cals[line_idx], num_bins, self.thrs[line_idx]) update_stem(self.linetemps[line_idx], data, self.axesMap[line_idx//2,line_idx%2]) for i in range(6): if not all(np.isnan(self.thrs[i])): self.axesMap[i//2,i%2].set_xlim(self.measures_x_lims[i,0],self.measures_x_lims[i,1]) for i in range(6): if not all(np.isnan(self.thrs[i])): self.axesMap[i//2,i%2].set_ylim(self.measures_y_lims[i,0],self.measures_y_lims[i,1]) # self.linetemps = [self.linetemp1, self.linetemp2, self.linetemp3, self.linetemp4, # self.linetemp5, self.linetemp6] # for i in range(3): # for j in range(2): # self.axesMap[i,j].draw_artist(self.linetemps[i*2+j][0]) # if len(self.f1_cal)>0: # self.canvasMap.update() # self.canvasMap.flush_events() # else: self.canvasMap.draw() #update canvas print('Thr histogram plots', time.time()-time_start) def zoom_out(self): self.plot_dividing_factor = self.plot_dividing_factor*2 def zoom_in(self): self.plot_dividing_factor = self.plot_dividing_factor*0.5 def create_main_frame(self): """Prepare figure, buttons, plots, etc...""" #set colors using for plotting, I THINK I DONT NEED THAT ANYMORE mpl.rcParams['axes.prop_cycle'] = cycler(color=cm.nipy_spectral( np.linspace(0,0.2,6))) self.main_frame = QWidget() self.dpi = 100 self.fig = Figure((11, 10), dpi=self.dpi)#, facecolor='whitesmoke') #figure for signal plot self.figMap = Figure((8, 10), dpi=self.dpi) #figure for measurments self.axes = self.fig.subplots(1,1) #self.axes.set_position([0, 0, 1, 1]) self.line = [None for _ in range(self.num_of_ch)] #list for lines #self.axes.set_axis_off() #turn all axes off for tick in self.axes.xaxis.get_major_ticks(): tick.tick1line.set_visible(False) tick.tick2line.set_visible(False) tick.label1.set_visible(False) tick.label2.set_visible(False) # for tick in self.axes.yaxis.get_major_ticks(): # tick.tick1line.set_visible(False) # tick.tick2line.set_visible(False) # tick.label1.set_visible(False) # tick.label2.set_visible(False) self.axes.set_yticks(np.arange(1, (self.num_of_ch)*1.01, 1)) self.axes.set_yticklabels(self.ch_names[::-1]) self.axes.set_xticks(np.arange(int(self.Fs/self.plot_len), self.Fs, int(self.Fs/self.plot_len))) #self.plot_len* self.axes.grid(True) self.canvas = FigureCanvas(self.fig) self.canvas.setParent(self.main_frame) self.canvasMap = FigureCanvas(self.figMap) # self.canvasMap.setParent(self.main_frame) self.axesMap = self.figMap.subplots(3,2) self.Titles = [self.features_names[idx] for idx in self.used_features] for i in range(6): self.axesMap[i//2,i%2].set_title(self.Titles[i], fontsize=9) self.axesMap[i//2,i%2].set_ylabel(self.unit_label[i], fontsize=self.labels_size, rotation=270) label_size = 7 for i in range(3): for j in range(2): self.axesMap[i,j].set_xlabel("Time [s]", fontsize=self.labels_size) self.axesMap[i,j].yaxis.set_label_coords(self.lab_x_dist, self.lab_y_dist) self.axesMap[i,j].xaxis.set_label_coords(self.lab_y_dist, self.lab_x_dist) self.axesMap[i,j].xaxis.set_tick_params(labelsize = label_size) self.axesMap[i,j].yaxis.set_tick_params(labelsize = label_size) #using list with lines we plot all channels for i in range(self.num_of_ch): if i in self.included_ch or self.ch_names[i] in ["EMG", "EOG"]: self.line[i], = self.axes.plot([] , color = 'black', linewidth=0.4) else: self.line[i], = self.axes.plot([] , color = 'silver', linewidth=0.3) self.axes.set_xlim(0, self.Fs) #self.plot_len* self.axes.set_ylim(0, (self.num_of_ch+1)*1) #self.axes.axvspan((self.plot_len-1)*self.Fs, # self.plot_len*self.size_of_up, alpha=0.3, color='lightcoral') self.fig.tight_layout(pad=1, rect=(0.02,-0.02,1.02,1.02)) #tighten the layout by removing free spaces self.figMap.tight_layout() # self.fig.subplots_adjust(left=0.02,right=1, # bottom=0,top=1, # hspace=0,wspace=0) self.axes.spines['left'].set_visible(True) self.axes.spines['bottom'].set_visible(False) self.axes.spines['top'].set_visible(False) self.axes.spines['right'].set_visible(False) self.canvas.draw() self.canvasMap.draw() #update canvas #NOT NEEDED # for i in range(self.num_of_ch): # self.axbackground = self.canvas.copy_from_bbox(self.axes.bbox) # texts = [] # for ind,name in enumerate(self.ch_names): # texts.append(self.axes.text(-0.03,1-(ind+1)/(len(self.ch_names)+1), # name, transform=self.axes.transAxes, color='r', fontsize=13)) self.button1 = QPushButton("&Settings") self.button1.setCheckable(False) self.button1.clicked.connect(self.settings) self.button2 = QPushButton("&Start/Stop") self.button2.setCheckable(True) self.button2.clicked.connect(self.start_stop) self.button4 = QPushButton("&Start calibration") self.button4.setCheckable(False) self.button4.clicked.connect(self.calibration) self.button_zoom_in = QPushButton("+") self.button_zoom_in.setCheckable(False) self.button_zoom_in.clicked.connect(self.zoom_in) self.button_zoom_in.setMaximumWidth(20) self.button_zoom_out = QPushButton("-") self.button_zoom_out.setCheckable(False) self.button_zoom_out.clicked.connect(self.zoom_out) self.button_zoom_out.setMaximumWidth(20) hbox_zoom = QHBoxLayout() hbox_zoom.addWidget(self.button_zoom_in, Qt.AlignLeft) hbox_zoom.addWidget(self.button_zoom_out, Qt.AlignLeft) hbox_zoom.addStretch(1) # hbox_zoom.setAlignment(self.button_zoom_in, Qt.AlignLeft) # hbox_zoom.setAlignment(self.button_zoom_out, Qt.AlignLeft) #set layout of buttons hbox = QHBoxLayout() for w in [self.button1, self.button2, #self.button3, self.button4]: hbox.addWidget(w) hbox.setAlignment(w, Qt.AlignVCenter) #set genera layout layout = QGridLayout() layout.addWidget(self.canvas, 0, 0, 1, 1) layout.addWidget(self.canvasMap, 0, 1, 1, 1) layout.addLayout(hbox, 1, 1, 1, 1) layout.addLayout(hbox_zoom, 1,0,1,1) self.main_frame.setLayout(layout) self.setCentralWidget(self.main_frame) class Ui(QMainWindow): """""" def __init__(self, main_file): self.main_file = main_file super(Ui, self).__init__() uic.loadUi('settings/soft2.ui', self) self.show() self.load_button = self.findChild(QPushButton, 'pushButton') self.load_button.clicked.connect(self.get_file) self.auto_ylim = self.findChild(QCheckBox, 'checkBox') self.auto_ylim.setChecked(self.main_file.auto_readout_lim) self.disp_max_box = self.findChild(QCheckBox, 'checkBox_2') self.disp_max_box.setChecked(self.main_file.disp_max) self.apply_lims_button = self.findChild(QPushButton, 'pushButton_2') self.apply_lims_button.clicked.connect(self.apply_lims) self.apply_restart = self.findChild(QPushButton, 'pushButton_3') self.apply_restart.clicked.connect(lambda: self.main_file.all_params('restarted')) self.file_path = self.findChild(QLabel, 'label_12') names_xlim = ['lineEdit_2', 'lineEdit_3', 'lineEdit_4', 'lineEdit_5', 'lineEdit_6', 'lineEdit_7'] names_ylim = ['lineEdit_9', 'lineEdit_13', 'lineEdit_10', 'lineEdit_11', 'lineEdit_8', 'lineEdit_12'] self.xlim_values = np.zeros(6, dtype=object) self.ylim_values = np.zeros(6, dtype=object) for i in range(6): self.xlim_values[i] = self.findChild(QLineEdit, names_xlim[i]) self.xlim_values[i].setText(str(list(self.main_file.measures_x_lims[i]))) self.ylim_values[i] = self.findChild(QLineEdit, names_ylim[i]) self.ylim_values[i].setText(str(list(self.main_file.measures_y_lims[i]))) def get_file(self): self.path = QFileDialog.getOpenFileName(self, 'Open File', os.path.dirname(os.getcwd()), 'Text files (*.txt *.csv)') print(self.path) self.file_path.setText(self.path[0]) self.main_file.montage_matrix = np.array(pd.read_csv(self.path[0], header=None)) self.main_file.montage_file_path = self.path[0] def apply_lims(self): for i in range(6): try: self.main_file.measures_x_lims[i] = json.loads(self.xlim_values[i].text()) self.main_file.measures_y_lims[i] = json.loads(self.ylim_values[i].text()) self.main_file.auto_readout_lim = self.auto_ylim.isChecked() self.main_file.disp_max = self.disp_max_box.isChecked() except: print('Fix line:', self.xlim_values[i].text()) print(self.main_file.measures_x_lims) print(self.main_file.measures_y_lims) self.main_file.update_plot_lim() if __name__ == '__main__': app = QApplication(sys.argv) app.setQuitOnLastWindowClosed(True) form = First_window(AppForm) #AppForm() form.show() sys.exit(app.exec_()) # cut time different from both sides # some deafult settings. Maybe remember last configuration? # EMG and EOG - none, more than one? # change names to final names