diff --git a/assume/common/market_objects.py b/assume/common/market_objects.py index 2d4f49bd..78f99cde 100644 --- a/assume/common/market_objects.py +++ b/assume/common/market_objects.py @@ -36,9 +36,13 @@ class Order(TypedDict): :param end_time: the end time of the order :type end_time: datetime :param volume: the volume of the order - :type volume: int + :type volume: float :param price: the price of the order - :type price: int + :type price: float + :param accepted_volume: the accepted volume of the order + :type accepted_volume: float + :param accepted_price: the accepted price of the order + :type accepted_price: float :param only_hours: tuple of hours from which this order is available, on multi day products :type only_hours: OnlyHours | None :param agent_id: the id of the agent @@ -187,18 +191,18 @@ class OpeningMessage(TypedDict): :type context: str :param market_id: the id of the market :type market_id: str - :param start: the start time of the market - :type start: float - :param stop: the stop time of the market - :type stop: float + :param start_time: the start time of the market + :type start_time: float + :param end_time: the stop time of the market + :type end_time: float :param products: list of products which are available at the market to be traded :type products: list[Product] """ context: str market_id: str - start: float - stop: float + start_time: float + end_time: float products: list[Product] @@ -210,13 +214,16 @@ class ClearingMessage(TypedDict): :type context: str :param market_id: the id of the market :type market_id: str - :param orderbook: the orderbook of the market - :type orderbook: Orderbook + :param accepted_orders: the orders accepted by the market + :type accepted_orders: Orderbook + :param rejected_orders: the orders rejected by the market + :type rejected_orders: Orderbook """ context: str market_id: str - orderbook: Orderbook + accepted_orders: Orderbook + rejected_orders: Orderbook class OrderBookMessage(TypedDict): @@ -237,6 +244,14 @@ class RegistrationReplyMessage(TypedDict): accepted: bool +class DataRequestMessage(TypedDict): + context: str + market_id: str + metric: str + start_time: datetime + end_time: datetime + + class MetaDict(TypedDict): """ Message Meta of a FIPA ACL Message diff --git a/assume/common/outputs.py b/assume/common/outputs.py index 1c09e454..d17d84d4 100644 --- a/assume/common/outputs.py +++ b/assume/common/outputs.py @@ -130,6 +130,7 @@ def setup(self): """ Sets up the WriteOutput instance by subscribing to messages and scheduling recurrent tasks of storing the data. """ + super().setup() self.context.subscribe_message( self, @@ -355,6 +356,7 @@ async def on_stop(self): """ This function makes it possible to calculate Key Performance Indicators """ + await super().on_stop() # insert left records into db await self.store_dfs() diff --git a/assume/common/units_operator.py b/assume/common/units_operator.py index 3ef14334..0a2effb6 100644 --- a/assume/common/units_operator.py +++ b/assume/common/units_operator.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later import logging +from collections import defaultdict from datetime import datetime from itertools import groupby from operator import itemgetter @@ -14,6 +15,7 @@ from assume.common.market_objects import ( ClearingMessage, + DataRequestMessage, MarketConfig, MetaDict, OpeningMessage, @@ -58,11 +60,12 @@ def __init__( self.use_portfolio_opt = opt_portfolio[0] self.portfolio_strategy = opt_portfolio[1] - # should be a list per product_type - self.valid_orders = [] + # valid_orders per product_type + self.valid_orders = defaultdict(list) self.units: dict[str, BaseUnit] = {} def setup(self): + super().setup() self.id = self.context.aid self.context.subscribe_message( self, @@ -82,6 +85,12 @@ def setup(self): lambda content, meta: content.get("context") == "registration", ) + self.context.subscribe_message( + self, + self.handle_data_request, + lambda content, meta: content.get("context") == "data_request", + ) + for market in self.available_markets: if self.participate(market): self.context.schedule_timestamp_task( @@ -149,6 +158,7 @@ async def register_market(self, market: MarketConfig): acl_metadata={ "sender_addr": self.context.addr, "sender_id": self.context.aid, + "reply_with": market.name, }, ), logger.debug(f"{self.id} sent market registration to {market.name}") @@ -164,9 +174,9 @@ def handle_opening(self, opening: OpeningMessage, meta: MetaDict): :type meta: MetaDict """ logger.debug( - f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.' + f'{self.id} received opening from: {opening["market_id"]} {opening["start_time"]} until: {opening["end_time"]}.' ) - self.context.schedule_instant_task(coroutine=self.submit_bids(opening)) + self.context.schedule_instant_task(coroutine=self.submit_bids(opening, meta)) def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict): """ @@ -190,11 +200,11 @@ def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict): # map bid id to unit id order["unit_id"] = self.bids_map[order["bid_id"]] - self.valid_orders.extend(orderbook) marketconfig = self.registered_markets[content["market_id"]] + self.valid_orders[marketconfig.product_type].extend(orderbook) self.set_unit_dispatch(orderbook, marketconfig) self.write_learning_params(orderbook, marketconfig) - self.write_actual_dispatch() + self.write_actual_dispatch(marketconfig.product_type) def handle_registration_feedback( self, content: RegistrationMessage, meta: MetaDict @@ -214,6 +224,31 @@ def handle_registration_feedback( else: logger.error("Market %s did not accept registration", meta["sender_id"]) + def handle_data_request(self, content: DataRequestMessage, meta: MetaDict): + unit = content["unit"] + metric_type = content["metric"] + start = content["start_time"] + end = content["end_time"] + + data = [] + try: + data = self.units[unit].outputs[metric_type][start:end] + except Exception: + logger.exception("error handling data request") + self.context.schedule_instant_acl_message( + content={ + "context": "data_response", + "data": data, + }, + receiver_addr=meta["sender_addr"], + receiver_id=meta["sender_id"], + acl_metadata={ + "sender_addr": self.context.addr, + "sender_id": self.context.aid, + "in_reply_to": meta.get("reply_with"), + }, + ) + def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig): """ feeds the current market result back to the units @@ -233,7 +268,7 @@ def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig): orderbook=orderbook, ) - def write_actual_dispatch(self): + def write_actual_dispatch(self, product_type: str): """ sends the actual aggregated dispatch curve works across multiple markets @@ -250,7 +285,10 @@ def write_actual_dispatch(self): start = datetime.utcfromtimestamp(last) market_dispatch = aggregate_step_amount( - self.valid_orders, start, now, groupby=["market_id", "unit_id"] + self.valid_orders[product_type], + start, + now, + groupby=["market_id", "unit_id"], ) unit_dispatch_dfs = [] for unit_id, unit in self.units.items(): @@ -271,8 +309,11 @@ def write_actual_dispatch(self): data["unit"] = unit_id unit_dispatch_dfs.append(data) - self.valid_orders = list( - filter(lambda x: x["end_time"] > now, self.valid_orders) + self.valid_orders[product_type] = list( + filter( + lambda x: x["end_time"] > now, + self.valid_orders[product_type], + ) ) db_aid = self.context.data_dict.get("output_agent_id") @@ -299,7 +340,7 @@ def write_actual_dispatch(self): }, ) - async def submit_bids(self, opening: OpeningMessage): + async def submit_bids(self, opening: OpeningMessage, meta: MetaDict): """ formulates an orderbook and sends it to the market. This will handle optional portfolio processing @@ -332,6 +373,7 @@ async def submit_bids(self, opening: OpeningMessage): "sender_id": self.context.aid, "sender_addr": self.context.addr, "conversation_id": "conversation01", + "in_reply_to": meta.get("reply_with"), } await self.context.send_acl_message( content={ diff --git a/assume/markets/base_market.py b/assume/markets/base_market.py index 2b9c0dcf..3a9fd43c 100644 --- a/assume/markets/base_market.py +++ b/assume/markets/base_market.py @@ -13,6 +13,7 @@ from assume.common.market_objects import ( ClearingMessage, + DataRequestMessage, MarketConfig, MarketProduct, MetaDict, @@ -41,9 +42,11 @@ class MarketMechanism: name: str def __init__(self, marketconfig: MarketConfig): + super().__init__() self.marketconfig = marketconfig self.open_auctions = set() self.all_orders = [] + self.results = [] def validate_registration( self, content: RegistrationMessage, meta: MetaDict @@ -160,6 +163,7 @@ def setup(self): Schedules the opening() method to run at the next opening time of the market. """ + super().setup() self.marketconfig.addr = self.context.addr self.marketconfig.aid = self.context.aid @@ -201,6 +205,15 @@ def accept_get_unmatched(content: dict, meta: MetaDict): and content.get("market_id") == self.marketconfig.name ) + def accept_data_request(content: dict, meta: MetaDict): + return ( + content.get("context") == "data_request" + and content.get("market_id") == self.marketconfig.name + ) + + self.context.subscribe_message( + self, self.handle_data_request, accept_data_request + ) self.context.subscribe_message(self, self.handle_orderbook, accept_orderbook) self.context.subscribe_message( self, self.handle_registration, accept_registration @@ -235,8 +248,8 @@ async def opening(self): opening_message: OpeningMessage = { "context": "opening", "market_id": self.marketconfig.name, - "start": market_open, - "stop": market_closing, + "start_time": market_open, + "end_time": market_closing, "products": products, } @@ -251,6 +264,7 @@ async def opening(self): acl_metadata={ "sender_addr": self.context.addr, "sender_id": self.context.aid, + "reply_with": f"{self.marketconfig.name}_{market_open}", }, ) @@ -303,6 +317,7 @@ def handle_registration(self, content: RegistrationMessage, meta: MetaDict): acl_metadata={ "sender_addr": self.context.addr, "sender_id": self.context.aid, + "in_reply_to": meta.get("reply_with"), }, ) @@ -334,10 +349,38 @@ def handle_orderbook(self, content: OrderBookMessage, meta: MetaDict): acl_metadata={ "sender_addr": self.context.addr, "sender_id": self.context.aid, - "in_reply_to": 1, + "in_reply_to": meta.get("reply_with"), }, ) + def handle_data_request(self, content: DataRequestMessage, meta: MetaDict): + metric_type = content["metric"] + start = content["start_time"] + end = content["end_time"] + + data = [] + try: + import pandas as pd + + data = pd.DataFrame(self.results) + data.index = data["time"] + data = data[metric_type][start:end] + except Exception: + logger.exception("error handling data request") + self.context.schedule_instant_acl_message( + content={ + "context": "data_response", + "data": data, + }, + receiver_addr=meta["sender_addr"], + receiver_id=meta["sender_id"], + acl_metadata={ + "sender_addr": self.context.addr, + "sender_id": self.context.aid, + "in_reply_to": meta.get("reply_with"), + }, + ) + def handle_get_unmatched(self, content: dict, meta: MetaDict): """ A handler which sends the orderbook with unmatched orders to an agent. @@ -439,6 +482,7 @@ async def clear_market(self, market_products: list[MarketProduct]): ) meta["market_id"] = self.marketconfig.name meta["time"] = meta["product_start"] + self.results.append(meta) await self.store_market_results(market_meta)