Source code for ardurpc.handler
import binascii
import struct
import time
from ardurpc.exception import *
[docs]class Handler(object):
"""Handler base class."""
def __init__(self, serial=None, handler_id=None, name=None):
self.handler_id = handler_id
self.name = name
self.serial = serial
self._type_map = {
0x01: 'b',
0x02: 'B',
0x03: 'h',
0x04: 'H',
0x05: 'i',
0x06: 'I',
0x07: 'q',
0x08: 'Q'
}
self.timeout = 1
[docs] def _exec(self, command_id, fmt=None, *data):
"""
Execute a command on the microcontroller.
:param Integer command_id: The ID of the command
:param String fmt: The format of the data
:param *data: Parameters for the command
:return: Returns the result
"""
pkt_data = b''
if fmt is not None:
pkt_data = struct.pack(fmt, *data)
pkt_head = struct.pack(
'BBBB',
0,
self.handler_id,
command_id,
len(pkt_data)
)
hex_data = binascii.hexlify(pkt_head + pkt_data)
#print("exec", hex_data)
i = 0
while i < 3:
#print("in", self.serial.timeout)
try:
self.serial.write(b':' + hex_data + b'\n')
#print("out")
result = self._get_result()
return result
except Timeout:
pass
time.sleep(1)
i = i + 1
[docs] def _get_result(self):
"""
Reade data from serial port and wait for the result.
:return: Parsed result
"""
time_start = time.time()
while True:
data = self.serial.readline()
time_diff = time.time() - time_start
if len(data) == 0:
if time_diff > self.timeout:
raise Timeout()
continue
#print("read:", data)
if data[0] == 58: # ASCII(58) = :
data = data.decode('ASCII')
data = data.rstrip()
#print(data)
return self._parse_result(binascii.unhexlify(data[1:]))
if time_diff > self.timeout:
raise Timeout()
[docs] def _parse_result(self, data):
"""
Parse the result data.
:param String data: Data to parse.
:return: Mixed-Type
"""
return_code = data[0]
if return_code == 127:
raise Failure()
if return_code == 126:
raise CommandNotFound()
if return_code == 125:
raise HandlerNotFound()
if return_code == 124:
raise FunctionNotFound()
if return_code > 0:
raise UnknownReturnCode()
return_type = data[1]
if return_type == 0x00:
return return_code
data = data[2:]
if return_type in self._type_map:
fmt = '>' + self._type_map[return_type]
return struct.unpack(fmt, data[:struct.calcsize(fmt)])[0]
# Array
if return_type == 0x10:
return_subtype = data[0]
length = struct.unpack('>B', data[1:2])[0]
data = data[2:]
if return_subtype not in self._type_map:
# ToDo
raise Exception
fmt = '>' + self._type_map[return_subtype] * length
return struct.unpack(fmt, data[:struct.calcsize(fmt)])
# String
if return_type == 0x11:
length = struct.unpack('>B', data[:1])[0]
data = data[1:]
name = struct.unpack('>%ds' % length, data[:length])[0]
name = name.decode('ASCII').replace('\0', '').strip()
return name
# Multicolumn array
if return_type == 0x12:
rows = []
fmt = '>'
num_cols = struct.unpack('>B', data[:1])[0]
data = data[1:]
while num_cols > 0:
num_cols = num_cols - 1
col_type = struct.unpack('>B', data[:1])[0]
data = data[1:]
if col_type in self._type_map:
fmt = fmt + self._type_map[col_type]
num_rows = struct.unpack('>B', data[:1])[0]
data = data[1:]
chunk_size = struct.calcsize(fmt)
#print(fmt)
while num_rows > 0:
num_rows = num_rows - 1
#print(data, num_rows, len(data), chunk_size)
if len(data) < chunk_size:
print('Error')
# ToDo: error
continue
rows.append(struct.unpack(fmt, data[:chunk_size]))
data = data[chunk_size:]
return rows