-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathset_synchronous_mode.py
59 lines (47 loc) · 1.55 KB
/
set_synchronous_mode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import sys
import os
import settings
sys.path.append(settings.CARLA_EGG_PATH)
import carla
import random
import numpy as np
try:
import queue
except ImportError:
import Queue as queue
class CarlaSyncMode(object):
def __init__(self, world, *sensors, **kwargs):
self.world = world
self.sensors = sensors
self.frame = None
self.delta_seconds = 1.0 / kwargs.get('fps', 20)
self._queues = []
self._settings = None
def __enter__(self):
self._settings = self.world.get_settings()
self.frame = self.world.apply_settings(carla.WorldSettings(
no_rendering_mode=False,
synchronous_mode=True,
fixed_delta_seconds=self.delta_seconds))
def make_queue(register_event):
q = queue.Queue()
register_event(q.put)
self._queues.append(q)
make_queue(self.world.on_tick)
for sensor in self.sensors:
make_queue(sensor.listen)
return self
def tick(self, timeout):
self.frame = self.world.tick()
data = [self._retrieve_data(q, timeout) for q in self._queues]
assert all(x.frame == self.frame for x in data)
return data
def __exit__(self, *args, **kwargs):
self.world.apply_settings(self._settings)
def _retrieve_data(self, sensor_queue, timeout):
while True:
data = sensor_queue.get(timeout=timeout)
if data.frame == self.frame:
return data
def tick_no_data(self):
self.frame = self.world.tick()