-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Script to purge all unprocessed messages #623
Conversation
WalkthroughThe pull request introduces a new script Changes
Sequence DiagramsequenceDiagram
participant User
participant CLI as Purge Messages Script
participant APIKeys
participant Queue
User->>CLI: Provide private key
CLI->>APIKeys: Initialize with private key
APIKeys-->>CLI: Consumer address
CLI->>Queue: Get unprocessed transaction count
Queue-->>CLI: Return count
CLI->>User: Prompt confirmation (show count)
User->>CLI: Confirm (y/n)
alt Confirmed
loop While unprocessed messages exist
CLI->>Queue: Pop message
Queue-->>CLI: Remove message
CLI->>User: Print messages popped
end
else Cancelled
CLI->>User: Exit without changes
end
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (2)
scripts/purge_messages.py (2)
33-34
: Enhance CLI documentation and logging.The script would benefit from proper CLI documentation, version information, and logging configuration.
Apply this diff to improve the CLI:
+import logging + +app = typer.Typer( + help="Utility script to purge unprocessed messages from the queue.", + name="purge_messages", +) + +@app.command() +def purge( + private_key: str = typer.Option( + ..., + help="Private key of the agent's wallet", + prompt=True, + hide_input=True, + ), + verbose: bool = typer.Option( + False, + "--verbose", + "-v", + help="Enable verbose logging", + ), +) -> None: + """ + Purge all unprocessed messages for a specific agent. + + Requires the agent's wallet private key for authentication. + """ + logging.basicConfig( + level=logging.DEBUG if verbose else logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + ) + main(SecretStr(private_key)) + if __name__ == "__main__": - typer.run(main) + app()
1-34
: Security Advisory: Handle with care!This script handles sensitive data (private keys) and performs destructive operations. Please consider:
- Adding rate limiting to prevent abuse
- Implementing audit logging for tracking purge operations
- Adding a dry-run mode for safety
- Documenting security implications in the README
Would you like me to help implement these security enhancements?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
scripts/purge_messages.py
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: test-build-image
- GitHub Check: pytest-docker
- GitHub Check: pytest
- GitHub Check: mypy
scripts/purge_messages.py
Outdated
import typer | ||
from prediction_market_agent_tooling.gtypes import PrivateKey | ||
from pydantic import SecretStr | ||
|
||
from prediction_market_agent.db.agent_communication import ( | ||
fetch_count_unprocessed_transactions, | ||
pop_message, | ||
) | ||
from prediction_market_agent.utils import APIKeys | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add missing imports and remove unused references.
The BET_FROM_PRIVATE_KEY
is referenced in the code but not imported. Also, consider adding type hints import for better type safety.
Apply this diff to fix the imports:
import typer
+from typing import Optional
from prediction_market_agent_tooling.gtypes import PrivateKey
from pydantic import SecretStr
from prediction_market_agent.db.agent_communication import (
fetch_count_unprocessed_transactions,
pop_message,
)
-from prediction_market_agent.utils import APIKeys
+from prediction_market_agent.utils import APIKeys, BET_FROM_PRIVATE_KEY
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
import typer | |
from prediction_market_agent_tooling.gtypes import PrivateKey | |
from pydantic import SecretStr | |
from prediction_market_agent.db.agent_communication import ( | |
fetch_count_unprocessed_transactions, | |
pop_message, | |
) | |
from prediction_market_agent.utils import APIKeys | |
import typer | |
from typing import Optional | |
from prediction_market_agent_tooling.gtypes import PrivateKey | |
from pydantic import SecretStr | |
from prediction_market_agent.db.agent_communication import ( | |
fetch_count_unprocessed_transactions, | |
pop_message, | |
) | |
from prediction_market_agent.utils import APIKeys, BET_FROM_PRIVATE_KEY |
scripts/purge_messages.py
Outdated
def main(private_key: SecretStr) -> None: | ||
keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key)) | ||
n_messages = fetch_count_unprocessed_transactions( | ||
consumer_address=keys.bet_from_address | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add input validation and error handling.
The function lacks input validation and error handling for invalid private keys. Also missing is a docstring explaining the function's purpose and parameters.
Apply this diff to improve the function:
def main(private_key: SecretStr) -> None:
+ """Purge all unprocessed messages for a specific agent.
+
+ Args:
+ private_key (SecretStr): The private key of the agent's wallet.
+
+ Raises:
+ ValueError: If the private key format is invalid.
+ ConnectionError: If unable to connect to the message queue.
+ """
+ if not private_key.get_secret_value():
+ raise ValueError("Private key cannot be empty")
+
+ try:
keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key))
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
+ except Exception as e:
+ raise ValueError(f"Invalid private key or connection error: {str(e)}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def main(private_key: SecretStr) -> None: | |
keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key)) | |
n_messages = fetch_count_unprocessed_transactions( | |
consumer_address=keys.bet_from_address | |
) | |
def main(private_key: SecretStr) -> None: | |
"""Purge all unprocessed messages for a specific agent. | |
Args: | |
private_key (SecretStr): The private key of the agent's wallet. | |
Raises: | |
ValueError: If the private key format is invalid. | |
ConnectionError: If unable to connect to the message queue. | |
""" | |
if not private_key.get_secret_value(): | |
raise ValueError("Private key cannot be empty") | |
try: | |
keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key)) | |
n_messages = fetch_count_unprocessed_transactions( | |
consumer_address=keys.bet_from_address | |
) | |
except Exception as e: | |
raise ValueError(f"Invalid private key or connection error: {str(e)}") |
if ( | ||
input( | ||
f"Are you sure you want to purge all {n_messages} messages for agent {keys.bet_from_address}? (y/n): " | ||
) | ||
!= "y" | ||
): | ||
return | ||
|
||
popped = 0 | ||
while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address): | ||
pop_message(api_keys=keys) | ||
popped += 1 | ||
print(f"Popped {popped} messages.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve robustness of the purging logic.
The current implementation has several potential issues:
- No timeout mechanism for the while loop
- No error handling during message purging
- No batch processing for efficiency
- No progress indication for large message counts
Apply this diff to improve the implementation:
+ import time
+ from tqdm import tqdm
+
if (
input(
f"Are you sure you want to purge all {n_messages} messages for agent {keys.bet_from_address}? (y/n): "
)
!= "y"
):
return
popped = 0
- while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address):
- pop_message(api_keys=keys)
- popped += 1
- print(f"Popped {popped} messages.")
+ start_time = time.time()
+ timeout = 300 # 5 minutes timeout
+
+ with tqdm(total=n_messages, desc="Purging messages") as pbar:
+ while (
+ fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address)
+ and time.time() - start_time < timeout
+ ):
+ try:
+ pop_message(api_keys=keys)
+ popped += 1
+ pbar.update(1)
+ except Exception as e:
+ print(f"\nError while purging message {popped + 1}: {str(e)}")
+ if input("\nContinue purging? (y/n): ") != "y":
+ break
+
+ if popped < n_messages:
+ print(f"\nPurge incomplete. {n_messages - popped} messages remaining.")
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
scripts/purge_messages.py (1)
32-33
: Enhance CLI interface with help text and version info.Add proper CLI documentation and version information for better usability.
if __name__ == "__main__": - typer.run(main) + app = typer.Typer( + help="Purge all unprocessed messages for a specific agent.", + add_completion=False, + ) + + @app.command() + def purge( + private_key: str = typer.Option( + ..., + help="Private key of the agent's wallet", + prompt=True, + hide_input=True, + ), + ) -> None: + """Purge all unprocessed messages for the specified agent.""" + main(SecretStr(private_key)) + + app(prog_name="purge_messages")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
scripts/purge_messages.py
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: test-build-image
- GitHub Check: pytest-docker
- GitHub Check: pytest
- GitHub Check: mypy
🔇 Additional comments (3)
scripts/purge_messages.py (3)
1-9
: Add missing imports and module docstring.The imports section needs the following improvements:
- Add SecretStr for secure private key handling
- Add type hints import
- Add module docstring explaining the script's purpose
+""" +Script to purge all unprocessed messages for a specific agent. + +This script provides a CLI interface to safely remove queued messages +associated with a specific consumer address using their private key. +""" import typer +from typing import Optional from prediction_market_agent_tooling.gtypes import private_key_type +from pydantic import SecretStr from prediction_market_agent.db.agent_communication import ( fetch_count_unprocessed_transactions, pop_message, ) from prediction_market_agent.utils import APIKeys
25-29
: Improve robustness of the purging logic.The current implementation lacks timeout mechanism, error handling, and progress indication.
popped = 0 - while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address): - pop_message(api_keys=keys) - popped += 1 - print(f"Popped {popped} messages.") + import time + from tqdm import tqdm + + start_time = time.time() + timeout = 300 # 5 minutes timeout + + with tqdm(total=n_messages, desc="Purging messages") as pbar: + while ( + fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address) + and time.time() - start_time < timeout + ): + try: + pop_message(api_keys=keys) + popped += 1 + pbar.update(1) + except Exception as e: + print(f"\nError while purging message {popped + 1}: {str(e)}") + if input("\nContinue purging? (y/n): ") != "y": + break + + if time.time() - start_time >= timeout: + print("\nOperation timed out.") + + if popped < n_messages: + remaining = n_messages - popped + print(f"\nPurge incomplete. {remaining} messages remaining.")
11-12
:⚠️ Potential issueImprove private key handling and type safety.
The private key should be handled more securely using SecretStr, and the function needs proper type annotations.
-def main(private_key: str) -> None: - keys = APIKeys(BET_FROM_PRIVATE_KEY=private_key_type(private_key)) +def main(private_key: SecretStr) -> None: + """Purge all unprocessed messages for a specific agent. + + Args: + private_key (SecretStr): The private key of the agent's wallet. + + Raises: + ValueError: If the private key format is invalid. + ConnectionError: If unable to connect to the message queue. + """ + if not private_key.get_secret_value(): + raise ValueError("Private key cannot be empty") + + try: + keys = APIKeys(BET_FROM_PRIVATE_KEY=private_key_type(private_key.get_secret_value())) + except Exception as e: + raise ValueError(f"Invalid private key format: {str(e)}")Likely invalid or redundant comment.
n_messages = fetch_count_unprocessed_transactions( | ||
consumer_address=keys.bet_from_address | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for database operations.
The database operations should be wrapped in try-except blocks to handle potential connection issues.
+ try:
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
+ except Exception as e:
+ raise ConnectionError(f"Failed to fetch unprocessed messages: {str(e)}")
+
+ if n_messages == 0:
+ print("No messages to purge.")
+ return
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
n_messages = fetch_count_unprocessed_transactions( | |
consumer_address=keys.bet_from_address | |
) | |
try: | |
n_messages = fetch_count_unprocessed_transactions( | |
consumer_address=keys.bet_from_address | |
) | |
except Exception as e: | |
raise ConnectionError(f"Failed to fetch unprocessed messages: {str(e)}") | |
if n_messages == 0: | |
print("No messages to purge.") | |
return |
import typer | ||
from prediction_market_agent_tooling.gtypes import private_key_type | ||
|
||
from prediction_market_agent.db.agent_communication import ( | ||
fetch_count_unprocessed_transactions, | ||
pop_message, | ||
) | ||
from prediction_market_agent.utils import APIKeys | ||
|
||
|
||
def main(private_key: str) -> None: | ||
keys = APIKeys(BET_FROM_PRIVATE_KEY=private_key_type(private_key)) | ||
n_messages = fetch_count_unprocessed_transactions( | ||
consumer_address=keys.bet_from_address | ||
) | ||
|
||
if ( | ||
input( | ||
f"Are you sure you want to purge all {n_messages} messages for agent {keys.bet_from_address}? (y/n): " | ||
) | ||
!= "y" | ||
): | ||
return | ||
|
||
popped = 0 | ||
while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address): | ||
pop_message(api_keys=keys) | ||
popped += 1 | ||
print(f"Popped {popped} messages.") | ||
|
||
|
||
if __name__ == "__main__": | ||
typer.run(main) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Add security warning and restrict file permissions
The script requires security improvements:
- Change file permissions to 600 (
chmod 600 scripts/purge_messages.py
) to restrict access since it handles private keys - Add a warning message at the start of the script:
def main(private_key: str) -> None:
"""
WARNING: This script performs irreversible deletion of messages.
Handle with care as it uses private keys and permanently removes data.
"""
🔗 Analysis chain
Add security considerations and file permissions.
- Add a warning about the destructive nature of this script
- Ensure the script has appropriate file permissions
Run this script to check and fix file permissions:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check and fix file permissions for the script
# Check current permissions
current_perms=$(stat -c "%a" scripts/purge_messages.py)
echo "Current permissions: $current_perms"
# Ensure the script is not world-readable/writable due to sensitive data handling
if [[ $current_perms =~ ^[0-7]{3}$ && $current_perms != "600" ]]; then
echo "Warning: Script should have restricted permissions (600) due to handling sensitive data"
fi
Length of output: 229
Script:
#!/bin/bash
# Check file permissions using ls and search for warning messages
echo "File permissions:"
ls -l scripts/purge_messages.py
echo -e "\nChecking for existing warning messages:"
rg -i "warning|caution|danger" scripts/purge_messages.py
Length of output: 306
while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address): | ||
pop_message(api_keys=keys) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have expected that fetch_count_unprocessed_transactions
is called only once, yielding N unprocessed transactions. Afterwards, a for loop is created with exactly N, and those are then deleted.
Any reason for continuously sending RPC calls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case it goes out of sync for some reason. But it's just script to be run "once in a decade", does that really matter?
No description provided.