Skip to content

Commit

Permalink
Pandas wrapper (#483)
Browse files Browse the repository at this point in the history
<!--
SPDX-FileCopyrightText: ASSUME Developers

SPDX-License-Identifier: AGPL-3.0-or-later
-->

# Pull Request

## Related Issue
Closes #321 

## Description
As suggested by @maurerle this improves performance of all simulations
by a factor of 2 up to 4, by replacing pandas actions with numpy where
possible. For this, we are using a wrapper object FastIndex and
FastSeries which wraps a numpy array so that we can access it using
typical datetime accessors.

The speed up for small simulations is 2x and for large simulations 3x.

## Changes Proposed
- Shift to using special classes of FastIndex and FastSeries
- Adjust the rest of the code to make use of this new class

## Testing
Most of the tests pass and simulations work, but a more extensive
testing is required to test full functionality.

## Checklist
Please check all applicable items:

- [x] Code changes are sufficiently documented (docstrings, inline
comments, `doc` folder updates)
- [x] New unit tests added for new features or bug fixes
- [x] Existing tests pass with the changes
- [x] Reinforcement learning examples are operational (for DRL-related
changes)
- [x] Code tested with both local and Docker databases
- [x] Code follows project style guidelines and best practices
- [ ] Changes are backwards compatible, or deprecation notices added
- [ ] New dependencies added to `pyproject.toml`
- [x] A note for the release notes `doc/release_notes.rst` of the
upcoming release is included
- [x] Consent to release this PR's code under the GNU Affero General
Public License v3.0

## Additional Notes (if applicable)
[Any additional information, concerns, or areas you want reviewers to
focus on]

## Screenshots (if applicable)
[Add screenshots to demonstrate visual changes]

---------

Co-authored-by: Florian Maurer <[email protected]>
  • Loading branch information
nick-harder and maurerle authored Nov 25, 2024
1 parent 7d5465b commit b9cf60e
Show file tree
Hide file tree
Showing 60 changed files with 2,843 additions and 1,288 deletions.
201 changes: 86 additions & 115 deletions assume/common/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@
from typing import TypedDict

import numpy as np
import pandas as pd

from assume.common.fast_pandas import FastSeries, TensorFastSeries
from assume.common.forecasts import Forecaster
from assume.common.market_objects import MarketConfig, Orderbook, Product

try:
import torch as th
except ImportError:
th = None


class BaseStrategy:
pass
Expand All @@ -26,22 +21,12 @@ class BaseUnit:
"""
A base class for a unit. This class is used as a foundation for all units.
Attributes:
id (str): The ID of the unit.
unit_operator (str): The operator of the unit.
technology (str): The technology of the unit.
bidding_strategies (dict[str, BaseStrategy]): The bidding strategies of the unit.
index (pandas.DatetimeIndex): The index of the unit.
node (str, optional): The node of the unit. Defaults to "".
forecaster (Forecaster, optional): The forecast of the unit. Defaults to None.
**kwargs: Additional keyword arguments.
Args:
id (str): The ID of the unit.
unit_operator (str): The operator of the unit.
technology (str): The technology of the unit.
bidding_strategies (dict[str, BaseStrategy]): The bidding strategies of the unit.
index (pandas.DatetimeIndex): The index of the unit.
index (FastIndex): The index of the unit.
node (str, optional): The node of the unit. Defaults to "".
forecaster (Forecaster, optional): The forecast of the unit. Defaults to None.
location (tuple[float, float], optional): The location of the unit. Defaults to (0.0, 0.0).
Expand All @@ -55,38 +40,41 @@ def __init__(
unit_operator: str,
technology: str,
bidding_strategies: dict[str, BaseStrategy],
index: pd.DatetimeIndex,
forecaster: Forecaster,
node: str = "node0",
forecaster: Forecaster = None,
location: tuple[float, float] = (0.0, 0.0),
**kwargs,
):
self.id = id
self.unit_operator = unit_operator
self.technology = technology
self.bidding_strategies: dict[str, BaseStrategy] = bidding_strategies
self.forecaster = forecaster
self.index = forecaster.index

self.node = node
self.location = location
self.bidding_strategies: dict[str, BaseStrategy] = bidding_strategies
self.index = index
self.outputs = defaultdict(lambda: pd.Series(0.0, index=self.index))
# series does not like to convert from tensor to float otherwise

# RL data stored as lists to simplify storing to the buffer
self.outputs["rl_observations"] = []
self.outputs["rl_actions"] = []
self.outputs["rl_rewards"] = []
self.outputs = defaultdict(lambda: FastSeries(value=0.0, index=self.index))
# series does not like to convert from tensor to float otherwise

# some data is stored as series to allow to store it in the outputs
self.outputs["actions"] = pd.Series(0.0, index=self.index, dtype=object)
self.outputs["exploration_noise"] = pd.Series(
0.0, index=self.index, dtype=object
)
self.outputs["reward"] = pd.Series(0.0, index=self.index, dtype=object)
# check if any bidding strategy is using the RL strategy
if any(
isinstance(strategy, LearningStrategy)
for strategy in self.bidding_strategies.values()
):
self.outputs["actions"] = TensorFastSeries(value=0.0, index=self.index)
self.outputs["exploration_noise"] = TensorFastSeries(
value=0.0,
index=self.index,
)
self.outputs["reward"] = FastSeries(value=0.0, index=self.index)

if forecaster:
self.forecaster = forecaster
else:
self.forecaster = defaultdict(lambda: pd.Series(0.0, index=self.index))
# RL data stored as lists to simplify storing to the buffer
self.outputs["rl_observations"] = []
self.outputs["rl_actions"] = []
self.outputs["rl_rewards"] = []

def calculate_bids(
self,
Expand Down Expand Up @@ -128,12 +116,12 @@ def calculate_bids(

return bids

def calculate_marginal_cost(self, start: pd.Timestamp, power: float) -> float:
def calculate_marginal_cost(self, start: datetime, power: float) -> float:
"""
Calculates the marginal cost for the given power.
Calculates the marginal cost for the given power.`
Args:
start (pandas.Timestamp): The start time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
power (float): The power output of the unit.
Returns:
Expand Down Expand Up @@ -192,19 +180,23 @@ def calculate_generation_cost(
start = self.index[0]

product_type_mc = product_type + "_marginal_costs"
product_data = self.outputs[product_type].loc[start:end]

marginal_costs = product_data.index.map(
lambda t: self.calculate_marginal_cost(t, product_data.loc[t])
)
new_values = np.abs(marginal_costs * product_data.values)
# Adjusted code for accessing product data and mapping over the index
product_data = self.outputs[product_type].loc[
start:end
] # Slicing directly without `.loc`

marginal_costs = [
self.calculate_marginal_cost(t, product_data[idx])
for idx, t in enumerate(self.index[start:end])
]
new_values = np.abs(marginal_costs * product_data)
self.outputs[product_type_mc].loc[start:end] = new_values

def execute_current_dispatch(
self,
start: pd.Timestamp,
end: pd.Timestamp,
) -> pd.Series:
start: datetime,
end: datetime,
) -> np.array:
"""
Checks if the total dispatch plan is feasible.
Expand All @@ -218,7 +210,7 @@ def execute_current_dispatch(
Returns:
The volume of the unit within the given time range.
"""
return self.outputs["energy"][start:end]
return self.outputs["energy"].loc[start:end]

def get_output_before(self, dt: datetime, product_type: str = "energy") -> float:
"""
Expand Down Expand Up @@ -267,21 +259,21 @@ def calculate_cashflow(self, product_type: str, orderbook: Orderbook):
end_excl = end - self.index.freq

if isinstance(order["accepted_volume"], dict):
cashflow = [
float(order["accepted_price"][i] * order["accepted_volume"][i])
for i in order["accepted_volume"].keys()
]
self.outputs[f"{product_type}_cashflow"].loc[start:end_excl] += (
cashflow * self.index.freq.n
cashflow = np.array(
[
float(order["accepted_price"][i] * order["accepted_volume"][i])
for i in order["accepted_volume"].keys()
]
)
else:
cashflow = float(
order.get("accepted_price", 0) * order.get("accepted_volume", 0)
)
elapsed_intervals = (end - start) / pd.Timedelta(self.index.freq)
self.outputs[f"{product_type}_cashflow"].loc[start:end_excl] += (
cashflow * elapsed_intervals
)

elapsed_intervals = (end - start) / self.index.freq
self.outputs[f"{product_type}_cashflow"].loc[start:end_excl] += (
cashflow * elapsed_intervals
)

def get_starting_costs(self, op_time: int) -> float:
"""
Expand Down Expand Up @@ -322,18 +314,18 @@ class SupportsMinMax(BaseUnit):
min_down_time: int = 0

def calculate_min_max_power(
self, start: pd.Timestamp, end: pd.Timestamp, product_type: str = "energy"
) -> tuple[pd.Series, pd.Series]:
self, start: datetime, end: datetime, product_type: str = "energy"
) -> tuple[np.array, np.array]:
"""
Calculates the min and max power for the given time period.
Args:
start (pandas.Timestamp): The start time of the dispatch.
end (pandas.Timestamp): The end time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
end (datetime.datetime): The end time of the dispatch.
product_type (str): The product type of the unit.
Returns:
tuple[pandas.Series, pandas.Series]: The min and max power for the given time period.
tuple[np.array, np.array]: The min and max power for the given time period.
"""

def calculate_ramp(
Expand All @@ -355,7 +347,6 @@ def calculate_ramp(
Returns:
float: The corrected possible power to offer according to ramping restrictions.
"""

# was off before, but should be on now and min_down_time is not reached
if power > 0 and op_time < 0 and op_time > -self.min_down_time:
power = 0
Expand Down Expand Up @@ -383,20 +374,6 @@ def calculate_ramp(
)
return power

def get_clean_spread(self, prices: pd.DataFrame) -> float:
"""
Returns the clean spread for the given prices.
Args:
prices (pandas.DataFrame): The prices.
Returns:
float: The clean spread for the given prices.
"""
emission_cost = self.emission_factor * prices["co"].mean()
fuel_cost = prices[self.technology.replace("_combined", "")].mean()
return (fuel_cost + emission_cost) / self.efficiency

def get_operation_time(self, start: datetime) -> int:
"""
Returns the time the unit is operating (positive) or shut down (negative).
Expand All @@ -405,24 +382,32 @@ def get_operation_time(self, start: datetime) -> int:
start (datetime.datetime): The start time.
Returns:
int: The operation time.
int: The operation time as a positive integer if operating, or negative if shut down.
"""
before = start - self.index.freq
# Set the time window based on max of min operating/down time
max_time = max(self.min_operating_time, self.min_down_time, 1)
begin = max(start - self.index.freq * max_time, self.index[0])
end = start - self.index.freq

max_time = max(self.min_operating_time, self.min_down_time)
begin = start - self.index.freq * max_time
end = before
arr = self.outputs["energy"][begin:end][::-1] > 0
if len(arr) < 1:
if start <= self.index[0]:
# before start of index
return max_time
is_off = not arr.iloc[0]

# Check energy output in the defined time window, reversed for most recent state
arr = (self.outputs["energy"].loc[begin:end] > 0)[::-1]

# Determine initial state (off if the first period shows zero energy output)
is_off = not arr[0]
run = 0

# Count consecutive periods with the same status, break on change
for val in arr:
if val == is_off:
if val != (not is_off): # Stop if the state changes
break
run += 1
return (-1) ** is_off * run

# Return positive time if operating, negative if shut down
return -run if is_off else run

def get_average_operation_times(self, start: datetime) -> tuple[float, float]:
"""
Expand All @@ -440,14 +425,14 @@ def get_average_operation_times(self, start: datetime) -> tuple[float, float]:
op_series = []

before = start - self.index.freq
arr = self.outputs["energy"][self.index[0] : before][::-1] > 0
arr = self.outputs["energy"].loc[self.index[0] : before][::-1] > 0

if len(arr) < 1:
# before start of index
return max(self.min_operating_time, 1), min(-self.min_down_time, -1)

op_series = []
status = arr.iloc[0]
status = arr[0]
run = 0
for val in arr:
if val == status:
Expand Down Expand Up @@ -537,33 +522,33 @@ class SupportsMinMaxCharge(BaseUnit):
efficiency_discharge: float

def calculate_min_max_charge(
self, start: pd.Timestamp, end: pd.Timestamp, product_type="energy"
) -> tuple[pd.Series, pd.Series]:
self, start: datetime, end: datetime, product_type="energy"
) -> tuple[np.array, np.array]:
"""
Calculates the min and max charging power for the given time period.
Args:
start (pandas.Timestamp): The start time of the dispatch.
end (pandas.Timestamp): The end time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
end (datetime.datetime): The end time of the dispatch.
product_type (str, optional): The product type of the unit. Defaults to "energy".
Returns:
tuple[pandas.Series, pandas.Series]: The min and max charging power for the given time period.
tuple[np.array, np.array]: The min and max charging power for the given time period.
"""

def calculate_min_max_discharge(
self, start: pd.Timestamp, end: pd.Timestamp, product_type="energy"
) -> tuple[pd.Series, pd.Series]:
self, start: datetime, end: datetime, product_type="energy"
) -> tuple[np.array, np.array]:
"""
Calculates the min and max discharging power for the given time period.
Args:
start (pandas.Timestamp): The start time of the dispatch.
end (pandas.Timestamp): The end time of the dispatch.
start (datetime.datetime): The start time of the dispatch.
end (datetime.datetime): The end time of the dispatch.
product_type (str, optional): The product type of the unit. Defaults to "energy".
Returns:
tuple[pandas.Series, pandas.Series]: The min and max discharging power for the given time period.
tuple[np.array, np.array]: The min and max discharging power for the given time period.
"""

def get_soc_before(self, dt: datetime) -> float:
Expand All @@ -583,20 +568,6 @@ def get_soc_before(self, dt: datetime) -> float:
else:
return self.outputs["soc"].at[dt - self.index.freq]

def get_clean_spread(self, prices: pd.DataFrame) -> float:
"""
Returns the clean spread for the given prices.
Args:
prices (pandas.DataFrame): The prices.
Returns:
float: The clean spread for the given prices.
"""
emission_cost = self.emission_factor * prices["co"].mean()
fuel_cost = prices[self.technology.replace("_combined", "")].mean()
return (fuel_cost + emission_cost) / self.efficiency_charge

def calculate_ramp_discharge(
self,
previous_power: float,
Expand Down
Loading

0 comments on commit b9cf60e

Please sign in to comment.