24.09., 9:00 - 11:00: Due to updates GitLab will be unavailable for some minutes between 09:00 and 11:00.

Commit 332ef5a3 authored by Moritz Klischat's avatar Moritz Klischat

initial commit

parents
This diff is collapsed.
# SUMO - CommonRoad Interface
This interface couples the framework for motion planning of automated vehicles based on [CommonRoad_io](https://pypi.org/project/commonroad-io/) and the traffic simulator [SUMO](https://sumo.dlr.de).
# Prerequisites
The package is written in Python 3.6 and tested on Linux.
Install required packages:
```console
cd sumo-interface
pip install -r requirements.txt
sudo apt-get install ffmpeg
```
## Install SUMO
Clone a customized version of SUMO for smooth lane changes from https://gitlab.lrz.de/ge36pob/sumo and check out branch `smooth-lane-change`.
For installation we recommend building with:
```
cd sumo
mkdir build && cd build
cmake ..
make -j8
```
More options on the installation can be found here: https://sumo.dlr.de/wiki/Installing/Linux_Build .
## Configure SUMO and local environment
1. copy the file `pathConfig_DEFAULT.py` and rename it to `pathConfig.py`. Add local paths, if required.
# Integrating the SUMO-interface into CommonRoad
- All scenarios are stored in `/scenarios`. All files for each scenario must be named consistently with the same scenario name as the folder.
- the default config file with all options for the simulation can be found in `/config/default.py`.
## Convert maps
Before using the interface, a map needs to be available in CommonRoad xml and SUMO's .net.xml format. Currently only the conversion from .net.xml to commonroad format is implemented. For an example see `example_scripts/create_scenario_files.py`.
## Create .rou.xml files
.rou.xml files specify all vehicles for the simulation. An introduction can be found here: https://sumo.dlr.de/wiki/Tutorials/quick_start#Traffic_demand.
To use the interface for planning with ego vehicles, the vehicle ID in the .rou.xml file needs to start with the prefix `egoVehicle`.
A simple script to create .rou.xml files using https://sumo.dlr.de/wiki/Tools/Trip is implemented in the function
`ScenarioWrapper.recreate_route_file()`. See `example_scripts/create_scenario_files.py` for an example.
## Configure SUMO and sumo-interface
To use smooth lane changes, add
~~~~
<processing>
<time-to-teleport value="-1"/>
<lanechange-output value="output_lanechange.txt"/>
<lanechange.duration value="10"/>
<lanechange.smooth-change value="1"/>
</processing>
~~~~
to the .sumo.cfg file of the scenario (activated by default for scenarios generated by the class `ScenarioWrapper`).
Further options for the interface can be found in the sumo-interface config file `sumo-interface/config/default.py`.
## Minimal example for using the interface:
The file `example_scripts/minimal_example.py` gives an example how the interface can be integrated into an existing trajectory planner. For testing, the scenarios `cross_example_map` and `traci_tls` are recommended.
Plug in a trajectory planner and run:
```python
from sumo2cr.interface.sumo_simulation import SumoSimulation
from sumo2cr.visualization.video import create_video
from scenarios.a9.scenario_config import Conf as conf
sumo_sim = SumoSimulation()
sumo_sim.initialize(conf)
output_folder = "/path/to/output_folder"
for t in range(conf.simulation_steps):
ego_vehicles = sumo_sim.ego_vehicles
commonroad_scenario = sumo_sim.commonroad_scenario_at_time_step(sumo_sim.current_time_step)
# plan trajectories for all ego vehicles
for id, ego_vehicle in ego_vehicles.items():
current_state = ego_vehicle.current_state
ego_trajectory = # plug in trajectory planner here
ego_vehicle.set_planned_trajectory(ego_trajectory)
sumo_sim.simulate_step()
create_video(sumo_sim, conf.video_start, conf.video_end, output_folder)
```
By default, the videos are created in `\videos`.
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/sumoConfiguration.xsd">
<input>
<net-file value=""/>
<route-files value=""/>
<additional-files value=""/>
</input>
<time>
<begin value="0"/>
<step-length value="0.1" />
</time>
<report>
<verbose value="true"/>
<no-step-log value="true"/>
</report>
<processing>
<time-to-teleport value="-1"/>
<lanechange-output value="output_lanechange.txt"/>
<lanechange.duration value="15"/>
<lanechange.smooth-change value="1"/>
</processing>
</configuration>
# sumo id prefix
EGO_ID_START = 'egoVehicle'
# CommonRoad id prefixes
ID_DICT={'laneExt': 1, 'laneSpecial': 2, 'obstacleVehicle': 3, 'egoVehicle': 4}
class SumoCommonRoadConfig():
verbose = False
# simulation
dt = 0.1 # length of simulation step of the interface
delta_steps = 2 # number of sub-steps simulated in SUMO during every dt
presimulation_steps = 10 # number of time steps before simulation with ego vehicle starts
simulation_steps = 100 # number of simulated (and synchronized) time steps
with_sumo_gui = False
# plotting
video_start = 1
video_end = 100
scenario_name = '<scenario_name>'
# autoscale plot limits; plot_x1,plot_x2, plot_y1, plot_y2 only works if plot_auto is False
plot_auto = True
# axis limits of plot
plot_x1 = 450
plot_x2 = 550
plot_y1 = 65
plot_y2 = 1100
figsize_x = 15
figsize_y= 15
window_width = 150
window_height = 200
# ego vehicle
ego_veh_width = 2.0
ego_veh_length = 5.0
#ego vehicle sync parameters
lanelet_check_time_window = int(2/dt) # Time window to detect the lanelet change in seconds
protection_margin = 2.0 # The absolute margin allowed between the planner position and ego position in SUMO
consistency_window = 4 # Variable can be used to force the consistency to certain number of steps
lane_change_sync = False # Used to limit the sync mechanism only to movexy
lane_change_tol = 0.00 # tolerance for detecting start of lane change
\ No newline at end of file
# visualization parameters
basic_shape_parameters_obs = {'opacity': 1.0,
'facecolor': '#1d7eea',
'edgecolor': '#0066cc',
'zorder': 20}
basic_shape_parameters_ego = {'opacity': 1.0,
'facecolor': '#d95558',
'edgecolor': '#831d20',
'linewidth': 0.5,
'zorder': 20}
draw_params_obstacle = {'dynamic_obstacle': {
'draw_shape': True,
'draw_icon': False,
'draw_bounding_box': True,
'show_label': True,
'trajectory_steps': 25,
'zorder': 20,
'shape': basic_shape_parameters_obs,
'occupancy': {
'draw_occupancies': 1, # -1= never, 0= if prediction of vehicle is set-based, 1=always
'shape': {
'opacity': 0.25,
'facecolor': '#6f9bcb',
'edgecolor': '#48617b',
'zorder': 18,
}
}
}}
draw_params_ego = {'dynamic_obstacle': {
'draw_shape': True,
'draw_icon': False,
'draw_bounding_box': True,
'show_label': False,
'trajectory_steps': 0,
'zorder': 20,
'shape': basic_shape_parameters_ego,
'occupancy': {
'draw_occupancies': 1, # -1= never, 0= if prediction of vehicle is set-based, 1=always
'shape': {
'opacity': 0.25,
'facecolor': '#b05559',
'edgecolor': '#9e4d4e',
'zorder': 18,
}
}
}}
\ No newline at end of file
"""
Converts net.xml to Commonroad xml files and creates rou files for simulation.
"""
import matplotlib.pyplot as plt
from typing import List
from commonroad.visualization.draw_dispatch_cr import draw_object
from sumo2cr.maps.sumo_scenario import ScenarioWrapper
from commonroad.common.util import Interval
from commonroad.scenario.trajectory import State
import os
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
scenario_folder = os.path.join(os.path.dirname(__file__),'../scenarios')
net_file = os.path.join(scenario_folder, 'a_9/a_9.net.xml')
sumo_cfg_file = os.path.join(scenario_folder, 'a9/a9.sumo.cfg')
# parameters
dt = 0.1
convert_map = True
n_vehicles_max:int = 30
veh_per_second = 50
n_ego_vehicles:int = 1
ego_ids:List[int] = []
initial_states:List[State] = []
ego_start_time:int=4
departure_time_ego = 3
departure_interval_vehicles = Interval(0,20)
if convert_map:
# generate commonroad map + rou file + cfg file
scenario = ScenarioWrapper.full_conversion_from_net(net_file, dt, n_vehicles_max, n_ego_vehicles, ego_ids, ego_start_time, departure_time_ego, departure_interval_vehicles,veh_per_second)
else:
# generate rou file
scenario = ScenarioWrapper.recreate_route_file(sumo_cfg_file, dt, n_vehicles_max, n_ego_vehicles, ego_ids, ego_start_time, departure_time_ego, departure_interval_vehicles,veh_per_second)
plt.figure(figsize=(25, 25))
draw_object(scenario.lanelet_network)
plt.autoscale()
plt.axis('equal')
plt.xlim([290,380])
plt.ylim([195,250])
plt.show()
\ No newline at end of file
"""
Minimal example, plugging in of trajectory planner required.
"""
from sumo2cr.interface.sumo_simulation import SumoSimulation
from sumo2cr.visualization.video import create_video
from scenarios.a9.scenario_config import Conf as conf
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
sumo_sim = SumoSimulation()
sumo_sim.initialize(conf)
output_folder = "/path/to/output_folder"
for t in range(conf.simulation_steps):
ego_vehicles = sumo_sim.ego_vehicles
commonroad_scenario = sumo_sim.commonroad_scenario_at_time_step(sumo_sim.current_time_step)
# plan trajectories for all ego vehicles
for id, ego_vehicle in ego_vehicles.items():
current_state = ego_vehicle.current_state
ego_trajectory = # plug in a trajectory planner here
ego_vehicle.set_planned_trajectory(ego_trajectory)
sumo_sim.simulate_step()
create_video(sumo_sim, conf.video_start, conf.video_end, output_folder)
'''
Copy this file and rename to path_config.py, afterwards set paths according to your local configuration
'''
# set installation locations
SUMO_GUI_BINARY = '/usr/bin/sumo-gui'
# path to binary of adapted sumo repository (see readme)
SUMO_BINARY = '/home/user/sumo/bin/sumo'
# by default port 8873 is used, you can modify the port number
TRACI_PORT = 8873
# default files (no adaption required)
DEFAULT_CFG_FILE = 'config/.sumo.cfg'
\ No newline at end of file
commonroad-io>=2018.1
opendrive2lanelet>=1.0.1
\ No newline at end of file
import warnings
from typing import Dict, List, Union
import numpy as np
import copy
from commonroad.geometry.shape import Rectangle
from commonroad.planning.planning_problem import PlanningProblem
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.scenario.obstacle import DynamicObstacle, ObstacleType
from commonroad.scenario.trajectory import State, Trajectory
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
class EgoVehicle:
"""
Interface object for ego vehicle.
Intended use: Get current state with EgoVehicle.current_state() and set planned trajectory with
EgoVehicle.set_planned_trajectory(planned_trajectory).
"""
def __init__(self, id, initial_state:State, delta_steps: int, width:float=2.0, length:float=5.0, planning_problem:PlanningProblem=None):
self.id = id
self._width = width
self._length = length
self._initial_state = initial_state
self._state_dict: Dict[State] = dict() # collects driven states
self.shape = Rectangle(self.length,self.width,center=np.array([0,0]),orientation=0.0)
self._driven_trajectory = None # returns trajectory object of driven
self._planned_trajectories: Dict[List[State]] = {} # collects trajectories from planner for every time step
self._current_time_step = initial_state.time_step
self.delta_steps = delta_steps
if planning_problem is not None:
assert self.initial_state == planning_problem.initial_state, '<sumo_simulation/ego_vehicle> planning problem\' initial_state must equal initial_state'
self.planning_problem = planning_problem
def set_planned_trajectory(self, planned_state_list:List[State]) -> None:
"""
Set planned trajectory beginning with current time step.
:param planned_state_list:
:param initial_time_step. If None, add at current time step
:return:
"""
assert len(planned_state_list) >= self.delta_steps,\
'planned_trajectory must contain at least {} states, but contains {}. (See delta_steps in config file)'\
.format(self.delta_steps,len(planned_state_list))
assert 1 == planned_state_list[0].time_step,\
'planned_trajectory must start at next time_step ({}) but starts at time_step {}'\
.format(1, planned_state_list[0].time_step)
self._planned_trajectories[self.current_time_step] = planned_state_list
self.add_state(planned_state_list[0])
@property
def get_planned_trajectory(self) -> List[State]:
"""Get planned trajectory from current time step"""
return self._planned_trajectories[self.current_time_step]
def get_dynamic_obstacle(self,time_step: Union[int,None]=None) -> DynamicObstacle:
"""
If time step is false, add complete driven trajectory.
If time step is int: start from given step and add planned trajectory
:param time_step: initial time step of vehicle
:return: DynamicObstacle
"""
if time_step is False:
return DynamicObstacle(self.id,obstacle_type=ObstacleType.CAR,
obstacle_shape=Rectangle(self.length,self.width,center=np.array([0,0]),orientation=0.0),
initial_state=self.initial_state, prediction=self.driven_trajectory)
elif isinstance(time_step,int):
if time_step in self._state_dict:
if time_step in self._planned_trajectories:
prediction = TrajectoryPrediction(Trajectory(self._planned_trajectories[time_step][0].time_step,
self._planned_trajectories[time_step]),
self.shape)
else:
prediction = None
return DynamicObstacle(self.id, obstacle_type=ObstacleType.CAR,
obstacle_shape=Rectangle(self.length, self.width, center=np.array([0, 0]), orientation=0.0),
initial_state=self.get_state_at_timestep(time_step), prediction=prediction)
else:
raise ValueError('time needs to be None or integer')
def get_planned_state(self, delta_step: int=0):
planned_state:State = copy.deepcopy(self._planned_trajectories[self.current_time_step][0])
if self.delta_steps > 1:
# linear interpolation
for state in planned_state.attributes:
curr_state = getattr(self.current_state, state)
next_state = getattr(planned_state, state)
setattr(planned_state, state, curr_state + (delta_step+1)/self.delta_steps * (next_state-curr_state))
return planned_state
@property
def current_state(self) -> State:
if self.current_time_step == self.initial_state.time_step:
return self.initial_state
else:
return self._state_dict[self.current_time_step]
def get_state_at_timestep(self,time_step: int):
if time_step == self.initial_state.time_step:
return self.initial_state
else:
state = self._state_dict[time_step]
state.time_step = 0
return state
@current_state.setter
def current_state(self,current_state):
raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()')
@property
def current_time_step(self):
return self._current_time_step
@current_time_step.setter
def current_time_step(self, current_time_step):
raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()')
@property
def goal(self):
return self.planning_problem.goal
def add_state(self,state):
self._state_dict[self._current_time_step+1] = state
@property
def driven_trajectory(self) -> TrajectoryPrediction:
"""
Returns trajectory prediction object for driven trajectory (mainly for plotting)
:return:
"""
state_dict_tmp ={}
for t, state in self._state_dict.items():
state_dict_tmp[t] = state
state_dict_tmp[t].time_step = t
sorted_list = sorted(state_dict_tmp.keys())
state_list = [state_dict_tmp[key] for key in sorted_list]
return TrajectoryPrediction(Trajectory(self.initial_state.time_step+1, state_list),self.shape)
@driven_trajectory.setter
def driven_trajectory(self, _):
if hasattr(self, '_driven_trajectory'):
warnings.warn('driven_trajectory of vehicle cannot be changed')
return
@property
def width(self):
return self._width
@width.setter
def width(self,width):
if hasattr(self, '_width'):
warnings.warn('width of vehicle cannot be changed')
return
@property
def length(self):
return self._length
@length.setter
def length(self, length):
if hasattr(self, '_length'):
warnings.warn('length of vehicle cannot be changed')
return
@property
def initial_state(self):
return self._initial_state
@initial_state.setter
def initial_state(self, _):
if hasattr(self, '_initial_state'):
warnings.warn('initial_state of vehicle cannot be changed')
return
\ No newline at end of file
This diff is collapsed.
import os
import xml.etree.ElementTree as et
import warnings
import config.default
from config import plot_params
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
def get_route_files(config_file):
"""
:param config_file: SUMO config file (.sumocfg)
:return: net-file and route-files specified in the config file
"""
if not os.path.isfile(config_file):
raise FileNotFoundError(config_file)
tree = et.parse(config_file)
file_directory = os.path.dirname(config_file)
# find route-files
all_route_files = tree.findall('*/route-files')
route_files = []
if len(all_route_files) < 1:
raise RouteError()
for item in all_route_files:
attributes = item.attrib['value'].split(',')
for route in attributes:
route_files.append(os.path.join(file_directory, route))
return route_files
def initialize_id_dicts(id_convention):
"""From id_convention, create empty nested dict structure for sumo2cr and cr2sumo dicts.
id_convention: dict with mapping of object type to start number of id"""
sumo2cr = {}
cr2sumo = {}
for k in id_convention:
sumo2cr[k] = {}
cr2sumo[k] = {}
sumo2cr['all_ids'] = {}
cr2sumo['all_ids'] = {}
return sumo2cr, cr2sumo
def generate_cr_id(type, sumo_id, ids_sumo2cr):
""" Generates a new commonroad ID without adding it to any ID dictionary."""
if type not in config.default.ID_DICT:
raise ValueError(
'{0} is not a valid type of id_convention. Only allowed: {1}'.format(type, config.default.ID_DICT.keys()))
if sumo_id in ids_sumo2cr[type]:
warnings.warn('For this sumo_id there is already a commonroad id. No cr ID is generated')
return ids_sumo2cr[type][sumo_id]
elif sumo_id in ids_sumo2cr['all_ids']:
raise ValueError(
'Two sumo objects of different types seem to have same sumo ID {0}. ID must be unique'.format(sumo_id))
# If this case happens for auto-generated nets, this code will have to be changed. Conversion of SUMO id to
# cr id would have to incorporate type.
cr_id = int(str(config.default.ID_DICT[type]) + str(len(ids_sumo2cr[type])))
return cr_id
def cr2sumo(cr_id, ids_cr2sumo):
""" gets CommonRoad ID and returns corresponding SUMO ID
:param ids_cr2sumo:
"""
if type(cr_id) == list:
print("id: " + str(cr_id) + ": " + str(ids_cr2sumo['all_ids']))
print("\n")
if cr_id is None:
return None
elif cr_id in ids_cr2sumo['all_ids']:
return ids_cr2sumo['all_ids'][cr_id]
else:
raise ValueError('Commonroad id {0} does not exist.'.format(cr_id))
def sumo2cr(sumo_id, ids_sumo2cr):
""" gets SUMO ID and returns corresponding CommonRoad ID """
if sumo_id is None:
return None
elif sumo_id in ids_sumo2cr['all_ids']:
return ids_sumo2cr['all_ids'][sumo_id]
else:
if sumo_id == "":
warnings.warn('Tried to convert id <empty string>. \
Check if your net file is complete (e. g. having internal-links,...)')
raise ValueError('Sumo id \'%s\' does not exist.' % sumo_id)
class NetError(Exception):
""" Exception raised if there is no net-file or multiple net-files """
def __init__(self, len):
self.len = len
def __str__(self):
if self.len == 0:
return repr('There is no net-file.')
else:
return repr('There are more than one net-files.')
class RouteError(Exception):
""" Exception raised if there is no route-file """
def __str__(self):
return repr('There is no route-file.')
class OutOfLanelet(Exception):
""" Exception raised if the position of the ego vehicle is outside all lanelets """
def __init__(self, position):
self.position = position
def __str__(self):
return repr('The position %s is not part of any lanelet.' % self.position)
import os, sys, warnings
import matplotlib.pyplot as plt
from commonroad.visualization.draw_dispatch_cr import draw_object
from typing import List
import pathlib
import xml.etree.ElementTree as et
from xml.dom import minidom
from sumo2cr.interface.util import NetError
from sumo2cr.maps.util import get_scenario_name_from_netfile, convert_net_to_cr, generate_rou_file
from pathConfig import DEFAULT_CFG_FILE
from commonroad.scenario.trajectory import State
from commonroad.scenario.lanelet import LaneletNetwork
from commonroad.common.util import Interval
from commonroad.common.file_reader import CommonRoadFileReader
from config.default import SumoCommonRoadConfig as conf
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
class ScenarioWrapper:
def __init__(self):
self.scenario_name: str = ''
self.net_file: str = ''
self.cr_map_file: str = ''
self.sumo_cfg_file = None
self.ego_start_time: int = 0
self.sumo_net = None
self.lanelet_network: LaneletNetwork = None
def initialize(self,scenario_name:str,sumo_cfg_file:str,cr_map_file:str,ego_start_time:int=None):
self.scenario_name = scenario_name
self.sumo_cfg_file = sumo_cfg_file
self.net_file = self._get_net_file(self.sumo_cfg_file)
self.cr_map_file = cr_map_file
self.ego_start_time = ego_start_time
self.lanelet_network = CommonRoadFileReader(self.cr_map_file).open_lanelet_network()
# self.ids_sumo2cr, self.ids_cr2sumo = self.initialize_id_dicts(params.ID_DICT)
# self.sumo_net = sumolib_net.readNet(self.net_file, withInternal=True)
@classmethod