From 4a1f1a717057e96c8127c7711174d7298fc7a051 Mon Sep 17 00:00:00 2001
From: Florian Maurer
Date: Wed, 18 Oct 2023 14:38:13 +0200
Subject: [PATCH] distribute current time correctly to agents running in
 shadow container in different process (#199)

While using same_process=False works on current main, this PR fixes the
distribution of the current time to the other container. Before this change,
the output agent in the separate process no longer blocked the simulation
while joining the dataframes, but it also did not execute its schedule at
all, because it did not receive updates of the current time.
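In short, the manager container now owns the ExternalClock and a
DistributedClockManager, every other container runs a DistributedClockAgent,
and simulated time only advances through distribute_time()/broadcast().
A simplified sketch of that wiring (the bare containers and addresses are
illustrative only; the actual logic lives in World.setup, World._step and
the new example scripts below):

    from mango import create_container
    from mango.util.clock import ExternalClock
    from mango.util.distributed_clock import (
        DistributedClockAgent,
        DistributedClockManager,
    )

    from assume.common.mango_serializer import mango_codec_factory


    async def manager_side(manager_addr, agent_addrs):
        # the manager owns the simulation clock and distributes it to all
        # other containers, e.g. manager_addr=("0.0.0.0", 9099) and
        # agent_addrs=[("0.0.0.0", 9098)]
        clock = ExternalClock(0)
        container = await create_container(
            connection_type="tcp",
            codec=mango_codec_factory(),
            addr=manager_addr,
            clock=clock,
        )
        clock_manager = DistributedClockManager(
            container, receiver_clock_addresses=agent_addrs
        )
        await clock_manager.broadcast(clock.time)  # push the start time everywhere
        while True:
            # collect the next scheduled activity across all containers
            next_activity = await clock_manager.distribute_time()
            if not next_activity:
                break  # no schedules left anywhere
            clock.set_time(next_activity)
        await container.shutdown()


    async def agent_side(agent_addr):
        # every other container keeps its own ExternalClock, which its
        # DistributedClockAgent advances whenever the manager sends a new time
        container = await create_container(
            connection_type="tcp",
            codec=mango_codec_factory(),
            addr=agent_addr,
            clock=ExternalClock(0),
        )
        clock_agent = DistributedClockAgent(container)
        await clock_agent.stopped  # resolves once the manager shuts everything down
        await container.shutdown()

manager_side() and agent_side() would run in two separate processes or
containers, which is what examples/distributed_world_manager.py and
examples/distributed_world_agent.py set up below via the World class.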
meta["sender_id"]) in self.registered_agents ) - def accept_registration(content: dict, meta): + def accept_registration(content: dict, meta: dict): if not isinstance(content, dict): return False + if isinstance(meta["sender_addr"], list): + meta["sender_addr"] = tuple(meta["sender_addr"]) + return ( content.get("context") == "registration" and content.get("market") == self.marketconfig.name ) - def accept_get_unmatched(content: dict, meta): + def accept_get_unmatched(content: dict, meta: dict): if not isinstance(content, dict): return False + if isinstance(meta["sender_addr"], list): + meta["sender_addr"] = tuple(meta["sender_addr"]) return ( content.get("context") == "get_unmatched" and content.get("market") == self.marketconfig.name @@ -225,7 +233,7 @@ async def opening(self): agent_addr, agent_id = agent await self.context.send_acl_message( opening_message, - agent_addr, + receiver_addr=agent_addr, receiver_id=agent_id, acl_metadata={ "sender_addr": self.context.addr, diff --git a/assume/strategies/flexable.py b/assume/strategies/flexable.py index ddd3f593..352e6151 100644 --- a/assume/strategies/flexable.py +++ b/assume/strategies/flexable.py @@ -614,23 +614,6 @@ def calculate_EOM_price_if_on( return bid_price_inflex -def get_starting_costs(time, unit): - """ - Calculates the starting costs of a unit - - :return: The starting costs of the unit - :rtype: float - """ - if time < unit.downtime_hot_start: - return unit.hot_start_cost - - elif time < unit.downtime_warm_start: - return unit.warm_start_cost - - else: - return unit.cold_start_cost - - def get_specific_revenue( unit: SupportsMinMax, marginal_cost: float, diff --git a/assume/strategies/learning_strategies.py b/assume/strategies/learning_strategies.py index a1cc94e4..15eb16ac 100644 --- a/assume/strategies/learning_strategies.py +++ b/assume/strategies/learning_strategies.py @@ -168,7 +168,7 @@ def get_actions(self, next_observation): :rtype: torch.Tensor """ - # distinction whetere we are in learning mode or not to handle exploration realised with noise + # distinction whether we are in learning mode or not to handle exploration realised with noise if self.learning_mode: # if we are in learning mode the first x episodes we want to explore the entire action space # to get a good initial experience, in the area around the costs of the agent diff --git a/assume/world.py b/assume/world.py index d4df257e..f13ce4e0 100644 --- a/assume/world.py +++ b/assume/world.py @@ -4,12 +4,14 @@ import sys import time from datetime import datetime +from sys import platform import nest_asyncio import pandas as pd from mango import RoleAgent, create_container from mango.container.core import Container from mango.util.clock import ExternalClock +from mango.util.distributed_clock import DistributedClockAgent, DistributedClockManager from mango.util.termination_detection import tasks_complete_or_sleeping from sqlalchemy import create_engine from sqlalchemy.exc import OperationalError @@ -42,12 +44,13 @@ def __init__( database_uri: str = "", export_csv_path: str = "", log_level: str = "INFO", - additional_clearing_mechanisms: dict = {}, + distributed_role: bool = None, ): logging.getLogger("assume").setLevel(log_level) self.logger = logging.getLogger(__name__) self.addr = addr self.container = None + self.distributed_role = distributed_role self.export_csv_path = export_csv_path # intialize db connection at beginning of simulation @@ -91,7 +94,7 @@ def __init__( e, ) self.clearing_mechanisms: dict[str, MarketRole] = clearing_mechanisms - 
-        self.clearing_mechanisms.update(additional_clearing_mechanisms)
+        self.addresses = []
         nest_asyncio.apply()
         self.loop = asyncio.get_event_loop()
         asyncio.set_event_loop(self.loop)
@@ -103,51 +106,66 @@ async def setup(
         simulation_id: str,
         index: pd.Series,
         save_frequency_hours: int = 24,
-        same_process: bool = True,
         bidding_params: dict = {},
         learning_config: LearningConfig = {},
         forecaster: Forecaster = None,
+        manager_address=None,
     ):
         self.clock = ExternalClock(0)
         self.start = start
         self.end = end
         self.learning_config = learning_config
+        # initiate learning if the learning mode is on and hence we want to learn new strategies
+        self.evaluation_mode = self.learning_config.get("evaluation_mode", False)
         # forecaster is used only when loading custom unit types
         self.forecaster = forecaster
         self.bidding_params = bidding_params
         self.index = index
-        self.same_process = same_process
 
         # kill old container if exists
         if isinstance(self.container, Container) and self.container.running:
             await self.container.shutdown()
 
         # create new container
+        container_kwargs = {}
         if self.addr == "world":
             connection_type = "external_connection"
         elif isinstance(self.addr, tuple):
             connection_type = "tcp"
         else:
             connection_type = "mqtt"
+            container_kwargs["mqtt_kwargs"] = {
+                "broker_addr": "localhost",
+                "client_id": self.addr,
+            }
 
         self.container = await create_container(
             connection_type=connection_type,
             codec=mango_codec_factory(),
             addr=self.addr,
             clock=self.clock,
+            **container_kwargs,
         )
-        await self.setup_learning()
-        await self.setup_output_agent(simulation_id, save_frequency_hours)
+        self.learning_mode = self.learning_config.get("learning_mode", False)
+        self.output_agent_addr = (self.addr, "export_agent_1")
+        if self.distributed_role is True:
+            await self.setup_learning()
+            await self.setup_output_agent(simulation_id, save_frequency_hours)
+            self.clock_manager = DistributedClockManager(
+                self.container, receiver_clock_addresses=self.addresses
+            )
+        elif self.distributed_role is None:
+            await self.setup_learning()
+            await self.setup_output_agent(simulation_id, save_frequency_hours)
+        else:
+            self.clock_agent = DistributedClockAgent(self.container)
+            self.output_agent_addr = (manager_address, "export_agent_1")
 
     async def setup_learning(self):
         self.bidding_params.update(self.learning_config)
-        # initiate learning if the learning mode is on and hence we want to learn new strategies
-        self.learning_mode = self.learning_config.get("learning_mode", False)
-        self.evaluation_mode = self.learning_config.get("evaluation_mode", False)
-
         if self.learning_mode:
             # if so, we initate the rl learning role with parameters
             from assume.reinforcement_learning.learning_role import Learning
@@ -157,24 +175,14 @@ async def setup_learning(self):
                 start=self.start,
                 end=self.end,
             )
-            # if self.same_process: # separate process does not support buffer and learning
-            if True:
-                self.learning_agent_addr = (self.addr, "learning_agent")
-                rl_agent = RoleAgent(
-                    self.container, suggested_aid=self.learning_agent_addr[1]
-                )
-                rl_agent.add_role(self.learning_role)
-            else:
-
-                def creator(container):
-                    agent = RoleAgent(container, suggested_aid="learning_agent")
-                    agent.add_role(self.learning_role)
-
-                await self.container.as_agent_process(agent_creator=creator)
+            self.learning_agent_addr = (self.addr, "learning_agent")
+            rl_agent = RoleAgent(
+                self.container, suggested_aid=self.learning_agent_addr[1]
+            )
+            rl_agent.add_role(self.learning_role)
 
     async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int):
-        self.output_agent_addr = (self.addr, "export_agent_1")
(self.addr, "export_agent_1") # Add output agent to world self.logger.debug(f"creating output agent {self.db=} {self.export_csv_path=}") self.output_role = WriteOutput( @@ -187,20 +195,23 @@ async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int learning_mode=self.learning_mode, evaluation_mode=self.evaluation_mode, ) - if self.same_process: - output_agent = RoleAgent( - self.container, suggested_aid=self.output_agent_addr[1] - ) - output_agent.add_role(self.output_role) - else: - # this does not set the clock in output_agent correctly yet - # see https://gitlab.com/mango-agents/mango/-/issues/59 - # but still improves performance + + # mango multiprocessing is currently only supported on linux + # with single + if platform == "linux" and self.distributed_role is None: + self.addresses.append(self.addr) + def creator(container): agent = RoleAgent(container, suggested_aid=self.output_agent_addr[1]) agent.add_role(self.output_role) + clock_agent = DistributedClockAgent(container) await self.container.as_agent_process(agent_creator=creator) + else: + output_agent = RoleAgent( + self.container, suggested_aid=self.output_agent_addr[1] + ) + output_agent.add_role(self.output_role) def add_unit_operator( self, @@ -369,12 +380,16 @@ def add_market( self.markets[f"{market_config.name}"] = market_config async def _step(self): - next_activity = self.clock.get_next_activity() + if self.distributed_role: + next_activity = await self.clock_manager.distribute_time() + else: + next_activity = self.clock.get_next_activity() if not next_activity: self.logger.info("simulation finished - no schedules left") return None delta = next_activity - self.clock.time self.clock.set_time(next_activity) + await tasks_complete_or_sleeping(self.container) return delta async def async_run(self, start_ts, end_ts): @@ -389,6 +404,8 @@ async def async_run(self, start_ts, end_ts): # allow registration before first opening self.clock.set_time(start_ts - 1) + if self.distributed_role: + await self.clock_manager.broadcast(self.clock.time) while self.clock.time < end_ts: await asyncio.sleep(0) delta = await self._step() @@ -400,8 +417,6 @@ async def async_run(self, start_ts, end_ts): ) else: self.clock.set_time(end_ts) - - await tasks_complete_or_sleeping(self.container) pbar.close() await self.container.shutdown() diff --git a/compose.yml b/compose.yml index 87feedf6..3c2d56fe 100644 --- a/compose.yml +++ b/compose.yml @@ -1,8 +1,8 @@ version: "3.9" services: assume_db: - container_name: assume_db image: timescale/timescaledb:latest-pg15 + container_name: assume_db restart: always environment: - POSTGRES_USER=assume @@ -48,18 +48,56 @@ services: placement: constraints: [node.role == manager] + # to enable rendering png screenshots directly from grafana + # for example to embed them in an iframe, you can use this renderer: image: grafana/grafana-image-renderer:latest - profiles: ["enable-renderer"] + container_name: renderer + profiles: ["renderer"] ports: - 8081 + # to run a single simulation in simulation: container_name: simulation image: ghcr.io/assume-framework/assume:latest - profiles: ["cli-only"] + profiles: ["simulation"] build: . 
     depends_on:
       - assume_db
     command: -s example_02 -db "postgresql://assume:assume@assume_db:5432/assume"
+
+  # to run the simulation distributed with MQTT
+  mqtt-broker:
+    container_name: mqtt-broker
+    image: eclipse-mosquitto:2
+    restart: always
+    profiles: ["mqtt"]
+    ports:
+      - "1883:1883/tcp"
+    healthcheck:
+      test: "mosquitto_sub -t '$$SYS/#' -C 1 | grep -v Error || exit 1"
+      interval: 45s
+      timeout: 5s
+      retries: 5
+
+  # to run a distributed simulation with docker compose
+  simulation_mgr:
+    container_name: simulation_mgr
+    image: ghcr.io/assume-framework/assume:latest
+    profiles: ["mqtt"]
+    depends_on:
+      - assume_db
+      - mqtt-broker
+    entrypoint: python3 ./examples/distributed_world_manager.py
+
+  simulation_client01:
+    container_name: simulation_client01
+    image: ghcr.io/assume-framework/assume:latest
+    profiles: ["mqtt"]
+    build: .
+    depends_on:
+      - assume_db
+      - mqtt-broker
+    entrypoint: python3 ./examples/distributed_world_agent.py
diff --git a/examples/distributed_world_agent.py b/examples/distributed_world_agent.py
new file mode 100644
index 00000000..40aeba2c
--- /dev/null
+++ b/examples/distributed_world_agent.py
@@ -0,0 +1,78 @@
+import logging
+from datetime import datetime, timedelta
+
+import pandas as pd
+from dateutil import rrule as rr
+
+from assume import World
+from assume.common.forecasts import NaiveForecast
+from assume.common.market_objects import MarketConfig, MarketProduct
+
+log = logging.getLogger(__name__)
+
+db_uri = "postgresql://assume:assume@localhost:5432/assume"
+
+manager_addr = ("0.0.0.0", 9099)
+agent_adress = [("0.0.0.0", 9098)]
+manager_addr = "manager"
+agent_adress = "agent"
+
+world = World(database_uri=db_uri, addr=agent_adress, distributed_role=False)
+
+
+async def worker():
+    start = datetime(2023, 10, 4)
+    end = datetime(2023, 12, 5)
+    index = pd.date_range(
+        start=start,
+        end=end + timedelta(hours=24),
+        freq="H",
+    )
+    sim_id = "handmade_simulation"
+
+    await world.setup(
+        start=start,
+        end=end,
+        save_frequency_hours=48,
+        simulation_id=sim_id,
+        index=index,
+        manager_address=manager_addr,
+    )
+
+    marketdesign = [
+        MarketConfig(
+            "EOM",
+            rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end),
+            timedelta(hours=1),
+            "pay_as_clear",
+            [MarketProduct(timedelta(hours=1), 1, timedelta(hours=1))],
+            additional_fields=["block_id", "link", "exclusive_id"],
+        )
+    ]
+
+    mo_id = "market_operator"
+    world.add_market_operator(id=mo_id)
+    for market_config in marketdesign:
+        world.add_market(mo_id, market_config)
+
+    world.add_unit_operator("my_operator")
+
+    nuclear_forecast = NaiveForecast(index, availability=1, fuel_price=3, co2_price=0.1)
+    world.add_unit(
+        "nuclear1",
+        "power_plant",
+        "my_operator",
+        {
+            "min_power": 200,
+            "max_power": 1000,
+            "bidding_strategies": {"energy": "naive"},
+            "technology": "nuclear",
+        },
+        nuclear_forecast,
+    )
+
+    await world.clock_agent.stopped
+    await world.container.shutdown()
+
+
+world.loop.run_until_complete(worker())
diff --git a/examples/distributed_world_manager.py b/examples/distributed_world_manager.py
new file mode 100644
index 00000000..2c123285
--- /dev/null
+++ b/examples/distributed_world_manager.py
@@ -0,0 +1,78 @@
+import logging
+from datetime import datetime, timedelta
+
+import pandas as pd
+from dateutil import rrule as rr
+
+from assume import World
+from assume.common.forecasts import NaiveForecast
+from assume.common.market_objects import MarketConfig, MarketProduct
+
+log = logging.getLogger(__name__)
+
+db_uri = "postgresql://assume:assume@localhost:5432/assume"
+
+manager_addr = ("0.0.0.0", 9099)
+agent_adresses = [("0.0.0.0", 9098)]
+manager_addr = "manager"
+agent_adresses = ["agent"]
+
+world = World(database_uri=db_uri, addr=manager_addr, distributed_role=True)
+
+
+async def init():
+    start = datetime(2023, 10, 4)
+    end = datetime(2023, 12, 5)
+    index = pd.date_range(
+        start=start,
+        end=end + timedelta(hours=24),
+        freq="H",
+    )
+    sim_id = "handmade_simulation"
+    world.addresses.extend(agent_adresses)
+
+    await world.setup(
+        start=start,
+        end=end,
+        save_frequency_hours=48,
+        simulation_id=sim_id,
+        index=index,
+    )
+
+    marketdesign = [
+        MarketConfig(
+            "EOM",
+            rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end),
+            timedelta(hours=1),
+            "pay_as_clear",
+            [MarketProduct(timedelta(hours=1), 1, timedelta(hours=1))],
+            additional_fields=["block_id", "link", "exclusive_id"],
+        )
+    ]
+
+    for market_config in marketdesign:
+        market_config.addr = agent_adresses[0]
+        market_config.aid = "market_operator"
+        world.markets[f"{market_config.name}"] = market_config
+
+    world.add_unit_operator("my_demand")
+    world.add_unit(
+        "demand1",
+        "demand",
+        "my_demand",
+        # the unit_params have no hints
+        {
+            "min_power": 0,
+            "max_power": 1000,
+            "bidding_strategies": {"energy": "naive"},
+            "technology": "demand",
+        },
+        NaiveForecast(index, demand=100),
+    )
+
+
+world.loop.run_until_complete(init())
+import time
+
+time.sleep(3)
+world.run()