Commit de3438a0 authored by Stefan Schuhbaeck's avatar Stefan Schuhbaeck
Browse files

add vadere-launcher.py to launch multiple vadere server instances

using the same port.
parent 9bdcad8a
#!/usr/bin/python
import argparse
import socket
import subprocess
import os
import sys
import time
import threading
import signal
import select
import logging
"""
vadere-lauchner.py dispatches each client to a new (fresh) instance of a vadere server to ensure that no side effect
of multiple runs within one java VM will influence the simulations.
Script is based on (sumo-launchd.py)[https://github.com/sommer/veins/blob/master/sumo-launchd.py]
"""
class VadereRunner(threading.Thread):
def __init__(self, client_socket, server_port, options):
threading.Thread.__init__(self)
self.client_socket : socket.socket = client_socket
self.server_port = server_port
self.options = options
self.stop = False
self.vadere = ''
self.log = logging.getLogger(f"vadere-runner-{self.client_socket.getpeername()[1]}:{server_port}")
def stop_tread(self):
self.stop = True
def process_state(self):
return f"subprocess: pid: {self.vadere.pid} returncode: {self.returncode}"
def forward_connection(self, server_socket: socket.socket):
self.client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.log.debug(f"start forwarding {self.client_socket.getpeername()[0]}:{self.client_socket.getpeername()[1]} "
f"<----->"
f" {server_socket.getpeername()[0]}:{server_socket.getpeername()[1]}")
while not self.stop:
(read, write, exp) = select.select([self.client_socket, server_socket], [], [self.client_socket, server_socket],
1)
if len(exp) != 0:
self.stop = True
if self.client_socket in read:
try:
data = self.client_socket.recv(65535)
if data == b'':
self.stop = True
except socket.error:
self.stop = True
finally:
self.log.debug(f"client>server: {len(data)} Bytes...")
server_socket.send(data)
if server_socket in read:
try:
data = server_socket.recv(65535)
if data == b'':
self.stop = True
except socket.error:
self.stop = True
finally:
self.log.debug(f"server>client: {len(data)} Bytes...")
self.client_socket.send(data)
self.log.debug(f"stop forwarding {self.client_socket.getpeername()[0]}:{self.client_socket.getpeername()[1]} "
f"<----->"
f" {server_socket.getpeername()[0]}:{server_socket.getpeername()[1]}")
def run(self):
cmd = ['java', '-jar', self.options.jar, '--port', str(self.server_port), '--single-client']
if self.options.gui:
cmd.append('--gui-mode')
try:
if self.options.vadere_log:
log_file_name = f"vader-port-{self.client_socket.getpeername()[1]}-{self.server_port}.log"
else:
log_file_name = os.devnull
log_file = open(log_file_name, 'w')
self.vadere = subprocess.Popen(cmd, cwd=os.path.curdir, shell=False,
stdin=None,
stdout=log_file,
stderr=log_file)
connected = False
tries = 1
while not connected:
try:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.connect(('127.0.0.1', self.server_port))
connected = True
except socket.error as e:
self.log.debug(f"connection attempt {tries} : {e}")
if tries > 10:
raise
time.sleep(tries * .25)
tries += 1
self.log.debug(f"connection attempt {tries} : OK")
self.log.info(f"started vadere server process (PID:{self.vadere.pid}) at port {self.server_port}")
self.forward_connection(server_socket)
self.client_socket.close()
server_socket.close()
self.vadere.wait(timeout=2)
if self.vadere.returncode is None:
os.kill(self.vadere.pid, signal.SIGKLL)
time.sleep(0.5)
if self.vadere.returncode is None:
self.log.error(f"vadere server process (PID:{self.vadere.pid}) still not dead.")
raise
if log_file_name == os.devnull:
self.log.info(f"vadere server process (PID:{self.vadere.pid}) exited with returncode: {self.vadere.returncode}")
else:
self.log.info(f"vadere server process (PID:{self.vadere.pid}) exited with returncode: {self.vadere.returncode} log for details: {os.path.abspath(log_file_name)} ")
finally:
log_file.close()
def get_port(bind):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.bind((bind, 0))
_, port = sock.getsockname()
sock.close()
return port
def wait_for_client(options):
threads = []
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((options.bind, options.port))
s.listen(5)
logging.info(f"vadere launcher listening on port {options.port} ...")
try:
while True:
conn, addr = s.accept()
port = get_port(options.bind)
logging.info(f"client connected start vadere runner at port {port}...")
thread = VadereRunner(conn, port, options)
thread.start()
threads.append(thread)
except SystemExit:
logging.info("received kill signal shutting down...")
except KeyboardInterrupt:
logging.info("interrupt shutting down...")
finally:
for t in threads:
if t.isAlive():
t.stop_tread()
t.join(2)
if t.isAlive():
print(f"tread not stopping. {t.process_state()}")
s.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", dest="port", default=9998, type=int, help="listen on port for incoming requests. [default= 9998]")
parser.add_argument("-b", "--bind", dest="bind", default='127.0.0.1', help="bind to address. [default= 127.0.0.1]")
parser.add_argument("-g", "--gui-mode", dest="gui", default=False, action='store_true',
help="show graphical userinterface if client connects. [default= false]")
parser.add_argument("--jar", dest="jar", default='', help="JAR file to execute. If not given use default location.")
parser.add_argument("--vadere-log", dest="vadere_log", default=False, action='store_true',
help="If set create log file with client:server port in file name. [default= False]")
parser.add_argument("--log", dest="log", default=logging.INFO, choices=list(logging._nameToLevel.keys()), help="write log files for each vadere"
" instance. [default= INFO]")
options = parser.parse_args()
logging.basicConfig(level=options.log, format="%(levelname)s:%(name)s> %(message)s")
if options.jar == '':
vadere_home = os.getenv("VADERE_HOME")
if vadere_home is None:
logging.error("JAR file not given and VADERE_HOME not set.")
sys.exit(-1)
else:
options.jar = os.path.join(vadere_home, 'VadereManager/target/vadere-server.jar')
wait_for_client(options)
if __name__ == '__main__':
main()
...@@ -2,10 +2,14 @@ package org.vadere.manager; ...@@ -2,10 +2,14 @@ package org.vadere.manager;
import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments; import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.internal.HelpScreenException; import net.sourceforge.argparse4j.internal.HelpScreenException;
import org.vadere.manager.server.AbstractVadereServer;
import org.vadere.manager.server.VadereServer;
import org.vadere.manager.server.VadereSingleClientServer;
import org.vadere.util.logging.Logger; import org.vadere.util.logging.Logger;
import java.net.ServerSocket; import java.net.ServerSocket;
...@@ -25,10 +29,16 @@ public class Manager { ...@@ -25,10 +29,16 @@ public class Manager {
try { try {
ns = p.parseArgs(args); ns = p.parseArgs(args);
ExecutorService pool = Executors.newFixedThreadPool(ns.getInt("clientNum"));
ServerSocket serverSocket = new ServerSocket(ns.getInt("port")); ServerSocket serverSocket = new ServerSocket(ns.getInt("port"));
logger.infof("Start Server(%s) with Loglevel: %s", VadereServer.currentVersion.getVersionString(), logger.getLevel().toString()); logger.infof("Start Server(%s) with Loglevel: %s", VadereServer.currentVersion.getVersionString(), logger.getLevel().toString());
VadereServer server = new VadereServer(serverSocket, pool, Paths.get(ns.getString("output-dir")), ns.getBoolean("guiMode")); AbstractVadereServer server;
if (ns.getBoolean("singleClient")){
server = new VadereSingleClientServer(serverSocket, Paths.get(ns.getString("output-dir")), ns.getBoolean("guiMode"));
} else {
ExecutorService pool = Executors.newFixedThreadPool(ns.getInt("clientNum"));
server = new VadereServer(serverSocket, pool, Paths.get(ns.getString("output-dir")), ns.getBoolean("guiMode"));
}
server.run(); server.run();
} catch (HelpScreenException ignored) { } catch (HelpScreenException ignored) {
...@@ -82,6 +92,13 @@ public class Manager { ...@@ -82,6 +92,13 @@ public class Manager {
.dest("clientNum") .dest("clientNum")
.help("Set number of clients to manager. Important: Each client has a separate simulation. No communication between clients"); .help("Set number of clients to manager. Important: Each client has a separate simulation. No communication between clients");
parser.addArgument("--single-client")
.required(false)
.action(Arguments.storeTrue())
.type(Boolean.class)
.dest("singleClient")
.help("Use server which only accepts one client and terminates after one simulation run.");
// boolean switch to tell server to start in gui mode. // boolean switch to tell server to start in gui mode.
parser.addArgument("--gui-mode") parser.addArgument("--gui-mode")
.required(false) .required(false)
......
package org.vadere.manager.server;
import org.vadere.manager.traci.TraCIVersion;
import org.vadere.util.logging.Logger;
import java.net.ServerSocket;
import java.nio.file.Path;
public abstract class AbstractVadereServer implements Runnable{
protected static Logger logger = Logger.getLogger(VadereServer.class);
public static int SUPPORTED_TRACI_VERSION = 20;
// public static int SUPPORTED_TRACI_VERSION = 1;
public static String SUPPORTED_TRACI_VERSION_STRING = "Vadere Simulator. Supports subset of commands based von TraCI Version " + SUPPORTED_TRACI_VERSION;
public static TraCIVersion currentVersion = TraCIVersion.V20_0_2;
protected final ServerSocket serverSocket;
protected final Path baseDir;
protected final boolean guiSupport;
public AbstractVadereServer(ServerSocket serverSocket, Path baseDir, boolean guiSupport) {
this.serverSocket = serverSocket;
this.baseDir = baseDir;
this.guiSupport = guiSupport;
}
}
package org.vadere.manager; package org.vadere.manager.server;
import org.vadere.manager.ClientHandler;
import org.vadere.manager.TraCISocket;
import org.vadere.manager.traci.TraCIVersion; import org.vadere.manager.traci.TraCIVersion;
import org.vadere.util.logging.Logger; import org.vadere.util.logging.Logger;
...@@ -10,28 +12,13 @@ import java.nio.file.Path; ...@@ -10,28 +12,13 @@ import java.nio.file.Path;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** public class VadereServer extends AbstractVadereServer{
* //todo comment
*/
public class VadereServer implements Runnable{
private static Logger logger = Logger.getLogger(VadereServer.class);
public static int SUPPORTED_TRACI_VERSION = 20;
// public static int SUPPORTED_TRACI_VERSION = 1;
public static String SUPPORTED_TRACI_VERSION_STRING = "Vadere Simulator. Supports subset of commands based von TraCI Version " + SUPPORTED_TRACI_VERSION;
public static TraCIVersion currentVersion = TraCIVersion.V20_0_2;
private final ServerSocket serverSocket;
private final ExecutorService handlerPool; private final ExecutorService handlerPool;
private final Path baseDir;
private final boolean guiSupport;
public VadereServer(ServerSocket serverSocket, ExecutorService handlerPool, Path baseDir, boolean guiSupport) { public VadereServer(ServerSocket serverSocket, ExecutorService handlerPool, Path baseDir, boolean guiSupport) {
this.serverSocket = serverSocket; super(serverSocket, baseDir, guiSupport);
this.handlerPool = handlerPool; this.handlerPool = handlerPool;
this.baseDir = baseDir;
this.guiSupport = guiSupport;
} }
@Override @Override
...@@ -40,7 +27,6 @@ public class VadereServer implements Runnable{ ...@@ -40,7 +27,6 @@ public class VadereServer implements Runnable{
logger.infof("listening on port %d... (gui-mode: %s)", serverSocket.getLocalPort(), Boolean.toString(guiSupport)); logger.infof("listening on port %d... (gui-mode: %s)", serverSocket.getLocalPort(), Boolean.toString(guiSupport));
while (true) { while (true) {
Socket clientSocket = serverSocket.accept(); Socket clientSocket = serverSocket.accept();
handlerPool.execute(new ClientHandler(serverSocket, new TraCISocket(clientSocket), baseDir, guiSupport)); handlerPool.execute(new ClientHandler(serverSocket, new TraCISocket(clientSocket), baseDir, guiSupport));
} }
} catch (IOException e) { } catch (IOException e) {
......
package org.vadere.manager.server;
import org.vadere.manager.ClientHandler;
import org.vadere.manager.TraCISocket;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Path;
/**
* Open socket and wait for one client. After the simulation is finished do not accept new scenario files and stop
* gracefully.
*/
public class VadereSingleClientServer extends AbstractVadereServer {
public VadereSingleClientServer(ServerSocket serverSocket, Path baseDir, boolean guiSupport) {
super(serverSocket, baseDir, guiSupport);
}
@Override
public void run() {
try {
logger.infof("listening on port %d... (gui-mode: %s) Single Simulation", serverSocket.getLocalPort(), Boolean.toString(guiSupport));
Socket clientSocket = serverSocket.accept();
Thread t = new Thread(new ClientHandler(serverSocket, new TraCISocket(clientSocket), baseDir, guiSupport));
t.start();
t.join();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
logger.warn("Interrupt Vadere Server");
} finally {
logger.info("Shutdown Vadere Server ...");
if (!serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
...@@ -65,7 +65,7 @@ public abstract class CommandHandler <VAR extends Enum> { ...@@ -65,7 +65,7 @@ public abstract class CommandHandler <VAR extends Enum> {
protected void init(Class<? extends Annotation> singleAnnotation, Class<? extends Annotation> multAnnotation){ protected void init(Class<? extends Annotation> singleAnnotation, Class<? extends Annotation> multAnnotation){
for (Method m : this.getClass().getDeclaredMethods()){ for (Method m : this.getClass().getDeclaredMethods()){
logger.infof(m.getName()); logger.tracef(m.getName());
if (m.isAnnotationPresent(singleAnnotation)){ if (m.isAnnotationPresent(singleAnnotation)){
init_HandlerSingle(m); init_HandlerSingle(m);
} }
......
...@@ -3,7 +3,7 @@ package org.vadere.manager.traci.commandHandler; ...@@ -3,7 +3,7 @@ package org.vadere.manager.traci.commandHandler;
import org.vadere.manager.RemoteManager; import org.vadere.manager.RemoteManager;
import org.vadere.manager.Subscription; import org.vadere.manager.Subscription;
import org.vadere.manager.VadereServer; import org.vadere.manager.server.VadereServer;
import org.vadere.manager.traci.TraCIVersion; import org.vadere.manager.traci.TraCIVersion;
import org.vadere.manager.traci.commandHandler.annotation.ControlHandler; import org.vadere.manager.traci.commandHandler.annotation.ControlHandler;
import org.vadere.manager.traci.commandHandler.annotation.ControlHandlers; import org.vadere.manager.traci.commandHandler.annotation.ControlHandlers;
......
package org.vadere.manager.traci.commands; package org.vadere.manager.traci.commands;
import org.vadere.manager.TraCIException; import org.vadere.manager.TraCIException;
import org.vadere.manager.VadereServer; import org.vadere.manager.server.VadereServer;
import org.vadere.manager.traci.CmdType; import org.vadere.manager.traci.CmdType;
import org.vadere.manager.traci.TraCICmd; import org.vadere.manager.traci.TraCICmd;
import org.vadere.manager.traci.TraCIVersion; import org.vadere.manager.traci.TraCIVersion;
......
...@@ -3,7 +3,10 @@ package org.vadere.manager.traci.writer; ...@@ -3,7 +3,10 @@ package org.vadere.manager.traci.writer;
import org.vadere.manager.TraCIException; import org.vadere.manager.TraCIException;
import org.vadere.manager.traci.TraCICmd; import org.vadere.manager.traci.TraCICmd;
import org.vadere.manager.traci.TraCIDataType; import org.vadere.manager.traci.TraCIDataType;
import org.vadere.manager.traci.commands.TraCICommand;
import org.vadere.manager.traci.commands.control.TraCIGetVersionCommand; import org.vadere.manager.traci.commands.control.TraCIGetVersionCommand;
import org.vadere.manager.traci.reader.TraCIByteBuffer;
import org.vadere.manager.traci.reader.TraCIPacketBuffer;
import org.vadere.manager.traci.respons.StatusResponse; import org.vadere.manager.traci.respons.StatusResponse;
import org.vadere.manager.traci.respons.TraCIGetResponse; import org.vadere.manager.traci.respons.TraCIGetResponse;
import org.vadere.manager.traci.respons.TraCIGetVersionResponse; import org.vadere.manager.traci.respons.TraCIGetVersionResponse;
...@@ -12,6 +15,8 @@ import org.vadere.manager.traci.respons.TraCIStatusResponse; ...@@ -12,6 +15,8 @@ import org.vadere.manager.traci.respons.TraCIStatusResponse;
import org.vadere.manager.traci.respons.TraCISubscriptionResponse; import org.vadere.manager.traci.respons.TraCISubscriptionResponse;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/** /**
* //todo comment * //todo comment
...@@ -95,7 +100,17 @@ public class TraCIPacket extends ByteArrayOutputStreamTraCIWriter{ ...@@ -95,7 +100,17 @@ public class TraCIPacket extends ByteArrayOutputStreamTraCIWriter{
} else { } else {
return asByteArray(); return asByteArray();
} }
}
public List<TraCICommand> getCommands(){
ByteBuffer buf = ByteBuffer.wrap(send());
buf.getInt(); // remove packet length.
TraCIPacketBuffer traciBuf = TraCIPacketBuffer.wrap(buf);
ArrayList<TraCICommand> commands = new ArrayList<>();
while (buf.hasRemaining()){
commands.add(traciBuf.nextCommand());
}
return commands;
} }
private TraCIWriter getCmdBuilder(){ private TraCIWriter getCmdBuilder(){
......
package org.vadere.manager;
import org.mockito.Mockito;
import org.vadere.manager.traci.commandHandler.StateAccessHandler;
import org.vadere.simulator.control.simulation.SimulationState;
import java.nio.file.Paths;
import static org.mockito.Mockito.mock;
/**
* Simplified version of RemoteManager used to test ONLY {@link SimulationState} access during calls to
* {@link #accessState(StateAccessHandler)} by some CommandHandler. Instantiate new anonymous classes of
* this abstract base and implement {@link #mockIt()} to mock the #SimulationState for the {@link #accessState(StateAccessHandler)}
* call.
*/
public abstract class TestRemoteManager extends RemoteManager {
protected SimulationState simState;
public TestRemoteManager() {
super(Paths.get(""), false);
this.simState = mock(SimulationState.class, Mockito.RETURNS_DEEP_STUBS);
mockIt();
}
@Override
public boolean accessState(StateAccessHandler stateAccessHandler) {
stateAccessHandler.execute(this, simState);
return true;
}
protected abstract void mockIt();
}
package org.vadere.manager.traci.commandHandler;
import org.hamcrest.core.IsEqual;
import org.vadere.manager.traci.commands.TraCICommand;
import org.vadere.manager.traci.commands.TraCIGetCommand;
import org.vadere.manager.traci.respons.TraCIStatusResponse;
import org.vadere.manager.traci.writer.TraCIPacket;
import java.util.List;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
public class CommandHandlerTest {
TraCICommand getFirstCommand(TraCIPacket packet){
List<TraCICommand> cmds = packet.getCommands();
assertThat("expected single command in Package",cmds.size(), IsEqual.equalTo(1));
return cmds.get(0);
}
public void checkGET_OK(TraCICommand cmd){
assertThat("command must be a TraCIGetCommand",cmd, instanceOf(TraCIGetCommand.class));
TraCIGetCommand getCmd = (TraCIGetCommand)cmd;
assertThat("Response must be OK",getCmd.getResponse().getStatusResponse().getResponse(), equalTo(TraCIStatusResponse.OK));
}
public void checkGET_Err(TraCICommand cmd){
assertThat("command must be a TraCIGetCommand",cmd, instanceOf(TraCIGetCommand.class));
TraCIGetCommand getCmd = (TraCIGetCommand)cmd;
assertThat("Response must be Err",getCmd.getResponse().getStatusResponse().getResponse(), equalTo(TraCIStatusResponse.ERR));
}
public void checkElementIdentifier(TraCIGetCommand cmd, String identifer){
assertThat(cmd.getResponse().getElementIdentifier(), equalTo(identifer));