Source code for PyDSS.pydss_results

"""Provides access to PyDSS result data."""
from collections import defaultdict
from datetime import datetime
import json
import logging
import os
import re
from pathlib import Path

import h5py
import numpy as np
import pandas as pd

from PyDSS.common import  DatasetPropertyType
from PyDSS.dataset_buffer import DatasetBuffer
from PyDSS.element_options import ElementOptions
from PyDSS.exceptions import InvalidParameter
from PyDSS.pydss_project import PyDssProject, RUN_SIMULATION_FILENAME
from PyDSS.reports.reports import Reports, REPORTS_DIR
from PyDSS.utils.dataframe_utils import read_dataframe, write_dataframe
from PyDSS.utils.utils import dump_data, load_data, make_json_serializable, \
    make_timestamps
from PyDSS.value_storage import ValueStorageBase, get_dataset_property_type, \
    get_time_step_path


logger = logging.getLogger(__name__)


[docs]class PyDssResults: """Interface to perform analysis on PyDSS output data.""" def __init__( self, project_path=None, project=None, in_memory=False, frequency=False, mode=False ): """Constructs PyDssResults object. Parameters ---------- project_path : str | None Load project from files in path project : PyDssProject | None Existing project object in_memory : bool If true, load all exported data into memory. frequency : bool If true, add frequency column to all dataframes. mode : bool If true, add mode column to all dataframes. """ options = ElementOptions() if project_path is not None: # TODO: handle old version? self._project = PyDssProject.load_project( project_path, simulation_file=RUN_SIMULATION_FILENAME, ) elif project is None: raise InvalidParameter("project_path or project must be set") else: self._project = project self._fs_intf = self._project.fs_interface self._scenarios = [] filename = self._project.get_hdf_store_filename() driver = "core" if in_memory else None self._hdf_store = h5py.File(filename, "r", driver=driver) if self._project.simulation_config.exports.export_results: for name in self._project.list_scenario_names(): metadata = self._project.read_scenario_export_metadata(name) scenario_result = PyDssScenarioResults( name, self.project_path, self._hdf_store, self._fs_intf, metadata, options, frequency=frequency, mode=mode, ) self._scenarios.append(scenario_result)
[docs] def generate_reports(self): """Generate all reports specified in the configuration. Returns ------- list list of report filenames """ return Reports.generate_reports(self)
[docs] def read_report(self, report_name): """Return the report data. Parameters ---------- report_name : str Returns ------- str """ all_reports = Reports.get_all_reports() if report_name not in all_reports: raise InvalidParameter(f"invalid report name {report_name}") report_cls = all_reports[report_name] # This bypasses self._fs_intf because reports are always extracted. reports_dir = os.path.join(self._project.project_path, REPORTS_DIR) for filename in os.listdir(reports_dir): name, ext = os.path.splitext(filename) if name == os.path.splitext(report_cls.FILENAME)[0]: path = os.path.join(reports_dir, filename) if ext in (".json", ".toml"): return load_data(path) if ext in (".csv", ".h5"): return read_dataframe(path) raise InvalidParameter(f"did not find report {report_name} in {reports_dir}")
@property def project(self): """Return the PyDssProject instance. Returns ------- PyDssProject """ return self._project @property def scenarios(self): """Return the PyDssScenarioResults instances for the project. Returns ------- list list of PyDssScenarioResults """ return self._scenarios
[docs] def get_scenario(self, name): """Return the PyDssScenarioResults object for scenario with name. Parameters ---------- name : str Scenario name Results ------- PyDssScenarioResults Raises ------ InvalidParameter Raised if the scenario does not exist. """ for scenario in self._scenarios: if name == scenario.name: return scenario raise InvalidParameter(f"scenario {name} does not exist")
@property def hdf_store(self): """Return a handle to the HDF data store. Returns ------- h5py.File """ return self._hdf_store @property def project_path(self): """Return the path to the PyDSS project. Returns ------- str """ return self._project.project_path
[docs] def read_file(self, path): """Read a file from the PyDSS project. Parameters ---------- path : str Path to the file relative from the project directory. Returns ------- str Contents of the file """ return self._fs_intf.read_file(path)
@property def simulation_config(self): """Return the simulation configuration Returns ------- dict """ return self._project.simulation_config
[docs]class PyDssScenarioResults: """Contains results for one scenario.""" def __init__( self, name, project_path, store, fs_intf, metadata, options, frequency=False, mode=False ): self._name = name self._project_path = project_path self._hdf_store = store self._metadata = metadata self._options = options self._fs_intf = fs_intf self._group = self._hdf_store[f"Exports/{name}"] self._elem_classes = [ x for x in self._group if isinstance(self._group[x], h5py.Group) ] self._elems_by_class = defaultdict(set) self._elem_data_by_prop = defaultdict(dict) self._elem_values_by_prop = defaultdict(dict) self._elem_indices_by_prop = defaultdict(dict) self._props_by_class = defaultdict(list) self._elem_props = defaultdict(list) self._column_ranges_per_elem = defaultdict(dict) self._summed_elem_props = defaultdict(dict) self._summed_elem_timeseries_props = defaultdict(list) self._indices_df = None self._add_frequency = frequency self._add_mode = mode self._data_format_version = self._hdf_store.attrs["version"] self._parse_datasets() def _parse_datasets(self): for elem_class in self._elem_classes: class_group = self._group[elem_class] if "ElementProperties" in class_group: prop_group = class_group["ElementProperties"] for prop, dataset in prop_group.items(): dataset_property_type = get_dataset_property_type(dataset) if dataset_property_type == DatasetPropertyType.TIME_STEP: continue if dataset_property_type == DatasetPropertyType.VALUE: self._elem_values_by_prop[elem_class][prop] = [] prop_names = self._elem_values_by_prop elif dataset_property_type in ( DatasetPropertyType.PER_TIME_POINT, DatasetPropertyType.FILTERED, ): self._elem_data_by_prop[elem_class][prop] = [] prop_names = self._elem_data_by_prop else: continue self._props_by_class[elem_class].append(prop) self._elem_indices_by_prop[elem_class][prop] = {} names = DatasetBuffer.get_names(dataset) self._column_ranges_per_elem[elem_class][prop] = \ DatasetBuffer.get_column_ranges(dataset) for i, name in enumerate(names): self._elems_by_class[elem_class].add(name) prop_names[elem_class][prop].append(name) self._elem_indices_by_prop[elem_class][prop][name] = i self._elem_props[name].append(prop) else: self._elems_by_class[elem_class] = set() summed_elem_props = self._group[elem_class].get("SummedElementProperties", []) for prop in summed_elem_props: dataset = self._group[elem_class]["SummedElementProperties"][prop] dataset_property_type = get_dataset_property_type(dataset) if dataset_property_type == DatasetPropertyType.VALUE: df = DatasetBuffer.to_dataframe(dataset) assert len(df) == 1 self._summed_elem_props[elem_class][prop] = { x: df[x].values[0] for x in df.columns } else: self._summed_elem_timeseries_props[elem_class].append(prop)
[docs] @staticmethod def get_name_from_column(column): """Return the element name from the dataframe column. The dataframe should have been returned from this class. Parameters ---------- column : str Returns ------- str """ fields = column.split(ValueStorageBase.DELIMITER) assert len(fields) > 1 return fields[0]
@property def name(self): """Return the name of the scenario. Returns ------- str """ return self._name
[docs] def export_data(self, path=None, fmt="csv", compress=False): """Export data to path. Parameters ---------- path : str Output directory; defaults to scenario exports path fmt : str Filer format type (csv, h5) compress : bool Compress data """ if path is None: path = os.path.join(self._project_path, "Exports", self._name) os.makedirs(path, exist_ok=True) self._export_element_timeseries(path, fmt, compress) self._export_element_values(path, fmt, compress) self._export_summed_element_timeseries(path, fmt, compress) self._export_summed_element_values(path, fmt, compress)
def _export_element_timeseries(self, path, fmt, compress): for elem_class in self.list_element_classes(): for prop in self.list_element_properties(elem_class): dataset = self._group[f"{elem_class}/ElementProperties/{prop}"] prop_type = get_dataset_property_type(dataset) if prop_type == DatasetPropertyType.FILTERED: self._export_filtered_dataframes(elem_class, prop, path, fmt, compress) else: df = self.get_full_dataframe(elem_class, prop) base = "__".join([elem_class, prop]) filename = os.path.join(path, base + "." + fmt.replace(".", "")) write_dataframe(df, filename, compress=compress) def _export_element_values(self, path, fmt, compress): elem_prop_nums = defaultdict(dict) for elem_class in self._elem_values_by_prop: for prop in self._elem_values_by_prop[elem_class]: dataset = self._group[f"{elem_class}/ElementProperties/{prop}"] for name in self._elem_values_by_prop[elem_class][prop]: col_range = self._get_element_column_range(elem_class, prop, name) start = col_range[0] length = col_range[1] if length == 1: val = dataset[:][0][start] else: val = dataset[:][0][start: start + length] if prop not in elem_prop_nums[elem_class]: elem_prop_nums[elem_class][prop] = {} elem_prop_nums[elem_class][prop][name] = val if elem_prop_nums: filename = os.path.join(path, "element_property_values.json") dump_data(elem_prop_nums, filename, indent=2, default=make_json_serializable) logger.info("Exported data to %s", path) def _export_filtered_dataframes(self, elem_class, prop, path, fmt, compress): for name, df in self.get_filtered_dataframes(elem_class, prop).items(): if df.empty: logger.debug("Skip empty dataframe %s %s %s", elem_class, prop, name) continue base = "__".join([elem_class, prop, name]) filename = os.path.join(path, base + "." + fmt.replace(".", "")) write_dataframe(df, filename, compress=compress) def _export_summed_element_timeseries(self, path, fmt, compress): for elem_class in self._summed_elem_timeseries_props: for prop in self._summed_elem_timeseries_props[elem_class]: dataset = self._group[elem_class]["SummedElementProperties"][prop] prop_type = get_dataset_property_type(dataset) if prop_type == DatasetPropertyType.PER_TIME_POINT: df = DatasetBuffer.to_dataframe(dataset) self._finalize_dataframe(df, dataset) base = "__".join([elem_class, prop]) filename = os.path.join(path, base + "." + fmt.replace(".", "")) write_dataframe(df, filename, compress=compress) def _export_summed_element_values(self, path, fmt, compress): filename = os.path.join(path, "summed_element_property_values.json") dump_data(self._summed_elem_props, filename, default=make_json_serializable)
[docs] def get_dataframe(self, element_class, prop, element_name, real_only=False, abs_val=False, **kwargs): """Return the dataframe for an element. Parameters ---------- element_class : str prop : str element_name : str real_only : bool If dtype of any column is complex, drop the imaginary component. abs_val : bool If dtype of any column is complex, compute its absolute value. kwargs Filter on options; values can be strings or regular expressions. Returns ------- pd.DataFrame Raises ------ InvalidParameter Raised if the element is not stored. """ if element_name not in self._elem_props: raise InvalidParameter(f"element {element_name} is not stored") dataset = self._group[f"{element_class}/ElementProperties/{prop}"] prop_type = get_dataset_property_type(dataset) if prop_type == DatasetPropertyType.PER_TIME_POINT: return self._get_elem_prop_dataframe( element_class, prop, element_name, dataset, real_only=real_only, abs_val=abs_val, **kwargs ) elif prop_type == DatasetPropertyType.FILTERED: return self._get_filtered_dataframe( element_class, prop, element_name, dataset, real_only=real_only, abs_val=abs_val, **kwargs ) assert False, str(prop_type)
[docs] def get_filtered_dataframes(self, element_class, prop, real_only=False, abs_val=False): """Return the dataframes for all elements. Calling this is much more efficient than calling get_dataframe for each element. Parameters ---------- element_class : str prop : str element_name : str real_only : bool If dtype of any column is complex, drop the imaginary component. abs_val : bool If dtype of any column is complex, compute its absolute value. Returns ------- dict key = str (name), val = pd.DataFrame The dict will be empty if no data was stored. """ if prop not in self.list_element_properties(element_class): logger.debug("%s/%s is not stored", element_class, prop) return {} dataset = self._group[f"{element_class}/ElementProperties/{prop}"] columns = DatasetBuffer.get_columns(dataset) names = DatasetBuffer.get_names(dataset) length = dataset.attrs["length"] indices_df = self._get_indices_df() data_vals = dataset[:length] elem_data = defaultdict(list) elem_timestamps = defaultdict(list) # The time_step_dataset has these columns: # 1. time step index # 2. element index # Each row describes the source data in the dataset row. path = dataset.attrs["time_step_path"] assert length == self._hdf_store[path].attrs["length"] time_step_data = self._hdf_store[path][:length] for i in range(length): ts_index = time_step_data[:, 0][i] elem_index = time_step_data[:, 1][i] # TODO DT: more than one column? val = data_vals[i, 0] if real_only: val = val.real elif abs_val: val = abs(val) elem_data[elem_index].append(val) elem_timestamps[elem_index].append(indices_df.iloc[ts_index, 0]) dfs = {} for elem_index, vals in elem_data.items(): elem_name = names[elem_index] cols = self._fix_columns(elem_name, columns) dfs[elem_name] = pd.DataFrame( vals, columns=cols, index=elem_timestamps[elem_index], ) return dfs
[docs] def get_full_dataframe(self, element_class, prop, real_only=False, abs_val=False, **kwargs): """Return a dataframe containing all data. The dataframe is copied. Parameters ---------- element_class : str prop : str real_only : bool If dtype of any column is complex, drop the imaginary component. abs_val : bool If dtype of any column is complex, compute its absolute value. kwargs Filter on options; values can be strings or regular expressions. Returns ------- pd.DataFrame """ if prop not in self.list_element_properties(element_class): raise InvalidParameter(f"property {prop} is not stored") dataset = self._group[f"{element_class}/ElementProperties/{prop}"] df = DatasetBuffer.to_dataframe(dataset) if kwargs: options = self._check_options(element_class, prop, **kwargs) names = self._elems_by_class.get(element_class, set()) columns = ValueStorageBase.get_columns(df, names, options, **kwargs) columns = list(columns) columns.sort() df = df[columns] self._finalize_dataframe(df, dataset, real_only=real_only, abs_val=abs_val) return df
[docs] def get_summed_element_total(self, element_class, prop): """Return the total value for a summed element property. Parameters ---------- element_class : str prop : str Returns ------- dict Raises ------ InvalidParameter Raised if the element class is not stored. """ if element_class not in self._summed_elem_props: raise InvalidParameter(f"{element_class} is not stored") if prop not in self._summed_elem_props[element_class]: raise InvalidParameter(f"{prop} is not stored") return self._summed_elem_props[element_class][prop]
[docs] def get_element_property_value(self, element_class, prop, element_name): """Return the number stored for the element property.""" if element_class not in self._elem_values_by_prop: raise InvalidParameter(f"{element_class} is not stored") if prop not in self._elem_values_by_prop[element_class]: raise InvalidParameter(f"{prop} is not stored") if element_name not in self._elem_values_by_prop[element_class][prop]: raise InvalidParameter(f"{element_name} is not stored") dataset = self._group[f"{element_class}/ElementProperties/{prop}"] col_range = self._get_element_column_range(element_class, prop, element_name) start = col_range[0] length = col_range[1] if length == 1: return dataset[:][0][start] return dataset[:][0][start: start + length]
[docs] def get_option_values(self, element_class, prop, element_name): """Return the option values for the element property. element_class : str prop : str element_name : str Returns ------- list """ df = self.get_dataframe(element_class, prop, element_name) return ValueStorageBase.get_option_values(df, element_name)
[docs] def get_summed_element_dataframe(self, element_class, prop, real_only=False, abs_val=False): """Return the dataframe for a summed element property. Parameters ---------- element_class : str prop : str real_only : bool If dtype of any column is complex, drop the imaginary component. abs_val : bool If dtype of any column is complex, compute its absolute value. Returns ------- pd.DataFrame Raises ------ InvalidParameter Raised if the element class is not stored. """ if element_class not in self._summed_elem_timeseries_props: raise InvalidParameter(f"{element_class} is not stored") if prop not in self._summed_elem_timeseries_props[element_class]: raise InvalidParameter(f"{prop} is not stored") elem_group = self._group[element_class]["SummedElementProperties"] dataset = elem_group[prop] df = DatasetBuffer.to_dataframe(dataset) self._add_indices_to_dataframe(df) if real_only: for column in df.columns: if df[column].dtype == complex: df[column] = [x.real for x in df[column]] elif abs_val: for column in df.columns: if df[column].dtype == complex: df[column] = [abs(x) for x in df[column]] return df
[docs] def iterate_dataframes(self, element_class, prop, real_only=False, abs_val=False, **kwargs): """Returns a generator over the dataframes by element name. Parameters ---------- element_class : str prop : str real_only : bool If dtype of any column is complex, drop the imaginary component. abs_val : bool If dtype of any column is complex, compute its absolute value. kwargs : dict Filter on options; values can be strings or regular expressions. Returns ------- tuple Tuple containing the name or property and a pd.DataFrame """ for name in self.list_element_names(element_class): if prop in self._elem_props[name]: df = self.get_dataframe( element_class, prop, name, real_only=real_only, abs_val=abs_val, **kwargs ) yield name, df
[docs] def iterate_element_property_values(self): """Return a generator over all element properties stored as values. Yields ------ tuple element_class, property, element_name, value """ for elem_class in self._elem_values_by_prop: for prop in self._elem_values_by_prop[elem_class]: for name in self._elem_values_by_prop[elem_class][prop]: val = self.get_element_property_value(elem_class, prop, name) yield elem_class, prop, name, val
[docs] def list_element_classes(self): """Return the element classes stored in the results. Returns ------- list """ return self._elem_classes[:]
[docs] def list_element_names(self, element_class, prop=None): """Return the element names for a property stored in the results. Parameters ---------- element_class : str prop : str Returns ------- list """ # TODO: prop is deprecated return sorted(list(self._elems_by_class.get(element_class, [])))
[docs] def list_element_properties(self, element_class, element_name=None): """Return the properties stored in the results for a class. Parameters ---------- element_class : str element_name : str | None If not None, list properties only for that name. Returns ------- list """ if element_class not in self._props_by_class: return [] if element_name is None: return sorted(list(self._props_by_class[element_class])) return self._elem_props.get(element_name, [])
[docs] def list_element_value_names(self, element_class, prop): if element_class not in self._elem_values_by_prop: raise InvalidParameter(f"{element_class} is not stored") if prop not in self._elem_values_by_prop[element_class]: raise InvalidParameter(f"{element_class} / {prop} is not stored") return sorted(self._elem_values_by_prop[element_class][prop])
[docs] def list_element_property_values(self, element_name): nums = [] for elem_class in self._elem_prop_nums: for prop in self._elem_prop_nums[elem_class]: for name in self._elem_prop_nums[elem_class][prop]: if name == element_name: nums.append(prop) return nums
[docs] def list_element_property_options(self, element_class, prop): """List the possible options for the element class and property. Parameters ---------- element_class : str prop : str Returns ------- list """ return self._options.list_options(element_class, prop)
[docs] def list_element_info_files(self): """Return the files describing the OpenDSS element objects. Returns ------- list list of filenames (str) """ return self._metadata["element_info_files"]
[docs] def list_summed_element_properties(self, element_class): """Return the properties stored for a class where the values are a sum of all elements. Parameters ---------- element_class : str Returns ------- list Raises ------ InvalidParameter Raised if the element_class is not stored. """ if element_class not in self._summed_elem_props: raise InvalidParameter(f"class={element_class} is not stored") return self._summed_elem_props[element_class]
[docs] def read_element_info_file(self, filename): """Return the contents of file describing an OpenDSS element object. Parameters ---------- filename : str full path to a file (returned by list_element_info_files) or an element class, like "Transformers" Returns ------- pd.DataFrame """ if "." not in filename: actual = None for _file in self.list_element_info_files(): basename = os.path.splitext(os.path.basename(_file))[0] if basename.replace("Info", "") == filename: actual = _file if actual is None: raise InvalidParameter( f"element info file for {filename} is not stored" ) filename = actual return self._fs_intf.read_csv(filename)
[docs] def read_capacitor_changes(self): """Read the capacitor state changes from the OpenDSS event log. Returns ------- dict Maps capacitor names to count of state changes. """ text = self.read_file(self._metadata["event_log"]) return _read_capacitor_changes(text)
[docs] def read_event_log(self): """Returns the event log for the scenario. Returns ------- list list of dictionaries (one dict for each row in the file) """ text = self.read_file(self._metadata["event_log"]) return _read_event_log(text)
[docs] def read_pv_profiles(self): """Returns exported PV profiles for all PV systems. Returns ------- dict """ return self._fs_intf.read_scenario_pv_profiles(self._name)
def _check_options(self, element_class, prop, **kwargs): """Checks that kwargs are valid and returns available option names.""" for option in kwargs: if not self._options.is_option_valid(element_class, prop, option): raise InvalidParameter( f"class={element_class} property={prop} option={option} is invalid" ) return self._options.list_options(element_class, prop)
[docs] def read_feeder_head_info(self): """Read the feeder head information. Returns ------- dict """ return json.loads(self.read_file(f"Exports/{self._name}/FeederHeadInfo.json"))
[docs] def read_file(self, path): """Read a file from the PyDSS project. Parameters ---------- path : str Path to the file relative from the project directory. Returns ------- str Contents of the file """ return self._fs_intf.read_file(path)
def _add_indices_to_dataframe(self, df): indices_df = self._get_indices_df() df["Timestamp"] = indices_df["Timestamp"] if self._add_frequency: df["Frequency"] = indices_df["Frequency"] if self._add_mode: df["Simulation Mode"] = indices_df["Simulation Mode"] df.set_index("Timestamp", inplace=True) def _finalize_dataframe(self, df, dataset, real_only=False, abs_val=False): if df.empty: return dataset_property_type = get_dataset_property_type(dataset) if dataset_property_type == DatasetPropertyType.FILTERED: time_step_path = get_time_step_path(dataset) time_step_dataset = self._hdf_store[time_step_path] df["TimeStep"] = DatasetBuffer.to_datetime(time_step_dataset) df.set_index("TimeStep", inplace=True) else: self._add_indices_to_dataframe(df) if real_only: for column in df.columns: if df[column].dtype == complex: df[column] = [x.real for x in df[column]] elif abs_val: for column in df.columns: if df[column].dtype == complex: df[column] = [abs(x) for x in df[column]] @staticmethod def _fix_columns(name, columns): cols = [] for column in columns: fields = column.split(ValueStorageBase.DELIMITER) fields[0] = name cols.append(ValueStorageBase.DELIMITER.join(fields)) return cols def _get_elem_prop_dataframe(self, elem_class, prop, name, dataset, real_only=False, abs_val=False, **kwargs): col_range = self._get_element_column_range(elem_class, prop, name) df = DatasetBuffer.to_dataframe(dataset, column_range=col_range) if kwargs: options = self._check_options(elem_class, prop, **kwargs) columns = ValueStorageBase.get_columns(df, name, options, **kwargs) df = df[columns] self._finalize_dataframe(df, dataset, real_only=real_only, abs_val=abs_val) return df def _get_element_column_range(self, elem_class, prop, name): elem_index = self._elem_indices_by_prop[elem_class][prop][name] col_range = self._column_ranges_per_elem[elem_class][prop][elem_index] return col_range def _get_filtered_dataframe(self, elem_class, prop, name, dataset, real_only=False, abs_val=False, **kwargs): indices_df = self._get_indices_df() elem_index = self._elem_indices_by_prop[elem_class][prop][name] length = dataset.attrs["length"] data_vals = dataset[:length] # The time_step_dataset has these columns: # 1. time step index # 2. element index # Each row describes the source data in the dataset row. path = dataset.attrs["time_step_path"] time_step_data = self._hdf_store[path][:length] assert length == self._hdf_store[path].attrs["length"] data = [] timestamps = [] for i in range(length): stored_elem_index = time_step_data[:, 1][i] if stored_elem_index == elem_index: ts_index = time_step_data[:, 0][i] # TODO DT: more than one column? val = data_vals[i, 0] # TODO: profile this vs a df operation at end if real_only: val = val.real elif abs_val: val = abs(val) data.append(val) timestamps.append(indices_df.iloc[ts_index, 0]) columns = self._fix_columns(name, DatasetBuffer.get_columns(dataset)) return pd.DataFrame(data, columns=columns, index=timestamps) def _get_indices_df(self): if self._indices_df is None: self._make_indices_df() return self._indices_df def _make_indices_df(self): data = { "Timestamp": make_timestamps(self._group["Timestamp"][:, 0]) } if self._add_frequency: data["Frequency"] = self._group["Frequency"][:, 0] if self._add_mode: data["Simulation Mode"] = self._group["Mode"][:, 0] df = pd.DataFrame(data) self._indices_df = df
def _read_capacitor_changes(event_log_text): """Read the capacitor state changes from an OpenDSS event log. Parameters ---------- event_log_text : str Text of event log Returns ------- dict Maps capacitor names to count of state changes. """ capacitor_changes = {} regex = re.compile(r"(Capacitor\.\w+)") data = _read_event_log(event_log_text) for row in data: match = regex.search(row["Element"]) if match: name = match.group(1) if name not in capacitor_changes: capacitor_changes[name] = 0 action = row["Action"].replace("*", "") if action in ("OPENED", "CLOSED", "STEP UP"): capacitor_changes[name] += 1 return capacitor_changes def _read_event_log(event_log_text): """Return OpenDSS event log information. Parameters ---------- event_log_text : str Text of event log Returns ------- list list of dictionaries (one dict for each row in the file) """ data = [] if not event_log_text: return data for line in event_log_text.split("\n"): if line == "": continue tokens = [x.strip() for x in line.split(",")] row = {} for token in tokens: name_and_value = [x.strip() for x in token.split("=")] name = name_and_value[0] value = name_and_value[1] row[name] = value data.append(row) return data