Skip to content

Commit

Permalink
distribute current time correctly to agents running in shadow contain…
Browse files Browse the repository at this point in the history
…er in different process (#199)

While using same_process=False works on current main, this PR fixes the
distribution of the current time to the other container.

Before, the output agent did not block when joining the dataframes, but it
also did not execute its schedule at all (because it did not receive updates
of the current time).
  • Loading branch information
maurerle authored Oct 18, 2023
1 parent 7a454f9 commit 4a1f1a7
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 64 deletions.
13 changes: 13 additions & 0 deletions assume/common/mango_serializer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
import calendar
import pickle
from datetime import datetime

from mango.messages.codecs import JSON, GenericProtoMsg


def datetime_json_serializer():
    """Return the ``(type, encode, decode)`` triple used to register
    datetime support on a mango JSON codec.

    Datetimes travel over the wire as integer unix timestamps interpreted
    as UTC. Note that sub-second precision is dropped on the round trip,
    and the decoded datetime is naive (no tzinfo).
    """

    def encode(dt: datetime) -> int:
        # treat the (naive) datetime as UTC when converting to a timestamp
        return calendar.timegm(dt.utctimetuple())

    def decode(timestamp) -> datetime:
        # NOTE(review): utcfromtimestamp is deprecated since Python 3.12;
        # kept here to preserve the existing naive-UTC behavior.
        return datetime.utcfromtimestamp(timestamp)

    return datetime, encode, decode


def generic_json_serializer():
def __tostring__(generic_obj):
return pickle.dumps(generic_obj).hex()
Expand Down Expand Up @@ -30,5 +42,6 @@ def __fromstring__(data):

def mango_codec_factory():
    """Build a mango JSON codec able to transport datetimes and arbitrary
    picklable python objects between containers."""
    json_codec = JSON()
    # register both custom serializer triples on the fresh codec
    for make_serializer in (datetime_json_serializer, generic_json_serializer):
        json_codec.add_serializer(*make_serializer())
    return json_codec
2 changes: 1 addition & 1 deletion assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def register_market(self, market: MarketConfig):
self.context.schedule_timestamp_task(
self.context.send_acl_message(
{"context": "registration", "market": market.name},
market.addr,
receiver_addr=market.addr,
receiver_id=market.aid,
acl_metadata={
"sender_addr": self.context.addr,
Expand Down
18 changes: 13 additions & 5 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def validate_registration(self, meta: dict) -> bool:
"""
return True

def validate_orderbook(self, orderbook: Orderbook, agent_tuple) -> None:
def validate_orderbook(self, orderbook: Orderbook, agent_tuple: tuple) -> None:
"""
method to validate a given orderbook
This is needed to check if all required fields for this mechanism are present
Expand Down Expand Up @@ -154,27 +154,35 @@ def setup(self):
for field in self.required_fields:
assert field in self.marketconfig.additional_fields, "missing field"

def accept_orderbook(content: dict, meta):
def accept_orderbook(content: dict, meta: dict):
    # Subscription predicate: accept only orderbook messages addressed to
    # this market that come from an agent which registered beforehand.
    if not isinstance(content, dict):
        return False

    # JSON transport turns tuples into lists; normalize the sender address
    # back to a tuple so it matches the keys in self.registered_agents.
    if isinstance(meta["sender_addr"], list):
        meta["sender_addr"] = tuple(meta["sender_addr"])

    return (
        content.get("market") == self.marketconfig.name
        and content.get("orderbook") is not None
        and (meta["sender_addr"], meta["sender_id"]) in self.registered_agents
    )

def accept_registration(content: dict, meta):
def accept_registration(content: dict, meta: dict):
    # Subscription predicate: accept registration requests for this market.
    if not isinstance(content, dict):
        return False
    # JSON transport turns tuples into lists; normalize the sender address
    # back to a tuple for consistent downstream handling.
    if isinstance(meta["sender_addr"], list):
        meta["sender_addr"] = tuple(meta["sender_addr"])

    return (
        content.get("context") == "registration"
        and content.get("market") == self.marketconfig.name
    )

def accept_get_unmatched(content: dict, meta):
def accept_get_unmatched(content: dict, meta: dict):
if not isinstance(content, dict):
return False
if isinstance(meta["sender_addr"], list):
meta["sender_addr"] = tuple(meta["sender_addr"])
return (
content.get("context") == "get_unmatched"
and content.get("market") == self.marketconfig.name
Expand Down Expand Up @@ -225,7 +233,7 @@ async def opening(self):
agent_addr, agent_id = agent
await self.context.send_acl_message(
opening_message,
agent_addr,
receiver_addr=agent_addr,
receiver_id=agent_id,
acl_metadata={
"sender_addr": self.context.addr,
Expand Down
17 changes: 0 additions & 17 deletions assume/strategies/flexable.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,23 +614,6 @@ def calculate_EOM_price_if_on(
return bid_price_inflex


def get_starting_costs(time, unit):
    """
    Calculates the starting costs of a unit depending on how long it has
    been down: a short downtime allows a cheap hot start, a medium one a
    warm start, anything longer requires a cold start.

    :param time: downtime of the unit so far
    :param unit: unit providing the downtime thresholds and start costs
    :return: The starting costs of the unit
    :rtype: float
    """
    # guard clauses from cheapest (hot) to most expensive (cold) start
    if time < unit.downtime_hot_start:
        return unit.hot_start_cost
    if time < unit.downtime_warm_start:
        return unit.warm_start_cost
    return unit.cold_start_cost


def get_specific_revenue(
unit: SupportsMinMax,
marginal_cost: float,
Expand Down
2 changes: 1 addition & 1 deletion assume/strategies/learning_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def get_actions(self, next_observation):
:rtype: torch.Tensor
"""

# distinction whetere we are in learning mode or not to handle exploration realised with noise
# distinction whether we are in learning mode or not to handle exploration realised with noise
if self.learning_mode:
# if we are in learning mode the first x episodes we want to explore the entire action space
# to get a good initial experience, in the area around the costs of the agent
Expand Down
89 changes: 52 additions & 37 deletions assume/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import sys
import time
from datetime import datetime
from sys import platform

import nest_asyncio
import pandas as pd
from mango import RoleAgent, create_container
from mango.container.core import Container
from mango.util.clock import ExternalClock
from mango.util.distributed_clock import DistributedClockAgent, DistributedClockManager
from mango.util.termination_detection import tasks_complete_or_sleeping
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
Expand Down Expand Up @@ -42,12 +44,13 @@ def __init__(
database_uri: str = "",
export_csv_path: str = "",
log_level: str = "INFO",
additional_clearing_mechanisms: dict = {},
distributed_role: bool = None,
):
logging.getLogger("assume").setLevel(log_level)
self.logger = logging.getLogger(__name__)
self.addr = addr
self.container = None
self.distributed_role = distributed_role

self.export_csv_path = export_csv_path
# intialize db connection at beginning of simulation
Expand Down Expand Up @@ -91,7 +94,7 @@ def __init__(
e,
)
self.clearing_mechanisms: dict[str, MarketRole] = clearing_mechanisms
self.clearing_mechanisms.update(additional_clearing_mechanisms)
self.addresses = []
nest_asyncio.apply()
self.loop = asyncio.get_event_loop()
asyncio.set_event_loop(self.loop)
Expand All @@ -103,51 +106,66 @@ async def setup(
simulation_id: str,
index: pd.Series,
save_frequency_hours: int = 24,
same_process: bool = True,
bidding_params: dict = {},
learning_config: LearningConfig = {},
forecaster: Forecaster = None,
manager_address=None,
):
self.clock = ExternalClock(0)
self.start = start
self.end = end
self.learning_config = learning_config
# initiate learning if the learning mode is on and hence we want to learn new strategies
self.evaluation_mode = self.learning_config.get("evaluation_mode", False)

# forecaster is used only when loading custom unit types
self.forecaster = forecaster

self.bidding_params = bidding_params
self.index = index
self.same_process = same_process

# kill old container if exists
if isinstance(self.container, Container) and self.container.running:
await self.container.shutdown()

# create new container
container_kwargs = {}
if self.addr == "world":
connection_type = "external_connection"
elif isinstance(self.addr, tuple):
connection_type = "tcp"
else:
connection_type = "mqtt"
container_kwargs["mqtt_kwargs"] = {
"broker_addr": "localhost",
"client_id": self.addr,
}

self.container = await create_container(
connection_type=connection_type,
codec=mango_codec_factory(),
addr=self.addr,
clock=self.clock,
**container_kwargs,
)
await self.setup_learning()
await self.setup_output_agent(simulation_id, save_frequency_hours)
self.learning_mode = self.learning_config.get("learning_mode", False)
self.output_agent_addr = (self.addr, "export_agent_1")
if self.distributed_role is True:
await self.setup_learning()
await self.setup_output_agent(simulation_id, save_frequency_hours)
self.clock_manager = DistributedClockManager(
self.container, receiver_clock_addresses=self.addresses
)
elif self.distributed_role is None:
await self.setup_learning()
await self.setup_output_agent(simulation_id, save_frequency_hours)
else:
self.clock_agent = DistributedClockAgent(self.container)
self.output_agent_addr = (manager_address, "export_agent_1")

async def setup_learning(self):
self.bidding_params.update(self.learning_config)

# initiate learning if the learning mode is on and hence we want to learn new strategies
self.learning_mode = self.learning_config.get("learning_mode", False)
self.evaluation_mode = self.learning_config.get("evaluation_mode", False)

if self.learning_mode:
# if so, we initiate the rl learning role with parameters
from assume.reinforcement_learning.learning_role import Learning
Expand All @@ -157,24 +175,14 @@ async def setup_learning(self):
start=self.start,
end=self.end,
)
# if self.same_process:
# separate process does not support buffer and learning
if True:
self.learning_agent_addr = (self.addr, "learning_agent")
rl_agent = RoleAgent(
self.container, suggested_aid=self.learning_agent_addr[1]
)
rl_agent.add_role(self.learning_role)
else:

def creator(container):
agent = RoleAgent(container, suggested_aid="learning_agent")
agent.add_role(self.learning_role)

await self.container.as_agent_process(agent_creator=creator)
self.learning_agent_addr = (self.addr, "learning_agent")
rl_agent = RoleAgent(
self.container, suggested_aid=self.learning_agent_addr[1]
)
rl_agent.add_role(self.learning_role)

async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int):
self.output_agent_addr = (self.addr, "export_agent_1")
# Add output agent to world
self.logger.debug(f"creating output agent {self.db=} {self.export_csv_path=}")
self.output_role = WriteOutput(
Expand All @@ -187,20 +195,23 @@ async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int
learning_mode=self.learning_mode,
evaluation_mode=self.evaluation_mode,
)
if self.same_process:
output_agent = RoleAgent(
self.container, suggested_aid=self.output_agent_addr[1]
)
output_agent.add_role(self.output_role)
else:
# this does not set the clock in output_agent correctly yet
# see https://gitlab.com/mango-agents/mango/-/issues/59
# but still improves performance

# mango multiprocessing is currently only supported on linux
# with single
if platform == "linux" and self.distributed_role is None:
self.addresses.append(self.addr)

def creator(container):
agent = RoleAgent(container, suggested_aid=self.output_agent_addr[1])
agent.add_role(self.output_role)
clock_agent = DistributedClockAgent(container)

await self.container.as_agent_process(agent_creator=creator)
else:
output_agent = RoleAgent(
self.container, suggested_aid=self.output_agent_addr[1]
)
output_agent.add_role(self.output_role)

def add_unit_operator(
self,
Expand Down Expand Up @@ -369,12 +380,16 @@ def add_market(
self.markets[f"{market_config.name}"] = market_config

async def _step(self):
next_activity = self.clock.get_next_activity()
if self.distributed_role:
next_activity = await self.clock_manager.distribute_time()
else:
next_activity = self.clock.get_next_activity()
if not next_activity:
self.logger.info("simulation finished - no schedules left")
return None
delta = next_activity - self.clock.time
self.clock.set_time(next_activity)
await tasks_complete_or_sleeping(self.container)
return delta

async def async_run(self, start_ts, end_ts):
Expand All @@ -389,6 +404,8 @@ async def async_run(self, start_ts, end_ts):

# allow registration before first opening
self.clock.set_time(start_ts - 1)
if self.distributed_role:
await self.clock_manager.broadcast(self.clock.time)
while self.clock.time < end_ts:
await asyncio.sleep(0)
delta = await self._step()
Expand All @@ -400,8 +417,6 @@ async def async_run(self, start_ts, end_ts):
)
else:
self.clock.set_time(end_ts)

await tasks_complete_or_sleeping(self.container)
pbar.close()
await self.container.shutdown()

Expand Down
Loading

0 comments on commit 4a1f1a7

Please sign in to comment.