-
Notifications
You must be signed in to change notification settings - Fork 0
/
eml327.py
136 lines (100 loc) · 3.93 KB
/
eml327.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import serial
import time
import re
import atcommands
class EML327Connection(object):
    """Serial connection to an adapter that speaks an AT-command protocol.

    The adapter is expected to terminate every reply with a ``>`` prompt
    (presumably an ELM327-style device, judging by the prompt constant's
    name).  Commands are written terminated by ``\\r\\n``; replies are
    returned as a list of stripped, non-empty text lines.
    """

    # Baud rates tried, in order, when no explicit rate is configured.
    __BAUD_RATES = [38400, 9600, 230400, 115200, 57600, 19200]
    # Byte the adapter sends once it is ready for the next command.
    __ELM_PROMPT = b'>'
    # Text that acknowledges a successfully executed AT command.
    __RESPONSE_OK = 'OK'

    def __init__(self, port=None, baud_rate=None, timeout=10):
        """Store the connection settings; no I/O happens until connect().

        Args:
            port: Port name or URL handed to ``serial.serial_for_url``.
            baud_rate: Fixed baud rate, or a falsy value to auto-detect.
            timeout: Read timeout (seconds) given to the serial port.
        """
        self.__port = port
        self.__baud_rate = baud_rate
        self.__timeout = timeout
        # Defined up front so close()/send() are safe before connect().
        self.__serial_port = None

    def connect(self):
        """Open the serial port and run the adapter setup sequence.

        Raises:
            ConnectionError: If the port cannot be opened, the baud rate
                cannot be determined, or the setup commands fail.
        """
        try:
            self.__serial_port = serial.serial_for_url(
                self.__port,
                parity=serial.PARITY_NONE,
                stopbits=1,
                bytesize=8,
                timeout=self.__timeout)
            self.__setup_connection()
        except serial.SerialException as err:
            self.__release_port()
            raise ConnectionError(
                "Failed to connect to serial port: %s" % self.__port) from err
        except Exception:
            # Do not leave a half-configured port open behind a failure.
            self.__release_port()
            raise

    def close(self):
        """Send a reset to the adapter and close the port (no-op if closed)."""
        if self.__serial_port is None:
            return
        try:
            self.send_at_command(atcommands.RESET, ok=False)
        finally:
            # Close the port even when the reset command itself fails.
            self.__release_port()

    def set_at_flag(self, flag, state, echo=False, ok=True, delay=None):
        """Switch an on/off AT flag by sending ``flag`` followed by 1 or 0.

        Args:
            flag: Flag command without its trailing digit (str or bytes).
            state: Truthy to send ``1``, falsy to send ``0``.
            echo, ok, delay: Forwarded to send_at_command().
        """
        command = self.__to_bytes(flag) + (b'1' if state else b'0')
        self.send_at_command(command, echo, ok, delay)

    def send_at_command(self, command, echo=False, ok=True, delay=None):
        """Send ``command`` with the AT prefix and validate the reply.

        Args:
            command: Command text without the prefix (str or bytes).
            echo: If true, accept any reply in which some line contains
                ``OK``; otherwise the reply must be exactly one ``OK`` line.
            ok: If false, skip validation and return the reply as-is.
            delay: Seconds to wait between writing and reading, or None.

        Returns:
            The list of reply lines.

        Raises:
            ConnectionError: If there is no reply (port not open) or the
                reply does not acknowledge the command.
        """
        payload = self.__to_bytes(atcommands.PREFIX) + self.__to_bytes(command)
        response = self.send(payload, delay)
        if response is None:
            raise ConnectionError("No response from EML")
        if not ok:
            return response
        if echo:
            if any(self.__RESPONSE_OK in line for line in response):
                return response
        elif len(response) == 1 and response[0] == self.__RESPONSE_OK:
            return response
        raise ConnectionError("Unexepcted response from EML: %s" % response)

    def send(self, data, delay=None):
        """Write ``data`` to the adapter and return its reply lines.

        Args:
            data: Payload (str or bytes); a ``\\r\\n`` terminator is appended.
            delay: Seconds to sleep before reading the reply, or None.

        Returns:
            A list of reply lines, or None when the port is not open.
        """
        if self.__serial_port is None:
            return None
        self.__write(data)
        if delay is not None:
            time.sleep(delay)
        return self.__read()

    def __setup_connection(self):
        """Select the baud rate and put the adapter into a known state.

        Raises:
            ConnectionError: If auto-detection finds no working baud rate
                or any of the setup commands fails.
        """
        if self.__baud_rate:
            # An explicitly configured rate must actually reach the port.
            self.__serial_port.baudrate = self.__baud_rate
            baud_rate = self.__baud_rate
        else:
            baud_rate = self.__auto_baud_rate()
        if baud_rate is None:
            raise ConnectionError("Failed to auto detect baud")
        try:
            self.send_at_command(atcommands.RESET, ok=False, delay=1)
            self.set_at_flag(atcommands.ECHO_FLAG, False, echo=True)
            self.set_at_flag(atcommands.HEADER_FLAG, True)
            self.set_at_flag(atcommands.LINEFEED_FLAG, False)
        except Exception as err:
            raise ConnectionError("Failed to setup connection") from err

    def __auto_baud_rate(self):
        """Return the first baud rate at which a prompt is seen, else None.

        The port is left configured with the last baud rate that was tried.
        """
        for baud_rate in self.__BAUD_RATES:
            try:
                self.__serial_port.baudrate = baud_rate
                self.__serial_port.reset_input_buffer()
                self.__serial_port.reset_output_buffer()
                # Write a nonsense command to get a prompt
                self.__write(b"\x7F" * 2)
                # If we get a prompt then assume the correct baud rate
                response = self.__serial_port.read(1024)
                if response.endswith(self.__ELM_PROMPT):
                    return baud_rate
            except serial.SerialException:
                # This rate did not work; try the next candidate.
                continue
        return None

    def __write(self, data):
        """Discard pending input, then write ``data`` plus ``\\r\\n``."""
        port = self.__serial_port
        port.reset_input_buffer()
        port.write(self.__to_bytes(data) + b"\r\n")
        port.flush()

    def __read(self):
        """Read until the prompt arrives or a read comes back empty.

        Returns:
            The reply split on CR/LF into stripped, non-empty lines, or
            None when the port is not open.
        """
        if self.__serial_port is None:
            return None
        buffer = bytearray()
        while True:
            length = self.__serial_port.in_waiting or 1
            data = self.__serial_port.read(length)
            if not data:
                # An empty read means the timeout expired; stop waiting.
                break
            buffer.extend(data)
            if self.__ELM_PROMPT in data:
                break
        # Drop NUL bytes before interpreting the reply as text.
        raw = bytes(buffer).replace(b"\x00", b"")
        if raw.endswith(self.__ELM_PROMPT):
            raw = raw[:-len(self.__ELM_PROMPT)]
        # Undecodable bytes are replaced rather than raising mid-read.
        decoded = raw.decode(errors="replace")
        stripped = (part.strip() for part in re.split(r"[\r\n]", decoded))
        return [line for line in stripped if line]

    def __release_port(self):
        """Close the serial port, if any, and forget it."""
        port, self.__serial_port = self.__serial_port, None
        if port is not None:
            port.close()

    @staticmethod
    def __to_bytes(data):
        """Return ``data`` as bytes, ASCII-encoding it when it is a str."""
        if isinstance(data, str):
            return data.encode("ascii")
        return bytes(data)