Commit 1eac71f7 authored by Stefan Schuhbaeck's avatar Stefan Schuhbaeck
Browse files

separate start procedure of vadere server and entry-point.

parent ed05735e
Pipeline #211515 passed with stages
in 145 minutes and 58 seconds
...@@ -2,10 +2,10 @@ from py4j.java_gateway import java_import ...@@ -2,10 +2,10 @@ from py4j.java_gateway import java_import
class ApiWrapper(object): class ApiWrapper(object):
def __init__(self, apiObject, gateway, client):
def __init__(self, apiObject, gateway):
self._apiObject = apiObject self._apiObject = apiObject
self._gateway = gateway self._gateway = gateway
self._client = client
# imports # imports
java_import(self._gateway.jvm, "org.vadere.manager.traci.compoundobjects.*") java_import(self._gateway.jvm, "org.vadere.manager.traci.compoundobjects.*")
......
...@@ -2,7 +2,6 @@ from ._api_wrapper import ApiWrapper ...@@ -2,7 +2,6 @@ from ._api_wrapper import ApiWrapper
class ControlWrapper(ApiWrapper): class ControlWrapper(ApiWrapper):
def sendFile(self, scenarioPath): def sendFile(self, scenarioPath):
return self._apiObject.sendFile(scenarioPath) return self._apiObject.sendFile(scenarioPath)
...@@ -13,4 +12,10 @@ class ControlWrapper(ApiWrapper): ...@@ -13,4 +12,10 @@ class ControlWrapper(ApiWrapper):
return self._apiObject.nextSimTimeStep(str(simTimeStep)) return self._apiObject.nextSimTimeStep(str(simTimeStep))
def close(self): def close(self):
return self._apiObject.close() """
Clean closing of server and entry-point thread
:return: TraCI Return Code
"""
ret = self._apiObject.close()
self._client.close_threads()
return ret
import logging
import os
import signal
import subprocess
import threading
import time
class Runner(threading.Thread):
def __init__(self, command, thread_name, log_location=None, use_stdout=False):
threading.Thread.__init__(self)
self.command = command
self.log_location = log_location
self.use_stdout = use_stdout
self.thread_name = thread_name
self.log = logging.getLogger()
def stop(self):
self._cleanup()
def run(self):
try:
if self.log_location is None:
self.log_location = os.devnull
log_file = open(self.log_location, "w")
self.process = subprocess.Popen(
self.command,
cwd=os.path.curdir,
shell=False,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for line in self.process.stdout:
if self.use_stdout:
print(f"{self.thread_name}> {line.decode('utf-8')}", end="")
log_file.write(line.decode("utf-8"))
if self.process.returncode is not None:
self._cleanup()
finally:
log_file.close()
print(
f"{self.thread_name}> subprocess returncode={self.process.returncode}"
)
def _cleanup(self):
try:
os.kill(self.process.pid, signal.SIGTERM)
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.log.error("subprocess not stopping. Send SIGKILL")
if self.process.returncode is None:
os.kill(self.process.pid, signal.SIGKILL)
time.sleep(0.5)
if self.process.returncode is None:
self.log.error("subprocess still not dead")
raise
import argparse import argparse
import atexit
import os import os
import subprocess
from time import sleep, time from time import sleep, time
from typing import List from typing import List
from py4j.java_gateway import JavaGateway, GatewayParameters, CallbackServerParameters from py4j.java_gateway import (CallbackServerParameters, GatewayParameters,
JavaGateway)
from pythontraciwrapper import ControlWrapper from pythontraciwrapper import (ControlWrapper, PersonapiWrapper,
from pythontraciwrapper import PersonapiWrapper, SimulationapiWrapper, PolygonapiWrapper, VadereapiWrapper PolygonapiWrapper, SimulationapiWrapper,
VadereapiWrapper)
from pythontraciwrapper._runner import Runner
class Py4jClient: class Py4jClient:
@classmethod
def create(
cls,
project_path,
vadere_base_path="",
gui_mode=False,
start_server=True,
server_log=None,
entry_point_log=None,
debug=False,
):
return cls(
vadere_base_path=vadere_base_path,
args=["--base-path", project_path, "--gui-mode", gui_mode],
start_server=start_server,
server_log=server_log,
entry_point_log=entry_point_log,
debug=debug,
)
def __init__(self, vaderePath, args=""): def __init__(
self,
vadere_base_path="",
args=None,
start_server=True,
server_log=None,
entry_point_log=None,
debug=False,
):
self.vadere_base_path = vadere_base_path
self.start_server = start_server
self.server_log = server_log
self.entrypoint_log = entry_point_log
self.debug = debug
cmd_entrypoint_process = ["java", "-jar", os.path.join(vaderePath, self._build_argparser()
"VadereManager/target/vadere-traci-entrypoint.jar")]
if isinstance(args, List): if isinstance(args, List):
self._argParser = argparse.ArgumentParser(description='Py4jClient.') self.args = self._parser.parse_args(args)
self._addArgs()
self._traciEntrypointArgs = self._buildTraciEntrypointArgs(args)
else: else:
raise ValueError("args must be of type List") self._parser.parse_args(["--help"])
cmd_entrypoint_process.extend(self._traciEntrypointArgs) self._start_entrypoint()
self._setup_entrypointProcess(cmd_entrypoint_process) self._connect_gateway()
self._start_gateway()
def startScenario(self, scenarioName): def startScenario(self, scenarioName):
self.ctr.sendFile(scenarioName) if scenarioName.endswith(".scenario"):
self.ctr.sendFile(scenarioName[: -len(".scenario")])
else:
self.ctr.sendFile(scenarioName)
def _start_gateway(self): def _connect_gateway(self):
# Gateeway # Gateeway
gp = GatewayParameters(port=self.args.javaPort) gp = GatewayParameters(port=self.args.javaPort)
...@@ -41,7 +72,7 @@ class Py4jClient: ...@@ -41,7 +72,7 @@ class Py4jClient:
# api # api
startTime = time() startTime = time()
maxWaitingTime = 60. maxWaitingTime = 5.0
connected = False connected = False
while not connected and ((time() - startTime) < maxWaitingTime) is True: while not connected and ((time() - startTime) < maxWaitingTime) is True:
try: try:
...@@ -53,91 +84,181 @@ class Py4jClient: ...@@ -53,91 +84,181 @@ class Py4jClient:
connected = True connected = True
print("Python client connected to java via py4j") print("Python client connected to java via py4j")
except Exception: except Exception:
print("TraciEntryPoint not ready after " + str(round(time() - startTime, 2)) + " seconds, wait..")
sleep(0.1) print(
"TraciEntryPoint not ready after "
+ str(round(time() - startTime, 2))
+ " seconds, wait.."
)
sleep(1.0)
if not connected:
print("Cannot connect to TraciEntryPoint/TraCIServer")
exit(-1)
# wrap api # wrap api
self.ctr = ControlWrapper(control, gateway) self.ctr = ControlWrapper(control, gateway, self)
self.pers = PersonapiWrapper(personapi, gateway) self.pers = PersonapiWrapper(personapi, gateway, self)
self.sim = SimulationapiWrapper(simulationapi, gateway) self.sim = SimulationapiWrapper(simulationapi, gateway, self)
self.va = VadereapiWrapper(vadereapi, gateway) self.va = VadereapiWrapper(vadereapi, gateway, self)
self.poly = PolygonapiWrapper(polygonapi, gateway) self.poly = PolygonapiWrapper(polygonapi, gateway, self)
def _setup_entrypointProcess(self, cmdEntrypointProcess): def close_threads(self):
self._entrypointProcess = subprocess.Popen(cmdEntrypointProcess) if self.server_thread is not None:
atexit.register(self._killProcessAtExit) self.server_thread.stop()
def _killProcessAtExit(self): if self.entrypoint_thread is not None:
self._entrypointProcess.kill() self.entrypoint_thread.stop()
def _start_entrypoint(self):
if self.start_server:
vadere_server_cmd = [
"java",
"-jar",
os.path.join(
self.vadere_base_path, "VadereManager/target/vadere-server.jar",
),
]
vadere_server_cmd.extend(self._server_args())
self.server_thread = Runner(
command=vadere_server_cmd,
thread_name="Server",
log_location=self.server_log,
use_stdout=self.debug,
)
print("Start Server Thread...")
self.server_thread.start()
sleep(0.8)
else:
self.server_thread = None
print("Connecting to existing Server...")
def _buildTraciEntrypointArgs(self, argList): entrypoint_cmd = [
"java",
"-jar",
os.path.join(
self.vadere_base_path,
"VadereManager/target/vadere-traci-entrypoint.jar",
),
]
entrypoint_cmd.extend(self._entrypoint_args())
self.entrypoint_thread = Runner(
command=entrypoint_cmd,
thread_name="Entrypoint",
log_location=self.entrypoint_log,
use_stdout=self.debug,
)
print("Client> Start Entrypoint Thread...")
self.entrypoint_thread.start()
sleep(0.8)
self.args = self._argParser.parse_args(argList) def _server_args(self):
arg_list = list() arg_list = list()
arg_list.extend(["--loglevel", self.args.loglevel]) arg_list.extend(["--loglevel", self.args.loglevel])
if self.args.logname is not None: if self.args.logname is not None:
arg_list.extend([" --logname", self.args.logname]) arg_list.extend([" --logname", self.args.logname])
arg_list.extend(["--port", str(self.args.port)]) arg_list.extend(["--bind", self.args.bind])
arg_list.extend(["--java-port", str(self.args.javaPort)]) arg_list.extend(["--port", str(self.args.server_port)])
arg_list.extend(["--python-port", str(self.args.pythonPort)])
arg_list.extend(["--clientNum", str(self.args.clientNum)]) arg_list.extend(["--clientNum", str(self.args.clientNum)])
if self.args.guiMode is True: if self.args.guiMode is True:
arg_list.append("--gui-mode") arg_list.append("--gui-mode")
return arg_list
def _entrypoint_args(self):
arg_list = list()
arg_list.extend(["--loglevel", self.args.loglevel])
if self.args.logname is not None:
arg_list.extend([" --logname", self.args.logname])
arg_list.extend(["--bind", self.args.bind])
arg_list.extend(["--server-port", str(self.args.server_port)])
arg_list.extend(["--java-port", str(self.args.javaPort)])
arg_list.extend(["--python-port", str(self.args.pythonPort)])
arg_list.extend(["--base-path", str(self.args.basePath)]) arg_list.extend(["--base-path", str(self.args.basePath)])
arg_list.extend(["--default-scenario", str(self.args.defaultScenario)]) arg_list.extend(["--default-scenario", str(self.args.defaultScenario)])
return arg_list return arg_list
def _addArgs(self): def _build_argparser(self):
self._argParser.add_argument("--loglevel", self._parser = argparse.ArgumentParser(description="Py4jClient.")
required=False, self._parser.add_argument(
type=str, "--loglevel",
dest="loglevel", required=False,
choices=["OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", type=str,
"TRACE", "ALL"], dest="loglevel",
default="INFO", choices=["OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL"],
help="Set Log Level.") default="INFO",
self._argParser.add_argument("--logname", help="Set Log Level.",
required=False, )
type=str, self._parser.add_argument(
dest="logname", "--logname",
help="Write log to given file.") required=False,
self._argParser.add_argument('--port', type=str,
required=False, # default="log",
type=int, dest="logname",
default=9998, help="Write log to given file.",
help="Set port number.") )
self._argParser.add_argument('--java-port', self._parser.add_argument(
required=False, "--bind", required=False, type=str, default="127.0.0.1",
type=int, )
default=10001, self._parser.add_argument(
dest="javaPort", "--server-port",
help="Set port number of gateway server for java.") required=False,
self._argParser.add_argument('--python-port', type=int,
required=False, default=9998,
type=int, help="Set port number.",
default=10002, )
dest="pythonPort", self._parser.add_argument(
help="Set port number of gateway server for python.") "--java-port",
self._argParser.add_argument("--clientNum", required=False,
required=False, type=int,
type=int, default=10001,
default=4, dest="javaPort",
dest="clientNum", help="Set port number of gateway server for java.",
help="Set number of clients to manager. Important: Each client has a separate simulation." + )
" No communication between clients.") self._parser.add_argument(
self._argParser.add_argument("--gui-mode", "--python-port",
required=False, required=False,
type=bool, type=int,
dest="guiMode", default=10002,
help="Start server with GUI support. If a scenario is reveived show the current state of " + dest="pythonPort",
"the scenario") help="Set port number of gateway server for python.",
self._argParser.add_argument("--base-path", )
required=True, self._parser.add_argument(
type=str, "--clientNum",
dest="basePath", required=False,
help="Scenario directory") type=int,
self._argParser.add_argument("--default-scenario", default=4,
required=False, dest="clientNum",
type=str, help="Set number of clients to manager. Important: Each client has a separate simulation."
dest="defaultScenario", + " No communication between clients.",
help="Supply a default scenario") )
self._parser.add_argument(
"--gui-mode",
required=False,
type=bool,
dest="guiMode",
help="Start server with GUI support. If a scenario is reveived show the current state of "
+ "the scenario",
)
self._parser.add_argument(
"--base-path",
required=True,
type=str,
dest="basePath",
help="Scenario directory",
)
self._parser.add_argument(
"--default-scenario",
required=False,
type=str,
dest="defaultScenario",
help="Supply a default scenario",
)
if __name__ == "__main__":
c = Py4jClient.create(
project_path="/path/to/vadere/scenarios/",
vadere_base_path="/path/to/vadere",
start_server=False,
debug=False,
)
c.startScenario("Test001.scenario")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment