Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Request mechanism #247

Merged
merged 6 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions assume/common/market_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ class Order(TypedDict):
:param end_time: the end time of the order
:type end_time: datetime
:param volume: the volume of the order
:type volume: int
:type volume: float
:param price: the price of the order
:type price: int
:type price: float
:param accepted_volume: the accepted volume of the order
:type accepted_volume: float
:param accepted_price: the accepted price of the order
:type accepted_price: float
:param only_hours: tuple of hours from which this order is available, on multi day products
:type only_hours: OnlyHours | None
:param agent_id: the id of the agent
Expand Down Expand Up @@ -187,18 +191,18 @@ class OpeningMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param start: the start time of the market
:type start: float
:param stop: the stop time of the market
:type stop: float
:param start_time: the start time of the market
:type start_time: float
:param end_time: the stop time of the market
:type end_time: float
:param products: list of products which are available at the market to be traded
:type products: list[Product]
"""

context: str
market_id: str
start: float
stop: float
start_time: float
end_time: float
products: list[Product]


Expand All @@ -210,13 +214,16 @@ class ClearingMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param orderbook: the orderbook of the market
:type orderbook: Orderbook
:param accepted_orders: the orders accepted by the market
:type accepted_orders: Orderbook
:param rejected_orders: the orders rejected by the market
:type rejected_orders: Orderbook
"""

context: str
market_id: str
orderbook: Orderbook
accepted_orders: Orderbook
rejected_orders: Orderbook


class OrderBookMessage(TypedDict):
Expand All @@ -237,6 +244,14 @@ class RegistrationReplyMessage(TypedDict):
accepted: bool


class DataRequestMessage(TypedDict):
context: str
market_id: str
metric: str
start_time: datetime
end_time: datetime


class MetaDict(TypedDict):
"""
Message Meta of a FIPA ACL Message
Expand Down
2 changes: 2 additions & 0 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
"""
Sets up the WriteOutput instance by subscribing to messages and scheduling recurrent tasks of storing the data.
"""
super().setup()

Check warning on line 133 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L133

Added line #L133 was not covered by tests

self.context.subscribe_message(
self,
Expand Down Expand Up @@ -355,6 +356,7 @@
"""
This function makes it possible to calculate Key Performance Indicators
"""
await super().on_stop()

Check warning on line 359 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L359

Added line #L359 was not covered by tests

# insert left records into db
await self.store_dfs()
Expand Down
69 changes: 54 additions & 15 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

import logging
from collections import defaultdict
from datetime import datetime
from itertools import groupby
from operator import itemgetter
Expand All @@ -14,6 +15,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MetaDict,
OpeningMessage,
Expand Down Expand Up @@ -46,7 +48,6 @@
):
super().__init__()

self.bids_map = {}
self.available_markets = available_markets
self.registered_markets: dict[str, MarketConfig] = {}
self.last_sent_dispatch = 0
Expand All @@ -58,11 +59,12 @@
self.use_portfolio_opt = opt_portfolio[0]
self.portfolio_strategy = opt_portfolio[1]

# should be a list per product_type
self.valid_orders = []
# valid_orders per product_type
self.valid_orders = defaultdict(list)
self.units: dict[str, BaseUnit] = {}

def setup(self):
super().setup()
self.id = self.context.aid
self.context.subscribe_message(
self,
Expand All @@ -82,6 +84,12 @@
lambda content, meta: content.get("context") == "registration",
)

self.context.subscribe_message(
self,
self.handle_data_request,
lambda content, meta: content.get("context") == "data_request",
)

for market in self.available_markets:
if self.participate(market):
self.context.schedule_timestamp_task(
Expand Down Expand Up @@ -149,6 +157,7 @@
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": market.name,
},
),
logger.debug(f"{self.id} sent market registration to {market.name}")
Expand All @@ -164,9 +173,9 @@
:type meta: MetaDict
"""
logger.debug(
f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.'
f'{self.id} received opening from: {opening["market_id"]} {opening["start_time"]} until: {opening["end_time"]}.'
)
self.context.schedule_instant_task(coroutine=self.submit_bids(opening))
self.context.schedule_instant_task(coroutine=self.submit_bids(opening, meta))

def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):
"""
Expand All @@ -187,14 +196,12 @@

for order in orderbook:
order["market_id"] = content["market_id"]
# map bid id to unit id
order["unit_id"] = self.bids_map[order["bid_id"]]

self.valid_orders.extend(orderbook)
marketconfig = self.registered_markets[content["market_id"]]
self.valid_orders[marketconfig.product_type].extend(orderbook)
self.set_unit_dispatch(orderbook, marketconfig)
self.write_learning_params(orderbook, marketconfig)
self.write_actual_dispatch()
self.write_actual_dispatch(marketconfig.product_type)

def handle_registration_feedback(
self, content: RegistrationMessage, meta: MetaDict
Expand All @@ -214,6 +221,31 @@
else:
logger.error("Market %s did not accept registration", meta["sender_id"])

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
unit = content["unit"]
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

data = []
try:
data = self.units[unit].outputs[metric_type][start:end]
except Exception:
logger.exception("error handling data request")

Check warning on line 234 in assume/common/units_operator.py

View check run for this annotation

Codecov / codecov/patch

assume/common/units_operator.py#L233-L234

Added lines #L233 - L234 were not covered by tests
self.context.schedule_instant_acl_message(
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
feeds the current market result back to the units
Expand All @@ -233,7 +265,7 @@
orderbook=orderbook,
)

def write_actual_dispatch(self):
def write_actual_dispatch(self, product_type: str):
"""
sends the actual aggregated dispatch curve
works across multiple markets
Expand All @@ -250,7 +282,10 @@
start = datetime.utcfromtimestamp(last)

market_dispatch = aggregate_step_amount(
self.valid_orders, start, now, groupby=["market_id", "unit_id"]
self.valid_orders[product_type],
start,
now,
groupby=["market_id", "unit_id"],
)
unit_dispatch_dfs = []
for unit_id, unit in self.units.items():
Expand All @@ -269,8 +304,11 @@
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

self.valid_orders = list(
filter(lambda x: x["end_time"] > now, self.valid_orders)
self.valid_orders[product_type] = list(
filter(
lambda x: x["end_time"] > now,
self.valid_orders[product_type],
)
)

db_aid = self.context.data_dict.get("output_agent_id")
Expand All @@ -297,7 +335,7 @@
},
)

async def submit_bids(self, opening: OpeningMessage):
async def submit_bids(self, opening: OpeningMessage, meta: MetaDict):
"""
formulates an orderbook and sends it to the market.
This will handle optional portfolio processing
Expand Down Expand Up @@ -330,6 +368,7 @@
"sender_id": self.context.aid,
"sender_addr": self.context.addr,
"conversation_id": "conversation01",
"in_reply_to": meta.get("reply_with"),
}
await self.context.send_acl_message(
content={
Expand Down Expand Up @@ -404,8 +443,8 @@
order["price"] = round(order["price"] / market.price_tick)

order["bid_id"] = f"{unit_id}_{i+1}"
order["unit_id"] = unit_id
orderbook.append(order)
self.bids_map[order["bid_id"]] = unit_id

return orderbook

Expand Down
50 changes: 47 additions & 3 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MarketProduct,
MetaDict,
Expand Down Expand Up @@ -41,9 +42,11 @@ class MarketMechanism:
name: str

def __init__(self, marketconfig: MarketConfig):
super().__init__()
self.marketconfig = marketconfig
self.open_auctions = set()
self.all_orders = []
self.results = []

def validate_registration(
self, content: RegistrationMessage, meta: MetaDict
Expand Down Expand Up @@ -163,6 +166,7 @@ def setup(self):

Schedules the opening() method to run at the next opening time of the market.
"""
super().setup()
self.marketconfig.addr = self.context.addr
self.marketconfig.aid = self.context.aid

Expand Down Expand Up @@ -204,6 +208,15 @@ def accept_get_unmatched(content: dict, meta: MetaDict):
and content.get("market_id") == self.marketconfig.name
)

def accept_data_request(content: dict, meta: MetaDict):
return (
content.get("context") == "data_request"
and content.get("market_id") == self.marketconfig.name
)

self.context.subscribe_message(
self, self.handle_data_request, accept_data_request
)
self.context.subscribe_message(self, self.handle_orderbook, accept_orderbook)
self.context.subscribe_message(
self, self.handle_registration, accept_registration
Expand Down Expand Up @@ -238,8 +251,8 @@ async def opening(self):
opening_message: OpeningMessage = {
"context": "opening",
"market_id": self.marketconfig.name,
"start": market_open,
"stop": market_closing,
"start_time": market_open,
"end_time": market_closing,
"products": products,
}

Expand All @@ -254,6 +267,7 @@ async def opening(self):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": f"{self.marketconfig.name}_{market_open}",
},
)

Expand Down Expand Up @@ -306,6 +320,7 @@ def handle_registration(self, content: RegistrationMessage, meta: MetaDict):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

Expand Down Expand Up @@ -337,10 +352,38 @@ def handle_orderbook(self, content: OrderBookMessage, meta: MetaDict):
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": 1,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

data = []
try:
import pandas as pd

data = pd.DataFrame(self.results)
data.index = data["time"]
data = data[metric_type][start:end]
except Exception:
logger.exception("error handling data request")
self.context.schedule_instant_acl_message(
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_get_unmatched(self, content: dict, meta: MetaDict):
"""
A handler which sends the orderbook with unmatched orders to an agent.
Expand Down Expand Up @@ -442,6 +485,7 @@ async def clear_market(self, market_products: list[MarketProduct]):
)
meta["market_id"] = self.marketconfig.name
meta["time"] = meta["product_start"]
self.results.append(meta)

await self.store_market_results(market_meta)

Expand Down
Loading