diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..96310da --- /dev/null +++ b/examples/README.md @@ -0,0 +1,2 @@ +# Dummy examples created by @ll7 + diff --git a/examples/example01.py b/examples/example01.py new file mode 100644 index 0000000..c23bdf0 --- /dev/null +++ b/examples/example01.py @@ -0,0 +1,13 @@ +""" +# Example 01: basic import test +""" +import pysocialforce as pysf + + +simulator = pysf.Simulator_v2() + +for step in range(10): + simulator.step() + print(f"step {step}") + print(simulator.states.ped_positions) + print("=================") diff --git a/examples/example02.py b/examples/example02.py new file mode 100644 index 0000000..16261e5 --- /dev/null +++ b/examples/example02.py @@ -0,0 +1,26 @@ +""" +# Example 02: test the configuration of the simulator +""" +import pysocialforce as pysf + +obstacle01 = pysf.map_config.Obstacle( + [(10, 10), (15,10), (15, 15), (10, 15)]) +obstacle02 = pysf.map_config.Obstacle( + [(20, 10), (25,10), (25, 15), (20, 15)]) + +route01 = pysf.map_config.GlobalRoute( + [(0, 0), (10, 10), (20, 10), (30, 0)]) +crowded_zone01 = ((10, 10), (20, 10), (20, 20)) + +map_def = pysf.map_config.MapDefinition( + obstacles=[obstacle01, obstacle02], + routes=[route01], + crowded_zones=[crowded_zone01]) + +simulator = pysf.Simulator_v2(map_def) + +for step in range(10): + simulator.step() + print(f"step {step}") + print(simulator.states.ped_positions) + print("=================") diff --git a/examples/example03.py b/examples/example03.py new file mode 100644 index 0000000..6599937 --- /dev/null +++ b/examples/example03.py @@ -0,0 +1,36 @@ +""" +# Example 03: simulate with visual rendering +""" +import pysocialforce as pysf +import numpy as np + +obstacle01 = pysf.map_config.Obstacle( + [(10, 10), (15,10), (15, 15), (10, 15)]) +obstacle02 = pysf.map_config.Obstacle( + [(20, 10), (25,10), (25, 15), (20, 15)]) + +route01 = pysf.map_config.GlobalRoute( + [(0, 0), (10, 10), (20, 10), (30, 0)]) +crowded_zone01 = ((10, 10), (20, 10), (20, 20)) + +map_def = pysf.map_config.MapDefinition( + obstacles=[obstacle01, obstacle02], + routes=[route01], + crowded_zones=[crowded_zone01]) + +simulator = pysf.Simulator_v2(map_def) +sim_view = pysf.SimulationView(obstacles=map_def.obstacles, scaling=10) +sim_view.show() + +for step in range(10_000): + simulator.step() + ped_pos = np.array(simulator.states.ped_positions) + ped_vel = np.array(simulator.states.ped_velocities) + actions = np.concatenate(( + np.expand_dims(ped_pos, axis=1), + np.expand_dims(ped_pos + ped_vel, axis=1) + ), axis=1) + state = pysf.sim_view.VisualizableSimState(step, ped_pos, actions) + sim_view.render(state, fps=10) + +sim_view.exit() diff --git a/pysocialforce/__init__.py b/pysocialforce/__init__.py index 34508d1..392094c 100644 --- a/pysocialforce/__init__.py +++ b/pysocialforce/__init__.py @@ -5,3 +5,4 @@ from .logging import * from .simulator import Simulator, Simulator_v2 from .forces import * +from .sim_view import SimulationView, VisualizableSimState diff --git a/pysocialforce/config.py b/pysocialforce/config.py index d11e2d0..8752225 100644 --- a/pysocialforce/config.py +++ b/pysocialforce/config.py @@ -7,7 +7,7 @@ class SceneConfig: enable_group: bool = True agent_radius: float = 0.35 - step_width: float = 1.0 + dt_secs: float = 0.1 max_speed_multiplier: float = 1.3 tau: float = 0.5 resolution: float = 10 diff --git a/pysocialforce/map_config.py b/pysocialforce/map_config.py index 5000886..07231a0 100644 --- a/pysocialforce/map_config.py +++ b/pysocialforce/map_config.py @@ -6,11 +6,22 @@ Vec2D = Tuple[float, float] Line2D = Tuple[float, float, float, float] +Circle = Tuple[Vec2D, float] Rect = Tuple[Vec2D, Vec2D, Vec2D] Zone = Tuple[Vec2D, Vec2D, Vec2D] # rect ABC with sides |A B|, |B C| and diagonal |A C| def sample_zone(zone: Zone, num_samples: int) -> List[Vec2D]: + """ + Sample points within a given zone. + + Args: + zone (Zone): The zone defined by three points. + num_samples (int): The number of points to sample. + + Returns: + List[Vec2D]: A list of sampled points within the zone. + """ a, b, c = zone a, b, c = np.array(a), np.array(b), np.array(c) vec_ba, vec_bc = a - b, c - b @@ -20,8 +31,36 @@ def sample_zone(zone: Zone, num_samples: int) -> List[Vec2D]: return [(x, y) for x, y in points] +def sample_circle(circle: Circle, num_samples: int) -> List[Vec2D]: + """ + Sample points within a given circle. + + Args: + circle (Circle): The circle defined by center point and radius. + num_samples (int): The number of points to sample. + + Returns: + List[Vec2D]: A list of sampled points within the zone. + """ + center, radius = circle + rot = np.random.uniform(0, np.pi*2, (num_samples, 1)) + radius = np.random.uniform(0, radius, (num_samples, 1)) + rel_x, rel_y = np.cos(rot) * radius, np.sin(rot) * radius + points = np.concatenate((rel_x, rel_y), axis=1) + np.array([center]) + return [(x, y) for x, y in points] + + @dataclass class Obstacle: + """ + Represents an obstacle in the map. + + Attributes: + vertices (List[Vec2D]): The vertices of the obstacle. + lines (List[Line2D]): The lines formed by connecting the vertices. + vertices_np (np.ndarray): The vertices as a NumPy array. + """ + vertices: List[Vec2D] lines: List[Line2D] = field(init=False) vertices_np: np.ndarray = field(init=False) @@ -43,30 +82,62 @@ def __post_init__(self): @dataclass class GlobalRoute: - spawn_id: int - goal_id: int + """ + Represents a global route from a spawn point to a goal point in a map. + """ waypoints: List[Vec2D] - spawn_zone: Rect - goal_zone: Rect + spawn_radius: float = 5.0 def __post_init__(self): - if self.spawn_id < 0: - raise ValueError('Spawn id needs to be an integer >= 0!') - if self.goal_id < 0: - raise ValueError('Goal id needs to be an integer >= 0!') + """ + Initializes the GlobalRoute object. + + Raises: + + ValueError: If the route contains no waypoints. + """ + if len(self.waypoints) < 1: - raise ValueError(f'Route {self.spawn_id} -> {self.goal_id} contains no waypoints!') + raise ValueError(f'Route contains no waypoints!') + + @property + def spawn_circle(self) -> Circle: + """ + Returns the spawn circle of the route. + + Returns: + Circle: The spawn circle of the route. + """ + return (self.waypoints[0], self.spawn_radius) @property def sections(self) -> List[Tuple[Vec2D, Vec2D]]: + """ + Returns a list of sections along the route. + + Returns: + List[Tuple[Vec2D, Vec2D]]: The list of sections, where each section is represented by a tuple of two waypoints. + """ return [] if len(self.waypoints) < 2 else list(zip(self.waypoints[:-1], self.waypoints[1:])) @property def section_lengths(self) -> List[float]: + """ + Returns a list of lengths of each section along the route. + + Returns: + List[float]: The list of section lengths. + """ return [dist(p1, p2) for p1, p2 in self.sections] @property def section_offsets(self) -> List[float]: + """ + Returns a list of offsets for each section along the route. + + Returns: + List[float]: The list of section offsets. + """ lengths = self.section_lengths offsets = [] temp_offset = 0.0 @@ -77,11 +148,25 @@ def section_offsets(self) -> List[float]: @property def total_length(self) -> float: + """ + Returns the total length of the route. + + Returns: + float: The total length of the route. + """ return 0 if len(self.waypoints) < 2 else sum(self.section_lengths) @dataclass class MapDefinition: + """ + Represents the definition of a map. + + Attributes: + obstacles (List[Obstacle]): A list of obstacles in the map. + routes (List[GlobalRoute]): A list of global routes in the map. + crowded_zones (List[Zone]): A list of crowded zones in the map. + """ obstacles: List[Obstacle] routes: List[GlobalRoute] crowded_zones: List[Zone] diff --git a/pysocialforce/navigation.py b/pysocialforce/navigation.py index db164ae..3042117 100644 --- a/pysocialforce/navigation.py +++ b/pysocialforce/navigation.py @@ -8,6 +8,17 @@ @dataclass class RouteNavigator: + """ + A class that represents a route navigator for navigating through waypoints. + + Attributes: + waypoints (List[Vec2D]): The list of waypoints to navigate. + waypoint_id (int): The index of the current waypoint. + proximity_threshold (float): The proximity threshold for reaching a waypoint. + pos (Vec2D): The current position of the navigator. + reached_waypoint (bool): Indicates whether the current waypoint has been reached. + """ + waypoints: List[Vec2D] = field(default_factory=list) waypoint_id: int = 0 proximity_threshold: float = 1.0 # info: should be set to vehicle radius + goal radius @@ -16,24 +27,54 @@ class RouteNavigator: @property def reached_destination(self) -> bool: + """ + Check if the destination has been reached. + + Returns: + bool: True if the destination has been reached, False otherwise. + """ return len(self.waypoints) == 0 or \ dist(self.waypoints[-1], self.pos) <= self.proximity_threshold @property def current_waypoint(self) -> Vec2D: + """ + Get the current waypoint. + + Returns: + Vec2D: The current waypoint. + """ return self.waypoints[self.waypoint_id] @property def next_waypoint(self) -> Optional[Vec2D]: + """ + Get the next waypoint. + + Returns: + Optional[Vec2D]: The next waypoint if available, None otherwise. + """ return self.waypoints[self.waypoint_id + 1] \ if self.waypoint_id + 1 < len(self.waypoints) else None @property def initial_orientation(self) -> float: + """ + Get the initial orientation of the navigator. + + Returns: + float: The initial orientation in radians. + """ return atan2(self.waypoints[1][1] - self.waypoints[0][1], self.waypoints[1][0] - self.waypoints[0][0]) def update_position(self, pos: Vec2D): + """ + Update the position of the navigator. + + Args: + pos (Vec2D): The new position of the navigator. + """ reached_waypoint = dist(self.current_waypoint, pos) <= self.proximity_threshold if reached_waypoint: self.waypoint_id = min(len(self.waypoints) - 1, self.waypoint_id + 1) @@ -41,5 +82,11 @@ def update_position(self, pos: Vec2D): self.reached_waypoint = reached_waypoint def new_route(self, route: List[Vec2D]): + """ + Set a new route for the navigator. + + Args: + route (List[Vec2D]): The new route to navigate. + """ self.waypoints = route self.waypoint_id = 0 diff --git a/pysocialforce/ped_behavior.py b/pysocialforce/ped_behavior.py index 1e5dd10..c4fa674 100644 --- a/pysocialforce/ped_behavior.py +++ b/pysocialforce/ped_behavior.py @@ -2,7 +2,7 @@ from typing import List, Dict, Tuple, Protocol from dataclasses import dataclass, field -from pysocialforce.map_config import GlobalRoute, sample_zone +from pysocialforce.map_config import GlobalRoute, sample_circle, sample_zone from pysocialforce.navigation import RouteNavigator from pysocialforce.ped_grouping import PedestrianGroupings @@ -20,12 +20,31 @@ def reset(self): @dataclass class CrowdedZoneBehavior: + """ + A class representing the behavior of pedestrians in crowded zones. + + Attributes: + groups (PedestrianGroupings): The pedestrian groupings. + zone_assignments (Dict[int, int]): The assignments of zones to pedestrians. + crowded_zones (List[Zone]): The list of crowded zones. + goal_proximity_threshold (float): The proximity threshold to the goal. + + Methods: + step(): Perform a step of the behavior. + reset(): Reset the behavior. + """ + groups: PedestrianGroupings zone_assignments: Dict[int, int] crowded_zones: List[Zone] goal_proximity_threshold: float = 1 def step(self): + """ + Perform a step of the behavior. + + If a pedestrian group is close to its goal, redirect the group to a new goal within its assigned crowded zone. + """ for gid in self.groups.group_ids: centroid = self.groups.group_centroid(gid) goal = self.groups.goal_of_group(gid) @@ -37,6 +56,11 @@ def step(self): self.groups.redirect_group(gid, new_goal) def reset(self): + """ + Reset the behavior. + + Redirect each pedestrian group to a new goal within its assigned crowded zone. + """ for gid in self.groups.group_ids: any_pid = next(iter(self.groups.groups[gid])) zone = self.crowded_zones[self.zone_assignments[any_pid]] @@ -46,6 +70,16 @@ def reset(self): @dataclass class FollowRouteBehavior: + """ + Represents the behavior of pedestrians following a predefined route. + + Attributes: + groups (PedestrianGroupings): The pedestrian groupings. + route_assignments (Dict[int, GlobalRoute]): The route assignments for each group. + initial_sections (List[int]): The initial sections for each group. + goal_proximity_threshold (float): The proximity threshold to consider a goal reached. + navigators (Dict[int, RouteNavigator]): The route navigators for each group. + """ groups: PedestrianGroupings route_assignments: Dict[int, GlobalRoute] initial_sections: List[int] @@ -53,6 +87,9 @@ class FollowRouteBehavior: navigators: Dict[int, RouteNavigator] = field(init=False) def __post_init__(self): + """ + Initializes the route navigators for each group based on the route assignments and initial sections. + """ self.navigators = {} for (gid, route), sec_id in zip(self.route_assignments.items(), self.initial_sections): group_pos = self.groups.group_centroid(gid) @@ -60,6 +97,9 @@ def __post_init__(self): route.waypoints, sec_id + 1, self.goal_proximity_threshold, group_pos) def step(self): + """ + Performs a step of the behavior for each group. + """ for gid, nav in self.navigators.items(): group_pos = self.groups.group_centroid(gid) nav.update_position(group_pos) @@ -69,13 +109,22 @@ def step(self): self.groups.redirect_group(gid, nav.current_waypoint) def reset(self): + """ + TODO: why does the reset method do nothing? + """ pass def respawn_group_at_start(self, gid: int): + """ + Respawns a group at the start of the route. + + Args: + gid (int): The group ID. + """ nav = self.navigators[gid] num_peds = self.groups.group_size(gid) - spawn_zone = self.route_assignments[gid].spawn_zone - spawn_positions = sample_zone(spawn_zone, num_peds) + circle = self.route_assignments[gid].spawn_circle + spawn_positions = sample_circle(circle, num_peds) self.groups.reposition_group(gid, spawn_positions) self.groups.redirect_group(gid, nav.waypoints[0]) nav.waypoint_id = 0 diff --git a/pysocialforce/ped_grouping.py b/pysocialforce/ped_grouping.py index 1d1cfc9..b8b2b26 100644 --- a/pysocialforce/ped_grouping.py +++ b/pysocialforce/ped_grouping.py @@ -9,6 +9,21 @@ @dataclass class PedestrianStates: + """ + Represents the states of pedestrians in a simulation. + + Attributes: + raw_states (np.ndarray): The raw states of the pedestrians. + + Methods: + num_peds() -> int: Returns the number of pedestrians. + ped_positions() -> np.ndarray: Returns the positions of all pedestrians. + redirect(ped_id: int, new_goal: Vec2D): Redirects a pedestrian to a new goal. + reposition(ped_id: int, new_pos: Vec2D): Repositions a pedestrian to a new position. + goal_of(ped_id: int) -> Vec2D: Returns the goal position of a pedestrian. + pos_of(ped_id: int) -> Vec2D: Returns the current position of a pedestrian. + pos_of_many(ped_ids: Set[int]) -> np.ndarray: Returns the positions of multiple pedestrians. + """ raw_states: np.ndarray @property @@ -18,6 +33,10 @@ def num_peds(self) -> int: @property def ped_positions(self) -> np.ndarray: return self.raw_states[:, 0:2] + + @property + def ped_velocities(self) -> np.ndarray: + return self.raw_states[:, 2:4] def redirect(self, ped_id: int, new_goal: Vec2D): self.raw_states[ped_id, 4:6] = new_goal @@ -39,35 +58,102 @@ def pos_of_many(self, ped_ids: Set[int]) -> np.ndarray: @dataclass class PedestrianGroupings: + """ + A class representing groupings of pedestrians. + + Attributes: + states (PedestrianStates): The states of the pedestrians. + groups (Dict[int, Set[int]]): A dictionary mapping group IDs to sets of pedestrian IDs. + group_by_ped_id (Dict[int, int]): A dictionary mapping pedestrian IDs to group IDs. + + Methods: + groups_as_lists() -> List[List[int]]: Returns the groups as lists of pedestrian IDs. + group_ids() -> Set[int]: Returns the set of group IDs. + group_centroid(group_id: int) -> Vec2D: Returns the centroid of a group. + group_size(group_id: int) -> int: Returns the size of a group. + goal_of_group(group_id: int) -> Vec2D: Returns the goal of a group. + new_group(ped_ids: Set[int]) -> int: Creates a new group with the given pedestrian IDs. + remove_group(group_id: int): Removes a group. + redirect_group(group_id: int, new_goal: Vec2D): Redirects the pedestrians in a group to a new goal. + reposition_group(group_id: int, new_positions: List[Vec2D]): Repositions the pedestrians in a group to new positions. + """ states: PedestrianStates groups: Dict[int, Set[int]] = field(default_factory=dict) group_by_ped_id: Dict[int, int] = field(default_factory=dict) @property def groups_as_lists(self) -> List[List[int]]: + """ + Returns the groups as lists of pedestrian IDs. + + Returns: + List[List[int]]: The groups as lists of pedestrian IDs. + """ # info: this facilitates slicing over numpy arrays # for some reason, numpy cannot slide over indices provided as set ... return [list(ped_ids) for ped_ids in self.groups.values()] @property def group_ids(self) -> Set[int]: + """ + Returns the set of group IDs. + + Returns: + Set[int]: The set of group IDs. + """ # info: ignore empty groups return {k for k in self.groups if len(self.groups[k]) > 0} def group_centroid(self, group_id: int) -> Vec2D: + """ + Returns the centroid of a group. + + Args: + group_id (int): The ID of the group. + + Returns: + Vec2D: The centroid of the group. + """ group = self.groups[group_id] positions = self.states.pos_of_many(group) c_x, c_y = np.mean(positions, axis=0) return (c_x, c_y) def group_size(self, group_id: int) -> int: + """ + Returns the size of a group. + + Args: + group_id (int): The ID of the group. + + Returns: + int: The size of the group. + """ return len(self.groups[group_id]) def goal_of_group(self, group_id: int) -> Vec2D: + """ + Returns the goal of a group. + + Args: + group_id (int): The ID of the group. + + Returns: + Vec2D: The goal of the group. + """ any_ped_id_of_group = next(iter(self.groups[group_id])) return self.states.goal_of(any_ped_id_of_group) def new_group(self, ped_ids: Set[int]) -> int: + """ + Creates a new group with the given pedestrian IDs. + + Args: + ped_ids (Set[int]): The set of pedestrian IDs. + + Returns: + int: The ID of the new group. + """ new_gid = max(self.groups.keys()) + 1 if self.groups.keys() else 0 self.groups[new_gid] = ped_ids.copy() for ped_id in ped_ids: @@ -78,15 +164,35 @@ def new_group(self, ped_ids: Set[int]) -> int: return new_gid def remove_group(self, group_id: int): + """ + Removes a group. + + Args: + group_id (int): The ID of the group. + """ ped_ids = deepcopy(self.groups[group_id]) for ped_id in ped_ids: self.new_group({ped_id}) self.groups[group_id].clear() def redirect_group(self, group_id: int, new_goal: Vec2D): + """ + Redirects the pedestrians in a group to a new goal. + + Args: + group_id (int): The ID of the group. + new_goal (Vec2D): The new goal position. + """ for ped_id in self.groups[group_id]: self.states.redirect(ped_id, new_goal) def reposition_group(self, group_id: int, new_positions: List[Vec2D]): + """ + Repositions the pedestrians in a group to new positions. + + Args: + group_id (int): The ID of the group. + new_positions (List[Vec2D]): The new positions of the pedestrians. + """ for ped_id, new_pos in zip(self.groups[group_id], new_positions): self.states.reposition(ped_id, new_pos) diff --git a/pysocialforce/ped_population.py b/pysocialforce/ped_population.py index d39a5bc..33c36df 100644 --- a/pysocialforce/ped_population.py +++ b/pysocialforce/ped_population.py @@ -17,6 +17,10 @@ @dataclass class PedSpawnConfig: + """ + Configuration class for pedestrian spawning. + """ + peds_per_area_m2: float=0.04 max_group_members: int=5 group_member_probs: List[float] = field(default_factory=list) @@ -25,25 +29,43 @@ class PedSpawnConfig: sidewalk_width: float = 3.0 def __post_init__(self): + """ + Initialize the PedSpawnConfig object. + If the length of group_member_probs is not equal to max_group_members, + initialize group size probabilities decaying by power law. + """ if len(self.group_member_probs) != self.max_group_members: # initialize group size probabilities decaying by power law power_dist = [self.group_size_decay**i for i in range(self.max_group_members)] self.group_member_probs = [p / sum(power_dist) for p in power_dist] - -def sample_route( +def sample_group_spawn_on_route( route: GlobalRoute, num_samples: int, sidewalk_width: float) -> Tuple[List[Vec2D], int]: - + """ + Sample points along a given route. + + Args: + route (GlobalRoute): The global route to sample points from. + num_samples (int): The number of points to sample. + sidewalk_width (float): The width of the sidewalk. + + Returns: + Tuple[List[Vec2D], int]: A tuple containing a list of sampled points and the section ID of the sampled point. + """ + sampled_offset = np.random.uniform(0, route.total_length) sec_id = next(iter([i - 1 for i, o in enumerate(route.section_offsets) if o >= sampled_offset]), -1) + sec_offset = sampled_offset - route.section_offsets[sec_id] + sec_len = route.section_lengths[sec_id] start, end = route.sections[sec_id] add_vecs = lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]) sub_vecs = lambda v1, v2: (v1[0] - v2[0], v1[1] - v2[1]) + scale_vec = lambda v, f: (v[0] * f, v[1] * f) clip_spread = lambda v: np.clip(v, -sidewalk_width / 2, sidewalk_width / 2) - center = add_vecs(start, sub_vecs(end, start)) + center = add_vecs(start, scale_vec(sub_vecs(end, start), sec_offset / sec_len)) std_dev = sidewalk_width / 4 x_offsets = clip_spread(np.random.normal(center[0], std_dev, (num_samples, 1))) @@ -54,6 +76,15 @@ def sample_route( @dataclass class ZonePointsGenerator: + """ + Generates points within different zones based on their areas. + + Attributes: + zones (List[Zone]): List of Zone objects representing different zones. + zone_areas (List[float]): List of zone areas calculated based on the zone points. + _zone_probs (List[float]): List of probabilities for each zone based on their areas. + """ + zones: List[Zone] zone_areas: List[float] = field(init=False) _zone_probs: List[float] = field(init=False) @@ -65,6 +96,15 @@ def __post_init__(self): # info: distribute proportionally by zone area def generate(self, num_samples: int) -> Tuple[List[Vec2D], int]: + """ + Generates a specified number of samples within a randomly selected zone. + + Args: + num_samples (int): Number of samples to generate. + + Returns: + Tuple[List[Vec2D], int]: A tuple containing the generated samples and the ID of the selected zone. + """ zone_id = np.random.choice(len(self.zones), size=1, p=self._zone_probs)[0] return sample_zone(self.zones[zone_id], num_samples), zone_id @@ -89,13 +129,26 @@ def total_sidewalks_area(self) -> float: def generate(self, num_samples: int) -> Tuple[List[Vec2D], int, int]: route_id = np.random.choice(len(self.routes), size=1, p=self._zone_probs)[0] - spawn_pos, sec_id = sample_route(self.routes[route_id], num_samples, self.sidewalk_width) + spawn_pos, sec_id = sample_group_spawn_on_route(self.routes[route_id], num_samples, self.sidewalk_width) return spawn_pos, route_id, sec_id def populate_ped_routes(config: PedSpawnConfig, routes: List[GlobalRoute]) \ -> Tuple[np.ndarray, List[PedGrouping], Dict[int, GlobalRoute], List[int]]: - + """ + Populates pedestrian routes with initial states and assignments. + + Args: + config (PedSpawnConfig): The configuration for pedestrian spawning. + routes (List[GlobalRoute]): The list of global routes. + + Returns: + Tuple[np.ndarray, List[PedGrouping], Dict[int, GlobalRoute], List[int]]: A tuple containing the following: + - ped_states (np.ndarray): An array of pedestrian states. + - groups (List[PedGrouping]): A list of pedestrian groupings. + - route_assignments (Dict[int, GlobalRoute]): A dictionary mapping group indices to assigned routes. + - initial_sections (List[int]): A list of initial section indices for each group. + """ proportional_spawn_gen = RoutePointsGenerator(routes, config.sidewalk_width) total_num_peds = ceil(proportional_spawn_gen.total_sidewalks_area * config.peds_per_area_m2) ped_states, groups = np.zeros((total_num_peds, 6)), [] @@ -131,7 +184,17 @@ def populate_ped_routes(config: PedSpawnConfig, routes: List[GlobalRoute]) \ def populate_crowded_zones(config: PedSpawnConfig, crowded_zones: List[Zone]) \ -> Tuple[PedState, List[PedGrouping], ZoneAssignments]: + """ + Populates crowded zones with pedestrians. + + Args: + config (PedSpawnConfig): The configuration for pedestrian spawning. + crowded_zones (List[Zone]): The list of crowded zones. + Returns: + Tuple[PedState, List[PedGrouping], ZoneAssignments]: A tuple containing the pedestrian states, + the list of pedestrian groupings, and the zone assignments. + """ proportional_spawn_gen = ZonePointsGenerator(crowded_zones) total_num_peds = ceil(sum(proportional_spawn_gen.zone_areas) * config.peds_per_area_m2) ped_states, groups = np.zeros((total_num_peds, 6)), [] @@ -169,6 +232,19 @@ def populate_simulation( tau: float, spawn_config: PedSpawnConfig, ped_routes: List[GlobalRoute], ped_crowded_zones: List[Zone] ) -> Tuple[PedestrianStates, PedestrianGroupings, List[PedestrianBehavior]]: + """ + Populates the simulation with pedestrians based on the given parameters. + + Args: + tau (float): The time step for the simulation. + spawn_config (PedSpawnConfig): The configuration for spawning pedestrians. + ped_routes (List[GlobalRoute]): The global routes for the pedestrians. + ped_crowded_zones (List[Zone]): The crowded zones for the pedestrians. + + Returns: + Tuple[PedestrianStates, PedestrianGroupings, List[PedestrianBehavior]]: A tuple containing the pedestrian states, + pedestrian groupings, and pedestrian behaviors for the simulation. + """ crowd_ped_states_np, crowd_groups, zone_assignments = \ populate_crowded_zones(spawn_config, ped_crowded_zones) diff --git a/pysocialforce/scene.py b/pysocialforce/scene.py index 3f8ba4a..89e3cdf 100644 --- a/pysocialforce/scene.py +++ b/pysocialforce/scene.py @@ -21,7 +21,7 @@ class PedState: def __init__(self, state: np.ndarray, groups: List[Group], config: SceneConfig): self.default_tau = config.tau - self.step_width = config.step_width + self.d_t = config.dt_secs self.agent_radius = config.agent_radius self.max_speed_multiplier = config.max_speed_multiplier @@ -76,14 +76,14 @@ def speeds(self): def step(self, force, groups=None): """Move peds according to forces""" # desired velocity - desired_velocity = self.vel() + self.step_width * force + desired_velocity = self.vel() + self.d_t * force desired_velocity = self.capped_velocity(desired_velocity, self.max_speeds) # stop when arrived desired_velocity[stateutils.desired_directions(self.state)[1] < 0.5] = [0, 0] # update state next_state = self.state - next_state[:, 0:2] += desired_velocity * self.step_width + next_state[:, 0:2] += desired_velocity * self.d_t next_state[:, 2:4] = desired_velocity next_groups = groups if groups is not None else self.groups self.update(next_state, next_groups) diff --git a/pysocialforce/sim_view.py b/pysocialforce/sim_view.py new file mode 100644 index 0000000..fb1ac90 --- /dev/null +++ b/pysocialforce/sim_view.py @@ -0,0 +1,158 @@ +from time import sleep +from math import sin, cos +from typing import Tuple, Union, List +from dataclasses import dataclass, field +from threading import Thread +from signal import signal, SIGINT + +import os +os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide" + +import pygame +import numpy as np + +from pysocialforce.map_config import Obstacle + +Vec2D = Tuple[float, float] +RobotPose = Tuple[Vec2D, float] +RobotAction = Tuple[float, float] +RgbColor = Tuple[int, int, int] + + +BACKGROUND_COLOR = (255, 255, 255) +BACKGROUND_COLOR_TRANSP = (255, 255, 255, 128) +OBSTACLE_COLOR = (20, 30, 20, 128) +PED_COLOR = (255, 50, 50) +PED_ACTION_COLOR = (255, 50, 50) +TEXT_COLOR = (0, 0, 0) + + +@dataclass +class VisualizableSimState: + """Representing a collection of properties to display + the simulator's state at a discrete timestep.""" + timestep: int + pedestrian_positions: np.ndarray + ped_actions: np.ndarray + + +@dataclass +class SimulationView: + width: float=1200 + height: float=800 + scaling: float=15 + ped_radius: float=0.4 + obstacles: List[Obstacle] = field(default_factory=list) + size_changed: bool = field(init=False, default=False) + is_exit_requested: bool = field(init=False, default=False) + is_abortion_requested: bool = field(init=False, default=False) + screen: pygame.surface.Surface = field(init=False) + font: pygame.font.Font = field(init=False) + + @property + def timestep_text_pos(self) -> Vec2D: + return (self.width - 100, 10) + + def __post_init__(self): + pygame.init() + pygame.font.init() + self.screen = pygame.display.set_mode( + (self.width, self.height), pygame.RESIZABLE) + pygame.display.set_caption('RobotSF Simulation') + self.font = pygame.font.SysFont('Consolas', 14) + self.surface_obstacles = self.preprocess_obstacles() + self.clear() + + def preprocess_obstacles(self) -> pygame.Surface: + obst_vertices = [o.vertices_np * self.scaling for o in self.obstacles] + min_x, max_x, min_y, max_y = 0, -np.inf, 0, -np.inf + for vertices in obst_vertices: + min_x, max_x = min(np.min(vertices[:, 0]), min_x), max(np.max(vertices[:, 0]), max_x) + min_y, max_y = min(np.min(vertices[:, 1]), min_y), max(np.max(vertices[:, 1]), max_y) + width, height = max_x - min_x, max_y - min_y + surface = pygame.Surface((width, height), pygame.SRCALPHA) + surface.fill(BACKGROUND_COLOR_TRANSP) + for vertices in obst_vertices: + pygame.draw.polygon(surface, OBSTACLE_COLOR, [(x, y) for x, y in vertices]) + return surface + + def show(self): + self.ui_events_thread = Thread(target=self._process_event_queue) + self.ui_events_thread.start() + + def handle_sigint(signum, frame): + self.is_exit_requested = True + self.is_abortion_requested = True + + signal(SIGINT, handle_sigint) + + def exit(self): + self.is_exit_requested = True + self.ui_events_thread.join() + + def _process_event_queue(self): + while not self.is_exit_requested: + for e in pygame.event.get(): + if e.type == pygame.QUIT: + self.is_exit_requested = True + self.is_abortion_requested = True + elif e.type == pygame.VIDEORESIZE: + self.size_changed = True + self.width, self.height = e.w, e.h + sleep(0.01) + + def clear(self): + self.screen.fill(BACKGROUND_COLOR) + self._augment_timestep(0) + pygame.display.update() + + def render(self, state: VisualizableSimState, fps: int=60): + sleep(1 / fps) + + # info: event handling needs to be processed + # in the main thread to access UI resources + if self.is_exit_requested: + pygame.quit() + self.ui_events_thread.join() + if self.is_abortion_requested: + exit() + if self.size_changed: + self._resize_window() + self.size_changed = False + + state, offset = self._zoom_camera(state) + self.screen.fill(BACKGROUND_COLOR) + self._draw_obstacles(offset) + self._augment_ped_actions(state.ped_actions) + self._draw_pedestrians(state.pedestrian_positions) + self._augment_timestep(state.timestep) + pygame.display.update() + + def _resize_window(self): + old_surface = self.screen + self.screen = pygame.display.set_mode( + (self.width, self.height), pygame.RESIZABLE) + self.screen.blit(old_surface, (0, 0)) + + def _zoom_camera(self, state: VisualizableSimState) \ + -> Tuple[VisualizableSimState, Tuple[float, float]]: + state.pedestrian_positions *= self.scaling + state.ped_actions *= self.scaling + return state, (0, 0) + + def _draw_pedestrians(self, ped_pos: np.ndarray): + for ped_x, ped_y in ped_pos: + pygame.draw.circle(self.screen, PED_COLOR, (ped_x, ped_y), self.ped_radius * self.scaling) + + def _draw_obstacles(self, offset: Tuple[float, float]): + offset = offset[0], offset[1] + self.screen.blit(self.surface_obstacles, offset) + + def _augment_ped_actions(self, ped_actions: np.ndarray): + for p1, p2 in ped_actions: + pygame.draw.line(self.screen, PED_ACTION_COLOR, p1, p2, width=3) + + def _augment_timestep(self, timestep: int): + text = f'step: {timestep}' + text_surface = self.font.render(text, False, TEXT_COLOR) + self.screen.blit(text_surface, self.timestep_text_pos) diff --git a/pysocialforce/simulator.py b/pysocialforce/simulator.py index 589bd70..571c0e2 100644 --- a/pysocialforce/simulator.py +++ b/pysocialforce/simulator.py @@ -46,15 +46,26 @@ def make_forces(sim: pysf.Simulator, config: SimulatorConfig) -> List[pysf.force class Simulator_v2: - def __init__(self, - map_definition: MapDefinition=EMPTY_MAP, - config: SimulatorConfig=SimulatorConfig(), - make_forces: Callable[[Simulator, SimulatorConfig], List[forces.Force]]=make_forces, - populate: SimPopulator=lambda s, m: \ - populate_simulation( - s.scene_config.tau, s.ped_spawn_config, - m.routes, m.crowded_zones), - on_step: Callable[[SimState], None] = lambda s: None): + def __init__( + self, + map_definition: MapDefinition=EMPTY_MAP, + config: SimulatorConfig=SimulatorConfig(), + make_forces: Callable[[Simulator, SimulatorConfig], List[forces.Force]]=make_forces, + populate: SimPopulator=lambda s, m: \ + populate_simulation( + s.scene_config.tau, s.ped_spawn_config, + m.routes, m.crowded_zones), + on_step: Callable[[SimState], None] = lambda s: None): + """ + Initializes a Simulator_v2 object. + + Args: + map_definition (MapDefinition, optional): The definition of the map. Defaults to EMPTY_MAP. + config (SimulatorConfig, optional): The configuration for the simulator. Defaults to SimulatorConfig(). + make_forces (Callable[[Simulator, SimulatorConfig], List[forces.Force]], optional): A function that creates a list of forces. Defaults to make_forces. + populate (SimPopulator, optional): A function that populates the simulation with initial states, groupings, and behaviors. Defaults to a lambda function. + on_step (Callable[[SimState], None], optional): A function that is called after each step. Defaults to a lambda function. + """ self.config = config self.on_step = on_step self.states, self.groupings, self.behaviors = populate(config, map_definition) @@ -63,37 +74,77 @@ def __init__(self, self.env = EnvState(obstacles, self.config.scene_config.resolution) self.peds = PedState( self.states.raw_states, - self.groupings.groups, + self.groupings.groups_as_lists, self.config.scene_config) self.forces = make_forces(self, config) @property def current_state(self) -> SimState: + """ + Returns the current state of the simulation. + + Returns: + SimState: The current state of the simulation. + """ return self.peds.state, self.peds.groups @property def obstacles(self): + """ + Returns the obstacles in the environment. + + Returns: + List[Line]: The obstacles in the environment. + """ return self.env.obstacles @property def raw_obstacles(self): + """ + Returns the raw obstacles in the environment. + + Returns: + List[Line]: The raw obstacles in the environment. + """ return self.env.obstacles_raw def get_obstacles(self): + """ + Returns the obstacles in the environment. + + Returns: + List[Line]: The obstacles in the environment. + """ return self.env.obstacles def get_raw_obstacles(self): + """ + Returns the raw obstacles in the environment. + + Returns: + List[Line]: The raw obstacles in the environment. + """ return self.env.obstacles_raw def _step_once(self): - """step once""" + """ + Performs a single step in the simulation. + """ forces = sum(map(lambda force: force(), self.forces)) self.peds.step(forces) for behavior in self.behaviors: behavior.step() def step(self, n=1): - """Step n time""" + """ + Performs n steps in the simulation. + + Args: + n (int, optional): The number of steps to perform. Defaults to 1. + + Returns: + Simulator_v2: The Simulator_v2 object. + """ for _ in range(n): self._step_once() self.on_step(self.current_state) diff --git a/requirements.txt b/requirements.txt index cdbf31a..a740ee1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ numba>=0.51 scipy>=1.5 matplotlib==3.6.1 Pillow==9.2.0 -tk==0.1.0 \ No newline at end of file +tk==0.1.0 +pygame==2.1.2 \ No newline at end of file diff --git a/tests/test_simulator.py b/tests/test_simulator.py new file mode 100644 index 0000000..7eecc88 --- /dev/null +++ b/tests/test_simulator.py @@ -0,0 +1,30 @@ +import pysocialforce as pysf + + +def test_can_simulate_with_empty_map_no_peds(): + simulator = pysf.Simulator_v2() + for _ in range(10): + simulator.step() + print(simulator) + + +def test_can_simulate_with_populated_map(): + obstacle01 = pysf.map_config.Obstacle( + [(10, 10), (15,10), (15, 15), (10, 15)]) + obstacle02 = pysf.map_config.Obstacle( + [(20, 10), (25,10), (25, 15), (20, 15)]) + + route01 = pysf.map_config.GlobalRoute( + [(0, 0), (10, 10), (20, 10), (30, 0)]) + crowded_zone01 = ((10, 10), (20, 10), (20, 20)) + + map_def = pysf.map_config.MapDefinition( + obstacles=[obstacle01, obstacle02], + routes=[route01], + crowded_zones=[crowded_zone01]) + + simulator = pysf.Simulator_v2(map_def) + + for _ in range(10): + simulator.step() + print(simulator.states.ped_positions)