Add post query cycle script execution hook #32
base: main
Conversation
@@ -44,6 +44,8 @@ type Stage struct {
	PostStageShellScripts []string `json:"post_stage_scripts,omitempty"`
	// Run shell scripts after executing each query.
	PostQueryShellScripts []string `json:"post_query_scripts,omitempty"`
	// Run shell scripts after the full query cycle (all runs) finishes for each query.
	PostQueryCycleShellScripts []string `json:"post_query_cycle_scripts,omitempty"`
@steveburnett for doc
A full cycle here means: when we set `cold_runs` and `warm_runs`, each query in the benchmark will be run `cold_runs + warm_runs` times in total. `post_query_scripts` will be called after each query execution; `post_query_cycle_scripts` will be called after all the `cold_runs` and `warm_runs` are done for a unique query.
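A minimal sketch of the hook semantics described above (the hook names come from the config; the `run_benchmark` driver is a stand-in, not pbench's actual implementation):

```python
# Stand-in driver illustrating when each hook fires:
# post_query_scripts after every single execution,
# post_query_cycle_scripts once per query after cold_runs + warm_runs.
def run_benchmark(queries, cold_runs, warm_runs, log):
    for q in queries:
        for _ in range(cold_runs + warm_runs):
            log.append(("run", q))
            log.append(("post_query_scripts", q))
        log.append(("post_query_cycle_scripts", q))

log = []
run_benchmark(["q1", "q2"], cold_runs=1, warm_runs=2, log=log)
```

So with `cold_runs=1` and `warm_runs=2`, `post_query_scripts` fires three times per query but `post_query_cycle_scripts` only once per query.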
sys.exit(-1)

file_path = sys.argv[1]
increment_file_value(file_path)
if we touched this file in this script, then I guess the unit test assert result should be updated?
conn = create_connection(hostname, username, password, catalogName)
cur = conn.cursor()
cur.execute(query)
rows = cur.fetchall()
What is the error behavior?
I think it could fail because of either a connection issue or a query-execution issue. Let me wrap it in a try-except.
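A hedged sketch of what that try-except wrapping could look like, separating connection errors from query-execution errors (`execute_with_diagnostics` and the injectable `connect_fn` are illustrative stand-ins, not names from the PR):

```python
# Wrap connection and execution separately so the failure mode is obvious
# from the error message.
def execute_with_diagnostics(connect_fn, query):
    try:
        conn = connect_fn()
    except Exception as exc:
        raise RuntimeError(f"connection failed: {exc}") from exc
    try:
        cur = conn.cursor()
        cur.execute(query)
        return cur.fetchall()
    except Exception as exc:
        raise RuntimeError(f"query failed: {query!r}: {exc}") from exc
```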
Force-pushed 4ea4c28 to d90f64e (Compare)
@ethanyzhang Thanks for your review. I have addressed your earlier comments. Also, I have added one more commit in this PR for adding one more hook to enable
import paramiko
import argparse

def create_connection(host_name, user_name, user_password, db_name):
@ethanyzhang how does pbench manage the utility functions like create_connection? It also appears in benchmarks/scripts/cache_cleaning_coordinator_post_query.py
@agrawalreetika I think it may be better to move `create_connection` to a different file and then include it in both the worker script and the coordinator script. Say this file is called `presto_utils.py`; then here you can do:
from presto_utils import create_connection
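A hedged sketch of what that shared `presto_utils.py` could look like, assuming the presto-python-client (`prestodb`) package; the port 8080, `schema="default"`, and the BasicAuthentication fallback are assumptions, not taken from the PR:

```python
# presto_utils.py -- shared connection helper for the worker and
# coordinator scripts.
def create_connection(host_name, user_name, user_password, db_name):
    import prestodb  # deferred so importing this module needs no driver installed
    # Assumption: password-based auth maps to BasicAuthentication; pass
    # auth=None when no password is given.
    auth = (prestodb.auth.BasicAuthentication(user_name, user_password)
            if user_password else None)
    return prestodb.dbapi.connect(
        host=host_name,
        port=8080,            # assumed default Presto coordinator port
        user=user_name,
        catalog=db_name,
        schema="default",     # assumed
        auth=auth,
    )
```

Both scripts would then do `from presto_utils import create_connection` instead of each carrying its own copy.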
@@ -0,0 +1,11 @@
import sys
This comment is for the two .py file names. Why did you add `my_` in front? I think it's better to remove it.
this folder is for testing and demoing.
What does "my" stand for?
@@ -0,0 +1,80 @@
import prestodb
The cache cleaning should be done before (not after) each query cycle (cold+warm runs). And it should only be applied to TPC Power Tests.
So in the current implementation, I added 2 hooks:
- pre_stage_scripts: run this script before this stage is started
- post_query_scripts: run this script after each query in this stage is complete

This will eventually trigger the clean-up script: once when the overall query execution starts, as part of pre_stage_scripts, and then, using post_query_scripts, the same clean-up script would be executed after each query cycle (cold+warm runs). So by using both hooks, I was running the clean-up script.
But now with this script name, I think it looks a little confusing. I will add one more hook named pre_query_scripts, call it before each query cycle (cold+warm runs) for each query, and rename these clean-up scripts from post* to pre*.
Hi @agrawalreetika I think the clean cache scripts should be called in every pre_query_cycle_scripts, not post_query_scripts and pre_stage_scripts. So we want this order (suppose cold_runs=1 and warm_runs=1):
clean cache, q1, q1, clean cache, q2, q2, clean cache, q3, q3, ...
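The requested pre-cycle ordering can be sketched as follows (the `schedule` helper is illustrative only; pbench's actual scheduler is not shown in this PR):

```python
# Build the execution order with the cache cleanup running BEFORE each
# query's cycle, as requested: clean, q1, q1, clean, q2, q2, ...
def schedule(queries, cold_runs, warm_runs):
    order = []
    for q in queries:
        order.append("clean cache")  # pre_query_cycle_scripts hook
        order.extend([q] * (cold_runs + warm_runs))
    return order
```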
Hi @agrawalreetika, I developed some questions about error propagation. See my comments and I am happy to discuss further.
def clean_directory_list_cache(hostname, username, password, catalogName):
    query = "CALL " + catalogName + ".system.invalidate_directory_list_cache()"
    conn = create_connection(hostname, username, password, catalogName)
Looking at your implementation here, it seems like this can just use `execute_presto_select_query`. Maybe rename `execute_presto_select_query` to `execute_presto_query`.
def clean_metastore_cache(hostname, username, password, catalogName):
    query = "CALL " + catalogName + ".system.invalidate_metastore_cache()"
    conn = create_connection(hostname, username, password, catalogName)
same here
stage/stage.go
Outdated
// run post query cycle shell scripts
postQueryCycleErr := s.runShellScripts(ctx, s.PostQueryCycleShellScripts)
if retErr == nil {
	retErr = postQueryCycleErr
So, if the query was executed successfully, but something went wrong when you ran the script to clean up the cache, what should we do? I guess we should not ignore the cache cleanup error, because it will give us false benchmark signals?
If so, the way for the PostQueryCycleShellScripts errors to propagate is to return a non-zero value from the script using sys.exit(1). Then this will cause postQueryCycleErr here to be set as an execution error.
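The script-side half of that propagation could look like this sketch (the `run_cleanup` wrapper and the stand-in cleanup callable are illustrative; the real scripts' helpers like `clean_metastore_cache` would be passed in):

```python
import sys

# Convert a cleanup helper's outcome into an exit status so pbench can
# see the failure: the helpers return rows whose first cell is True on
# success (per the scripts under review).
def run_cleanup(cleanup_fn, *args):
    try:
        rows = cleanup_fn(*args)
        if not (rows and rows[0][0] is True):
            print("cache cleanup reported failure:", rows)
            return 1
        return 0
    except Exception as exc:
        print("cache cleanup raised:", exc)
        return 1

# At the bottom of the actual script one would do something like:
#   sys.exit(run_cleanup(clean_metastore_cache, host, user, password, catalog))
# so a failed cleanup surfaces as postQueryCycleErr in stage.go.
```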
@ethanyzhang @agrawalreetika As we haven't tried it in real runs yet, this may or may not happen. But assume it happens because of some non-recoverable errors, the users won't be able to run any benchmark at all. Maybe we can consider issuing a warning but still continue the queries?
if rows[0][0] == True:
    print("Directory list cache clean up is successfull for", catalogName)
else:
    print("Directory list cache clean up is failed for", catalogName)
See my comments below at stage.go for adding error propagation here.
if rows[0][0] == True:
    print("Metastore cache clean up is successfull for", catalogName)
else:
    print("Metastore cache clean up is failed for", catalogName)
Same for error propagation
cleanup_worker_disk_cache(worker_public_ips, native_cache_directory_worker, "centos", args.sshkey)

if is_worker_os_cache_cleanup_enabled:
    cleanup_worker_os_cache(worker_public_ips, "centos", args.sshkey)
Similar question about propagating errors that could happen here back to pbench. The benchmark should fail if we actually couldn't effectively clean up the cache.
Force-pushed 958fee3 to 6517d0b (Compare)
@ethanyzhang @yingsu00 Thanks for your review. I have made the changes based on the review comments. Please check.
def cleanup_worker_disk_cache(worker_public_ips, directory_to_cleanup, login_user, ssh_key_path):
    for worker_ip in worker_public_ips:
        try:
            ssh = paramiko.SSHClient()
It might be better to extract the SSH connection to a utility function
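A hedged sketch of that extraction, assuming the paramiko package and key-based login as in the script under review (`open_ssh` is a suggested name, not one from the PR):

```python
# Shared SSH helper so each cleanup function stops repeating the
# SSHClient setup boilerplate.
def open_ssh(host, login_user, ssh_key_path):
    import paramiko  # deferred so this module loads without paramiko installed
    ssh = paramiko.SSHClient()
    # Assumption: worker hosts are not in known_hosts, so auto-add them.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username=login_user, key_filename=ssh_key_path)
    return ssh
```

Each cleanup function then becomes roughly `ssh = open_ssh(worker_ip, login_user, ssh_key_path)` followed by its `exec_command` calls and `ssh.close()`.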
# Directory list cache clean up
if is_list_cache_cleanup_enabled:
    for catalogName in catalog_list:
        print("Cleaning up directory list cache for", catalogName)
Should there be a space after "for"? Same for other occurrences
for catalogName in catalog_list:
    print("Cleaning up directory list cache for", catalogName)
    rows = clean_directory_list_cache(args.host, args.username, args.password, catalogName)
    print("directory_list_cache_cleanup_query Query Result:", rows)
space after "Result:"?
is_worker_os_cache_cleanup_enabled = True

if is_worker_disk_cache_cleanup_enabled:
    native_cache_directory_worker = "/home/centos/presto/async_data_cache"
Would this work on Ubuntu?
Currently pbench is being run on Presto clusters, which run on CentOS, where this is tested.
@@ -0,0 +1,114 @@
from mysql_utils import create_connection
Where is this being called?
In https://github.com/prestodb/pbench/pull/32/files#diff-178b21505ceae58580985f1d1c6c27f05c66411c197c31a5efd7ca0acc94dd32R74 to get the worker list from mysql
Force-pushed 339f67e to 4c9dc6f (Compare)
is_metadata_cache_cleanup_enabled = False

# Directory list cache clean up
if is_list_cache_cleanup_enabled:
@agrawalreetika Is it possible to attach the output of these? What would the output look like if output rows are multiple lines?
def clean_directory_list_cache(hostname, username, password, catalogName):
    query = "CALL " + catalogName + ".system.invalidate_directory_list_cache()"
    conn = create_connection(hostname, username, password, catalogName)
Shall we extract line 7-11 to a util function?
def clean_metastore_cache(hostname, username, password, catalogName):
    query = "CALL " + catalogName + ".system.invalidate_metastore_cache()"
    conn = create_connection(hostname, username, password, catalogName)
This is the same as the previous
db_name=database
)

clusterName = args.clustername
Do we use camelCase notation or snake_case notation? We need to unify them
args = parser.parse_args()

with open(args.mysql, 'r') as file:
    mysqlDetails = json.load(file)
camelCase?
Force-pushed 4c9dc6f to a857c16 (Compare)
This PR includes 2 changes -