#!/usr/bin/env python3

import os
import re
import time
from typing import List, Union

import pandas as pd
from suqc.environment import VadereEnvironmentManager
from suqc.utils.dict_utils import deep_dict_lookup


class FileDataInfo(object):
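    """Meta information about a single Vadere output file: its filename, the
    output type key and the expected number of index columns."""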

    # Implemented in Vadere merge request !38; this is only a fallback mode and requires
    # manual updating if there are changes in Vadere. See also Vadere issues #199 and #201.

    # all output types come from Vadere, except "GeneralOutputFile"
    map_outputtype2index = {
        "IdOutputFile": 1,
        "LogEventOutputFile": 1,
        "NoDataKeyOutputFile": 0,
        "PedestrianIdOutputFile": 1,
        "TimestepOutputFile": 1,
        "TimestepPedestrianIdOutputFile": 2,
        "TimestepPedestrianIdOverlapOutputFile": 3,
        "TimestepPositionOutputFile": 3,
        "TimestepRowOutputFile": 2,
        "GeneralOutputFile": 1,
    }

    printFallbackMsg = False

    def __init__(
        self, process_file, processors=None, outputkey=None,
    ):
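        """Derive the number of index columns from the output type; unknown
        types fall back to a single index column (with a one-time warning)."""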
        self.filename = process_file["filename"]
        if outputkey is None:
            self.output_key = process_file["type"].split(".")[-1]
        else:
            self.output_key = outputkey
        self.processors = processors  # not needed yet, but may be in the future

        try:
            self.nr_row_indices = self.map_outputtype2index[self.output_key]
        except KeyError:
            if not self.printFallbackMsg:
                FileDataInfo.printFallbackMsg = True  # class-level flag: warn only once
                print(
                    f"WARNING: file type {self.output_key} was not found in the list; this may require an update. "
                    f"Setting number of index columns to 1."
                )
            self.nr_row_indices = 1  # simply use the first column as index


class QuantityOfInterest(object):
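    """Collects the requested output files (quantities of interest) and reads them
    into pandas DataFrames indexed by parameter id and run id."""
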
    def __init__(self, requested_files: Union[List[str], str]):
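        """Accept a single filename or a list of filenames of requested output files."""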

        assert isinstance(requested_files, (list, str))

        if isinstance(requested_files, str):
            requested_files = [requested_files]

        self.req_qois = self._requested_qoi(requested_files)

    def _requested_qoi(self, requested_files):
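        """Wrap each requested filename in a FileDataInfo with the generic
        'GeneralOutputFile' output key."""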
        req_qois = list()

        for rf in requested_files:
            pf = dict()
            pf["filename"] = rf
            pf["type"] = rf
            # TODO: the filename has to match exactly; maybe make this more robust to also work without the file ending
            req_qois.append(
                FileDataInfo(process_file=pf, outputkey="GeneralOutputFile")
            )

        return req_qois

    def _read_csv(self, req_qoi: FileDataInfo, filepath):
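        """Read one output file into a DataFrame; the number of index columns is
        taken from the 'ROW=' entry in the meta-data line if present, otherwise
        from the FileDataInfo fallback."""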
        # read the first line, which may contain Vadere meta-data about the index columns
        with open(filepath) as f:
            first_line = f.readline()

        try:
            # Try to extract the number of (row-)index columns from the meta-data line
            nr_row_indices = re.search(r"ROW=(\d+)", first_line).group(1)
            nr_row_indices = int(nr_row_indices)
        except (AttributeError, ValueError):  # AttributeError -> regex failed | ValueError -> converting to int failed
            # Fallback mode, infer index from the hard-coded list.
            nr_row_indices = req_qoi.nr_row_indices

        df = pd.read_csv(filepath, delimiter=" ", header=[0], comment="#")

        if nr_row_indices != 0:
            idx_keys = df.columns[:nr_row_indices]
            return df.set_index(idx_keys.tolist())
        else:
            return df

    def _add_parid2idx(self, df, par_id, run_id):
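        """Prepend the parameter id and run id as additional index levels."""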
        # from https://stackoverflow.com/questions/14744068/prepend-a-level-to-a-pandas-multiindex

        original_column_order = df.index.names
        df["id"] = par_id
        df["run_id"] = run_id
        df.set_index(["id", "run_id"], append=True, inplace=True)

        df = df.reorder_levels(["id", "run_id"] + original_column_order)
        return df

    def read_and_extract_qois(self, par_id, run_id, output_path):
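        """Read all requested output files from output_path and return a dict
        mapping filename (the QoI identifier) to its DataFrame."""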

        read_data = dict()

        for k in self.req_qois:
            filepath = os.path.join(output_path, k.filename)
            df_data = self._read_csv(k, filepath)
            read_data[k.filename] = self._add_parid2idx(
                df_data, par_id, run_id
            )  # filename is identifier for QoI

        return read_data


class VadereQuantityOfInterest(QuantityOfInterest):
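    """Quantity of interest that resolves the requested files against the
    'processWriters' section of a Vadere basis scenario."""
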
    def __init__(self, basis_scenario: dict, requested_files: Union[List[str], str]):

        assert isinstance(requested_files, (list, str))

        if isinstance(requested_files, str):
            requested_files = [requested_files]

        user_set_writers, _ = deep_dict_lookup(basis_scenario, "processWriters")
        self.process_files = user_set_writers["files"]
        self.processors = user_set_writers["processors"]

        super().__init__(requested_files)

    def get_process_files(self):
        return self.process_files

    def get_processors(self):
        return self.processors

    def _requested_qoi(self, requested_files):
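        """Match the requested filenames against the files defined in the scenario's
        processWriters and attach the corresponding processors."""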

        req_qois = list()
        process_files = self.get_process_files()
        processors = self.get_processors()

        for pf in process_files:

            # TODO: the filename has to match exactly; maybe make this more robust to also work without the file ending
            filename = pf["filename"]  # TODO: see issue #33

            if filename in requested_files:
                sel_procs = self._select_corresp_processors(pf, processors)
                req_qois.append(FileDataInfo(process_file=pf, processors=sel_procs))

                requested_files.remove(
                    filename
                )  # -> processed, list should be empty when leaving function

        if requested_files:  # has to be empty
            raise ValueError(
                f"The requested files {requested_files} are not set in the Vadere scenario: \n "
                f"{process_files}"
            )

        return req_qois

    def _select_corresp_processors(self, process_file, processors):
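        """Return the processor definitions referenced by the given output file;
        raise if a processor id is missing or defined twice."""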
        proc_ids = process_file["processors"]

        selected_procs = list()

        # TODO: see issue #33
        for pid in proc_ids:
            found = False
            for p in processors:
                if pid == p["id"]:
                    selected_procs.append(p)

                    if not found:
                        found = True
                    else:
                        raise ValueError(
                            "The Vadere scenario is not correctly set up! There are two processors with "
                            f"the id={pid}."
                        )

            if not found:
                raise ValueError(
                    f"The Vadere scenario is not correctly set up! Processor id {pid} could not be found "
                    "in 'processors'."
                )

        return selected_procs


if __name__ == "__main__":
    a = VadereQuantityOfInterest(
        "evacuationTimes.txt", VadereEnvironmentManager("corner")
    )

    print(a.req_qois)