Commit 8b115ed0 authored by Edmond Irani Liu's avatar Edmond Irani Liu 🌊
Browse files

update to 2020 version

parent a3ab2c9b
Tutorials on Tree Search Algorithms
This is a short guide on how to use the search algorithms in for lectures and tutorials.
Set up
For using the code, you have to install the tools as described in the README in the root folder. For using the code in
Pycharm, create a new project in the folder `./GSMP/motion_automata`, to avoid path issues.
# Documentation
For running the search algorithms, you can currently specify three different configurations which mainly influence the
visualization of the search and are defined in ``:
### Default:
* This configuration should only be used in the jupyter notebook tutorial (`cr_uninformed_search_tutorial` and
`cr_informed_search_tutorial`). It first runs the algorithm and stores the state of
each motion primitive, i.e., explored/frontier/currently exploring etc., for each step. Then it plots these steps using
ipywidgets, so the student can step through the search in the jupyter notebook.
* The color code for plotting is taken from the AIMA tutorial, but can be changed in ``.
* By setting PLOT_LEGEND to True, a legend with all states of
motion primitives (see `MotionPrimitiveStatus` in ``) is plotted.
* To reduce the number of steps, you can turn off that motion primitives with collisions are plotted one after the other by setting PLOT_COLLISION_STEPS
to False.
### StudentScript:
This configuration uses the same parameters as the Default configuration, but it can be run in Pycharm,
e.g. when executing ``. It plots the different steps while executing the search algorithm. It should help
students to easily go through the code.
import sys
import os
path_commonroad_search = "../../"
sys.path.append(os.path.join(path_commonroad_search, "GSMP/tools/"))
sys.path.append(os.path.join(path_commonroad_search, "GSMP/tools/commonroad-collision-checker"))
sys.path.append(os.path.join(path_commonroad_search, "GSMP/tools/commonroad-road-boundary"))
sys.path.append(os.path.join(path_commonroad_search, "GSMP/motion_automata/vehicle_model"))
import time
from multiprocessing import Manager, Process
import enum
import ipywidgets as widgets
import numpy as np
from math import atan2
import matplotlib.pyplot as plt
from IPython import display
from ipywidgets import widgets
#from IPython.display import display
# import CommonRoad-io modules
from commonroad.visualization.draw_dispatch_cr import draw_object
from commonroad.common.file_reader import CommonRoadFileReader
from commonroad.scenario.trajectory import Trajectory, State
from commonroad.scenario.scenario import Scenario
from commonroad.geometry.shape import Rectangle
from commonroad.planning.planning_problem import PlanningProblem
from commonroad.scenario.obstacle import StaticObstacle, ObstacleRole, ObstacleType, DynamicObstacle
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad_cc.collision_detection.pycrcc_collision_dispatch import create_collision_object
from commonroad_cc.visualization.draw_dispatch import draw_object as draw_it
from commonroad.common.solution_writer import CommonRoadSolutionWriter, VehicleType, CostFunction
# import Motion Automata modules
from automata.MotionAutomata import MotionAutomata
from automata.MotionPrimitive import MotionPrimitive
from automata.States import FinalState
from typing import List, Dict, Tuple
# ================================== scenario ================================== #
def load_scenario(scenario_path):
# open and read in scenario and planning problem set
scenario, planning_problem_set = CommonRoadFileReader(scenario_path).open()
return scenario, planning_problem_set
# ================================== automata ================================== #
def generate_automata(veh_type_id, mp_file=None, search_tutorial=False):
1. We first create a Motion Automata object, then load pre-computed motion primitives into it.
Note that every motion primitive is a short trajectory with varied initial states that lead to different final states.
2. We create a connectivity list for every pritimive in the Motion Automata,
which contains all connectable primitives from the focused primitive.
If the velocity and steering angel of the start state of primitive B match
those of the final state of primitive A, we say B is connectable from A.
:param veh_type_id: id for the vehicle model in the motion primitives
:param mp_file: optional parameter for specifying the path to a motion primitives xml file
:param search_tutorial: boolean value to specify if the search is used for a tutorial ->
motion primitives are sorted by their y-coordinate in the final state
# the primitives vary for different vehicle models 1, 2 and 3.
assert veh_type_id in (1, 2, 3), "Input vehicle type id is not valid! Must be either 1, 2 or 3."
# step 1
automata = MotionAutomata(veh_type_id)
print("Reading motion primitives...")
prefix = '../../GSMP/motion_automata/motion_primitives/'
if mp_file is not None:
automata.readFromXML(prefix + mp_file)
if veh_type_id == 1:
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-0.91_0.91_SAstep_0.23_T_0.5_Model_FORD_ESCORT.xml')
elif veh_type_id == 2:
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.066_1.066_SAstep_0.27_T_0.5_Model_BMW320i.xml')
elif veh_type_id == 3:
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.023_1.023_SAstep_0.26_T_0.5_Model_VW_VANAGON.xml')
except Exception:
prefix = '../../../GSMP/motion_automata/motion_primitives/'
if mp_file is not None:
automata.readFromXML(prefix + mp_file)
if veh_type_id == 1:
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-0.91_0.91_SAstep_0.23_T_0.5_Model_FORD_ESCORT.xml')
elif veh_type_id == 2:
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.066_1.066_SAstep_0.27_T_0.5_Model_BMW320i.xml')
elif veh_type_id == 3:
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.023_1.023_SAstep_0.26_T_0.5_Model_VW_VANAGON.xml')
# additional step for search examples -> sorting motion primitives by max. y-coordinate
if search_tutorial:
# step 2
# assign vehicle type id to all primitives
print("Automata created.")
print('Number of loaded primitives: ' + str(len(automata.Primitives)))
return automata
def add_initial_state_to_automata(automata, planning_problem):
# set initial steering angle to zero
planning_problem.initial_state.steering_angle = 0.0
# the initial velocity of the planning problem may be any value, we need to obtain the closest velocity to it
# from StartStates of the primitives in order to get the feasible successors of the planning problem
velocity_closest = automata.getClosestStartVelocity(planning_problem.initial_state.velocity)
planning_problem.initial_state.velocity = velocity_closest
initial_state = planning_problem.initial_state
# turn initial state into a motion primitive to check for connectivity to subsequent motion primitives
final_state_primitive = FinalState(x=initial_state.position[0],
initial_motion_primitive = MotionPrimitive(startState=None, finalState=final_state_primitive, timeStepSize=0,
# create connectivity list for this imaginary motion primitive
return automata, initial_motion_primitive
# ================================== search for solution ================================== #
1. Searching for a possible solution could be time consuming, and also we would like to visualize the intermediate states during the search.
Thus, we create a separate Process to handle the search, and visualize the intermediate states via Main Process.
More information on python Prcess could be found at [1]( and
2. We need multiprocessing.Manager.Value() to help us share variables between different Processes.
3. To execute the search, we need to define an individual wrapper function and set it as the target of the new Process.
def execute_search(motion_planner, initial_motion_primitive, path_shared, dict_status_shared, max_tree_depth):
definition of the wrapper function for search
calls search algorithm and update the status dictionary for visualization in search algorithm
note that dict_status_shared is being shared between difference Processes
result = motion_planner.search_alg(initial_motion_primitive.Successors, max_tree_depth, dict_status_shared)
# result is in form of (final path, used_primitives)
if result is None:
# no path found
path_shared.value = result[0]
if len(result[1]) > 0:
print("Found primitives")
# for primitive in result[1]:
# print('\t', primitive)
def start_search(scenario, planning_problem, automata, motion_planner, initial_motion_primitive,
flag_plot_intermediate_results=True, flag_plot_planning_problem=True):
# create Manager object to manage shared variables between different Processes
process_manager = Manager()
# create shared variables between Processes
path_shared = process_manager.Value('path_shared', None)
dict_status_shared = process_manager.Value('dict_status_shared', None)
# initialize status with a dictionary.
# IMPORTANT NOTE: if the shared variable contains complex object types (list, dict, ...), to change its value,
# we need to do it via another object (here dict_status) and pass it to the shared variable at the end
dict_status = {'cost_current': 0, 'path_current': []}
dict_status_shared.value = dict_status
# maximum number of concatenated primitives
max_tree_depth = 100
# create and start the Process for search
process_search = Process(target=execute_search,
if flag_plot_intermediate_results:
# create a figure, plot the scenario and planning problem
fig = plt.figure(figsize=(9, 9))
if flag_plot_planning_problem:
plt.margins(0, 0)
refresh_rate_plot = 5.0
while True:
time.sleep(1 / refresh_rate_plot)
# break the loop if process_search is not alive anymore
if not process_search.is_alive():
# print current status
string_status = "Cost so far:", round(dict_status_shared.value['cost_current'], 2)
string_status += "Time step:", round(dict_status_shared.value['path_current'][-1].time_step, 2)
string_status += "Orientation:", round(dict_status_shared.value['path_current'][-1].orientation, 2)
string_status += "Velocity:", round(dict_status_shared.value['path_current'][-1].velocity, 2)
print(string_status, end='\r')
# if there is a path to be visualized
if len(dict_status_shared.value['path_current']) > 0:
if flag_plot_intermediate_results:
if flag_plot_planning_problem:
plt.margins(0, 0)
# create a Trajectory from the current states of the path
# Trajectory is essentially a list of states
trajectory = Trajectory(initial_time_step=0, state_list=dict_status_shared.value['path_current'])
# create an occupancy Prediction via the generated trajectory and shape of ego vehicle
# the Prediction includes a Trajectory and Shape of the object
prediction = TrajectoryPrediction(trajectory=trajectory, shape=automata.egoShape)
# create a collision object from prediction
# here it is used for visualization purpose
collision_object = create_collision_object(prediction)
# draw this collision object
draw_it(collision_object, draw_params={'collision': {'facecolor': 'magenta'}})
# visualize current trajectory
if flag_plot_intermediate_results is True:
# wait till process_search terminates
print("Search finished.")
if path_shared.value is not None and len(path_shared.value) > 1:
print("Solution successfully found :)")
print("Finding solution failed :(")
return path_shared.value, dict_status_shared.value
import copy
import numpy as np
import xml.etree.ElementTree as et
from automata.MotionPrimitiveParser import MotionPrimitiveParser
from automata.MotionPrimitive import MotionPrimitive
from commonroad.geometry.shape import Rectangle
class MotionAutomata:
Class to handle motion primitives for motion planning
def __init__(self, veh_type_id):
self.numPrimitives = 0
self.Primitives = []
self.veh_type_id = veh_type_id
self.egoShape = None
if veh_type_id == 1:
self.egoShape = Rectangle(length=4.298, width=1.674)
elif veh_type_id == 2:
self.egoShape = Rectangle(length=4.508, width=1.610)
elif veh_type_id == 3:
self.egoShape = Rectangle(length=4.569, width=1.844)
def readFromXML(self, filename: str) -> None:
Reads all MotionPrimitives from the given file and stores them in the primitives array.
:param filename: the name of the file to be read from
# parse XML file
xmlTree = et.parse(filename).getroot()
# add trajectories
self.numPrimitives = self.numPrimitives + len(xmlTree.findall('Trajectory'))
for t in xmlTree.findall('Trajectory'):
def sort_primitives(self):
self.Primitives.sort(key=lambda x: x.finalState.y, reverse=False)
def createConnectivityListPrimitive(self, primitive: MotionPrimitive) -> None:
Creates the successor list for a single primitive and stores them in a successor list of the given primitive.
:param primitive: the primitive to be connected
for self_primitive in self.Primitives:
if primitive.checkConnectivityToNext(self_primitive):
def createConnectivityLists(self) -> None:
Creates a connectivity list for every primitive (let every primitive has its corresponding successor list).
for self_primitive in self.Primitives:
def createMirroredPrimitives(self) -> None:
Creates the mirrored motion primitives since the file to load primitives by default only contains left curves. This function computes the right curves.
oldPrimitives = self.Primitives
self.numPrimitives = 2 * self.numPrimitives
self.Primitives = np.empty(self.numPrimitives, dtype=MotionPrimitive)
for i in range(len(oldPrimitives)):
self.Primitives[i] = oldPrimitives[i]
# create mirrored primitives for the old primitives
newPrimitive = copy.deepcopy(self.Primitives[i])
# add the new mirrored primitive to self primitive list
self.Primitives[i + len(oldPrimitives)] = newPrimitive
def getClosestStartVelocity(self, initial_velocity):
get the velocity among start states that is closest to the given initial velocity
min_error = float('inf')
min_idx = 0
for i in range(len(self.Primitives)):
primitive = self.Primitives[i]
error_velocity = abs(initial_velocity - primitive.startState.velocity)
if error_velocity < min_error:
min_error = error_velocity
min_idx = i
return self.Primitives[min_idx].startState.velocity
def setVehicleTypeIdPrimitives(self):
assign vehicle type id to all primitives
for primitive in self.Primitives:
primitive.veh_type_id = self.veh_type_id
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import os
import numpy as np
import xml.etree.ElementTree as et
from typing import List
from copy import deepcopy
from automata.States import StartState, FinalState
from commonroad.scenario.trajectory import Trajectory
from commonroad.scenario.trajectory import State
from commonroad.common.solution_writer import CommonRoadSolutionWriter, VehicleType, CostFunction
from commonroad.common.solution_writer import VehicleModel as VM
from solution_checker import *
class MotionPrimitive:
def __init__(self, startState: StartState, finalState: FinalState, timeStepSize, trajectory: Trajectory):
Initialisation of a motion primitive.
:param startState: start state of the primitive
:param finalState: final state of the primitive
:param trajectory: trajectory of the primitive
self.startState = startState
self.finalState = finalState
self.timeStepSize = timeStepSize
self.trajectory = trajectory
self.vehicle_type = VehicleType.BMW_320i
self.vehicle_model = VM.KS
# a list to store connectable successive primitives from current primitive
self.Successors = []
self.veh_type_id = 1 # default
def __str__(self):
return "Primitive: \n\t {}\t=>\n\t{}".format(str(self.startState), str(self.finalState))
def print_info(self):
kwarg = {'position' : np.array([self.startState.x, self.startState.y]),
'velocity' : self.startState.velocity,
'steering_angle': self.startState.steering_angle,
'orientation' : self.startState.orientation,
'time_step' : self.startState.time_step}
state_start = State(**kwarg)
# add final state into the trajectory
kwarg = {'position' : np.array([self.finalState.x, self.finalState.y]),
'velocity' : self.finalState.velocity,
'steering_angle': self.finalState.steering_angle,
'orientation' : self.finalState.orientation,
'time_step' : self.finalState.time_step}
state_final = State(**kwarg)
for state in self.trajectory.state_list:
def mirror(self) -> None:
Mirrors the current primitive so that it describes a primitive in the opposite direction.
self.finalState.y = - self.finalState.y
self.finalState.orientation = - self.finalState.orientation
for i in range(len(self.trajectory.state_list)):
state = self.trajectory.state_list[i]
self.trajectory.state_list[i].position[1] = - self.trajectory.state_list[i].position[1]
state.orientation = - state.orientation
self.trajectory.state_list[i] = state
def checkConnectivityToNext(self, other: 'MotionPrimitive') -> bool:
Any primitive whose first state's velocity is in the velocity range of the last state of the current primitive is considered as connected.
Returns true if the two primitives are connected.
:param other: the next motion primitive to be checked
# if (self.finalState.velocity - self.finalState.vRange >= other.startState.velocity - other.startState.vRange and
# self.finalState.velocity + self.finalState.vRange <= other.startState.velocity + other.startState.vRange):
# return True
if abs(self.finalState.velocity - other.startState.velocity) < 0.01 and \
abs(self.finalState.steering_angle - other.startState.steering_angle) < 0.01 :
return True
return False
def appendTrajectoryToState(self, state: State) -> List[State]:
Appends the primitive to the given state and returns the new list of states including the intermediate steps.
:param state: the state to which the trajectory will be appended
# deep copy to prevent manipulation of original data
trajectory = deepcopy(self.trajectory)
# rotate the trajectory by the orientation of the given state
trajectory.translate_rotate(np.zeros(2), state.orientation)
# translate the trajectory by the position of the given state
trajectory.translate_rotate(state.position, 0)
# trajectory includes all states of the primitive
# as its first state is the same as the last state of the primitive to be connected
# we pop out the first state
# we modify the time steps of the trajectory
time_step_state = int(state.time_step)
for i in range(len(trajectory.state_list)):
trajectory.state_list[i].time_step = time_step_state + i + 1
# check connectability between the first state of the trajectory and the state
state_start = deepcopy(state)
state_end = deepcopy(trajectory.state_list[0])
state_start.time_step = 0
state_end.time_step = 1
trajectory_to_be_cheked = Trajectory(0, [state_start, state_end])
if self.check_validity(trajectory_to_be_cheked):
return trajectory.state_list
return None
def check_validity(self, trajectory_to_be_cheked):
Checks feasibility of the trajectory
if self.veh_type_id == 1:
veh_type = VehicleType.FORD_ESCORT
elif self.veh_type_id == 2:
veh_type = VehicleType.BMW_320i
elif self.veh_type_id == 3:
veh_type = VehicleType.VW_VANAGON
csw = CommonRoadSolutionWriter(output_dir=os.getcwd(),
# use solution writer to generate target xml tree
csw.add_solution_trajectory(trajectory=trajectory_to_be_cheked, planning_problem_id=100)
xmlTree = csw.root_node
# generate states to be checked
[node] = xmlTree.findall('ksTrajectory')
veh_trajectory = KSTrajectory.from_xml(node)
veh_model = KinematicSingleTrackModel(self.veh_type_id, veh_trajectory, None)
# validate
result = TrajectoryValidator.is_trajectory_valid(veh_trajectory, veh_model, 0.1)
return result
def costs(self) -> float:
Returns euclidean distance between the last state of current primitive and [0,0].