Commit ed05735e authored by Stefan Schuhbaeck's avatar Stefan Schuhbaeck
Browse files

change PythonTraciWrapper

parent 333e0a9f
#!/usr/bin/python3 #!/usr/bin/python3
import argparse import argparse
import logging
import os
import select
import signal
import socket import socket
import subprocess import subprocess
import os
import sys import sys
import time
import threading import threading
import signal import time
import select
import logging
""" """
vadere-lauchner.py dispatches each client to a new (fresh) instance of a vadere server to ensure that no side effect vadere-lauchner.py dispatches each client to a new (fresh) instance of a vadere server to ensure that no side effect
...@@ -21,16 +22,16 @@ Script is based on (sumo-launchd.py)[https://github.com/sommer/veins/blob/master ...@@ -21,16 +22,16 @@ Script is based on (sumo-launchd.py)[https://github.com/sommer/veins/blob/master
class VadereRunner(threading.Thread): class VadereRunner(threading.Thread):
def __init__(self, client_socket, server_port, options): def __init__(self, client_socket, server_port, options):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.client_socket: socket.socket = client_socket self.client_socket: socket.socket = client_socket
self.server_port = server_port self.server_port = server_port
self.options = options self.options = options
self.stop = False self.stop = False
self.vadere = '' self.vadere = ""
self.log = logging.getLogger( self.log = logging.getLogger(
f"vadere-runner-{self.client_socket.getpeername()[1]}:{server_port}") f"vadere-runner-{self.client_socket.getpeername()[1]}:{server_port}"
)
def stop_tread(self): def stop_tread(self):
self.stop = True self.stop = True
...@@ -39,24 +40,28 @@ class VadereRunner(threading.Thread): ...@@ -39,24 +40,28 @@ class VadereRunner(threading.Thread):
return f"subprocess: pid: {self.vadere.pid} returncode: {self.returncode}" return f"subprocess: pid: {self.vadere.pid} returncode: {self.returncode}"
def forward_connection(self, server_socket: socket.socket): def forward_connection(self, server_socket: socket.socket):
self.client_socket.setsockopt( self.client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.log.debug( self.log.debug(
f"start forwarding {self.client_socket.getpeername()[0]}:{self.client_socket.getpeername()[1]} " f"start forwarding {self.client_socket.getpeername()[0]}:{self.client_socket.getpeername()[1]} "
f"<----->" f"<----->"
f" {server_socket.getpeername()[0]}:{server_socket.getpeername()[1]}") f" {server_socket.getpeername()[0]}:{server_socket.getpeername()[1]}"
)
while not self.stop: while not self.stop:
(read, write, exp) = select.select( (read, write, exp) = select.select(
[self.client_socket, server_socket], [], [self.client_socket, server_socket], 1) [self.client_socket, server_socket],
[],
[self.client_socket, server_socket],
1,
)
if len(exp) != 0: if len(exp) != 0:
self.stop = True self.stop = True
if self.client_socket in read: if self.client_socket in read:
try: try:
data = self.client_socket.recv(65535) data = self.client_socket.recv(65535)
if data == b'': if data == b"":
self.stop = True self.stop = True
except socket.error: except socket.error:
self.stop = True self.stop = True
...@@ -67,7 +72,7 @@ class VadereRunner(threading.Thread): ...@@ -67,7 +72,7 @@ class VadereRunner(threading.Thread):
if server_socket in read: if server_socket in read:
try: try:
data = server_socket.recv(65535) data = server_socket.recv(65535)
if data == b'': if data == b"":
self.stop = True self.stop = True
except socket.error: except socket.error:
self.stop = True self.stop = True
...@@ -78,48 +83,66 @@ class VadereRunner(threading.Thread): ...@@ -78,48 +83,66 @@ class VadereRunner(threading.Thread):
self.log.debug( self.log.debug(
f"stop forwarding {self.client_socket.getpeername()[0]}:{self.client_socket.getpeername()[1]} " f"stop forwarding {self.client_socket.getpeername()[0]}:{self.client_socket.getpeername()[1]} "
f"<----->" f"<----->"
f" {server_socket.getpeername()[0]}:{server_socket.getpeername()[1]}") f" {server_socket.getpeername()[0]}:{server_socket.getpeername()[1]}"
)
def run(self): def run(self):
cmd = ['java', '-jar', self.options.jar, '--port', cmd = [
str(self.server_port), '--single-client'] "java",
"-jar",
self.options.jar,
"--loglevel",
self.options.vadere_log_level,
"--port",
str(self.server_port),
"--single-client",
]
if self.options.gui: if self.options.gui:
cmd.append('--gui-mode') cmd.append("--gui-mode")
try: try:
if self.options.vadere_log: if self.options.vadere_log == "":
log_file_name = f"vader-port-{self.client_socket.getpeername()[1]}-{self.server_port}.log"
else:
log_file_name = os.devnull log_file_name = os.devnull
else:
log_file = open(log_file_name, 'w') log_path = os.path.abspath(
os.path.join(
self.options.vadere_log,
f"vader-port-{self.client_socket.getpeername()[1]}-{self.server_port}.log",
)
)
os.makedirs(os.path.dirname(log_path), exist_ok=True)
log_file_name = log_path
self.log.info(f"crate log at: {log_file_name}")
log_file = open(log_file_name, "w")
self.vadere = subprocess.Popen( self.vadere = subprocess.Popen(
cmd, cmd,
cwd=os.path.curdir, cwd=os.path.curdir,
shell=False, shell=False,
stdin=None, stdin=None,
stdout=log_file, stdout=log_file,
stderr=log_file) stderr=log_file,
)
connected = False connected = False
tries = 1 tries = 1
while not connected: while not connected:
try: try:
server_socket = socket.socket( server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.AF_INET, socket.SOCK_STREAM) server_socket.connect(("127.0.0.1", self.server_port))
server_socket.connect(('127.0.0.1', self.server_port))
connected = True connected = True
except socket.error as e: except socket.error as e:
self.log.debug(f"connection attempt {tries} : {e}") self.log.debug(f"connection attempt {tries} : {e}")
if tries > 10: if tries > 10:
raise raise
time.sleep(tries * .25) time.sleep(tries * 0.25)
tries += 1 tries += 1
self.log.debug(f"connection attempt {tries} : OK") self.log.debug(f"connection attempt {tries} : OK")
self.log.info( self.log.info(
f"started vadere server process (PID:{self.vadere.pid}) at port {self.server_port}") f"started vadere server process (PID:{self.vadere.pid}) at port {self.server_port}"
)
self.forward_connection(server_socket) self.forward_connection(server_socket)
self.client_socket.close() self.client_socket.close()
...@@ -129,22 +152,26 @@ class VadereRunner(threading.Thread): ...@@ -129,22 +152,26 @@ class VadereRunner(threading.Thread):
self.vadere.wait(timeout=2) self.vadere.wait(timeout=2)
except subprocess.TimeoutExpired as e: except subprocess.TimeoutExpired as e:
self.log.info( self.log.info(
f"vadere server process (PID:{self.vadere.pid}) not terminated after timeout. Send SIGKILL") f"vadere server process (PID:{self.vadere.pid}) not terminated after timeout. Send SIGKILL"
)
if self.vadere.returncode is None: if self.vadere.returncode is None:
os.kill(self.vadere.pid, signal.SIGKLL) os.kill(self.vadere.pid, signal.SIGKLL)
time.sleep(0.5) time.sleep(0.5)
if self.vadere.returncode is None: if self.vadere.returncode is None:
self.log.error( self.log.error(
f"vadere server process (PID:{self.vadere.pid}) still not dead.") f"vadere server process (PID:{self.vadere.pid}) still not dead."
)
raise raise
if log_file_name == os.devnull: if log_file_name == os.devnull:
self.log.info( self.log.info(
f"vadere server process (PID:{self.vadere.pid}) exited with returncode: {self.vadere.returncode}") f"vadere server process (PID:{self.vadere.pid}) exited with returncode: {self.vadere.returncode}"
)
else: else:
self.log.info( self.log.info(
f"vadere server process (PID:{self.vadere.pid}) exited with returncode: {self.vadere.returncode} log for details: {os.path.abspath(log_file_name)} ") f"vadere server process (PID:{self.vadere.pid}) exited with returncode: {self.vadere.returncode} log for details: {os.path.abspath(log_file_name)} "
)
finally: finally:
log_file.close() log_file.close()
...@@ -170,8 +197,7 @@ def wait_for_client(options): ...@@ -170,8 +197,7 @@ def wait_for_client(options):
while True: while True:
conn, addr = s.accept() conn, addr = s.accept()
port = get_port(options.bind) port = get_port(options.bind)
logging.info( logging.info(f"client connected start vadere runner at port {port}...")
f"client connected start vadere runner at port {port}...")
thread = VadereRunner(conn, port, options) thread = VadereRunner(conn, port, options)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
...@@ -189,7 +215,7 @@ def wait_for_client(options): ...@@ -189,7 +215,7 @@ def wait_for_client(options):
s.close() s.close()
def main(): def main(args=None):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
"-p", "-p",
...@@ -197,56 +223,70 @@ def main(): ...@@ -197,56 +223,70 @@ def main():
dest="port", dest="port",
default=9998, default=9998,
type=int, type=int,
help="listen on port for incoming requests. [default= 9998]") help="listen on port for incoming requests. [default= 9998]",
)
parser.add_argument( parser.add_argument(
"-b", "-b",
"--bind", "--bind",
dest="bind", dest="bind",
default='127.0.0.1', default="127.0.0.1",
help="bind to address. [default= 127.0.0.1]") help="bind to address. [default= 127.0.0.1]",
)
parser.add_argument( parser.add_argument(
"-g", "-g",
"--gui-mode", "--gui-mode",
dest="gui", dest="gui",
default=False, default=False,
action='store_true', action="store_true",
help="show graphical userinterface if client connects. [default= false]") help="show graphical userinterface if client connects. [default= false]",
)
parser.add_argument( parser.add_argument(
"--jar", "--jar",
dest="jar", dest="jar",
default='', default="",
help="JAR file to execute. If not given use default location.") help="JAR file to execute. If not given use default location.",
)
parser.add_argument(
"--vadere-log-level",
dest="vadere_log_level",
default="INFO",
type=str,
help="Loglevel for Vadere server. [default= False]",
)
parser.add_argument( parser.add_argument(
"--vadere-log", "--vadere-log",
dest="vadere_log", dest="vadere_log",
default=False, default="",
action='store_true', required=False,
help="If set create log file with client:server port in file name. [default= False]") type=str,
help="Path log location. Filename is dynamic . [default= no logfile]",
)
parser.add_argument( parser.add_argument(
"--log", "--log",
dest="log", dest="log",
default=logging.INFO, default=logging.INFO,
choices=list( choices=list(logging._nameToLevel.keys()),
logging._nameToLevel.keys()), help="write log files for each vadere" " instance. [default= INFO]",
help="write log files for each vadere" )
" instance. [default= INFO]") if args is None:
options = parser.parse_args() options = parser.parse_args()
else:
options = parser.parse_args(args)
logging.basicConfig( logging.basicConfig(level=options.log, format="%(levelname)s:%(name)s> %(message)s")
level=options.log,
format="%(levelname)s:%(name)s> %(message)s")
if options.jar == '': if options.jar == "":
vadere_home = os.getenv("VADERE_HOME") vadere_home = os.getenv("VADERE_HOME")
if vadere_home is None: if vadere_home is None:
logging.error("JAR file not given and VADERE_HOME not set.") logging.error("JAR file not given and VADERE_HOME not set.")
sys.exit(-1) sys.exit(-1)
else: else:
options.jar = os.path.join( options.jar = os.path.join(
vadere_home, 'VadereManager/target/vadere-server.jar') vadere_home, "VadereManager/target/vadere-server.jar"
)
wait_for_client(options) wait_for_client(options)
if __name__ == '__main__': if __name__ == "__main__":
main() main()
...@@ -11,6 +11,7 @@ import org.vadere.manager.server.VadereServer; ...@@ -11,6 +11,7 @@ import org.vadere.manager.server.VadereServer;
import org.vadere.manager.server.VadereSingleClientServer; import org.vadere.manager.server.VadereSingleClientServer;
import org.vadere.util.logging.Logger; import org.vadere.util.logging.Logger;
import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
...@@ -29,7 +30,7 @@ public class Manager { ...@@ -29,7 +30,7 @@ public class Manager {
try { try {
ns = p.parseArgs(args); ns = p.parseArgs(args);
ServerSocket serverSocket = new ServerSocket(ns.getInt("port")); ServerSocket serverSocket = new ServerSocket(ns.getInt("port"), 50, InetAddress.getByName(ns.getString("bind")));
logger.infof("Start Server(%s) with Loglevel: %s", VadereServer.currentVersion.getVersionString(), logger.getLevel().toString()); logger.infof("Start Server(%s) with Loglevel: %s", VadereServer.currentVersion.getVersionString(), logger.getLevel().toString());
AbstractVadereServer server; AbstractVadereServer server;
if (ns.getBoolean("singleClient")) { if (ns.getBoolean("singleClient")) {
...@@ -83,6 +84,13 @@ public class Manager { ...@@ -83,6 +84,13 @@ public class Manager {
.dest("port") .dest("port")
.help("Set port number."); .help("Set port number.");
parser.addArgument("--bind")
.required(false)
.type(String.class)
.setDefault("127.0.0.1")
.dest("bind")
.help("Set ip number.");
// no action required call to Logger.setMainArguments(args) already configured Logger. // no action required call to Logger.setMainArguments(args) already configured Logger.
parser.addArgument("--clientNum") parser.addArgument("--clientNum")
.required(false) .required(false)
......
...@@ -66,11 +66,11 @@ public class TraCIEntryPoint implements Runnable { ...@@ -66,11 +66,11 @@ public class TraCIEntryPoint implements Runnable {
try { try {
ns = p.parseArgs(args); ns = p.parseArgs(args);
ExecutorService pool = Executors.newFixedThreadPool(ns.getInt("clientNum")); // ExecutorService pool = Executors.newFixedThreadPool(ns.getInt("clientNum"));
ServerSocket serverSocket = new ServerSocket(ns.getInt("port")); // ServerSocket serverSocket = new ServerSocket(ns.getInt("port"));
logger.infof("Start Server(%s) with Loglevel: %s", VadereServer.currentVersion.getVersionString(), logger.getLevel().toString()); // logger.infof("Start Server(%s) with Loglevel: %s", VadereServer.currentVersion.getVersionString(), logger.getLevel().toString());
Thread serverThread = new Thread(new VadereServer(serverSocket, pool, Paths.get(ns.getString("output-dir")), ns.getBoolean("guiMode"))); // Thread serverThread = new Thread(new VadereServer(serverSocket, pool, Paths.get(ns.getString("output-dir")), ns.getBoolean("guiMode")));
serverThread.start(); // serverThread.start();
TraCIEntryPoint entryPoint = new TraCIEntryPoint(ns.getInt("port"), ns.getString("bind"), ns.getInt("javaPort"), ns.getInt("pythonPort")); TraCIEntryPoint entryPoint = new TraCIEntryPoint(ns.getInt("port"), ns.getString("bind"), ns.getInt("javaPort"), ns.getInt("pythonPort"));
entryPoint.basePath = ns.getString("basePath"); entryPoint.basePath = ns.getString("basePath");
entryPoint.defaultScenario = ns.getString("defaultScenario"); entryPoint.defaultScenario = ns.getString("defaultScenario");
...@@ -102,7 +102,6 @@ public class TraCIEntryPoint implements Runnable { ...@@ -102,7 +102,6 @@ public class TraCIEntryPoint implements Runnable {
.setDefault("INFO") .setDefault("INFO")
.help("Set Log Level."); .help("Set Log Level.");
// no action required call to Logger.setMainArguments(args) already configured Logger.
parser.addArgument("--logname") parser.addArgument("--logname")
.required(false) .required(false)
.type(String.class) .type(String.class)
...@@ -110,16 +109,14 @@ public class TraCIEntryPoint implements Runnable { ...@@ -110,16 +109,14 @@ public class TraCIEntryPoint implements Runnable {
.help("Write log to given file."); .help("Write log to given file.");
// no action required call to Logger.setMainArguments(args) already configured Logger. parser.addArgument("--server-port")
parser.addArgument("--port")
.required(false) .required(false)
.type(Integer.class) .type(Integer.class)
.setDefault(9999) .setDefault(9998)
.dest("port") .dest("port")
.help("Set port number."); .help("Set port number.");
// no action required call to Logger.setMainArguments(args) already configured Logger.
parser.addArgument("--bind") parser.addArgument("--bind")
.required(false) .required(false)
.type(String.class) .type(String.class)
...@@ -128,7 +125,6 @@ public class TraCIEntryPoint implements Runnable { ...@@ -128,7 +125,6 @@ public class TraCIEntryPoint implements Runnable {
.help("Set ip number."); .help("Set ip number.");
// no action required call to Logger.setMainArguments(args) already configured Logger.
parser.addArgument("--java-port") parser.addArgument("--java-port")
.required(false) .required(false)
.type(Integer.class) .type(Integer.class)
...@@ -137,7 +133,6 @@ public class TraCIEntryPoint implements Runnable { ...@@ -137,7 +133,6 @@ public class TraCIEntryPoint implements Runnable {
.help("Set port number of gateway server for java."); .help("Set port number of gateway server for java.");
// no action required call to Logger.setMainArguments(args) already configured Logger.
parser.addArgument("--python-port") parser.addArgument("--python-port")
.required(false) .required(false)
.type(Integer.class) .type(Integer.class)
...@@ -145,29 +140,6 @@ public class TraCIEntryPoint implements Runnable { ...@@ -145,29 +140,6 @@ public class TraCIEntryPoint implements Runnable {
.dest("pythonPort") .dest("pythonPort")
.help("Set port number of gateway server for python."); .help("Set port number of gateway server for python.");
// no action required call to Logger.setMainArguments(args) already configured Logger.
parser.addArgument("--clientNum")
.required(false)
.type(Integer.class)
.setDefault(4)
.dest("clientNum")
.help("Set number of clients to manager. Important: Each client has a separate simulation. No communication between clients");
// boolean switch to tell server to start in gui mode.
parser.addArgument("--gui-mode")
.required(false)
.action(Arguments.storeTrue())
.type(Boolean.class)
.dest("guiMode")
.help("Start server with GUI support. If a scenario is received show the current state of the scenario");
parser.addArgument("--output-dir", "-o")
.required(false)
.setDefault("./vadere-server-output")
.dest("output-dir") // set name in namespace
.type(String.class)
.help("Supply output directory as base directory for received scenarios.");
parser.addArgument("--base-path") parser.addArgument("--base-path")
.required(true) .required(true)
.dest("basePath") // set name in namespace .dest("basePath") // set name in namespace
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment