Commit cd299969 authored by Edmond Irani Liu's avatar Edmond Irani Liu 🏂
Browse files

change placement of motion primitive generator/

parent 28ff4ce0
# paths can be either relative or absolute, units shown in brackets []
# output directory for generated motion primitives
output_directory: ../motion_primitives/
# id of vehicle. 1: FORD_ESCORT, 2: BMW_320i, 3: VW_VANAGON
veh_type_id: 2
# time length of trajectory [s]
T: 0.5
# time step for forward state simulation [s]
# note that commonroad scenarios have a discrete time step dt of 0.1 seconds; for higher accuracy
# of forward simulation, here dt is set to 0.05. the simulated states will be downsampled afterwards
dt_simulation: 0.05
# sampling range [m/s]
velocity_sample_min: 0.0
velocity_sample_max: 20.0
# for a step of 1.0 m/s, num = (v_max - v_min) / 1.0 + 1
num_sample_velocity: 21
# steer to one side only, we can mirror the primitives afterwards [rad]
steering_angle_sample_min: 0.0
# if steering_angle_sample_max set to 0, it will be assigned the maximum value
# given by the selected vehicle parameter
steering_angle_sample_max: 0.0
num_sample_steering_angle: 9
# number of segments of trajectory
num_segment_trajectory: 10
# number of sample trajectories to be generated
num_simulations: 2
%% Cell type:markdown id: tags:
# Tutorial: Batch Processing
%% Cell type:markdown id: tags:
This tutorial shows how [Batch Processing]( is used to solve for solutions to different scenarios. It is composed of
* a configuration file (.yaml format),
* some helper functions,
* and the script below to call the planer function.
Before proceeding, you should make sure the configuration file is set correctly. Here are the explanation for each of the parameters in the configuration file:
* **input_path**: input directory of CommonRoad scenarios that you indend to solve.
* **output_path**: output directory of the solution files.
* **overwrite**: flag to determine whether the existing solution files should be overwritten.
* **timeout**: timeout time for your motion planner, unit in seconds
* **motion_planner_path**: directory where the module containing the function to execute your motion planner is located
* **motion_planner_module_name**: name of the module that contains the function to execute your motion planner
* **motion_planner_function_name**: name of the function that executes your motion planner
* **default**: parameters specified under this section will be applied to all scenarios. If you wish to specify a different paramter for specific scenarios, simply copy the section and replace 'default' with the id of your scenario.
* **vehicle_model**: model of the vehicle, valid values: **PM, KS, ST and MB**. Please refer to [Vehicle Models]( for more information.
* **vehicle_type**: type of the vehicle, valid values: **FORD_ESCORT, BMW_320i and VW_VANAGON**.
* **cost_function**: identifier of cost function. Please refer to [Cost Functions]( for more information.
* **planner_id**: id of the planner that is used to solve for solutions. in this example, 1 = Greedy Best First Search, 2 = A*, else = Your own planner.
* **planning_problem_idx**: planner problem index. for cooperative scenarios: 0, 1, 2, ..., otherwise: 0
* **max_tree_depth**: maximum permissible depth of the search tree
Note: the paths can be either **relative** to this script or **absolute**.
Simply run the following script to start batch processing. The results here is just exemplary, you will see different output as your configuration varies.
%% Cell type:code id: tags:
``` python
# import helper functions
import helper_functions
# specify path to configuration file
path_file_config = "../configuration/batch_processing_config.yaml"
# load configuration file
configuration = helper_functions.load_configuration(path_file_config)
# get target function
function_target = helper_functions.get_target_function(configuration)
# get a list of scenario files
list_files_input = helper_functions.get_input_files(configuration)
# get length of the list and time before timeout
num_files = len(list_files_input)
time_timeout = configuration['timeout']
print("Total number of files to be processed: {}".format(num_files))
print("Timeout setting: {} seconds\n".format(time_timeout))
count_processed = 0
# iterate through scenarios
for file_scenario in list_files_input:
count_processed += 1
print("Processing file No. {} / {}. Scenario ID: {}".format(count_processed, num_files, file_scenario[:-4]))
# parse the scenario file
result_parse = helper_functions.parse_scenario_file(configuration, file_scenario)
# execute target function
solution_trajectories = helper_functions.execute_target_function(function_target, result_parse, time_timeout)
# save solution file
helper_functions.save_solution(configuration, solution_trajectories, result_parse)
%% Cell type:code id: tags:
``` python
%% Cell type:markdown id: tags:
## Tutorial: Motion Primitives Generator
This tutorial demonstrates how are the motion primitives generated. You can find all related files in the [CommonRoad Search]( repository, under folder GSMP/tools/motion_primitive_generator/.
The main components are:
* **motion_primitives_config.yaml** in which the configuration is set;
* **** in which the main functions are implemented; and
* the script below to execute the functions.
Before proceeding, you should make sure the configuration file is set correctly, which consists of the following parameters:
* **output setting**:
* output_directory: output directory for generated motion primitives.
* **vehicle setting**:
* veh_type_id: id of vehicle type. 1: FORD_ESCORT, 2: BMW_320i, 3: VW_VANAGON
* **primitive setting**:
* T: time length of trajectory [s].
* dt_simulation: time step for forwad state simulation [s].
* velocity_sample_min: minimum sampling velocity [m/s].
* velocity_sample_max: maximum sampling velocity [m/s].
* num_sample_velocity: number of velocity samples.
* steering_angle_sample_min: minimum sampling angle [rad]. Note that here we only consider steering to one side, as we will mirror the primitives afterwards.
* steering_angle_sample_max: maximum sampling angle [rad]. If set to 0, it will be assigned the maximum permissible value given by the selected vehicle parameter.
* num_sample_steering_angle: number of steering angle samples
* **sample trajectory setting**:
* num_segment_trajectory: number of segments in sample trajectories
* num_simulations: number of sample trajectories to be generated
Note: the paths can be either **relative** to this script, or **absolute**.
%% Cell type:markdown id: tags:
### Load configuration file and parameters
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
import matplotlib.pyplot as plt
# import helper functions
from implementation import Parameter, load_configuration, generate_motion_primitives, create_mirrored_primitives, \
save_motion_primitives, compute_number_successors_average, compute_number_successors_average, \
# specify path to configuration file
path_file_config = "../configuration/motion_primitives_config.yaml"
# load configuration file
configuration = load_configuration(path_file_config)
# parse configuration and generate parameter object
parameters = Parameter(configuration)
%% Cell type:markdown id: tags:
### Generate motion primitives
The attributes of each of the states in a motion primitive are: x position, y position, steering angle, velocity in x direction, orientation and time step
%% Cell type:code id: tags:
``` python
list_traj_accepted = generate_motion_primitives(parameters)
%% Cell type:markdown id: tags:
### Plot generated motion primitives
%% Cell type:code id: tags:
``` python
for traj in list_traj_accepted:
list_x = [state.position[0] for state in traj.state_list]
list_y = [state.position[1] for state in traj.state_list]
plt.plot(list_x, list_y)
%% Cell type:markdown id: tags:
### Create mirrored primitives
As we only computed primitives that have a positive (negative) steering angle, we mirror them to get the other half of primitives.
%% Cell type:code id: tags:
``` python
list_traj_accepted_mirrored = create_mirrored_primitives(list_traj_accepted, parameters)
print("Total number of primitives (mirrored included): ", len(list_traj_accepted_mirrored))
%% Cell type:markdown id: tags:
### Check average number of successors
We can inspect how many successors does every primitive have on average.
%% Cell type:code id: tags:
``` python
num_average_successors = compute_number_successors_average(list_traj_accepted_mirrored)
print("average number of successors: ", num_average_successors)
%% Cell type:markdown id: tags:
### Plot final motion primitives
We can plot the final generated motion primitives.
%% Cell type:code id: tags:
``` python
# fig = plt.figure()
for traj in list_traj_accepted_mirrored:
list_x = [state.position[0] for state in traj.state_list]
list_y = [state.position[1] for state in traj.state_list]
plt.plot(list_x, list_y)
%% Cell type:markdown id: tags:
### Generate sample trajectories
We can generate sample trajectories with the motion primitives that we have just generated
%% Cell type:code id: tags:
``` python
generate_sample_trajectories(list_traj_accepted_mirrored, parameters)
%% Cell type:markdown id: tags:
### Save the motion primitives into xml file
The xml containing generated motion primitives are output to the directory specified in the configuration
%% Cell type:code id: tags:
``` python
save_motion_primitives(list_traj_accepted_mirrored, parameters)
import os
import sys
import yaml
import xml.etree.ElementTree as et
from xml.dom import minidom
import copy
import random
import itertools
import numpy as np
import matplotlib.pyplot as plt
from math import atan2, sin, cos
# vehicle model
from vehicleDynamics_KS import vehicleDynamics_KS
from parameters_vehicle1 import parameters_vehicle1
from parameters_vehicle2 import parameters_vehicle2
from parameters_vehicle3 import parameters_vehicle3
# solution checker
from solution_checker import TrajectoryValidator, KSTrajectory, KinematicSingleTrackModel
# commonroad objects
from commonroad.scenario.trajectory import State, Trajectory
# solution writer
from commonroad.common.solution_writer import CommonRoadSolutionWriter, VehicleModel, VehicleType, CostFunction
class Parameter:
A class to hold all parameters
def __init__(self, configuration):
# outputs
self.directory_output = configuration['outputs']['output_directory']
# vehicle related
self.veh_type_id = configuration['vehicles']['veh_type_id']
if self.veh_type_id == 1:
self.veh_type = VehicleType.FORD_ESCORT
self.veh_param = parameters_vehicle1()
self.veh_name = "FORD_ESCORT"
elif self.veh_type_id == 2:
self.veh_type = VehicleType.BMW_320i
self.veh_param = parameters_vehicle2()
self.veh_name = "BMW320i"
elif self.veh_type_id == 3:
self.veh_type = VehicleType.VW_VANAGON
self.veh_param = parameters_vehicle3()
self.veh_name = "VW_VANAGON"
# time
self.T = configuration['primitives']['T']
self.dt_simulation = configuration['primitives']['dt_simulation']
# times 100 for two significant digits accuracy, turns into centi-seconds
self.time_stamps = ((np.arange(0, self.T, self.dt_simulation) + self.dt_simulation) * 100).astype(int)
# velocity
self.velocity_sample_min = configuration['primitives']['velocity_sample_min']
self.velocity_sample_max = configuration['primitives']['velocity_sample_max']
self.num_sample_velocity = configuration['primitives']['num_sample_velocity']
# steering angle
self.steering_angle_sample_min = configuration['primitives']['steering_angle_sample_min']
self.steering_angle_sample_max = configuration['primitives']['steering_angle_sample_max']
self.num_sample_steering_angle = configuration['primitives']['num_sample_steering_angle']
if int(self.steering_angle_sample_max) == 0:
self.steering_angle_sample_max = self.veh_param.steering.max
# sample trajectories
self.num_segment_trajectory = configuration['sample_trajectories']['num_segment_trajectory']
self.num_simulations = configuration['sample_trajectories']['num_simulations']
def load_configuration(path_file_config):
load input configuration file
with open(path_file_config, 'r') as stream:
configuration = yaml.load(stream)
return configuration
except yaml.YAMLError as exc:
def generate_list_of_samples(parameter: Parameter):
generate list of samples for velocity and steering angle based on the input parameters
list_samples_v = np.linspace(parameter.velocity_sample_min,
list_samples_steering_angle = np.linspace(parameter.steering_angle_sample_min,
return list_samples_v, list_samples_steering_angle
def check_acceleration_steering_rate_constraint(v_start, v_end, d_start, d_end, parameter):
compute the required inputs and check whether they are of permissible values
# compute required inputs
a_input = (v_end - v_start) / parameter.T
steering_rate_input = (d_end - d_start) / parameter.T
# check if the constraints are respected
if abs(a_input) > abs(parameter.veh_param.longitudinal.a_max) or \
abs(steering_rate_input) > abs(parameter.veh_param.steering.v_max):
return False
return True
def simulate_list_of_states(v_start, v_end, d_start, d_end, parameter):
forward simulation of states with friction constraint taken into account
elements of a state: x[m], y[m], steering angle[rad], velocity[m/s], orientation[rad], time step[decisecond]
# compute required inputs
a_input = (v_end - v_start) / parameter.T
steering_rate_input = (d_end - d_start) / parameter.T
# list to store the states
list_states = []
# trajectory always starts at position (0, 0) m with orientation of 0 rad
x_input = np.array([0.0, 0.0, d_start, v_start, 0.0])
# we assume constant input through the whole duration of T
u_input = np.array([steering_rate_input, a_input])
# time stamp of first state is 0
list_states.append(np.append(x_input, 0))
is_friction_constraint_satisfied = True
# forward simulation of states
# ref:, page 4
for time_stamp in parameter.time_stamps:
# simulate state transition
x_dot = np.array(vehicleDynamics_KS(x_input, u_input, parameter.veh_param))
# check friction circle constraint
if (a_input ** 2 + (x_input[3] * x_dot[4]) ** 2) ** 0.5 > parameter.veh_param.longitudinal.a_max:
is_friction_constraint_satisfied = False
# generate new state
x_output = x_input + x_dot * parameter.dt_simulation
# downsample the states with step size of 10 centiseconds (equivalent to 0.1 seconds)
if time_stamp % 10 == 0:
# add state to list, note that time stamps of states are in unit of deciseconds
list_states.append(np.append(x_output, time_stamp / 10))
# prepare for next iteration
x_input = x_output
# return None if the friction constraint is not satisfied
if is_friction_constraint_satisfied:
return list_states
return None
def create_trajectory_from_list_of_states(list_states):
a helper function to create CommonRoad Trajectory from a list of States
# list to hold states for final trajectory
list_states_new = list()
# iterate through trajectory states
for state in list_states:
# feed in required slots
kwarg = {'position': np.array([state[0], state[1]]),
'velocity': state[3],
'steering_angle': state[2],
'orientation': state[4],
'time_step': state[5].astype(int)}
# append state
# create new trajectory for evaluation
trajectory_new = Trajectory(initial_time_step=0, state_list=list_states_new)
return trajectory_new
def check_validity(trajectory_input, parameter):
check whether the input trajectory satisfies the kinematic single track model
note: to be replaced by the new feasiblity checker
csw = CommonRoadSolutionWriter(output_dir=os.getcwd(),
# use solution writer to generate target xml file
csw.add_solution_trajectory(trajectory=trajectory_input, planning_problem_id=100)
xmlTree = csw.root_node
# generate states to be checked
[node] = xmlTree.findall('ksTrajectory')
veh_trajectory = KSTrajectory.from_xml(node)
veh_model = KinematicSingleTrackModel(parameter.veh_type_id, veh_trajectory, None)
# validate
result = TrajectoryValidator.is_trajectory_valid(veh_trajectory, veh_model, 0.1)
return result
def create_mirrored_primitives(list_traj, parameter):
# make sure to make a deep copy
list_traj_mirrored = copy.deepcopy(list_traj)
count_accepted = 0
for traj in list_traj:
list_states_mirrored = []
for state in traj.state_list:
# add mirrored state into list
trajectory_new = create_trajectory_from_list_of_states(list_states_mirrored)
# double check the validity before adding to the list
if check_validity(trajectory_new, parameter):
count_accepted += 1
return list_traj_mirrored
def compute_number_successors_average(list_traj):
compute the average number of successors (branching factor)
list_count_succesors = []
for traj_main in list_traj:
count_successors = 0
# 2bc = to be connected
for traj_2bc in list_traj:
state_final_traj_main = traj_main.state_list[-1]
state_initial_traj_2bc = traj_2bc.state_list[0]
if abs(state_final_traj_main.velocity - state_initial_traj_2bc.velocity) < 0.02 and \
abs(state_final_traj_main.steering_angle - state_initial_traj_2bc.steering_angle) < 0.02:
count_successors += 1
return np.mean(list_count_succesors)
def generate_sample_trajectories(list_traj_input, parameter):
generate sample trajectories and check if they pass the solution checker
for count_run in range(parameter.num_simulations):
count_segment_traj = 0
num_traj = len(list_traj_input)
# get a random starting trajectory id
idx_traj = random.randrange(num_traj)
list_traj = []
while count_segment_traj< parameter.num_segment_trajectory:
# retrieve trajectory
traj_main = copy.deepcopy(list_traj_input[idx_traj])
list_successors_traj_main = []
count_segment_traj += 1
# obtain its successors
for j in range(num_traj):
# 2bc = to be connected
traj_2bc = list_traj_input[j]
state_final_traj_main = traj_main.state_list[-1]
state_initial_traj_2bc = traj_2bc.state_list[0]
if abs(state_final_traj_main.velocity - state_initial_traj_2bc.velocity) < 0.02 and \
abs(state_final_traj_main.steering_angle - state_initial_traj_2bc.steering_angle) < 0.02:
# start over if a trajectory does not have a valid successor
num_successors_traj_main = len(list_successors_traj_main)
if num_successors_traj_main == 0:
count_segment_traj = 0
idx_traj = random.randrange(num_traj)
list_traj = []