The expiration time for new job artifacts in CI/CD pipelines is now 30 days (GitLab default). Previously generated artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information:

Commit 73ab8e0a authored by Edmond Irani Liu's avatar Edmond Irani Liu 🏂
Browse files

version for programming exercise

parent 4a237c6c
import sys
import os
import time
from multiprocessing import Manager, Process
import numpy as np
from math import atan2
import matplotlib.pyplot as plt
from IPython import display
from ipywidgets import widgets
# import CommonRoad-io modules
from commonroad.visualization.draw_dispatch_cr import draw_object
from commonroad.common.file_reader import CommonRoadFileReader
from commonroad.scenario.trajectory import Trajectory, State
from commonroad.geometry.shape import Rectangle
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad_cc.collision_detection.pycrcc_collision_dispatch import create_collision_object
from commonroad_cc.visualization.draw_dispatch import draw_object as draw_it
from commonroad.common.solution_writer import CommonRoadSolutionWriter, VehicleType, CostFunction
# import Motion Automata modules
from automata.MotionAutomata import MotionAutomata
from automata.MotionPrimitive import MotionPrimitive
from automata.States import FinalState
# ================================== scenario ================================== #
def load_scenario(scenario_path):
# open and read in scenario and planning problem set
scenario, planning_problem_set = CommonRoadFileReader(scenario_path).open()
return scenario, planning_problem_set
# ================================== automata ================================== #
def generate_automata(veh_type_id: int):
1. We first create a Motion Automata object, then load pre-computed motion primitives into it.
Note that every motion primitive is a short trajectory with varied initial states that lead to different final states.
2. We create a connectivity list for every pritimive in the Motion Automata,
which contains all connectable primitives from the focused primitive.
If the velocity and steering angel of the start state of primitive B match
those of the final state of primitive A, we say B is connectable from A.
# the primitives vary for different vehicle models 1, 2 and 3.
assert veh_type_id in (1,2,3), "Input vehicle type id is not valid! Must be either 1, 2 or 3."
# step 1
automata = MotionAutomata(veh_type_id)
prefix = '../../GSMP/motion_automata/motion_primitives/'
print("Reading motion primitives...")
if veh_type_id == 1:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-0.91_0.91_SAstep_0.23_T_0.5_Model_FORD_ESCORT.xml')
elif veh_type_id == 2:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.066_1.066_SAstep_0.27_T_0.5_Model_BMW320i.xml')
elif veh_type_id == 3:
automata.readFromXML(prefix + 'V_0.0_20.0_Vstep_1.0_SA_-1.023_1.023_SAstep_0.26_T_0.5_Model_VW_VANAGON.xml')
# step 2
# assign vehicle type id to all primitives
print("Automata created.")
print('Number of loaded primitives: ' + str(len(automata.Primitives)))
return automata
def add_initial_state_to_automata(automata, planning_problem, flag_print_states = True):
# set initial steering angle to zero
planning_problem.initial_state.steering_angle = 0.0
# the initial velocity of the planning problem may be any value, we need to obtain the closest velocity to it
# from StartStates of the primitives in order to get the feasible successors of the planning problem
velocity_closest = automata.getClosestStartVelocity(planning_problem.initial_state.velocity)
planning_problem.initial_state.velocity = velocity_closest
initial_state = planning_problem.initial_state
goal_region = planning_problem.goal.state_list[0]
# if flag_print_states:
# print("Initial State:", initial_state)
# print("Goal Region:", goal_region)
# turn intial state into a motion primitive to check for connectivity to subsequent motion primitives
final_state_primitive = FinalState(x=initial_state.position[0],
initial_motion_primitive = MotionPrimitive(startState=None, finalState=final_state_primitive, timeStepSize=0, trajectory=None)
# create connectivity list for this imaginary motion primitive
return automata, initial_motion_primitive
# ================================== search for solution ================================== #
1. Searching for a possible solution could be time consuming, and also we would like to visualize the intermediate states during the search.
Thus, we create a separate Process to handle the search, and visualize the intermediate states via Main Process.
More information on python Prcess could be found at [1]( and
2. We need multiprocessing.Manager.Value() to help us share variables between different Processes.
3. To execute the search, we need to define an individual wrapper function and set it as the target of the new Process.
def execute_search(motion_planner, initial_motion_primitive, path_shared, dict_status_shared, max_tree_depth):
definition of the wrapper function for search
calls search algorithm and update the status dictionary for visualization in search algorithm
note that dict_status_shared is being shared between difference Processes
result = motion_planner.search_alg(initial_motion_primitive.Successors, max_tree_depth, dict_status_shared)
# result is in form of (final path, used_primitives)
if result is None:
# no path found
path_shared.value = result[0]
if len(result[1]) > 0:
print("Found primitives")
# for primitive in result[1]:
# print('\t', primitive)
def start_search(scenario, planning_problem, automata, motion_planner, initial_motion_primitive, flag_plot_intermediate_results=True, flag_plot_planning_problem=True):
# create Manager object to manage shared variables between different Processes
process_manager = Manager()
# create shared variables between Processes
path_shared = process_manager.Value('path_shared', None)
dict_status_shared = process_manager.Value('dict_status_shared', None)
# initialize status with a dictionary.
# IMPORTANT NOTE: if the shared variable contains complex object types (list, dict, ...), to change its value,
# we need to do it via another object (here dict_status) and pass it to the shared variable at the end
dict_status = {'cost_current': 0, 'path_current':[]}
dict_status_shared.value = dict_status
# maximum number of concatenated primitives
max_tree_depth = 100
# create and start the Process for search
process_search = Process(target=execute_search,
if flag_plot_intermediate_results:
# create a figure, plot the scenario and planning problem
fig = plt.figure(figsize=(9, 9))
if flag_plot_planning_problem:
refresh_rate_plot = 5.0
while True:
time.sleep(1 / refresh_rate_plot)
# break the loop if process_search is not alive anymore
if not process_search.is_alive():
# print current status
string_status = "Cost so far:", round(dict_status_shared.value['cost_current'], 2)
string_status += "Time step:", round(dict_status_shared.value['path_current'][-1].time_step, 2)
string_status += "Orientation:",round(dict_status_shared.value['path_current'][-1].orientation, 2)
string_status += "Velocity:", round(dict_status_shared.value['path_current'][-1].velocity, 2)
print (string_status, end='\r')
# if there is a path to be visualized
if len(dict_status_shared.value['path_current']) > 0:
if flag_plot_intermediate_results:
if flag_plot_planning_problem:
# create a Trajectory from the current states of the path
# Trajectory is essentially a list of states
trajectory = Trajectory(initial_time_step=0, state_list=dict_status_shared.value['path_current'])
# create an occupancy Prediction via the generated trajectory and shape of ego vehicle
# the Prediction includes a Trajectory and Shape of the object
prediction = TrajectoryPrediction(trajectory=trajectory, shape=automata.egoShape)
# create a colission object from prediction
# here it is used for visualization purpose
collision_object = create_collision_object(prediction)
# draw this collision object
draw_it(collision_object, draw_params={'collision': {'facecolor': 'magenta'}})
# visualize current trajectory
# wait till process_search terminates
print("Search finished.")
if path_shared.value is not None and len(path_shared.value) > 1:
print("Solution successfully found :)")
print("Finding solution failed :(")
return path_shared.value, dict_status_shared.value
# ================================== visualization ================================== #
def get_state_at_time(t):
for state in path_shared.value:
# return the first state that has time_step >= given t
if state.time_step >= t:
return state
# else return last state
return path[-1]
def draw_state(t):
print("current time step: ", t)
if path_shared is not None:
state = get_state_at_time(t)
trajectory = Trajectory(initial_time_step=int(state.time_step),state_list=[state])
prediction = TrajectoryPrediction(trajectory=trajectory, shape=automata.egoShape)
collision_object = create_collision_object(prediction)
\ No newline at end of file
import copy
import numpy as np
import xml.etree.ElementTree as et
from sphinx.util import xmlname_checker
from Automata.MotionPrimitiveParser import MotionPrimitiveParser
from Automata.MotionPrimitive import MotionPrimitive
from automata.MotionPrimitiveParser import MotionPrimitiveParser
from automata.MotionPrimitive import MotionPrimitive
from commonroad.geometry.shape import Rectangle
class MotionAutomata:
def __init__(self, state_tuple, dt):
self.state_tuple = state_tuple
Class to handle motion primitives for motion planning
def __init__(self, veh_type_id):
self.numPrimitives = 0
self.Primitives = []#np.empty(0, dtype=MotionPrimitive)
#self.dt = dt
self.Primitives = []
self.veh_type_id = veh_type_id
self.egoShape = None
if veh_type_id == 1:
self.egoShape = Rectangle(length=4.298, width=1.674)
elif veh_type_id == 2:
self.egoShape = Rectangle(length=4.508, width=1.610)
elif veh_type_id == 3:
self.egoShape = Rectangle(length=4.569, width=1.844)
def readFromXML(self, filename: str) -> None:
......@@ -19,45 +28,73 @@ class MotionAutomata:
:param filename: the name of the file to be read from
# parse XML file
xmlTree = et.parse(filename).getroot()
# add trajectories
self.numPrimitives = self.numPrimitives + len(xmlTree.findall('Trajectory'))
#self.Primitives = np.empty(self.numPrimitives, dtype=MotionPrimitive)
#i = 0
for t in xmlTree.findall('Trajectory'):
self.Primitives.append(MotionPrimitiveParser.createFromNode(t)) # self.dt
#self.Primitives[i].id = i
#i = i + 1
def createConnectivityListPrimitive(self, primitive: MotionPrimitive)->None:
def createConnectivityListPrimitive(self, primitive: MotionPrimitive) -> None:
Creates the successor list for a single primitive and stores them in a successor list of the given primitive.
:param primitive: the primitive to be connected
for i in range(self.numPrimitives):
if primitive.checkConnectivityToNext(self.Primitives[i]):
for self_primitive in self.Primitives:
if primitive.checkConnectivityToNext(self_primitive):
def createConnectivityLists(self)->None:
def createConnectivityLists(self) -> None:
Creates a connectivity list for every primitive (let every primitive has its corresponding successor list).
for t in self.Primitives:
for self_primitive in self.Primitives:
def createMirroredPrimitives(self)->None:
def createMirroredPrimitives(self) -> None:
Creates the mirrored motion primitives since the file to load primitives by default only contains left curves. This function computes the right curves.
import copy
oldPrimitives = self.Primitives
self.numPrimitives = 2 * self.numPrimitives
self.Primitives = np.empty(self.numPrimitives, dtype=MotionPrimitive)
for i in range(len(oldPrimitives)):
self.Primitives[i] = oldPrimitives[i]
newPrim = copy.deepcopy(self.Primitives[i])
self.Primitives[i + len(oldPrimitives)] = newPrim
# create mirrored primitives for the old primitives
newPrimitive = copy.deepcopy(self.Primitives[i])
# add the new mirrored primitive to self primitive list
self.Primitives[i + len(oldPrimitives)] = newPrimitive
def getClosestStartVelocity(self, initial_velocity):
get the velocity among start states that is closest to the given initial velocity
min_error = float('inf')
min_idx = 0
for i in range(len(self.Primitives)):
primitive = self.Primitives[i]
error_velocity = abs(initial_velocity - primitive.startState.velocity)
if error_velocity < min_error:
min_error = error_velocity
min_idx = i
return self.Primitives[min_idx].startState.velocity
def setVehicleTypeIdPrimitives(self):
assign vehicle type id to all primitives
for primitive in self.Primitives:
primitive.veh_type_id = self.veh_type_id
from Automata.States import StartState, FinalState
from commonroad.scenario.trajectory import Trajectory
from commonroad.scenario.trajectory import State as StateTupleFactory
from commonroad.scenario.trajectory import State
import os
import numpy as np
import xml.etree.ElementTree as et
from typing import List
from copy import deepcopy
from automata.States import StartState, FinalState
from commonroad.scenario.trajectory import Trajectory
from commonroad.scenario.trajectory import State
from commonroad.common.solution_writer import CommonRoadSolutionWriter, VehicleType, CostFunction
from commonroad.common.solution_writer import VehicleModel as VM
from solution_checker import *
class MotionPrimitive:
def __init__(self, startState: StartState, finalState: FinalState, timeStepSize, trajectory: Trajectory):
......@@ -12,17 +18,44 @@ class MotionPrimitive:
:param startState: start state of the primitive
:param finalState: final state of the primitive
:param trajectory: trajectory of the primitive
# = id
self.startState = startState
self.finalState = finalState
self.timeStepSize = timeStepSize
self.trajectory = trajectory
self.Successors = []
self.vehicle_type = VehicleType.BMW_320i
self.vehicle_model = VM.KS
# a list to store connectable successive primitives from current primitive
self.Successors = []
self.veh_type_id = 1 # default
def __str__(self):
return "Primitive (" + str(self.startState) + ") => (" + str(self.finalState) + ")"
return "Primitive: \n\t {}\t=>\n\t{}".format(str(self.startState), str(self.finalState))
def print_info(self):
kwarg = {'position' : np.array([self.startState.x, self.startState.y]),
'velocity' : self.startState.velocity,
'steering_angle': self.startState.steering_angle,
'orientation' : self.startState.orientation,
'time_step' : self.startState.time_step}
state_start = State(**kwarg)
# add final state into the trajectory
kwarg = {'position' : np.array([self.finalState.x, self.finalState.y]),
'velocity' : self.finalState.velocity,
'steering_angle': self.finalState.steering_angle,
'orientation' : self.finalState.orientation,
'time_step' : self.finalState.time_step}
state_final = State(**kwarg)
for state in self.trajectory.state_list:
def mirror(self) -> None:
Mirrors the current primitive so that it describes a primitive in the opposite direction.
......@@ -35,7 +68,6 @@ class MotionPrimitive:
state.orientation = - state.orientation
self.trajectory.state_list[i] = state
def checkConnectivityToNext(self, other: 'MotionPrimitive') -> bool:
Any primitive whose first state's velocity is in the velocity range of the last state of the current primitive is considered as connected.
......@@ -43,61 +75,87 @@ class MotionPrimitive:
:param other: the next motion primitive to be checked
if (self.finalState.velocity - self.finalState.vRange >= other.startState.velocity - other.startState.vRange
and self.finalState.velocity + self.finalState.vRange <= other.startState.velocity + other.startState.vRange):
# if (self.finalState.velocity - self.finalState.vRange >= other.startState.velocity - other.startState.vRange and
# self.finalState.velocity + self.finalState.vRange <= other.startState.velocity + other.startState.vRange):
# return True
if abs(self.finalState.velocity - other.startState.velocity) < 0.01 and \
abs(self.finalState.steering_angle - other.startState.steering_angle) < 0.01 :
return True
return False
# append the basic primitive to the state and return the new state but without intermediate steps
def appendToState(self, state, state_tuple): ##TODO: to be finalized
stateDict = state._asdict()
def appendTrajectoryToState(self, state: State) -> List[State]:
Appends the primitive to the given state and returns the new list of states including the intermediate steps.
pos = np.array([[0,0], [self.finalState.x, self.finalState.y]])
ori = np.array([0,self.finalState.orientation])
vel = np.array([state.velocity, self.finalState.velocity])
:param state: the state to which the trajectory will be appended
traj = create_from_vertices(pos, 0, ori, vel)
traj.translate_rotate(np.zeros(2), state.orientation)
traj.translate_rotate(np.array(state.position), 0)
# deep copy to prevent manipulation of original data
trajectory = deepcopy(self.trajectory)
newStateDict = traj.state_list[1]._asdict()
newStateDict['time'] = stateDict['time'] * self.timeStepSize
# rotate the trajectory by the orientation of the given state
trajectory.translate_rotate(np.zeros(2), state.orientation)
# translate the trajectory by the position of the given state
trajectory.translate_rotate(state.position, 0)
return state_tuple(**newStateDict)
def appendTrajectoryToState(self, state: State) -> List[State]:
Appends the primitive to the given state and returns the new list of states including the intermediate steps.
# trajectory includes all states of the primitive
# as its first state is the same as the last state of the primitive to be connected
# we pop out the first state
# we modify the time steps of the trajectory
time_step_state = int(state.time_step)
for i in range(len(trajectory.state_list)):
trajectory.state_list[i].time_step = time_step_state + i + 1
:param state: the trajectory will be appended to this state
# check connectability between the first state of the trajectory and the state
state_start = deepcopy(state)
state_end = deepcopy(trajectory.state_list[0])
state_start.time_step = 0
state_end.time_step = 1
trajectory_to_be_cheked = Trajectory(0, [state_start, state_end])
if self.check_validity(trajectory_to_be_cheked):
return trajectory.state_list
return None
def check_validity(self, trajectory_to_be_cheked):
Checks feasibility of the trajectory
from copy import deepcopy
traj = deepcopy(self.trajectory)
traj.translate_rotate(np.zeros(2), state.orientation)
traj.translate_rotate(state.position, 0)
path = []
num_timeSteps = int(10*self.timeStepSize)
if num_timeSteps > 0:
ratio = len(traj.state_list)/num_timeSteps
index = len(traj.state_list)-1
timeStamp = num_timeSteps
while len(path) < num_timeSteps:
newStateDict = None
idx = round(index)
newStateDict = deepcopy(traj.state_list[idx])
newStateDict.time_step = int(state.time_step) + int(timeStamp)
index -= ratio
timeStamp -= 1
return path
if self.veh_type_id == 1:
veh_type = VehicleType.FORD_ESCORT
elif self.veh_type_id == 2:
veh_type = VehicleType.BMW_320i
elif self.veh_type_id == 3:
veh_type = VehicleType.VW_VANAGON
csw = CommonRoadSolutionWriter(output_dir=os.getcwd(),
# use solution writer to generate target xml tree
csw.add_solution_trajectory(trajectory=trajectory_to_be_cheked, planning_problem_id=100)
xmlTree = csw.root_node
# generate states to be checked
[node] = xmlTree.findall('ksTrajectory')
veh_trajectory = KSTrajectory.from_xml(node)
veh_model = KinematicSingleTrackModel(self.veh_type_id, veh_trajectory, None)