Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
9.2.2023: Due to updates GitLab will be unavailable for some minutes between 9:00 and 11:00.
Open sidebar
tum-cps
commonroad-sumo-interface
Commits
9f568a62
Commit
9f568a62
authored
Sep 20, 2021
by
Moritz Klischat
Browse files
version 2021.5
parent
c5dcc358
Changes
11
Hide whitespace changes
Inline
Side-by-side
CHANGELOG.md
0 → 100644
View file @
9f568a62
# 2021.5
-
improved speed of the SUMO co-simulation
-
removed map conversion function that are now available in the commonroad scenario designer
setup.py
View file @
9f568a62
...
...
@@ -8,7 +8,7 @@ with open(path.join(this_directory, 'README.md'), 'r', encoding='utf-8') as f:
setup
(
name
=
'sumocr'
,
version
=
'2021.
4
'
,
version
=
'2021.
5
'
,
description
=
'Python tool to interface with the SUMO traffic simulator'
,
keywords
=
'autonomous automated vehicles driving motion planning'
,
url
=
'https://commonroad.in.tum.de/sumo-interface'
,
...
...
sumocr/__init__.py
View file @
9f568a62
import
os
import
os
import
sys
# make sure $SUMO_HOME is in system pat
...
...
@@ -10,4 +10,4 @@ if 'SUMO_HOME' in os.environ:
else
:
sumo_installed
=
False
DOCKER_REGISTRY
=
"gitlab.lrz.de:5005/
cps/
sumo-interface/sumo_docker"
DOCKER_REGISTRY
=
"gitlab.lrz.de:5005/
tum-cps/commonroad-
sumo-interface/sumo_docker"
sumocr/docs/source/conf.py
View file @
9f568a62
...
...
@@ -36,7 +36,7 @@ copyright = '2021, Moritz Klischat'
author
=
'Moritz Klischat'
# The full version, including alpha/beta/rc tags
release
=
'
2021.
4'
release
=
"
2021.
5"
# -- General configuration ---------------------------------------------------
...
...
@@ -69,8 +69,8 @@ master_doc = 'index'
project
=
'commonroad-sumo-interface'
copyright
=
'2021, Moritz Klischat, Peter Kocsis'
author
=
'Moritz Klischat'
version
=
'
2021.
4'
release
=
'
2021.
4'
version
=
"
2021.
5"
release
=
"
2021.
5"
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
...
...
sumocr/interface/ego_vehicle.py
View file @
9f568a62
...
...
@@ -12,7 +12,7 @@ from commonroad.scenario.trajectory import State, Trajectory
__author__
=
"Moritz Klischat"
__copyright__
=
"TUM Cyber-Physical Systems Group"
__credits__
=
[
"ZIM Projekt ZF4086007BZ8"
]
__version__
=
"2021.
4
"
__version__
=
"2021.
5
"
__maintainer__
=
"Moritz Klischat"
__email__
=
"commonroad@lists.lrz.de"
__status__
=
"Released"
...
...
@@ -37,9 +37,7 @@ class EgoVehicle:
self
.
_planned_trajectories
:
Dict
[
int
,
List
[
State
]]
=
{}
# collects trajectories from planner for every time step
self
.
_current_time_step
=
initial_state
.
time_step
self
.
delta_steps
=
delta_steps
if
planning_problem
is
not
None
:
self
.
planning_problem
=
planning_problem
self
.
planning_problem
=
planning_problem
@
property
def
pp_id
(
self
)
->
int
:
...
...
sumocr/interface/sumo_simulation.py
View file @
9f568a62
...
...
@@ -28,21 +28,14 @@ from sumocr.sumo_config.pathConfig import SUMO_BINARY, SUMO_GUI_BINARY
from
xml.etree
import
cElementTree
as
ET
try
:
import
traci
from
traci._person
import
PersonDomain
from
traci._vehicle
import
VehicleDomain
from
traci._simulation
import
SimulationDomain
from
traci._route
import
RouteDomain
import
libsumo
as
traci
except
ModuleNotFoundError
as
exp
:
PersonDomain
=
DummyClass
VehicleDomain
=
DummyClass
SimulationDomain
=
DummyClass
RouteDomain
=
DummyClass
pass
__author__
=
"Moritz Klischat, Maximilian Frühauf"
__copyright__
=
"TUM Cyber-Physical Systems Group"
__credits__
=
[
"ZIM Projekt ZF4086007BZ8"
]
__version__
=
"2021.
4
"
__version__
=
"2021.
5
"
__maintainer__
=
"Moritz Klischat"
__email__
=
"commonroad@lists.lrz.de"
__status__
=
"Released"
...
...
@@ -61,10 +54,10 @@ class SumoSimulation:
self
.
planning_problem_set
:
PlanningProblemSet
=
None
# {time_step: {obstacle_id: state}}
self
.
obstacle_states
:
Dict
[
int
,
Dict
[
int
,
State
]]
=
defaultdict
(
lambda
:
dict
())
self
.
simulationdomain
:
SimulationDomain
=
S
imulation
Domain
()
self
.
vehicledomain
:
VehicleDomain
=
VehicleDomain
()
self
.
persondomain
:
PersonDomain
=
PersonDomain
()
self
.
routedomain
:
RouteDomain
=
RouteDomain
()
self
.
simulationdomain
=
traci
.
s
imulation
self
.
vehicledomain
=
traci
.
vehicle
self
.
persondomain
=
traci
.
person
self
.
routedomain
=
traci
.
route
self
.
_current_time_step
=
0
self
.
ids_sumo2cr
,
self
.
ids_cr2sumo
=
initialize_id_dicts
(
ID_DICT
)
self
.
_max_lanelet_network_id
=
0
# keep track of all IDs in CR lanelet_network
...
...
@@ -94,7 +87,7 @@ class SumoSimulation:
return
self
.
_scenarios
@
scenarios
.
setter
def
scenarios
(
self
,
scenarios
:
Abstract
ScenarioWrapper
):
def
scenarios
(
self
,
scenarios
:
ScenarioWrapper
):
def
max_lanelet_network_id
(
lanelet_network
:
LaneletNetwork
)
->
int
:
max_lanelet
=
np
.
max
([
l
.
lanelet_id
for
l
in
lanelet_network
.
lanelets
])
\
if
lanelet_network
.
lanelets
else
0
...
...
@@ -120,7 +113,7 @@ class SumoSimulation:
self
.
_scenarios
=
scenarios
def
initialize
(
self
,
conf
:
DefaultConfig
,
scenario_wrapper
:
Abstract
ScenarioWrapper
,
scenario_wrapper
:
ScenarioWrapper
,
planning_problem_set
:
PlanningProblemSet
=
None
)
->
None
:
"""
Reads scenario files, starts traci simulation, initializes vehicles, conducts pre-simulation.
...
...
@@ -137,8 +130,8 @@ class SumoSimulation:
self
.
logger
=
self
.
_init_logging
()
#
assert isinstance(scenario_wrapper, AbstractScenarioWrapper), \
#
f'scenario_wrapper expected type
Abstract
ScenarioWrapper or None, but got type {type(scenario_wrapper)}'
assert
isinstance
(
scenario_wrapper
,
AbstractScenarioWrapper
),
\
f
'scenario_wrapper expected type ScenarioWrapper or None, but got type
{
type
(
scenario_wrapper
)
}
'
self
.
scenarios
=
scenario_wrapper
self
.
dt
=
self
.
conf
.
dt
self
.
dt_sumo
=
self
.
conf
.
dt
/
self
.
conf
.
delta_steps
...
...
@@ -173,10 +166,10 @@ class SumoSimulation:
random
.
seed
(
self
.
conf
.
random_seed
)
cmd
.
extend
([
'--seed'
,
str
(
self
.
conf
.
random_seed
)])
traci
.
start
(
cmd
,
label
=
self
.
traci_label
)
traci
.
start
(
cmd
)
# simulate until ego_time_start
if
self
.
planning_problem_set
is
not
None
and
len
(
self
.
_find_ego_vehicles_in_rou_file
())
==
0
:
self
.
__presimulation_silent
(
self
.
conf
.
presimulation_steps
-
1
,
fetch_obstacles
=
False
)
self
.
__presimulation_silent
(
max
(
0
,
self
.
conf
.
presimulation_steps
-
1
)
,
fetch_obstacles
=
False
)
if
len
(
self
.
ego_vehicles
)
>
0
:
self
.
logger
.
warning
(
'<SumoSimulation/init_ego_vehicles> Ego vehicles are already defined through .rou'
'file and planning problem!'
)
...
...
@@ -233,7 +226,7 @@ class SumoSimulation:
self
.
dummy_ego_simulation
=
False
# retrieve arbitrary route id for initialization (will not be used by interface)
generic_route_id
=
self
.
route
domain
.
getIDList
()[
0
]
generic_route_id
=
traci
.
route
.
getIDList
()[
0
]
width
=
self
.
conf
.
ego_veh_width
length
=
self
.
conf
.
ego_veh_length
...
...
@@ -247,7 +240,7 @@ class SumoSimulation:
sumo_id
=
EGO_ID_START
+
str
(
new_id
)
cr_id
=
self
.
_create_cr_id
(
type
=
'egoVehicle'
,
sumo_id
=
sumo_id
,
sumo_prefix
=
EGO_ID_START
)
self
.
vehicledomain
.
add
(
sumo_id
,
generic_route_id
,
typeID
=
"DEFAULT_VEHTYPE"
,
depart
=
None
,
depart
=
"now"
,
departLane
=
'first'
,
departPos
=
"base"
,
departSpeed
=
planning_problem
.
initial_state
.
velocity
,
arrivalLane
=
"current"
,
arrivalPos
=
"max"
,
...
...
@@ -259,9 +252,9 @@ class SumoSimulation:
position_tmp
=
planning_problem
.
initial_state
.
position
\
+
0.5
*
length
*
np
.
array
([
math
.
cos
(
planning_problem
.
initial_state
.
orientation
),
math
.
sin
(
planning_problem
.
initial_state
.
orientation
)])
self
.
vehicledomain
.
setLength
(
vehID
=
sumo_id
,
length
=
length
)
self
.
vehicledomain
.
setWidth
(
vehID
=
sumo_id
,
width
=
width
)
self
.
vehicledomain
.
moveToXY
(
vehID
=
sumo_id
,
edgeID
=
'dummy'
,
lane
=-
1
,
self
.
vehicledomain
.
setLength
(
sumo_id
,
length
=
length
)
self
.
vehicledomain
.
setWidth
(
sumo_id
,
width
=
width
)
self
.
vehicledomain
.
moveToXY
(
sumo_id
,
edgeID
=
'dummy'
,
lane
Index
=-
1
,
x
=
position_tmp
[
0
],
y
=
position_tmp
[
1
],
angle
=
sumo_angle
,
keepRoute
=
2
)
...
...
@@ -281,18 +274,11 @@ class SumoSimulation:
:param ego_vehicle: the ego vehicle to be added.
"""
if
ego_vehicle
.
id
in
self
.
_ego_vehicles
:
self
.
logger
.
error
(
self
.
logger
.
debug
(
f
'Ego vehicle with id
{
ego_vehicle
.
id
}
already exists!'
,
exc_info
=
True
)
self
.
_ego_vehicles
[
ego_vehicle
.
id
]
=
ego_vehicle
@
property
def
traci_label
(
self
):
"""Unique label to identify simulation"""
if
self
.
_traci_label
is
None
:
self
.
_traci_label
=
self
.
conf
.
scenario_name
+
time
.
strftime
(
"%H%M%S"
)
+
str
(
random
.
randint
(
0
,
100
))
return
self
.
_traci_label
else
:
self
.
_ego_vehicles
[
ego_vehicle
.
id
]
=
ego_vehicle
@
property
def
ego_vehicles
(
self
)
->
Dict
[
int
,
EgoVehicle
]:
...
...
@@ -441,20 +427,42 @@ class SumoSimulation:
self
.
_fetch_sumo_vehicles
(
self
.
current_time_step
,
self
.
vehicledomain
.
getIDList
())
self
.
_fetch_sumo_pedestrians
(
self
.
current_time_step
,
self
.
persondomain
.
getIDList
())
def
_subscribe_new
(
self
,
new_ids
:
List
[
str
],
domain
=
traci
.
vehicle
):
"""
Subscribe to values of new vehicle/pedestrian
:param new_ids: list of new SUMO IDs
:param domain: domain from which should be subscribed
:return:
"""
vehicle_ids
=
self
.
sort_ego_first
(
new_ids
)
for
veh_id
in
vehicle_ids
:
domain
.
subscribe
(
veh_id
,
traci_subscription_values
)
def
_get_subscription_results
(
self
,
domain
=
traci
.
vehicle
)
->
Dict
[
str
,
Dict
[
int
,
any
]]:
"""
Return subscription results from current time step
:param domain:
:return: dict with results with structure {sumo_id: {value_id: value}}
"""
return
domain
.
getAllSubscriptionResults
()
def
_fetch_sumo_vehicles
(
self
,
time_step
:
int
,
new_ids
:
List
[
str
]):
"""
Gets and stores all vehicle states from SUMO. Initializes ego vehicles when they enter simulation.
"""
vehicle_ids
=
self
.
sort_ego_first
(
self
.
vehicledomain
.
getIDList
())
if
len
(
vehicle_ids
)
==
0
:
self
.
_subscribe_new
(
new_ids
)
vehicle_states
=
self
.
_get_subscription_results
()
if
len
(
vehicle_states
)
==
0
:
return
if
not
self
.
dummy_ego_simulation
:
self
.
check_ego_collisions
(
raise_error
=
True
)
for
veh_id
in
vehicle_ids
:
state
=
self
.
_get_current_state_from_sumo
(
self
.
vehicledomain
,
str
(
veh_id
),
SUMO_VEHICLE_PREFIX
)
for
veh_id
,
vehicle_state
in
vehicle_states
.
items
():
state
=
self
.
_extract_current_state
(
self
.
vehicledomain
,
veh_id
,
vehicle_state
,
SUMO_VEHICLE_PREFIX
)
if
state
is
None
:
continue
...
...
@@ -470,6 +478,8 @@ class SumoSimulation:
if
self
.
planning_problem_set
is
not
None
:
planning_problem
=
list
(
self
.
planning_problem_set
.
planning_problem_dict
.
values
())[
0
]
elif
self
.
scenarios
.
planning_problem_set
is
not
None
:
planning_problem
=
list
(
self
.
scenarios
.
planning_problem_set
.
planning_problem_dict
.
values
())[
0
]
else
:
planning_problem
=
None
...
...
@@ -492,7 +502,7 @@ class SumoSimulation:
# read signal state
if
not
self
.
_silent
:
signal_states
=
get_signal_state
(
self
.
vehicle
domain
.
getSignals
(
veh_id
)
,
self
.
current_time_step
)
signal_states
=
get_signal_state
(
vehicle
_state
[
traci
.
constants
.
VAR_SIGNALS
]
,
self
.
current_time_step
)
key
=
self
.
ids_sumo2cr
[
'obstacleVehicle'
][
veh_id
]
\
if
veh_id
in
self
.
ids_sumo2cr
[
'obstacleVehicle'
]
else
self
.
ids_sumo2cr
[
EGO_ID_START
][
veh_id
]
self
.
signal_states
[
key
].
append
(
signal_states
)
...
...
@@ -516,14 +526,15 @@ class SumoSimulation:
Gets and stores all vehicle states from SUMO. Initializes ego vehicles when they enter simulation.
"""
person_ids
=
self
.
persondomain
.
getIDList
()
self
.
_subscribe_new
(
new_ids
,
domain
=
traci
.
person
)
person_states
=
self
.
_get_subscription_results
(
domain
=
traci
.
person
)
if
not
person_
id
s
:
if
not
person_
state
s
:
return
obstacle_type
=
"obstaclePedestrian"
for
ped_id
in
person_ids
:
state
=
self
.
_
g
et_current_state
_from_sumo
(
self
.
persondomain
,
str
(
ped_id
)
,
SUMO_PEDESTRIAN_PREFIX
)
for
ped_id
,
ped_state
in
person_states
.
items
()
:
state
=
self
.
_e
xtrac
t_current_state
(
self
.
persondomain
,
ped_id
,
ped_state
,
SUMO_PEDESTRIAN_PREFIX
)
if
state
is
None
:
continue
if
ped_id
in
new_ids
:
...
...
@@ -539,7 +550,7 @@ class SumoSimulation:
# get obstacle vehicle state
self
.
obstacle_states
[
time_step
][
self
.
ids_sumo2cr
[
obstacle_type
][
ped_id
]]
=
state
def
_get_veh_shape
(
self
,
vehicledomain
:
VehicleDomain
,
sumo_id
):
def
_get_veh_shape
(
self
,
vehicledomain
,
sumo_id
):
return
Rectangle
(
vehicledomain
.
getLength
(
sumo_id
),
vehicledomain
.
getWidth
(
sumo_id
))
def
check_ego_collisions
(
self
,
raise_error
=
True
):
...
...
@@ -597,8 +608,8 @@ class SumoSimulation:
finally
:
return
Rectangle
(
length
,
width
)
def
_
g
et_current_state
_from_sumo
(
self
,
domain
,
veh_id
:
str
,
sumo_prefix
:
str
)
->
Union
[
None
,
State
]:
def
_e
xtrac
t_current_state
(
self
,
domain
,
veh_id
:
str
,
vehicle_state
:
Dict
[
int
,
any
],
sumo_prefix
:
str
)
->
Union
[
None
,
State
]:
"""
Gets the current state from sumo.
:param domain
...
...
@@ -608,8 +619,8 @@ class SumoSimulation:
:return: the state of the given vehicle or None, if not in field of view
"""
unique_id
=
sumo_prefix
+
veh_id
position
=
np
.
array
(
domain
.
getPosition
(
veh_id
)
)
velocity
=
domain
.
getSpeed
(
veh_id
)
position
=
np
.
array
(
vehicle_state
[
traci
.
constants
.
VAR_POSITION
]
)
velocity
=
math
.
sqrt
(
vehicle_state
[
traci
.
constants
.
VAR_SPEED
]
**
2
+
vehicle_state
[
traci
.
constants
.
VAR_SPEED_LAT
]
**
2
)
cr_id
=
sumo2cr
(
veh_id
,
self
.
ids_sumo2cr
)
if
cr_id
and
cr_id
in
self
.
obstacle_shapes
:
...
...
@@ -624,7 +635,7 @@ class SumoSimulation:
delta_pos
=
position
-
self
.
cached_position
[
unique_id
]
orientation
=
math
.
atan2
(
delta_pos
[
1
],
delta_pos
[
0
])
else
:
orientation
=
math
.
radians
(
-
domain
.
getAngle
(
veh_id
)
+
90
)
orientation
=
math
.
radians
(
-
vehicle_state
[
traci
.
constants
.
VAR_ANGLE
]
+
90
.0
)
self
.
cached_position
[
unique_id
]
=
position
position
-=
0.5
*
length
*
np
.
array
([
math
.
cos
(
orientation
),
math
.
sin
(
orientation
)])
...
...
@@ -633,10 +644,7 @@ class SumoSimulation:
if
in_fov
is
False
:
return
None
try
:
acceleration
=
domain
.
getAcceleration
(
veh_id
)
except
AttributeError
:
acceleration
=
0
acceleration
=
vehicle_state
[
traci
.
constants
.
VAR_ACCELERATION
]
return
State
(
position
=
position
,
orientation
=
orientation
,
...
...
@@ -825,7 +833,7 @@ class SumoSimulation:
Generates a new unused id for SUMO
:return:
"""
id_list
=
self
.
vehicle
domain
.
getIDList
()
id_list
=
traci
.
vehicle
.
getIDList
()
new_id
=
int
(
len
(
id_list
))
i
=
0
while
i
<
1000
:
...
...
@@ -1039,7 +1047,7 @@ class SumoSimulation:
self
.
vehicledomain
.
moveToXY
(
vehID
=
id_sumo
,
edgeID
=
'dummy'
,
lane
=-
1
,
lane
Index
=-
1
,
x
=
position_sumo
[
0
],
y
=
position_sumo
[
1
],
angle
=
sumo_angle
,
...
...
sumocr/interface/util.py
View file @
9f568a62
...
...
@@ -3,6 +3,7 @@ import xml.etree.ElementTree as et
import
warnings
import
libsumo
as
traci
from
sumocr.sumo_config
import
plot_params
,
ID_DICT
from
typing
import
List
,
Tuple
,
Set
from
sumocr.sumo_config.default
import
SUMO_PEDESTRIAN_PREFIX
,
SUMO_VEHICLE_PREFIX
...
...
@@ -12,7 +13,7 @@ import enum
__author__
=
"Moritz Klischat"
__copyright__
=
"TUM Cyber-Physical Systems Group"
__credits__
=
[
"ZIM Projekt ZF4086007BZ8"
]
__version__
=
"2021.
4
"
__version__
=
"2021.
5
"
__maintainer__
=
"Moritz Klischat"
__email__
=
"commonroad@lists.lrz.de"
__status__
=
"Released"
...
...
@@ -151,7 +152,7 @@ def cr2sumo(cr_id: int, ids_cr2sumo: dict) -> int:
# raise ValueError('Commonroad id {0} does not exist.'.format(cr_id))
def
sumo2cr
(
sumo_id
:
int
,
ids_sumo2cr
:
dict
)
->
int
:
def
sumo2cr
(
sumo_id
:
str
,
ids_sumo2cr
:
dict
)
->
int
:
"""
Returns corresponding CommonRoad ID according to sumo id.
...
...
@@ -173,13 +174,15 @@ def sumo2cr(sumo_id: int, ids_sumo2cr: dict) -> int:
# raise ValueError('Sumo id \'%s\' does not exist.' % sumo_id)
class
DummyClass
:
"""Dummy class used if SUMO is not installed and traci cannot be imported"""
def
__init__
(
self
):
pass
traci_subscription_values
=
(
traci
.
constants
.
VAR_POSITION
,
traci
.
constants
.
VAR_SPEED
,
traci
.
constants
.
VAR_SPEED_LAT
,
traci
.
constants
.
VAR_ACCELERATION
,
traci
.
constants
.
VAR_ANGLE
,
traci
.
constants
.
VAR_SIGNALS
)
class
SumoSignalIndices
(
enum
.
Enum
):
class
SumoSignalIndices
(
enum
.
Int
Enum
):
"""All interpretations with their respective bit indices
ref.: https://sumo.dlr.de/docs/TraCI/Vehicle_Signalling.html"""
VEH_SIGNAL_BLINKER_RIGHT
=
0
...
...
@@ -198,6 +201,9 @@ class SumoSignalIndices(enum.Enum):
VEH_SIGNAL_EMERGENCY_YELLOW
=
13
max_signal_index
:
int
=
max
([
s
.
value
for
s
in
SumoSignalIndices
])
_defined_signals
=
{
# only the following signals are computed on every time step
SumoSignalIndices
.
VEH_SIGNAL_BLINKER_LEFT
:
"indicator_left"
,
...
...
@@ -212,15 +218,12 @@ def get_signal_state(state: int, time_step: int) -> SignalState:
Computes the CR Signal state from the sumo signals
"""
binary
=
list
(
reversed
(
bin
(
state
)[
2
:]))
max_signal_index
:
int
=
max
([
s
.
value
for
s
in
SumoSignalIndices
])
bit_string
:
List
[
bool
]
=
[
binary
[
i
]
==
"1"
if
i
<
len
(
binary
)
else
False
for
i
in
range
(
max_signal_index
+
1
)]
args
=
{
cr_name
:
bit_string
[
sumo_name
.
value
]
for
sumo_name
,
cr_name
in
_defined_signals
.
items
()}
return
SignalState
(
**
{
**
args
,
# the following are not modelled by sumo, so have to be inserted manually
**
{
"horn"
:
False
,
"hazard_warning_lights"
:
False
},
**
{
"time_step"
:
time_step
}})
...
...
sumocr/maps/scenario_wrapper.py
View file @
9f568a62
...
...
@@ -4,15 +4,48 @@ from commonroad.scenario.lanelet import LaneletNetwork
__author__
=
"Moritz Klischat"
__copyright__
=
"TUM Cyber-Physical Systems Group"
__credits__
=
[
"ZIM Projekt ZF4086007BZ8"
]
__version__
=
"2021.
4
"
__version__
=
"2021.
5
"
__maintainer__
=
"Moritz Klischat"
__email__
=
"commonroad@lists.lrz.de"
__status__
=
"Released"
from
commonroad.scenario.scenario
import
Scenario
class
AbstractScenarioWrapper
(
metaclass
=
ABCMeta
):
# these
two
attributes are abstract and need to be defined by the inheriting subclass
# these attributes are abstract and need to be defined by the inheriting subclass
sumo_cfg_file
=
""
lanelet_network
:
LaneletNetwork
=
None
planning_problem_set
=
None
_initial_scenario
=
None
@
property
def
lanelet_network
(
self
)
->
LaneletNetwork
:
return
self
.
_lanelet_network
@
lanelet_network
.
setter
def
lanelet_network
(
self
,
_
):
raise
RuntimeError
(
"lanelet_network cannot be set, set initial_scenario instead"
)
@
property
def
initial_scenario
(
self
)
->
Scenario
:
return
self
.
_initial_scenario
@
initial_scenario
.
setter
def
initial_scenario
(
self
,
initial_scenario
):
self
.
_initial_scenario
=
initial_scenario
self
.
_lanelet_network
=
initial_scenario
.
lanelet_network
def
create_minimal_scenario
(
self
)
->
Scenario
:
sc
=
Scenario
(
dt
=
self
.
initial_scenario
.
dt
,
scenario_id
=
self
.
initial_scenario
.
scenario_id
)
sc
.
lanelet_network
=
self
.
initial_scenario
.
lanelet_network
return
sc
def
create_full_meta_scenario
(
self
)
->
Scenario
:
sc
=
self
.
create_minimal_scenario
()
sc
.
author
=
self
.
initial_scenario
.
author
sc
.
tags
=
self
.
initial_scenario
.
tags
sc
.
affiliation
=
self
.
initial_scenario
.
affiliation
sc
.
source
=
self
.
initial_scenario
.
source
sc
.
location
=
self
.
initial_scenario
.
location
return
sc
sumocr/maps/sumo_scenario.py
View file @
9f568a62
import
logging
import
os
import
matplotlib.pyplot
as
plt
from
commonroad.scenario.scenario
import
Scenario
from
commonroad.visualization.draw_dispatch_cr
import
draw_object
import
pathlib
import
xml.etree.ElementTree
as
et
...
...
@@ -12,21 +9,11 @@ from commonroad.common.file_reader import CommonRoadFileReader
from
sumocr.interface.util
import
NetError
from
sumocr.sumo_config
import
DefaultConfig
from
sumocr.maps.scenario_wrapper
import
AbstractScenarioWrapper
from
sumocr.maps.util
import
get_scenario_name_from_crfile
,
get_scenario_name_from_netfile
,
generate_rou_file
try
:
from
crdesigner.conversion.sumo_map.cr2sumo.converter
import
CR2SumoMapConverter
from
crdesigner.conversion.sumo_map.config
import
SumoConfig
from
crdesigner.conversion.sumo_map.sumo2cr
import
convert_net_to_cr
cr_map_converter_installed
=
True
except
ImportError
:
cr_map_converter_installed
=
False
__author__
=
"Moritz Klischat"
__copyright__
=
"TUM Cyber-Physical Systems Group"
__credits__
=
[
"ZIM Projekt ZF4086007BZ8"
]
__version__
=
"2021.
4
"
__version__
=
"2021.
5
"
__maintainer__
=
"Moritz Klischat"
__email__
=
"commonroad@lists.lrz.de"
__status__
=
"Released"
...
...
@@ -45,37 +32,6 @@ class ScenarioWrapper(AbstractScenarioWrapper):
self
.
planning_problem_set
=
None
self
.
_route_planner
=
None
@
property
def
lanelet_network
(
self
)
->
LaneletNetwork
:
return
self
.
_lanelet_network
@
lanelet_network
.
setter
def
lanelet_network
(
self
,
_
):
raise
RuntimeError
(
"lanelet_network cannot be set, set initial_scenario instead"
)
@
property
def
initial_scenario
(
self
)
->
Scenario
:
return
self
.
_initial_scenario
@
initial_scenario
.
setter
def
initial_scenario
(
self
,
initial_scenario
):
self
.
_initial_scenario
=
initial_scenario
self
.
_lanelet_network
=
initial_scenario
.
lanelet_network
def
create_minimal_scenario
(
self
)
->
Scenario
:
sc
=
Scenario
(
dt
=
self
.
initial_scenario
.
dt
,
scenario_id
=
self
.
initial_scenario
.
scenario_id
)
sc
.
lanelet_network
=
self
.
initial_scenario
.
lanelet_network
return
sc
def
create_full_meta_scenario
(
self
)
->
Scenario
:
sc
=
self
.
create_minimal_scenario
()
sc
.
author
=
self
.
initial_scenario
.
author
sc
.
tags
=
self
.
initial_scenario
.
tags
sc
.
affiliation
=
self
.
initial_scenario
.
affiliation
sc
.
source
=
self
.
initial_scenario
.
source
sc
.
location
=
self
.
initial_scenario
.
location
return
sc
def
initialize
(
self
,
scenario_name
:
str
,
sumo_cfg_file
:
str
,
...
...
@@ -129,108 +85,6 @@ class ScenarioWrapper(AbstractScenarioWrapper):
ego_start_time
)
return
obj
@
classmethod
def
recreate_route_file
(
cls
,
sumo_cfg_file
,
conf
:
DefaultConfig
=
DefaultConfig
)
->
'ScenarioWrapper'
:
"""
Creates new .rou.xml file and returns ScenarioWrapper. Assumes .cr.xml, .net.xml and .sumo.cfg file have already been created in scenario folder.
:param sumo_cfg_file:
:param conf:
"""
sumo_scenario
=
cls
()
out_folder
=
os
.
path
.
dirname
(
sumo_cfg_file
)
net_file
=
sumo_scenario
.
_get_net_file
(
sumo_cfg_file
)
scenario_name
=
get_scenario_name_from_netfile
(
net_file
)
generate_rou_file
(
net_file
,
out_folder
,
conf
)
cr_map_file
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'../../scenarios/'
,
scenario_name
,
scenario_name
+
'.cr.xml'
)
sumo_scenario
.
initialize
(
scenario_name
,
sumo_cfg_file
,
cr_map_file
,
conf
.
ego_start_time
)
return
sumo_scenario
@
classmethod
def
init_from_cr_file
(
cls
,
cr_file
:
str
,
conf
:
DefaultConfig
=
DefaultConfig
())
->
'ScenarioWrapper'
:
"""
Convert CommonRoad xml to sumo net file and return Scenario Wrapper.
:param cr_file: path to the cr map file
:param conf: configuration file
:return:total length of all lanes, conversion_possible
"""
if
cr_map_converter_installed
is
False
:
raise
EnvironmentError
(
'This function requires map converter from the package'
'crdesigner which is yet to be released.'
)
scenario_name
=
get_scenario_name_from_crfile
(
cr_file
)
out_folder
=
os
.
path
.
join
(
conf
.
scenarios_path
,
scenario_name
)
pathlib
.
Path
(
out_folder
).
mkdir
(
parents
=
True
,
exist_ok
=
True
)
net_file
=
os
.
path
.
join
(
out_folder
,
scenario_name
+
'.net.xml'
)
# convert from cr to net
cr2sumo_converter
=
CR2SumoMapConverter
.
from_file
(
cr_file
,
SumoConfig
.
from_dict
({
attr_name
:
getattr
(
conf
,
attr_name
)
for
attr_name
in
dir
(
conf
)
if
not
attr_name
.
startswith
(
'_'
)
}))
# Write final net file
logging
.
info
(
'write map to path'
,
net_file
)
conversion_possible
=
cr2sumo_converter
.
convert_to_net_file
(
out_folder
)
assert
conversion_possible
,
"Conversion from cr file to net file failed!"
sumo_scenario
=
cls
()
sumo_scenario
.
initialize
(
scenario_name
,
cr2sumo_converter
.
sumo_cfg_file
,
cr_file
,
conf
.
ego_start_time
)
return
sumo_scenario
@
classmethod