EStiMo/utils/Functions.py
2023-06-14 14:25:52 +02:00

214 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
"""
Created on Mon Jan 24 11:48:34 2022
@author: Basics
"""
import numpy as np
import time
import scipy.signal as ss
import scipy
def apply_montage(data, matrix):
#print(matrix)
if data.shape[0]!=matrix.shape[0]:
if data.shape[0]==matrix.shape[0]+1:
data = data[:-1]
times = time.time()
new_data = np.zeros(data.shape)
# for i in range(matrix.shape[0]):
# for j in range(matrix.shape[1]):
# if float(abs(matrix[i,j]))>0:
# new_data[i,:]+=data[j]*float(matrix[i,j])
# print(time.time()-times)
print(data.shape, matrix.shape)
new_data = (data.T@matrix.T).T
return new_data
def eye_reg(eeg, eog, regg = True, Fs=1000):
if regg:
print(eeg.shape)
eeg = eeg.T
#eeg = ss.detrend(eeg.T)
print(eog.shape)
[A,B] = ss.butter(2, 40/(Fs/2), 'lowpass')
eog = ss.filtfilt(A, B, eog)
eogt = ss.detrend(eog.reshape([len(eog),1]),axis=0)
#eogt = eog.reshape([len(eog),1])
data_reg_eog = eeg - np.dot(eogt, np.linalg.lstsq(eogt,eeg, rcond=None)[0])
print('EYE REG ZROBIONEEEE hmmm')
print(data_reg_eog.shape, eogt.shape,eeg.shape)
# plt.figure()
# plt.plot(data_reg_eog[:,-5],c='b')
# plt.plot(eeg[:,-5], c='orange')
# plt.plot(eogt, c='green')
# plt.show()
return(data_reg_eog).T
else:
return(eeg)
def most_frequent(arr):
"""returns most frequent item in the array
Parameters
------------
arr: array type
Returns
------------
most frequent item in the list or 0
"""
try:
counts = np.bincount(arr)
return np.argmax(counts)
#return max(set(List), key = List.count)
except:
return 0
def connect_sig(data1, data2, fs):
"""Knowing that signal doesn't updates perfectly every x seconds, I was looking for a way
to connect it in proper time, but finally other approach have been used
Parameters
-------------
data1, data2: numpy array types with MxN size
fs: int
sampling rate
"""
print(data1.shape, data2.shape)
if all(data1[:, -fs] == None):
print(data2[:, -fs])
data_ret = data1.copy()
data_ret[:, :-fs] = data2[:, -fs].reshape(-1, 1)
data_ret[:,-fs:] = data2[:, -fs:]
return data_ret, 800
print(data2.shape)
data2 = data2
startt = time.time()
num = data1.shape[0]
size = data1.shape[1]
pts = list()
data_ret = np.zeros(data1.shape)
data_ret[:, :-fs] = data1[:,fs:]
if data2.shape[0]==0 or data2.shape[1]==0:
return data2
if fs<2000:
for i in range(num):
try:
pts.extend(np.where(data1[i,-1]==data2[i])[0].tolist())
except:
print("ARBEJDE IKKEEE")
# data_ret = np.concatenate((data1, data2[:,-int(size):]),1)
data_ret[:,-fs:] = data2[:, -fs:]
return data_ret, 800
#print('hehe', time.time()-startt)
most_fr = most_frequent(np.array(pts))
#print('hehe', time.time()-startt)
print(list(pts).count(most_fr)>num//2, most_fr)
if fs<2000 and list(pts).count(most_fr)>num//2:
most_fr = most_fr+1
#print(data2[:,int(pts[i]):int(pts[i]+size)].shape)
print('hehe', time.time()-startt)
#data_ret = np.concatenate((data1, data2[:,int(most_fr)+1:int(most_fr+size)+1]),1)
data_ret[:,-fs:] = data2[:, most_fr:most_fr+fs]
print('connect:', time.time()-startt)
return data_ret, most_fr
else:
data_ret[:,-fs:] = data2[:, -fs:]
print("NEEEJJJJJJJ")
print('connect:', time.time()-startt)
return data_ret, 800
def set_to_gray(lines):
for i in range(lines.shape[0]):
for j in range(lines.shape[1]):
if type(lines[i,j]) != int:
lines[i,j][0].set_color("gray")
def update_stem(line, data, ax, relim=False):
x = adjust_hist(data[1])
y = data[0]
line[0].set_ydata(y)
line[0].set_xdata(x) # not necessary for constant x
# stemlines
# line[1].set_paths([np.array([[xx, 0], [xx, yy]]) for (xx, yy) in zip(x, y)])
# line[2].set_xdata([np.min(x), np.max(x)])
# line[2].set_ydata([0, 0]) # not necessary for constant bottom
if relim:
ax.relim()
# update ax.viewLim using the new dataLim
ax.autoscale_view(scalex=True)
def adjust_hist(data):
new_data = np.zeros(len(data)-1)
for i in range(len(new_data)):
new_data[i] = np.mean([data[i], data[i+1]])
return new_data
def min_zero(thr):
if thr[0]<0:
return [0, thr[1]]
else:
return thr
def pentropy(signal, Fs, nperseg=None, fmin=None, fmax=None):
#I think it's good to put own nperseg value, because default one is not always good in this case
f, time, S = ss.spectrogram(signal, Fs, nperseg=nperseg) #spectrogram of signal
if fmin and fmax:
idxs = np.where((f<fmax) & (f>fmin)) #choosing only bands that we are interested in
S = S[idxs]
P = np.zeros(S.shape)
H = np.zeros(S.shape[1])
for t in range(S.shape[1]):
for m in range(S.shape[0]):
P[m,t] = S[m,t]/np.sum(S[:,t],0) #according to matlab instruction
H[t] += -P[m,t]*np.log2(P[m,t]+0.000001)/np.log2(S.shape[0])
return f, S,time, H
def entropy(S, bins=100):
histo = np.histogram(S, bins)[0]
p = scipy.special.entr(histo/histo.sum())
ent = sum(p)
return ent
def check_integrity(stim):
"""If the list contains two consecutive values, the function leaves only the first.
It was needed because NeurOne sometimes returns two stimuli pulses78.
Parameters
-----------------
stim: list or array type with numbers, in our case indexes of stimuli
Returns
-----------------
ent: array with removed values
"""
#copy the list, I need two, because from one values are removed, and other gives
#proper index values
stim_c = list(stim.copy())
stim = list(stim)
for ind in range(len(stim_c)-1):
if stim_c[ind]==(stim_c[ind+1]-1):
stim.remove(stim_c[ind+1])
return np.array(stim)
def doubleMADsfromMedian(y,thresh=3.5):
# warning: this function does not check for NAs
# nor does it address issues when
# more than 50% of your data have identical values
m = np.median(y)
abs_dev = np.abs(y - m)
left_mad = np.median(abs_dev[y <= m])
right_mad = np.median(abs_dev[y >= m])
y_mad = left_mad * np.ones(len(y))
y_mad[y > m] = right_mad
modified_z_score = 0.6745 * abs_dev / y_mad
modified_z_score[y == m] = 0
return modified_z_score > thresh