Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore swap agent #110

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 91 additions & 56 deletions submodules/moragents_dockers/agents/src/agents/token_swap/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,45 @@

class TokenSwapAgent:
def __init__(self, config, llm, embeddings):
self.config = config
self.config = Config.get_instance()
self.llm = llm
self.embeddings = embeddings
self.tools_provided = tools.get_tools()
self.context = []

async def set_inch_api_key(self, request):
try:
if isinstance(request, dict):
data = request
else:
data = await request.json()

if "inch_api_key" not in data:
logger.warning("Missing required API credentials")
return {"error": "Missing 1inch API key"}, 400

self.config.inch_api_key = data["inch_api_key"]
logger.info("1inch API key saved successfully")

return {"success": "1inch API key saved successfully"}, 200

except Exception as e:
logger.error(f"Error setting 1inch API key: {e}")
return {"error": str(e)}, 500

def api_request_url(self, method_name, query_params, chain_id):
base_url = Config.APIBASEURL + str(chain_id)
return f"{base_url}{method_name}?{'&'.join([f'{key}={value}' for key, value in query_params.items()])}"
"""Build 1inch API request URL with parameters"""
base_url = f"{self.config.APIBASEURL}{chain_id}"
param_string = "&".join([f"{k}={v}" for k, v in query_params.items()])
return f"{base_url}{method_name}?{param_string}"

def check_allowance(self, token_address, wallet_address, chain_id):
url = self.api_request_url(
"/approve/allowance",
{"tokenAddress": token_address, "walletAddress": wallet_address},
chain_id,
)
response = requests.get(url, headers=Config.HEADERS)
response = requests.get(url, headers=tools.get_headers())
data = response.json()
return data

Expand All @@ -39,66 +61,78 @@ def approve_transaction(self, token_address, chain_id, amount=None):
else {"tokenAddress": token_address}
)
url = self.api_request_url("/approve/transaction", query_params, chain_id)
response = requests.get(url, headers=Config.HEADERS)
response = requests.get(url, headers=tools.get_headers())
transaction = response.json()
return transaction

def build_tx_for_swap(self, swap_params, chain_id):
"""Build swap transaction using stored API key"""
url = self.api_request_url("/swap", swap_params, chain_id)
swap_transaction = requests.get(url, headers=Config.HEADERS).json()
return swap_transaction
logger.debug(f"Swap request URL: {url}")

response = requests.get(url, headers=tools.get_headers())
if response.status_code != 200:
logger.error(f"1inch API error: {response.text}")
raise ValueError(f"1inch API error: {response.text}")

raw_swap_transaction = response.json()
return raw_swap_transaction

def get_response(self, message, chain_id, wallet_address):
system_prompt = (
"Don't make assumptions about the value of the arguments for the function "
"they should always be supplied by the user and do not alter the value of the arguments. "
"Don't make assumptions about what values to plug into functions. Ask for clarification if a user "
"request is ambiguous. you only need the value of token1 we dont need the value of token2. After "
"starting from scratch do not assume the name of token1 or token2"
)
if not self.config.inch_api_key:
return "Please set your 1inch API key in Settings before making swap requests.", "assistant", None

system_message = """You are a helpful assistant that processes token swap requests.
When a user wants to swap tokens, analyze their request and provide a SINGLE tool call with complete information.

Always return a single 'swap_agent' tool call with all three required parameters:
- token1: the source token
- token2: the destination token
- value: the amount to swap

If any information is missing from the user's request, do not make a tool call. Instead, respond asking for the missing information."""

messages = [
{"role": "system", "content": system_prompt},
{"role": "system", "content": system_message}
]
messages.extend(message)

logger.info("Sending request to LLM with %d messages", len(messages))


llm_with_tools = self.llm.bind_tools(self.tools_provided)

try:
result = llm_with_tools.invoke(messages)
logger.info("Received response from LLM: %s", result)

if result.tool_calls:
tool_call = result.tool_calls[0]
logger.info("Selected tool: %s", tool_call)
func_name = tool_call.get("name")
args = tool_call.get("args")
logger.info("LLM suggested using tool: %s", func_name)

if func_name == "swap_agent":
tok1 = args["token1"]
tok2 = args["token2"]
value = args["value"]
try:
res, role = tools.swap_coins(
tok1, tok2, float(value), chain_id, wallet_address
)
except (
tools.InsufficientFundsError,
tools.TokenNotFoundError,
tools.SwapNotPossibleError,
) as e:
self.context = []
return str(e), "assistant", None
return res, role, None
else:
logger.info("LLM provided a direct response without using tools")
return result.content, "assistant", "crypto swap agent"

if not result.tool_calls:
self.context = [] # Clear context when asking for info
return "Please specify the tokens you want to swap and the amount.", "assistant", None

# Get the most complete tool call
tool_call = max(result.tool_calls,
key=lambda x: sum(1 for v in x.get("args", {}).values() if v))
args = tool_call.get("args", {})
tok1 = args.get("token1")
tok2 = args.get("token2")
value = args.get("value")

if not all([tok1, tok2, value]):
self.context = [] # Clear context when asking for missing params
return "Please specify both tokens and the amount you want to swap.", "assistant", None

try:
logger.info(f"Processing swap: {tok1} -> {tok2}, amount: {value}")
res, role = tools.swap_coins(tok1, tok2, value, chain_id, wallet_address)
if "error" in res:
return res["error"], "assistant", None
return res, role, None
except (tools.InsufficientFundsError, tools.TokenNotFoundError, tools.SwapNotPossibleError) as e:
self.context = []
return str(e), "assistant", None

except Exception as e:
logger.error(f"Error in get_response: {str(e)}")
return f"An error occurred: {str(e)}", "assistant", None
logger.error("Error in get_response: %s", str(e))
self.context = []
return f"Sorry, there was an error processing your request: {str(e)}", "assistant", None

def get_status(self, flag, tx_hash, tx_type):
response = ""
Expand Down Expand Up @@ -136,21 +170,22 @@ def generate_response(self, prompt, chain_id, wallet_address):
)
return response, role, next_turn_agent

def chat(self, request: ChatRequest):
data = request.dict()
def chat(self, chat_request: ChatRequest):
try:
if "prompt" in data:
prompt = data["prompt"]
wallet_address = data["wallet_address"]
chain_id = data["chain_id"]
if "prompt" in chat_request.dict():
prompt = chat_request.prompt.dict()
chain_id = chat_request.chain_id
wallet_address = chat_request.wallet_address

response, role, next_turn_agent = self.generate_response(
prompt, chain_id, wallet_address
)
return {
"role": role,
"content": response,
"next_turn_agent": next_turn_agent,
"next_turn_agent": next_turn_agent
}

else:
return {"error": "Missing required parameters"}, 400
except Exception as e:
Expand Down Expand Up @@ -189,7 +224,7 @@ def approve(self, request_data):
chain_id = request_data["chain_id"]
amount = request_data["amount"]
res = self.approve_transaction(token, chain_id, amount)
return {"response": res}
return res
else:
return {"error": "Missing required parameters"}, 400
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ class Config:
INCH_URL = "https://api.1inch.dev/token"
QUOTE_URL = "https://api.1inch.dev/swap"
APIBASEURL = f"https://api.1inch.dev/swap/v6.0/"
HEADERS = {
"Authorization": "Bearer WvQuxaMYpPvDiiOL5RHWUm7OzOd20nt4",
"accept": "application/json",
}
WEB3RPCURL = {
"56": "https://bsc-dataseed.binance.org",
"42161": "https://arb1.arbitrum.io/rpc",
Expand Down Expand Up @@ -52,3 +48,20 @@ class Config:
},
]
INCH_NATIVE_TOKEN_ADDRESS = "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE"

_instance = None
_inch_api_key = None

@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance

@property
def inch_api_key(self):
return self._inch_api_key

@inch_api_key.setter
def inch_api_key(self, value):
self._inch_api_key = value
77 changes: 67 additions & 10 deletions submodules/moragents_dockers/agents/src/agents/token_swap/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from web3 import Web3

from src.agents.token_swap.config import Config
logger = logging.getLogger(__name__)



class InsufficientFundsError(Exception):
Expand All @@ -18,16 +20,39 @@ class SwapNotPossibleError(Exception):
pass


def search_tokens(query, chain_id, limit=1, ignore_listed="false"):
def get_headers(api_key: str | None = None) -> dict[str, str]:
"""Get headers for 1inch API requests with optional API key override"""
config = Config.get_instance()
headers = {
"Authorization": f"Bearer {api_key or config.inch_api_key or ''}",
"accept": "application/json",
}
return headers


def search_tokens(
query: str,
chain_id: int,
limit: int = 1,
ignore_listed: str = "false",
inch_api_key: str | None = None
) -> dict | None:
logger.info(f"Searching tokens - Query: {query}, Chain ID: {chain_id}")
endpoint = f"/v1.2/{chain_id}/search"
params = {"query": query, "limit": limit, "ignore_listed": ignore_listed}

response = requests.get(
Config.INCH_URL + endpoint, params=params, headers=Config.HEADERS
Config.INCH_URL + endpoint,
params=params,
headers=get_headers(inch_api_key)
)
logger.info(f"Search tokens response status: {response.status_code}")
if response.status_code == 200:
return response.json()
result = response.json()
logger.info(f"Found tokens: {result}")
return result
else:
logging.error(f"Failed to search tokens. Status code: {response.status_code}")
logger.error(f"Failed to search tokens. Status code: {response.status_code}, Response: {response.text}")
return None


Expand Down Expand Up @@ -99,16 +124,24 @@ def validate_swap(web3: Web3, token1, token2, chain_id, amount, wallet_address):
return t1[0]["address"], t1[0]["symbol"], t2[0]["address"], t2[0]["symbol"]


def get_quote(token1, token2, amount_in_wei, chain_id):
def get_quote(token1, token2, amount_in_wei, chain_id, inch_api_key=None):
logger.info(f"Getting quote - Token1: {token1}, Token2: {token2}, Amount: {amount_in_wei}, Chain ID: {chain_id}")
endpoint = f"/v6.0/{chain_id}/quote"
params = {"src": token1, "dst": token2, "amount": int(amount_in_wei)}
logger.debug(f"Quote request - URL: {Config.QUOTE_URL + endpoint}, Params: {params}")

response = requests.get(
Config.QUOTE_URL + endpoint, params=params, headers=Config.HEADERS
Config.QUOTE_URL + endpoint,
params=params,
headers=get_headers(inch_api_key)
)
logger.info(f"Quote response status: {response.status_code}")
if response.status_code == 200:
return response.json()
result = response.json()
logger.info(f"Quote received: {result}")
return result
else:
logging.error(f"Failed to get quote. Status code: {response.status_code}")
logger.error(f"Failed to get quote. Status code: {response.status_code}, Response: {response.text}")
return None


Expand All @@ -134,8 +167,32 @@ def convert_to_readable_unit(
return smallest_unit_amount / (10**decimals)


def swap_coins(token1, token2, amount, chain_id, wallet_address):
"""Swap two crypto coins with each other"""
def swap_coins(token1: str, token2: str, amount: str | float, chain_id: int, wallet_address: str) -> tuple[str, str]:
"""Swap two tokens"""
logger.info(f"Attempting swap: {token1} -> {token2}, amount: {amount}")

# Validate amount first
if not amount or (isinstance(amount, str) and not amount.strip()):
return {
"error": "Please specify the amount you want to swap"
}, "assistant"

try:
amount = float(amount)
except ValueError:
return {
"error": f"Invalid amount format: {amount}. Please provide a valid number."
}, "assistant"

if amount <= 0:
return {
"error": "Amount must be greater than 0"
}, "assistant"

# Normalize token symbols
token1 = token1.strip().upper()
token2 = token2.strip().upper()

web3 = Web3(Web3.HTTPProvider(Config.WEB3RPCURL[str(chain_id)]))
t1_a, t1_id, t2_a, t2_id = validate_swap(
web3, token1, token2, chain_id, amount, wallet_address
Expand Down
Loading
Loading