import abc
import copy
import logging
import numpy as np
from PyDSS.common import StoreValuesType
from PyDSS.utils.simulation_utils import CircularBufferHelper
from PyDSS.value_storage import ValueContainer
logger = logging.getLogger(__name__)
class StorageFilterBase(abc.ABC):
    """Abstract parent of all storage filters.

    An instance owns a single ValueContainer. Subclasses implement
    ``append_values`` to decide, according to their StoreValuesType, what is
    written to that container.

    """

    def __init__(self, hdf_store, path, prop, num_steps, max_chunk_bytes, values, elem_names, **kwargs):
        """Remember the property and build the backing container.

        The positional parameters are forwarded unchanged, and in the same
        order, to ``make_container``. Extra keyword arguments are accepted
        but not used by this constructor.

        """
        self._prop = prop
        container_args = (
            hdf_store,
            path,
            prop,
            num_steps,
            max_chunk_bytes,
            values,
            elem_names,
        )
        self._container = self.make_container(*container_args)
        class_name = self.__class__.__name__
        logger.debug("Created %s path=%s", class_name, path)

    @abc.abstractmethod
    def append_values(self, values, time_step):
        """Record a fresh set of values, one per element."""

    def close(self):
        """Finish up by flushing whatever is still pending."""
        self.flush_data()

    def flush_data(self):
        """Ask the container to flush its data to disk."""
        self._container.flush_data()

    def max_num_bytes(self):
        """Report the container's maximum possible size in bytes.

        Returns
        -------
        int

        """
        return self._container.max_num_bytes()

    @staticmethod
    def make_container(hdf_store, path, prop, num_steps, max_chunk_bytes, values, elem_names):
        """Build the ValueContainer that will hold the stored values.

        The maximum size, dataset property type and time-step flag are all
        obtained from ``prop``.

        Returns
        -------
        ValueContainer

        """
        # Queried in the same order the container receives them.
        max_size = prop.get_max_size(num_steps)
        dataset_property_type = prop.get_dataset_property_type()
        store_time_step = prop.should_store_time_step()
        new_container = ValueContainer(
            values,
            hdf_store,
            path,
            max_size,
            elem_names,
            dataset_property_type,
            max_chunk_bytes=max_chunk_bytes,
            store_time_step=store_time_step,
        )
        logger.debug("Created storage container path=%s", path)
        return new_container
class StorageAll(StorageFilterBase):
    """Writes the values of every time point, optionally filtered by limits."""

    def append_values(self, values, time_step):
        """Store the values of one time step.

        Without limits on the property the whole set is appended at once.
        With limits, each value is checked individually and only those
        accepted by ``should_store_value`` are appended, tagged with the time
        step and the element index.

        """
        if not self._prop.limits:
            self._container.append(values)
            return

        for index, item in enumerate(values):
            if item.is_nan():
                # The first NaN ends the scan; entries after it are not examined.
                break
            if self._prop.should_store_value(item.value):
                self._container.append_by_time_step(item, time_step, index)
"""
class StorageChangeCount(StorageFilterBase):
def __init__(self, *args):
super().__init__(*args)
self._last_value = None
self._change_count = (None, 0)
def append_values(self, values, time_step):
assert False
"""
class StorageMin(StorageFilterBase):
    """Tracks the smallest value seen per element over all time points."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-element running minimums; stays None until the first usable step.
        self._min = None

    def append_values(self, values, time_step):
        """Fold one time step into the running minimums.

        The step is ignored entirely when its first value is NaN.

        """
        if not values[0].is_nan():
            self._handle_values(values)

    def close(self):
        """Write the minimums (if any were recorded) and flush the container."""
        if self._min is not None:
            self._container.append(self._min)
        self._container.flush_data()

    def _handle_values(self, values):
        """Update the stored minimums from ``values``."""
        if self._min is None:
            # First usable step: seed with independent copies of the inputs.
            self._min = [copy.deepcopy(item) for item in values]
            return

        for index, candidate in enumerate(values):
            current = self._min[index]
            # A stored NaN is replaced by any non-NaN candidate.
            replaces_nan = np.isnan(current.value) and not np.isnan(candidate.value)
            if replaces_nan or candidate < current:
                current.set_value(candidate.value)
class StorageMax(StorageFilterBase):
    """Tracks the largest value seen per element over all time points."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-element running maximums; stays None until the first usable step.
        self._max = None

    def append_values(self, values, time_step):
        """Fold one time step into the running maximums.

        The step is ignored entirely when its first value is NaN.

        """
        first = values[0]
        if first.is_nan():
            return
        self._handle_values(values)

    def close(self):
        """Write the maximums (if any were recorded) and flush the container."""
        recorded = self._max
        if recorded is not None:
            self._container.append(recorded)
        self._container.flush_data()

    def _handle_values(self, values):
        """Update the stored maximums from ``values``."""
        if self._max is None:
            # First usable step: seed with independent copies of the inputs.
            self._max = [copy.deepcopy(item) for item in values]
            return

        for index, incoming in enumerate(values):
            stored = self._max[index]
            if np.isnan(stored.value) and not np.isnan(incoming.value):
                # A stored NaN always yields to a non-NaN value.
                should_replace = True
            else:
                should_replace = incoming > stored
            if should_replace:
                stored.set_value(incoming.value)
class StorageMovingAverage(StorageFilterBase):
    """Writes a moving average of the values at each time point."""

    def __init__(self, *args, **kwargs):
        """Constructor for StorageMovingAverage.

        The window size is taken either from the passed `prop` or from an
        optional `window_sizes` keyword argument. With `prop`, one size
        applies to every tracked element. With `window_sizes`, the value must
        be a list of integers giving the window size for the element at the
        same index.

        """
        super().__init__(*args, **kwargs)
        self._window_sizes = kwargs.get("window_sizes")
        # Both are created lazily on the first usable time step.
        self._bufs = None
        self._averages = None

    def append_values(self, values, time_step):
        """Update the moving averages with one time step and store them.

        The step is ignored entirely when its first value is NaN.

        """
        if values[0].is_nan():
            return

        # Every raw value goes into its circular buffer; limits are applied
        # to the resulting moving average, not to the raw value.
        self._refresh_averages(values)

        if not self._prop.limits:
            self._container.append(self._averages)
            return

        for index, average in enumerate(self._averages):
            if self._prop.should_store_value(average.value):
                self._container.append_by_time_step(average, time_step, index)

    def _refresh_averages(self, values):
        """Push ``values`` into the buffers and recompute each average."""
        if self._bufs is None:
            self._averages = [copy.deepcopy(item) for item in values]
            self._bufs = _make_circular_buffers(len(values), self._prop, self._window_sizes)

        for index, item in enumerate(values):
            ring = self._bufs[index]
            ring.append(item.value)
            self._averages[index].set_value(ring.average())
class StorageMovingAverageMax(StorageMax):
    """Tracks the largest moving-average value per element over all time points."""

    def __init__(self, *args, **kwargs):
        """Constructor for StorageMovingAverageMax.

        The window size is taken either from the passed `prop` or from an
        optional `window_sizes` keyword argument. With `prop`, one size
        applies to every tracked element. With `window_sizes`, the value must
        be a list of integers giving the window size for the element at the
        same index.

        """
        super().__init__(*args, **kwargs)
        self._window_sizes = kwargs.get("window_sizes")
        # Both are created lazily on the first usable time step.
        self._averages = None
        self._bufs = None

    def append_values(self, values, time_step):
        """Update the moving averages and fold them into the running maximums.

        The step is ignored entirely when its first value is NaN.

        """
        if not values[0].is_nan():
            if self._bufs is None:
                self._averages = [copy.deepcopy(item) for item in values]
                self._bufs = _make_circular_buffers(len(values), self._prop, self._window_sizes)

            for index, item in enumerate(values):
                ring = self._bufs[index]
                ring.append(item.value)
                self._averages[index].set_value(ring.average())

            # The inherited max tracking works on the averages, not the raw values.
            self._handle_values(self._averages)
class StorageSum(StorageFilterBase):
    """Accumulates a per-element running total and writes it on close."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-element totals; stays None until the first usable step.
        self._sum = None

    def append_values(self, values, _time_step):
        """Add one time step to the running totals.

        The step is ignored entirely when its first value is NaN. The time
        step argument is not used.

        """
        if values[0].is_nan():
            return

        if self._sum is None:
            # First usable step: start from independent copies of the inputs.
            self._sum = [copy.deepcopy(item) for item in values]
            return

        for index, item in enumerate(values):
            self._sum[index] += item

    def close(self):
        """Write the totals (if any were recorded) and flush the container."""
        totals = self._sum
        if totals is not None:
            self._container.append(totals)
        self._container.flush_data()
def _make_circular_buffers(num_elements, prop, window_sizes):
    """Create one CircularBufferHelper per element.

    When ``window_sizes`` is None every buffer uses ``prop.window_size``;
    otherwise the buffer at index ``i`` uses ``window_sizes[i]``.

    Returns
    -------
    list

    """
    if window_sizes is not None:
        sizes = (window_sizes[index] for index in range(num_elements))
    else:
        shared_size = prop.window_size
        sizes = (shared_size for _ in range(num_elements))
    return [CircularBufferHelper(size) for size in sizes]
# Maps each StoreValuesType member to the storage filter class that handles it.
STORAGE_TYPE_MAP = {
    StoreValuesType.ALL: StorageAll,
    # CHANGE_COUNT has no entry: StorageChangeCount only exists inside a bare
    # string literal in this module, so no such class is defined.
    #StoreValuesType.CHANGE_COUNT: StorageChangeCount,
    StoreValuesType.MAX: StorageMax,
    StoreValuesType.MIN: StorageMin,
    StoreValuesType.MOVING_AVERAGE: StorageMovingAverage,
    StoreValuesType.MOVING_AVERAGE_MAX: StorageMovingAverageMax,
    StoreValuesType.SUM: StorageSum,
}