-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgamepad_reader.py
133 lines (112 loc) · 4.36 KB
/
gamepad_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Importing PyGame & WebSocketServer"""
import pygame
from websocket_server import WebsocketServer
# PyGame initialization and setup
pygame.init()
# Shared clock; both polling loops in this file call MASTER_CLOCK.tick()
MASTER_CLOCK = pygame.time.Clock()
# Value handed to MASTER_CLOCK.tick() on every loop iteration
MASTER_CLOCK_TIME = 144
# Some globals for joysticks
# JOYSTICK_HANDLE: placeholder; the script later binds the chosen Joystick here
JOYSTICK_HANDLE = None
# JOYSTICK_TO_USE: placeholder; setupJoystick() stores the chosen joystick index here
JOYSTICK_TO_USE = None
# Latest sampled state; the script later replaces these with per-device lists
JOYSTICK_AXES = []
JOYSTICK_BUTTONS = []
JOYSTICK_HATS = []
# Some globals for websockets
# WEBSOCKET_SERVER: placeholder; the script later binds the WebsocketServer here
WEBSOCKET_SERVER = None
# Port number passed to WebsocketServer when the server is created
WEBSOCKET_PORT = 5309
def setupJoystick():
    """Interactively pick the joystick to use and store its index.

    Lists every joystick PyGame reports, then polls them until a button is
    held on one of them. The index of that joystick is written to the
    module-level ``JOYSTICK_TO_USE``. All joystick objects opened here are
    closed again before returning.

    Raises:
        RuntimeError: if no joystick is connected. Without this check the
            wait loop below could never terminate.
    """
    # Keep an eye on the globals
    global JOYSTICK_TO_USE

    joystick_count = pygame.joystick.get_count()
    print(f"Total Joysticks Found:{joystick_count}")
    if joystick_count == 0:
        raise RuntimeError(
            "No joysticks found; connect a controller and try again."
        )

    # Open each device once and reuse the objects for names and polling.
    joysticks = [pygame.joystick.Joystick(index) for index in range(joystick_count)]
    for index, joystick in enumerate(joysticks):
        print(f"{index} - {joystick.get_name()}")
    print("Press a button on the controller you wish to use.")

    selected = None
    while selected is None:
        MASTER_CLOCK.tick(MASTER_CLOCK_TIME)
        # Pump the event queue so button states are refreshed.
        pygame.event.get()
        # Iterate the list built above rather than re-querying the count, so
        # a change in the number of devices cannot index past the list.
        for index, joystick in enumerate(joysticks):
            if any(
                joystick.get_button(button)
                for button in range(joystick.get_numbuttons())
            ):
                # No break: if several pads are pressed in the same frame,
                # the highest index wins, as before.
                selected = index

    JOYSTICK_TO_USE = selected
    # Read the name before the joystick objects are closed below.
    print(f"Using {joysticks[selected].get_name()}")
    for joystick in joysticks:
        joystick.quit()
def compareInputs(a, b):
    """Return True when two sequences have the same length and equal items.

    Items are compared pairwise, position by position, so the container
    types themselves do not have to match (a list can equal a tuple).

    Args:
        a: first sequence (must support ``len()`` and iteration).
        b: second sequence (must support ``len()`` and iteration).

    Returns:
        bool: False if the lengths differ or any pair of items differs,
        True otherwise (including when both are empty).
    """
    if len(a) != len(b):
        return False
    # Lengths match, so zip() visits every position exactly once.
    return not any(left != right for left, right in zip(a, b))
def arrayDump(arg):
    """Join the items of ``arg`` into a single comma-separated string.

    Each item is converted with ``str()``. No quoting or escaping is
    applied, so an item that itself contains a comma is emitted verbatim.

    Args:
        arg: iterable of values to serialize.

    Returns:
        str: the items separated by ``,`` with no trailing comma; an empty
        string when ``arg`` is empty.
    """
    return ",".join(str(value) for value in arg)
def new_user_notice(_client, _server):
    """Announce a new websocket connection on the console.

    Both arguments are accepted to match the callback signature and are
    not used.
    """
    # One blank line before and one after the notice.
    print("\nNew connection!\n")
# Setup the joystick
print()
setupJoystick()
JOYSTICK_HANDLE = pygame.joystick.Joystick(JOYSTICK_TO_USE)
JOYSTICK_HANDLE.init()

# Setup the arrays: one slot per axis, per button, and two (x, y) per hat.
# The values are placeholders; every slot is overwritten on each poll.
JOYSTICK_AXES = [0] * JOYSTICK_HANDLE.get_numaxes()
JOYSTICK_BUTTONS = [0] * JOYSTICK_HANDLE.get_numbuttons()
JOYSTICK_HATS = [0] * (JOYSTICK_HANDLE.get_numhats() * 2)

# Previously sent state. Starting empty makes the first comparison fail, so
# the initial state is broadcast on the first pass (when the device has any
# inputs of that kind).
TEMP_JOYSTICK_AXES = []
TEMP_JOYSTICK_BUTTONS = []
TEMP_JOYSTICK_HATS = []

# Get the websocket server going
print(f"Starting Websocket Server on port {WEBSOCKET_PORT}...")
WEBSOCKET_SERVER = WebsocketServer(host='0.0.0.0', port=WEBSOCKET_PORT)
WEBSOCKET_SERVER.set_fn_new_client(new_user_notice)
# True -> threaded mode, so the polling loop below can run in this thread.
WEBSOCKET_SERVER.run_forever(True)

# Main Loop
print()
EXIT_LOOP = False
try:
    while not EXIT_LOOP:
        # Take a sip and run some updates
        MASTER_CLOCK.tick(MASTER_CLOCK_TIME)
        pygame.event.get()

        # Grab the Axis data (rounded to 3 decimals before comparing)
        for _axis in range(JOYSTICK_HANDLE.get_numaxes()):
            JOYSTICK_AXES[_axis] = round(JOYSTICK_HANDLE.get_axis(_axis), 3)

        # Grab the Button data
        for _button in range(JOYSTICK_HANDLE.get_numbuttons()):
            JOYSTICK_BUTTONS[_button] = JOYSTICK_HANDLE.get_button(_button)

        # Grab the hat data, flattened to [x0, y0, x1, y1, ...] so it can be
        # handled like the axes. Each hat is read once per frame.
        for _hat in range(JOYSTICK_HANDLE.get_numhats()):
            _hat_x, _hat_y = JOYSTICK_HANDLE.get_hat(_hat)
            JOYSTICK_HATS[_hat * 2] = _hat_x
            JOYSTICK_HATS[_hat * 2 + 1] = _hat_y

        # Do we have any updates? Only changed groups are printed and sent.
        if not compareInputs(TEMP_JOYSTICK_AXES, JOYSTICK_AXES):
            print(f"Axes:{JOYSTICK_AXES}")
            WEBSOCKET_SERVER.send_message_to_all(arrayDump(["axes"] + JOYSTICK_AXES))
        if not compareInputs(TEMP_JOYSTICK_BUTTONS, JOYSTICK_BUTTONS):
            print(f"Btns:{JOYSTICK_BUTTONS}")
            WEBSOCKET_SERVER.send_message_to_all(arrayDump(["btns"] + JOYSTICK_BUTTONS))
        if not compareInputs(TEMP_JOYSTICK_HATS, JOYSTICK_HATS):
            print(f"Hats:{JOYSTICK_HATS}")
            WEBSOCKET_SERVER.send_message_to_all(arrayDump(["hats"] + JOYSTICK_HATS))

        # Update our temporary values (copies, because the live lists are
        # mutated in place on the next pass)
        TEMP_JOYSTICK_AXES = JOYSTICK_AXES.copy()
        TEMP_JOYSTICK_BUTTONS = JOYSTICK_BUTTONS.copy()
        TEMP_JOYSTICK_HATS = JOYSTICK_HATS.copy()
except KeyboardInterrupt:
    print("Keyboard Interrupt Detected - Closing")
finally:
    # Always release resources, even if polling or sending raised.
    WEBSOCKET_SERVER.shutdown_gracefully()
    pygame.quit()