-
Notifications
You must be signed in to change notification settings - Fork 0
/
visualtest_ani_withZaxis.py
102 lines (84 loc) · 3.05 KB
/
visualtest_ani_withZaxis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import asyncio
import websockets
import json
import logging
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import threading
import queue
import random
import time
logging.basicConfig(level=logging.INFO)

# Shared state between the websocket thread and the plotting code.
# plot_queue: (obj_id, x, y, z) tuples waiting to be drawn.
plot_queue = queue.Queue()
# id_colors: colour assigned to each object ID.
id_colors = {}
# points: ((obj_id, x, y, z), creation_time) pairs currently on the plot.
points = []

# Build the figure with a single 3D axes and configure labels and limits.
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.set(
    xlabel='X (meters)',
    ylabel='Y (meters)',
    zlabel='Z (meters)',
    xlim=(-3, 6),  # X-axis limit from -3 to 6 meters
    ylim=(-2, 7),  # Y-axis limit from -2 to 7 meters
    zlim=(-7, 7),  # Z-axis limit from -7 to 7 meters
)
# Function to update the plot
def update(frame, fade_seconds=3.0):
    """Redraw the 3D scatter plot for one animation frame.

    Moves newly received points from ``plot_queue`` into ``points``, drops
    points that have fully faded, and redraws the remaining ones with an
    opacity that decreases linearly with their age.

    Args:
        frame: Frame value supplied by ``FuncAnimation``; not used.
        fade_seconds: Time in seconds over which a point fades from fully
            opaque to invisible. A point is discarded once it reaches this
            age. Must be positive.
    """
    global points
    current_time = time.time()

    # Take in new points first so they are drawn in this frame.
    # get_nowait() cannot block the caller, unlike an empty()/get() pair.
    while True:
        try:
            new_point = plot_queue.get_nowait()
        except queue.Empty:
            break
        points.append((new_point, current_time))

    # Discard points that have completely faded; keeping them longer would
    # only cost a scatter call per invisible point on every frame.
    points = [(p, t) for (p, t) in points if current_time - t < fade_seconds]

    # Clear previous plot, then restore the labels and limits.
    ax.clear()
    ax.set_xlabel('X (meters)')
    ax.set_ylabel('Y (meters)')
    ax.set_zlabel('Z (meters)')
    ax.set_xlim(-3, 6)
    ax.set_ylim(-2, 7)
    ax.set_zlim(-7, 7)

    # Plot points, adjusting alpha based on age.
    for (obj_id, x, y, z), created in points:
        # Pick a random colour the first time an ID is seen and reuse it.
        if obj_id not in id_colors:
            id_colors[obj_id] = (random.random(), random.random(), random.random())
        age = current_time - created
        alpha = min(1.0, max(0.0, 1 - age / fade_seconds))
        ax.scatter(x, y, z, c=[id_colors[obj_id]], alpha=alpha)
# Drive update() from the figure's animation timer, one call every 50 ms.
# The result is bound to a module-level name so it stays referenced.
ani = FuncAnimation(fig=fig, func=update, interval=50)
async def test():
    """Receive coordinate messages over a websocket and queue them for plotting.

    Connects to the hard-coded URI and loops until the connection closes.
    Each message is parsed as JSON; when it is an object containing
    'obj_id', 'x', 'y' and 'z', the tuple (obj_id, x, y, z) is put on
    ``plot_queue``. Messages that are not valid JSON are logged and skipped;
    other payloads are logged and ignored.
    """
    uri = "ws://192.168.8.9:8765"
    required_keys = ('obj_id', 'x', 'y', 'z')
    async with websockets.connect(uri) as websocket:
        while True:
            try:
                # Set a timeout of 10 seconds for receiving a message
                message = await asyncio.wait_for(websocket.recv(), timeout=10)
            except asyncio.TimeoutError:
                logging.warning("No data received for 10 seconds. Continuing to listen...")
                continue
            except websockets.exceptions.ConnectionClosed:
                logging.error("Unreal Engine disconnected")
                break

            try:
                data = json.loads(message)
            except ValueError as e:
                # json.JSONDecodeError and UnicodeDecodeError are both
                # ValueError subclasses: skip the malformed message.
                logging.error(e)
                continue
            logging.info(data)

            # Only JSON objects can carry the expected keys; the isinstance
            # check keeps `key in data` from raising on numbers or doing
            # substring matching on strings.
            if isinstance(data, dict) and all(key in data for key in required_keys):
                plot_queue.put(tuple(data[key] for key in required_keys))
# Thread entry point for the websocket client
def run_loop():
    """Run the test() coroutine to completion on a new asyncio event loop."""
    receiver = test()
    asyncio.run(receiver)
# Start the asyncio loop in a separate thread. It is a daemon thread so the
# receive loop, which otherwise runs until the server disconnects, does not
# keep the process alive after the plot window has been closed.
loop_thread = threading.Thread(target=run_loop, daemon=True)
loop_thread.start()
# Show the plot
plt.show()