Skip to content
Snippets Groups Projects
Commit 9996233c authored by Václav Jelínek's avatar Václav Jelínek
Browse files

Add binary and ASCII modes to ESP BT data transfer

parent 043f4063
No related branches found
No related tags found
1 merge request!7Merge old hardware code with new hardware code
......@@ -41,26 +41,98 @@ class Esp:
MODE_BT = const(0)
MODE_WIFI = const(1)
MODE_CMD = const(2)
def __init__(self, baud_rate=115200):
self.uart = UART(INTERNAL_UART_HW_ID, baudrate=baud_rate, bits=8, parity=None, stop=1,
tx=Pin(INTERNAL_UART_TX_PIN), rx=Pin(INTERNAL_UART_RX_PIN), flow=0, invert=0)
self.uart_timer = Timer(-1)
self.uart_timer.init(mode=Timer.PERIODIC, period=10, callback=self.UART_RX_handler)
# CONFIG
self.header = 0
self.length = 0
self.message_idx = 0
self.message = bytes()
self.message_tmp = bytes()
self.message_cmd = bytes()
self.message_tmp_cmd = bytes()
self.payload = bytes()
self.new_message = False
self.new_cmd = False
self.esp_running = False
self.esp_mode = Esp.MODE_BT
self.esp_mode_prev = Esp.MODE_BT
self.not_responding = 0
# BT
self.messageTmp = ""
self.message = ""
self.binary_message = bytes()
self.binary_mode = False
self.binary_header = []
self.header = []
self.header_idx = 0
self.header_read = False
self.data_size = 0
self.data_idx = 0
self.new_message = False
# WIFI
self.wifi_indicators = tuple([False]*Esp.NUM_INDICATORS)
self.wifi_buttons = tuple([False]*Esp.NUM_BUTTONS)
self.wifi_switches = tuple([False]*Esp.NUM_SWITCHES)
def deinit(self):
self.uart_timer.deinit()
self.uart.deinit()
def cmd_start(self):
if self.esp_mode != Esp.MODE_CMD:
self.esp_mode_prev = self.esp_mode
self.esp_mode = Esp.MODE_CMD
def cmd_stop(self):
if self.esp_mode == Esp.MODE_CMD:
self.esp_mode = self.esp_mode_prev
# Return received message form ESP if available
# In ASCII mode return string message
# In binary mode return bytes message
def read(self):
if self.esp_mode == Esp.MODE_BT and self.message_ready():
self.new_message = False
if self.binary_mode:
return self.binary_message
else:
return self.message
else:
return None
# Send message to ESP
# In ASCII mode send string message
# In binary mode send bytes message with previously set header
def write(self, message):
if self.esp_mode == Esp.MODE_BT:
if self.binary_mode:
self.uart.write(self.binary_header+message)
else:
self.uart.write(message)
def message_ready(self):
if self.esp_mode == Esp.MODE_BT:
return self.new_message
else:
return None
# Set binary mode if header is not none
# Header is tuple of numbers (0-255)
# Data size is number of bytes in message (without header)
def set_binary(self, header=None, data_size=0):
if header:
self.header = header
header_format = ''
for i in header:
header_format += "B"
self.binary_header = struct.pack('<{}'.format(header_format), *header)
self.data_size = data_size
self.binary_mode = True
else:
self.binary_mode = False
def timeout(self):
self.not_responding += 1
......@@ -78,16 +150,16 @@ class Esp:
while self.uart.any():
self.uart.read()
self.message_idx = 0
self.new_message = False
self.message_tmp = bytes()
self.new_cmd = False
self.message_tmp_cmd = bytes()
def message_ready(self):
return self.new_message
def message_cmd_ready(self):
return self.new_cmd
def read_message(self):
if self.message_ready():
self.new_message = False
return self.message
if self.message_cmd_ready():
self.new_cmd = False
return self.message_cmd
else:
return None
......@@ -101,18 +173,18 @@ class Esp:
return Esp.TIMEOUT
def ack_received(self):
if self.new_message:
if self.new_cmd:
if self.header == Esp.COM_ACK:
self.new_message = False
self.new_cmd = False
return Esp.ACK
if self.header == Esp.COM_NACK:
self.new_message = False
self.new_cmd = False
return Esp.NACK
else:
return Esp.TIMEOUT
def reset(self, timeout=2000):
self.esp_mode = Esp.MODE_BT
self.esp_mode_prev = Esp.MODE_BT
self.flush()
time = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), time) < timeout:
......@@ -144,8 +216,8 @@ class Esp:
utime.sleep_ms(1)
if self.ack_received() == Esp.NACK:
return None
if self.new_message and self.header == Esp.COM_BT_PIN:
self.new_message = False
if self.new_cmd and self.header == Esp.COM_BT_PIN:
self.new_cmd = False
return struct.unpack('<I', self.payload)[0]
return -1
......@@ -257,38 +329,74 @@ class Esp:
def wifi_get_switches(self):
return tuple(self.wifi_switches)
def wifi_new_message(self, header, payload):
def wifi_new_cmd(self, header, payload):
if header == Esp.COM_WIFI_BUTTONS:
self.wifi_buttons = self.make_bools_from_bytes(Esp.NUM_BUTTONS, payload)
elif header == Esp.COM_WIFI_SWITCHES:
self.wifi_switches = self.make_bools_from_bytes(Esp.NUM_SWITCHES, payload)
def UART_RX_handler(self, p):
while self.uart.any():
b_ch = self.uart.read(1)
#print(chr(struct.unpack('<B', b_ch)[0]), end="")
self.message_tmp += b_ch
if self.message_idx == 0:
self.header = struct.unpack('<B', b_ch)[0]
self.message_idx += 1
if (self.esp_mode == Esp.MODE_WIFI
and self.header != COM_WIFI_BUTTONS
and self.header != COM_WIFI_SWITCHES):
self.flush()
elif self.message_idx == 1:
self.length = struct.unpack('<B', b_ch)[0]
self.message_idx += 1
elif self.message_idx == self.length + 2:
def config_message_receive(self, b_ch):
#print(chr(struct.unpack('<B', b_ch)[0]), end="")
self.message_tmp_cmd += b_ch
if self.message_idx == 0:
self.header = struct.unpack('<B', b_ch)[0]
self.message_idx += 1
if (self.esp_mode == Esp.MODE_WIFI
and self.header != COM_WIFI_BUTTONS
and self.header != COM_WIFI_SWITCHES):
self.flush()
elif self.message_idx == 1:
self.length = struct.unpack('<B', b_ch)[0]
self.message_idx += 1
elif self.message_idx == self.length + 2:
self.message_idx = 0
if self.calculate_checksum(self.message_tmp_cmd) == 0:
self.message_cmd = self.message_tmp_cmd
self.payload = self.message_cmd[2:-1]
self.message_tmp_cmd = bytes()
self.new_cmd = True
self.message_idx = 0
if self.calculate_checksum(self.message_tmp) == 0:
self.message = self.message_tmp
self.payload = self.message[2:-1]
self.message_tmp = bytes()
if self.esp_mode == Esp.MODE_WIFI:
self.wifi_new_cmd(self.header, self.payload)
else:
self.send_nack()
else:
self.message_idx += 1
def bt_message_receive(self, b_ch):
if self.binary_mode:
if self.header_read:
self.binary_message += b_ch
self.data_idx += 1
if self.data_idx == self.data_size:
self.new_message = True
self.message_idx = 0
if self.esp_mode == Esp.MODE_WIFI:
self.wifi_new_message(self.header, self.payload)
self.header_read = False
self.data_idx = 0
else:
u_ch = struct.unpack('<B', b_ch)[0]
if u_ch == self.header[self.header_idx]:
self.header_idx += 1
if len(self.header) == self.header_idx:
self.header_read = True
self.binary_message = bytes()
self.header_idx = 0
else:
self.send_nack()
self.header_idx = 0
else:
ascii_code = int.from_bytes(b_ch, 'big')
character = chr(ascii_code)
if ascii_code == 10 or ascii_code == 13:
self.message = self.messageTmp
self.messageTmp = ""
self.new_message = True
else:
self.message_idx += 1
\ No newline at end of file
self.messageTmp = self.messageTmp + character
def UART_RX_handler(self, p):
while self.uart.any():
b_ch = self.uart.read(1)
if self.esp_mode == Esp.MODE_CMD or self.esp_mode == Esp.MODE_WIFI:
self.config_message_receive(b_ch)
elif self.esp_mode == Esp.MODE_BT:
self.bt_message_receive(b_ch)
\ No newline at end of file
......@@ -63,11 +63,14 @@ def main():
robot.esp_command = True
robot.buttons.set_pin(8, False)
utime.sleep(0.01)
robot.esp.cmd_start()
def esp_command_off(robot):
robot.esp.cmd_stop()
robot.buttons.set_pin(8, True)
utime.sleep(0.01)
robot.esp_command = False
# Analyze pressed buttons for menu movement
def buttons(robot, button_values, robotState, menu_move, menu_debounce):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment