Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Formatting prints according to CERT-7899 #61

Merged
merged 21 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Quorum/apis/block_explorers/source_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _parse_source_code(self) -> None:
self._parsed_contract = contract_ast[contract_name]['ast']

except Exception as e:
pp.pretty_print(f"Error parsing source code for {self.file_name}: {e}\n"
pp.pprint(f"Error parsing source code for {self.file_name}: {e}\n"
f"Some of the checks will not apply to this contract!!!",
pp.Colors.FAILURE)
finally:
Expand Down
6 changes: 3 additions & 3 deletions Quorum/apis/git_api/git_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ def _load_repos_from_file(self, gt_config: dict[str, any]) -> tuple[dict[str, st
def __clone_or_update_for_repo(repo_name: str, repo_url: str, to_path: Path):
repo_path = to_path / repo_name
if repo_path.exists():
pp.pretty_print(f"Repository {repo_name} already exists at {repo_path}. Updating repo and submodules.", pp.Colors.INFO)
pp.pprint(f"Repository {repo_name} already exists at {repo_path}. Updating repo and submodules.", pp.Colors.INFO)
repo = Repo(repo_path)
repo.git.pull()
repo.git.submodule('update', '--init', '--recursive')
else:
pp.pretty_print(f"Cloning {repo_name} from URL: {repo_url} to {repo_path}...", pp.Colors.INFO)
pp.pprint(f"Cloning {repo_name} from URL: {repo_url} to {repo_path}...", pp.Colors.INFO)
Repo.clone_from(repo_url, repo_path, multi_options=["--recurse-submodules"])


Expand All @@ -68,7 +68,7 @@ def clone_or_update(self) -> None:
If the repository already exists locally, it will update the repository and its submodules.
Otherwise, it will clone the repository and initialize submodules.
"""

pp.pprint('Cloning and updating preliminaries', pp.Colors.INFO)
for repo_name, repo_url in self.repos.items():
GitManager.__clone_or_update_for_repo(repo_name, repo_url, self.modules_path)

Expand Down
44 changes: 26 additions & 18 deletions Quorum/checks/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,29 @@ def __print_diffs_results(self, missing_files: list[SourceCode], files_with_diff
missing_files (list[SourceCode]): A list of SourceCode objects representing missing files.
files_with_diffs (list[Compared]): A list of Compared objects representing files with differences.
"""
total_number_of_files = len(self.source_codes)
number_of_missing_files = len(missing_files)
number_of_files_with_diffs = len(files_with_diffs)

msg = f"Compared {total_number_of_files - number_of_missing_files}/{total_number_of_files} files for proposal {self.proposal_address}"
if number_of_missing_files == 0:
pp.pretty_print(msg, pp.Colors.SUCCESS)
else:
pp.pretty_print(msg, pp.Colors.WARNING)
for source_code in missing_files:
pp.pretty_print(f"Missing file: {source_code.file_name} in local repo", pp.Colors.WARNING)

if number_of_files_with_diffs == 0:
pp.pretty_print("No differences found.", pp.Colors.SUCCESS)
else:
pp.pretty_print(f"Found differences in {number_of_files_with_diffs} files", pp.Colors.FAILURE)
for compared_pair in files_with_diffs:
pp.pretty_print(f"Local: {compared_pair.local_file}\nProposal: {compared_pair.proposal_file}\nDiff: {compared_pair.diff}", pp.Colors.FAILURE)
num_total_files = len(self.source_codes)
num_missing_files = len(missing_files)
num_diff_files = len(files_with_diffs)
num_identical = num_total_files - num_missing_files - num_diff_files

# Identical files message.
pp.pprint(f'Files found identical: {num_identical}/{num_total_files}', pp.Colors.SUCCESS)

# Diffs files message.
if num_diff_files > 0:
diffs_msg = ('Proposal files found to deviate from their source of truth counterpart: '
f'{num_diff_files}/{num_total_files}\n')
for i, compared_pair in enumerate(files_with_diffs, 1):
diffs_msg += (f'\t{i}. Proposal file: {compared_pair.proposal_file}\n'
f'\t Source of truth file: {compared_pair.local_file}\n'
f'\t Diff can be found here: {compared_pair.diff}\n')
pp.pprint(diffs_msg, pp.Colors.FAILURE)

# Missing files message.
if num_missing_files > 0:
missing_msg = ('Proposal files missing from source of truth: '
f'{num_missing_files}/{num_total_files}\n')
for i, source_code in enumerate(missing_files, 1):
missing_msg += f'\t{i}. File: {source_code.file_name}\n'
pp.pprint(missing_msg, pp.Colors.WARNING)

21 changes: 12 additions & 9 deletions Quorum/checks/global_variables.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import re
from pathlib import Path


from Quorum.checks.check import Check
from Quorum.apis.block_explorers.source_code import SourceCode
import Quorum.utils.pretty_printer as pp
Expand Down Expand Up @@ -75,11 +73,16 @@ def __process_results(self, source_code_to_violated_variables: dict[str, list[di
to lists of violated variables.
"""
if not source_code_to_violated_variables:
pp.pretty_print("All global variables are constant or immutable.", pp.Colors.SUCCESS)
else:
pp.pretty_print("Global variable checks failed:", pp.Colors.FAILURE)
for file_name, violated_variables in source_code_to_violated_variables.items():
pp.pretty_print(f"File {file_name} contains variables that are not constant or immutable"
,pp.Colors.FAILURE)
self._write_to_file(Path(file_name).stem.removesuffix(".sol"), violated_variables)
pp.pprint('All global variables are constant or immutable.', pp.Colors.SUCCESS)
return

msg = ("Some global variables aren't constant or immutable. A storage collision may occur!\n"
f'The following variables found to be storage variables: ')
i = 1
for file_name, violated_variables in source_code_to_violated_variables.items():
for var in violated_variables:
msg += f"\t{i}. File {file_name}: {var['name']}"
i += 1
self._write_to_file(Path(file_name).stem.removesuffix('.sol'), violated_variables)
pp.pprint(msg, pp.Colors.FAILURE)

32 changes: 17 additions & 15 deletions Quorum/checks/new_listing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@


class NewListingCheck(Check):

def new_listing_check(self) -> None:
"""
Checks if the proposal address is a new listing on the blockchain.
Expand All @@ -14,13 +13,14 @@ def new_listing_check(self) -> None:
no new listings were found.
"""
functions = self._get_functions_from_source_codes()
if functions.get("newListings", functions.get("newListingsCustom")):
pp.pretty_print(f"New listings detected for {self.proposal_address}", pp.Colors.WARNING)
if functions.get('newListings', functions.get('newListingsCustom')):
pp.pprint(f"New listings detected for {self.proposal_address}", pp.Colors.WARNING)

# Check if Anthropic API key is configured
if not config.ANTHROPIC_API_KEY:
pp.pretty_print(
"First deposit check is skipped. If you have a LLM API key, you can add it to your environment variables to enable this check",
pp.pprint(
'New listings were detected in payload but first deposit check is skipped.\n'
'If you have a LLM API key, you can add it to your environment variables to enable this check',
pp.Colors.WARNING
)
return
Expand All @@ -29,20 +29,22 @@ def new_listing_check(self) -> None:
proposal_code_str = '\n'.join(proposal_code)
listings: ListingArray | None = FirstDepositChain().execute(proposal_code_str)
if listings is None:
pp.pretty_print(f"Failed to retrieve new listings for {self.proposal_address}", pp.Colors.FAILURE)
pp.pprint('New listings were detected in payload but LLM failed to retrieve them.',
pp.Colors.FAILURE)
return
for listing in listings.listings:

msg = f'{len(listings.listings)} new asset listings were detected:\n'
for i, listing in enumerate(listings.listings, 1):
if listing.approve_indicator and listing.supply_indicator:
pp.pretty_print(
f"New listing detected for {listing}", pp.Colors.SUCCESS
)
msg += f'\t{i}. New listing detected for {listing.asset_symbol}\n'
else:
pp.pretty_print(f"New listing detected for {listing.asset_symbol} but no approval or supply detected", pp.Colors.FAILURE)
self._write_to_file("new_listings.json", listings.model_dump())

msg += f'\t{i}. New listing detected for {listing.asset_symbol} but no approval or supply detected\n'
pp.pprint(msg, pp.Colors.INFO)
self._write_to_file('new_listings.json', listings.model_dump())

else:
pp.pretty_print(f"No new listings detected for {self.proposal_address}", pp.Colors.INFO)
pp.pprint(f'No new listings detected for {self.proposal_address}', pp.Colors.INFO)

def _get_functions_from_source_codes(self) -> dict:
"""
Retrieves functions from the source codes.
Expand Down
150 changes: 92 additions & 58 deletions Quorum/checks/price_feed.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,19 @@
from pathlib import Path
import re
from dataclasses import dataclass

from Quorum.apis.price_feeds import PriceFeedProviderBase
from Quorum.apis.price_feeds import PriceFeedProviderBase, CoinGeckoAPI
from Quorum.utils.chain_enum import Chain
from Quorum.checks.check import Check
from Quorum.apis.block_explorers.source_code import SourceCode
import Quorum.utils.pretty_printer as pp


def remove_solidity_comments(source_code: str) -> str:
"""
Removes single-line and multi-line comments from Solidity source code.

Args:
source_code (str): The Solidity source code as a single string.

Returns:
str: The source code with comments removed.
"""
# Regex pattern to match single-line comments (//...)
single_line_comment_pattern = r'//.*?$'

# Regex pattern to match multi-line comments (/*...*/)
multi_line_comment_pattern = r'/\*.*?\*/'

# First, remove multi-line comments
source_code = re.sub(multi_line_comment_pattern, '', source_code, flags=re.DOTALL)

# Then, remove single-line comments
source_code = re.sub(single_line_comment_pattern, '', source_code, flags=re.MULTILINE)

return source_code


class PriceFeedCheck(Check):
"""
The PriceFeedCheck class is responsible for verifying the price feed addresses in the source code
against official Chainlink or Chronical data.
"""

def __init__(
self,
customer: str,
Expand All @@ -62,50 +37,45 @@ def __init__(
self.address_pattern = r'0x[a-fA-F0-9]{40}'
self.providers = providers

def __check_price_feed_address(self, address: str, file_name: str) -> dict | None:
@dataclass
class PriceFeedResult:
yoav-el-certora marked this conversation as resolved.
Show resolved Hide resolved
'''
This dataclass helps organize the results of the check for printing them to the user.
'''
address: str
found_on: str
price_feed: dict
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the original pydantic class instead of the dict, as it's a more structured approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def __hash__(self):
return hash(self.address)

def __check_price_feed_address(self, address: str) -> PriceFeedResult | None:
"""
Check if the given address is present in the price feed providers.

Args:
address (str): The address to be checked.
file_name (str): The name of the source code file where the address was found.

Returns:
dict | None: The price feed data if the address is found, otherwise None.
PriceFeedResult | None: The price feed data if the address is found, otherwise None.
"""
for provider in self.providers:
if (price_feed := provider.get_price_feed(self.chain, address)):

color = pp.Colors.SUCCESS
message = f"Found {address} on {provider.get_name()}\n"
message += str(price_feed)
if price_feed.proxy_address and price_feed.proxy_address.lower() != address.lower():
message += f"Proxy address: {price_feed.proxy_address}\n"
if address.lower() != price_feed.address.lower():
color = pp.Colors.FAILURE
message += f"This is an implementation contract with a proxy address\n"
message += f"Origin Address: {price_feed.address}\n"

pp.pretty_print(
message,
color
)
return price_feed.model_dump()

pp.pretty_print(
f"Address {address} not found in any address validation provider: {[p.get_name() for p in self.providers]}",
pp.Colors.INFO
)
return PriceFeedCheck.PriceFeedResult(address, provider.get_name(), price_feed.model_dump())
return None

def verify_price_feed(self) -> None:
"""
Verifies the price feed addresses in the source code against official Chainlink or Chronical data.
Verifies the price feed addresses in the source code against official Chainlink or Chronicle data.

This method iterates through each source code file to find and verify the address variables
against the official Chainlink and Chronical price feeds. It categorizes the addresses into
against the official Chainlink and Chronicle price feeds. It categorizes the addresses into
verified and violated based on whether they are found in the official source.
"""
all_addresses = set()
overall_verified_vars: set[PriceFeedCheck.PriceFeedResult] = set()
overall_unverified_vars: set[str] = set()

# Iterate through each source code file to find and verify address variables
for source_code in self.source_codes:
verified_sources_path = f"{Path(source_code.file_name).stem.removesuffix('.sol')}/verified_sources.json"
Expand All @@ -115,14 +85,78 @@ def verify_price_feed(self) -> None:
contract_text = '\n'.join(source_code.file_content)

# Remove comments from the source code
clean_text = remove_solidity_comments(contract_text)
clean_text = PriceFeedCheck.remove_solidity_comments(contract_text)
yoav-el-certora marked this conversation as resolved.
Show resolved Hide resolved

# Extract unique addresses using regex
addresses = set(re.findall(self.address_pattern, clean_text))

all_addresses.update(addresses)

for address in addresses:
if feed := self.__check_price_feed_address(address, source_code.file_name):
verified_variables.append(feed)

if res := self.__check_price_feed_address(address):
verified_variables.append(res.price_feed)
overall_verified_vars.add(res)
else:
overall_unverified_vars.add(address)

if verified_variables:
self._write_to_file(verified_sources_path, verified_variables)

num_addresses = len(all_addresses)
pp.pprint(f'{num_addresses} addresses identified in the payload.', pp.Colors.INFO)

coingecko_name = CoinGeckoAPI().get_name()
token_validation_res = {r for r in overall_verified_vars if r.found_on == coingecko_name}
price_feed_validation_res = overall_verified_vars - token_validation_res

# Print price feed validation
pp.pprint('Price feed validation', pp.Colors.INFO)
msg = (f'{len(price_feed_validation_res)}/{num_addresses} '
'were identified as price feeds of the configured providers:\n')
for i, var_res in enumerate(price_feed_validation_res, 1):
msg += (f'\t{i}. {var_res.address} found on {var_res.found_on}\n'
f"\t Name: {var_res.price_feed['name']}\n"
f"\t Decimals: {var_res.price_feed['decimals']}\n")
pp.pprint(msg, pp.Colors.SUCCESS)

# Print token validation
pp.pprint('Token validation', pp.Colors.INFO)
msg = (f'{len(token_validation_res)}/{num_addresses} '
'were identified as tokens of the configured providers:\n')
for i, var_res in enumerate(token_validation_res, 1):
msg += (f'\t{i}. {var_res.address} found on {var_res.found_on}\n'
f"\t Name: {var_res.price_feed['name']}\n"
f"\t Symbol: {var_res.price_feed['pair']}\n"
f"\t Decimals: {var_res.price_feed['decimals']}\n")
pp.pprint(msg, pp.Colors.SUCCESS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide to go with that insanity eventually, let's do it properly by splitting the logic for the entry point to the check, in order to achieve a general solution that will support additional providers without importing them and checking for their name every time. @yoav-el-certora Feel free to decide whether you want this change as part of this PR or to open a different ticket which I can take later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that I commented/asked about this in your previous PR when splitting providers into price feeds and tokens.
Let's create a subtask for this.
Great point

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I actually wanted to do this, but didn't want to take it too far in this ticket so it wouldn't get prolonged. I aim to do it as another ticket later @yoav-el-certora.


# Print not found
msg = (f'{len(overall_unverified_vars)}/{num_addresses} '
'explicit addresses were not identified using any provider:\n')
for i, address in enumerate(overall_unverified_vars, 1):
msg += f'\t{i}. {address}\n'
pp.pprint(msg, pp.Colors.FAILURE)

@staticmethod
def remove_solidity_comments(source_code: str) -> str:
"""
Removes single-line and multi-line comments from Solidity source code.

Args:
source_code (str): The Solidity source code as a single string.

Returns:
str: The source code with comments removed.
"""
# Regex pattern to match single-line comments (//...)
single_line_comment_pattern = r'//.*?$'

# Regex pattern to match multi-line comments (/*...*/)
multi_line_comment_pattern = r'/\*.*?\*/'

# First, remove multi-line comments
source_code = re.sub(multi_line_comment_pattern, '', source_code, flags=re.DOTALL)

# Then, remove single-line comments
source_code = re.sub(single_line_comment_pattern, '', source_code, flags=re.MULTILINE)

return source_code
5 changes: 2 additions & 3 deletions Quorum/checks/review_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,5 @@ def __init__(self, customer: str, chain: Chain, proposal_address: str, source_co
self.target_repo = self.customer_folder / "review_module"

def find_diffs(self) -> list[SourceCode]:
pp.pretty_print(f'Verifying missing files against {self.customer} review repo '
f'(cloned under {self.target_repo})', pp.Colors.INFO)
return super().find_diffs()
pp.pprint(f'Review repo cloned under: {self.target_repo}', pp.Colors.INFO)
return super().find_diffs()
Loading
Loading