EStiMo/EStiMo_GUI.py
2025-05-08 18:05:51 +00:00

1366 lines
67 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 23 11:11:41 2021
@author: adamr
"""
import sys, os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from connection import NeurOne, RDA
# import RDA
import time
import matplotlib
import mne
#import inspect
import sys, os
import numpy as np
import scipy.signal as ss
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.cm as cm
import PyQt5.uic as uic
import pandas as pd
import json
import scipy.stats as st
import csv
import datetime
import ctypes
import scipy
import pywt
from cycler import cycler
from PyQt5.QtCore import QTimer, Qt
from PyQt5.QtGui import QImage, QPixmap, QIcon, QFont
from PyQt5.QtWidgets import (QMainWindow, QFileDialog, QMessageBox, QCheckBox, QLineEdit, QWidget, QPushButton,
QLabel, QHBoxLayout, QGridLayout, QAction, QApplication, QDialog, QDialogButtonBox,
QVBoxLayout, QFrame, QFormLayout)
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
from matplotlib.figure import Figure
from mne.channels.layout import _find_topomap_coords as get_pos
if sys.platform=='darwin':
import multiprocessing as mp
from multiprocessing import Process, Value
#from multiprocessing import Queue as QueueOld #Queue doesn't work on MacOS
else:
from multiprocessing import Process, Queue, Value
from utils.Waiting import Waiting_window
from utils.FirstWindow import First_window
from utils.Functions import (apply_montage, eye_reg, most_frequent, connect_sig, set_to_gray, update_stem,
adjust_hist, min_zero, pentropy, entropy, check_integrity, doubleMADsfromMedian)
if sys.platform=='darwin':
from multiprocessing.queues import Queue as QueueOld
class SharedCounter(object):
    # source: https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9
    """Integer counter that can be shared between processes.

    The counter lives in a ``multiprocessing.Value``. A bare ``n += 1`` on
    such a value is a read followed by a write, so two processes could both
    read the old number before either writes the new one. Every modification
    is therefore performed while holding the lock that belongs to the value.

    Adapted almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        # 'i' -> signed C int, initialised to n
        self.count = mp.Value('i', n)

    def increment(self, n=1):
        """Add ``n`` (default 1, may be negative) to the counter atomically."""
        shared = self.count
        with shared.get_lock():
            shared.value = shared.value + n

    @property
    def value(self):
        """Current value of the counter."""
        return self.count.value
class Queue(QueueOld):
    # source: https://gist.github.com/FanchenBao/d8577599c46eab1238a81857bb7277c9
    """``multiprocessing`` queue whose ``qsize()`` and ``empty()`` work everywhere.

    On platforms where ``sem_getvalue()`` is not implemented (e.g. macOS) the
    stock ``Queue.qsize()`` may raise ``NotImplementedError``. This subclass
    keeps its own item count in a ``SharedCounter`` (starting at zero) that is
    bumped on every ``put()`` and decreased on every ``get()``, and answers
    ``qsize()`` / ``empty()`` from that counter.

    ``__getstate__`` / ``__setstate__`` are overridden so that the counter
    travels with the queue when it is pickled for another process; without
    them the unpickled object would have no ``size`` attribute.
    See https://stackoverflow.com/a/65513291/9723036 and
    https://docs.python.org/3/library/pickle.html#pickling-class-instances
    """

    def __init__(self):
        super().__init__(ctx=mp.get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        """Return the parent queue state together with the shared counter."""
        parent_state = super().__getstate__()
        return {'parent_state': parent_state, 'size': self.size}

    def __setstate__(self, state):
        """Restore the parent queue state, then re-attach the shared counter."""
        super().__setstate__(state['parent_state'])
        self.size = state['size']

    def put(self, *args, **kwargs):
        """Enqueue an item, then count it."""
        super().put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs):
        """Dequeue an item, then un-count it."""
        fetched = super().get(*args, **kwargs)
        self.size.increment(-1)
        return fetched

    def qsize(self):
        """Number of items according to the shared counter."""
        return self.size.value

    def empty(self):
        """True when the shared counter reads zero."""
        return self.qsize() == 0
class NeurOneOffline():
    """Replay a recorded BrainVision file as if it were a live data source.

    The class offers the three calls that ``acquire_data`` makes on a data
    source: ``NO(...)`` loads and prepares the recording, ``start()`` marks
    the beginning of the replay and ``getBuffer()`` returns the window of
    samples that ends at the current replay position.
    """

    # Recording replayed when ``NO`` is called without ``vhdr_path``.
    DEFAULT_VHDR_PATH = '/mnt/projects/P_BCT_EEG/DLPFCM1_iTBS/DLPFC/nobeep/subj_8/X47851_iTBS.vhdr'
    # Rate [Hz] the replayed data is labelled with after block averaging.
    TARGET_FS = 1000

    def __init__(self):
        self.data = None

    @staticmethod
    def _block_average(samples, factor):
        """Average consecutive groups of ``factor`` samples along axis 1.

        The trailing partial group and one further full group are dropped, so
        the result has ``samples.shape[1] // factor - 1`` columns (the same
        trimming the original per-channel loop applied).

        Raises:
            ValueError: if the recording is shorter than ``factor`` samples.
        """
        n_out = samples.shape[1] // factor - 1
        if n_out < 0:
            raise ValueError('recording is shorter than one averaging block')
        trimmed = samples[:, :n_out * factor]
        return trimmed.reshape(samples.shape[0], n_out, factor).mean(axis=2)

    def NO(self, ip, port=50000, buffersize=2**15, ringbuffersize=2000,
           sendqueue=False, ringbuf_factor=2, dump=False, avgPackets=1,
           vhdr_path=None):
        """Load the recording and build the array that will be replayed.

        Only ``ringbuffersize`` and ``vhdr_path`` are used; the remaining
        parameters are accepted so the call looks like the online sources.

        Parameters
        ------------
        ringbuffersize: number of samples returned by ``getBuffer``
        vhdr_path: BrainVision header to replay; ``DEFAULT_VHDR_PATH`` when None

        After the call ``self.data`` holds all channels of the recording
        (scaled by 1e7 and block-averaged) plus one extra last row that is 1
        at every "Stimulus/B" annotation and 0 elsewhere.

        Raises:
            ValueError: if the recording's sampling rate is below 1000 Hz.
        """
        self.ringbuffersize = ringbuffersize
        path = self.DEFAULT_VHDR_PATH if vhdr_path is None else vhdr_path
        hdr = mne.io.read_raw_brainvision(path)
        # Annotations hold every event (stim A/B, stops, ...); only B is replayed
        annotations = hdr.annotations
        stimB = annotations.onset[annotations.description == "Stimulus/B"]
        fs = int(hdr.info['sfreq'])  # Sampling rate [Hz]
        print('Sampling rate [Hz]: ' + str(fs))
        data_raw = hdr.get_data() * 1e7
        print(data_raw.shape)
        dividing_factor = int(fs / self.TARGET_FS)
        if dividing_factor < 1:
            raise ValueError('sampling rate {} Hz is below {} Hz, cannot downsample'
                             .format(fs, self.TARGET_FS))
        data_raw = self._block_average(data_raw, dividing_factor)
        # onset [s] -> sample index at the recorded rate -> index after averaging
        stim_idx = ((stimB * fs).astype(int) / dividing_factor).astype(int)
        n_samples = data_raw.shape[1]
        stim_sig = np.zeros((1, n_samples))
        # triggers that fall into the trimmed tail are skipped instead of raising IndexError
        in_range = stim_idx[(stim_idx >= 0) & (stim_idx < n_samples)]
        stim_sig[0, in_range] = 1
        # NOTE(review): the data is labelled as 1000 Hz regardless of fs; that is
        # exact only when the recorded rate is a multiple of 1000 - confirm.
        self.fs = self.TARGET_FS
        self.data = np.concatenate((data_raw, stim_sig))
        self.buffer = self.data[:, :ringbuffersize]

    def start(self):
        """Remember the wall-clock time at which the replay begins."""
        self.time = time.time()

    def getBuffer(self):
        """Return the last ``ringbuffersize`` samples up to the replay position.

        The replay position is the time elapsed since ``start()`` multiplied
        by ``self.fs``.
        """
        time_now = time.time()
        time_diff = int((time_now - self.time)*self.fs)
        # NOTE(review): while time_diff < ringbuffersize the start index is negative
        # and is interpreted from the end of the array, which normally yields an
        # empty window - confirm that callers rely on this during warm-up.
        return self.data[:, time_diff-self.ringbuffersize:time_diff]
def acquire_data(q, size, run, speed, downsample, sleep_time, ip = '192.168.200.201', port = 5000, offline=False):
    """Acquire data from the selected source and pass it to the queue.

    The function never returns; it is meant to run in a separate process.

    Parameters
    ------------
    q: Queue class from multiprocessing library, receives every buffer
    size: number of samples to acquire (ring buffer size of the source)
    run: Value class from multiprocessing library. Buffers are queued only
        while it is non-zero; it can be changed from the main process
    speed: Value class from multiprocessing library. Multiplies sleep_time,
        so the main process can speed up or slow down the loop
    downsample: forwarded to the source as ``avgPackets``
    sleep_time: target duration of one loop iteration before scaling by speed
    ip, port: address of the online source (the offline source is always
        given port 50000)
    offline: source selector, one of "offline", "NeurOne" or "BrainProducts"

    Raises
    ------------
    ValueError: if ``offline`` is none of the three known selectors
    """
    if offline=="offline":
        NO = NeurOneOffline()
        NO.NO(ip=ip,port=50000,buffersize=2**15,ringbuffersize=size,
              sendqueue=False,ringbuf_factor=2,dump=False,avgPackets=downsample) #offline
    elif offline=="NeurOne":
        NO = NeurOne.NO(ip=ip,port=port,buffersize=2**15,ringbuffersize=size,
                        sendqueue=False,ringbuf_factor=2, dump=None,avgPackets=downsample) #online
    elif offline=="BrainProducts":
        NO = RDA.RDA(ip=ip, port=port, buffersize=2**15, ringbuffersize=size,
                     sendqueue=False, avgPackets=downsample)
    else:
        # Previously this fell through and failed later on the unbound name NO
        raise ValueError('Unknown data source: {!r} (expected "offline", '
                         '"NeurOne" or "BrainProducts")'.format(offline))
    NO.start()
    while True:
        startt = time.time()
        if run.value:
            data = NO.getBuffer()
            try:
                q.put(data)
            except Exception as exc: # keep acquiring, but let Ctrl-C / SystemExit through
                print('ERROR', repr(exc))
        elapsed = time.time()-startt
        # time.sleep() raises ValueError for negative values, so an iteration
        # that overran its time slot simply does not sleep at all
        time.sleep(max(0.0, sleep_time*speed.value-elapsed))
class AppForm(QMainWindow):
"""TMS GUI script."""
def __init__(self, passed_params = None, parent=None):
    """Set up parameters, start the acquisition process and prepare the timer.

    Parameters
    ------------
    passed_params: dict of settings chosen in the GUI, forwarded to
        ``all_params``; None makes ``all_params`` read the settings file
    parent: parent widget passed to QMainWindow
    """
    # Initialise the Qt base class exactly once (it used to be initialised
    # twice: once without and once with the parent).
    super().__init__(parent)
    #self.setStyleSheet("background: whitesmoke")
    # When left as None, all_params takes the mode from passed_params['offline']
    self.offline = None #"offline" #'NeurOne' #"BrainProducts"#False
    self.time_start = time.time()
    self.montage_file_path = 'montage_18ch.csv'
    self.features()
    self.speed_general = 250 # fs has to be divisible by this value
    self.all_params(passed_params)
    self.run = Value('i', 0) # Value that can be changed inside separated process
    # To control data flow speed we can initialize another variable used inside the process
    self.speed = Value(ctypes.c_float, self.speed_general/1000)
    self.q = Queue() # Queue for data from separated process (data acquire)
    # Run acquire_data function in separated process so it doesn't freeze when other operations are going
    self.p = Process(target=acquire_data, args=(self.q, self.size_of_up,
                                                self.run, self.speed, True,
                                                self.update_time, self.ip, self.port,
                                                self.offline))
    self.p.start() #start it
    self.timer = QTimer(timerType = Qt.PreciseTimer) #preparing timer
    self.timer.setInterval(self.speed_general) #interval in ms (speed_general, not a fixed second)
    self.timer.timeout.connect(self.update) #call update on every timer tick
    #self.timer.start()
    #set name and icon (for some reason the icon doesn't appear on the taskbar)
    self.setWindowTitle('EStiMo')
    self.setWindowIcon(QIcon('icon.jpg'))
def features(self):
    """Collection of implemented features.

    Defines one nested function per feature and stores three index-aligned
    lists on the instance: ``self.functions`` (the callables),
    ``self.unit_label`` (unit shown for each feature) and
    ``self.features_names`` (display names). Index 0 is the 'None' placeholder.

    The nested functions read ``self.S``, ``self.second_to_analyze``,
    ``self.dwt``, ``self.Fs`` and the band limits at call time, so those only
    have to exist when a feature is evaluated, not when this method runs.

    Should be moved to separated .py file in the future, to make adding new
    features easier.
    TO ADD NEW FEATURES: check the bottom of this function
    """
    def band_power(band):
        """Mean of self.S over the bins band[0]:band[1], averaged over channels."""
        return self.S[:, band[0]:band[1]].mean(1).mean()

    def calculate_theta(): #1
        """Theta band calculation based on previously calculated FFT/Welch"""
        return band_power(self.theta_band)

    def calculate_alpha(): #2
        """Alpha band calculation based on previously calculated FFT/Welch"""
        return band_power(self.alpha_band)

    def calculate_beta(): #3
        """Beta band calculation based on previously calculated FFT/Welch"""
        return band_power(self.beta_band)

    def calculate_h_gamma(): #4
        """High Gamma band calculation (bins 80:120) based on previously calculated FFT/Welch"""
        return band_power((80, 120))

    def calculate_spect_entropy(): #5
        """Spectral Entropy: mean over channels of entropy() of each row of self.S"""
        return np.mean([entropy(spectrum) for spectrum in self.S])

    def calculate_temp_entropy(): #6
        """Temporal Entropy: mean over channels of entropy() of each signal row"""
        return np.mean([entropy(channel) for channel in self.second_to_analyze])

    def calculate_line_length(): #7
        """Line length summed over channels.

        Each channel is treated as the curve (t, x(t)) with t running from 0
        to 1 over self.Fs samples; its length is the sum of the Euclidean
        distances between consecutive points.

        The previous version passed a (2, N) array to cdist, which interprets
        it as two N-dimensional points, so it returned the distance between
        the signal vector and the time vector instead of a curve length.
        """
        t = np.linspace(0, 1, self.Fs)
        # diff(t) has Fs-1 entries, so a segment that is not Fs samples long
        # still fails with a shape error, as before
        steps = np.hypot(np.diff(self.second_to_analyze, axis=1), np.diff(t))
        return steps.sum()

    def wavelets(band): #8,9,10,11
        """Power (sum of squared coefficients) in one of the first four entries
        of the DWT calculated before and stored in self.dwt.

        ``band`` is the feature index: 8, 9, 10, 11 select self.dwt[0..3].
        (The offset used to be 9, which made feature 8 read index -1 and
        shifted the other three by one.)

        Works as labelled only for fs=1000 Hz!!! With different Fs it will
        give power for a different band. Feature names are:
        8: 0 - 4 Hz
        9: 4 - 8 Hz
        10: 8 - 16 Hz
        11: 16 - 31 Hz
        NOTE(review): with fs=1000 Hz the Nyquist frequency is 500 Hz, so the
        first four entries of an 8-level decomposition presumably cover
        0-1.95, 1.95-3.9, 3.9-7.8 and 7.8-15.6 Hz - confirm the labels.
        """
        level = band - 8
        if not 0 <= level < 4:
            raise IndexError('wavelet feature index must be 8..11, got {}'.format(band))
        return np.sum(self.dwt[level]**2)

    def calculate_variance(): #12
        """Sum of Variance calculation (variance per channel, summed)"""
        return np.sum(np.var(self.second_to_analyze, 1))

    def calculate_corr(): #13
        """Temporal Correlation between channels calculation. Returns mean of
        absolute values of correlations between all channel pairs; NaN
        correlations are left out.

        With a single channel there are no pairs and np.corrcoef returns one
        value; its absolute value is returned (this case used to raise
        UnboundLocalError).
        """
        R = np.corrcoef(self.second_to_analyze)
        if np.ndim(R) < 2:
            return np.abs(R)
        # strictly-lower triangle = every channel pair exactly once
        pairs = np.abs(R[np.tril_indices_from(R, k=-1)])
        pairs = pairs[~np.isnan(pairs)]
        return np.mean(pairs)

    ###############################################
    # You can add new functions now:

    ###############################################
    # Add all new functions to this list
    self.functions = [None, calculate_theta, calculate_alpha, calculate_beta, calculate_h_gamma,
                      calculate_spect_entropy, calculate_temp_entropy, calculate_line_length,
                      wavelets, wavelets, wavelets, wavelets, calculate_variance, calculate_corr]
    # Set the unit for the output of each function (they correspond to functions based on index)
    self.unit_label = ['None', "Power", "Power", "Power", "Power", "Entropy", "Entropy", "Length",
                       "Power", "Power", "Power", "Power", "Variance", "Correlation"]
    # Add name of the added feature to the end of this list
    self.features_names = ['None', 'Theta FFT Power', 'Alpha FFT Power', 'Beta FFT Power',
                           'High Gamma FFT Power', 'Spectral entropy', 'Temporal entropy',
                           'Line length', 'DWT Power 0-4 Hz', 'DWT 4-8 Hz',
                           'DWT 8-16 Hz', 'DWT 16-31 Hz','Variance','Correlation']
def all_params(self, passed_params = None, restarted=''):
    """Initialise every parameter, buffer and counter used by the GUI.

    Values come from ``passed_params`` (settings chosen in the GUI) when it
    is given, otherwise from 'settings/TMS_protocol.txt'. The frequency band
    limits and the threshold parameter are always read from the file. The
    method also opens a new CSV log in 'logs/' and calls
    ``create_main_frame``.

    Parameters
    ------------
    passed_params: dict with the keys read below, or None
    restarted: text inserted into the log file name before '.csv'
    """
    #reads values from TMS_protocol.txt file ("key:value" entries)
    #montage matrix is a matrix that is multiplied with the signal
    #identity matrix multiply all channels by 1, so we have the same signal as an output
    settings_file = pd.read_csv('settings/TMS_protocol.txt',sep=':', header=None) #read file with settings

    def setting(key):
        """Return the raw value stored under ``key`` in the settings file."""
        return settings_file[settings_file[0]==key].values[0][1]

    print(restarted)
    #a log opened by an earlier call is closed so its handle is not leaked
    previous_log = getattr(self, 'log_file', None)
    if previous_log is not None:
        previous_log.close()
    #open the file to save values during the experiment
    #newline='' is required by the csv module, otherwise rows get an extra \r on Windows
    self.log_file = open('logs//TMS_log_'+str(f"{datetime.datetime.now():%Y-%m-%d-%H-%M}"+restarted+'.csv'),
                         'w', newline='')
    self.log_file_writer = csv.writer(self.log_file)
    self.log_file.flush() #flushing is applying changes we made to the file
    self.Fs = 1000 #5000
    self.size_of_up = 2*self.Fs #5000 how much data we get from NeurOne function
    #params can be set in GUI, so then no need for using ones from the file
    if passed_params is not None:
        self.montage_file_path = passed_params['montage_path']
        self.time_between_bursts = int(passed_params['time_between'])
        self.breaktime = int(passed_params['cut_time'])
        self.included_ch = passed_params['included_ch']
        self.eog_ch_l = list(passed_params['eog_ch'])
        self.emg_ch_l = list(passed_params['emg_ch'])
        self.num_of_ch = int(passed_params['num_of_ch'])
        self.num_of_lines = int(passed_params['num_of_lines'])
        self.ch_names = passed_params['ch_names']
        self.slow_mode = passed_params['slow_mode']
        self.used_features = passed_params['features']
        self.use_notch = passed_params['notch']
        self.use_regression = passed_params['eye_reg']
        self.remove_outliers = passed_params['remove_outliers']
        self.ip = passed_params['ip']
        self.port = passed_params['port']
        #a mode already set on the instance wins over the one passed in
        if self.offline is None:
            self.offline = passed_params['offline']
        self.exp_trig = passed_params['exp_trig']
        self.exp_time = passed_params['exp_time']
        self.if_percentage = passed_params['percentages']
        self.received_thr_values = passed_params['thr_values']
        self.plot_len = passed_params['plot_len']
        print(self.offline)
    else:
        self.montage_file_path = 'montage_18ch.csv'
        self.time_between_bursts = int(setting('time_between_trains'))
        self.breaktime = int(setting('cut_time'))
        self.included_ch = setting('included_channels').strip()
        self.eog_ch_l = [int(setting('eog_channel'))]
        self.emg_ch_l = [int(setting('emg_channel'))]
        self.num_of_ch = int(setting('number_of_channels'))
        self.num_of_lines = int(setting('number_of_lines'))
        self.exp_trig_loaded = int(setting('expected_triggers'))
        self.exp_time_loaded = int(setting('expected_time'))
        #update() reads exp_trig/exp_time, which this branch never used to set
        self.exp_trig = self.exp_trig_loaded
        self.exp_time = self.exp_time_loaded
        self.if_percentage = np.ones(12)
        self.received_thr_values = [10]*12
        self.slow_mode = False
        self.used_features = [0,1,2,3,4,5]
        self.use_notch = True
        self.use_regression = True
        self.remove_outliers = True
        self.ip = '192.168.200.201'
        self.port = 50000
        self.offline = False
        self.plot_len = 4 #length of data to plot (last seconds of data array)
    self.unit_label = np.array(self.unit_label)[self.used_features]
    self.log_file_writer.writerow(['time', 'state', self.used_features])
    #first listed EOG/EMG channel, or '' when none is listed
    #(indexing [0] unconditionally raised IndexError for an empty list)
    self.eog_ch = self.eog_ch_l[0] if len(self.eog_ch_l)>0 else ''
    self.emg_ch = self.emg_ch_l[0] if len(self.emg_ch_l)>0 else ''
    if self.slow_mode:
        self.speed_general = 1000
    else:
        self.speed_general = 250
    if self.included_ch in ['all', 'All']:
        #NOTE(review): ch_names is only set in the passed_params branch - confirm
        #'all' cannot come from the settings file
        self.included_ch = np.arange(len(self.ch_names))
    elif not isinstance(self.included_ch, list):
        #json.loads takes string and returns list
        self.included_ch = json.loads(self.included_ch)
    print(self.included_ch)
    # self.montage_matrix = np.identity(self.num_of_ch)
    if self.montage_file_path in [None, '']:
        self.montage_file_path = 'settings/montage_18ch.csv'
    self.montage_matrix = np.array(pd.read_csv(self.montage_file_path, header=None))
    #initialization of variables. Some of them are loaded from file and cannot be changed in the GUI
    self.no_eeg = len(self.eog_ch_l) + len(self.emg_ch_l)
    self.update_time = 1 #how many seconds between updates of everything
    #prepare empty arrays for data: feature1..feature6, one row per train
    readout_len = self.time_between_bursts-self.breaktime
    for k in range(1, 7):
        setattr(self, 'feature{}'.format(k), np.full([1000, readout_len], None))
    #json.loads takes string and returns list, so we can read list from txt file
    self.alpha_band = json.loads(setting('alpha_range'))
    self.beta_band = json.loads(setting('beta_range'))
    self.theta_band= json.loads(setting('theta_range'))
    self.colors = ['b', 'm', 'r', 'k', 'c', ] #colors used for lines if less that 6 of them
    self.data_len = 30*self.Fs #length of the data array in samples (30 s)
    self.plot_dividing_factor = 100
    self.previous_state = np.zeros(6)
    if self.num_of_lines>5: #if more than 5 lines then colors of them from colormap
        self.colors = cm.plasma(np.linspace(0,1,self.num_of_lines))
    #Example of thresholds for each measurment. Changes after first second of calibration.
    self.thr_parameter = int(setting('threshold_parameter'))
    self.thr_2 = [20,100]
    self.thr_1 = [20,100]
    self.thr_3 = [20,100]
    self.thr_4 = [0, 0.5]
    self.thr_5 = [0.3, 0.8]
    self.thr_6 = [1,3]
    self.linetemps = np.zeros(6, dtype=object)
    self.labels_size = 7
    self.lab_x_dist = 1.
    self.lab_y_dist = 0.5
    self.measures_x_lims = np.array([[-1,1],[0,350],[0,350],[0,350],[0,2],[3,8]], dtype=float)
    self.measures_y_lims = np.array([[0,70],[0,70],[0,70],[0,70],[0,70],[0,70]], dtype=float)
    self.create_main_frame() #create plots, buttons, figures etc...
    #create data array
    self.loaded = np.full([self.num_of_ch,self.data_len], None)
    self.loaded_full = np.full([self.num_of_ch+1,self.data_len], None)
    self.data = np.full((self.num_of_ch,self.data_len), None)
    self.trigg_data = np.zeros(self.data_len) #array to keep trigger data in
    self.num=0
    self.doit=0 #to count number of seconds after last stimuli in train
    self.plot_story = np.zeros([self.num_of_lines, 6], dtype=object) #thigs that are being plot
    self.trial_num = 0 #how many train there were
    self.auto_readout_lim = True
    self.disp_max = True
    self.do_calibration = False #if True then calibration process is going on
    self.calibration_counter = 0
    self.calibration_counter_max = int(1000/self.speed_general)
    #Lists to keep calibration values for each measurment: f1_cal..f6_cal
    for k in range(1, 7):
        setattr(self, 'f{}_cal'.format(k), [])
    self.qmbx = None
    self.red_dots = []
def data_processing(self, calibration=False):
    """Pre-process the segment of ``self.loaded_noeye`` used for feature extraction.

    Steps: cut the segment, 2nd-order 0.1 Hz high-pass (zero-phase), optional
    49-51 Hz band-stop, linear detrend along time and, when
    ``self.use_regression`` is set, ``eye_reg`` against the EOG channel.

    Parameters
    ------------
    calibration: True -> take the last ``self.Fs`` samples;
        False -> take the samples ``self.id1:self.id2``

    Returns the processed segment, which is also stored in ``self.last_sec``.
    """
    # During calibration it is simply the last second; in a readout period
    # the window is given by id1/id2.
    if calibration:
        window = slice(-self.Fs, None)
    else:
        window = slice(self.id1, self.id2)
    self.last_sec = self.loaded_noeye[:, window].copy()
    if self.use_regression:
        # NOTE(review): the EOG regressor is taken before any filtering while
        # the other channels are filtered below - confirm this is intended.
        eog = self.loaded_noeye[self.eog_ch, window]

    nyquist = self.Fs/2
    b, a = ss.butter(2, 0.1/nyquist, 'highpass')
    self.last_sec = ss.filtfilt(b, a, self.last_sec)
    if self.use_notch:
        b, a = ss.butter(2, [49/nyquist, 51/nyquist], 'bandstop')
        self.last_sec = ss.filtfilt(b, a, self.last_sec)
    self.last_sec = ss.detrend(self.last_sec, axis=1)

    # NOTE(review): the selection of self.included_ch only happens on this
    # path; without regression every row of the segment is returned.
    if self.use_regression:
        self.last_sec = eye_reg(self.last_sec[self.included_ch], eog)
    return self.last_sec
def update_methods(self):
    """Calculates all features for the current segment and plots their values.

    The segment comes from ``self.data_processing()``. The first six results
    are written into the next free cell of ``self.feature1`` ..
    ``self.feature6``, logged, compared against ``self.thr_1`` ..
    ``self.thr_6`` and drawn on the 3x2 grid ``self.axesMap``.

    Threshold handling per panel (skipped when both limits are NaN):
    a value inside [low, high] resets a highlighted panel to 'whitesmoke';
    a value outside highlights a non-highlighted panel in 'xkcd:salmon' and
    marks the point with a red star on that same panel.
    """
    # (row, col) of the panel that shows feature k (k = 0..5)
    panels = [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1)]
    times = time.time()
    self.second_to_analyze = self.data_processing() #Take data
    #calculate fft
    self.S = abs(np.fft.rfft(self.second_to_analyze))
    #do dwt only if any feature requires it; the DWT features are 8..11
    #(the check used to test 9..12, so feature 8 alone never got its DWT)
    if any(feature_num in self.used_features for feature_num in [8,9,10,11]):
        self.dwt = pywt.wavedec(self.second_to_analyze, 'db1', level=8) #db1, 8 levels. Works well for 1000 Hz only
    self.results = np.zeros(len(self.used_features))
    #Put all results to one array
    for idx, feature in enumerate(self.used_features):
        if feature==0:
            self.results[idx] = None #stored as NaN in the float array
        elif feature in [8,9,10,11]: #to specify which band
            self.results[idx] = self.functions[feature](feature)
        else:
            self.results[idx] = self.functions[feature]()
    #first cell that is still None: x = train (row), y = second within the train
    #(elementwise comparison with None is intended here)
    free_rows, free_cols = np.where(self.feature1==None)  # noqa: E711
    x = free_rows[0]
    y = free_cols[0]
    if y==0:
        self.previous_state = np.zeros(6)
    stores = [self.feature1, self.feature2, self.feature3,
              self.feature4, self.feature5, self.feature6]
    for k, store in enumerate(stores):
        store[x,y] = self.results[k]
    #save in the log
    self.log_file_writer.writerow([time.time() - self.time_start, 'between bursts',
                                   self.results])
    self.log_file.flush()
    times1 = time.time()
    print("Calculation of features: {}".format(times1-times))
    old_prv_state = self.previous_state.copy()
    thresholds = [self.thr_1, self.thr_2, self.thr_3, self.thr_4, self.thr_5, self.thr_6]
    for k, ((row, col), thr) in enumerate(zip(panels, thresholds)):
        if all(np.isnan(thr)):
            continue
        panel = self.axesMap[row,col]
        value = self.results[k]
        if value>=thr[0] and value<=thr[1]:
            if self.previous_state[k]==1:
                panel.set_facecolor('whitesmoke')
                self.previous_state[k]=0
        elif self.previous_state[k]==0:
            #the marker goes on the feature's own panel (feature 4 used to be
            #drawn on panel [0,0] by a copy-paste slip)
            self.red_dots.append(panel.plot(y+1, value, 'r*'))
            panel.set_facecolor('xkcd:salmon')
            self.previous_state[k]=1
    #If there was no change in a color, not need to redraw figure
    need_redraw = not all(self.previous_state==old_prv_state)
    alfa = 1
    color = 'b'
    #if first plot of this train and a free line slot exists
    if self.trial_num<self.num_of_lines and self.plot_story[self.trial_num,0]==0:
        for k, (row, col) in enumerate(panels):
            self.plot_story[self.trial_num,k] = self.axesMap[row,col].plot(
                np.arange(1,self.time_between_bursts-self.breaktime+1),
                stores[k][x].T, '--o', alpha=alfa, color=color)
        self.canvasMap.draw()
    else: #if not the first plot, just set_ydata to optimize the speed
        slot = self.trial_num%self.num_of_lines
        for k, store in enumerate(stores):
            line = self.plot_story[slot,k][0]
            line.set_ydata(store[x].T)
            line.set_zorder(self.trial_num)
            line.set_color('b')
    #If there is a need to redraw we do that. draw() option is slower, but more robust
    if need_redraw:
        self.canvasMap.draw()
    #otherwise only the updated lines are re-drawn
    else:
        slot = self.trial_num%self.num_of_lines
        for k, (row, col) in enumerate(panels):
            self.axesMap[row,col].draw_artist(self.plot_story[slot,k][0])
        self.canvasMap.update()
        self.canvasMap.flush_events()
    # self.canvasMap.draw() #update plot
    print("Plot update time: {}".format(time.time()-times1))
def update(self):
"""Updates data, checks if something should be plotted"""
time_start = time.time()
if self.q.qsize()>0:
self.timer.setInterval(int(1*self.speed_general*0.97))
if self.q.qsize()<1 and self.timer.interval()!= int(self.speed_general*1.1):
self.timer.setInterval(int(self.speed_general*1.03))
times = time.time()
# self.offline='offline' #remove this!
if self.offline=="offline":
incl = [0,2,6,7,8,10,13,16,18,22,25,28,31,34,41,43,-3,-2,-1] # For offline only
loaded_temp = self.q.get()[incl]/10 # Load data
try:
self.loaded, most_fr = connect_sig(self.loaded_full, loaded_temp, self.speed_general)
except ValueError:
print("ValueError, wait...")
return 0
else:
data_divide = 1
if self.offline=="NeurOne":
data_divide = 10
loaded_temp = self.q.get()/data_divide
try:
self.loaded, most_fr = connect_sig(self.loaded_full, loaded_temp.T, self.speed_general)
except ValueError:
print("ValueError, wait...")
return 0
self.loaded_noeye = self.loaded.copy()
step1 = time.time()-time_start
#if connection point is too far or too close it modifies the acquisition rate.
try:
if most_fr>900:
self.speed.value=1.01*self.speed_general/1000
elif most_fr<500:
self.speed.value=0.8*self.speed_general/1000
elif most_fr<800 and most_fr>500:
self.speed.value = 0.98*self.speed_general/1000
print('speed:',self.speed.value)
#TODO: check if that is still needed
except UnboundLocalError:
pass
self.trigg = self.loaded[-1].copy() #keeps trigger in a different array
self.loaded_full = self.loaded.copy() #keeps it for the next second
step2 = time.time()-time_start
# ZeroDivisionError means that data is not yet loaded
try:
# [A,B] = ss.butter(2, 0.1/(self.Fs/2), 'highpass')
# self.loaded[:self.num_of_ch,-4*self.Fs:] = ss.filtfilt(A, B, self.loaded[:self.num_of_ch, -4*self.Fs:])
# self.loaded[:self.num_of_ch,-4*self.Fs:] = self.loaded[:self.num_of_ch,-4*self.Fs:] - np.mean(self.loaded[:self.num_of_ch,-4*self.Fs:],1, keepdims=True)
self.loaded[:self.num_of_ch,-self.plot_len*self.Fs:] = ss.detrend(self.loaded[:self.num_of_ch,-self.plot_len*self.Fs:])
except ZeroDivisionError: # This error means that buffer is still not full
if self.qmbx == None:
self.qmbx = Waiting_window() # Small window with a message to wait
print('Waiting for data (should take up to few seconds)')
return(0) # Stop function here, because in this case rest of update is not needed
# Getting to this point of the code means there were no exception before
# So the waiting window can be destroyed
if self.qmbx!=None:
self.qmbx.setText('Data received!')
self.qmbx=None #then it dies
step3 = time.time()-time_start
# Eye regression
self.trigg_data = self.trigg.copy()
od = self.data_len-self.plot_len*self.Fs # First point
do = self.data_len # Last point
# For interpolation of TMS artifact
int_from = 0.3 #first point for interpolation
int_to = 0.3 #last point for interpolation
step4 = time.time()-time_start
# If two stimuli (A and B) are sent together (shouldn't be) then it removes one to not create chaos
stim = check_integrity(np.where(self.trigg_data[od:do]>0)[0])
step5 = time.time()-time_start
# If there are some pulses detected then it interpolates the data used for the visualisation
if len(stim)>0:
for ind in stim[::-1]:
size = self.data_len - (od+ind-int(int_from*self.Fs))
self.loaded[:, od+ind-int(int_from*self.Fs):od+ind+int(int_to*self.Fs)] = self.loaded[:,min(od+ind+int(int_to*self.Fs), 30000-1)].reshape(-1, 1)
# for i in range(self.loaded.shape[0]):
# self.loaded[i, od+ind-int(int_from*self.Fs):od+ind+int(int_to*self.Fs)] = np.linspace(
# self.loaded[i,min(od+ind-int(int_from*self.Fs),30000-1)],
# self.loaded[i,max(od+ind+int(int_to*self.Fs),30000-1)],
# od+ind+int(int_to*self.Fs)-(od+ind-int(int_from*self.Fs)))
step6 = time.time()-time_start
# Eye regression for visualisation
#plt.figure()
#plt.plot(self.loaded[3, -4*self.Fs:])
#plt.plot(self.loaded[self.eog_ch,-4*self.Fs:], '--b')
if self.use_regression:
self.loaded[:(self.num_of_ch-self.no_eeg),-4*self.Fs:] = eye_reg(
self.loaded[:self.num_of_ch-self.no_eeg, -4*self.Fs:], self.loaded[self.eog_ch,-4*self.Fs:])
#plt.plot(self.loaded[3,-4*self.Fs:], '--r')
#plt.show()
step7 = time.time()-time_start
# Applies montage for both types of data (visualisation and processing)
self.loaded = apply_montage(self.loaded, self.montage_matrix)
self.loaded_noeye = apply_montage(self.loaded_noeye, self.montage_matrix)
step8 = time.time()-time_start
self.update_plot() #updates main plot with data
stim_where = check_integrity(np.where(self.trigg_data>0)[0]) #indexes of triggers
step9 = time.time()-time_start
# if self.do_calibration and len(stim_where[stim_where>(self.data_len-max(
# 2000, round(self.Fs*self.time_between_bursts*0.7, -3)))])>0:
if self.do_calibration and len(stim_where[stim_where>(self.data_len-max(2000, self.exp_time+1500))])>0:
self.calibration()
if self.do_calibration:
# If do_calibration is True then continue calibration process and exit this function
if self.calibration_counter%self.calibration_counter_max==0:
self.calibration_process()
self.calibration_counter+=1
return 0 #ends run of this function so nothing else happens
step10 = time.time()-time_start
# If set number of stimuli is detected
# if self.doit==0 and sum(stim_where>self.data_len-max(
# 2000, round(self.Fs*self.time_between_bursts*0.7, -3)))==10:
print(sum(stim_where>self.data_len-max(2000, self.exp_time+1500)))
if self.doit==0 and sum(stim_where>self.data_len-
max(2000, self.exp_time+1500))==self.exp_trig:
self.axesMap[0,0].set_facecolor('whitesmoke')
self.axesMap[0,1].set_facecolor('whitesmoke')
self.axesMap[1,0].set_facecolor('whitesmoke')
self.axesMap[1,1].set_facecolor('whitesmoke')
self.axesMap[2,0].set_facecolor('whitesmoke')
self.axesMap[2,1].set_facecolor('whitesmoke')
self.doit=self.time_between_bursts-self.breaktime
self.last_stim = stim_where[-1]
set_to_gray(self.plot_story)
for dot in self.red_dots:
for line in dot: line.remove()
self.red_dots = []
self.canvasMap.draw()
# If doit is more than 0 --> readout phase
elif self.doit>0:
print(self.doit)
self.last_stim = stim_where[-1]
if any(np.diff(stim_where[stim_where>self.data_len-max(
2000, round(self.Fs*self.time_between_bursts*1.2, -3))])>1000):
print('UWAGA NA TO')
self.last_stim = stim_where[np.where(np.diff(stim_where)>1000)[0]][-1]
#index of last stim + half of breaktime + seconds that were already read before
self.id1 = int(self.last_stim+0.5*(self.breaktime*self.Fs)+(
self.time_between_bursts-self.breaktime-self.doit)*self.Fs)
self.id2 = int(self.id1 + self.Fs) #one second later
print('id1',self.id1)
if self.id2<self.data_len: #if indexes are in range of our data, otherwise it would be updated next second
self.update_methods() #update side plots
self.doit+=-1
if self.doit==0: #that means readout is over
self.trial_num+=1
if self.button1.isChecked():
self.plot_spectrogram()
print(self.q.qsize())
print("whole process time:", time.time()-times)
print('steps', [step1, step2, step3, step4, step5, step6, step7, step8, step9, step10])
def update_plot(self):
    """Redraw the main signal plot from the newest ``plot_len`` seconds.

    Every channel of ``self.loaded`` is copied into ``self.data``; the
    visible window (the last ``plot_len * Fs`` samples) is downsampled by
    ``plot_len`` so each trace has ``Fs`` points, then scaled and shifted
    vertically so that channel ``i`` is drawn around ``num_of_ch - i``.
    Triggers found in the visible part of ``self.trigg`` are marked with
    vertical lines that exist only for this draw.

    Side effects: updates ``self.trigg_data``, ``self.data``, the line
    artists in ``self.line``, redraws ``self.canvas`` and leaves
    ``self.num`` at 0.
    """
    startt = time.time()
    loaded = self.loaded  # loaded data
    self.trigg_data = self.trigg.copy()
    od = self.data_len - self.plot_len*self.Fs  # first visible sample
    do = self.data_len                          # end of the visible window
    stim = check_integrity(np.where(self.trigg_data[od:do] > 0)[0])

    for i in range(self.num_of_ch):
        self.data[i] = loaded[i].copy()
        # Last plot_len seconds, downsampled to speed up plotting.
        plot_data = self.data[i, od:do][::self.plot_len]
        # EMG special case. The "no EMG channel" test must come first:
        # evaluating ``self.emg_ch + 1`` on the '' placeholder raises
        # TypeError before the guard could ever reject it.
        # self.emg_ch+1 because self.num_of_ch doesn't include trigger.
        if self.emg_ch != '' and i == np.arange(self.num_of_ch)[self.emg_ch+1]:
            plot_data = plot_data/(self.plot_dividing_factor*0.1) + self.num_of_ch - i
        # NOTE(review): this is a plain ``if``, not ``elif``, so the EMG
        # trace scaled above is scaled and offset a second time by the
        # branch below. Kept as is because the 0.1 factor may have been
        # tuned against that behaviour - confirm before changing.
        if i == np.arange(self.num_of_ch)[self.eog_ch+1]:
            plot_data = plot_data/(self.plot_dividing_factor*2) + self.num_of_ch - i
        else:
            plot_data = plot_data/self.plot_dividing_factor + self.num_of_ch - i
        self.line[i].set_data(np.arange(0, self.Fs), plot_data)

    self.axes.set_ylim(0, self.num_of_ch+1)

    # One vertical line per trigger. Keep the returned artists so that
    # exactly these lines are removed after drawing, instead of popping
    # "the last N lines" of the axes, which could delete channel traces
    # if the counter were ever out of sync.
    markers = [self.axes.axvline(int(ind/self.plot_len)) for ind in stim]
    self.num = len(markers)
    self.canvas.draw()  # update canvas
    for marker in markers:
        marker.remove()
    self.num = 0
    print('main plot time:', time.time()-startt)
def start_stop(self):
    """Toggle the update timer and flip the shared ``run`` flag.

    A running timer is stopped and a stopped one is started; in both
    cases ``self.run.value`` is inverted afterwards.
    """
    timer = self.timer
    toggle = timer.stop if timer.isActive() else timer.start
    toggle()
    self.run.value = not self.run.value
def calibration(self):
"""Prepares the process of calibration, changes a button's text and color"""
# Toggle handler: every call flips do_calibration, so this one method both
# starts a calibration (flag becomes True) and finishes it (flag becomes
# False). Finishing computes the thresholds from the collected f*_cal lists
# and redraws the six feature plots for the readout phase.
self.do_calibration = not self.do_calibration
if self.do_calibration:
#This prepares the software for calibration phase
self.axesMap[0,0].set_facecolor('whitesmoke')
self.axesMap[0,1].set_facecolor('whitesmoke')
self.axesMap[1,0].set_facecolor('whitesmoke')
self.axesMap[1,1].set_facecolor('whitesmoke')
self.axesMap[2,0].set_facecolor('whitesmoke')
self.axesMap[2,1].set_facecolor('whitesmoke')
self.button4.setText("Stop calibration")
self.button4.setStyleSheet('background-color : lawngreen')
#Need to clean the figure to prepare it for a different type of plot
for i in range(6):
self.axesMap[i//2,i%2].cla()
# Fresh, empty sample lists - one per feature plot; calibration_process()
# appends to them.
self.f1_cal = []
self.f2_cal = []
self.f3_cal = []
self.f4_cal = []
self.f5_cal = []
self.f6_cal = []
#Change labels and make sure labels are ok
# During calibration the plots show counts (y) against the feature unit (x).
for i in range(6):
#if not all(np.isnan(self.thrs[i])):
self.axesMap[i//2,i%2].set_ylabel("Counts", fontsize=self.labels_size)
self.axesMap[i//2,i%2].set_xlabel(self.unit_label[i], fontsize=self.labels_size)
self.axesMap[i//2,i%2].set_title(self.Titles[i], fontsize=9)
else:
# Reset the readout bookkeeping: trial counter and the per-plot line history.
self.trial_num = 0
self.plot_story = np.zeros([self.num_of_lines, 6], dtype=object)
#This is post-calibration phase
self.button4.setText("Start calibration")
self.button4.setStyleSheet('')
#par = self.thr_parameter
for i in range(6):
self.axesMap[i//2,i%2].cla()
#Draw horizontal lines - our calculated thresholds
# Two lines (lower, upper) per feature plot -> 12 slots.
self.axhlines = [None for _ in range(12)]
num_temp = 0
cals = [self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal, self.f6_cal]
print(len(self.f1_cal))
#remove outliers!
# presumably doubleMADsfromMedian returns a boolean outlier mask; samples
# where it is True are dropped here - TODO confirm against utils.Functions.
if self.remove_outliers:
for cal_idx, cal in enumerate(cals):
cals[cal_idx] = np.array(cal)[~doubleMADsfromMedian(cal)]
self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal, self.f6_cal = cals
print(len(self.f1_cal))
# received_thr_values is read as 12 numbers: (lower, upper) for each of the
# six features; if_percentage holds one flag per number.
rThr = self.received_thr_values
self.thrs = np.zeros([6,2])
cals_temp = [self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal, self.f6_cal]
print(self.if_percentage)
# No percentage flag set at all: the received values are used verbatim as
# absolute thresholds.
if all(np.array(self.if_percentage) == 0):
self.thrs = [[rThr[0], rThr[1]], [rThr[2], rThr[3]], [rThr[4], rThr[5]],
[rThr[6], rThr[7]], [rThr[8], rThr[9]], [rThr[10], rThr[11]]]
else:
for idx in range(len(cals_temp)):
cal = cals_temp[idx]
# Lower threshold: either the calibration minimum lowered by rThr percent
# of the calibration range (max - min), or the absolute received value.
if self.if_percentage[2*idx]:
self.thrs[idx, 0] = np.min(cal)-0.01*rThr[2*idx]*(np.max(cal) - np.min(cal))
print(np.min(cal), np.max(cal), rThr[2*idx])
else:
self.thrs[idx, 0] = rThr[2*idx]
# Upper threshold: the calibration maximum raised by rThr percent of the
# range, or the absolute received value.
if self.if_percentage[2*idx+1]:
self.thrs[idx, 1] = np.max(cal)+0.01*rThr[2*idx+1]*(np.max(cal) - np.min(cal))
else:
self.thrs[idx, 1] = rThr[2*idx+1]
print('ddddd', rThr[2*idx+1])
self.thr_1, self.thr_2, self.thr_3, self.thr_4, self.thr_5, self.thr_6 = self.thrs
#put axhlines
# Red horizontal lines at the lower and upper threshold of every feature plot.
for i in range(6):
self.axhlines[num_temp] = self.axesMap[i//2,i%2].axhline(self.thrs[i][0], color = 'r')
self.axhlines[num_temp+1] = self.axesMap[i//2,i%2].axhline(self.thrs[i][1], color = 'r')
num_temp+=2
#Auto_readout_lim - automatic adjustment of the figure limits:
# Automatic limits widen the threshold interval by twice its width on each
# side and pass the result through min_zero(). Plots whose two thresholds
# are both NaN are skipped.
if self.auto_readout_lim:
for i in range(6):
if not all(np.isnan(self.thrs[i])):
self.axesMap[i//2,i%2].set_ylim(min_zero(np.array(self.thrs[i]) +
np.diff(np.array(self.thrs[i]))*np.array([-2,2])))
else:
# NOTE(review): the y limits are taken from measures_x_lims here; presumably
# because the feature value moves from the x axis (calibration histogram)
# to the y axis (readout) - confirm.
for i in range(6):
if not all(np.isnan(self.thrs[i])):
self.axesMap[i//2,i%2].set_ylim(self.measures_x_lims[i,0],self.measures_x_lims[i,1])
#set xlim, titles and ylabels
# The x axis spans the readout seconds: time_between_bursts - breaktime.
for i in range(6):
if not all(np.isnan(self.thrs[i])):
self.axesMap[i//2,i%2].set_xlim(0.5,self.time_between_bursts-self.breaktime+0.5)
self.axesMap[i//2,i%2].set_title(self.Titles[i], fontsize=9)
self.axesMap[i//2,i%2].set_ylabel(self.unit_label[i], fontsize=self.labels_size, rotation=270)
self.axesMap[i//2,i%2].set_xlabel("Time [s]", fontsize=self.labels_size)
# texts only collects the created text artists; it is not read again in
# this method.
texts = []
#print on the figure what was the max of each feature
# NOTE(review): np.max raises ValueError on an empty list, e.g. if the
# calibration is stopped before any sample was collected - confirm this
# cannot happen.
for ind,val in enumerate([np.max(self.f1_cal), np.max(self.f2_cal), np.max(self.f3_cal),
np.max(self.f4_cal), np.max(self.f5_cal), np.max(self.f6_cal)]):
if not all(np.isnan(self.thrs[ind])):
texts.append(self.axesMap[ind//2,ind%2].text(0.8,0.9, round(val,2),
transform=self.axesMap[ind//2,ind%2].transAxes, color='r', fontsize=7))
self.canvasMap.draw()
def settings(self):
    """Open the settings window and keep a reference to it in ``self.options``."""
    settings_window = Ui(self)
    self.options = settings_window
def update_plot_lim(self):
    """Re-apply the axis limits of the six feature plots and redraw them.

    * During calibration the user limits ``measures_x_lims`` are applied
      to the x axis of every plot.
    * Otherwise, with ``auto_readout_lim`` set, the y limits are derived
      from the thresholds: the interval ``thrs[i]`` widened by twice its
      width on both sides and passed through ``min_zero``. Plots whose
      two thresholds are both NaN are skipped.
    * Otherwise ``measures_x_lims`` is applied to the y axis.
    """
    feature_axes = [self.axesMap[i//2, i%2] for i in range(6)]
    if self.do_calibration:
        for i, ax in enumerate(feature_axes):
            ax.set_xlim(self.measures_x_lims[i,0], self.measures_x_lims[i,1])
    elif self.auto_readout_lim:
        for i, ax in enumerate(feature_axes):
            # Same guard that calibration() and calibration_process() apply
            # before using the thresholds as limits: an unused feature has
            # NaN thresholds, and NaN must not reach set_ylim.
            if all(np.isnan(self.thrs[i])):
                continue
            thr = np.array(self.thrs[i])
            ax.set_ylim(min_zero(thr + np.diff(thr)*np.array([-2,2])))
    else:
        for i, ax in enumerate(feature_axes):
            ax.set_ylim(self.measures_x_lims[i,0], self.measures_x_lims[i,1])
    self.canvasMap.draw()
def calibration_process(self):
"""Calibrate thresholds values"""
# One calibration step: computes the selected features for the current data
# window, logs them, appends them to the f*_cal lists, recomputes
# provisional thresholds and refreshes the six histogram plots.
#same things as in update_methods
time_start = time.time()
# presumably data_processing(True) returns the signal window to analyse in
# calibration mode - TODO confirm against its definition.
self.second_to_analyze = self.data_processing(True)
#calculate fft
# Magnitude of the real-input FFT of the analysed window.
self.S = abs(np.fft.rfft(self.second_to_analyze))
# The wavelet decomposition ('db1', 8 levels) is computed only when one of
# features 8-11 is in use.
if any(feature_num in self.used_features for feature_num in [8,9,10,11]):
self.dwt = pywt.wavedec(self.second_to_analyze, 'db1', level=8)
self.results = np.zeros(len(self.used_features))
#print(self.used_features)
for idx, feature in enumerate(self.used_features):
# Feature 0 marks an unused slot: None stored in the float array becomes
# NaN, which the np.isnan checks below rely on.
if feature==0:
self.results[idx] = None
continue
elif feature in [8,9,10,11]: #to specify which band
self.results[idx] = self.functions[feature](feature)
else:
self.results[idx] = self.functions[feature]()
# Log row: seconds since self.time_start, the tag 'Calibration', the results.
self.log_file_writer.writerow([time.time() - self.time_start, 'Calibration',
self.results])
self.log_file.flush()
#append this window's feature values to the per-feature calibration lists
# Indexes 0-5 are read, so used_features must hold at least six entries.
self.f1_cal.append(self.results[0])
self.f2_cal.append(self.results[1])
self.f3_cal.append(self.results[2])
self.f4_cal.append(self.results[3])
self.f5_cal.append(self.results[4])
self.f6_cal.append(self.results[5])
self.cals = [self.f1_cal, self.f2_cal, self.f3_cal, self.f4_cal, self.f5_cal,
self.f6_cal]
# Provisional thresholds: minimum minus 10% of the minimum and maximum plus
# 10% of the maximum of everything collected so far.
# NOTE(review): for negative values this moves the bound towards zero, i.e.
# inside the observed range - confirm that the features are non-negative.
self.thr_1 = [np.min(self.f1_cal)-0.1*np.min(self.f1_cal),
np.max(self.f1_cal)+0.1*np.max(self.f1_cal)]
self.thr_2 = [np.min(self.f2_cal)-0.1*np.min(self.f2_cal),
np.max(self.f2_cal)+0.1*np.max(self.f2_cal)]
self.thr_3 = [np.min(self.f3_cal)-0.1*np.min(self.f3_cal),
np.max(self.f3_cal)+0.1*np.max(self.f3_cal)]
self.thr_4 = [np.min(self.f4_cal)-0.1*np.min(self.f4_cal),
np.max(self.f4_cal)+0.1*np.max(self.f4_cal)]
self.thr_5 = [np.min(self.f5_cal)-0.1*np.min(self.f5_cal),
np.max(self.f5_cal)+0.1*np.max(self.f5_cal)]
self.thr_6 = [np.min(self.f6_cal)-0.1*np.min(self.f6_cal),
np.max(self.f6_cal)+0.1*np.max(self.f6_cal)]
self.thrs = [self.thr_1, self.thr_2, self.thr_3, self.thr_4, self.thr_5, self.thr_6]
times1 = time.time()
print("Thr features calculation: {}".format(times1-time_start))
# Histograms use 35 bins; the third np.histogram argument is the value range.
num_bins=35
if self.auto_readout_lim:
# len(self.f4_cal)==1 means this is the first sample of the calibration:
# the line artists are created here; later calls update them through
# update_stem().
if len(self.f4_cal)==1:
for line_idx in range(len(self.linetemps)):
if not all(np.isnan(self.thrs[line_idx])):
y,x = np.histogram(self.cals[line_idx], num_bins, self.thrs[line_idx])
self.linetemps[line_idx] = self.axesMap[line_idx//2,line_idx%2].plot(adjust_hist(x), y, '.-')
else:
for line_idx in range(len(self.linetemps)):
if not all(np.isnan(self.thrs[line_idx])):
data = np.histogram(self.cals[line_idx], num_bins, self.thrs[line_idx])
update_stem(self.linetemps[line_idx], data, self.axesMap[line_idx//2,line_idx%2])
# Automatic mode: the x axis follows the provisional thresholds.
for i in range(6):
if not all(np.isnan(self.thrs[i])):
self.axesMap[i//2,i%2].set_xlim(self.thrs[i][0], self.thrs[i][1])
elif len(self.f4_cal)==1:
# Manual limits, first sample: histogram over the user range measures_x_lims.
for line_idx in range(len(self.linetemps)):
if not all(np.isnan(self.thrs[line_idx])):
y,x = np.histogram(self.cals[line_idx], num_bins, self.measures_x_lims[line_idx])
self.linetemps[line_idx] = self.axesMap[line_idx//2,line_idx%2].plot(adjust_hist(x), y, '.-')
else:
# NOTE(review): this update uses self.thrs as the histogram range while the
# first-sample branch above uses measures_x_lims - confirm which range is
# intended for manual limits.
for line_idx in range(len(self.linetemps)):
if not all(np.isnan(self.thrs[line_idx])):
data = np.histogram(self.cals[line_idx], num_bins, self.thrs[line_idx])
update_stem(self.linetemps[line_idx], data, self.axesMap[line_idx//2,line_idx%2])
for i in range(6):
if not all(np.isnan(self.thrs[i])):
self.axesMap[i//2,i%2].set_xlim(self.measures_x_lims[i,0],self.measures_x_lims[i,1])
# The count axis always uses the user limits measures_y_lims.
for i in range(6):
if not all(np.isnan(self.thrs[i])):
self.axesMap[i//2,i%2].set_ylim(self.measures_y_lims[i,0],self.measures_y_lims[i,1])
# self.linetemps = [self.linetemp1, self.linetemp2, self.linetemp3, self.linetemp4,
# self.linetemp5, self.linetemp6]
# for i in range(3):
# for j in range(2):
# self.axesMap[i,j].draw_artist(self.linetemps[i*2+j][0])
# if len(self.f1_cal)>0:
# self.canvasMap.update()
# self.canvasMap.flush_events()
# else:
self.canvasMap.draw() #update canvas
print('Thr histogram plots', time.time()-time_start)
def zoom_out(self):
    """Double ``plot_dividing_factor``, the divisor applied to the plotted traces."""
    current = self.plot_dividing_factor
    self.plot_dividing_factor = current*2
def zoom_in(self):
    """Halve ``plot_dividing_factor``, the divisor applied to the plotted traces."""
    current = self.plot_dividing_factor
    self.plot_dividing_factor = current*0.5
def create_main_frame(self):
    """Prepare figure, buttons, plots, etc...

    Builds the central widget of the main window:

    * ``self.fig`` / ``self.canvas`` / ``self.axes`` - the signal plot with
      one pre-created, empty line per channel (``self.line``),
    * ``self.figMap`` / ``self.canvasMap`` / ``self.axesMap`` - a 3x2 grid
      of feature plots titled after the features in ``used_features``,
    * the Settings, Start/Stop and calibration buttons plus the two zoom
      buttons, arranged in a grid layout below the canvases.
    """
    # set colors using for plotting, I THINK I DONT NEED THAT ANYMORE
    mpl.rcParams['axes.prop_cycle'] = cycler(color=cm.nipy_spectral(
        np.linspace(0,0.2,6)))
    self.main_frame = QWidget()
    self.dpi = 100
    self.fig = Figure((11, 10), dpi=self.dpi)    # figure for signal plot
    self.figMap = Figure((8, 10), dpi=self.dpi)  # figure for measurements

    # --- signal plot -----------------------------------------------------
    self.axes = self.fig.subplots(1,1)
    self.line = [None for _ in range(self.num_of_ch)]  # list for lines
    # Hide x tick marks and labels; the grid lines stay visible.
    for tick in self.axes.xaxis.get_major_ticks():
        tick.tick1line.set_visible(False)
        tick.tick2line.set_visible(False)
        tick.label1.set_visible(False)
        tick.label2.set_visible(False)
    # One y tick per channel; names are reversed because update_plot draws
    # channel i around num_of_ch - i, i.e. the first channel on top.
    self.axes.set_yticks(np.arange(1, (self.num_of_ch)*1.01, 1))
    self.axes.set_yticklabels(self.ch_names[::-1])
    # The x axis holds Fs points for plot_len seconds: one tick per second.
    self.axes.set_xticks(np.arange(int(self.Fs/self.plot_len), self.Fs, int(self.Fs/self.plot_len)))
    self.axes.grid(True)
    self.canvas = FigureCanvas(self.fig)
    self.canvas.setParent(self.main_frame)

    # --- feature plots ---------------------------------------------------
    self.canvasMap = FigureCanvas(self.figMap)
    self.canvasMap.setParent(self.main_frame)
    self.axesMap = self.figMap.subplots(3,2)
    self.Titles = [self.features_names[idx] for idx in self.used_features]
    for i in range(6):
        self.axesMap[i//2,i%2].set_title(self.Titles[i], fontsize=9)
        self.axesMap[i//2,i%2].set_ylabel(self.unit_label[i], fontsize=self.labels_size, rotation=270)
    label_size = 7
    for i in range(3):
        for j in range(2):
            self.axesMap[i,j].set_xlabel("Time [s]", fontsize=self.labels_size)
            self.axesMap[i,j].yaxis.set_label_coords(self.lab_x_dist, self.lab_y_dist)
            self.axesMap[i,j].xaxis.set_label_coords(self.lab_y_dist, self.lab_x_dist)
            self.axesMap[i,j].xaxis.set_tick_params(labelsize = label_size)
            self.axesMap[i,j].yaxis.set_tick_params(labelsize = label_size)

    # using list with lines we plot all channels; channels that are
    # included (or named EMG/EOG) are black, all others are drawn in silver
    for i in range(self.num_of_ch):
        if i in self.included_ch or self.ch_names[i] in ["EMG", "EOG"]:
            self.line[i], = self.axes.plot([] , color = 'black', linewidth=0.4)
        else:
            self.line[i], = self.axes.plot([] , color = 'silver', linewidth=0.3)
    self.axes.set_xlim(0, self.Fs)
    self.axes.set_ylim(0, (self.num_of_ch+1)*1)
    # tighten the layout by removing free spaces
    self.fig.tight_layout(pad=1, rect=(0.02,-0.02,1.02,1.02))
    self.figMap.tight_layout()
    self.axes.spines['left'].set_visible(True)
    self.axes.spines['bottom'].set_visible(False)
    self.axes.spines['top'].set_visible(False)
    self.axes.spines['right'].set_visible(False)
    self.canvas.draw()
    self.canvasMap.draw()  # update canvas

    # --- buttons ---------------------------------------------------------
    self.button1 = QPushButton("&Settings")
    self.button1.setCheckable(False)
    self.button1.clicked.connect(self.settings)
    self.button2 = QPushButton("&Start/Stop")
    self.button2.setCheckable(True)
    self.button2.clicked.connect(self.start_stop)
    self.button4 = QPushButton("&Start calibration")
    self.button4.setCheckable(False)
    self.button4.clicked.connect(self.calibration)
    self.button_zoom_in = QPushButton("+")
    self.button_zoom_in.setCheckable(False)
    self.button_zoom_in.clicked.connect(self.zoom_in)
    self.button_zoom_in.setMaximumWidth(20)
    self.button_zoom_out = QPushButton("-")
    self.button_zoom_out.setCheckable(False)
    self.button_zoom_out.clicked.connect(self.zoom_out)
    self.button_zoom_out.setMaximumWidth(20)

    hbox_zoom = QHBoxLayout()
    # addWidget(widget, stretch, alignment): the alignment flag has to go
    # in the third position. Passed second it is read as the integer
    # stretch factor and no alignment is applied.
    hbox_zoom.addWidget(self.button_zoom_in, 0, Qt.AlignLeft)
    hbox_zoom.addWidget(self.button_zoom_out, 0, Qt.AlignLeft)
    hbox_zoom.addStretch(1)

    # set layout of buttons
    hbox = QHBoxLayout()
    for w in [self.button1, self.button2, self.button4]:
        hbox.addWidget(w)
        hbox.setAlignment(w, Qt.AlignVCenter)

    # set general layout: canvases on the first row, controls on the second
    layout = QGridLayout()
    layout.addWidget(self.canvas, 0, 0, 1, 1)
    layout.addWidget(self.canvasMap, 0, 1, 1, 1)
    layout.addLayout(hbox, 1, 1, 1, 1)
    layout.addLayout(hbox_zoom, 1,0,1,1)
    self.main_frame.setLayout(layout)
    self.setCentralWidget(self.main_frame)
class Ui(QMainWindow):
    """Settings window built from ``settings/soft2.ui``.

    Lets the user load a montage matrix from a text/CSV file, edit the x
    and y limits of the six feature plots and toggle the automatic readout
    limits and the "display max" option. All changes are written to the
    main window object passed as ``main_file``.
    """

    def __init__(self, main_file):
        """Load the .ui file, show the window and pre-fill the widgets.

        ``main_file`` is the main application window whose settings are
        shown and modified.
        """
        super().__init__()
        self.main_file = main_file
        uic.loadUi('settings/soft2.ui', self)
        self.show()
        self.load_button = self.findChild(QPushButton, 'pushButton')
        self.load_button.clicked.connect(self.get_file)
        self.auto_ylim = self.findChild(QCheckBox, 'checkBox')
        self.auto_ylim.setChecked(self.main_file.auto_readout_lim)
        self.disp_max_box = self.findChild(QCheckBox, 'checkBox_2')
        self.disp_max_box.setChecked(self.main_file.disp_max)
        self.apply_lims_button = self.findChild(QPushButton, 'pushButton_2')
        self.apply_lims_button.clicked.connect(self.apply_lims)
        self.apply_restart = self.findChild(QPushButton, 'pushButton_3')
        self.apply_restart.clicked.connect(lambda: self.main_file.all_params('restarted'))
        self.file_path = self.findChild(QLabel, 'label_12')
        # Object names of the line edits in the .ui file, one per feature plot.
        names_xlim = ['lineEdit_2', 'lineEdit_3', 'lineEdit_4', 'lineEdit_5', 'lineEdit_6', 'lineEdit_7']
        names_ylim = ['lineEdit_9', 'lineEdit_13', 'lineEdit_10', 'lineEdit_11', 'lineEdit_8', 'lineEdit_12']
        self.xlim_values = np.zeros(6, dtype=object)
        self.ylim_values = np.zeros(6, dtype=object)
        for i in range(6):
            # Each field shows the current limits as a list literal, e.g. "[0.0, 1.0]".
            self.xlim_values[i] = self.findChild(QLineEdit, names_xlim[i])
            self.xlim_values[i].setText(str(list(self.main_file.measures_x_lims[i])))
            self.ylim_values[i] = self.findChild(QLineEdit, names_ylim[i])
            self.ylim_values[i].setText(str(list(self.main_file.measures_y_lims[i])))

    def get_file(self):
        """Ask for a montage file and load it into the main window.

        The file is read as a header-less CSV. Nothing is changed when the
        dialog is cancelled or the file cannot be read.
        """
        self.path = QFileDialog.getOpenFileName(self, 'Open File', os.path.dirname(os.getcwd()), 'Text files (*.txt *.csv)')
        print(self.path)
        file_name = self.path[0]
        if not file_name:
            # The dialog returns an empty file name when it is cancelled.
            return
        try:
            montage_matrix = np.array(pd.read_csv(file_name, header=None))
        except (OSError, ValueError) as err:
            # Unreadable or malformed file: keep the previous montage.
            print('Could not load montage file:', file_name, err)
            return
        # Update the label and the main window only after a successful read.
        self.file_path.setText(file_name)
        self.main_file.montage_matrix = montage_matrix
        self.main_file.montage_file_path = file_name

    def apply_lims(self):
        """Copy the limits and checkbox states to the main window and redraw.

        Each limit field is parsed as JSON (e.g. ``[0, 1]``). A row that
        cannot be parsed or stored is reported on stdout and skipped; the
        remaining rows and the checkbox states are still applied.
        """
        # The checkboxes do not depend on any row parsing correctly.
        self.main_file.auto_readout_lim = self.auto_ylim.isChecked()
        self.main_file.disp_max = self.disp_max_box.isChecked()
        for i in range(6):
            x_text = self.xlim_values[i].text()
            y_text = self.ylim_values[i].text()
            try:
                self.main_file.measures_x_lims[i] = json.loads(x_text)
                self.main_file.measures_y_lims[i] = json.loads(y_text)
            except (ValueError, TypeError):
                # Invalid JSON (JSONDecodeError is a ValueError) or a value
                # that does not fit into the limits row.
                print('Fix line:', x_text, y_text)
        print(self.main_file.measures_x_lims)
        print(self.main_file.measures_y_lims)
        self.main_file.update_plot_lim()
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the start window
    # (which receives the AppForm class) and exit with the event loop's
    # return code.
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    form = First_window(AppForm)  # AppForm()
    form.show()
    exit_code = app.exec_()
    sys.exit(exit_code)