-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrevoltoutlets.py
170 lines (125 loc) · 5.5 KB
/
revoltoutlets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#!/usr/bin/python
# Script to control the Revolt PX-1672 remote-control power outlets via the provided USB dongle.
# Requires pyusb.
# based on the pyusb tutorial script
import usb.core
import usb.util
import binascii
import math
import argparse
# Action codes, keyed by the command name given on the command line.
# Every code fits into 4 bits; RevoltController.prepare_message() places it
# in the high nibble of the frame's action byte.
ACTION_VALUES = {
    "on1": 15,
    "off1": 14,
    "on2": 13,
    "off2": 12,
    "on3": 11,
    "off3": 10,
    "on4": 9,
    "off4": 8,
    "on5": 7,
    "off5": 6,
    "on6": 5,
    "off6": 4,
    "on7": 3,  # has no off7 counterpart!
    "ona": 2,
    "offa": 1,
    "off8": 0,  # has no on8 counterpart!
}


class RevoltController(object):
    """Context manager that talks to the Revolt PX-1672 USB dongle via pyusb.

    Entering the context finds the dongle, claims its interface and looks up
    the first OUT endpoint; leaving the context releases the interface.
    """

    VENDOR_ID = 0xffff
    PRODUCT_ID = 0x1122

    # reminder: these are the variables that will be used by this class
    device = None
    interface_number = None
    endpoint = None

    def __enter__(self):
        """Find the dongle, claim its interface and locate its OUT endpoint.

        Returns:
            This controller, ready for send_command().

        Raises:
            ValueError: if the device or an OUT endpoint cannot be found.
        """
        # find our device
        self.device = usb.core.find(idVendor=self.VENDOR_ID, idProduct=self.PRODUCT_ID)

        # was it found?
        if self.device is None:
            raise ValueError('Device not found')

        # set the active configuration. With no arguments, the first
        # configuration will be the active one
        self.device.set_configuration()

        # get an endpoint instance
        device_configuration = self.device.get_active_configuration()
        self.interface_number = device_configuration[(0, 0)].bInterfaceNumber
        usb.util.claim_interface(self.device, self.interface_number)
        alternate_setting = usb.control.get_interface(self.device, self.interface_number)
        interface = usb.util.find_descriptor(
            device_configuration, bInterfaceNumber=self.interface_number,
            bAlternateSetting=alternate_setting
        )

        endpoint = usb.util.find_descriptor(
            interface,
            # match the first OUT endpoint
            custom_match=(lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT)
        )

        if endpoint is None:
            # __exit__ does not run when __enter__ raises, so give the
            # claimed interface back before reporting the failure.
            usb.util.release_interface(self.device, self.interface_number)
            raise ValueError('OUT endpoint not found')

        self.endpoint = endpoint
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the interface claimed in __enter__; exceptions propagate."""
        usb.util.release_interface(self.device, self.interface_number)

    def send_command(self, frame_id, frame_count, command):
        """Build the frame for ``command`` and write it to the dongle.

        Args:
            frame_id: 16-bit frame ID (0 - 65535).
            frame_count: number of frame transmissions (0 - 255).
            command: a key of ACTION_VALUES, e.g. ``"on1"``.

        Raises:
            ValueError: if a value is out of range or the command is unknown.
        """
        message = self.prepare_message(frame_id, frame_count, command)
        self.send_message(message)

    def send_message(self, message):
        """Write a hex-encoded message to the OUT endpoint as raw bytes."""
        # write the data
        self.endpoint.write(binascii.a2b_hex(message))

    def prepare_message(self, frame_id, frame_count, command):
        """Return the hex-encoded 8-byte frame for one command.

        Frame layout (two hex digits per byte):
            bytes 0-1  frame ID, high byte first
            byte  2    action code in the high nibble, low nibble zero
            byte  3    checksum: bytes 0-3 summed modulo 256 give 255
            byte  4    padding (0x20, not relevant)
            byte  5    frame transmission count
            bytes 6-7  0x0000 (unknown, not relevant)

        Args:
            frame_id: 16-bit frame ID (0 - 65535).
            frame_count: number of frame transmissions (0 - 255).
            command: a key of ACTION_VALUES, e.g. ``"on1"``.

        Returns:
            A string of 16 lowercase hex digits.

        Raises:
            ValueError: if ``command`` is unknown or a numeric argument does
                not fit into its field.
        """
        try:
            raw_action = ACTION_VALUES[command]
        except KeyError:
            raise ValueError('unknown action: %s' % command)

        if not 0 <= frame_id <= 0xffff:
            raise ValueError('frame id out of range (0 to 65535): %s' % frame_id)
        if not 0 <= frame_count <= 0xff:
            raise ValueError('frame count out of range (0 to 255): %s' % frame_count)

        # The 4-bit action code occupies the high nibble of its byte.
        action_byte = raw_action << 4

        # compute the checksum: byte01+02+03+04 mod 256 have to be 255
        partial_sum = (frame_id >> 8) + (frame_id & 0xff) + action_byte
        raw_checksum = 255 - (partial_sum % 256)

        # Numeric fields are zero-padded on the LEFT so that their value is
        # preserved (padding on the right would turn 0x5 into 0x50).
        msgpart_id = '%04x' % frame_id
        msgpart_action = '%02x' % action_byte
        msgpart_checksum = '%02x' % raw_checksum
        msgpart_padding = "20"  # not relevant padding
        msgpart_frame_count = '%02x' % frame_count
        msgpart_end = "0000"  # unknown, not relevant

        return (msgpart_id + msgpart_action + msgpart_checksum
                + msgpart_padding + msgpart_frame_count + msgpart_end)
def argparse_frame_count_constraints(value):
    """argparse ``type=`` callback for the frame transmission count.

    Converts ``value`` with int() and returns it when it lies within 1..255;
    anything outside that range is rejected with ArgumentTypeError.
    """
    count = int(value)
    if 1 <= count <= 255:
        return count
    raise argparse.ArgumentTypeError('Frame transmission count not in range (1 to 255): %s' % value)
def argparse_frame_id_constraints(value):
    """argparse ``type=`` callback for the frame ID.

    Converts ``value`` with int() and returns it; raises ArgumentTypeError
    when the result does not lie within 0..65535.
    """
    frame_id = int(value)
    if not 0 <= frame_id <= 65535:
        raise argparse.ArgumentTypeError('Frame ID not in range (0 to 65535): %s' % value)
    return frame_id
def main():
    """Parse the command line and send every requested command to the dongle.

    Commands are sent in the order given, all with the same frame ID and
    frame transmission count.
    """
    # the original software knows 3 parameters
    # Bit Width 100+6*50 seems not to be used, doesn't change the message
    # ID 0-65535
    default_raw_id = 6789
    default_frame_count = 2  # 3-255 number of sent frames (resend)

    parser = argparse.ArgumentParser()
    # nargs='+' makes argparse itself reject an empty command list, so no
    # separate "no action specified" check is needed afterwards.
    parser.add_argument("command", nargs='+', help='{on|off}{1..6}')
    parser.add_argument("--verbose", "-v", action='store_true', help="increase output verbosity")
    parser.add_argument("--tx-count", "-n", type=argparse_frame_count_constraints, default=default_frame_count,
                        help='Number of frame transmissions. More transmissions increase chance that the outlet '
                             'receives them, but also increases transmission duration. Defaults to 2.')
    parser.add_argument("--id", "-i", type=argparse_frame_id_constraints, default=default_raw_id,
                        help='Frame ID (0 - 65535). Can be used to control multiple sets of outlets. Defaults to 6789.')
    args = parser.parse_args()

    frame_count = args.tx_count
    raw_id = args.id

    if args.verbose:
        print('Using ID %d' % raw_id)
        print('Requesting %d frame transmissions' % frame_count)

    with RevoltController() as controller:
        for command in args.command:
            if args.verbose:
                # print() with a single argument behaves the same on
                # Python 2 and Python 3; the statement form is Python 2 only.
                print('sending command \'%s\'' % command)
            controller.send_command(raw_id, frame_count, command)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()