Skip to content

Commit

Permalink
add data_request mechanism to market and units_operator
Browse files Browse the repository at this point in the history
do not shadow on_stop and setup methods
unify usage of start_time and end_time
  • Loading branch information
maurerle committed Nov 10, 2023
1 parent 53c7563 commit 27d1ff0
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 25 deletions.
37 changes: 26 additions & 11 deletions assume/common/market_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ class Order(TypedDict):
:param end_time: the end time of the order
:type end_time: datetime
:param volume: the volume of the order
:type volume: int
:type volume: float
:param price: the price of the order
:type price: int
:type price: float
:param accepted_volume: the accepted volume of the order
:type accepted_volume: float
:param accepted_price: the accepted price of the order
:type accepted_price: float
:param only_hours: tuple of hours from which this order is available, on multi day products
:type only_hours: OnlyHours | None
:param agent_id: the id of the agent
Expand Down Expand Up @@ -187,18 +191,18 @@ class OpeningMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param start: the start time of the market
:type start: float
:param stop: the stop time of the market
:type stop: float
:param start_time: the start time of the market
:type start_time: float
:param end_time: the stop time of the market
:type end_time: float
:param products: list of products which are available at the market to be traded
:type products: list[Product]
"""

context: str
market_id: str
start: float
stop: float
start_time: float
end_time: float
products: list[Product]


Expand All @@ -210,13 +214,16 @@ class ClearingMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param orderbook: the orderbook of the market
:type orderbook: Orderbook
:param accepted_orders: the orders accepted by the market
:type accepted_orders: Orderbook
:param rejected_orders: the orders rejected by the market
:type rejected_orders: Orderbook
"""

context: str
market_id: str
orderbook: Orderbook
accepted_orders: Orderbook
rejected_orders: Orderbook


class OrderBookMessage(TypedDict):
Expand All @@ -237,6 +244,14 @@ class RegistrationReplyMessage(TypedDict):
accepted: bool


class DataRequestMessage(TypedDict):
context: str
market_id: str
metric: str
start_time: datetime
end_time: datetime


class MetaDict(TypedDict):
"""
Message Meta of a FIPA ACL Message
Expand Down
2 changes: 2 additions & 0 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def setup(self):
"""
Sets up the WriteOutput instance by subscribing to messages and scheduling recurrent tasks of storing the data.
"""
super().setup()

Check warning on line 133 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L133

Added line #L133 was not covered by tests

self.context.subscribe_message(
self,
Expand Down Expand Up @@ -355,6 +356,7 @@ async def on_stop(self):
"""
This function makes it possible to calculate Key Performance Indicators
"""
await super().on_stop()

Check warning on line 359 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L359

Added line #L359 was not covered by tests

# insert left records into db
await self.store_dfs()
Expand Down
64 changes: 53 additions & 11 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

import logging
from collections import defaultdict
from datetime import datetime
from itertools import groupby
from operator import itemgetter
Expand All @@ -14,6 +15,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MetaDict,
OpeningMessage,
Expand Down Expand Up @@ -58,11 +60,12 @@ def __init__(
self.use_portfolio_opt = opt_portfolio[0]
self.portfolio_strategy = opt_portfolio[1]

# should be a list per product_type
self.valid_orders = []
# valid_orders per product_type
self.valid_orders = defaultdict(list)
self.units: dict[str, BaseUnit] = {}

def setup(self):
super().setup()
self.id = self.context.aid
self.context.subscribe_message(
self,
Expand All @@ -82,6 +85,12 @@ def setup(self):
lambda content, meta: content.get("context") == "registration",
)

self.context.subscribe_message(
self,
self.handle_data_request,
lambda content, meta: content.get("context") == "data_request",
)

for market in self.available_markets:
if self.participate(market):
self.context.schedule_timestamp_task(
Expand Down Expand Up @@ -149,6 +158,7 @@ async def register_market(self, market: MarketConfig):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": market.name,
},
),
logger.debug(f"{self.id} sent market registration to {market.name}")
Expand All @@ -164,9 +174,9 @@ def handle_opening(self, opening: OpeningMessage, meta: MetaDict):
:type meta: MetaDict
"""
logger.debug(
f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.'
f'{self.id} received opening from: {opening["market_id"]} {opening["start_time"]} until: {opening["end_time"]}.'
)
self.context.schedule_instant_task(coroutine=self.submit_bids(opening))
self.context.schedule_instant_task(coroutine=self.submit_bids(opening, meta))

def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):
"""
Expand All @@ -190,11 +200,11 @@ def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):
# map bid id to unit id
order["unit_id"] = self.bids_map[order["bid_id"]]

self.valid_orders.extend(orderbook)
marketconfig = self.registered_markets[content["market_id"]]
self.valid_orders[marketconfig.product_type].extend(orderbook)
self.set_unit_dispatch(orderbook, marketconfig)
self.write_learning_params(orderbook, marketconfig)
self.write_actual_dispatch()
self.write_actual_dispatch(marketconfig.product_type)

def handle_registration_feedback(
self, content: RegistrationMessage, meta: MetaDict
Expand All @@ -214,6 +224,31 @@ def handle_registration_feedback(
else:
logger.error("Market %s did not accept registration", meta["sender_id"])

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
unit = content["unit"]
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

Check warning on line 231 in assume/common/units_operator.py

View check run for this annotation

Codecov / codecov/patch

assume/common/units_operator.py#L228-L231

Added lines #L228 - L231 were not covered by tests

data = []
try:
data = self.units[unit].outputs[metric_type][start:end]
except Exception:
logger.exception("error handling data request")
self.context.schedule_instant_acl_message(

Check warning on line 238 in assume/common/units_operator.py

View check run for this annotation

Codecov / codecov/patch

assume/common/units_operator.py#L233-L238

Added lines #L233 - L238 were not covered by tests
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
feeds the current market result back to the units
Expand All @@ -233,7 +268,7 @@ def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
orderbook=orderbook,
)

def write_actual_dispatch(self):
def write_actual_dispatch(self, product_type: str):
"""
sends the actual aggregated dispatch curve
works across multiple markets
Expand All @@ -250,7 +285,10 @@ def write_actual_dispatch(self):
start = datetime.utcfromtimestamp(last)

market_dispatch = aggregate_step_amount(
self.valid_orders, start, now, groupby=["market_id", "unit_id"]
self.valid_orders[product_type],
start,
now,
groupby=["market_id", "unit_id"],
)
unit_dispatch_dfs = []
for unit_id, unit in self.units.items():
Expand All @@ -271,8 +309,11 @@ def write_actual_dispatch(self):
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

self.valid_orders = list(
filter(lambda x: x["end_time"] > now, self.valid_orders)
self.valid_orders[product_type] = list(
filter(
lambda x: x["end_time"] > now,
self.valid_orders[product_type],
)
)

db_aid = self.context.data_dict.get("output_agent_id")
Expand All @@ -299,7 +340,7 @@ def write_actual_dispatch(self):
},
)

async def submit_bids(self, opening: OpeningMessage):
async def submit_bids(self, opening: OpeningMessage, meta: MetaDict):
"""
formulates an orderbook and sends it to the market.
This will handle optional portfolio processing
Expand Down Expand Up @@ -332,6 +373,7 @@ async def submit_bids(self, opening: OpeningMessage):
"sender_id": self.context.aid,
"sender_addr": self.context.addr,
"conversation_id": "conversation01",
"in_reply_to": meta.get("reply_with"),
}
await self.context.send_acl_message(
content={
Expand Down
50 changes: 47 additions & 3 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MarketProduct,
MetaDict,
Expand Down Expand Up @@ -41,9 +42,11 @@ class MarketMechanism:
name: str

def __init__(self, marketconfig: MarketConfig):
super().__init__()
self.marketconfig = marketconfig
self.open_auctions = set()
self.all_orders = []
self.results = []

def validate_registration(
self, content: RegistrationMessage, meta: MetaDict
Expand Down Expand Up @@ -160,6 +163,7 @@ def setup(self):
Schedules the opening() method to run at the next opening time of the market.
"""
super().setup()
self.marketconfig.addr = self.context.addr
self.marketconfig.aid = self.context.aid

Expand Down Expand Up @@ -201,6 +205,15 @@ def accept_get_unmatched(content: dict, meta: MetaDict):
and content.get("market_id") == self.marketconfig.name
)

def accept_data_request(content: dict, meta: MetaDict):
return (
content.get("context") == "data_request"
and content.get("market_id") == self.marketconfig.name
)

self.context.subscribe_message(
self, self.handle_data_request, accept_data_request
)
self.context.subscribe_message(self, self.handle_orderbook, accept_orderbook)
self.context.subscribe_message(
self, self.handle_registration, accept_registration
Expand Down Expand Up @@ -235,8 +248,8 @@ async def opening(self):
opening_message: OpeningMessage = {
"context": "opening",
"market_id": self.marketconfig.name,
"start": market_open,
"stop": market_closing,
"start_time": market_open,
"end_time": market_closing,
"products": products,
}

Expand All @@ -251,6 +264,7 @@ async def opening(self):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": f"{self.marketconfig.name}_{market_open}",
},
)

Expand Down Expand Up @@ -303,6 +317,7 @@ def handle_registration(self, content: RegistrationMessage, meta: MetaDict):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

Expand Down Expand Up @@ -334,10 +349,38 @@ def handle_orderbook(self, content: OrderBookMessage, meta: MetaDict):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": 1,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

Check warning on line 359 in assume/markets/base_market.py

View check run for this annotation

Codecov / codecov/patch

assume/markets/base_market.py#L357-L359

Added lines #L357 - L359 were not covered by tests

data = []
try:
import pandas as pd

Check warning on line 363 in assume/markets/base_market.py

View check run for this annotation

Codecov / codecov/patch

assume/markets/base_market.py#L361-L363

Added lines #L361 - L363 were not covered by tests

data = pd.DataFrame(self.results)
data.index = data["time"]
data = data[metric_type][start:end]
except Exception:
logger.exception("error handling data request")
self.context.schedule_instant_acl_message(

Check warning on line 370 in assume/markets/base_market.py

View check run for this annotation

Codecov / codecov/patch

assume/markets/base_market.py#L365-L370

Added lines #L365 - L370 were not covered by tests
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_get_unmatched(self, content: dict, meta: MetaDict):
"""
A handler which sends the orderbook with unmatched orders to an agent.
Expand Down Expand Up @@ -439,6 +482,7 @@ async def clear_market(self, market_products: list[MarketProduct]):
)
meta["market_id"] = self.marketconfig.name
meta["time"] = meta["product_start"]
self.results.append(meta)

await self.store_market_results(market_meta)

Expand Down

0 comments on commit 27d1ff0

Please sign in to comment.