...
 
Commits (75)
pathConfig.py
scenarios/*
## Core latex/pdflatex auxiliary files:
*.aux
*.lof
*.log
*.lot
*.fls
*.out
*.toc
*.fmt
*.fot
*.cb
*.cb2
## Intermediate documents:
*.dvi
*-converted-to.*
# these rules might exclude image files for figures etc.
*.ps
# *.eps
# *.pdf
## Generated if empty string is given at "Please type another file name for output:"
.pdf
## Bibliography auxiliary files (bibtex/biblatex/biber):
*.bbl
*.bcf
*.blg
*-blx.aux
*-blx.bib
*.brf
*.run.xml
## Build tool auxiliary files:
*.fdb_latexmk
*.synctex
*.synctex(busy)
*.synctex.gz
*.synctex.gz(busy)
*.pdfsync
## Auxiliary and intermediate files from other packages:
# algorithms
*.alg
*.loa
# achemso
acs-*.bib
# amsthm
*.thm
# beamer
*.nav
*.snm
*.vrb
# cprotect
*.cpt
# fixme
*.lox
# feynmf/feynmp
*.mf
*.mp
*.t[1-9]
*.t[1-9][0-9]
*.tfm
*.[1-9]
*.[1-9][0-9]
#(r)(e)ledmac/(r)(e)ledpar
*.end
*.?end
*.[1-9]
*.[1-9][0-9]
*.[1-9][0-9][0-9]
*.[1-9]R
*.[1-9][0-9]R
*.[1-9][0-9][0-9]R
*.eledsec[1-9]
*.eledsec[1-9]R
*.eledsec[1-9][0-9]
*.eledsec[1-9][0-9]R
*.eledsec[1-9][0-9][0-9]
*.eledsec[1-9][0-9][0-9]R
# glossaries
*.acn
*.acr
*.glg
*.glo
*.gls
*.glsdefs
# gnuplottex
*-gnuplottex-*
# gregoriotex
*.gaux
*.gtex
# hyperref
*.brf
# knitr
*-concordance.tex
# TODO Comment the next line if you want to keep your tikz graphics files
*.tikz
*-tikzDictionary
# listings
*.lol
# makeidx
*.idx
*.ilg
*.ind
*.ist
# minitoc
*.maf
*.mlf
*.mlt
*.mtc
*.mtc[0-9]
*.mtc[1-9][0-9]
# minted
_minted*
*.pyg
# morewrites
*.mw
# mylatexformat
*.fmt
# nomencl
*.nlo
# sagetex
*.sagetex.sage
*.sagetex.py
*.sagetex.scmd
# scrwfile
*.wrt
# sympy
*.sout
*.sympy
sympy-plots-for-*.tex/
# pdfcomment
*.upa
*.upb
# pythontex
*.pytxcode
pythontex-files-*/
# thmtools
*.loe
# TikZ & PGF
*.dpth
*.md5
*.auxlock
# todonotes
*.tdo
# easy-todo
*.lod
# xindy
*.xdy
# xypic precompiled matrices
*.xyc
# endfloat
*.ttt
*.fff
# Latexian
TSWLatexianTemp*
## Editors:
# WinEdt
*.bak
*.sav
# Texpad
.texpadtmp
# Kile
*.backup
# KBibTeX
*~[0-9]*
# build files doc
tools/Python/commonroad/doc/build/*
# auto folder when using emacs and auctex
/auto/*
# python
*__pycache__/
*.cache
*pytest_cache/
*.egg-info
# SUMO - CommonRoad Interface
This interface couples the framework for motion planning of automated vehicles based on [CommonRoad_io](https://pypi.org/project/commonroad-io/) and the traffic simulator [SUMO](https://sumo.dlr.de).
This package implements the interface between the framework for motion planning of automated vehicles [CommonRoad_io](https://pypi.org/project/commonroad-io/) and the traffic simulator [SUMO](https://sumo.dlr.de). The interface is presented in detail in our [paper](https://mediatum.ub.tum.de/doc/1486856/344641.pdf) [1] and a documentation of the API can be found [here]( https://commonroad.in.tum.de/static/docs/commonroad-sumo-interface/index.html).
# Prerequisites
The package is written in Python 3.6 and tested on Linux.
......@@ -17,6 +17,7 @@ And add the absolute path of `commonroad-sumo-interface` to your Python interpre
Clone a customized version of SUMO for smooth lane changes from https://github.com/octavdragoi/sumo and check out branch `smooth-lane-change`.
For installation we recommend building with:
```
sudo apt-get install cmake python g++ libxerces-c-dev libfox-1.6-dev libgdal-dev libproj-dev libgl2ps-dev swig
cd sumo
......@@ -25,6 +26,7 @@ mkdir build && cd build
cmake ..
make -j8
```
More options on the installation can be found here: https://sumo.dlr.de/wiki/Installing/Linux_Build .
## Configure SUMO and local environment
......@@ -82,5 +84,8 @@ for t in range(conf.simulation_steps):
sumo_sim.simulate_step()
sumo_sim.stop()
create_video(sumo_sim, conf.video_start, conf.video_end, output_folder)
```
[1] Moritz Klischat, Octav Dragoi, Mostafa Eissa, and Matthias Althoff, Coupling SUMO with a Motion Planning Framework for Automated Vehicles, SUMO 2019: Simulating Connected Urban Mobility
"""
Converts net.xml to Commonroad xml files and creates rou files for simulation.
"""
import matplotlib.pyplot as plt
from typing import List
from commonroad.visualization.draw_dispatch_cr import draw_object
from sumo2cr.maps.sumo_scenario import ScenarioWrapper
from commonroad.common.util import Interval
from commonroad.scenario.trajectory import State
import os
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
scenario_folder = os.path.join(os.path.dirname(__file__),'../scenarios')
net_file = os.path.join(scenario_folder, 'a9/a9.net.xml')
sumo_cfg_file = os.path.join(scenario_folder, 'a9/a9.sumo.cfg')
# parameters
dt = 0.1
convert_map = True
n_vehicles_max:int = 30
veh_per_second = 50
n_ego_vehicles:int = 1
ego_ids:List[int] = []
initial_states:List[State] = []
ego_start_time:int=10
departure_time_ego = 3
departure_interval_vehicles = Interval(0,20)
if convert_map:
# generate commonroad map + rou file + cfg file
scenario = ScenarioWrapper.full_conversion_from_net(net_file, dt, n_vehicles_max, n_ego_vehicles, ego_ids, ego_start_time, departure_time_ego, departure_interval_vehicles,veh_per_second)
else:
# generate rou file
scenario = ScenarioWrapper.recreate_route_file(sumo_cfg_file, dt, n_vehicles_max, n_ego_vehicles, ego_ids, ego_start_time, departure_time_ego, departure_interval_vehicles,veh_per_second)
plt.figure(figsize=(25, 25))
draw_object(scenario.lanelet_network)
plt.autoscale()
plt.axis('equal')
plt.xlim([290,380])
plt.ylim([195,250])
"""
Converts net.xml to Commonroad xml files and creates rou files for simulation.
"""
import matplotlib.pyplot as plt
from typing import List
from commonroad.visualization.draw_dispatch_cr import draw_object
from sumo2cr.maps.sumo_scenario import ScenarioWrapper
from commonroad.common.util import Interval
from commonroad.scenario.trajectory import State
import os
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
scenario_folder = os.path.join(os.path.dirname(__file__),'../scenarios')
net_file = os.path.join(scenario_folder, 'a9/a9.net.xml')
sumo_cfg_file = os.path.join(scenario_folder, 'a9/a9.sumo.cfg')
# parameters
dt = 0.1
convert_map = True
n_vehicles_max:int = 30
veh_per_second = 50
n_ego_vehicles:int = 1
ego_ids:List[int] = []
initial_states:List[State] = []
ego_start_time:int=10
departure_time_ego = 3
departure_interval_vehicles = Interval(0,20)
if convert_map:
# generate commonroad map + rou file + cfg file
scenario = ScenarioWrapper.full_conversion_from_net(net_file, dt, n_vehicles_max, n_ego_vehicles, ego_ids, ego_start_time, departure_time_ego, departure_interval_vehicles,veh_per_second)
else:
# generate rou file
scenario = ScenarioWrapper.recreate_route_file(sumo_cfg_file, dt, n_vehicles_max, n_ego_vehicles, ego_ids, ego_start_time, departure_time_ego, departure_interval_vehicles,veh_per_second)
plt.figure(figsize=(25, 25))
draw_object(scenario.lanelet_network)
plt.autoscale()
plt.axis('equal')
plt.xlim([290,380])
plt.ylim([195,250])
plt.show()
\ No newline at end of file
......@@ -30,5 +30,7 @@ for t in range(conf.simulation_steps):
sumo_sim.simulate_step()
sumo_sim.stop()
create_video(sumo_sim, conf.video_start, conf.video_end, output_folder)
'''
Copy this file and rename to path_config.py, afterwards set paths according to your local configuration
'''
# set installation locations
SUMO_GUI_BINARY = '/usr/bin/sumo-gui'
# path to binary of adapted sumo repository (see readme)
SUMO_BINARY = '/home/user/sumo/bin/sumo'
# by default port 8873 is used, you can modify the port number
TRACI_PORT = 8873
# default files (no adaption required)
'''
Copy this file and rename to path_config.py, afterwards set paths according to your local configuration
'''
# set installation locations
SUMO_GUI_BINARY = '/usr/bin/sumo-gui'
# path to binary of adapted sumo repository (see readme)
SUMO_BINARY = '/home/user/sumo/bin/sumo'
# by default port 8873 is used, you can modify the port number
TRACI_PORT = 8873
# default files (no adaption required)
DEFAULT_CFG_FILE = 'sumo_config/.sumo.cfg'
\ No newline at end of file
......@@ -3,7 +3,7 @@ from typing import Dict, List, Union
import numpy as np
import copy
from commonroad.geometry.shape import Rectangle
from commonroad.planning.planning_problem import PlanningProblem
from commonroad.planning.planning_problem import PlanningProblem, GoalRegion
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.scenario.obstacle import DynamicObstacle, ObstacleType
from commonroad.scenario.trajectory import State, Trajectory
......@@ -40,10 +40,10 @@ class EgoVehicle:
def set_planned_trajectory(self, planned_state_list:List[State]) -> None:
"""
Set planned trajectory beginning with current time step.
:param planned_state_list:
:param initial_time_step. If None, add at current time step
:return:
Sets planned trajectory beginning with current time step.
:param planned_state_list: the planned trajectory
"""
assert len(planned_state_list) >= self.delta_steps,\
......@@ -57,15 +57,16 @@ class EgoVehicle:
@property
def get_planned_trajectory(self) -> List[State]:
"""Get planned trajectory from current time step"""
"""Gets planned trajectory according to the current time step"""
return self._planned_trajectories[self.current_time_step]
def get_dynamic_obstacle(self,time_step: Union[int,None]=None) -> DynamicObstacle:
"""
If time step is false, add complete driven trajectory.
If time step is int: start from given step and add planned trajectory
If time step is false, adds complete driven trajectory and returns the dynamic obstacles.
If time step is int: starts from given step and adds planned trajectory and returns the dynamic obstacles.
:param time_step: initial time step of vehicle
:return: DynamicObstacle
:return: DynamicObstacle object of the ego vehicle.
"""
if time_step is False:
return DynamicObstacle(self.id,obstacle_type=ObstacleType.CAR,
......@@ -86,6 +87,12 @@ class EgoVehicle:
raise ValueError('time needs to be None or integer')
def get_planned_state(self, delta_step: int=0):
"""
Returns the planned state.
:param delta_step: get plannedd state after delta steps
"""
planned_state:State = copy.deepcopy(self._planned_trajectories[self.current_time_step][0])
if self.delta_steps > 1:
# linear interpolation
......@@ -98,12 +105,20 @@ class EgoVehicle:
@property
def current_state(self) -> State:
"""
Returns the current state.
"""
if self.current_time_step == self.initial_state.time_step:
return self.initial_state
else:
return self._state_dict[self.current_time_step]
def get_state_at_timestep(self,time_step: int):
def get_state_at_timestep(self,time_step: int) -> State:
"""
Returns the state according to the given time step.
:param time_step: the state is returned according to this time step.
"""
if time_step == self.initial_state.time_step:
return self.initial_state
else:
......@@ -116,7 +131,10 @@ class EgoVehicle:
raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()')
@property
def current_time_step(self):
def current_time_step(self) -> int:
"""
Returns current time step.
"""
return self._current_time_step
@current_time_step.setter
......@@ -124,17 +142,26 @@ class EgoVehicle:
raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()')
@property
def goal(self):
def goal(self) -> GoalRegion:
"""
Returns the goal of the planning problem.
"""
return self.planning_problem.goal
def add_state(self,state):
def add_state(self, state: State) -> None:
"""
Adds a state to the current state dictionary.
:param state: the state to be added
"""
self._state_dict[self._current_time_step+1] = state
@property
def driven_trajectory(self) -> TrajectoryPrediction:
"""
Returns trajectory prediction object for driven trajectory (mainly for plotting)
:return:
"""
state_dict_tmp ={}
for t, state in self._state_dict.items():
......@@ -152,7 +179,10 @@ class EgoVehicle:
return
@property
def width(self):
def width(self) -> float:
"""
Returns the width of the ego vehicle.
"""
return self._width
@width.setter
......@@ -162,7 +192,10 @@ class EgoVehicle:
return
@property
def length(self):
def length(self) -> float:
"""
Returns the length of the ego vehicle.
"""
return self._length
@length.setter
......@@ -172,7 +205,10 @@ class EgoVehicle:
return
@property
def initial_state(self):
def initial_state(self) -> State:
"""
Returns the initial state of the ego vehicle.
"""
return self._initial_state
@initial_state.setter
......
This diff is collapsed.
import os
import xml.etree.ElementTree as et
import warnings
import sumo_config.default
from sumo_config import plot_params
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
def get_route_files(config_file):
"""
:param config_file: SUMO sumo_config file (.sumocfg)
:return: net-file and route-files specified in the sumo_config file
"""
if not os.path.isfile(config_file):
raise FileNotFoundError(config_file)
tree = et.parse(config_file)
file_directory = os.path.dirname(config_file)
# find route-files
all_route_files = tree.findall('*/route-files')
route_files = []
if len(all_route_files) < 1:
raise RouteError()
for item in all_route_files:
attributes = item.attrib['value'].split(',')
for route in attributes:
route_files.append(os.path.join(file_directory, route))
return route_files
def initialize_id_dicts(id_convention):
"""From id_convention, create empty nested dict structure for sumo2cr and cr2sumo dicts.
id_convention: dict with mapping of object type to start number of id"""
sumo2cr = {}
cr2sumo = {}
for k in id_convention:
sumo2cr[k] = {}
cr2sumo[k] = {}
sumo2cr['all_ids'] = {}
cr2sumo['all_ids'] = {}
return sumo2cr, cr2sumo
def generate_cr_id(type, sumo_id, ids_sumo2cr):
""" Generates a new commonroad ID without adding it to any ID dictionary."""
if type not in sumo_config.default.ID_DICT:
raise ValueError(
'{0} is not a valid type of id_convention. Only allowed: {1}'.format(type, sumo_config.default.ID_DICT.keys()))
if sumo_id in ids_sumo2cr[type]:
warnings.warn('For this sumo_id there is already a commonroad id. No cr ID is generated')
return ids_sumo2cr[type][sumo_id]
elif sumo_id in ids_sumo2cr['all_ids']:
raise ValueError(
'Two sumo objects of different types seem to have same sumo ID {0}. ID must be unique'.format(sumo_id))
# If this case happens for auto-generated nets, this code will have to be changed. Conversion of SUMO id to
# cr id would have to incorporate type.
cr_id = int(str(sumo_config.default.ID_DICT[type]) + str(len(ids_sumo2cr[type])))
return cr_id
def cr2sumo(cr_id, ids_cr2sumo):
""" gets CommonRoad ID and returns corresponding SUMO ID
:param ids_cr2sumo:
"""
if type(cr_id) == list:
print("id: " + str(cr_id) + ": " + str(ids_cr2sumo['all_ids']))
print("\n")
if cr_id is None:
return None
elif cr_id in ids_cr2sumo['all_ids']:
return ids_cr2sumo['all_ids'][cr_id]
else:
raise ValueError('Commonroad id {0} does not exist.'.format(cr_id))
def sumo2cr(sumo_id, ids_sumo2cr):
""" gets SUMO ID and returns corresponding CommonRoad ID """
if sumo_id is None:
return None
elif sumo_id in ids_sumo2cr['all_ids']:
return ids_sumo2cr['all_ids'][sumo_id]
else:
if sumo_id == "":
warnings.warn('Tried to convert id <empty string>. \
Check if your net file is complete (e. g. having internal-links,...)')
raise ValueError('Sumo id \'%s\' does not exist.' % sumo_id)
class NetError(Exception):
""" Exception raised if there is no net-file or multiple net-files """
def __init__(self, len):
self.len = len
def __str__(self):
if self.len == 0:
return repr('There is no net-file.')
else:
return repr('There are more than one net-files.')
class RouteError(Exception):
""" Exception raised if there is no route-file """
def __str__(self):
return repr('There is no route-file.')
class OutOfLanelet(Exception):
""" Exception raised if the position of the ego vehicle is outside all lanelets """
def __init__(self, position):
self.position = position
def __str__(self):
return repr('The position %s is not part of any lanelet.' % self.position)
import os
import xml.etree.ElementTree as et
import warnings
import sumo_config.default
from sumo_config import plot_params
from typing import List, Tuple
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "1.0.0"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
def get_route_files(config_file) -> List[str]:
"""
Returns net-file and route-files specified in the config file.
:param config_file: SUMO config file (.sumocfg)
"""
if not os.path.isfile(config_file):
raise FileNotFoundError(config_file)
tree = et.parse(config_file)
file_directory = os.path.dirname(config_file)
# find route-files
all_route_files = tree.findall('*/route-files')
route_files = []
if len(all_route_files) < 1:
raise RouteError()
for item in all_route_files:
attributes = item.attrib['value'].split(',')
for route in attributes:
route_files.append(os.path.join(file_directory, route))
return route_files
def initialize_id_dicts(id_convention: dict) -> Tuple[dict, dict]:
"""
Creates empty nested dict structure for sumo2cr and cr2sumo dicts from id_convention and returns them.
:param id_convention: dict with mapping from object type to id start number
"""
sumo2cr = {}
cr2sumo = {}
for k in id_convention:
sumo2cr[k] = {}
cr2sumo[k] = {}
sumo2cr['all_ids'] = {}
cr2sumo['all_ids'] = {}
return sumo2cr, cr2sumo
def generate_cr_id(type:str, sumo_id:int, ids_sumo2cr:dict) -> int:
"""
Generates a new commonroad ID without adding it to any ID dictionary.
:param type: one of the keys in params.id_convention; the type defines the first digit of the cr_id
:param sumo_id: id in sumo simulation
:param ids_sumo2cr: dictionary of ids in sumo2cr
"""
if type not in sumo_config.default.ID_DICT:
raise ValueError(
'{0} is not a valid type of id_convention. Only allowed: {1}'.format(type, sumo_config.default.ID_DICT.keys()))
if sumo_id in ids_sumo2cr[type]:
warnings.warn('For this sumo_id there is already a commonroad id. No cr ID is generated')
return ids_sumo2cr[type][sumo_id]
elif sumo_id in ids_sumo2cr['all_ids']:
raise ValueError(
'Two sumo objects of different types seem to have same sumo ID {0}. ID must be unique'.format(sumo_id))
# If this case happens for auto-generated nets, this code will have to be changed. Conversion of SUMO id to
# cr id would have to incorporate type.
cr_id = int(str(sumo_config.default.ID_DICT[type]) + str(len(ids_sumo2cr[type])))
return cr_id
def cr2sumo(cr_id:int, ids_cr2sumo:dict) -> int:
"""
Gets CommonRoad ID and returns corresponding SUMO ID.
:param ids_cr2sumo: dictionary of ids in cr2sumo
"""
if type(cr_id) == list:
print("id: " + str(cr_id) + ": " + str(ids_cr2sumo['all_ids']))
print("\n")
if cr_id is None:
return None
elif cr_id in ids_cr2sumo['all_ids']:
return ids_cr2sumo['all_ids'][cr_id]
else:
raise ValueError('Commonroad id {0} does not exist.'.format(cr_id))
def sumo2cr(sumo_id:int, ids_sumo2cr:dict) -> int:
"""
Returns corresponding CommonRoad ID according to sumo id.
:param sumo_id: sumo id
:param ids_sumo2cr: dictionary of ids in sumo2cr.
"""
if sumo_id is None:
return None
elif sumo_id in ids_sumo2cr['all_ids']:
return ids_sumo2cr['all_ids'][sumo_id]
else:
if sumo_id == "":
warnings.warn('Tried to convert id <empty string>. \
Check if your net file is complete (e. g. having internal-links,...)')
raise ValueError('Sumo id \'%s\' does not exist.' % sumo_id)
class NetError(Exception):
"""
Exception raised if there is no net-file or multiple net-files.
"""
def __init__(self, len):
self.len = len
def __str__(self):
if self.len == 0:
return repr('There is no net-file.')
else:
return repr('There are more than one net-files.')
class RouteError(Exception):
"""
Exception raised if there is no route-file.
"""
def __str__(self):
return repr('There is no route-file.')
class OutOfLanelet(Exception):
"""
Exception raised if the position of the ego vehicle is outside all lanelets.
"""
def __init__(self, position):
self.position = position
def __str__(self):
return repr('The position %s is not part of any lanelet.' % self.position)
......@@ -40,7 +40,16 @@ class ScenarioWrapper:
self.sumo_net = None
self.lanelet_network: LaneletNetwork = None
def initialize(self,scenario_name:str,sumo_cfg_file:str,cr_map_file:str,ego_start_time:int=None):
def initialize(self,scenario_name:str, sumo_cfg_file:str, cr_map_file:str, ego_start_time: int = None) -> None:
"""
Initializes the ScenarioWrapper.
:param scenario_name: the name of the scenario
:param sumo_cfg_file: the .sumocfg file
:param cr_map_file: the commonroad map file
:param ego_start_time: the start time of the ego vehicle
"""
self.scenario_name = scenario_name
self.sumo_cfg_file = sumo_cfg_file
self.net_file = self._get_net_file(self.sumo_cfg_file)
......@@ -51,7 +60,15 @@ class ScenarioWrapper:
# self.sumo_net = sumolib_net.readNet(self.net_file, withInternal=True)
@classmethod
def init_from_scenario(cls, scenario_name, ego_start_time=None) -> 'ScenarioWrapper':
def init_from_scenario(cls, scenario_name: str, ego_start_time:int =None) -> 'ScenarioWrapper':
"""
Initializes the ScenarioWrapper according to the given scenario_name/ego_start_time and returns the ScenarioWrapper.
:param cls:
:param scenario_name: name of the scenario usef for the initialization.
:param ego_start_time: the start time of the ego vehicle.
"""
obj = cls()
sumo_cfg_file = os.path.join(os.path.dirname(__file__), '../../scenarios/',scenario_name, scenario_name + '.sumo.cfg')
cr_map_file = os.path.join(os.path.dirname(__file__), '../../scenarios/',scenario_name, scenario_name + '.cr.xml')
......@@ -67,14 +84,15 @@ class ScenarioWrapper:
def recreate_route_file(cls, sumo_cfg_file, dt:float, n_vehicles_max:int, n_ego_vehicles:int, ego_ids:List[int]=[], ego_start_time:int=0,
departure_time_ego:int=0, departure_interval_vehicles:Interval=Interval(0,conf.simulation_steps),veh_per_second=None) -> 'ScenarioWrapper':
"""
Creates new .rou.xml file. Assumes .cr.xml, .net.xml and .sumo.cfg file have already been created in scenario folder.
Creates new .rou.xml file and returns ScenarioWrapper. Assumes .cr.xml, .net.xml and .sumo.cfg file have already been created in scenario folder.
:param sumo_cfg_file:
:param n_vehicles_max:
:param n_ego_vehicles:
:param ego_ids:
:param ego_start_time:
:param departure_interval_vehicles:
:return:
"""
sumo_scenario = cls()
out_folder = os.path.dirname(sumo_cfg_file)
......@@ -91,14 +109,19 @@ class ScenarioWrapper:
ego_start_time:int=0, departure_time_ego:int=0,
departure_interval_vehicles:Interval=Interval(0,conf.simulation_steps), veh_per_second=None,scenario_folder:str=None) -> 'ScenarioWrapper':
"""
Convert net file to CommonRoad xml and generate specify ego vehicle either by using generated vehicles and/or by initial states.
Convert net file to CommonRoad xml and generate specific ego vehicle either by using generated vehicles and/or by initial states.
:param net_file: path of .net.xml file
:param dt: length of time step
:param n_vehicles_max: total number of ego vehicles that are generated
:param ego_id: if specified, vehicle with given id from .rou file is marked as ego_vehicle
:param initial_state: List of initial states for ego vehicle
:param scenario_folder: folder that contains all scenario folders. If not given, use folder from net_file
:return:
:param n_vehicles_max: max number of vehicles
:param n_ego_vehicles: total number of ego vehicles that are generated
:param ego_ids: if specified, vehicle with given id from .rou file is marked as ego_vehicle
:param ego_start_time: the starting time of ego vehicles
:param departure_time_ego: interval of departure times for ego vehicles
:param departure_interval_vehicles: interval of departure times for vehicles
:param veh_per_second: number of vehicle departures per second
:param scenario_folder: folder that contains all scenario folders. If not given, use commonroad-sumo-interface/scenarios/scenario_name
"""
assert len(ego_ids) <= n_ego_vehicles, "total number of given ego_vehicles must be <= n_ego_vehicles, but {}not<={}"\
.format(len(ego_ids),n_ego_vehicles)
......@@ -120,7 +143,16 @@ class ScenarioWrapper:
sumo_scenario.initialize(scenario_name,sumo_cfg_file,cr_map_file,ego_start_time)
return sumo_scenario
def generate_cfg_file(self, scenario_name, output_folder:str):
def generate_cfg_file(self, scenario_name:str, output_folder:str) -> str:
"""
Generates the configuration file according to the scenario name to the specified output folder.
:param scenario_name: name of the scenario used for the cfg file generation.
:param output_folder: the generated cfg file will be saved here
:return: the path of the generated cfg file.
"""
sumo_cfg_file = os.path.join(output_folder, scenario_name + '.sumo.cfg')
tree = et.parse(os.path.join(os.path.dirname(__file__), '../../',DEFAULT_CFG_FILE),)
tree.findall('*/net-file')[0].attrib['value'] = scenario_name + '.net.xml'
......@@ -138,10 +170,13 @@ class ScenarioWrapper:
return sumo_cfg_file
def _get_net_file(self, sumo_cfg_file):
def _get_net_file(self, sumo_cfg_file: str) -> str:
"""
:param sumo_cfg_file: SUMO sumo_config file (.sumocfg)
:return: net-file specified in the sumo_config file
Gets the net file configured in the cfg file.
:param sumo_cfg_file: SUMO config file (.sumocfg)
:return: net-file specified in the config file
"""
if not os.path.isfile(sumo_cfg_file):
raise ValueError("File not found: {}. Maybe scenario name is incorrect.".format(sumo_cfg_file))
......@@ -153,7 +188,16 @@ class ScenarioWrapper:
raise NetError(len(all_net_files))
return os.path.join(file_directory, all_net_files[0].attrib['value'])
def print_lanelet_net(self, with_lane_id=True, with_succ_pred=False, with_adj=False, with_speed=False):
def print_lanelet_net(self, with_lane_id=True, with_succ_pred=False, with_adj=False, with_speed=False) -> None:
"""
Prints the lanelet net.
:param with_lane_id: if true, shows the lane id.
:param with_succ_pred: if true, shows the successors and precessors.
:param with_adj: if true, show the adjacent lanelets.
:param with_speed: if true, shows the speed limit of the lanelt.
"""
plt.figure(figsize=(25, 25))
plt.gca().set_aspect('equal')
draw_object(self.lanelet_network)
......
......@@ -28,9 +28,11 @@ __status__ = "Released"
def convert_net_to_cr(net_file:str, out_folder:str=None,verbose=False) -> str:
"""
Converts .net file to CommonRoad xml using netconvert and OpenDRIVE 2 Lanelet Converter.
:param net_file: path of .net.xml file
:param out_folder: path of output folder for CommonRoad scenario.
:return:
:return: commonroad map file
"""
assert isinstance(net_file,str)
......@@ -65,14 +67,15 @@ def convert_net_to_cr(net_file:str, out_folder:str=None,verbose=False) -> str:
return cr_map_file
def find_ego_ids_by_departure_time(rou_file, n_ego_vehicles, departure_time_ego, ego_ids):
def find_ego_ids_by_departure_time(rou_file: str, n_ego_vehicles: int, departure_time_ego: int, ego_ids: list) -> list:
"""
Get ids of vehicles from route file which match desired departure time as close as possible.
Returns ids of vehicles from route file which match desired departure time as close as possible.
:param rou_file: path of route file
:param n_ego_vehicles: number of ego vehicles
:param departure_time_ego: desired departure time ego vehicle
:param: ego_ids: if desired ids of ego_vehicle known, specify here
:return:
:param ego_ids: if desired ids of ego_vehicle known, specify here
"""
vehicles = sumolib.output.parse_fast(rou_file, 'vehicle', ['id', 'depart'])
dep_times = []
......@@ -109,20 +112,29 @@ def find_ego_ids_by_departure_time(rou_file, n_ego_vehicles, departure_time_ego,
def get_scenario_name_from_netfile(filepath:str) -> str:
"""
Returns the scenario name specified in the net file.
:param filepath: the path of the net file
"""
scenario_name:str = (os.path.splitext(os.path.basename(filepath))[0]).split('.')[0]
return scenario_name
def generate_rou_file(net_file:str, dt:float, n_vehicles_max:int, departure_time: Interval, n_ego_vehicles:int, departure_time_ego:int, ego_ids:List[int]=None, veh_per_second:float=None, out_folder:str=None) -> str:
"""
Create route & trips files using randomTrips generator.
Creates route & trips files using randomTrips generator.
:param net_file: path of .net.xml file
:param dt: length of the time step
:param n_vehicles_max: max. number of vehicles in route file
:param departure_time: Interval of departure times for vehicles
:param n_ego_vehicles: number of ego vehicles
:param departure_time_ego: desired departure time ego vehicle
:param: ego_ids: if desired ids of ego_vehicle known, specify here
:param ego_ids: if desired ids of ego_vehicle known, specify here
:param veh_per_second: number of vehicle departures per second
:param out_folder: output folder of route file (same as net_file if None)
:return: path of route file
"""
if out_folder is None:
......@@ -158,7 +170,14 @@ def generate_rou_file(net_file:str, dt:float, n_vehicles_max:int, departure_time
return rou_file
def write_ego_ids_to_rou_file(rou_file:str, ego_ids:List[int]):
def write_ego_ids_to_rou_file(rou_file:str, ego_ids:List[int]) -> None:
"""
Writes ids of ego vehicles to the route file.
:param rou_file: the route file
:param ego_ids: a list of ego vehicle ids
"""
tree = et.parse(rou_file)
vehicles = tree.findall('vehicle')
ego_str = {}
......
"""
Minimal example, plugging in of trajectory planner required.
"""
from sumo2cr.interface.sumo_simulation import SumoSimulation
from sumo2cr.visualization.video import create_video
from scenarios.a9.scenario_config import Conf as conf
from copy import deepcopy
sumo_sim = SumoSimulation()
sumo_sim.initialize(conf)
output_folder = "/home/klischat/GIT_REPOS/sumo-interface/sumo2cr/tests"
for t in range(conf.simulation_steps):
print('time step :' +str(t))
ego_vehicles = sumo_sim.ego_vehicles
commonroad_scenario = sumo_sim.commonroad_scenario_at_time_step(sumo_sim.current_time_step)
print(sumo_sim.current_time_step)
# plan trajectories for all ego vehicles
for id, ego_vehicle in ego_vehicles.items():
current_state = ego_vehicle.current_state
# own implementation for testing - ego vehicle just stays in the initial position
next_state = deepcopy(current_state)
next_state.time_step = 1
ego_vehicle.set_planned_trajectory([next_state])
sumo_sim.simulate_step()
sumo_sim.stop()
create_video(sumo_sim, conf.video_start, conf.video_end, output_folder)
......@@ -15,15 +15,15 @@ __maintainer__ = "Moritz Klischat"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Released"
def create_video(sumo_sim: SumoSimulation, video_start:int, video_end:int, out_folder:str):
def create_video(sumo_sim: SumoSimulation, video_start:int, video_end:int, out_folder:str) -> None:
"""
Create video from simulated scenario.
Creates a video from the simulated scenario.
:param sumo_sim: the sumo simulation instance
:param video_start: initial time step of the video
:param video_stop: final step of the video
:param video_end: final step of the video
:param out_folder: folder for video output
:return:
"""
if video_start < 1 or video_end > sumo_sim.current_time_step:
warnings.warn('<create_video>: Requesting time interval [{},{}] that has not been simulated yet. Choose from interval [1,{}]'
......@@ -57,13 +57,15 @@ def create_video(sumo_sim: SumoSimulation, video_start:int, video_end:int, out_f
plt.savefig(plot_dir + '/' + str(t) + '.svg')
def draw_plot(sumo_sim: SumoSimulation, time_step:int=None, _draw_params_ego=None, _draw_params_obstacle=None):
def draw_plot(sumo_sim: SumoSimulation, time_step:int = None, _draw_params_ego = None, _draw_params_obstacle = None) -> None:
"""
create a plot of the current simulation
:param sumo_sim: instance of simulation
:param scenario_name: name of the scenario where simulation happen
:return:
:param sumo_sim: instance of simulation
:param time_step: the time step of the plot
:param _draw_params_ego: drawing parameters for ego vehicles
:param _draw_params_obstacle: drawing parameters for obstacles
"""
if time_step is None:
time_step = sumo_sim.current_time_step
......@@ -121,17 +123,18 @@ def draw_plot(sumo_sim: SumoSimulation, time_step:int=None, _draw_params_ego=Non
plt.draw()
# Print iterations progress
def _printProgressBar (iteration, total, prefix ='', suffix ='', decimals = 1, length = 100, fill ='█'):
def _printProgressBar (iteration:int, total:int, prefix:str ='', suffix:str ='', decimals:int = 1, length:int = 100, fill:str ='█') -> None:
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
:param iteration: Required : current iteration
:param total: Required : total iterations
:param prefix: Optional : prefix string
:param suffix: Optional : suffix string
:param decimals: Optional : positive number of decimals in percent complete
:param length: Optional : character length of bar
:param fill: Optional : bar fill character
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
......