Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Request mechanism #247

Merged
merged 6 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions assume/common/market_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ class Order(TypedDict):
:param end_time: the end time of the order
:type end_time: datetime
:param volume: the volume of the order
:type volume: int
:type volume: float
:param price: the price of the order
:type price: int
:type price: float
:param accepted_volume: the accepted volume of the order
:type accepted_volume: float
:param accepted_price: the accepted price of the order
:type accepted_price: float
:param only_hours: tuple of hours from which this order is available, on multi day products
:type only_hours: OnlyHours | None
:param agent_id: the id of the agent
Expand Down Expand Up @@ -187,18 +191,18 @@ class OpeningMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param start: the start time of the market
:type start: float
:param stop: the stop time of the market
:type stop: float
:param start_time: the start time of the market
:type start_time: float
:param end_time: the stop time of the market
:type end_time: float
:param products: list of products which are available at the market to be traded
:type products: list[Product]
"""

context: str
market_id: str
start: float
stop: float
start_time: float
end_time: float
products: list[Product]


Expand All @@ -210,13 +214,16 @@ class ClearingMessage(TypedDict):
:type context: str
:param market_id: the id of the market
:type market_id: str
:param orderbook: the orderbook of the market
:type orderbook: Orderbook
:param accepted_orders: the orders accepted by the market
:type accepted_orders: Orderbook
:param rejected_orders: the orders rejected by the market
:type rejected_orders: Orderbook
"""

context: str
market_id: str
orderbook: Orderbook
accepted_orders: Orderbook
rejected_orders: Orderbook


class OrderBookMessage(TypedDict):
Expand All @@ -237,6 +244,14 @@ class RegistrationReplyMessage(TypedDict):
accepted: bool


class DataRequestMessage(TypedDict):
context: str
market_id: str
metric: str
start_time: datetime
end_time: datetime


class MetaDict(TypedDict):
"""
Message Meta of a FIPA ACL Message
Expand Down
2 changes: 2 additions & 0 deletions assume/common/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
"""
Sets up the WriteOutput instance by subscribing to messages and scheduling recurrent tasks of storing the data.
"""
super().setup()

Check warning on line 133 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L133

Added line #L133 was not covered by tests

self.context.subscribe_message(
self,
Expand Down Expand Up @@ -355,6 +356,7 @@
"""
This function makes it possible to calculate Key Performance Indicators
"""
await super().on_stop()

Check warning on line 359 in assume/common/outputs.py

View check run for this annotation

Codecov / codecov/patch

assume/common/outputs.py#L359

Added line #L359 was not covered by tests

# insert left records into db
await self.store_dfs()
Expand Down
64 changes: 53 additions & 11 deletions assume/common/units_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

import logging
from collections import defaultdict
from datetime import datetime
from itertools import groupby
from operator import itemgetter
Expand All @@ -14,6 +15,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MetaDict,
OpeningMessage,
Expand Down Expand Up @@ -58,11 +60,12 @@
self.use_portfolio_opt = opt_portfolio[0]
self.portfolio_strategy = opt_portfolio[1]

# should be a list per product_type
self.valid_orders = []
# valid_orders per product_type
self.valid_orders = defaultdict(list)
self.units: dict[str, BaseUnit] = {}

def setup(self):
super().setup()
self.id = self.context.aid
self.context.subscribe_message(
self,
Expand All @@ -82,6 +85,12 @@
lambda content, meta: content.get("context") == "registration",
)

self.context.subscribe_message(
self,
self.handle_data_request,
lambda content, meta: content.get("context") == "data_request",
)

for market in self.available_markets:
if self.participate(market):
self.context.schedule_timestamp_task(
Expand Down Expand Up @@ -149,6 +158,7 @@
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": market.name,
},
),
logger.debug(f"{self.id} sent market registration to {market.name}")
Expand All @@ -164,9 +174,9 @@
:type meta: MetaDict
"""
logger.debug(
f'{self.id} received opening from: {opening["market_id"]} {opening["start"]} until: {opening["stop"]}.'
f'{self.id} received opening from: {opening["market_id"]} {opening["start_time"]} until: {opening["end_time"]}.'
)
self.context.schedule_instant_task(coroutine=self.submit_bids(opening))
self.context.schedule_instant_task(coroutine=self.submit_bids(opening, meta))

def handle_market_feedback(self, content: ClearingMessage, meta: MetaDict):
"""
Expand All @@ -190,11 +200,11 @@
# map bid id to unit id
order["unit_id"] = self.bids_map[order["bid_id"]]

self.valid_orders.extend(orderbook)
marketconfig = self.registered_markets[content["market_id"]]
self.valid_orders[marketconfig.product_type].extend(orderbook)
self.set_unit_dispatch(orderbook, marketconfig)
self.write_learning_params(orderbook, marketconfig)
self.write_actual_dispatch()
self.write_actual_dispatch(marketconfig.product_type)

def handle_registration_feedback(
self, content: RegistrationMessage, meta: MetaDict
Expand All @@ -214,6 +224,31 @@
else:
logger.error("Market %s did not accept registration", meta["sender_id"])

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
unit = content["unit"]
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

Check warning on line 231 in assume/common/units_operator.py

View check run for this annotation

Codecov / codecov/patch

assume/common/units_operator.py#L228-L231

Added lines #L228 - L231 were not covered by tests

data = []
try:
data = self.units[unit].outputs[metric_type][start:end]
except Exception:
logger.exception("error handling data request")
self.context.schedule_instant_acl_message(

Check warning on line 238 in assume/common/units_operator.py

View check run for this annotation

Codecov / codecov/patch

assume/common/units_operator.py#L233-L238

Added lines #L233 - L238 were not covered by tests
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def set_unit_dispatch(self, orderbook: Orderbook, marketconfig: MarketConfig):
"""
feeds the current market result back to the units
Expand All @@ -233,7 +268,7 @@
orderbook=orderbook,
)

def write_actual_dispatch(self):
def write_actual_dispatch(self, product_type: str):
"""
sends the actual aggregated dispatch curve
works across multiple markets
Expand All @@ -250,7 +285,10 @@
start = datetime.utcfromtimestamp(last)

market_dispatch = aggregate_step_amount(
self.valid_orders, start, now, groupby=["market_id", "unit_id"]
self.valid_orders[product_type],
start,
now,
groupby=["market_id", "unit_id"],
)
unit_dispatch_dfs = []
for unit_id, unit in self.units.items():
Expand All @@ -271,8 +309,11 @@
data["unit"] = unit_id
unit_dispatch_dfs.append(data)

self.valid_orders = list(
filter(lambda x: x["end_time"] > now, self.valid_orders)
self.valid_orders[product_type] = list(
filter(
lambda x: x["end_time"] > now,
self.valid_orders[product_type],
)
)

db_aid = self.context.data_dict.get("output_agent_id")
Expand All @@ -299,7 +340,7 @@
},
)

async def submit_bids(self, opening: OpeningMessage):
async def submit_bids(self, opening: OpeningMessage, meta: MetaDict):
"""
formulates an orderbook and sends it to the market.
This will handle optional portfolio processing
Expand Down Expand Up @@ -332,6 +373,7 @@
"sender_id": self.context.aid,
"sender_addr": self.context.addr,
"conversation_id": "conversation01",
"in_reply_to": meta.get("reply_with"),
}
await self.context.send_acl_message(
content={
Expand Down
50 changes: 47 additions & 3 deletions assume/markets/base_market.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from assume.common.market_objects import (
ClearingMessage,
DataRequestMessage,
MarketConfig,
MarketProduct,
MetaDict,
Expand Down Expand Up @@ -41,9 +42,11 @@
name: str

def __init__(self, marketconfig: MarketConfig):
super().__init__()
self.marketconfig = marketconfig
self.open_auctions = set()
self.all_orders = []
self.results = []

def validate_registration(
self, content: RegistrationMessage, meta: MetaDict
Expand Down Expand Up @@ -160,6 +163,7 @@

Schedules the opening() method to run at the next opening time of the market.
"""
super().setup()
self.marketconfig.addr = self.context.addr
self.marketconfig.aid = self.context.aid

Expand Down Expand Up @@ -201,6 +205,15 @@
and content.get("market_id") == self.marketconfig.name
)

def accept_data_request(content: dict, meta: MetaDict):
return (
content.get("context") == "data_request"
and content.get("market_id") == self.marketconfig.name
)

self.context.subscribe_message(
self, self.handle_data_request, accept_data_request
)
self.context.subscribe_message(self, self.handle_orderbook, accept_orderbook)
self.context.subscribe_message(
self, self.handle_registration, accept_registration
Expand Down Expand Up @@ -235,8 +248,8 @@
opening_message: OpeningMessage = {
"context": "opening",
"market_id": self.marketconfig.name,
"start": market_open,
"stop": market_closing,
"start_time": market_open,
"end_time": market_closing,
"products": products,
}

Expand All @@ -251,6 +264,7 @@
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"reply_with": f"{self.marketconfig.name}_{market_open}",
},
)

Expand Down Expand Up @@ -303,6 +317,7 @@
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

Expand Down Expand Up @@ -334,10 +349,38 @@
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": 1,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_data_request(self, content: DataRequestMessage, meta: MetaDict):
metric_type = content["metric"]
start = content["start_time"]
end = content["end_time"]

Check warning on line 359 in assume/markets/base_market.py

View check run for this annotation

Codecov / codecov/patch

assume/markets/base_market.py#L357-L359

Added lines #L357 - L359 were not covered by tests

data = []
try:
import pandas as pd

Check warning on line 363 in assume/markets/base_market.py

View check run for this annotation

Codecov / codecov/patch

assume/markets/base_market.py#L361-L363

Added lines #L361 - L363 were not covered by tests

data = pd.DataFrame(self.results)
data.index = data["time"]
data = data[metric_type][start:end]
except Exception:
logger.exception("error handling data request")
self.context.schedule_instant_acl_message(

Check warning on line 370 in assume/markets/base_market.py

View check run for this annotation

Codecov / codecov/patch

assume/markets/base_market.py#L365-L370

Added lines #L365 - L370 were not covered by tests
content={
"context": "data_response",
"data": data,
},
receiver_addr=meta["sender_addr"],
receiver_id=meta["sender_id"],
acl_metadata={
"sender_addr": self.context.addr,
"sender_id": self.context.aid,
"in_reply_to": meta.get("reply_with"),
},
)

def handle_get_unmatched(self, content: dict, meta: MetaDict):
"""
A handler which sends the orderbook with unmatched orders to an agent.
Expand Down Expand Up @@ -439,6 +482,7 @@
)
meta["market_id"] = self.marketconfig.name
meta["time"] = meta["product_start"]
self.results.append(meta)

await self.store_market_results(market_meta)

Expand Down