Commit abf2779a authored by Christina Maria Mayr's avatar Christina Maria Mayr
Browse files

Merge branch 'fix_write_bug' into 'master'

Fix write bug

See merge request !157
parents e1a54e49 3737f498
Pipeline #527631 canceled with stages
in 10 minutes and 15 seconds
......@@ -77,7 +77,8 @@ public class ClientHandler implements Runnable {
}
}
} finally {
}
finally {
traCISocket.close();
remoteManager.stopSimulationIfRunning();
cmdExecutor = null;
......
......@@ -40,6 +40,7 @@ public class Manager {
server = new VadereServer(serverSocket, pool, Paths.get(ns.getString("output-dir")), ns.getBoolean("guiMode"), ns.getBoolean("trace"));
}
server.run();
logger.info("Run finished.");
} catch (HelpScreenException ignored) {
......@@ -47,6 +48,7 @@ public class Manager {
e.printStackTrace();
System.exit(-1);
}
logger.info("Close Vadere.");
}
private static VadereArgumentParser createArgumentParser() {
......
package org.vadere.manager;
import org.apache.commons.collections.CollectionUtils;
import org.vadere.gui.onlinevisualization.OnlineVisualization;
import org.vadere.manager.traci.commandHandler.StateAccessHandler;
import org.vadere.manager.traci.compound.object.SimulationCfg;
import org.vadere.simulator.control.simulation.SimThreadState;
import org.vadere.simulator.control.simulation.SimulationState;
import org.vadere.simulator.entrypoints.ScenarioFactory;
import org.vadere.simulator.projects.RunnableFinishedListener;
import org.vadere.simulator.projects.Scenario;
import org.vadere.simulator.projects.dataprocessing.outputfile.OutputFile;
import org.vadere.simulator.utils.cache.ScenarioCache;
import org.vadere.state.traci.TraCIException;
import org.vadere.state.traci.TraCIExceptionInternal;
......@@ -15,11 +18,16 @@ import org.vadere.util.logging.Logger;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
/**
* This class acts as interface between the TraCI handling and the actual simulation. All
......@@ -87,6 +95,10 @@ public class RemoteManager implements RunnableFinishedListener {
currentSimulationRun = new RemoteScenarioRun(scenario, outputDir, this, scenarioPath, scenarioCache);
}
public SimThreadState getCurrentSimThreadState(){
return currentSimulationRun.getCurrentSimThreadState();
}
public void loadScenario(String scenarioString) {
loadScenario(scenarioString, null);
}
......@@ -110,8 +122,9 @@ public class RemoteManager implements RunnableFinishedListener {
return scenarioCache;
}
public boolean stopSimulationIfRunning() {
public boolean stopSimulationIfRunning(){
if (currentSimulationThread != null && currentSimulationThread.isAlive()) {
logger.errorf("kill simulation thread");
currentSimulationThread.interrupt();
return true;
}
......@@ -130,7 +143,6 @@ public class RemoteManager implements RunnableFinishedListener {
public boolean accessState(StateAccessHandler stateAccessHandler) {
if (currentSimulationRun == null)
return false;
currentSimulationRun.accessState(this, stateAccessHandler);
return true;
......@@ -149,6 +161,14 @@ public class RemoteManager implements RunnableFinishedListener {
return true;
}
public void notifySimulationThread(){
currentSimulationRun.notifySimulationThread();
}
public void waitForSimulationEnd(){
currentSimulationRun.waitForSimulationEnd();
}
public double getSimulationStoppedEarlyAtTime(){
return currentSimulationRun.getSimulationStoppedEarlyAtTime();
}
......@@ -199,4 +219,6 @@ public class RemoteManager implements RunnableFinishedListener {
public void setSimCfg(SimulationCfg simCfg) {
this.simCfg = simCfg;
}
}
......@@ -3,6 +3,7 @@ package org.vadere.manager;
import org.vadere.manager.traci.commandHandler.StateAccessHandler;
import org.vadere.simulator.control.simulation.RemoteRunListener;
import org.vadere.simulator.control.simulation.ScenarioRun;
import org.vadere.simulator.control.simulation.SimThreadState;
import org.vadere.simulator.projects.RunnableFinishedListener;
import org.vadere.simulator.projects.Scenario;
import org.vadere.simulator.utils.cache.ScenarioCache;
......@@ -14,10 +15,9 @@ import java.util.concurrent.locks.ReentrantLock;
public class RemoteScenarioRun extends ScenarioRun implements RemoteRunListener {
private final Object waitForLoopEnd;
private final Object waitForSimStepLoopEnd;
private final ReentrantLock lock;
private List<Subscription> subscriptions;
private boolean lastSimulationStep;
private double simulationStoppedEarlyAtTime;
......@@ -25,9 +25,8 @@ public class RemoteScenarioRun extends ScenarioRun implements RemoteRunListener
// overwriteTimestampSetting. In RemoteScenarioRun the caller defines where the output should go.
super(scenario, outputDir.toString(), true,scenarioFinishedListener, scenarioPath, scenarioCache);
this.singleStepMode = true;
this.waitForLoopEnd = new Object();
this.waitForSimStepLoopEnd = new Object();
this.lock = new ReentrantLock();
this.lastSimulationStep = false;
this.simulationStoppedEarlyAtTime = Double.MAX_VALUE;
addRemoteManagerListener(this);
}
......@@ -35,11 +34,14 @@ public class RemoteScenarioRun extends ScenarioRun implements RemoteRunListener
synchronized public boolean accessState(RemoteManager remoteManager, StateAccessHandler stateAccessHandler) {
try {
if (!isWaitForSimCommand()) {
synchronized (waitForLoopEnd) {
waitForLoopEnd.wait();
synchronized (waitForSimStepLoopEnd) {
waitForSimStepLoopEnd.wait();
}
}
lock.lock();
if (!checkValidThreadState()){
throw new TraCIException("Invalid access to simulation state. Simulation thread in state %s", simulation.getThreadState().name());
}
stateAccessHandler.execute(remoteManager, getSimulationState());
} catch (InterruptedException e) {
......@@ -51,6 +53,25 @@ public class RemoteScenarioRun extends ScenarioRun implements RemoteRunListener
return true;
}
synchronized public SimThreadState getCurrentSimThreadState(){
if (simulation == null){
return SimThreadState.INIT;
}
return simulation.getThreadState();
}
synchronized public void waitForSimulationEnd(){
try {
synchronized (waitForSimStepLoopEnd) {
waitForSimStepLoopEnd.wait();
}
} catch (InterruptedException e) {
logger.errorf("Interrupted while waiting for simulation thread to finish post loop");
}
}
synchronized public void nextStep(double simTime) {
try {
lock.lock();
......@@ -61,18 +82,22 @@ public class RemoteScenarioRun extends ScenarioRun implements RemoteRunListener
}
}
synchronized public void notifySimulationThread(){
nextStep(-1);
}
@Override
public void simulationStepFinishedListener() {
synchronized (waitForLoopEnd) {
waitForLoopEnd.notify();
public void notifySimStepListener() {
synchronized (waitForSimStepLoopEnd) {
waitForSimStepLoopEnd.notify();
}
}
@Override
public void lastSimulationStepFinishedListener() {
synchronized (waitForLoopEnd) {
lastSimulationStep = true;
waitForLoopEnd.notify();
public void notifySimulationEndListener() {
synchronized (waitForSimStepLoopEnd) {
waitForSimStepLoopEnd.notify();
}
}
......@@ -81,10 +106,6 @@ public class RemoteScenarioRun extends ScenarioRun implements RemoteRunListener
simulationStoppedEarlyAtTime = time;
}
public boolean isLastSimulationStep() {
return lastSimulationStep;
}
public double getSimulationStoppedEarlyAtTime() {
return simulationStoppedEarlyAtTime;
}
......
......@@ -36,9 +36,9 @@ public class VadereSingleClientServer extends AbstractVadereServer {
}
}
Thread t = new Thread(handler);
t.start();
t.join();
Thread traciThread = new Thread(handler);
traciThread.start();
traciThread.join();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
......
......@@ -12,6 +12,7 @@ import org.vadere.manager.traci.commandHandler.variables.ControlVar;
import org.vadere.manager.traci.commands.TraCICommand;
import org.vadere.manager.traci.commands.control.*;
import org.vadere.manager.traci.response.*;
import org.vadere.simulator.control.simulation.SimThreadState;
import org.vadere.util.logging.Logger;
import java.lang.reflect.Method;
......@@ -86,8 +87,13 @@ public class ControlCommandHandler extends CommandHandler<ControlVar> {
logger.debugf("%s: Simulate until=%f", TraCICmd.SIM_STEP.name(), cmd.getTargetTime());
if (!remoteManager.nextStep(cmd.getTargetTime())) {
//simulation finished;
// Simulation finished. Wait until simulation thread finishes with post loop before
// telling traci client
if (remoteManager.getCurrentSimThreadState().equals(SimThreadState.MAIN_LOOP)){
remoteManager.waitForSimulationEnd();
}
cmd.setResponse(TraCISimTimeResponse.simEndReached());
remoteManager.notifySimulationThread();
return cmd;
}
// execute all
......@@ -114,7 +120,13 @@ public class ControlCommandHandler extends CommandHandler<ControlVar> {
if (remoteManager.getSimulationStoppedEarlyAtTime() != Double.MAX_VALUE){
double stoppedAtTime = remoteManager.getSimulationStoppedEarlyAtTime();
logger.infof("Stop simulation at %f. Inform TraCI client with simEndReach Response.", stoppedAtTime);
// Simulation finished. Wait until simulation thread finishes with post loop before
// telling traci client
if (remoteManager.getCurrentSimThreadState().equals(SimThreadState.MAIN_LOOP)){
remoteManager.waitForSimulationEnd();
}
cmd.setResponse(TraCISimTimeResponse.simEndReached());
remoteManager.notifySimulationThread();
}
logger.debug("process_simStep done.");
......
......@@ -6,9 +6,9 @@ public interface RemoteRunListener {
* Notify RemoteManger that the simulation reached end of loop and the {@link SimulationState}
* is ready to be read/changed. The simulation thread will wait after the call is finished.
*/
void simulationStepFinishedListener();
void notifySimStepListener();
void lastSimulationStepFinishedListener();
void notifySimulationEndListener();
/**
* Notify RemoteManger about early shutdown. This is used to gracefully stop TraCI connection.
......
......@@ -202,8 +202,10 @@ public class ScenarioRun implements Runnable {
throw new RuntimeException("Simulation failed.", e);
} finally {
simulationResult.stopTime();
logger.info("Simulation run finished.");
doAfterSimulation();
VadereContext.remove(scenarioStore.getTopography().getContextId());
}
}
......@@ -265,6 +267,8 @@ public class ScenarioRun implements Runnable {
return simulation != null && simulation.isRunning();
}
public boolean isScenarioInSingleStepMode(){
return simulation != null && simulation.isSingleStepMode();
}
......@@ -278,6 +282,13 @@ public class ScenarioRun implements Runnable {
return simulation != null && simulation.isWaitForSimCommand();
}
public boolean checkValidThreadState(){
if (simulation == null){
return false;
}
return SimThreadState.MAIN_LOOP.equals(simulation.getThreadState()) || SimThreadState.POST_LOOP.equals(simulation.getThreadState());
}
public void nextSimCommand(double simulateUntilInSec){
if ( !simulation.isSingleStepMode())
......
package org.vadere.simulator.control.simulation;
public enum SimThreadState {
INIT,
PRE_LOOP,
MAIN_LOOP,
POST_LOOP,
FINISHED
}
......@@ -52,6 +52,7 @@ public class Simulation implements ControllerProvider{
private final List<RemoteRunListener> remoteRunListeners;
private List<Model> models;
private SimThreadState threadState = SimThreadState.INIT;
private boolean isRunSimulation = false;
private boolean isPaused = false;
private boolean singleStepMode; // constructor
......@@ -239,10 +240,22 @@ public class Simulation implements ControllerProvider{
// Models and processors require the latest topography for post processing.
// Therefore, reset topography afterwards (I guess resetting the topography was introduced by Stefan).
topographyController.postLoop(this.simTimeInSec);
// Notify remoteManger that simulation ended. If a command waited for the next
// simulation step notify it and execute command with current SimulationState.
setWaitForSimCommand(true); // its save to read the state now.
remoteRunListeners.forEach(RemoteRunListener::lastSimulationStepFinishedListener);
if (attributesSimulation.isWriteSimulationData()) {
processorManager.writeOutput();
}
logger.info("Finished writing all output files");
// Notify remoteManger that simulation ended.
logger.info("Post-loop: before waitForTraci");
if (singleStepMode) {
synchronized (this){
waitForTraci();
}
}
logger.info("Post-loop: finished.");
}
/**
......@@ -250,14 +263,17 @@ public class Simulation implements ControllerProvider{
*/
public void run() {
try {
if (attributesSimulation.isWriteSimulationData()) {
processorManager.setMainModel(mainModel);
processorManager.initOutputFiles();
}
threadState = SimThreadState.PRE_LOOP;
preLoop();
logger.info("preLoop finished.");
threadState = SimThreadState.MAIN_LOOP;
while (isRunSimulation) {
synchronized (this) {
while (isPaused) {
......@@ -319,19 +335,7 @@ public class Simulation implements ControllerProvider{
boolean timeReached = Math.round(simTimeInSec) >= Math.round(simulateUntilInSec);
if (timeReached || simulateUntilInSec == -1){
logger.debugf("Simulated until: %.4f", simTimeInSec);
setWaitForSimCommand(true);
remoteRunListeners.forEach(RemoteRunListener::simulationStepFinishedListener);
while (waitForSimCommand){
logger.debugf("wait for next SimCommand...");
try {
wait();
} catch (InterruptedException e) {
waitForSimCommand = false;
Thread.currentThread().interrupt();
logger.warn("interrupt while waitForSimCommand");
}
}
waitForTraci();
}
}
}
......@@ -348,18 +352,45 @@ public class Simulation implements ControllerProvider{
if (Thread.interrupted()) {
isRunSimulation = false;
simulationResult.setState("Simulation interrupted");
simulationResult.setState("Simulation interrupted.");
logger.info("Simulation interrupted.");
}
}
} finally {
// this is necessary to free the resources (files), the SimulationWriter and processor are writing in!
// Always execute postLoop
isRunSimulation = false;
threadState = SimThreadState.POST_LOOP;
postLoop();
threadState = SimThreadState.FINISHED;
}
}
if (attributesSimulation.isWriteSimulationData()) {
processorManager.writeOutput();
private void waitForTraci() {
setWaitForSimCommand(true);
if (threadState.equals(SimThreadState.MAIN_LOOP)){
remoteRunListeners.forEach(RemoteRunListener::notifySimStepListener);
}
else if (threadState.equals(SimThreadState.POST_LOOP))
{
remoteRunListeners.forEach(RemoteRunListener::notifySimulationEndListener);
}
else{
logger.errorf("Wrong thread state: %s ", threadState);
return;
}
while (isWaitForSimCommand()){
logger.debugf("wait for next SimCommand...");
try {
wait();
} catch (InterruptedException e) {
setWaitForSimCommand(false);
Thread.currentThread().interrupt();
logger.warn("interrupt while waitForSimCommand");
}
logger.info("Finished writing all output files");
}
}
......@@ -462,6 +493,10 @@ public class Simulation implements ControllerProvider{
return isRunSimulation && !isPaused() && !isWaitForSimCommand();
}
public synchronized SimThreadState getThreadState(){
return threadState;
}
synchronized boolean isSingleStepMode(){ return singleStepMode;}
void setSingleStepMode(boolean singleStepMode) {
......@@ -584,4 +619,5 @@ public class Simulation implements ControllerProvider{
public ProcessorManager getProcessorManager() {
return processorManager;
}
}
\ No newline at end of file
......@@ -7,6 +7,7 @@ import org.vadere.simulator.projects.dataprocessing.datakey.DataKey;
import org.vadere.simulator.projects.dataprocessing.processor.DataProcessor;
import org.vadere.simulator.projects.dataprocessing.writer.VadereWriter;
import org.vadere.simulator.projects.dataprocessing.writer.VadereWriterFactory;
import org.vadere.util.logging.Logger;
import java.io.File;
import java.io.IOException;
......@@ -33,6 +34,7 @@ import java.util.stream.Stream;
public abstract class OutputFile<K extends DataKey<K>> {
private static Logger logger = Logger.getLogger(OutputFile.class);
//Note: Only header information from data keys are written in this, therefore, these are the indices
// Each processor attached to this processor itself attaches also headers, but they are not listed in this
......@@ -104,6 +106,7 @@ public abstract class OutputFile<K extends DataKey<K>> {
public void write() {
if (!isEmpty()) {
logger.info("Absolute file name" + absoluteFileName);
try (VadereWriter out = writerFactory.create(absoluteFileName)) {
this.writer = out;
......@@ -120,6 +123,7 @@ public abstract class OutputFile<K extends DataKey<K>> {
out.flush();
} catch (IOException e) {
logger.error(e.getMessage());
throw new UncheckedIOException(e);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment