diff --git a/assume/common/mango_serializer.py b/assume/common/mango_serializer.py index 7cddeae2..43cec516 100644 --- a/assume/common/mango_serializer.py +++ b/assume/common/mango_serializer.py @@ -1,8 +1,20 @@ +import calendar import pickle +from datetime import datetime from mango.messages.codecs import JSON, GenericProtoMsg +def datetime_json_serializer(): + def __tostring__(dt: datetime): + return calendar.timegm(dt.utctimetuple()) + + def __fromstring__(dt: datetime): + return datetime.utcfromtimestamp(dt) + + return datetime, __tostring__, __fromstring__ + + def generic_json_serializer(): def __tostring__(generic_obj): return pickle.dumps(generic_obj).hex() @@ -30,5 +42,6 @@ def __fromstring__(data): def mango_codec_factory(): codec = JSON() + codec.add_serializer(*datetime_json_serializer()) codec.add_serializer(*generic_json_serializer()) return codec diff --git a/assume/common/units_operator.py b/assume/common/units_operator.py index 7abbec79..dad50389 100644 --- a/assume/common/units_operator.py +++ b/assume/common/units_operator.py @@ -125,7 +125,7 @@ def register_market(self, market: MarketConfig): self.context.schedule_timestamp_task( self.context.send_acl_message( {"context": "registration", "market": market.name}, - market.addr, + receiver_addr=market.addr, receiver_id=market.aid, acl_metadata={ "sender_addr": self.context.addr, diff --git a/assume/markets/base_market.py b/assume/markets/base_market.py index 68c6a51b..97093277 100644 --- a/assume/markets/base_market.py +++ b/assume/markets/base_market.py @@ -44,7 +44,7 @@ def validate_registration(self, meta: dict) -> bool: """ return True - def validate_orderbook(self, orderbook: Orderbook, agent_tuple) -> None: + def validate_orderbook(self, orderbook: Orderbook, agent_tuple: tuple) -> None: """ method to validate a given orderbook This is needed to check if all required fields for this mechanism are present @@ -154,27 +154,35 @@ def setup(self): for field in self.required_fields: assert field in self.marketconfig.additional_fields, "missing field" - def accept_orderbook(content: dict, meta): + def accept_orderbook(content: dict, meta: dict): if not isinstance(content, dict): return False + if isinstance(meta["sender_addr"], list): + meta["sender_addr"] = tuple(meta["sender_addr"]) + return ( content.get("market") == self.marketconfig.name and content.get("orderbook") is not None and (meta["sender_addr"], meta["sender_id"]) in self.registered_agents ) - def accept_registration(content: dict, meta): + def accept_registration(content: dict, meta: dict): if not isinstance(content, dict): return False + if isinstance(meta["sender_addr"], list): + meta["sender_addr"] = tuple(meta["sender_addr"]) + return ( content.get("context") == "registration" and content.get("market") == self.marketconfig.name ) - def accept_get_unmatched(content: dict, meta): + def accept_get_unmatched(content: dict, meta: dict): if not isinstance(content, dict): return False + if isinstance(meta["sender_addr"], list): + meta["sender_addr"] = tuple(meta["sender_addr"]) return ( content.get("context") == "get_unmatched" and content.get("market") == self.marketconfig.name @@ -225,7 +233,7 @@ async def opening(self): agent_addr, agent_id = agent await self.context.send_acl_message( opening_message, - agent_addr, + receiver_addr=agent_addr, receiver_id=agent_id, acl_metadata={ "sender_addr": self.context.addr, diff --git a/assume/strategies/flexable.py b/assume/strategies/flexable.py index ddd3f593..352e6151 100644 --- a/assume/strategies/flexable.py +++ b/assume/strategies/flexable.py @@ -614,23 +614,6 @@ def calculate_EOM_price_if_on( return bid_price_inflex -def get_starting_costs(time, unit): - """ - Calculates the starting costs of a unit - - :return: The starting costs of the unit - :rtype: float - """ - if time < unit.downtime_hot_start: - return unit.hot_start_cost - - elif time < unit.downtime_warm_start: - return unit.warm_start_cost - - else: - return unit.cold_start_cost - - def get_specific_revenue( unit: SupportsMinMax, marginal_cost: float, diff --git a/assume/strategies/learning_strategies.py b/assume/strategies/learning_strategies.py index a1cc94e4..15eb16ac 100644 --- a/assume/strategies/learning_strategies.py +++ b/assume/strategies/learning_strategies.py @@ -168,7 +168,7 @@ def get_actions(self, next_observation): :rtype: torch.Tensor """ - # distinction whetere we are in learning mode or not to handle exploration realised with noise + # distinction whether we are in learning mode or not to handle exploration realised with noise if self.learning_mode: # if we are in learning mode the first x episodes we want to explore the entire action space # to get a good initial experience, in the area around the costs of the agent diff --git a/assume/world.py b/assume/world.py index d4df257e..f13ce4e0 100644 --- a/assume/world.py +++ b/assume/world.py @@ -4,12 +4,14 @@ import sys import time from datetime import datetime +from sys import platform import nest_asyncio import pandas as pd from mango import RoleAgent, create_container from mango.container.core import Container from mango.util.clock import ExternalClock +from mango.util.distributed_clock import DistributedClockAgent, DistributedClockManager from mango.util.termination_detection import tasks_complete_or_sleeping from sqlalchemy import create_engine from sqlalchemy.exc import OperationalError @@ -42,12 +44,13 @@ def __init__( database_uri: str = "", export_csv_path: str = "", log_level: str = "INFO", - additional_clearing_mechanisms: dict = {}, + distributed_role: bool = None, ): logging.getLogger("assume").setLevel(log_level) self.logger = logging.getLogger(__name__) self.addr = addr self.container = None + self.distributed_role = distributed_role self.export_csv_path = export_csv_path # intialize db connection at beginning of simulation @@ -91,7 +94,7 @@ def __init__( e, ) self.clearing_mechanisms: dict[str, MarketRole] = clearing_mechanisms - self.clearing_mechanisms.update(additional_clearing_mechanisms) + self.addresses = [] nest_asyncio.apply() self.loop = asyncio.get_event_loop() asyncio.set_event_loop(self.loop) @@ -103,51 +106,66 @@ async def setup( simulation_id: str, index: pd.Series, save_frequency_hours: int = 24, - same_process: bool = True, bidding_params: dict = {}, learning_config: LearningConfig = {}, forecaster: Forecaster = None, + manager_address=None, ): self.clock = ExternalClock(0) self.start = start self.end = end self.learning_config = learning_config + # initiate learning if the learning mode is on and hence we want to learn new strategies + self.evaluation_mode = self.learning_config.get("evaluation_mode", False) # forecaster is used only when loading custom unit types self.forecaster = forecaster self.bidding_params = bidding_params self.index = index - self.same_process = same_process # kill old container if exists if isinstance(self.container, Container) and self.container.running: await self.container.shutdown() # create new container + container_kwargs = {} if self.addr == "world": connection_type = "external_connection" elif isinstance(self.addr, tuple): connection_type = "tcp" else: connection_type = "mqtt" + container_kwargs["mqtt_kwargs"] = { + "broker_addr": "localhost", + "client_id": self.addr, + } self.container = await create_container( connection_type=connection_type, codec=mango_codec_factory(), addr=self.addr, clock=self.clock, + **container_kwargs, ) - await self.setup_learning() - await self.setup_output_agent(simulation_id, save_frequency_hours) + self.learning_mode = self.learning_config.get("learning_mode", False) + self.output_agent_addr = (self.addr, "export_agent_1") + if self.distributed_role is True: + await self.setup_learning() + await self.setup_output_agent(simulation_id, save_frequency_hours) + self.clock_manager = DistributedClockManager( + self.container, receiver_clock_addresses=self.addresses + ) + elif self.distributed_role is None: + await self.setup_learning() + await self.setup_output_agent(simulation_id, save_frequency_hours) + else: + self.clock_agent = DistributedClockAgent(self.container) + self.output_agent_addr = (manager_address, "export_agent_1") async def setup_learning(self): self.bidding_params.update(self.learning_config) - # initiate learning if the learning mode is on and hence we want to learn new strategies - self.learning_mode = self.learning_config.get("learning_mode", False) - self.evaluation_mode = self.learning_config.get("evaluation_mode", False) - if self.learning_mode: # if so, we initate the rl learning role with parameters from assume.reinforcement_learning.learning_role import Learning @@ -157,24 +175,14 @@ async def setup_learning(self): start=self.start, end=self.end, ) - # if self.same_process: # separate process does not support buffer and learning - if True: - self.learning_agent_addr = (self.addr, "learning_agent") - rl_agent = RoleAgent( - self.container, suggested_aid=self.learning_agent_addr[1] - ) - rl_agent.add_role(self.learning_role) - else: - - def creator(container): - agent = RoleAgent(container, suggested_aid="learning_agent") - agent.add_role(self.learning_role) - - await self.container.as_agent_process(agent_creator=creator) + self.learning_agent_addr = (self.addr, "learning_agent") + rl_agent = RoleAgent( + self.container, suggested_aid=self.learning_agent_addr[1] + ) + rl_agent.add_role(self.learning_role) async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int): - self.output_agent_addr = (self.addr, "export_agent_1") # Add output agent to world self.logger.debug(f"creating output agent {self.db=} {self.export_csv_path=}") self.output_role = WriteOutput( @@ -187,20 +195,23 @@ async def setup_output_agent(self, simulation_id: str, save_frequency_hours: int learning_mode=self.learning_mode, evaluation_mode=self.evaluation_mode, ) - if self.same_process: - output_agent = RoleAgent( - self.container, suggested_aid=self.output_agent_addr[1] - ) - output_agent.add_role(self.output_role) - else: - # this does not set the clock in output_agent correctly yet - # see https://gitlab.com/mango-agents/mango/-/issues/59 - # but still improves performance + + # mango multiprocessing is currently only supported on linux + # with single + if platform == "linux" and self.distributed_role is None: + self.addresses.append(self.addr) + def creator(container): agent = RoleAgent(container, suggested_aid=self.output_agent_addr[1]) agent.add_role(self.output_role) + clock_agent = DistributedClockAgent(container) await self.container.as_agent_process(agent_creator=creator) + else: + output_agent = RoleAgent( + self.container, suggested_aid=self.output_agent_addr[1] + ) + output_agent.add_role(self.output_role) def add_unit_operator( self, @@ -369,12 +380,16 @@ def add_market( self.markets[f"{market_config.name}"] = market_config async def _step(self): - next_activity = self.clock.get_next_activity() + if self.distributed_role: + next_activity = await self.clock_manager.distribute_time() + else: + next_activity = self.clock.get_next_activity() if not next_activity: self.logger.info("simulation finished - no schedules left") return None delta = next_activity - self.clock.time self.clock.set_time(next_activity) + await tasks_complete_or_sleeping(self.container) return delta async def async_run(self, start_ts, end_ts): @@ -389,6 +404,8 @@ async def async_run(self, start_ts, end_ts): # allow registration before first opening self.clock.set_time(start_ts - 1) + if self.distributed_role: + await self.clock_manager.broadcast(self.clock.time) while self.clock.time < end_ts: await asyncio.sleep(0) delta = await self._step() @@ -400,8 +417,6 @@ async def async_run(self, start_ts, end_ts): ) else: self.clock.set_time(end_ts) - - await tasks_complete_or_sleeping(self.container) pbar.close() await self.container.shutdown() diff --git a/compose.yml b/compose.yml index 87feedf6..3c2d56fe 100644 --- a/compose.yml +++ b/compose.yml @@ -1,8 +1,8 @@ version: "3.9" services: assume_db: - container_name: assume_db image: timescale/timescaledb:latest-pg15 + container_name: assume_db restart: always environment: - POSTGRES_USER=assume @@ -48,18 +48,56 @@ services: placement: constraints: [node.role == manager] + # to enable rendering png screenshots directly from grafana + # for example to embed them in an iframe, you can use this renderer: image: grafana/grafana-image-renderer:latest - profiles: ["enable-renderer"] + container_name: renderer + profiles: ["renderer"] ports: - 8081 + # to run a single simulation in simulation: container_name: simulation image: ghcr.io/assume-framework/assume:latest - profiles: ["cli-only"] + profiles: ["simulation"] build: . depends_on: - assume_db command: -s example_02 -db "postgresql://assume:assume@assume_db:5432/assume" + + # to run the simulation distributed with MQTT + mqtt-broker: + container_name: mqtt-broker + image: eclipse-mosquitto:2 + restart: always + profiles: ["mqtt"] + ports: + - "1883:1883/tcp" + healthcheck: + test: "mosquitto_sub -t '$$SYS/#' -C 1 | grep -v Error || exit 1" + interval: 45s + timeout: 5s + retries: 5 + + # to run a distributed simulation with docker compose + simulation_mgr: + container_name: simulation_mgr + image: ghcr.io/assume-framework/assume:latest + profiles: ["mqtt"] + depends_on: + - assume_db + - mqtt-broker + entrypoint: python3 ./examples/distributed_world_manager.py + + simulation_client01: + container_name: simulation_client01 + image: ghcr.io/assume-framework/assume:latest + profiles: ["mqtt"] + build: . + depends_on: + - assume_db + - mqtt-broker + entrypoint: python3 ./examples/distributed_world_agent.py diff --git a/examples/distributed_world_agent.py b/examples/distributed_world_agent.py new file mode 100644 index 00000000..40aeba2c --- /dev/null +++ b/examples/distributed_world_agent.py @@ -0,0 +1,78 @@ +import logging +from datetime import datetime, timedelta + +import pandas as pd +from dateutil import rrule as rr + +from assume import World +from assume.common.forecasts import NaiveForecast +from assume.common.market_objects import MarketConfig, MarketProduct + +log = logging.getLogger(__name__) + +db_uri = "postgresql://assume:assume@localhost:5432/assume" + +manager_addr = ("0.0.0.0", 9099) +agent_adress = [("0.0.0.0", 9098)] +manager_addr = "manager" +agent_adress = "agent" + +world = World(database_uri=db_uri, addr=agent_adress, distributed_role=False) + + +async def worker(): + start = datetime(2023, 10, 4) + end = datetime(2023, 12, 5) + index = pd.date_range( + start=start, + end=end + timedelta(hours=24), + freq="H", + ) + sim_id = "handmade_simulation" + + await world.setup( + start=start, + end=end, + save_frequency_hours=48, + simulation_id=sim_id, + index=index, + manager_address=manager_addr, + ) + + marketdesign = [ + MarketConfig( + "EOM", + rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end), + timedelta(hours=1), + "pay_as_clear", + [MarketProduct(timedelta(hours=1), 1, timedelta(hours=1))], + additional_fields=["block_id", "link", "exclusive_id"], + ) + ] + + mo_id = "market_operator" + world.add_market_operator(id=mo_id) + for market_config in marketdesign: + world.add_market(mo_id, market_config) + + world.add_unit_operator("my_operator") + + nuclear_forecast = NaiveForecast(index, availability=1, fuel_price=3, co2_price=0.1) + world.add_unit( + "nuclear1", + "power_plant", + "my_operator", + { + "min_power": 200, + "max_power": 1000, + "bidding_strategies": {"energy": "naive"}, + "technology": "nuclear", + }, + nuclear_forecast, + ) + + await world.clock_agent.stopped + await world.container.shutdown() + + +world.loop.run_until_complete(worker()) diff --git a/examples/distributed_world_manager.py b/examples/distributed_world_manager.py new file mode 100644 index 00000000..2c123285 --- /dev/null +++ b/examples/distributed_world_manager.py @@ -0,0 +1,78 @@ +import logging +from datetime import datetime, timedelta + +import pandas as pd +from dateutil import rrule as rr + +from assume import World +from assume.common.forecasts import NaiveForecast +from assume.common.market_objects import MarketConfig, MarketProduct + +log = logging.getLogger(__name__) + +db_uri = "postgresql://assume:assume@localhost:5432/assume" + +manager_addr = ("0.0.0.0", 9099) +agent_adresses = [("0.0.0.0", 9098)] +manager_addr = "manager" +agent_adresses = ["agent"] + +world = World(database_uri=db_uri, addr=manager_addr, distributed_role=True) + + +async def init(): + start = datetime(2023, 10, 4) + end = datetime(2023, 12, 5) + index = pd.date_range( + start=start, + end=end + timedelta(hours=24), + freq="H", + ) + sim_id = "handmade_simulation" + world.addresses.extend(agent_adresses) + + await world.setup( + start=start, + end=end, + save_frequency_hours=48, + simulation_id=sim_id, + index=index, + ) + + marketdesign = [ + MarketConfig( + "EOM", + rr.rrule(rr.HOURLY, interval=24, dtstart=start, until=end), + timedelta(hours=1), + "pay_as_clear", + [MarketProduct(timedelta(hours=1), 1, timedelta(hours=1))], + additional_fields=["block_id", "link", "exclusive_id"], + ) + ] + + for market_config in marketdesign: + market_config.addr = agent_adresses[0] + market_config.aid = "market_operator" + world.markets[f"{market_config.name}"] = market_config + + world.add_unit_operator("my_demand") + world.add_unit( + "demand1", + "demand", + "my_demand", + # the unit_params have no hints + { + "min_power": 0, + "max_power": 1000, + "bidding_strategies": {"energy": "naive"}, + "technology": "demand", + }, + NaiveForecast(index, demand=100), + ) + + +world.loop.run_until_complete(init()) +import time + +time.sleep(3) +world.run()