199 lines
No EOL
6.5 KiB
Python
199 lines
No EOL
6.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Created on Tue Aug 16 09:15:49 2016
|
|
|
|
@author: KHM
|
|
"""
|
|
|
|
import socket,time,sys,datetime
|
|
import numpy as np
|
|
#import scipy.io
|
|
if sys.version_info[0]>=3:
|
|
import queue
|
|
else:
|
|
import Queue as queue
|
|
|
|
import threading,platform
|
|
if platform.system()=='Windows':
|
|
tfunc=time.time
|
|
else:
|
|
tfunc=time.time
|
|
|
|
def bytes_to_int32(databuf):
|
|
in_data=np.frombuffer(databuf,dtype=np.dtype([('1','i1'),('2','>u2')]))
|
|
data=in_data['1'].astype('i4')
|
|
data <<= 16
|
|
data |= in_data['2']
|
|
return data
|
|
|
|
def sampleLoop(no):
|
|
firstpackage=True
|
|
# databuf = np.empty((no.bufsiz,), dtype=np.uint8)
|
|
databuf = bytearray(b' ' * no.bufsiz)
|
|
readdump=False
|
|
if not no.readdump is None:
|
|
fnin=open(no.readdump,'rb')
|
|
readdump=True
|
|
tsamp=tfunc()+no.si
|
|
sampidx=0
|
|
firstsamp=0
|
|
oldsampidx=0
|
|
droppeds=0
|
|
timeout=False
|
|
if not no.dump is None:
|
|
if no.dump==True:
|
|
fn=open('dump_'+datetime.datetime.now().strftime('%Y%m%d_%H-%M-%S')+'.raw','wb')
|
|
else:
|
|
fn=open(no.dump,'wb')
|
|
while not no.stop:
|
|
try:
|
|
if readdump:
|
|
while tsamp>tfunc():
|
|
#time.sleep(tsamp-tfunc())
|
|
time.sleep(0.)
|
|
tsamp+=no.si
|
|
tsin=np.frombuffer(fnin.read(10),dtype=np.uint16)
|
|
nbytes=tsin[-1]
|
|
tsin=tsin[:-1].view(np.float64)[0]
|
|
databuf[:nbytes]=np.frombuffer(fnin.read(nbytes),dtype=np.uint8)
|
|
else:
|
|
nbytes=no.sock.recv_into(databuf)
|
|
t0=tfunc()
|
|
if timeout:
|
|
print('Connection re-established.')
|
|
timeout=False
|
|
except:
|
|
nbytes=0
|
|
if not timeout:
|
|
print('Timeout - package too small, will keep retrying every second.')
|
|
timeout=True
|
|
time.sleep(1.)
|
|
if nbytes>28:
|
|
#spacket = databuf[0]
|
|
#mainunit = databuf[1]
|
|
packetno = np.frombuffer(databuf[4:8],'>u4').copy()
|
|
nch = np.frombuffer(databuf[8:10],'>u2').copy()
|
|
nsamp = np.frombuffer(databuf[10:12],'>u2').copy()
|
|
sampidx = np.frombuffer(databuf[12:20],'>u8').copy()
|
|
tstamp = np.frombuffer(databuf[20:28],'>u8').copy()
|
|
if oldsampidx!=0:
|
|
ds=sampidx-oldsampidx
|
|
if ds>nsamp:
|
|
droppeds += ds-nsamp
|
|
print('Dropped %i samples'%(ds-nsamp,))
|
|
elif ds!=nsamp:
|
|
print('delta samp %i, samples %i'%(ds,nsamp))
|
|
else:
|
|
firstsamp=sampidx
|
|
oldsampidx=sampidx
|
|
data=bytes_to_int32(databuf[28:28 + 3 * nch[0] * nsamp[0]]).reshape((nsamp[0],nch[0]))
|
|
t1=tfunc()
|
|
if not no.ringbuffer is None:
|
|
if no.avgPackets:
|
|
data1=data.mean(axis=0, keepdims=True)
|
|
data1[:,-1]=np.max(data[:,-1])
|
|
no.updateRingBuffer(data1,sampidx,(tstamp,t1))
|
|
else:
|
|
no.updateRingBuffer(data,sampidx,(tstamp,t1))
|
|
if no.sendqueue:
|
|
no.queue.put((data,(packetno,sampidx,tstamp,t0,t1)))
|
|
if firstpackage:
|
|
no.tstamp0=(tstamp,t1)
|
|
if no.dump:
|
|
fn.write(np.array(t1).tostring())
|
|
fn.write(np.array(nbytes,dtype=np.uint16).tostring())
|
|
fn.write(databuf[:nbytes])
|
|
try:
|
|
fn.close()
|
|
except:
|
|
pass
|
|
try:
|
|
fnin.close()
|
|
except:
|
|
pass
|
|
totals=sampidx-firstsamp
|
|
if totals>0:
|
|
if droppeds>0:
|
|
print('Dropped %i out of %i samples (%.1f%%)'%(droppeds,totals,droppeds/totals*100.))
|
|
else:
|
|
print('Acquired %i samples none were dropped.'%(totals,))
|
|
else:
|
|
print('No samples acquired.')
|
|
|
|
class NO():
|
|
def __init__(self,ip='127.0.0.1',port=50000,buffersize=2**10,ringbuffersize=None,sendqueue=False,\
|
|
ringbuf_factor=2,dump=None,readdump=None,si=1./1000.,avgPackets=False):
|
|
if readdump is None:
|
|
self.sock=socket.socket(socket.AF_INET, # Internet
|
|
socket.SOCK_DGRAM) #UDP
|
|
self.sock.bind((ip, port))
|
|
self.sock.settimeout(2.)
|
|
self.avgPackets = avgPackets
|
|
self.bufsiz=buffersize
|
|
self.ip=ip
|
|
self.port=port
|
|
self.sampidx=0
|
|
self.tstamp=None
|
|
self.tstamp0=None
|
|
self.queue=queue.Queue()
|
|
self.A=None
|
|
self.stop=False
|
|
self.dump=dump
|
|
self.readdump=readdump
|
|
if ringbuffersize is None:
|
|
self.ringbuffer=None
|
|
else:
|
|
self.ringbuffer=True
|
|
self.idx=0
|
|
self.ringbufferinit=True
|
|
self.ringbuffersize=ringbuffersize
|
|
self.ringbuf_factor=ringbuf_factor
|
|
self.sendqueue=sendqueue
|
|
self.lock=threading.RLock()
|
|
self.si=si
|
|
|
|
def updateRingBuffer(self,data,i=None,tstamp=None):
|
|
if self.ringbufferinit:
|
|
self.ringbuffer=np.zeros((self.ringbuffersize*self.ringbuf_factor,data.shape[1]),dtype=np.float32)
|
|
self.ringbufferinit=False
|
|
ringbuf=self.ringbuffer
|
|
wlen=self.ringbuffersize
|
|
self.lock.acquire()
|
|
if (self.idx+data.shape[0])<=ringbuf.shape[0]:
|
|
ringbuf[self.idx:self.idx+data.shape[0],:]=data
|
|
self.idx+=data.shape[0]
|
|
else:
|
|
ringbuf[0:wlen-data.shape[0],:]=ringbuf[self.idx-wlen+data.shape[0]:self.idx,:]
|
|
self.idx=wlen
|
|
ringbuf[wlen-data.shape[0]:wlen,:]=data
|
|
self.datawindow=ringbuf[self.idx-wlen:self.idx]
|
|
if not i is None:
|
|
self.sampidx=i
|
|
if not tstamp is None:
|
|
self.tstamp=tstamp
|
|
self.lock.release()
|
|
|
|
def getBuffer(self,returnIdx=False):
|
|
self.lock.acquire()
|
|
try:
|
|
out=self.datawindow.copy()
|
|
except:
|
|
out=None
|
|
#print self.sampleno
|
|
if returnIdx:
|
|
out=(out,self.sampidx)
|
|
self.lock.release()
|
|
return out
|
|
|
|
def start(self):
|
|
self.thread=threading.Thread(target=sampleLoop,args=(self,))
|
|
self.thread.start()
|
|
|
|
def stopit(self):
|
|
self.stop=True
|
|
self.thread.join()
|
|
|
|
try:
|
|
self.sock.close()
|
|
except:
|
|
pass |