Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 81 additions & 30 deletions thinkgear/thinkgear.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

import serial

from cStringIO import StringIO
if sys.version_info.major == 2:
from cStringIO import StringIO
else:
from io import BytesIO

import struct

Expand Down Expand Up @@ -67,28 +70,42 @@ def __init__(self, port):
# TODO: ???

self.serial = serial.Serial(port, 57600)
self.preread = StringIO()
if sys.version_info.major == 2:
self.preread = StringIO()
else:
self.preread = BytesIO()
self.io = self.serial

@staticmethod
def _chksum(packet):
return ~sum( ord(c) for c in packet ) & 0xff
if sys.version_info.major == 2:
return ~sum( ord(c) for c in packet ) & 0xff
else:
return ~sum( c for c in packet ) & 0xff

def _read(self, n):
buf = self.io.read(n)
if len(buf) < n:
_log.debug('incomplete read, short %s bytes', n - len(buf))
_log.debug('incomplete read 1, short %s bytes', n - len(buf))
if self.io == self.preread:
_log.debug('end of preread buffer')
self.preread.reset()
self.preread.truncate()
#self.preread.reset() # Just create a new one : faster
#self.preread.truncate()
if sys.version_info.major == 2:
self.preread = StringIO()
else:
self.preread = BytesIO()
self.io = self.serial
buf += self.io.read(n-len(buf))
if len(buf) < n:
_log.debug('incomplete read, short %s bytes', n - len(buf))
_log.debug('incomplete read 2, short %s bytes', n - len(buf))

for o in xrange(0, len(buf), 16):
_bytelog.debug('%04X '+' '.join(('%02X',)*len(buf[o:o+16])), o, *( ord(c) for c in buf[o:o+16] ))
if sys.version_info.major == 2:
for o in xrange(0, len(buf), 16):
_bytelog.debug('%04X '+' '.join(('%02X',)*len(buf[o:o+16])), o, *( ord(c) for c in buf[o:o+16] ))
else:
for o in range(0, len(buf), 16):
_bytelog.debug('%04X '+' '.join(('%02X',)*len(buf[o:o+16])), o, *( c for c in buf[o:o+16] ))

return buf

Expand All @@ -105,21 +122,33 @@ def get_packets(self):
while True:
last_two = last_two[-1:]+(self._read(1),)
#_log.debug('last_two: %r', last_two)
if last_two == ('\xAA','\xAA'):
plen = self._read(1)
if plen >= '\xAA':
if sys.version_info.major == 2:
sync_byte = '\xAA' # 170
else:
sync_byte = b'\xAA' # 170
if last_two == (sync_byte, sync_byte): # Detect a packet header
plen = self._read(1) # read a payload length
if plen >= sync_byte: # Any value from 0 up to 169
# Bogosity
_log.debug('discarding %r while syncing', last_two[0])
_log.debug('discarding %r while syncing: Payload length too large.', last_two[0])
last_two = last_two[-1:]+(plen,)

else:
last_two = ()
packet = self._read(ord(plen))
if sys.version_info.major == 2:
packet = self._read(ord(plen))
else:
packet = self._read(int.from_bytes(plen, "big"))

checksum = self._read(1)

if ord(checksum) == self._chksum(packet):
yield self._decode(packet)
if sys.version_info.major == 2:
chksum = ord(checksum)
else:
chksum = int.from_bytes(checksum, "big")

if chksum == self._chksum(packet):
yield self._decode(packet)
else:
_log.debug('bad checksum')
self._deread(packet+checksum)
Expand All @@ -131,22 +160,39 @@ def _decode(self, packet):
decoded = []

while packet:
if sys.version_info.major == 2:
excode_byte = '\x55' # Extended code
else:
excode_byte = b'\55' # Extended code
extended_code_level = 0
while len(packet) and packet[0] == '\x55':
while len(packet) and packet[0] == excode_byte:
extended_code_level += 1
packet = packet[1:]
if len(packet) < 2:
_log.debug('ran out of packet: %r', '\x55'*extended_code_level+packet)
if sys.version_info.major == 2:
_log.debug('ran out of packet: %r', excode_byte*extended_code_level+packet)
else:
_log.debug('ran out of packet:' + '\\x55'*extended_code_level + ''.join(('\\x%02X',)*len(packet)), *( c for c in packet ))
break
code = ord(packet[0])

if sys.version_info.major == 2:
code = ord(packet[0])
else:
code = packet[0]
if code < 0x80:
value = packet[1]
packet = packet[2:]
else:
vlen = ord(packet[1])
if len(packet) < 2+vlen:
_log.debug('ran out of packet: %r', '\x55'*extended_code_level+chr(code)+chr(vlen)+packet)
break
if sys.version_info.major == 2:
vlen = ord(packet[1])
if len(packet) < 2+vlen:
_log.debug('ran out of packet: %r', '\x55'*extended_code_level+chr(code)+chr(vlen)+packet)
break
else:
vlen = packet[1]
if len(packet) < 2+vlen:
_log.debug('ran out of packet:' + '\\x55'*extended_code_level + ''.join(('\\x%02X',)*len(packet)), *( c for c in packet ))
break
value = packet[2:2+vlen]
packet = packet[2+vlen:]

Expand Down Expand Up @@ -177,8 +223,9 @@ def __new__(mcls, name, bases, data):
data_types[(extended_code_level,code)] = cls
return cls

ThinkGearClass = ThinkGearMetaClass("ThinkGearData", (object, ), {"__doc__": ThinkGearMetaClass.__doc__})

class ThinkGearData(object):
class ThinkGearData(ThinkGearClass):
def __init__(self, extended_code_level, code, value):
self.extended_code_level = extended_code_level
self.code = code
Expand All @@ -193,8 +240,6 @@ def _decode(v):
def __str__(self):
return self._strfmt % vars(self)

__metaclass__ = ThinkGearMetaClass

_log = logging.DEBUG


Expand All @@ -207,21 +252,24 @@ class ThinkGearPoorSignalData(ThinkGearData):
'''POOR_SIGNAL Quality (0-255)'''
code = 0x02
_strfmt = 'POOR SIGNAL: %(value)s'
_decode = staticmethod(ord)
if sys.version_info.major == 2:
_decode = staticmethod(ord)


class ThinkGearAttentionData(ThinkGearData):
'''ATTENTION eSense (0 to 100)'''
code = 0x04
_strfmt = 'ATTENTION eSense: %(value)s'
_decode = staticmethod(ord)
if sys.version_info.major == 2:
_decode = staticmethod(ord)


class ThinkGearMeditationData(ThinkGearData):
'''MEDITATION eSense (0 to 100)'''
code = 0x05
_strfmt = 'MEDITATION eSense: %(value)s'
_decode = staticmethod(ord)
if sys.version_info.major == 2:
_decode = staticmethod(ord)


class ThinkGearRawWaveData(ThinkGearData):
Expand All @@ -243,7 +291,10 @@ class ThinkGearEEGPowerData(ThinkGearData):

code = 0x83
_strfmt = 'ASIC EEG Power: %(value)r'
_decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in xrange(0, 24, 3)))))
if sys.version_info.major == 2:
_decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in xrange(0, 24, 3)))))
else:
_decode = staticmethod(lambda v: EEGPowerData(*[ int.from_bytes(v[o:o+3], "big") for o in range(0, 24, 3)]))


def main():
Expand Down