Commit e7b29b14 authored by schuegra's avatar schuegra
Browse files

Initial commit

parents
.idea
__pycache__
*.pyc
*.csv
# Vadere output often generated by console
log.out
# do not track Vadere outputs (folders and files and environments), every computer as different paths, which results in changes for every commit.
vadere_scenarios
*.scenarios
*.project
# will be created automatically:
suqc/suqc_envs/
# Vadere jar models are *not* directly stored in the repo anymore via git-lfs
*.jar
# Configuration files (generated when importing the project)
suq_config.json
update_server.sh
# pickle files
*.p
# Ignore virtual environment
venv
# folders / files when packaging
build
dist
suqc.egg-info
# tutorial output
/tutorial/example_output/
/tutorial/example_multirun_output/
from .main import *
\ No newline at end of file
from py4j.java_gateway import java_import
class ApiWrapper(object):
def __init__(self, apiObject, gateway):
self._apiObject = apiObject
self._gateway = gateway
# imports
java_import(self._gateway.jvm, "org.vadere.manager.traci.compoundobjects.*")
java_import(self._gateway.jvm, "org.vadere.util.geometry.shapes.*")
java_import(self._gateway.jvm, "java.util.*")
# java types
self._stringClass = self._gateway.jvm.String
self._vpointClass = self._gateway.jvm.VPoint
self._arraylistClass = self._gateway.jvm.ArrayList
\ No newline at end of file
from py4j.java_gateway import java_import
from ._api_wrapper import ApiWrapper
class ControllWrapper(ApiWrapper):
def sendFile(self, scenarioPath):
return self._apiObject.sendFile(scenarioPath)
def getVersion(self):
return self._apiObject.getVersion()
def nextStep(self, simTimeStep):
return self._apiObject.nextSimTimeStep(str(simTimeStep))
def close(self):
return self._apiObject.close()
\ No newline at end of file
from ._api_wrapper import ApiWrapper
class PersonapiWrapper(ApiWrapper):
def getIDList(self):
response = self._apiObject.getIDList()
result = response.getResponseData()
return result
def getNextFreeId(self):
response = self._apiObject.getNextFreeId()
result = response.getResponseData()
return result
def getIDCount(self):
response = self._apiObject.getIDCount()
result = response.getResponseData()
return result
def getSpeed(self, personID):
response = self._apiObject.getSpeed(personID)
result = response.getResponseData()
return result
def setVelocity(self, personID, velocity):
response = self._apiObject.setVelocity(personID, velocity)
result = response.toString()
return result
def getPosition2D(self, personID):
response = self._apiObject.getPosition2D(personID)
result = response.getResponseData()
return result
def setPosition2D(self, personID, x, y):
vpoint = self._vpointClass(float(x), float(y))
response = self._apiObject.setPosition2D(personID, vpoint)
result = response.toString()
return result
def getPosition2DList(self):
response = self._apiObject.getPosition2DList()
result = response.getResponseData()
ids = self.getIDList()
position2DList = {}
for id in ids:
vpoint = result[id]
position2DList[id] = [vpoint.getX(), vpoint.getY()]
return dict(position2DList)
def getTargetList(self, personID):
response = self._apiObject.getTargetList(personID)
result = response.getResponseData()
return result
def setTargetList(self, personID, targets):
targetsJavaArrayList = self._arraylistClass()
for t in targets:
targetsJavaArrayList.add(t)
response = self._apiObject.setTargetList(personID, targetsJavaArrayList)
result = response.toString()
return result
def setNextTargetListIndex(self, personID, nextTargetListIndex):
response = self._apiObject.setNextTargetListIndex(personID, nextTargetListIndex)
result = response.toString()
return result
def getNextTagetListIndex(self, personID):
response = self._apiObject.getNextTargetListIndex(personID)
result = response.toString()
return result
def createNew(self, jsonFilePath):
response = self._apiObject.createNew(jsonFilePath)
result = response.toString()
return result
\ No newline at end of file
from ._api_wrapper import ApiWrapper
class PolygonapiWrapper(ApiWrapper):
def getIDList(self):
response = self._apiObject.getIDList()
result = response.getResponseData()
return result
def getType(self, elementID):
response = self._apiObject.getType(elementID)
result = response.getResponseData()
return result
def getShape(self, elementID):
response = self._apiObject.getShape(elementID)
result = response.getResponseData()
points = result.getPoints()
shape = []
for p in points:
shape += [(p.getX(), p.getY())]
return shape
def getCentroid(self, elementID):
response = self._apiObject.getCentroid(elementID)
result = response.getResponseData()
vpoint = (result.getX(), result.getY())
return vpoint
def getDistance(self, elementID, x, y):
point = self._arraylistClass()
point.add(self._stringClass(str(x)))
point.add(self._stringClass(str(y)))
response = self._apiObject.getDistance(elementID, point)
result = response.getResponseData().get(0)
return float(result)
def getPosition2D(self, elementID):
response = self._apiObject.getPosition2D(elementID)
result = response.getResponseData()
vpoint = (result.getX(), result.getY())
return vpoint
def getIDCount(self):
response = self._apiObject.getIDCount()
result = response.getResponseData()
return result
from typing import List
from py4j.java_gateway import java_import
from ._api_wrapper import ApiWrapper
class SimulationapiWrapper(ApiWrapper):
def getSimTime(self):
response = self._apiObject.getTime()
result = response.getResponseData()
return result
\ No newline at end of file
from typing import List
from py4j.java_gateway import java_import
from ._api_wrapper import ApiWrapper
class VadereapiWrapper(ApiWrapper):
def addStimulusInfos(self, jsonFilePath):
response = self._apiObject.addStimulusInfos(jsonFilePath)
result = response.toString()
return result
def getAllStimulusInfos(self):
response = self._apiObject.getAllStimulusInfos()
result = response.getResponseData()
return result
def createTargetChanger(self, jsonFilePath):
response = self._apiObject.createTargetChanger(jsonFilePath)
result = response.toString()
return result
def removeTargetChanger(self, elementID: str):
response = self._apiObject.removeTargetChanger(elementID)
result = response.toString()
return result
from ._personapi_wrapper import PersonapiWrapper
from ._simulationapi_wrapper import SimulationapiWrapper
from ._polygonapi_wrapper import PolygonapiWrapper
from ._vadereapi_wrapper import VadereapiWrapper
from ._controll_wrapper import ControllWrapper
import sys, os
from time import sleep, time
import subprocess
import atexit
import argparse
from typing import List
from py4j.java_gateway import JavaGateway, java_import, GatewayParameters, CallbackServerParameters
from IPython import embed
from pythontraciwrapper import PersonapiWrapper, SimulationapiWrapper, PolygonapiWrapper, VadereapiWrapper
from pythontraciwrapper import ControllWrapper
class Py4jClient:
def __init__(self, vaderePath, args=""):
cmdEntrypointProcess = "java -jar " + os.path.join(vaderePath, "VadereManager/target/vadere-traci-entrypoint.jar")
if isinstance(args, List):
self._argParser = argparse.ArgumentParser(description='Py4jClient.')
self._addArgs()
self._buildTraciEntrypointArgString(args)
else:
raise ValueError("args must be of type List")
cmdEntrypointProcess += self._traciEntrypointArgString
self._setup_entrypointProcess(cmdEntrypointProcess)
self._start_gateway()
def startScenario(self, scenarioName):
self.ctr.sendFile(scenarioName)
def _start_gateway(self):
# Gateeway
gp = GatewayParameters(port=self.args.javaPort)
cp = CallbackServerParameters(port=self.args.pythonPort)
gateway = JavaGateway(gateway_parameters=gp, callback_server_parameters=cp)
entryPoint = gateway.entry_point
# api
startTime = time()
maxWaitingTime = 5.
connected = False
while not connected and ((time() - startTime) < maxWaitingTime) is True:
try:
personapi = entryPoint.getPersonapi()
simulationapi = entryPoint.getSimulationapi()
polygonapi = entryPoint.getPolygonapi()
vadereapi = entryPoint.getVadereapi()
controll = entryPoint.getTraciControll()
connected = True
print("Python client connected to java via py4j")
except Exception:
print("TraciEntryPoint not ready after " + str(round(time() - startTime, 2)) + " seconds, wait..")
sleep(0.1)
# wrap api
self.ctr = ControllWrapper(controll, gateway)
self.pers = PersonapiWrapper(personapi, gateway)
self.sim = SimulationapiWrapper(simulationapi, gateway)
self.va = VadereapiWrapper(vadereapi, gateway)
self.poly = PolygonapiWrapper(polygonapi, gateway)
def _setup_entrypointProcess(self, cmdEntrypointProcess):
self._entrypointProcess = subprocess.Popen(cmdEntrypointProcess)
atexit.register(self._killProcessAtExit)
def _killProcessAtExit(self):
self._entrypointProcess.kill()
def _buildTraciEntrypointArgString(self, argString):
self.args = self._argParser.parse_args(argString)
traciEntrypointArgString = ""
traciEntrypointArgString += " --loglevel " + self.args.loglevel
if self.args.logname is not None:
traciEntrypointArgString += " --logname " + self.args.logname
traciEntrypointArgString += " --port " + str(self.args.port)
traciEntrypointArgString += " --java-port " + str(self.args.javaPort)
traciEntrypointArgString += " --python-port " + str(self.args.pythonPort)
traciEntrypointArgString += " --clientNum " + str(self.args.clientNum)
if self.args.guiMode is True:
traciEntrypointArgString += " --gui-mode"
traciEntrypointArgString += " --base-path " + str(self.args.basePath)
traciEntrypointArgString += " --default-scenario " + str(self.args.defaultScenario)
self._traciEntrypointArgString = traciEntrypointArgString
def _addArgs(self):
self._argParser.add_argument("--loglevel",
required=False,
type=str,
dest="loglevel",
choices=["OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG",
"TRACE", "ALL"],
default="INFO",
help="Set Log Level.")
self._argParser.add_argument("--logname",
required=False,
type=str,
dest="logname",
help="Write log to given file.")
self._argParser.add_argument('--port',
required=False,
type=int,
default=9998,
help="Set port number.")
self._argParser.add_argument('--java-port',
required=False,
type=int,
default=10001,
dest="javaPort",
help="Set port number of gateway server for java.")
self._argParser.add_argument('--python-port',
required=False,
type=int,
default=10002,
dest="pythonPort",
help="Set port number of gateway server for python.")
self._argParser.add_argument("--clientNum",
required=False,
type=int,
default=4,
dest="clientNum",
help="Set number of clients to manager. Important: Each client has a separate simulation." +
" No communication between clients.")
self._argParser.add_argument("--gui-mode",
required=False,
type=bool,
dest="guiMode",
help="Start server with GUI support. If a scenario is reveived show the current state of " +
"the scenario")
self._argParser.add_argument("--base-path",
required=True,
type=str,
dest="basePath",
help="Scenario directory")
self._argParser.add_argument("--default-scenario",
required=False,
type=str,
dest="defaultScenario",
help="Supply a default scenario")
ipython==7.8.0
py4j==0.10.8.1
\ No newline at end of file
from distutils.core import setup
setup(
name='PythonTraciWrapper',
version='',
packages=['pythontraciwrapper'],
url='',
license='',
author='Philipp Schuegraf',
author_email='',
description='Wrapper around py4j to communicate with vadere via TraCI.'
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment