ego_vehicle.py 8.93 KB
Newer Older
Moritz Klischat's avatar
Moritz Klischat committed
1
2
3
4
import warnings
from typing import Dict, List, Union
import numpy as np
import copy
5

Moritz Klischat's avatar
Moritz Klischat committed
6
from commonroad.geometry.shape import Rectangle
Moritz Klischat's avatar
Moritz Klischat committed
7
from commonroad.planning.planning_problem import PlanningProblem, GoalRegion
Moritz Klischat's avatar
Moritz Klischat committed
8
9
10
11
12
13
14
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.scenario.obstacle import DynamicObstacle, ObstacleType
from commonroad.scenario.trajectory import State, Trajectory

__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
Moritz Klischat's avatar
Moritz Klischat committed
15
__version__ = "2021.5"
Moritz Klischat's avatar
Moritz Klischat committed
16
__maintainer__ = "Moritz Klischat"
Moritz Klischat's avatar
Moritz Klischat committed
17
__email__ = "commonroad@lists.lrz.de"
Moritz Klischat's avatar
Moritz Klischat committed
18
19
__status__ = "Released"

20

Moritz Klischat's avatar
Moritz Klischat committed
21
22
23
class EgoVehicle:
    """
    Interface object for ego vehicle.
Moritz Klischat's avatar
Moritz Klischat committed
24
25
    How to use: After each simulation step, get current state with EgoVehicle.current_state()
    and set planned trajectory with EgoVehicle.set_planned_trajectory(planned_trajectory).
Moritz Klischat's avatar
Moritz Klischat committed
26
    """
27
28
29

    def __init__(self, id, initial_state: State, delta_steps: int, width: float = 2.0, length: float = 5.0,
                 planning_problem: PlanningProblem = None):
Moritz Klischat's avatar
Moritz Klischat committed
30
31
32
33
34
        self.id = id
        self._width = width
        self._length = length
        self._initial_state = initial_state
        self._state_dict: Dict[State] = dict()  # collects driven states
35
        self.shape = Rectangle(self.length, self.width, center=np.array([0, 0]), orientation=0.0)
Moritz Klischat's avatar
Moritz Klischat committed
36
        self._driven_trajectory = None  # returns trajectory object of driven
Moritz Klischat's avatar
Moritz Klischat committed
37
        self._planned_trajectories: Dict[int, List[State]] = {}  # collects trajectories from planner for every time step
Moritz Klischat's avatar
Moritz Klischat committed
38
39
        self._current_time_step = initial_state.time_step
        self.delta_steps = delta_steps
Moritz Klischat's avatar
Moritz Klischat committed
40
        self.planning_problem = planning_problem
Moritz Klischat's avatar
Moritz Klischat committed
41

Moritz Klischat's avatar
Moritz Klischat committed
42
43
44
45
46
47
48
    @property
    def pp_id(self) -> int:
        """
        :returns: associated planning problem id
        """
        return self.planning_problem.planning_problem_id if self.planning_problem is not None else None

49
    def set_planned_trajectory(self, planned_state_list: List[State]) -> None:
Moritz Klischat's avatar
Moritz Klischat committed
50
        """
Moritz Klischat's avatar
Moritz Klischat committed
51
52
53
54
        Sets planned trajectory beginning with current time step.

        :param planned_state_list: the planned trajectory

Moritz Klischat's avatar
Moritz Klischat committed
55
56
        """

57
58
59
60
61
        assert len(planned_state_list) >= self.delta_steps, \
            'planned_trajectory must contain at least {} states, but contains {}. (See delta_steps in sumo_config file)' \
                .format(self.delta_steps, len(planned_state_list))
        assert 1 == planned_state_list[0].time_step, \
            'planned_trajectory must always start at time_step ({}) but starts at time_step {}' \
Moritz Klischat's avatar
Moritz Klischat committed
62
63
64
65
66
67
                .format(1, planned_state_list[0].time_step)
        self._planned_trajectories[self.current_time_step] = planned_state_list
        self.add_state(planned_state_list[0])

    @property
    def get_planned_trajectory(self) -> List[State]:
Moritz Klischat's avatar
Moritz Klischat committed
68
        """Gets planned trajectory according to the current time step"""
Moritz Klischat's avatar
Moritz Klischat committed
69
70
        return self._planned_trajectories[self.current_time_step]

71
    def get_dynamic_obstacle(self, time_step: Union[int, None] = None) -> DynamicObstacle:
Moritz Klischat's avatar
Moritz Klischat committed
72
        """
Moritz Klischat's avatar
Moritz Klischat committed
73
        If time step is None, adds complete driven trajectory and returns the dynamic obstacles.
Moritz Klischat's avatar
Moritz Klischat committed
74
75
        If time step is int: starts from given step and adds planned trajectory and returns the dynamic obstacles.

Moritz Klischat's avatar
Moritz Klischat committed
76
        :param time_step: initial time step of vehicle
Moritz Klischat's avatar
Moritz Klischat committed
77
        :return: DynamicObstacle object of the ego vehicle.
Moritz Klischat's avatar
Moritz Klischat committed
78
        """
79
80
81
82
83
84
        if time_step is None:
            return DynamicObstacle(self.id, obstacle_type=ObstacleType.CAR,
                                   obstacle_shape=Rectangle(self.length, self.width, center=np.array([0, 0]),
                                                            orientation=0.0),
                                   initial_state=self.initial_state, prediction=self.driven_trajectory)
        elif isinstance(time_step, int):
Moritz Klischat's avatar
Moritz Klischat committed
85
86
87
88
            if time_step in self._state_dict:
                if time_step in self._planned_trajectories:
                    prediction = TrajectoryPrediction(Trajectory(self._planned_trajectories[time_step][0].time_step,
                                                                 self._planned_trajectories[time_step]),
89
                                                      self.shape)
Moritz Klischat's avatar
Moritz Klischat committed
90
91
92
                else:
                    prediction = None
                return DynamicObstacle(self.id, obstacle_type=ObstacleType.CAR,
93
94
95
                                       obstacle_shape=Rectangle(self.length, self.width, center=np.array([0, 0]),
                                                                orientation=0.0),
                                       initial_state=self.get_state_at_timestep(time_step), prediction=prediction)
Moritz Klischat's avatar
Moritz Klischat committed
96
        else:
97
            raise ValueError('time needs to be type None or int')
Moritz Klischat's avatar
Moritz Klischat committed
98

99
    def get_planned_state(self, delta_step: int = 0):
Moritz Klischat's avatar
Moritz Klischat committed
100
101
102
        """
        Returns the planned state.

103
        :param delta_step: get planned state after delta steps
Moritz Klischat's avatar
Moritz Klischat committed
104
105
        
        """
Moritz Klischat's avatar
Moritz Klischat committed
106
107
108
109
        assert self.current_time_step in self._planned_trajectories,\
            f"No planned trajectory found at time step {self.current_time_step} for ego vehicle {self.id}! " \
            f"Use ego_vehicle.set_planned_trajectory() to set the trajectory."

110
        planned_state: State = copy.deepcopy(self._planned_trajectories[self.current_time_step][0])
Moritz Klischat's avatar
Moritz Klischat committed
111
112
113
114
115
        if self.delta_steps > 1:
            # linear interpolation
            for state in planned_state.attributes:
                curr_state = getattr(self.current_state, state)
                next_state = getattr(planned_state, state)
116
117
                setattr(planned_state, state,
                        curr_state + (delta_step + 1) / self.delta_steps * (next_state - curr_state))
Moritz Klischat's avatar
Moritz Klischat committed
118
119
120
121
122

        return planned_state

    @property
    def current_state(self) -> State:
Moritz Klischat's avatar
Moritz Klischat committed
123
124
125
        """
        Returns the current state.
        """
Moritz Klischat's avatar
Moritz Klischat committed
126
127
128
129
130
        if self.current_time_step == self.initial_state.time_step:
            return self.initial_state
        else:
            return self._state_dict[self.current_time_step]

131
    def get_state_at_timestep(self, time_step: int) -> State:
Moritz Klischat's avatar
Moritz Klischat committed
132
133
134
135
136
        """
        Returns the state according to the given time step.

        :param time_step: the state is returned according to this time step.
        """
Moritz Klischat's avatar
Moritz Klischat committed
137

Moritz Klischat's avatar
Moritz Klischat committed
138
139
140
141
142
143
144
145
        if time_step == self.initial_state.time_step:
            return self.initial_state
        else:
            state = self._state_dict[time_step]
            state.time_step = 0
            return state

    @current_state.setter
146
    def current_state(self, current_state):
Moritz Klischat's avatar
Moritz Klischat committed
147
148
149
        raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()')

    @property
Moritz Klischat's avatar
Moritz Klischat committed
150
151
152
153
    def current_time_step(self) -> int:
        """
        Returns current time step.
        """
Moritz Klischat's avatar
Moritz Klischat committed
154
155
156
157
158
159
160
        return self._current_time_step

    @current_time_step.setter
    def current_time_step(self, current_time_step):
        raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()')

    @property
Moritz Klischat's avatar
Moritz Klischat committed
161
162
163
164
    def goal(self) -> GoalRegion:
        """
        Returns the goal of the planning problem.
        """
Moritz Klischat's avatar
Moritz Klischat committed
165
166
        return self.planning_problem.goal

Moritz Klischat's avatar
Moritz Klischat committed
167
168
169
170
171
172
173
    def add_state(self, state: State) -> None:
        """
        Adds a state to the current state dictionary.

        :param state: the state to be added

        """
174
        self._state_dict[self._current_time_step + 1] = state
Moritz Klischat's avatar
Moritz Klischat committed
175
176
177
178
179

    @property
    def driven_trajectory(self) -> TrajectoryPrediction:
        """
        Returns trajectory prediction object for driven trajectory (mainly for plotting)
Moritz Klischat's avatar
Moritz Klischat committed
180

Moritz Klischat's avatar
Moritz Klischat committed
181
        """
182
        state_dict_tmp = {}
Moritz Klischat's avatar
Moritz Klischat committed
183
184
185
186
187
188
        for t, state in self._state_dict.items():
            state_dict_tmp[t] = state
            state_dict_tmp[t].time_step = t

        sorted_list = sorted(state_dict_tmp.keys())
        state_list = [state_dict_tmp[key] for key in sorted_list]
189
        return TrajectoryPrediction(Trajectory(self.initial_state.time_step + 1, state_list), self.shape)
Moritz Klischat's avatar
Moritz Klischat committed
190
191
192
193
194
195
196
197

    @driven_trajectory.setter
    def driven_trajectory(self, _):
        if hasattr(self, '_driven_trajectory'):
            warnings.warn('driven_trajectory of vehicle cannot be changed')
            return

    @property
Moritz Klischat's avatar
Moritz Klischat committed
198
199
200
201
    def width(self) -> float:
        """
        Returns the width of the ego vehicle.
        """
Moritz Klischat's avatar
Moritz Klischat committed
202
203
204
        return self._width

    @width.setter
205
    def width(self, width):
Moritz Klischat's avatar
Moritz Klischat committed
206
207
208
209
210
        if hasattr(self, '_width'):
            warnings.warn('width of vehicle cannot be changed')
            return

    @property
Moritz Klischat's avatar
Moritz Klischat committed
211
212
213
214
    def length(self) -> float:
        """
        Returns the length of the ego vehicle.
        """
Moritz Klischat's avatar
Moritz Klischat committed
215
216
217
218
219
220
221
222
223
        return self._length

    @length.setter
    def length(self, length):
        if hasattr(self, '_length'):
            warnings.warn('length of vehicle cannot be changed')
            return

    @property
Moritz Klischat's avatar
Moritz Klischat committed
224
225
226
227
    def initial_state(self) -> State:
        """
        Returns the initial state of the ego vehicle.
        """
Moritz Klischat's avatar
Moritz Klischat committed
228
229
230
231
232
233
        return self._initial_state

    @initial_state.setter
    def initial_state(self, _):
        if hasattr(self, '_initial_state'):
            warnings.warn('initial_state of vehicle cannot be changed')
234
            return