Commit bedc4daa authored by Benedikt Zoennchen's avatar Benedikt Zoennchen
Browse files

add python notebook which defines and computes different trajectory metrices.

parent 984d6105
Pipeline #114759 passed with stages
in 139 minutes and 51 seconds
%% Cell type:code id: tags:
``` python
import json
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
file = "./data/trajectories_distance.txt"
real_file = "./data/KO/ko-240-120-240/ko-240-120-240_combined_MB.txt"
f = open(file, "r")
header = f.readline();
trajectories = dict({});
for row in f:
s = row.split(" ");
pedId = int(s[0]);
footsteps = json.loads(s[1]);
trajectories[pedId] = footsteps[0]['footSteps'];
def get_trajectory(pedId):
return trajectories[pedId]
def get_footstep(trajectory, i):
return trajectory[i];
def start_time(trajectory):
return trajectory[0]['startTime'];
def max_start_time(trajectories):
return max(map(lambda i: start_time(trajectories[i]), trajectories))
def end_time(trajectory):
return trajectory[-1]['endTime'];
def min_end_time(trajectories):
return min(map(lambda i: end_time(trajectories[i]), trajectories))
def length(fs):
start = fs['start'];
end = fs['end'];
x1 = start['x'];
y1 = start['y'];
x2 = end['x'];
y2 = end['y'];
dx = x1-x2;
dy = y1-y2;
return np.sqrt(dx*dx + dy*dy);
def trajectory_length(trajectory):
return sum(map(lambda fs : length(fs), trajectory))
def direction(fs):
start = fs['start'];
end = fs['end'];
x1 = start['x'];
y1 = start['y'];
x2 = end['x'];
y2 = end['y'];
return np.array([x2-x1, y2-y1]);
def duration(fs):
startTime = fs['startTime'];
endTime = fs['endTime'];
return endTime-startTime;
def speed(fs):
return length(fs) / duration(fs);
def is_between(fs, time):
startTime = fs['startTime'];
endTime = fs['endTime'];
return startTime <= time and time < endTime;
def footstep(trajectory, time):
l = list(filter(lambda fs : is_between(fs, time), trajectory))
assert len(l) <= 1;
if len(l) == 1:
return l[0];
def position(trajectory, time):
fs = footstep(trajectory, time);
if fs != None:
startTime = fs['startTime'];
endTime = fs['endTime'];
dur = duration(fs);
partial_dur = time - startTime;
ratio = partial_dur / dur;
start = fs['start'];
x1 = start['x'];
y1 = start['y'];
l = length(fs);
if l == 0.0:
return np.array([x1, y1])
else:
partial_l = l * ratio;
v = direction(fs) / l * partial_l;
return np.array([x1, y1]) + v;
def cut(trajectory, sTime, eTime):
return list(filter(lambda fs : fs['startTime'] >= sTime and fs['endTime'] < eTime, trajectory))
def cut_soft(trajectory, sTime, eTime):
return list(filter(lambda fs : fs['startTime'] >= sTime and fs['startTime'] < eTime, trajectory))
def euclid_d(traj1, traj2, times):
"""Computes the total (Euclidean) distance between two trajectories at certain times."""
return 0
sT = max([start_time(traj1), start_time(traj2)])
eT = min([end_time(traj1), end_time(traj2)])
filtered_times = list(filter(lambda t: t >= sT and t <= eT, times))
overlaps = len(filtered_times)
if overlaps == 0:
return 0
return sum(map(lambda t: np.linalg.norm(position(traj1, t)- position(traj2, t)), filtered_times)) / overlaps
def euclid_path_length(traj1, traj2, times):
sT = max([start_time(traj1), start_time(traj2)]);
eT = min([end_time(traj1), end_time(traj2)]);
filtered_times = list(filter(lambda t: t >= sT and t <= eT, times));
s = np.array([0, 0])
for i in range(len(filtered_times)-1):
t1 = filtered_times[i]
t2 = filtered_times[i+1]
d1 = position(traj1, t1) - position(traj1, t2)
d2 = position(traj2, t1) - position(traj2, t2)
diff = d1 - d2
s = s + diff
return s;
def inter_agent_d(trajectories, t):
s = 0
min_index = min(trajectories.keys())
c = 0
for i in range(len(trajectories)):
pos1 = position(trajectories[i+min_index], t)
for j in range(i+1, len(trajectories)):
pos2 = position(trajectories[j+min_index], t)
if pos1 is not None and pos2 is not None:
s = s + np.linalg.norm(pos1 - pos2)
c = c + 1
if c == 0:
return 0
else:
return s / c
def total_inter_agent(trajectories1, trajectories2, times):
return sum(map(lambda t: inter_agent_d(trajectories1, t) - inter_agent_d(trajectories2, t), times)) / len(times)
def euclid_len(trajectory, sTime, eTime):
"""Computes the total (Euclidean) length of the trajectory in between [sTime;eTime]."""
cut_traj = cut_soft(trajectory, sTime, eTime);
return trajectory_length(cut_traj)
def greedy_match(trajectories1, trajectories2, times, f):
"""Computes a match of trajectories by using a greedy algorithm."""
assert len(trajectories1) == len(trajectories2)
min_index1 = min(trajectories1.keys())
min_index2 = min(trajectories2.keys())
match = {}
indexSet = set(range(min_index2, len(trajectories2)))
for i in range(min_index1, len(trajectories1)):
traj1 = trajectories1[i]
minVal = None
minIndex = None
for j in indexSet:
traj2 = trajectories2[j]
if overlap(traj1, traj2, 0.4):
val = f(traj1, traj2, times)
if(minVal == None or val < minVal):
minIndex = j
minVal = val
match[i] = minIndex
indexSet.remove(minIndex)
return match
def overlap(traj1, traj2, dt):
return True
def load_experiment(file):
fps = 16
data = pd.read_csv(
file,
sep=' ',
names=['pedestrianId', 'timeStep', 'x', 'y', 'e'],
index_col=False,
header=None,
skiprows=0)
data['x'] = data['x'] / 100
data['y'] = data['y'] / 100
data['timeStep'] = data['timeStep'] / fps
return data
def to_trajectories(data):
trajectories = dict({})
trajectory = []
for i in range(len(data)-1):
pedId = data['pedestrianId'][i]
if pedId == data['pedestrianId'][i+1]:
pedId = data['pedestrianId'][i]
x1 = data['x'][i]
y1 = data['y'][i]
x2 = data['x'][i+1]
y2 = data['y'][i+1]
startTime = data['timeStep'][i]
endTime = data['timeStep'][i+1]
fs = {'startTime':startTime, 'endTime': endTime, 'start':{'x':x1, 'y':y1}, 'end':{'x':x2, 'y':y2}}
trajectory.append