# -*- coding: utf-8 -*-
"""
Created on Mon Jan 24 11:48:34 2022

@author: Basics
"""

import time

import numpy as np
import scipy.signal as ss
import scipy.special  # entropy() below uses scipy.special.entr; `import scipy` alone may not load it

def apply_montage(data, matrix):
    """Apply a montage (re-referencing) matrix to data given as channels x samples."""
    # If the data carry one extra row (e.g. a trigger channel), drop it so the shapes match.
    if data.shape[0] != matrix.shape[0]:
        if data.shape[0] == matrix.shape[0] + 1:
            data = data[:-1]
    # Original element-wise loop, kept for reference; the matrix product below is equivalent:
    # for i in range(matrix.shape[0]):
    #     for j in range(matrix.shape[1]):
    #         if float(abs(matrix[i, j])) > 0:
    #             new_data[i, :] += data[j] * float(matrix[i, j])
    print(data.shape, matrix.shape)
    new_data = (data.T @ matrix.T).T  # same as matrix @ data
    return new_data

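# Example (illustrative only; assumes 4 EEG channels plus one extra row and a
# hypothetical 4x4 average-reference montage matrix):
#   raw = np.random.randn(5, 1000)                # 4 channels + 1 extra row, 1000 samples
#   avg_ref = np.eye(4) - np.ones((4, 4)) / 4     # subtract the common average from each channel
#   montaged = apply_montage(raw, avg_ref)        # -> shape (4, 1000)
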
def eye_reg(eeg, eog, regg=True, Fs=1000):
    """Remove the EOG (eye-movement) component from EEG channels by linear regression."""
    if regg:
        print(eeg.shape)
        eeg = eeg.T
        # eeg = ss.detrend(eeg.T)
        print(eog.shape)
        # Low-pass the EOG at 40 Hz before regressing it out of the EEG.
        [A, B] = ss.butter(2, 40 / (Fs / 2), 'lowpass')
        eog = ss.filtfilt(A, B, eog)
        eogt = ss.detrend(eog.reshape([len(eog), 1]), axis=0)
        # eogt = eog.reshape([len(eog), 1])
        # Least-squares fit of the EOG to every EEG channel, then subtract the fitted part.
        data_reg_eog = eeg - np.dot(eogt, np.linalg.lstsq(eogt, eeg, rcond=None)[0])
        print('EYE REG DONE hmmm')
        print(data_reg_eog.shape, eogt.shape, eeg.shape)
        # plt.figure()
        # plt.plot(data_reg_eog[:, -5], c='b')
        # plt.plot(eeg[:, -5], c='orange')
        # plt.plot(eogt, c='green')
        # plt.show()
        return data_reg_eog.T
    else:
        return eeg

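# Example (illustrative only; assumes channels-by-samples EEG and a single EOG trace):
#   eeg = np.random.randn(8, 5000)       # 8 EEG channels, 5 s at 1 kHz
#   eog = np.random.randn(5000)          # one EOG channel of the same length
#   clean = eye_reg(eeg, eog, Fs=1000)   # -> shape (8, 5000) with the EOG part regressed out
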
def most_frequent(arr):
    """Return the most frequent item in the array.

    Parameters
    ------------
    arr: array-like of non-negative integers

    Returns
    ------------
    the most frequent item in the array, or 0 if it cannot be determined
    """
    try:
        counts = np.bincount(arr)
        return np.argmax(counts)
        # return max(set(arr), key=arr.count)
    except Exception:
        return 0

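# Example (illustrative only):
#   most_frequent(np.array([3, 1, 3, 2, 3]))   # -> 3
#   most_frequent(np.array([-1, 2]))           # -> 0 (bincount rejects negative values)
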
def connect_sig(data1, data2, fs):
    """Stitch two consecutive signal buffers at the right sample.

    The incoming signal does not update exactly every x seconds, so this was an attempt
    to connect consecutive buffers at the proper point in time; in the end a different
    approach was used.

    Parameters
    -------------
    data1, data2: numpy arrays of size MxN
    fs: int
        sampling rate
    """
    print(data1.shape, data2.shape)
    # Uninitialised buffer (object array filled with None): take everything from data2.
    if all(data1[:, -fs] == None):
        print(data2[:, -fs])
        data_ret = data1.copy()
        data_ret[:, :-fs] = data2[:, -fs].reshape(-1, 1)
        data_ret[:, -fs:] = data2[:, -fs:]
        return data_ret, 800

    print(data2.shape)
    startt = time.time()
    num = data1.shape[0]
    size = data1.shape[1]
    pts = list()
    # Shift the old buffer left by fs samples; the newest fs samples are filled in below.
    data_ret = np.zeros(data1.shape)
    data_ret[:, :-fs] = data1[:, fs:]

    if data2.shape[0] == 0 or data2.shape[1] == 0:
        return data2  # note: unlike the other paths, this returns a single value

    if fs < 2000:
        # For every channel, find where the last sample of data1 reappears in data2.
        for i in range(num):
            try:
                pts.extend(np.where(data1[i, -1] == data2[i])[0].tolist())
            except Exception:
                print("DOES NOT WORK")
                # data_ret = np.concatenate((data1, data2[:, -int(size):]), 1)
                data_ret[:, -fs:] = data2[:, -fs:]
                return data_ret, 800
        # print('hehe', time.time() - startt)
        most_fr = most_frequent(np.array(pts))
        # print('hehe', time.time() - startt)
        print(list(pts).count(most_fr) > num // 2, most_fr)
        # Accept the alignment point only if more than half of the channels agree on it.
        if fs < 2000 and list(pts).count(most_fr) > num // 2:
            most_fr = most_fr + 1
            # print(data2[:, int(pts[i]):int(pts[i] + size)].shape)
            print('hehe', time.time() - startt)
            # data_ret = np.concatenate((data1, data2[:, int(most_fr) + 1:int(most_fr + size) + 1]), 1)
            data_ret[:, -fs:] = data2[:, most_fr:most_fr + fs]
            print('connect:', time.time() - startt)
            return data_ret, most_fr
        else:
            data_ret[:, -fs:] = data2[:, -fs:]
            print("NO MATCH")
            print('connect:', time.time() - startt)
            return data_ret, 800
    # Fallback (fs >= 2000): just take the newest fs samples from data2.
    data_ret[:, -fs:] = data2[:, -fs:]
    print('connect:', time.time() - startt)
    return data_ret, 800

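# Example (illustrative only; hypothetical two-channel rolling buffers, 3 s long, fs = 1000,
# where the new buffer is the same stream read 1 s later):
#   fs = 1000
#   buf_old = np.arange(2 * 3 * fs, dtype=float).reshape(2, 3 * fs)
#   buf_new = buf_old + fs
#   stitched, offset = connect_sig(buf_old, buf_new, fs)   # stitched is continuous, offset == 2000
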
def set_to_gray(lines):
    """Set every matplotlib line handle stored in a 2D array of handles to gray.

    Entries that are plain ints are treated as placeholders and skipped.
    """
    for i in range(lines.shape[0]):
        for j in range(lines.shape[1]):
            if type(lines[i, j]) != int:
                lines[i, j][0].set_color("gray")

def update_stem(line, data, ax, relim=False):
    """Update an existing plot with a new histogram.

    `data` is expected to be the (counts, bin_edges) tuple returned by np.histogram;
    the edges are converted to bin centres before updating the line.
    """
    x = adjust_hist(data[1])
    y = data[0]

    line[0].set_ydata(y)
    line[0].set_xdata(x)  # not necessary for constant x

    # stemlines
    # line[1].set_paths([np.array([[xx, 0], [xx, yy]]) for (xx, yy) in zip(x, y)])
    # line[2].set_xdata([np.min(x), np.max(x)])
    # line[2].set_ydata([0, 0])  # not necessary for constant bottom

    if relim:
        ax.relim()
        # update ax.viewLim using the new dataLim
        ax.autoscale_view(scalex=True)

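# Example (illustrative only; requires matplotlib, which this module does not import):
#   import matplotlib.pyplot as plt
#   fig, ax = plt.subplots()
#   hist = np.histogram(np.random.randn(1000), bins=30)
#   line = ax.plot(adjust_hist(hist[1]), hist[0])   # line drawn over bin centres
#   update_stem(line, np.histogram(np.random.randn(1000), bins=30), ax, relim=True)
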
def adjust_hist(data):
    """Convert histogram bin edges to bin centres (midpoints of consecutive edges)."""
    new_data = np.zeros(len(data) - 1)
    for i in range(len(new_data)):
        new_data[i] = np.mean([data[i], data[i + 1]])
    return new_data

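# Example (illustrative only):
#   adjust_hist(np.array([0.0, 1.0, 2.0, 3.0]))   # -> array([0.5, 1.5, 2.5])
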
def min_zero(thr):
    """Clip the lower bound of a (low, high) threshold pair at zero."""
    if thr[0] < 0:
        return [0, thr[1]]
    else:
        return thr

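# Example (illustrative only):
#   min_zero([-5, 20])   # -> [0, 20]
#   min_zero([3, 20])    # -> [3, 20]
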
def pentropy(signal, Fs, nperseg=None, fmin=None, fmax=None):
    """Spectral (power) entropy of `signal` over time, normalised by log2 of the number of bands."""
    # It is usually better to pass an explicit nperseg; the default is not always suitable here.
    f, t_spec, S = ss.spectrogram(signal, Fs, nperseg=nperseg)  # spectrogram of the signal
    if fmin and fmax:
        idxs = np.where((f < fmax) & (f > fmin))  # keep only the band we are interested in
        f = f[idxs]
        S = S[idxs]
    P = np.zeros(S.shape)
    H = np.zeros(S.shape[1])
    for t in range(S.shape[1]):
        for m in range(S.shape[0]):
            # Normalise each spectrum column to a probability distribution (as in the MATLAB pentropy doc).
            P[m, t] = S[m, t] / np.sum(S[:, t], 0)
            H[t] += -P[m, t] * np.log2(P[m, t] + 0.000001) / np.log2(S.shape[0])
    return f, S, t_spec, H

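# Example (illustrative only; 10 s of noise at 1 kHz, entropy restricted to 1-40 Hz):
#   Fs = 1000
#   sig = np.random.randn(10 * Fs)
#   f, S, t_spec, H = pentropy(sig, Fs, nperseg=Fs, fmin=1, fmax=40)
#   # H holds one value per spectrogram column; white noise gives values close to 1.
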
def entropy(S, bins=100):
    """Shannon entropy (in nats) of the amplitude histogram of S."""
    histo = np.histogram(S, bins)[0]
    p = scipy.special.entr(histo / histo.sum())  # elementwise -x*log(x)
    ent = sum(p)
    return ent

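# Example (illustrative only):
#   entropy(np.random.randn(10000), bins=100)   # broader amplitude distributions give larger values
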
def check_integrity(stim):
    """If the list contains two consecutive index values, keep only the first of the pair.

    This was needed because NeurOne sometimes returns two pulses for a single stimulus.

    Parameters
    -----------------
    stim: list or array of numbers, in our case indexes of stimuli

    Returns
    -----------------
    array with the duplicate values removed
    """
    # Two copies are needed: values are removed from one while the other keeps the
    # original indexing intact.
    stim_c = list(stim.copy())
    stim = list(stim)
    for ind in range(len(stim_c) - 1):
        if stim_c[ind] == (stim_c[ind + 1] - 1):
            stim.remove(stim_c[ind + 1])
    return np.array(stim)

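# Example (illustrative only; 101 directly follows 100, so it is dropped):
#   check_integrity(np.array([100, 101, 500, 900]))   # -> array([100, 500, 900])
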
def doubleMADsfromMedian(y, thresh=3.5):
    """Flag outliers using the double (asymmetric) MAD around the median.

    Returns a boolean mask that is True where the modified z-score exceeds `thresh`.
    Warning: this function does not check for NaNs, nor does it address the case
    where more than 50% of the data have identical values.
    """
    m = np.median(y)
    abs_dev = np.abs(y - m)
    # Separate MADs for the values below and above the median.
    left_mad = np.median(abs_dev[y <= m])
    right_mad = np.median(abs_dev[y >= m])
    y_mad = left_mad * np.ones(len(y))
    y_mad[y > m] = right_mad
    modified_z_score = 0.6745 * abs_dev / y_mad
    modified_z_score[y == m] = 0
    return modified_z_score > thresh

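# Example (illustrative only; the value 50 is flagged as an outlier):
#   y = np.array([1.0, 1.1, 0.9, 1.2, 0.8, 50.0])
#   doubleMADsfromMedian(y)   # -> array([False, False, False, False, False,  True])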