[WIP] Automate Comparison of Checksums for Forest and Lotus Snapshots #2963

Closed
30 changes: 19 additions & 11 deletions scripts/benchmark_db/bench.rb
@@ -25,7 +25,7 @@
   %r{/(?<height>\d+)_*}
 ].freeze
 
-HEIGHTS_TO_VALIDATE = 40
+TIPSETS_TO_VALIDATE = 40
 
 MINUTE = 60
 HOUR = MINUTE * MINUTE
@@ -34,18 +34,19 @@
 
 # Define default options and parse command line options.
 options = {
-  heights: HEIGHTS_TO_VALIDATE,
+  tipsets: TIPSETS_TO_VALIDATE,
   pattern: 'baseline',
   chain: 'mainnet'
 }
 OptionParser.new do |opts|
   opts.banner = 'Usage: bench.rb [options] snapshot'
   opts.on('--dry-run', 'Only print the commands that will be run') { |v| options[:dry_run] = v }
-  opts.on('--heights [Integer]', Integer, 'Number of heights to validate') { |v| options[:heights] = v }
+  opts.on('--tipsets [Integer]', Integer, 'Number of tipsets to validate') { |v| options[:tipsets] = v }
   opts.on('--pattern [String]', 'Run benchmarks that match the pattern') { |v| options[:pattern] = v }
   opts.on('--chain [String]', 'Choose network chain [default: mainnet]') { |v| options[:chain] = v }
   opts.on('--tempdir [String]', 'Specify a custom directory for running benchmarks') { |v| options[:tempdir] = v }
   opts.on('--daily', 'Run snapshot import and validation time metrics') { |v| options[:daily] = v }
+  opts.on('--checksum', 'Run snapshot export checksum comparison') { |v| options[:checksum] = v }
 end.parse!
 
 # Create random temporary directory (or user-specified dir) for benchmarks,
@@ -230,23 +231,23 @@ def download_snapshot(output_dir: WORKING_DIR, chain: 'calibnet', url: nil)
 # run metrics, and assign metrics for each benchmark.
 def benchmarks_loop(benchmarks, options, bench_metrics)
   benchmarks.each do |bench|
-    bench.dry_run, bench.snapshot_path, bench.heights, bench.chain = bench_loop_assignments(options)
-    bench.run(options[:daily], @snapshot_downloaded)
+    bench.dry_run, bench.snapshot_path, bench.tipsets, bench.chain = bench_loop_assignments(options)
+    bench.run(options, @snapshot_downloaded)
 
     bench_metrics[bench.name] = bench.metrics
 
     puts "\n"
-  rescue StandardError, Interrupt
-    @logger.error('Fiasco during benchmark run. Exiting...')
-    # Delete snapshot if downloaded, but not if user-provided.
-    FileUtils.rm_f(@snapshot_path) if @snapshot_downloaded
-    exit(1)
+  # rescue StandardError, Interrupt
+  #   @logger.error('Fiasco during benchmark run. Exiting...')
+  #   # Delete snapshot if downloaded, but not if user-provided.
+  #   FileUtils.rm_f(@snapshot_path) if @snapshot_downloaded
+  #   exit(1)
   end
 end
 
 # Helper function to create assignments for the `benchmarks_loop` function.
 def bench_loop_assignments(options)
-  [options[:dry_run], options[:snapshot_path], options[:heights], options[:chain]]
+  [options[:dry_run], options[:snapshot_path], options[:tipsets], options[:chain]]
 end
 
 # Run benchmarks and write to `CSV` if daily, or to a markdown file if `DB` benchmarks.
@@ -261,6 +262,8 @@ def run_benchmarks(benchmarks, options)
   end
   if options[:daily]
     write_csv(bench_metrics)
+  elsif options[:checksum]
+    # TODO
   else
     write_markdown(bench_metrics)
   end
@@ -313,6 +316,11 @@ def run_benchmarks(benchmarks, options)
     LotusBenchmark.new(name: 'lotus')
   ]
   run_benchmarks(selection, options)
+elsif options[:checksum]
+  # Run Forest client and export snapshot.
+  run_benchmarks([ForestBenchmark.new(name: 'forest')], options)
+  # Run Lotus and export snapshot at same height as Forest snapshot.
+  run_benchmarks([LotusBenchmark.new(name: 'lotus')], options)
 else
   # Benchmarks for database metrics.
   selection = Set[]
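The `# TODO` left in `run_benchmarks` above is the eventual comparison step. A minimal sketch of what it could grow into, assuming each benchmark records the path of its exported snapshot under a hypothetical `:snapshot_path` key in `bench_metrics` (neither that key nor the helper below is part of this PR):

    require 'digest'

    # Hash each client's exported snapshot and report whether the digests agree.
    # SHA-256 is an arbitrary choice here; any stable digest would do.
    def compare_checksums(bench_metrics)
      digests = bench_metrics.transform_values do |metrics|
        Digest::SHA256.file(metrics[:snapshot_path]).hexdigest
      end
      digests.each { |name, digest| puts "#{name}: #{digest}" }
      puts digests.values.uniq.size == 1 ? 'Checksums match' : 'Checksum mismatch'
    end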
94 changes: 64 additions & 30 deletions scripts/benchmark_db/benchmark_base.rb
@@ -34,12 +34,30 @@ def measure_online_validation(benchmark, pid, metrics)
     end
   end
 
+  def export_snapshot(benchmark, pid, metrics)
+    Thread.new do
+      loop do
+        status = benchmark.start_export_command
+        unless status.nil?
+          @logger.info 'Exporting snapshot'
+          syscall(*@export_command)
+          break
+        end
+        sleep 5
+      end
+
+      @logger.info 'Stopping process...'
+      benchmark.stop_command(pid)
+    end
+  end
+
   # Calls online validation function and runs monitor to measure memory usage.
-  def proc_monitor(pid, benchmark)
+  def proc_monitor(options, pid, benchmark)
     metrics = Concurrent::Hash.new
     metrics[:rss] = []
     metrics[:vsz] = []
-    measure_online_validation(benchmark, pid, metrics) if benchmark
+    measure_online_validation(benchmark, pid, metrics) if benchmark && !options[:checksum]
+    export_snapshot(benchmark, pid, metrics) if options[:checksum]
     handle = Thread.new do
       loop do
         sample_proc(pid, metrics)
@@ -59,12 +77,12 @@ def proc_monitor(pid, benchmark)
 
   # Helper function for measuring execution time; passes process ID to online
   # validation and process monitor.
-  def exec_command_aux(command, metrics, benchmark)
+  def exec_command_aux(options, command, metrics, benchmark)
     Open3.popen2(*command) do |i, o, t|
       pid = t.pid
       i.close
 
-      handle, proc_metrics = proc_monitor(pid, benchmark)
+      handle, proc_metrics = proc_monitor(options, pid, benchmark)
       o.each_line do |l|
         print l
       end
@@ -75,13 +93,13 @@ def exec_command_aux(command, metrics, benchmark)
   end
 
   # Measures execution time of command.
-  def exec_command(command, benchmark = nil)
+  def exec_command(command, benchmark = nil, options = {})
     @logger.info "$ #{command.join(' ')}"
     return {} if @dry_run
 
     metrics = Concurrent::Hash.new
     t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
-    exec_command_aux(command, metrics, benchmark)
+    exec_command_aux(options, command, metrics, benchmark)
     t1 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
     metrics[:elapsed] = trunc_seconds(t1 - t0)
     metrics
@@ -104,7 +122,7 @@ def build_config_file
   # path, and start epoch for building and running client.
   def build_substitution_hash
     height = snapshot_height(@snapshot_path)
-    start = height - @heights
+    start = height - @tipsets
 
     return { c: '<tbd>', s: '<tbd>', h: start } if @dry_run
 
@@ -148,37 +166,37 @@ def build_artefacts
 # Mixin module for base benchmark class run and validation commands.
 module RunCommands
   # Create and call proper validation command, then write results to metrics.
-  def run_validation_step(daily, args, metrics)
-    unless daily
+  def run_validation_step(options, args, metrics)
+    unless options[:daily] || options[:checksum]
       validate_command = splice_args(@validate_command, args)
       metrics[:validate] = exec_command(validate_command)
       return
     end
 
     validate_online_command = splice_args(@validate_online_command, args)
-    new_metrics = exec_command(validate_online_command, self)
+    new_metrics = exec_command(validate_online_command, self, options)
     new_metrics[:tpm] =
       new_metrics[:num_epochs] ? new_metrics[:num_epochs] / online_validation_secs : 'n/a'
-    new_metrics[:tpm] = new_metrics[:tpm].ceil(3)
-    metrics[:validate_online] = new_metrics
+    new_metrics[:tpm] = new_metrics[:tpm].ceil(3) unless new_metrics[:tpm] == 'n/a'
+    metrics[:validate_online] = new_metrics
   end
 
   # Import snapshot, write metrics, and call validation function, returning metrics.
-  def import_and_validation(daily, args, metrics)
+  def import_and_validation(options, args, metrics)
     import_command = splice_args(@import_command, args)
     metrics[:import] = exec_command(import_command)
 
     # Save db size just after import.
-    metrics[:import][:db_size] = db_size unless @dry_run
+    metrics[:import][:db_size] = db_size unless @dry_run || options[:checksum]
 
-    run_validation_step(daily, args, metrics)
+    run_validation_step(options, args, metrics)
     metrics
-  rescue StandardError, Interrupt
-    @logger.error('Fiasco during benchmark run. Deleting downloaded files, cleaning DB and stopping process...')
-    FileUtils.rm_f(@snapshot_path) if @snapshot_downloaded
-    clean_db
-    FileUtils.rm_rf(repository_name) if @created_repository
-    exit(1)
+  # rescue StandardError, Interrupt
+  #   @logger.error('Fiasco during benchmark run. Deleting downloaded files, cleaning DB and stopping process...')
+  #   FileUtils.rm_f(@snapshot_path) if @snapshot_downloaded
+  #   clean_db
+  #   FileUtils.rm_rf(repository_name) if @created_repository
+  #   exit(1)
   end
 
   def forest_init(args)
@@ -188,23 +206,31 @@
 
   # This is the primary function called in `bench.rb` to run the metrics for
   # each benchmark.
-  def run(daily, snapshot_downloaded)
+  def run(options, snapshot_downloaded)
+    begin
     @snapshot_downloaded = snapshot_downloaded
     @logger.info "Running bench: #{@name}"
 
     metrics = Concurrent::Hash.new
     args = build_artefacts
     @sync_status_command = splice_args(@sync_status_command, args)
+    @export_command = splice_args(@export_command, args)
 
     forest_init(args) if @name == 'forest'
 
-    @metrics = import_and_validation(daily, args, metrics)
-  rescue StandardError, Interrupt
-    @logger.error('Fiasco during benchmark run. Deleting downloaded files and stopping process...')
-    FileUtils.rm_f(@snapshot_path) if @snapshot_downloaded
-    FileUtils.rm_rf(repository_name) if @created_repository
-    exit(1)
+    if options[:checksum]
+      # Re-using this function to run the import and to ensure we reach the
+      # end of message sync stage. We can discard the metrics when we're
+      # just checking the checksum.
+      import_and_validation(options, args, metrics)
+    else
+      metrics = Concurrent::Hash.new
+      @metrics = import_and_validation(options, args, metrics)
+    end
+    # rescue StandardError, Interrupt
+    #   @logger.error('Fiasco during benchmark run. Deleting downloaded files and stopping process...')
+    #   FileUtils.rm_f(@snapshot_path) if @snapshot_downloaded
+    #   FileUtils.rm_rf(repository_name) if @created_repository
+    #   exit(1)
     end
 
     @logger.info 'Cleaning database'
@@ -231,6 +257,14 @@ def start_online_validation_command
     end
   end
 
+  # Check whether message sync is finished.
+  def start_export_command
+    puts 'Checking status'
+    output = syscall(*@sync_status_command)
+    status = output.match(/complete/m)
+    status
+  end
+
   # Raise an error if repository name is not defined in each class instance.
   def repository_name
     raise 'repository_name method should be implemented'
@@ -256,7 +290,7 @@ class BenchmarkBase
   include BuildCommands
   include RunCommands
   attr_reader :name, :metrics
-  attr_accessor :dry_run, :snapshot_path, :heights, :chain
+  attr_accessor :dry_run, :snapshot_path, :tipsets, :chain
 
   def initialize(name:, config: {})
     @name = name
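The new `export_snapshot` thread polls `start_export_command` every five seconds and triggers the export only once the node reports a complete message sync. The same poll-then-export pattern in isolation (a standalone sketch; the command arrays are placeholders rather than the exact ones the benchmark classes build via `splice_args`):

    require 'open3'

    # Poll a status command until its output reports completion, then export.
    def poll_then_export(status_command, export_command, interval: 5)
      loop do
        stdout, _status = Open3.capture2(*status_command)
        break if stdout.match?(/complete/m)

        sleep interval
      end
      system(*export_command)
    end

    # Placeholder invocation for illustration only.
    poll_then_export(['forest-cli', 'sync', 'status'],
                     ['forest-cli', 'snapshot', 'export'])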
1 change: 1 addition & 0 deletions scripts/benchmark_db/forest_bench.rb
@@ -88,6 +88,7 @@ def initialize(name:, config: {})
     @sync_status_command = [
       target_cli, '--config', '%<c>s', 'sync', 'status'
     ]
+    @export_command = [target_cli, 'snapshot', 'export']
     @metrics = Concurrent::Hash.new
   end
 end
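The `'%<c>s'` token in `@sync_status_command` is a named `format`-style placeholder; the substitution hash built in `build_substitution_hash` (note its `{ c: ..., s: ..., h: ... }` keys) suggests `splice_args` fills these tokens roughly as in the sketch below (the config path is illustrative):

    command = ['forest-cli', '--config', '%<c>s', 'sync', 'status']
    args = { c: '/tmp/forest/config.toml' } # hypothetical config path

    spliced = command.map { |token| format(token, args) }
    # => ["forest-cli", "--config", "/tmp/forest/config.toml", "sync", "status"]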