diff --git a/packages/valory/agents/market_maker/aea-config.yaml b/packages/valory/agents/market_maker/aea-config.yaml index 6410bd7..b16abe3 100644 --- a/packages/valory/agents/market_maker/aea-config.yaml +++ b/packages/valory/agents/market_maker/aea-config.yaml @@ -218,4 +218,12 @@ models: google_engine_id: ${str:google_engine_id} openai_api_key: ${str:openai_api_key} mech_contract_address: ${str:0x77af31de935740567cf4ff1986d04b2c964a786a} + use_subgraph_for_redeeming: ${bool:true} + contract_timeout: ${float:300.0} + max_filtering_retries: ${int:6} + reduce_factor: ${float:0.25} + conditional_tokens_address: ${str:0xCeAfDD6bc0bEF976fdCd1112955828E00543c0Ce} + redeeming_batch_size: ${int:1} + dust_threshold: ${int:10000000000000} + minimum_batch_size: ${int:500} is_abstract: false diff --git a/packages/valory/services/market_maker/service.yaml b/packages/valory/services/market_maker/service.yaml index d9d87aa..2f991f9 100644 --- a/packages/valory/services/market_maker/service.yaml +++ b/packages/valory/services/market_maker/service.yaml @@ -103,6 +103,13 @@ models: google_api_key: ${GOOGLE_API_KEY:str:google_api_key} google_engine_id: ${GOOGLE_ENGINE_ID:str:google_engine_id} openai_api_key: ${OPENAI_API_KEY:str:openai_api_key} + use_subgraph_for_redeeming: ${USE_SUBGRAPH_FOR_REDEEMING:bool:true} + contract_timeout: ${CONTRACT_TIMEOUT:float:300.0} + max_filtering_retries: ${MAX_FILTERING_RETRIES:int:6} + reduce_factor: ${REDUCE_FACTOR:float:0.25} + redeeming_batch_size: ${REDEEMING_BATCH_SIZE:int:1} + dust_threshold: ${DUST_THRESHOLD:int:10000000000000} + minimum_batch_size: ${MINIMUM_BATCH_SIZE:int:500} --- public_id: valory/ledger:0.19.0 type: connection diff --git a/packages/valory/skills/market_creation_manager_abci/behaviours.py b/packages/valory/skills/market_creation_manager_abci/behaviours.py index cc85755..a012586 100644 --- a/packages/valory/skills/market_creation_manager_abci/behaviours.py +++ b/packages/valory/skills/market_creation_manager_abci/behaviours.py @@ -111,7 +111,7 @@ PostTransactionRound, PrepareTransactionPayload, PrepareTransactionRound, - RedeemBondRound, + RedeemRound, RemoveFundingRound, RetrieveApprovedMarketPayload, RetrieveApprovedMarketRound, @@ -127,13 +127,19 @@ from packages.valory.skills.transaction_settlement_abci.payload_tools import ( hash_payload_to_hex, ) +from packages.valory.skills.market_creation_manager_abci.behaviours_.base import ( + MarketCreationManagerBaseBehaviour, + ETHER_VALUE, + SAFE_TX_GAS, + HTTP_OK +) +from packages.valory.skills.market_creation_manager_abci.behaviours_.reedem import ( + RedeemBehaviour +) -HTTP_OK = 200 HTTP_NO_CONTENT = 204 MAX_RETRIES = 3 -SAFE_TX_GAS = 0 -ETHER_VALUE = 0 MAX_PREVIOUS = 0 MIN_BALANCE_WITHDRAW_REALITIO = 100000000000000000 # 0.1 DAI @@ -244,14 +250,6 @@ ADDITIONAL_INFO_LIMIT = 5_000 -def to_content(query: str) -> bytes: - """Convert the given query string to payload content, i.e., add it under a `queries` key and convert it to bytes.""" - finalized_query = {"query": query} - encoded_query = json.dumps(finalized_query, sort_keys=True).encode("utf-8") - - return encoded_query - - def parse_date_timestring(string: str) -> Optional[datetime]: """Parse and return a datetime string.""" for format in AVAILABLE_FORMATS: @@ -266,183 +264,6 @@ def get_callable_name(method: Callable) -> str: """Return callable name.""" return getattr(method, "__name__") # noqa: B009 - -class MarketCreationManagerBaseBehaviour(BaseBehaviour, ABC): - """Base behaviour for the market_creation_manager_abci skill.""" - - @property - def synchronized_data(self) -> SynchronizedData: - """Return the synchronized data.""" - return cast(SynchronizedData, super().synchronized_data) - - @property - def params(self) -> MarketCreationManagerParams: - """Return the params.""" - return cast(MarketCreationManagerParams, super().params) - - @property - def last_synced_timestamp(self) -> int: - """ - Get last synced timestamp. - - This is the last timestamp guaranteed to be the same by 2/3 of the agents. - :returns: the last synced timestamp. - """ - state = cast(SharedState, self.context.state) - last_timestamp = ( - state.round_sequence.last_round_transition_timestamp.timestamp() - ) - return int(last_timestamp) - - @property - def shared_state(self) -> SharedState: - """Get the shared state.""" - return cast(SharedState, self.context.state) - - def _calculate_condition_id( - self, - oracle_contract: str, - question_id: str, - outcome_slot_count: int = 2, - ) -> Generator[None, None, str]: - """Calculate question ID.""" - response = yield from self.get_contract_api_response( - performative=ContractApiMessage.Performative.GET_STATE, - contract_address=self.params.conditional_tokens_contract, - contract_id=str(ConditionalTokensContract.contract_id), - contract_callable="calculate_condition_id", - oracle_contract=oracle_contract, - question_id=question_id, - outcome_slot_count=outcome_slot_count, - ) - return cast(str, response.state.body["condition_id"]) - - def _get_safe_tx_hash( - self, - to_address: str, - data: bytes, - value: int = ETHER_VALUE, - safe_tx_gas: int = SAFE_TX_GAS, - operation: int = SafeOperation.CALL.value, - ) -> Generator[None, None, Optional[str]]: - """Prepares and returns the safe tx hash.""" - response = yield from self.get_contract_api_response( - performative=ContractApiMessage.Performative.GET_STATE, # type: ignore - contract_address=self.synchronized_data.safe_contract_address, # the safe contract address - contract_id=str(GnosisSafeContract.contract_id), - contract_callable="get_raw_safe_transaction_hash", - to_address=to_address, # the contract the safe will invoke - value=value, - data=data, - safe_tx_gas=safe_tx_gas, - operation=operation, - ) - if response.performative != ContractApiMessage.Performative.STATE: - self.context.logger.error( - f"Couldn't get safe hash. " - f"Expected response performative {ContractApiMessage.Performative.STATE.value}, " # type: ignore - f"received {response.performative.value}." - ) - return None - - # strip "0x" from the response hash - tx_hash = cast(str, response.state.body["tx_hash"])[2:] - return tx_hash - - def _to_multisend( - self, transactions: List[Dict] - ) -> Generator[None, None, Optional[str]]: - """Transform payload to MultiSend.""" - multi_send_txs = [] - for transaction in transactions: - transaction = { - "operation": transaction.get("operation", MultiSendOperation.CALL), - "to": transaction["to"], - "value": transaction["value"], - "data": transaction.get("data", b""), - } - multi_send_txs.append(transaction) - - response = yield from self.get_contract_api_response( - performative=ContractApiMessage.Performative.GET_RAW_TRANSACTION, # type: ignore - contract_address=self.params.multisend_address, - contract_id=str(MultiSendContract.contract_id), - contract_callable="get_tx_data", - multi_send_txs=multi_send_txs, - ) - if response.performative != ContractApiMessage.Performative.RAW_TRANSACTION: - self.context.logger.error( - f"Couldn't compile the multisend tx. " - f"Expected performative {ContractApiMessage.Performative.RAW_TRANSACTION.value}, " # type: ignore - f"received {response.performative.value}." - ) - return None - - # strip "0x" from the response - multisend_data_str = cast(str, response.raw_transaction.body["data"])[2:] - tx_data = bytes.fromhex(multisend_data_str) - tx_hash = yield from self._get_safe_tx_hash( - self.params.multisend_address, - tx_data, - operation=SafeOperation.DELEGATE_CALL.value, - ) - if tx_hash is None: - return None - - payload_data = hash_payload_to_hex( - safe_tx_hash=tx_hash, - ether_value=ETHER_VALUE, - safe_tx_gas=SAFE_TX_GAS, - operation=SafeOperation.DELEGATE_CALL.value, - to_address=self.params.multisend_address, - data=tx_data, - ) - return payload_data - - def get_subgraph_result( - self, - query: str, - ) -> Generator[None, None, Optional[Dict[str, Any]]]: - """Get question ids.""" - response = yield from self.get_http_response( - content=to_content(query), - **self.context.omen_subgraph.get_spec(), - ) - - if response is None or response.status_code != HTTP_OK: - self.context.logger.error( - f"Could not retrieve response from Omen subgraph." - f"Received status code {response.status_code}.\n{response}" - ) - return None - - return json.loads(response.body.decode()) - - def do_llm_request( - self, - llm_message: LlmMessage, - llm_dialogue: LlmDialogue, - timeout: Optional[float] = None, - ) -> Generator[None, None, LlmMessage]: - """ - Do a request and wait the response, asynchronously. - - :param llm_message: The request message - :param llm_dialogue: the HTTP dialogue associated to the request - :param timeout: seconds to wait for the reply. - :yield: LLMMessage object - :return: the response message - """ - self.context.outbox.put_message(message=llm_message) - request_nonce = self._get_request_nonce_from_dialogue(llm_dialogue) - cast(Requests, self.context.requests).request_id_to_callback[ - request_nonce - ] = self.get_callback_request() - # notify caller by propagating potential timeout exception. - response = yield from self.wait_for_message(timeout=timeout) - return response - - class CollectRandomnessBehaviour(RandomnessBehaviour): """CollectRandomnessBehaviour""" @@ -1020,84 +841,84 @@ def _get_deposit_tx( return data -class RedeemBondBehaviour(MarketCreationManagerBaseBehaviour): - """RedeemBondBehaviour""" - - matching_round = RedeemBondRound - - def async_act(self) -> Generator: - """Implement the act.""" - with self.context.benchmark_tool.measure(self.behaviour_id).local(): - sender = self.context.agent_address - content = yield from self.get_payload() - payload = RedeemBondPayload(sender=sender, content=content) - with self.context.benchmark_tool.measure(self.behaviour_id).consensus(): - yield from self.send_a2a_transaction(payload) - yield from self.wait_until_round_end() - self.set_done() - - def get_balance(self, address: str) -> Generator[None, None, Optional[int]]: - """Get the balance of the provided address""" - safe_address = self.synchronized_data.safe_contract_address - response = yield from self.get_contract_api_response( - performative=ContractApiMessage.Performative.GET_STATE, # type: ignore - contract_address=self.params.realitio_contract, - contract_id=str(RealitioContract.contract_id), - contract_callable="balance_of", - address=safe_address, - ) - - if response.performative != ContractApiMessage.Performative.STATE: - self.context.logger.warning(f"balance_of unsuccessful!: {response}") - return None - - balance = cast(int, response.state.body["data"]) - self.context.logger.info(f"balance: {balance / 10 ** 18} xDAI") - return balance - - def get_payload(self) -> Generator[None, None, str]: - """Get the payload.""" - safe_address = self.synchronized_data.safe_contract_address - balance = yield from self.get_balance(safe_address) - if balance is None: - return RedeemBondRound.ERROR_PAYLOAD - - if balance <= MIN_BALANCE_WITHDRAW_REALITIO: - return RedeemBondRound.NO_TX_PAYLOAD - - withdraw_tx = yield from self._get_withdraw_tx() - if withdraw_tx is None: - return RedeemBondRound.ERROR_PAYLOAD - - tx_hash = yield from self._to_multisend( - transactions=[ - withdraw_tx, - ] - ) - if tx_hash is None: - return RedeemBondRound.ERROR_PAYLOAD - - return tx_hash - - def _get_withdraw_tx(self) -> Generator[None, None, Optional[Dict]]: - """Prepare a withdraw tx""" - self.context.logger.info("Starting RealitioContract.build_withdraw_tx") - response = yield from self.get_contract_api_response( - performative=ContractApiMessage.Performative.GET_STATE, - contract_address=self.params.realitio_contract, - contract_id=str(RealitioContract.contract_id), - contract_callable=get_callable_name(RealitioContract.build_withdraw_tx), - ) - if response.performative != ContractApiMessage.Performative.STATE: - self.context.logger.warning( - f"RealitioContract.build_withdraw_tx unsuccessful! : {response}" - ) - return None - return { - "to": self.params.realitio_contract, - "data": response.state.body["data"], - "value": ETHER_VALUE, - } +# class RedeemBondBehaviour(MarketCreationManagerBaseBehaviour): +# """RedeemBondBehaviour""" + +# matching_round = RedeemRound + +# def async_act(self) -> Generator: +# """Implement the act.""" +# with self.context.benchmark_tool.measure(self.behaviour_id).local(): +# sender = self.context.agent_address +# content = yield from self.get_payload() +# payload = RedeemBondPayload(sender=sender, content=content) +# with self.context.benchmark_tool.measure(self.behaviour_id).consensus(): +# yield from self.send_a2a_transaction(payload) +# yield from self.wait_until_round_end() +# self.set_done() + +# def get_balance(self, address: str) -> Generator[None, None, Optional[int]]: +# """Get the balance of the provided address""" +# safe_address = self.synchronized_data.safe_contract_address +# response = yield from self.get_contract_api_response( +# performative=ContractApiMessage.Performative.GET_STATE, # type: ignore +# contract_address=self.params.realitio_contract, +# contract_id=str(RealitioContract.contract_id), +# contract_callable="balance_of", +# address=safe_address, +# ) + +# if response.performative != ContractApiMessage.Performative.STATE: +# self.context.logger.warning(f"balance_of unsuccessful!: {response}") +# return None + +# balance = cast(int, response.state.body["data"]) +# self.context.logger.info(f"balance: {balance / 10 ** 18} xDAI") +# return balance + +# def get_payload(self) -> Generator[None, None, str]: +# """Get the payload.""" +# safe_address = self.synchronized_data.safe_contract_address +# balance = yield from self.get_balance(safe_address) +# if balance is None: +# return RedeemRound.ERROR_PAYLOAD + +# if balance <= MIN_BALANCE_WITHDRAW_REALITIO: +# return RedeemRound.NO_TX_PAYLOAD + +# withdraw_tx = yield from self._get_withdraw_tx() +# if withdraw_tx is None: +# return RedeemRound.ERROR_PAYLOAD + +# tx_hash = yield from self._to_multisend( +# transactions=[ +# withdraw_tx, +# ] +# ) +# if tx_hash is None: +# return RedeemRound.ERROR_PAYLOAD + +# return tx_hash + +# def _get_withdraw_tx(self) -> Generator[None, None, Optional[Dict]]: +# """Prepare a withdraw tx""" +# self.context.logger.info("Starting RealitioContract.build_withdraw_tx") +# response = yield from self.get_contract_api_response( +# performative=ContractApiMessage.Performative.GET_STATE, +# contract_address=self.params.realitio_contract, +# contract_id=str(RealitioContract.contract_id), +# contract_callable=get_callable_name(RealitioContract.build_withdraw_tx), +# ) +# if response.performative != ContractApiMessage.Performative.STATE: +# self.context.logger.warning( +# f"RealitioContract.build_withdraw_tx unsuccessful! : {response}" +# ) +# return None +# return { +# "to": self.params.realitio_contract, +# "data": response.state.body["data"], +# "value": ETHER_VALUE, +# } class SyncMarketsBehaviour(MarketCreationManagerBaseBehaviour): @@ -2124,7 +1945,7 @@ def get_payload(self) -> Generator[None, None, str]: ): return PostTransactionRound.MECH_REQUEST_DONE_PAYLOAD - if self.synchronized_data.tx_sender == RedeemBondRound.auto_round_id(): + if self.synchronized_data.tx_sender == RedeemRound.auto_round_id(): return PostTransactionRound.REDEEM_BOND_DONE_PAYLOAD is_approved_question_data_set = ( @@ -2517,6 +2338,6 @@ class MarketCreationManagerRoundBehaviour(AbstractRoundBehaviour): SyncMarketsBehaviour, RemoveFundingBehaviour, DepositDaiBehaviour, - RedeemBondBehaviour, + RedeemBehaviour, PostTransactionBehaviour, } diff --git a/packages/valory/skills/market_creation_manager_abci/behaviours_/__init__.py b/packages/valory/skills/market_creation_manager_abci/behaviours_/__init__.py new file mode 100644 index 0000000..3103e7c --- /dev/null +++ b/packages/valory/skills/market_creation_manager_abci/behaviours_/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------ +# +# Copyright 2024 Valory AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------------ + +"""This package contains the behaviours for the 'market_creation_manager_abci' skill.""" diff --git a/packages/valory/skills/market_creation_manager_abci/behaviours_/base.py b/packages/valory/skills/market_creation_manager_abci/behaviours_/base.py new file mode 100644 index 0000000..07305a9 --- /dev/null +++ b/packages/valory/skills/market_creation_manager_abci/behaviours_/base.py @@ -0,0 +1,462 @@ +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------ +# +# Copyright 2023-2024 Valory AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------------ + +"""This package contains round behaviours of MarketCreationManagerAbciApp.""" + +import json +import dataclasses +from aea.configurations.data_types import PublicId +from abc import ABC +from typing import ( + Any, + Callable, + Dict, + Generator, + List, + Optional, + cast, +) + +from packages.valory.skills.transaction_settlement_abci.rounds import TX_HASH_LENGTH +from packages.valory.contracts.conditional_tokens.contract import ( + ConditionalTokensContract, +) +from packages.valory.contracts.gnosis_safe.contract import ( + GnosisSafeContract, + SafeOperation, +) +from packages.valory.contracts.multisend.contract import ( + MultiSendContract, + MultiSendOperation, +) +from packages.valory.protocols.contract_api import ContractApiMessage +from packages.valory.protocols.llm.message import LlmMessage +from packages.valory.skills.abstract_round_abci.behaviours import ( + BaseBehaviour, +) +from packages.valory.skills.abstract_round_abci.models import Requests +from packages.valory.skills.market_creation_manager_abci.models import ( + MarketCreationManagerParams, + MultisendBatch, + SharedState, +) +from packages.valory.skills.transaction_settlement_abci.payload_tools import ( + hash_payload_to_hex, +) +from packages.valory.skills.market_creation_manager_abci.dialogues import ( + LlmDialogue, +) +from packages.valory.skills.market_creation_manager_abci.rounds import ( + SynchronizedData +) +from packages.valory.skills.abstract_round_abci.base import BaseTxPayload + +from datetime import datetime, timedelta +from packages.valory.skills.abstract_round_abci.behaviour_utils import TimeoutException +WaitableConditionType = Generator[None, None, bool] + + +SAFE_TX_GAS = 0 +ETHER_VALUE = 0 +HTTP_OK = 200 + + +class MarketCreationManagerBaseBehaviour(BaseBehaviour, ABC): + """Base behaviour for the market_creation_manager_abci skill.""" + + def __init__(self, **kwargs: Any) -> None: + """Initialize the bet placement behaviour.""" + super().__init__(**kwargs) + self.token_balance = 0 + self.wallet_balance = 0 + self.multisend_batches: List[MultisendBatch] = [] + self.multisend_data = b"" + self._safe_tx_hash = "" + + @property + def synchronized_data(self) -> SynchronizedData: + """Return the synchronized data.""" + return cast(SynchronizedData, super().synchronized_data) + + @property + def params(self) -> MarketCreationManagerParams: + """Return the params.""" + return cast(MarketCreationManagerParams, super().params) + + @property + def last_synced_timestamp(self) -> int: + """ + Get last synced timestamp. + + This is the last timestamp guaranteed to be the same by 2/3 of the agents. + :returns: the last synced timestamp. + """ + state = cast(SharedState, self.context.state) + last_timestamp = ( + state.round_sequence.last_round_transition_timestamp.timestamp() + ) + return int(last_timestamp) + + @property + def shared_state(self) -> SharedState: + """Get the shared state.""" + return cast(SharedState, self.context.state) + + def _calculate_condition_id( + self, + oracle_contract: str, + question_id: str, + outcome_slot_count: int = 2, + ) -> Generator[None, None, str]: + """Calculate question ID.""" + response = yield from self.get_contract_api_response( + performative=ContractApiMessage.Performative.GET_STATE, + contract_address=self.params.conditional_tokens_contract, + contract_id=str(ConditionalTokensContract.contract_id), + contract_callable="calculate_condition_id", + oracle_contract=oracle_contract, + question_id=question_id, + outcome_slot_count=outcome_slot_count, + ) + return cast(str, response.state.body["condition_id"]) + + def _get_safe_tx_hash( + self, + to_address: str, + data: bytes, + value: int = ETHER_VALUE, + safe_tx_gas: int = SAFE_TX_GAS, + operation: int = SafeOperation.CALL.value, + ) -> Generator[None, None, Optional[str]]: + """Prepares and returns the safe tx hash.""" + response = yield from self.get_contract_api_response( + performative=ContractApiMessage.Performative.GET_STATE, # type: ignore + contract_address=self.synchronized_data.safe_contract_address, # the safe contract address + contract_id=str(GnosisSafeContract.contract_id), + contract_callable="get_raw_safe_transaction_hash", + to_address=to_address, # the contract the safe will invoke + value=value, + data=data, + safe_tx_gas=safe_tx_gas, + operation=operation, + ) + if response.performative != ContractApiMessage.Performative.STATE: + self.context.logger.error( + f"Couldn't get safe hash. " + f"Expected response performative {ContractApiMessage.Performative.STATE.value}, " # type: ignore + f"received {response.performative.value}." + ) + return None + + # strip "0x" from the response hash + tx_hash = cast(str, response.state.body["tx_hash"])[2:] + return tx_hash + + def _to_multisend( + self, transactions: List[Dict] + ) -> Generator[None, None, Optional[str]]: + """Transform payload to MultiSend.""" + multi_send_txs = [] + for transaction in transactions: + transaction = { + "operation": transaction.get("operation", MultiSendOperation.CALL), + "to": transaction["to"], + "value": transaction["value"], + "data": transaction.get("data", b""), + } + multi_send_txs.append(transaction) + + response = yield from self.get_contract_api_response( + performative=ContractApiMessage.Performative.GET_RAW_TRANSACTION, # type: ignore + contract_address=self.params.multisend_address, + contract_id=str(MultiSendContract.contract_id), + contract_callable="get_tx_data", + multi_send_txs=multi_send_txs, + ) + if response.performative != ContractApiMessage.Performative.RAW_TRANSACTION: + self.context.logger.error( + f"Couldn't compile the multisend tx. " + f"Expected performative {ContractApiMessage.Performative.RAW_TRANSACTION.value}, " # type: ignore + f"received {response.performative.value}." + ) + return None + + # strip "0x" from the response + multisend_data_str = cast(str, response.raw_transaction.body["data"])[2:] + tx_data = bytes.fromhex(multisend_data_str) + tx_hash = yield from self._get_safe_tx_hash( + self.params.multisend_address, + tx_data, + operation=SafeOperation.DELEGATE_CALL.value, + ) + if tx_hash is None: + return None + + payload_data = hash_payload_to_hex( + safe_tx_hash=tx_hash, + ether_value=ETHER_VALUE, + safe_tx_gas=SAFE_TX_GAS, + operation=SafeOperation.DELEGATE_CALL.value, + to_address=self.params.multisend_address, + data=tx_data, + ) + return payload_data + + def get_subgraph_result( + self, + query: str, + ) -> Generator[None, None, Optional[Dict[str, Any]]]: + """Get question ids.""" + response = yield from self.get_http_response( + content=self.to_content(query), + **self.context.omen_subgraph.get_spec(), + ) + + if response is None or response.status_code != HTTP_OK: + self.context.logger.error( + f"Could not retrieve response from Omen subgraph." + f"Received status code {response.status_code}.\n{response}" + ) + return None + + return json.loads(response.body.decode()) + + def do_llm_request( + self, + llm_message: LlmMessage, + llm_dialogue: LlmDialogue, + timeout: Optional[float] = None, + ) -> Generator[None, None, LlmMessage]: + """ + Do a request and wait the response, asynchronously. + + :param llm_message: The request message + :param llm_dialogue: the HTTP dialogue associated to the request + :param timeout: seconds to wait for the reply. + :yield: LLMMessage object + :return: the response message + """ + self.context.outbox.put_message(message=llm_message) + request_nonce = self._get_request_nonce_from_dialogue(llm_dialogue) + cast(Requests, self.context.requests).request_id_to_callback[ + request_nonce + ] = self.get_callback_request() + # notify caller by propagating potential timeout exception. + response = yield from self.wait_for_message(timeout=timeout) + return response + + def to_content(self, query: str) -> bytes: + """Convert the given query string to payload content, i.e., add it under a `queries` key and convert it to bytes.""" + finalized_query = {"query": query} + encoded_query = json.dumps(finalized_query, sort_keys=True).encode("utf-8") + return encoded_query + + def wait_for_condition_with_sleep( + self, + condition_gen: Callable[[], WaitableConditionType], + timeout: Optional[float] = None, + ) -> Generator[None, None, None]: + """Wait for a condition to happen and sleep in-between checks. + + This is a modified version of the base `wait_for_condition` method which: + 1. accepts a generator that creates the condition instead of a callable + 2. sleeps in-between checks + + :param condition_gen: a generator of the condition to wait for + :param timeout: the maximum amount of time to wait + :yield: None + """ + + deadline = ( + datetime.now() + timedelta(0, timeout) + if timeout is not None + else datetime.max + ) + + while True: + condition_satisfied = yield from condition_gen() + if condition_satisfied: + break + if timeout is not None and datetime.now() > deadline: + raise TimeoutException() + self.context.logger.info(f"Retrying in {self.params.sleep_time} seconds.") + yield from self.sleep(self.params.sleep_time) + + def contract_interact( + self, + performative: ContractApiMessage.Performative, + contract_address: str, + contract_public_id: PublicId, + contract_callable: str, + data_key: str, + placeholder: str, + **kwargs: Any, + ) -> WaitableConditionType: + """Interact with a contract.""" + contract_id = str(contract_public_id) + response_msg = yield from self.get_contract_api_response( + performative, + contract_address, + contract_id, + contract_callable, + **kwargs, + ) + if response_msg.performative != ContractApiMessage.Performative.RAW_TRANSACTION: + self.default_error(contract_id, contract_callable, response_msg) + return False + + propagated = self._propagate_contract_messages(response_msg) + data = response_msg.raw_transaction.body.get(data_key, None) + if data is None: + if not propagated: + self.default_error(contract_id, contract_callable, response_msg) + return False + + setattr(self, placeholder, data) + return True + + def default_error( + self, contract_id: str, contract_callable: str, response_msg: ContractApiMessage + ) -> None: + """Return a default contract interaction error message.""" + self.context.logger.error( + f"Could not successfully interact with the {contract_id} contract " + f"using {contract_callable!r}: {response_msg}" + ) + + def _propagate_contract_messages(self, response_msg: ContractApiMessage) -> bool: + """Propagate the contract's message to the logger, if exists. + + Contracts can only return one message at a time. + + :param response_msg: the response message from the contract method. + :return: whether a message has been propagated. + """ + for level in ("info", "warning", "error"): + msg = response_msg.raw_transaction.body.get(level, None) + if msg is not None: + logger = getattr(self.context.logger, level) + logger(msg) + return True + return False + + def finish_behaviour(self, payload: BaseTxPayload) -> Generator: + """Finish the behaviour.""" + with self.context.benchmark_tool.measure(self.behaviour_id).consensus(): + yield from self.send_a2a_transaction(payload) + yield from self.wait_until_round_end() + + self.set_done() + + @staticmethod + def wei_to_native(wei: int) -> float: + """Convert WEI to native token.""" + return wei / 10**18 + + @property + def txs_value(self) -> int: + """Get the total value of the transactions.""" + return sum(batch.value for batch in self.multisend_batches) + + @property + def multi_send_txs(self) -> List[dict]: + """Get the multisend transactions as a list of dictionaries.""" + return [dataclasses.asdict(batch) for batch in self.multisend_batches] + + @property + def safe_tx_hash(self) -> str: + """Get the safe_tx_hash.""" + return self._safe_tx_hash + + @safe_tx_hash.setter + def safe_tx_hash(self, safe_hash: str) -> None: + """Set the safe_tx_hash.""" + length = len(safe_hash) + if length != TX_HASH_LENGTH: + raise ValueError( + f"Incorrect length {length} != {TX_HASH_LENGTH} detected " + f"when trying to assign a safe transaction hash: {safe_hash}" + ) + self._safe_tx_hash = safe_hash[2:] + + def _build_multisend_data( + self, + ) -> WaitableConditionType: + """Get the multisend tx.""" + response_msg = yield from self.get_contract_api_response( + performative=ContractApiMessage.Performative.GET_RAW_TRANSACTION, # type: ignore + contract_address=self.params.multisend_address, + contract_id=str(MultiSendContract.contract_id), + contract_callable="get_tx_data", + multi_send_txs=self.multi_send_txs, + ) + expected_performative = ContractApiMessage.Performative.RAW_TRANSACTION + if response_msg.performative != expected_performative: + self.context.logger.error( + f"Couldn't compile the multisend tx. " + f"Expected response performative {expected_performative.value}, " # type: ignore + f"received {response_msg.performative.value}: {response_msg}" + ) + return False + + multisend_data_str = response_msg.raw_transaction.body.get("data", None) + if multisend_data_str is None: + self.context.logger.error( + f"Something went wrong while trying to prepare the multisend data: {response_msg}" + ) + return False + + # strip "0x" from the response + multisend_data_str = str(response_msg.raw_transaction.body["data"])[2:] + self.multisend_data = bytes.fromhex(multisend_data_str) + return True + + def _build_multisend_safe_tx_hash(self) -> WaitableConditionType: + """Prepares and returns the safe tx hash for a multisend tx.""" + response_msg = yield from self.get_contract_api_response( + performative=ContractApiMessage.Performative.GET_STATE, # type: ignore + contract_address=self.synchronized_data.safe_contract_address, + contract_id=str(GnosisSafeContract.contract_id), + contract_callable="get_raw_safe_transaction_hash", + to_address=self.params.multisend_address, + value=self.txs_value, + data=self.multisend_data, + safe_tx_gas=SAFE_TX_GAS, + operation=SafeOperation.DELEGATE_CALL.value, + ) + + if response_msg.performative != ContractApiMessage.Performative.STATE: + self.context.logger.error( + "Couldn't get safe tx hash. Expected response performative " + f"{ContractApiMessage.Performative.STATE.value}, " # type: ignore + f"received {response_msg.performative.value}: {response_msg}." + ) + return False + + tx_hash = response_msg.state.body.get("tx_hash", None) + if tx_hash is None or len(tx_hash) != TX_HASH_LENGTH: + self.context.logger.error( + "Something went wrong while trying to get the buy transaction's hash. " + f"Invalid hash {tx_hash!r} was returned." + ) + return False + + # strip "0x" from the response hash + self.safe_tx_hash = tx_hash + return True diff --git a/packages/valory/skills/market_creation_manager_abci/behaviours_/reedem.py b/packages/valory/skills/market_creation_manager_abci/behaviours_/reedem.py new file mode 100644 index 0000000..eee7e41 --- /dev/null +++ b/packages/valory/skills/market_creation_manager_abci/behaviours_/reedem.py @@ -0,0 +1,842 @@ +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------ +# +# Copyright 2023-2024 Valory AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------------ + +"""This module contains the redeeming state of the decision-making abci app.""" + +import json +import time +from abc import ABC +from sys import maxsize +from typing import Any, Dict, Generator, Iterator, List, Optional, Set, Union + +from hexbytes import HexBytes +from web3.constants import HASH_ZERO + +from packages.valory.contracts.conditional_tokens.contract import ( + ConditionalTokensContract, +) +from packages.valory.contracts.realitio.contract import RealitioContract +from packages.valory.contracts.realitio_proxy.contract import RealitioProxyContract +from packages.valory.protocols.contract_api import ContractApiMessage +from packages.valory.protocols.ledger_api import LedgerApiMessage +from packages.valory.skills.abstract_round_abci.base import BaseTxPayload, get_name +from packages.valory.skills.decision_maker_abci.behaviours.base import ( + DecisionMakerBaseBehaviour, + WaitableConditionType, +) +from packages.valory.skills.decision_maker_abci.models import ( + MultisendBatch, + RedeemingProgress, +) +from packages.valory.skills.decision_maker_abci.payloads import RedeemPayload +from packages.valory.skills.decision_maker_abci.redeem_info import ( + Condition, + FPMM, + Trade, +) +from packages.valory.skills.decision_maker_abci.states.redeem import RedeemRound +from packages.valory.skills.market_manager_abci.graph_tooling.requests import ( + FetchStatus, + QueryingBehaviour, +) +from packages.valory.skills.market_manager_abci.graph_tooling.utils import ( + filter_claimed_conditions, + get_condition_id_to_balances, +) +from packages.valory.skills.market_creation_manager_abci.behaviours_.base import ( + MarketCreationManagerBaseBehaviour, + ETHER_VALUE, + SAFE_TX_GAS, + HTTP_OK +) + + +ZERO_HEX = HASH_ZERO[2:] +ZERO_BYTES = bytes.fromhex(ZERO_HEX) +BLOCK_NUMBER_KEY = "number" +DEFAULT_TO_BLOCK = "latest" + + +class RedeemBehaviour(MarketCreationManagerBaseBehaviour): + """Redeem the winnings.""" + + matching_round = RedeemRound + + def __init__(self, **kwargs: Any) -> None: + """Initialize `RedeemBehaviour`.""" + super().__init__(**kwargs) + self._claim_params_batch: list = [] + self._latest_block_number: Optional[int] = None + self._finalized: bool = False + self._already_resolved: bool = False + self._payouts: Dict[str, int] = {} + self._built_data: Optional[HexBytes] = None + self._current_redeem_info: Optional[Trade] = None + self._expected_winnings: int = 0 + self._history_hash: bytes = ZERO_BYTES + self._claim_winnings_simulation_ok: bool = False + self.redeemed_condition_ids: Set[str] = set() + self.payout_so_far: int = 0 + self.trades: Set[Trade] = set() + self.earliest_block_number: int = 0 + + @property + def redeeming_progress(self) -> RedeemingProgress: + """Get the redeeming check progress from the shared state.""" + return self.shared_state.redeeming_progress + + @redeeming_progress.setter + def redeeming_progress(self, progress: RedeemingProgress) -> None: + """Set the redeeming check progress in the shared state.""" + self.shared_state.redeeming_progress = progress + + @property + def latest_block_number(self) -> int: + """Get the latest block number.""" + if self._latest_block_number is None: + error = "Attempting to retrieve the latest block number, but it hasn't been set yet." + raise ValueError(error) + return self._latest_block_number + + @latest_block_number.setter + def latest_block_number(self, latest_block_number: str) -> None: + """Set the latest block number.""" + try: + self._latest_block_number = int(latest_block_number) + except (TypeError, ValueError) as exc: + error = f"{latest_block_number=} cannot be converted to a valid integer." + raise ValueError(error) from exc + + @property + def current_redeem_info(self) -> Trade: + """Get the current redeem info.""" + if self._current_redeem_info is None: + raise ValueError("Current redeem information have not been set.") + return self._current_redeem_info + + @property + def current_fpmm(self) -> FPMM: + """Get the current FPMM.""" + return self.current_redeem_info.fpmm + + @property + def current_condition(self) -> Condition: + """Get the current condition.""" + return self.current_fpmm.condition + + @property + def current_question_id(self) -> bytes: + """Get the current question's id.""" + return self.current_fpmm.question.id + + @property + def current_collateral_token(self) -> str: + """Get the current collateral token.""" + return self.current_fpmm.collateralToken + + @property + def current_condition_id(self) -> HexBytes: + """Get the current condition id.""" + return self.current_condition.id + + @property + def current_index_sets(self) -> List[int]: + """Get the current index sets.""" + return self.current_condition.index_sets + + @property + def current_claimable_amount(self) -> int: + """Return the current claimable amount.""" + return self.claimable_amounts[self.current_condition_id] + + @property + def is_dust(self) -> bool: + """Return whether the claimable amount of the given condition id is dust or not.""" + return self.current_claimable_amount < self.params.dust_threshold + + @property + def payouts_batch(self) -> Dict[str, int]: + """Get the trades' transaction hashes mapped to payouts for the current market.""" + return self._payouts + + @payouts_batch.setter + def payouts_batch(self, payouts: Dict[str, int]) -> None: + """Set the trades' transaction hashes mapped to payouts for the current market.""" + self._payouts = payouts + + @property + def finalized(self) -> bool: + """Get whether the current market has been finalized.""" + return self._finalized + + @finalized.setter + def finalized(self, flag: bool) -> None: + """Set whether the current market has been finalized.""" + self._finalized = flag + + @property + def history_hash(self) -> bytes: + """Get the history hash for the current question.""" + return self._history_hash + + @history_hash.setter + def history_hash(self, history_hash: bytes) -> None: + """Set the history hash for the current question.""" + self._history_hash = history_hash + + @property + def is_history_hash_null(self) -> bool: + """Return whether the current history hash is null.""" + return self.history_hash == b"\x00" * 32 + + @property + def already_resolved(self) -> bool: + """Get whether the current market has already been resolved.""" + return self._already_resolved + + @already_resolved.setter + def already_resolved(self, flag: bool) -> None: + """Set whether the current market has already been resolved.""" + self._already_resolved = flag + + @property + def claim_params_batch(self) -> list: + """Get the current batch of the claim parameters.""" + return self._claim_params_batch + + @claim_params_batch.setter + def claim_params_batch(self, claim_params_batch: list) -> None: + """Set the current batch of the claim parameters.""" + self._claim_params_batch = claim_params_batch + + @property + def built_data(self) -> HexBytes: + """Get the built transaction's data.""" + return self._built_data + + @built_data.setter + def built_data(self, built_data: Union[str, bytes]) -> None: + """Set the built transaction's data.""" + self._built_data = HexBytes(built_data) + + @property + def claim_winnings_simulation_ok(self) -> bool: + """Get whether the claim winnings simulation is ok.""" + return self._claim_winnings_simulation_ok + + @claim_winnings_simulation_ok.setter + def claim_winnings_simulation_ok(self, claim_winnings_simulation_ok: bool) -> None: + """Get whether the claim winnings simulation is ok.""" + self._claim_winnings_simulation_ok = claim_winnings_simulation_ok + + def _store_progress(self) -> None: + """Store the redeeming progress.""" + self.redeeming_progress.trades = self.trades + self.redeeming_progress.claimable_amounts = self.claimable_amounts + self.redeeming_progress.earliest_block_number = self.earliest_block_number + + def _load_progress(self) -> None: + """Load the redeeming progress.""" + self.trades = self.redeeming_progress.trades + self.claimable_amounts = self.redeeming_progress.claimable_amounts + self.earliest_block_number = self.redeeming_progress.earliest_block_number + + + + + + + + + + + def _get_redeem_info( + self, + ) -> Generator: + """Fetch the trades from all the prediction markets and store them as redeeming information.""" + while True: + can_proceed = self._prepare_fetching() + if not can_proceed: + break + + trades_market_chunk = yield from self._fetch_redeem_info() + if trades_market_chunk is not None: + yield from self.update_redeem_info(trades_market_chunk) + + self.context.logger.info(f"Fetched redeeming information: {self.trades}") + + + + + + + + + + + + def _filter_trades(self) -> None: + """Filter the trades, removing the redeemed condition ids.""" + redeemed_condition_ids = [ + condition_id.lower() for condition_id in self.redeemed_condition_ids + ] + self.trades = { + trade + for trade in self.trades + if trade.fpmm.condition.id.hex().lower() not in redeemed_condition_ids + } + self.redeeming_progress.trades = self.trades + + def _conditional_tokens_interact( + self, contract_callable: str, data_key: str, placeholder: str, **kwargs: Any + ) -> WaitableConditionType: + """Interact with the conditional tokens contract.""" + status = yield from self.contract_interact( + performative=ContractApiMessage.Performative.GET_RAW_TRANSACTION, # type: ignore + contract_address=self.params.conditional_tokens_address, + contract_public_id=ConditionalTokensContract.contract_id, + contract_callable=contract_callable, + data_key=data_key, + placeholder=placeholder, + **kwargs, + ) + return status + + def _get_latest_block(self) -> WaitableConditionType: + """Get the latest block's timestamp.""" + ledger_api_response = yield from self.get_ledger_api_response( + performative=LedgerApiMessage.Performative.GET_STATE, # type: ignore + ledger_callable="get_block", + block_identifier=DEFAULT_TO_BLOCK, + ) + if ledger_api_response.performative != LedgerApiMessage.Performative.STATE: + self.context.logger.error(f"Failed to get block: {ledger_api_response}") + return False + self.latest_block_number = ledger_api_response.state.body.get(BLOCK_NUMBER_KEY) + return True + + def _check_already_redeemed_via_events(self) -> WaitableConditionType: + """Check whether the condition ids have already been redeemed via events.""" + if len(self.trades) == 0: + return True + + safe_address_lower = self.synchronized_data.safe_contract_address.lower() + kwargs: Dict[str, Any] = { + key: [] + for key in ( + "collateral_tokens", + "parent_collection_ids", + "condition_ids", + "index_sets", + ) + } + for trade in self.trades: + kwargs["collateral_tokens"].append(trade.fpmm.collateralToken) + kwargs["parent_collection_ids"].append(ZERO_BYTES) + kwargs["condition_ids"].append(trade.fpmm.condition.id) + kwargs["index_sets"].append(trade.fpmm.condition.index_sets) + + if not self.redeeming_progress.check_started: + self.redeeming_progress.check_from_block = self.earliest_block_number + yield from self.wait_for_condition_with_sleep(self._get_latest_block) + self.redeeming_progress.check_to_block = self.latest_block_number + self.redeeming_progress.check_started = True + + n_retries = 0 + from_block = self.redeeming_progress.check_from_block + batch_size = self.redeeming_progress.event_filtering_batch_size + while from_block < self.redeeming_progress.check_to_block: + max_to_block = from_block + batch_size + to_block = min(max_to_block, self.redeeming_progress.check_to_block) + result = yield from self._conditional_tokens_interact( + contract_callable="check_redeemed", + data_key="payouts", + placeholder=get_name(RedeemBehaviour.payouts_batch), + redeemer=safe_address_lower, + from_block=from_block, + to_block=to_block, + timeout=self.params.contract_timeout, + **kwargs, + ) + + if not result and n_retries == self.params.max_filtering_retries: + err = "Skipping the redeeming round as the RPC is misbehaving." + self.context.logger.error(err) + return False + + if not result: + n_retries += 1 + keep_fraction = 1 - self.params.reduce_factor + reduced_batch_size = int(batch_size * keep_fraction) + # ensure that the batch size is at least the minimum batch size + batch_size = max(reduced_batch_size, self.params.minimum_batch_size) + self.redeeming_progress.event_filtering_batch_size = batch_size + self.context.logger.warning( + f"Repeating this call with a decreased batch size of {batch_size}." + ) + + continue + + self.redeeming_progress.payouts.update(self.payouts_batch) + self.redeeming_progress.check_from_block = to_block + from_block += batch_size + + return True + + def _check_already_redeemed_via_subgraph(self) -> WaitableConditionType: + """Check whether the condition ids have already been redeemed via subgraph.""" + safe_address = self.synchronized_data.safe_contract_address.lower() + from_timestamp, to_timestamp = 0.0, time.time() # from beginning to now + + # get the trades + trades = yield from self.fetch_trades( + safe_address, from_timestamp, to_timestamp + ) + if trades is None: + return False + + # get the user's positions + user_positions = yield from self.fetch_user_positions(safe_address) + if user_positions is None: + return False + + # process the positions + payouts, unredeemed_raw = get_condition_id_to_balances(trades, user_positions) + + # filter out positions that are already claimed + unredeemed = filter_claimed_conditions( + unredeemed_raw, self.redeeming_progress.claimed_condition_ids + ) + + self.redeeming_progress.payouts = payouts + self.redeeming_progress.unredeemed_trades = unredeemed + + return True + + def _check_already_redeemed(self) -> WaitableConditionType: + """Check whether we have already redeemed for this bet.""" + if self.params.use_subgraph_for_redeeming: + return self._check_already_redeemed_via_subgraph() + + return self._check_already_redeemed_via_events() + + def _clean_redeem_info(self) -> WaitableConditionType: + """Clean the redeeming information based on whether any positions have already been redeemed.""" + if self.payout_so_far > 0: + # filter the trades to avoid checking positions that we are already aware have been redeemed. + self._filter_trades() + + success = yield from self._check_already_redeemed() + if not success: + return False + + payouts = self.redeeming_progress.payouts + payouts_amount = sum(payouts.values()) + if payouts_amount > 0: + self.redeemed_condition_ids |= set(payouts.keys()) + if self.params.use_subgraph_for_redeeming: + self.payout_so_far = payouts_amount + else: + self.payout_so_far += payouts_amount + + # filter the trades again if new payouts have been found + self._filter_trades() + wxdai_amount = self.wei_to_native(self.payout_so_far) + msg = f"The total payout so far has been {wxdai_amount} wxDAI." + self.context.logger.info(msg) + + return True + + def _realitio_interact( + self, contract_callable: str, data_key: str, placeholder: str, **kwargs: Any + ) -> WaitableConditionType: + """Interact with the realitio contract.""" + status = yield from self.contract_interact( + performative=ContractApiMessage.Performative.GET_RAW_TRANSACTION, # type: ignore + contract_address=self.params.realitio_address, + contract_public_id=RealitioContract.contract_id, + contract_callable=contract_callable, + data_key=data_key, + placeholder=placeholder, + **kwargs, + ) + return status + + def _check_finalized(self) -> WaitableConditionType: + """Check whether the question has been finalized.""" + result = yield from self._realitio_interact( + contract_callable="check_finalized", + data_key="finalized", + placeholder=get_name(RedeemBehaviour.finalized), + question_id=self.current_question_id, + ) + return result + + def _get_history_hash(self) -> WaitableConditionType: + """Get the history hash for the current question id.""" + result = yield from self._realitio_interact( + contract_callable="get_history_hash", + data_key="data", + placeholder=get_name(RedeemBehaviour.history_hash), + question_id=self.current_question_id, + ) + return result + + def _check_already_resolved(self) -> WaitableConditionType: + """Check whether someone has already resolved for this market.""" + result = yield from self._conditional_tokens_interact( + contract_callable="check_resolved", + data_key="resolved", + placeholder=get_name(RedeemBehaviour.already_resolved), + condition_id=self.current_condition_id, + ) + return result + + def _build_resolve_data(self) -> WaitableConditionType: + """Prepare the safe tx to resolve the condition.""" + result = yield from self.contract_interact( + performative=ContractApiMessage.Performative.GET_RAW_TRANSACTION, # type: ignore + contract_address=self.params.realitio_proxy_address, + contract_public_id=RealitioProxyContract.contract_id, + contract_callable="build_resolve_tx", + data_key="data", + placeholder=get_name(RedeemBehaviour.built_data), + question_id=self.current_question_id, + template_id=self.current_fpmm.templateId, + question=self.current_fpmm.question.data, + num_outcomes=self.current_condition.outcomeSlotCount, + ) + + if not result: + return False + + batch = MultisendBatch( + to=self.params.realitio_proxy_address, + data=HexBytes(self.built_data), + ) + self.multisend_batches.append(batch) + return True + + def _simulate_claiming(self) -> WaitableConditionType: + """Check whether we have already claimed the winnings.""" + result = yield from self._realitio_interact( + contract_callable="simulate_claim_winnings", + data_key="data", + placeholder=get_name(RedeemBehaviour.claim_winnings_simulation_ok), + question_id=self.current_question_id, + claim_params=self.redeeming_progress.claim_params, + sender_address=self.synchronized_data.safe_contract_address, + ) + return result + + def _build_claim_data(self) -> WaitableConditionType: + """Prepare the safe tx to claim the winnings.""" + claim_params = self.redeeming_progress.claim_params + if claim_params is None: + self.context.logger.error( + f"Cannot parse incorrectly formatted realitio `LogNewAnswer` events: {self.redeeming_progress.answered}" + ) + return False + + result = yield from self._realitio_interact( + contract_callable="build_claim_winnings", + data_key="data", + placeholder=get_name(RedeemBehaviour.built_data), + question_id=self.current_question_id, + claim_params=self.redeeming_progress.claim_params, + ) + + if not result: + return False + + batch = MultisendBatch( + to=self.params.realitio_address, + data=HexBytes(self.built_data), + ) + self.multisend_batches.append(batch) + return True + + def get_claim_params(self) -> WaitableConditionType: + """Get the claim params for the current question id.""" + if self.params.use_subgraph_for_redeeming: + return self._get_claim_params_via_subgraph() + + return self._get_claim_params_via_events() + + def _get_claim_params_via_events(self) -> WaitableConditionType: + """Get claim params using an RPC to get the events.""" + if not self.redeeming_progress.claim_started: + self.redeeming_progress.claim_from_block = self.earliest_block_number + self.redeeming_progress.claim_to_block = ( + self.redeeming_progress.check_to_block + ) + self.redeeming_progress.claim_started = True + + n_retries = 0 + from_block = self.redeeming_progress.claim_from_block + batch_size = self.redeeming_progress.event_filtering_batch_size + while from_block < self.redeeming_progress.claim_to_block: + max_to_block = from_block + batch_size + to_block = min(max_to_block, self.redeeming_progress.claim_to_block) + result = yield from self._realitio_interact( + contract_callable="get_claim_params", + data_key="answered", + placeholder=get_name(RedeemBehaviour.claim_params_batch), + from_block=from_block, + to_block=to_block, + question_id=self.current_question_id, + timeout=self.params.contract_timeout, + ) + + if not result and n_retries == self.params.max_filtering_retries: + err = "Skipping redeeming for the current position as the RPC is misbehaving." + self.context.logger.error(err) + return False + + if not result: + n_retries += 1 + keep_fraction = 1 - self.params.reduce_factor + batch_size = int(batch_size * keep_fraction) + self.redeeming_progress.event_filtering_batch_size = batch_size + self.context.logger.warning( + f"Repeating this call with a decreased batch size of {batch_size}." + ) + continue + + self.redeeming_progress.answered.extend(self.claim_params_batch) + self.redeeming_progress.claim_from_block = to_block + from_block += batch_size + + return True + + def _get_claim_params_via_subgraph(self) -> WaitableConditionType: + """Get claim params using a subgraph.""" + question_id_str = "0x" + self.current_question_id.hex() + result = yield from self.fetch_claim_params(question_id_str) + if not result: + return False + + self.redeeming_progress.answered = result + return True + + def _build_redeem_data(self) -> WaitableConditionType: + """Prepare the safe tx to redeem the position.""" + result = yield from self._conditional_tokens_interact( + contract_callable="build_redeem_positions_tx", + data_key="data", + placeholder=get_name(RedeemBehaviour.built_data), + collateral_token=self.current_collateral_token, + parent_collection_id=ZERO_BYTES, + condition_id=self.current_condition_id, + index_sets=self.current_index_sets, + ) + + if not result: + return False + + batch = MultisendBatch( + to=self.params.conditional_tokens_address, + data=HexBytes(self.built_data), + ) + self.multisend_batches.append(batch) + return True + + def _prepare_single_redeem(self) -> WaitableConditionType: + """Prepare a multisend transaction for a single redeeming action.""" + yield from self.wait_for_condition_with_sleep(self._check_already_resolved) + steps = [] + if not self.already_resolved: + # 1. resolve the question if it hasn't been resolved yet + steps.append(self._build_resolve_data) + + yield from self.wait_for_condition_with_sleep(self._get_history_hash) + if not self.is_history_hash_null: + # 2. claim the winnings if claiming has not been done yet + if not self.redeeming_progress.claim_finished: + success = yield from self.get_claim_params() + if not success: + return False + + # simulate claiming to get the claim params + success = yield from self._simulate_claiming() + if not success: + return False + + if self.claim_winnings_simulation_ok: + steps.append(self._build_claim_data) + + # 3. we always redeem the position + steps.append(self._build_redeem_data) + for build_step in steps: + yield from self.wait_for_condition_with_sleep(build_step) + + return True + + def _process_candidate( + self, redeem_candidate: Trade + ) -> Generator[None, None, bool]: + """Process a redeeming candidate and return whether winnings were found.""" + self._current_redeem_info = redeem_candidate + + msg = f"Processing position with condition id {self.current_condition_id!r}..." + self.context.logger.info(msg) + + # double check whether the market is finalized + yield from self.wait_for_condition_with_sleep(self._check_finalized) + if not self.finalized: + self.context.logger.warning( + f"Conflict found! The current market, with condition id {self.current_condition_id!r}, " + f"is reported as not finalized by the realitio contract. " + f"However, an answer was finalized on {redeem_candidate.fpmm.answerFinalizedTimestamp}, " + f"and the last service transition occurred on {self.last_synced_timestamp}." + ) + return False + + if self.params.use_subgraph_for_redeeming: + condition_id = redeem_candidate.fpmm.condition.id.hex().lower() + if ( + condition_id not in self.redeeming_progress.unredeemed_trades + or self.redeeming_progress.unredeemed_trades[condition_id] == 0 + ): + return False + + # in case that the claimable amount is dust + if self.is_dust: + self.context.logger.info("Position's redeeming amount is dust.") + return False + + if self.params.use_subgraph_for_redeeming: + condition_id = redeem_candidate.fpmm.condition.id.hex().lower() + if ( + condition_id not in self.redeeming_progress.unredeemed_trades + or self.redeeming_progress.unredeemed_trades[condition_id] == 0 + ): + return False + + success = yield from self._prepare_single_redeem() + if not success: + return False + + self._expected_winnings += self.current_claimable_amount + return True + + def _prepare_safe_tx(self) -> Generator[None, None, Optional[str]]: + """ + Prepare the safe tx to redeem the positions of the trader. + + Steps: + 1. Get all the trades of the trader. + 2. For each trade, check if the trader has not already redeemed a non-dust winning position. + 3. If so, prepare a multisend transaction like this: + TXS: + 1. resolve (optional) + Check if the condition needs to be resolved. If so, add the tx to the multisend. + + 2. claimWinnings + Prepare a claim winnings tx for each winning position. Add it to the multisend. + + 3. redeemPositions + Prepare a redeem positions tx for each winning position. Add it to the multisend. + + We do not convert claimed wxDAI to xDAI, because this is the currency that the service is using to place bets. + + :yields: None + :returns: the safe's transaction hash for the redeeming operation. + """ + if len(self.trades) > 0: + self.context.logger.info("Preparing a multisend tx to redeem payout...") + + winnings_found = 0 + + for redeem_candidate in self.trades: + is_claimable = yield from self._process_candidate(redeem_candidate) + if not is_claimable: + msg = "Not redeeming position. Moving to the next one..." + self.context.logger.info(msg) + continue + + if self.params.redeeming_batch_size > 1: + self.context.logger.info("Adding position to the multisend batch...") + + winnings_found += 1 + # we mark this condition id as being claimed. + # once the transaction gets successfully through, it will be moved to + # self.redeeming_progress.claiming_condition_ids, and will no longer be taken into + # consideration. This is done to avoid cases where the subgraph is not up-to date + # and the same condition id is returned multiple times. + claiming_condition_id = redeem_candidate.fpmm.condition.id.hex() + self.redeeming_progress.claiming_condition_ids.append(claiming_condition_id) + + if winnings_found == self.params.redeeming_batch_size: + break + + if winnings_found == 0: + self.context.logger.info("No winnings to redeem.") + return None + + winnings = self.wei_to_native(self._expected_winnings) + self.context.logger.info( + "Preparing the multisend transaction to redeem winnings of " + f"{winnings} wxDAI for {winnings_found} position(s)." + ) + for build_step in ( + self._build_multisend_data, + self._build_multisend_safe_tx_hash, + ): + yield from self.wait_for_condition_with_sleep(build_step) + + self.context.logger.info("Transaction successfully prepared.") + return self.tx_hex + + def finish_behaviour(self, payload: BaseTxPayload) -> Generator: + """Finish the behaviour.""" + yield from super().finish_behaviour(payload) + + def async_act(self) -> Generator: + """Do the action.""" + with self.context.benchmark_tool.measure(self.behaviour_id).local(): + if not self.redeeming_progress.check_started: + yield from self._get_redeem_info() + self._store_progress() + else: + msg = "Picking up progress from where it was left off before the timeout occurred." + self.context.logger.info(msg) + self._load_progress() + + if not self.redeeming_progress.check_finished: + self.redeeming_progress.cleaned = yield from self._clean_redeem_info() + + agent = self.context.agent_address + payload = RedeemPayload(agent) + + if self.redeeming_progress.cleaned: + redeem_tx_hex = yield from self._prepare_safe_tx() + if redeem_tx_hex is not None: + tx_submitter = self.matching_round.auto_round_id() + condition_ids = json.dumps(list(self.redeemed_condition_ids)) + payout = self.payout_so_far + payload = RedeemPayload( + agent, + tx_submitter, + redeem_tx_hex, + condition_ids, + payout, + ) + + yield from self.finish_behaviour(payload) diff --git a/packages/valory/skills/market_creation_manager_abci/models.py b/packages/valory/skills/market_creation_manager_abci/models.py index ec08972..eacc380 100644 --- a/packages/valory/skills/market_creation_manager_abci/models.py +++ b/packages/valory/skills/market_creation_manager_abci/models.py @@ -19,7 +19,9 @@ """This module contains the shared state for the abci skill of MarketCreationManagerAbciApp.""" +from dataclasses import dataclass, field from typing import Any, Dict, List, Set, Type +from hexbytes import HexBytes from aea.skills.base import SkillContext @@ -35,6 +37,78 @@ from packages.valory.skills.market_creation_manager_abci.rounds import ( MarketCreationManagerAbciApp, ) +from packages.valory.contracts.multisend.contract import MultiSendOperation + + +@dataclass +class MultisendBatch: + """A structure representing a single transaction of a multisend.""" + + to: str + data: HexBytes + value: int = 0 + operation: MultiSendOperation = MultiSendOperation.CALL + + +@dataclass +class RedeemingProgress: + """A structure to keep track of the redeeming check progress.""" + + trades: Set[Trade] = field(default_factory=lambda: set()) + utilized_tools: Dict[str, int] = field(default_factory=lambda: {}) + policy: Optional[EGreedyPolicy] = None + claimable_amounts: Dict[HexBytes, int] = field(default_factory=lambda: {}) + earliest_block_number: int = 0 + event_filtering_batch_size: int = 0 + check_started: bool = False + check_from_block: BlockIdentifier = "earliest" + check_to_block: BlockIdentifier = "latest" + cleaned: bool = False + payouts: Dict[str, int] = field(default_factory=lambda: {}) + unredeemed_trades: Dict[str, int] = field(default_factory=lambda: {}) + claim_started: bool = False + claim_from_block: BlockIdentifier = "earliest" + claim_to_block: BlockIdentifier = "latest" + answered: list = field(default_factory=lambda: []) + claiming_condition_ids: List[str] = field(default_factory=lambda: []) + claimed_condition_ids: List[str] = field(default_factory=lambda: []) + + @property + def check_finished(self) -> bool: + """Whether the check has finished.""" + return self.check_started and self.check_from_block == self.check_to_block + + @property + def claim_finished(self) -> bool: + """Whether the claiming has finished.""" + return self.claim_started and self.claim_from_block == self.claim_to_block + + @property + def claim_params(self) -> Optional[ClaimParamsType]: + """The claim parameters, prepared for the `claimWinnings` call.""" + history_hashes = [] + addresses = [] + bonds = [] + answers = [] + try: + for i, answer in enumerate(reversed(self.answered)): + # history_hashes second-last-to-first, the hash of each history entry, calculated as described here: + # https://realitio.github.io/docs/html/contract_explanation.html#answer-history-entries. + if i == len(self.answered) - 1: + history_hashes.append(ZERO_BYTES) + else: + history_hashes.append(self.answered[i + 1]["args"]["history_hash"]) + + # last-to-first, the address of each answerer or commitment sender + addresses.append(answer["args"]["user"]) + # last-to-first, the bond supplied with each answer or commitment + bonds.append(answer["args"]["bond"]) + # last-to-first, each answer supplied, or commitment ID if the answer was supplied with commit->reveal + answers.append(answer["args"]["answer"]) + except KeyError: + return None + + return history_hashes, addresses, bonds, answers class SharedState(BaseSharedState): @@ -46,6 +120,7 @@ def __init__(self, *args: Any, skill_context: SkillContext, **kwargs: Any) -> No """Initialize the shared state object.""" self.questions_requested_mech: Dict[str, Any] = {} self.questions_responded: Set[str] = set() + self.redeeming_progress: RedeemingProgress = RedeemingProgress() super().__init__(*args, skill_context=skill_context, **kwargs) @@ -160,6 +235,20 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.openai_api_key = self._ensure("openai_api_key", kwargs, type_=str) self.initial_funds = self._ensure("initial_funds", kwargs, type_=float) self.xdai_threshold = self._ensure("xdai_threshold", kwargs, type_=int) + + self.use_subgraph_for_redeeming = self._ensure("use_subgraph_for_redeeming", kwargs, type_=bool) + self.contract_timeout = self._ensure("contract_timeout", kwargs, type_=float) + self.max_filtering_retries = self._ensure("max_filtering_retries", kwargs, type_=int) + self.reduce_factor = self._ensure("reduce_factor", kwargs, type_=float) + self.redeeming_batch_size = self._ensure("redeeming_batch_size", kwargs, type_=int) + self.dust_threshold = self._ensure("dust_threshold", kwargs, type_=int) + self.minimum_batch_size = self._ensure("minimum_batch_size", kwargs, type_=int) + + # TODO These variables are re-defined with names compatible with trader service + self.conditional_tokens_address = self.conditional_tokens_contract + self.realitio_address = self.realitio_contract + self.realitio_proxy_address = self.realitio_oracle_proxy_contract + super().__init__(*args, **kwargs) diff --git a/packages/valory/skills/market_creation_manager_abci/rounds.py b/packages/valory/skills/market_creation_manager_abci/rounds.py index d87f96f..235921a 100644 --- a/packages/valory/skills/market_creation_manager_abci/rounds.py +++ b/packages/valory/skills/market_creation_manager_abci/rounds.py @@ -274,7 +274,7 @@ def end_block(self) -> Optional[Tuple[BaseSynchronizedData, Enum]]: return synced_data, event -class RedeemBondRound(CollectSameUntilThresholdRound): +class RedeemRound(CollectSameUntilThresholdRound): """A round for redeeming Realitio""" ERROR_PAYLOAD = "ERROR_PAYLOAD" @@ -871,7 +871,7 @@ class FinishedWithDepositDaiRound(DegenerateRound): """FinishedMarketCreationManagerRound""" -class FinishedWithRedeemBondRound(DegenerateRound): +class FinishedWithRedeemRound(DegenerateRound): """FinishedMarketCreationManagerRound""" @@ -928,15 +928,15 @@ class MarketCreationManagerAbciApp(AbciApp[Event]): Event.ROUND_TIMEOUT: CollectRandomnessRound, }, SelectKeeperRound: { - Event.DONE: RedeemBondRound, + Event.DONE: RedeemRound, Event.NO_MAJORITY: CollectRandomnessRound, Event.ROUND_TIMEOUT: CollectRandomnessRound, }, - RedeemBondRound: { - Event.DONE: FinishedWithRedeemBondRound, + RedeemRound: { + Event.DONE: FinishedWithRedeemRound, Event.NO_TX: CollectProposedMarketsRound, - Event.NO_MAJORITY: RedeemBondRound, - Event.ERROR: RedeemBondRound, + Event.NO_MAJORITY: RedeemRound, + Event.ERROR: RedeemRound, }, CollectProposedMarketsRound: { Event.DONE: ApproveMarketsRound, @@ -1007,7 +1007,7 @@ class MarketCreationManagerAbciApp(AbciApp[Event]): FinishedWithRemoveFundingRound: {}, FinishedWithDepositDaiRound: {}, FinishedWithGetPendingQuestionsRound: {}, - FinishedWithRedeemBondRound: {}, + FinishedWithRedeemRound: {}, FinishedWithoutTxRound: {}, } final_states: Set[AppState] = { @@ -1017,7 +1017,7 @@ class MarketCreationManagerAbciApp(AbciApp[Event]): FinishedWithRemoveFundingRound, FinishedWithDepositDaiRound, FinishedWithGetPendingQuestionsRound, - FinishedWithRedeemBondRound, + FinishedWithRedeemRound, FinishedWithoutTxRound, } event_to_timeout: EventToTimeout = { @@ -1042,7 +1042,7 @@ class MarketCreationManagerAbciApp(AbciApp[Event]): FinishedWithDepositDaiRound: { get_name(SynchronizedData.most_voted_tx_hash), }, - FinishedWithRedeemBondRound: { + FinishedWithRedeemRound: { get_name(SynchronizedData.most_voted_tx_hash), }, FinishedMarketCreationManagerRound: { diff --git a/packages/valory/skills/market_creation_manager_abci/skill.yaml b/packages/valory/skills/market_creation_manager_abci/skill.yaml index b106e51..2477a38 100644 --- a/packages/valory/skills/market_creation_manager_abci/skill.yaml +++ b/packages/valory/skills/market_creation_manager_abci/skill.yaml @@ -217,6 +217,14 @@ models: light_slash_unit_amount: 5000000000000000 serious_slash_unit_amount: 8000000000000000 mech_contract_address: '0x77af31de935740567cf4ff1986d04b2c964a786a' + use_subgraph_for_redeeming: true + contract_timeout: 300.0 + max_filtering_retries: 6 + reduce_factor: 0.25 + conditional_tokens_address: '0xCeAfDD6bc0bEF976fdCd1112955828E00543c0Ce' + redeeming_batch_size: 1 + dust_threshold: 10000000000000 + minimum_batch_size: 500 class_name: MarketCreationManagerParams randomness_api: args: diff --git a/packages/valory/skills/market_maker_abci/composition.py b/packages/valory/skills/market_maker_abci/composition.py index 8867823..075a530 100644 --- a/packages/valory/skills/market_maker_abci/composition.py +++ b/packages/valory/skills/market_maker_abci/composition.py @@ -52,7 +52,7 @@ FinishedRegistrationRound: MarketCreationManagerAbci.GetPendingQuestionsRound, MarketCreationManagerAbci.FinishedWithoutTxRound: ResetAndPauseRound, MarketCreationManagerAbci.FinishedWithDepositDaiRound: TransactionSettlementAbci.RandomnessTransactionSubmissionRound, - MarketCreationManagerAbci.FinishedWithRedeemBondRound: TransactionSettlementAbci.RandomnessTransactionSubmissionRound, + MarketCreationManagerAbci.FinishedWithRedeemRound: TransactionSettlementAbci.RandomnessTransactionSubmissionRound, MarketCreationManagerAbci.FinishedMarketCreationManagerRound: TransactionSettlementAbci.RandomnessTransactionSubmissionRound, MarketCreationManagerAbci.FinishedWithRemoveFundingRound: TransactionSettlementAbci.RandomnessTransactionSubmissionRound, MarketCreationManagerAbci.FinishedWithGetPendingQuestionsRound: MechRequestStates.MechRequestRound, diff --git a/packages/valory/skills/market_maker_abci/skill.yaml b/packages/valory/skills/market_maker_abci/skill.yaml index 544aee4..32d7695 100644 --- a/packages/valory/skills/market_maker_abci/skill.yaml +++ b/packages/valory/skills/market_maker_abci/skill.yaml @@ -213,6 +213,14 @@ models: light_slash_unit_amount: 5000000000000000 serious_slash_unit_amount: 8000000000000000 mech_contract_address: '0x77af31de935740567cf4ff1986d04b2c964a786a' + use_subgraph_for_redeeming: true + contract_timeout: 300.0 + max_filtering_retries: 6 + reduce_factor: 0.25 + conditional_tokens_address: '0xCeAfDD6bc0bEF976fdCd1112955828E00543c0Ce' + redeeming_batch_size: 1 + dust_threshold: 10000000000000 + minimum_batch_size: 500 class_name: Params randomness_api: args: