Notice to GitKraken users: A vulnerability has been found in the SSH key generation of GitKraken versions 7.6.0 to 8.0.0 (https://www.gitkraken.com/blog/weak-ssh-key-fix). If you use GitKraken and have generated a SSH key using one of these versions, please remove it both from your local workstation and from your LRZ GitLab profile.

21.10.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit 11ed2f46 authored by Sebastian Maierhofer's avatar Sebastian Maierhofer
Browse files

Add informed/uninformed search tutorials

parent 820b450b
Tutorials on Tree Search Algorithms
============
This is a short guide on how to use the search algorithms in tree_search.py for lectures and tutorials.
Set up
============
For using the code, you have to install the tools as described in the README in the root folder. For using the code in
Pycharm, create a new project in the folder `./GSMP/motion_automata`, to avoid path issues.
# Documentation
For running the search algorithms, you can currently specify three different configurations which mainly influence the
visualization of the search and are defined in `config.py`:
### Default:
* This configuration should only be used in the jupyter notebook tutorial (`cr_uninformed_search_tutorial` and
`cr_informed_search_tutorial`). It first runs the algorithm and stores the state of
each motion primitive, i.e., explored/frontier/currently exploring etc., for each step. Then it plots these steps using
ipywidgets, so the student can step through the search in the jupyter notebook.
* The color code for plotting is taken from the AIMA tutorial, but can be changed in `config.py`.
* By setting PLOT_LEGEND to True, a legend with all states of
motion primitives (see `MotionPrimitiveStatus` in `helper_tree_search.py`) is plotted.
* To reduce the number of steps, you can turn off that motion primitives with collisions are plotted one after the other by setting PLOT_COLLISION_STEPS
to False.
### StudentScript:
This configuration uses the same parameters as the Default configuration, but it can be run in Pycharm,
e.g. when executing `main.py`. It plots the different steps while executing the search algorithm. It should help
students to easily go through the code.
......@@ -7,9 +7,10 @@ sys.path.append(os.path.join(path_commonroad_search, "GSMP/tools/commonroad-coll
sys.path.append(os.path.join(path_commonroad_search, "GSMP/tools/commonroad-road-boundary"))
sys.path.append(os.path.join(path_commonroad_search, "GSMP/motion_automata/vehicle_model"))
import time
from multiprocessing import Manager, Process
import enum
import ipywidgets as widgets
import numpy as np
from math import atan2
......@@ -17,12 +18,16 @@ from math import atan2
import matplotlib.pyplot as plt
from IPython import display
from ipywidgets import widgets
from IPython.display import display
# import CommonRoad-io modules
from commonroad.visualization.draw_dispatch_cr import draw_object
from commonroad.common.file_reader import CommonRoadFileReader
from commonroad.scenario.trajectory import Trajectory, State
from commonroad.scenario.scenario import Scenario
from commonroad.geometry.shape import Rectangle
from commonroad.planning.planning_problem import PlanningProblem
from commonroad.scenario.obstacle import StaticObstacle, ObstacleRole, ObstacleType, DynamicObstacle
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad_cc.collision_detection.pycrcc_collision_dispatch import create_collision_object
from commonroad_cc.visualization.draw_dispatch import draw_object as draw_it
......@@ -32,6 +37,8 @@ from commonroad.common.solution_writer import CommonRoadSolutionWriter, VehicleT
from automata.MotionAutomata import MotionAutomata
from automata.MotionPrimitive import MotionPrimitive
from automata.States import FinalState
from typing import List, Dict, Tuple
# ================================== scenario ================================== #
def load_scenario(scenario_path):
......@@ -42,8 +49,8 @@ def load_scenario(scenario_path):
# ================================== automata ================================== #
def generate_automata(veh_type_id: int):
"""
def generate_automata(veh_type_id, mp_file=None, search_tutorial=False):
"""
1. We first create a Motion Automata object, then load pre-computed motion primitives into it.
Note that every motion primitive is a short trajectory with varied initial states that lead to different final states.
......@@ -51,10 +58,15 @@ def generate_automata(veh_type_id: int):
which contains all connectable primitives from the focused primitive.
If the velocity and steering angel of the start state of primitive B match
those of the final state of primitive A, we say B is connectable from A.
:param veh_type_id: id for the vehicle model in the motion primitives
:param mp_file: optional parameter for specifying the path to a motion primitives xml file
:param search_tutorial: boolean value to specify if the search is used for a tutorial ->
motion primitives are sorted by their y-coordinate in the final state
"""
# the primitives vary for different vehicle models 1, 2 and 3.
assert veh_type_id in (1,2,3), "Input vehicle type id is not valid! Must be either 1, 2 or 3."
assert veh_type_id in (1, 2, 3), "Input vehicle type id is not valid! Must be either 1, 2 or 3."
# step 1
automata = MotionAutomata(veh_type_id)
......@@ -62,22 +74,38 @@ def generate_automata(veh_type_id: int):
print("Reading motion primitives...")
try:
prefix = '../../GSMP/motion_automata/motion_primitives/'
if mp_file is not None:
automata.readFromXML(prefix + mp_file)
else:
if veh_type_id == 1:
automata.readFromXML(
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-0.91_0.91_SAstep_0.23_T_0.5_Model_FORD_ESCORT.xml')
elif veh_type_id == 2:
automata.readFromXML(
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.066_1.066_SAstep_0.27_T_0.5_Model_BMW320i.xml')
elif veh_type_id == 3:
automata.readFromXML(
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.023_1.023_SAstep_0.26_T_0.5_Model_VW_VANAGON.xml')
if veh_type_id == 1:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-0.91_0.91_SAstep_0.23_T_0.5_Model_FORD_ESCORT.xml')
elif veh_type_id == 2:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.066_1.066_SAstep_0.27_T_0.5_Model_BMW320i.xml')
elif veh_type_id == 3:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.023_1.023_SAstep_0.26_T_0.5_Model_VW_VANAGON.xml')
except Exception:
prefix = '../../../GSMP/motion_automata/motion_primitives/'
if veh_type_id == 1:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-0.91_0.91_SAstep_0.23_T_0.5_Model_FORD_ESCORT.xml')
elif veh_type_id == 2:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.066_1.066_SAstep_0.27_T_0.5_Model_BMW320i.xml')
elif veh_type_id == 3:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.023_1.023_SAstep_0.26_T_0.5_Model_VW_VANAGON.xml')
if mp_file is not None:
automata.readFromXML(prefix + mp_file)
else:
if veh_type_id == 1:
automata.readFromXML(
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-0.91_0.91_SAstep_0.23_T_0.5_Model_FORD_ESCORT.xml')
elif veh_type_id == 2:
automata.readFromXML(
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.066_1.066_SAstep_0.27_T_0.5_Model_BMW320i.xml')
elif veh_type_id == 3:
automata.readFromXML(
prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.023_1.023_SAstep_0.26_T_0.5_Model_VW_VANAGON.xml')
# additional step for search examples -> sorting motion primitives by max. y-coordinate
if search_tutorial:
automata.sort_primitives()
# step 2
automata.createConnectivityLists()
......@@ -90,36 +118,34 @@ def generate_automata(veh_type_id: int):
return automata
def add_initial_state_to_automata(automata, planning_problem, flag_print_states = True):
def add_initial_state_to_automata(automata, planning_problem):
# set initial steering angle to zero
planning_problem.initial_state.steering_angle = 0.0
planning_problem.initial_state.steering_angle = 0.0
# the initial velocity of the planning problem may be any value, we need to obtain the closest velocity to it
# from StartStates of the primitives in order to get the feasible successors of the planning problem
velocity_closest = automata.getClosestStartVelocity(planning_problem.initial_state.velocity)
planning_problem.initial_state.velocity = velocity_closest
velocity_closest = automata.getClosestStartVelocity(planning_problem.initial_state.velocity)
planning_problem.initial_state.velocity = velocity_closest
initial_state = planning_problem.initial_state
goal_region = planning_problem.goal.state_list[0]
initial_state = planning_problem.initial_state
# if flag_print_states:
# print("Initial State:", initial_state)
# print("Goal Region:", goal_region)
# turn intial state into a motion primitive to check for connectivity to subsequent motion primitives
final_state_primitive = FinalState(x=initial_state.position[0],
y=initial_state.position[1],
# turn initial state into a motion primitive to check for connectivity to subsequent motion primitives
final_state_primitive = FinalState(x=initial_state.position[0],
y=initial_state.position[1],
steering_angle=initial_state.steering_angle,
velocity=initial_state.velocity,
orientation=initial_state.orientation,
velocity=initial_state.velocity,
orientation=initial_state.orientation,
time_step=initial_state.time_step)
initial_motion_primitive = MotionPrimitive(startState=None, finalState=final_state_primitive, timeStepSize=0, trajectory=None)
initial_motion_primitive = MotionPrimitive(startState=None, finalState=final_state_primitive, timeStepSize=0,
trajectory=None)
# create connectivity list for this imaginary motion primitive
automata.createConnectivityListPrimitive(initial_motion_primitive)
# create connectivity list for this imaginary motion primitive
automata.createConnectivityListPrimitive(initial_motion_primitive)
return automata, initial_motion_primitive
return automata, initial_motion_primitive
# ================================== search for solution ================================== #
"""
......@@ -133,6 +159,7 @@ More information on python Prcess could be found at [1](https://docs.python.org/
3. To execute the search, we need to define an individual wrapper function and set it as the target of the new Process.
"""
def execute_search(motion_planner, initial_motion_primitive, path_shared, dict_status_shared, max_tree_depth):
"""
definition of the wrapper function for search
......@@ -148,49 +175,51 @@ def execute_search(motion_planner, initial_motion_primitive, path_shared, dict_s
else:
display.clear_output(wait=True)
path_shared.value = result[0]
if len(result[1]) > 0:
print("Found primitives")
# for primitive in result[1]:
# print('\t', primitive)
def start_search(scenario, planning_problem, automata, motion_planner, initial_motion_primitive, flag_plot_intermediate_results=True, flag_plot_planning_problem=True):
def start_search(scenario, planning_problem, automata, motion_planner, initial_motion_primitive,
flag_plot_intermediate_results=True, flag_plot_planning_problem=True):
# create Manager object to manage shared variables between different Processes
process_manager = Manager()
# create shared variables between Processes
path_shared = process_manager.Value('path_shared', None)
path_shared = process_manager.Value('path_shared', None)
dict_status_shared = process_manager.Value('dict_status_shared', None)
# initialize status with a dictionary.
# IMPORTANT NOTE: if the shared variable contains complex object types (list, dict, ...), to change its value,
# we need to do it via another object (here dict_status) and pass it to the shared variable at the end
dict_status = {'cost_current': 0, 'path_current':[]}
dict_status = {'cost_current': 0, 'path_current': []}
dict_status_shared.value = dict_status
# maximum number of concatenated primitives
max_tree_depth = 100
max_tree_depth = 100
# create and start the Process for search
process_search = Process(target=execute_search,
args=(motion_planner,
initial_motion_primitive,
path_shared,
dict_status_shared,
process_search = Process(target=execute_search,
args=(motion_planner,
initial_motion_primitive,
path_shared,
dict_status_shared,
max_tree_depth))
process_search.start()
if flag_plot_intermediate_results:
# create a figure, plot the scenario and planning problem
fig = plt.figure(figsize=(9, 9))
fig = plt.figure(figsize=(9, 9))
plt.clf()
draw_object(scenario)
if flag_plot_planning_problem:
if flag_plot_planning_problem:
draw_object(planning_problem)
plt.gca().set_aspect('equal')
plt.gca().set_axis_off()
plt.margins(0,0)
plt.margins(0, 0)
plt.show()
refresh_rate_plot = 5.0
......@@ -200,14 +229,14 @@ def start_search(scenario, planning_problem, automata, motion_planner, initial_m
# break the loop if process_search is not alive anymore
if not process_search.is_alive():
break
# print current status
string_status = "Cost so far:", round(dict_status_shared.value['cost_current'], 2)
string_status += "Time step:", round(dict_status_shared.value['path_current'][-1].time_step, 2)
string_status += "Orientation:",round(dict_status_shared.value['path_current'][-1].orientation, 2)
string_status += "Orientation:", round(dict_status_shared.value['path_current'][-1].orientation, 2)
string_status += "Velocity:", round(dict_status_shared.value['path_current'][-1].velocity, 2)
print (string_status, end='\r')
print(string_status, end='\r')
# if there is a path to be visualized
if len(dict_status_shared.value['path_current']) > 0:
if flag_plot_intermediate_results:
......@@ -217,28 +246,28 @@ def start_search(scenario, planning_problem, automata, motion_planner, initial_m
draw_object(planning_problem)
plt.gca().set_aspect('equal')
plt.gca().set_axis_off()
plt.margins(0,0)
plt.margins(0, 0)
plt.show()
# create a Trajectory from the current states of the path
# Trajectory is essentially a list of states
trajectory = Trajectory(initial_time_step=0, state_list=dict_status_shared.value['path_current'])
# create an occupancy Prediction via the generated trajectory and shape of ego vehicle
# the Prediction includes a Trajectory and Shape of the object
prediction = TrajectoryPrediction(trajectory=trajectory, shape=automata.egoShape)
# create a colission object from prediction
# create a collision object from prediction
# here it is used for visualization purpose
collision_object = create_collision_object(prediction)
# draw this collision object
draw_it(collision_object, draw_params={'collision': {'facecolor': 'magenta'}})
# visualize current trajectory
fig.canvas.draw()
fig.canvas.draw()
# wait till process_search terminates
# wait till process_search terminates
process_search.join()
time.sleep(0.5)
print("Search finished.")
......@@ -250,22 +279,5 @@ def start_search(scenario, planning_problem, automata, motion_planner, initial_m
return path_shared.value, dict_status_shared.value
# ================================== visualization ================================== #
def get_state_at_time(t):
for state in path_shared.value:
# return the first state that has time_step >= given t
if state.time_step >= t:
return state
# else return last state
return path[-1]
def draw_state(t):
print("current time step: ", t)
draw_figure()
if path_shared is not None:
state = get_state_at_time(t)
trajectory = Trajectory(initial_time_step=int(state.time_step),state_list=[state])
prediction = TrajectoryPrediction(trajectory=trajectory, shape=automata.egoShape)
collision_object = create_collision_object(prediction)
draw_it(collision_object)
\ No newline at end of file
......@@ -5,6 +5,7 @@ from automata.MotionPrimitiveParser import MotionPrimitiveParser
from automata.MotionPrimitive import MotionPrimitive
from commonroad.geometry.shape import Rectangle
class MotionAutomata:
"""
Class to handle motion primitives for motion planning
......@@ -39,6 +40,9 @@ class MotionAutomata:
self.setVehicleTypeIdPrimitives()
return
def sort_primitives(self):
self.Primitives.sort(key=lambda x: x.finalState.y, reverse=False)
def createConnectivityListPrimitive(self, primitive: MotionPrimitive) -> None:
"""
Creates the successor list for a single primitive and stores them in a successor list of the given primitive.
......
......@@ -24,7 +24,7 @@ class MotionPrimitiveParser:
startSteeringAngle = float(startNode.find('steering_angle').text)
startVelocity = float(startNode.find('velocity').text)
startOrientation = float(startNode.find('orientation').text)
startTimeStep = float(startNode.find('time_step').text)
startTimeStep = int(startNode.find('time_step').text)
startState = StartState(startX, startY, startSteeringAngle, startVelocity, startOrientation, startTimeStep)
......@@ -35,13 +35,14 @@ class MotionPrimitiveParser:
finalSteeringAngle = float(finalNode.find('steering_angle').text)
finalVelocity = float(finalNode.find('velocity').text)
finalOrientation = float(finalNode.find('orientation').text)
finalTimeStep = float(finalNode.find('time_step').text)
finalTimeStep = int(finalNode.find('time_step').text)
finalState = FinalState(finalX, finalY, finalSteeringAngle, finalVelocity, finalOrientation, finalTimeStep)
# create trajectory from path node and start/final states
pathNode = xmlNode.find('Path')
timeStepSize = 1.0 / (len(pathNode) + 1)
duration = xmlNode.find('Duration')
timeStepSize = float(duration.text) / (len(pathNode) + 1)
trajectory = MotionPrimitiveParser.createTrajectoryFromPathStates(pathNode, startState, finalState, timeStepSize)
......
__author__ = "Anna-Katharina Rettinger"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["CoPlan"]
__version__ = "0.1"
__maintainer__ = "Anna-Katharina Rettinger"
__email__ = "commonroad-i06@in.tum.de"
__status__ = "Beta"
import enum
import ipywidgets as widgets
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from IPython import display
from ipywidgets import widgets
from IPython.display import display
# import CommonRoad-io modules
from commonroad.visualization.draw_dispatch_cr import draw_object
from commonroad.scenario.trajectory import State
from commonroad.scenario.scenario import Scenario
from commonroad.geometry.shape import Rectangle
from commonroad.planning.planning_problem import PlanningProblem
from commonroad.scenario.obstacle import ObstacleType, DynamicObstacle
# import Motion Automaton modules
from automata.MotionPrimitive import MotionPrimitive
from typing import List, Dict, Tuple
@enum.unique
class MotionPrimitiveStatus(enum.Enum):
IN_FRONTIER = 0
INVALID = 1
CURRENTLY_EXPLORED = 2
EXPLORED = 3
SOLUTION = 4
def plot_legend(plotting_config):
if hasattr(plotting_config, 'LABELS'):
node_status, labels = plotting_config.LABELS
else:
node_status = [status.value for status in MotionPrimitiveStatus]
labels = ['Frontier', 'Collision', 'Currently Exploring', 'Explored', 'Final Solution']
custom_lines = []
for value in node_status:
custom_lines.append(Line2D([0], [0], color=plotting_config.PLOTTING_PARAMS[value][0],
linestyle=plotting_config.PLOTTING_PARAMS[value][1],
linewidth=plotting_config.PLOTTING_PARAMS[value][2]))
legend = plt.legend(handles=custom_lines, labels=labels, loc='lower left',
bbox_to_anchor=(0.02, 0.51), prop={'size': 18})
legend.set_zorder(30)
plt.rcParams["legend.framealpha"] = 1.0
plt.rcParams["legend.shadow"] = True
def plot_search_scenario(scenario, initial_state, ego_shape, planning_problem, config):
plt.figure(figsize=(22.5, 4.5))
plt.axis('equal')
plt.xlim(55, 97)
plt.ylim(-2.5, 5.5)
draw_params = {'scenario': {'lanelet': {'facecolor': '#F8F8F8'}}}
draw_object(scenario, draw_params=draw_params)
ego_vehicle = DynamicObstacle(obstacle_id=scenario.generate_object_id(), obstacle_type=ObstacleType.CAR,
obstacle_shape=ego_shape,
initial_state=initial_state)
draw_object(ego_vehicle)
draw_object(planning_problem)
if config.PLOT_LEGEND:
plot_legend(plotting_config=config)
def initial_visualization(scenario, initial_state, ego_shape, planning_problem, config, path_fig):
if not config.JUPYTER_NOTEBOOK:
plot_search_scenario(scenario, initial_state, ego_shape, planning_problem, config)
if path_fig:
plt.rcParams['svg.fonttype'] = 'none'
plt.axis('off')
try:
plt.savefig(path_fig + 'initial_scenario.' + config.OUTPUT_FORMAT, format=config.OUTPUT_FORMAT,
bbox_inches='tight')
except:
print('Saving was not successful')
else:
plt.show(block=False)
def plot_state(state: State, color='red'):
plt.plot(state.position[0], state.position[1], color=color, marker='o', markersize=6)
def plot_motion_primitive(mp: MotionPrimitive, color='red'):
"""
Plots an object of class MotionPrimitive with marker at initial state and end state
@param mp: object of class Motion Primitive
@param color: color for plotting Motion Primitive
@return:
"""
plot_state(state=mp.trajectory.state_list[0])
plot_state(state=mp.trajectory.state_list[-1])
x = []
y = []
for state in mp.trajectory.state_list:
x.append(state.position[0])
y.append(state.position[1])
plt.plot(x, y, color=color, marker="")
def plot_primitive_path(mp: List[State], status: MotionPrimitiveStatus, plotting_params):
plt.plot(mp[-1].position[0], mp[-1].position[1], color=plotting_params[status.value][0], marker='o', markersize=8,
zorder=27)
x = []
y = []
for state in mp:
x.append(state.position[0])
y.append(state.position[1])
plt.plot(x, y, color=plotting_params[status.value][0], marker="", linestyle=plotting_params[status.value][1],
linewidth=plotting_params[status.value][2], zorder=25)
def update_visualization(mp: List[State], status: MotionPrimitiveStatus, node_status: Dict[int, Tuple],
path_fig, config, count):
assert isinstance(status, MotionPrimitiveStatus), "Status in not of type MotionPrimitiveStatus."
node_status.update({hash(mp[-1]): (mp, status)})
# only plot if run with python script
if not config.JUPYTER_NOTEBOOK:
plot_primitive_path(mp=mp, status=status, plotting_params=config.PLOTTING_PARAMS)
if path_fig:
plt.axis('off')
plt.rcParams['svg.fonttype'] = 'none'
try:
plt.savefig(path_fig + 'solution_step_' + str(count) + '.' + config.OUTPUT_FORMAT,
format=config.OUTPUT_FORMAT, bbox_inches='tight')
except:
print('Saving was not successful')
else:
plt.pause(0.4)
return node_status
def show_scenario(scenario_data: Tuple[Scenario, State, Rectangle, PlanningProblem], node_status: Dict[int, Tuple],
config):
plot_search_scenario(scenario=scenario_data[0], initial_state=scenario_data[1], ego_shape=scenario_data[2],
planning_problem=scenario_data[3], config=config)
for node in node_status.values():
plot_primitive_path(node[0], node[1], config.PLOTTING_PARAMS)
plt.show()
def display_steps(scenario_data, config, algorithm, **args):
def slider_callback(iteration):
# don't show graph for the first time running the cell calling this function
try:
show_scenario(scenario_data, node_status=all_node_status[iteration], config=config)
except:
pass
def visualize_callback(Visualize):
if Visualize is True:
button.value = False
global all_node_status
if 'limit' in args:
path, primitives, all_node_status = algorithm(limit=args['limit'])
else:
path, primitives, all_node_status = algorithm()
slider.max = len(all_node_status) - 1
for i in range(slider.max + 1):
slider.value = i
# time.sleep(.5)
slider = widgets.IntSlider(min=0, max=1, step=1, value=0)
slider_visual = widgets.interactive(slider_callback, iteration=slider)
display(slider_visual)
button = widgets.ToggleButton(value=False)
button_visual = widgets.interactive(visualize_callback, Visualize=button)
display(button_visual)
\ No newline at end of file
from commonroad.scenario.trajectory import State
from typing import List
from automata.MotionPrimitive import MotionPrimitive
class Node: