Batch Aggregation #785
Conversation
This all looks good. I have left some feedback and comments throughout the code. One thing: make sure os.path.join is used for any file paths to ensure compatibility across operating systems (I know we know the server OS we are running this on, but the tests might be run on a Windows machine by someone downloading the package).

Let me know if you need more clarification on the testing questions and I can dig into the specifics a bit more. When I am struggling with tests, I also check that I have not slipped into "over-testing" the code. I want to make sure the main goal of the code is tested, but not necessarily the specific implementation used to get that result (i.e. if I refactor the code in the future to do the same thing in a different way, the test should still pass).
```python
for reducer in reducer_list:
    # This is an override. The workflow_reducer_config method returns a config object
    # that is incompatible with the batch_utils batch_reduce method
```
If it helps, the defaults are all taken from this dictionary: https://github.com/zooniverse/aggregation-for-caesar/blob/master/panoptes_aggregation/workflow_config.py#L4-L41. It can be used as a basis for batch_standard_reducers when we want to extend beyond the initial tests.
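For illustration, such a mapping might take a shape like this (a sketch only; the keys and reducer names here are hypothetical, not the PR's actual values):

```python
# Hypothetical sketch: map a task/extractor type to the reducers
# that should run on its extracts.
batch_standard_reducers = {
    'question_extractor': ['question_reducer', 'question_consensus_reducer'],
    'shortcut_extractor': ['shortcut_reducer'],
}
```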
I used those as a starting point. The goal was to have multiple reducers run in certain cases (like question & question consensus). The batch_standard_reducers dict can be moved wherever makes sense later.

The override is there because the output of workflow_config.workflow_reducer_config currently causes an error when passed to batch_utils.parse_reducer_config. Possibly a bug, or just a discrepancy between the function and YAML outputs. Since batch_utils is now used to process the CSVs, making changes to batch_utils felt out of scope for this PR.
```python
for task_type, extract_df in extracted_data.items():
    extract_df.to_csv(f'{ba.output_path}/{ba.workflow_id}_{task_type}.csv')
```
Minor point: it might be worth using os.path.join when creating file paths to keep these functions as OS-independent as possible.
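For example, the path built in the snippet above could be assembled like this (a sketch of the same path using os.path.join):

```python
import os

# Builds the same path as f'{ba.output_path}/{ba.workflow_id}_{task_type}.csv',
# but lets the OS supply the correct separator.
csv_path = os.path.join(ba.output_path, f'{ba.workflow_id}_{task_type}.csv')
extract_df.to_csv(csv_path)
```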
It is done!
```python
# that is incompatible with the batch_utils batch_reduce method
reducer_config = {'reducer_config': {reducer: {}}}
reduced_data[reducer] = batch_utils.batch_reduce(extract_df, reducer_config)
filename = f'{ba.output_path}/{ba.workflow_id}_reductions.csv'
```
If multiple reducers are in reducer_list, will this keep overwriting the same file?
I'm using mode='a' in the to_csv call to append to the file (if it exists). The ask here was for a single, concatenated reductions file.
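In code, the append looks something like this (a sketch; note that, per the discussion just below, pandas writes a fresh header row for each appended frame here):

```python
# Append this reducer's reductions to the shared per-workflow file;
# each appended frame currently brings its own header row.
reduced_data[reducer].to_csv(filename, mode='a')
```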
What happens when reducers have different column names? (I don't know pandas' default behaviour there.) Does it do an outer join?
It just concats at the end right now, so the file will have a new header row and then the reduction rows for each reducer. However, reductions all come out with the columns subject_id, workflow_id, task, reducer, data and should just glue together; no differing columns are expected. This is the current behavior (as opposed to one file per reducer).
Oh, I see: the batch reducer does not flatten out the data column the same way reduce_panoptes_csv does. This should not be an issue here.
```python
def upload_files(self):
    self.connect_blob_storage()
    reductions_file = f'{self.output_path}/{self.workflow_id}_reductions.csv'
```
See the comment above about multiple reducers/reducer files.
```python
r = http.request('GET', url, preload_content=False)
with open(filepath, 'wb') as out:
    while True:
        data = r.read(65536)
```
Out of curiosity, what does 65536 refer to in this case?
This is the chunk size (64 KiB, in this case) that the data is read in at. Exports can be big and I'm avoiding reading the whole file into memory at once.
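Put together, the download loop reads roughly like this (a sketch assuming urllib3, which the preload_content=False flag in the snippet suggests; url and filepath come from the surrounding code):

```python
import urllib3

http = urllib3.PoolManager()
# Stream the (potentially large) export to disk without loading
# the whole response into memory at once.
r = http.request('GET', url, preload_content=False)
with open(filepath, 'wb') as out:
    while True:
        data = r.read(65536)  # 64 KiB chunks
        if not data:  # an empty read means the body is exhausted
            break
        out.write(data)
r.release_conn()  # return the connection to the pool
```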
```python
self.assertEqual(mock_reducer.call_count, 2)

# The reducer's call list includes subsequent calls to to_csv, but the args are
# methods called on the mock rather than the set values, i.e.
# "<MagicMock name='BatchAggregator().output_path' id='140281634764400'>"
```
You might be able to put a mock directly on the to_csv function from pandas to see if that lets you intercept the calls. From https://stackoverflow.com/a/65593458/1052418:

You could mock out the entire DataFrame class using mock.patch("pandas.DataFrame", ...), or more specifically mock.patch("pandas.DataFrame.to_csv", ...).
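I.e. something along these lines (a sketch; the test name and the call under test are placeholders):

```python
from unittest.mock import patch

@patch('pandas.DataFrame.to_csv')
def test_extracts_are_written(self, mock_to_csv):  # hypothetical test name
    # ...run the aggregation code under test here...
    # Every DataFrame.to_csv call is intercepted by the mock:
    mock_to_csv.assert_called()
```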
Also, if mocking too far up is making some tests difficult to write, you can split this into multiple tests that place the mocks at different levels to check different things.
Mostly this was confusion about why the to_csv was in the same call stack but as a different mock. As with the other issue, it seems it was just a different mock of the same thing. With all the other spec changes, I just removed this block; I'm satisfied if you are.
```python
# Why do these mocked methods called in __init__ not get counted as called?
# They are def getting called as the attributes are set
# mock_uuidgen.assert_called_once()
```
Isn't it enough to test that ba.id is not None to ensure the method has run? (I.e. test for the desired result rather than the specific way of getting that result.)
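For example, a result-oriented check might look like this (a sketch; the constructor arguments are hypothetical and the import path is taken from the patch target used elsewhere in this thread):

```python
import unittest

from panoptes_aggregation.batch_aggregation import BatchAggregator

class BatchAggregatorInitTest(unittest.TestCase):
    def test_init_assigns_id(self):
        ba = BatchAggregator(1, 10, 100)  # hypothetical arguments
        # Assert the desired result (an id was generated) rather than
        # asserting that uuid.uuid4 was called in a particular way.
        self.assertIsNotNone(ba.id)
```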
It probably is, yeah. That'll work for the UUID and I'll just remove the client check, but I must be mocking something wrong, since I can see ba.id despite the assert failing.
```python
# Why do these mocked methods called in __init__ not get counted as called?
# They are def getting called as the attributes are set
# mock_uuidgen.assert_called_once()
# mock_client.assert_called_once()
```
Again, you can just check mock_client.assert_called_once() to confirm that the method ran; no need for a check on the specific method.
Same with this. The way I had it, mock_client.assert_called_once() fails with AssertionError: Expected 'connect' to have been called once. Called 0 times. I mocked that, but still expected the method to have been called so that assert would pass. ¯\_(ツ)_/¯
```python
# ])

# How do I test the specific instance of BatchAggregator rather than the mocked class?
# mock_aggregator.upload_files.assert_called_once()
```
You can write a test that, rather than mocking the entire class, just mocks the method you wish to test:

```python
@patch("panoptes_aggregation.batch_aggregation.BatchAggregator.upload_files")
@patch(...)
```
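Fleshed out a little (a sketch; it assumes the test lives in the existing TestCase, and the run_aggregation arguments are placeholders):

```python
from unittest.mock import patch

from panoptes_aggregation import batch_aggregation

# Inside the existing TestCase: only upload_files is mocked,
# so the rest of BatchAggregator runs for real.
@patch('panoptes_aggregation.batch_aggregation.BatchAggregator.upload_files')
def test_upload_files_called(self, mock_upload):
    batch_aggregation.run_aggregation(1, 10, 100)  # hypothetical arguments
    mock_upload.assert_called_once()
```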
I'm already mocking the whole BatchAggregator, but I think my issue is that this mocks a class method, when what I really want to do is mock a method on the specific instance of BatchAggregator that the run_aggregation method instantiates. Mocking BatchAggregator.upload_files as above also doesn't work. I put one PDB breakpoint in the method itself and one in the spec:
```
> /usr/src/aggregation/panoptes_aggregation/batch_aggregation.py(55)run_aggregation()
-> ba.update_panoptes()
(Pdb) ba
<MagicMock name='BatchAggregator()' id='139923834173280'>
(Pdb) ba.upload_files
<MagicMock name='BatchAggregator().upload_files' id='139923833924432'>
(Pdb) ba.upload_files()
<MagicMock name='BatchAggregator().upload_files()' id='139923833994928'>
(Pdb) continue
>> PDB continue (IO-capturing resumed)
>> PDB set_trace (IO-capturing turned off)
> /usr/src/aggregation/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py(45)test_run_aggregation()
-> mock_uploader.assert_called_once()
(Pdb) mock_uploader
<MagicMock name='upload_files' id='139923834625088'>
(Pdb) mock_uploader()
<MagicMock name='upload_files()' id='139923834173328'>
```
Maybe because the thing I'm mocking isn't actually exactly the thing I'm calling?
That is likely it. I will have to put some more thought into the tests for this. Most of the code is tested, which is good enough for me. I am happy to loop back to these at a later time rather than have it block this feature.
Figured it out! You can use mocked_class_instance = mocked_class.return_value to get access to an "instance" of the class, then set mocks on its instance methods. Updated a few tests with this syntax.
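For anyone following along, the resulting pattern looks roughly like this (a sketch; the run_aggregation arguments are placeholders):

```python
from unittest.mock import patch

from panoptes_aggregation import batch_aggregation

# Inside the existing TestCase:
@patch('panoptes_aggregation.batch_aggregation.BatchAggregator')
def test_run_aggregation_uploads(self, mock_aggregator):
    # return_value is the "instance" every BatchAggregator(...) call yields,
    # so instance-method expectations belong on it, not on the class mock.
    mock_instance = mock_aggregator.return_value
    batch_aggregation.run_aggregation(1, 10, 100)  # hypothetical arguments
    mock_instance.upload_files.assert_called_once()
```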
@CKrawczyk Thanks for all the feedback!! I implemented everything I could and commented; let me know if the changes/new stuff are good to go! I plan on merging this PR into the batch-aggregation-staging branch.
* Batch Aggregation (#785)
  * Add Celery and a test route
  * Add new dependencies
  * Test task tests
  * Docker updates
  * Scripts folder
  * Setup deploy to test env
  * Link redis container via docker
  * Modify test task
  * Add redis service to test workflow
  * Hook up services
  * Fix test arguments
  * flake8
  * newline
  * rename and refactor
  * Taking a swing at extraction
  * oops
  * update .gitignore
  * Remove deploy files
  * Update .gitignore
  * Clean up test tests
  * Add router tests
  * Extremely placeholder BA lib tests
  * Only override local import
  * First few batch agg specs
  * Updates to BatchAggregation & tests
  * less flake8y
  * Add final POST message to Panoptes
  * Flake
  * flake
  * Pull etag before attempting update
  * Remove unnecessary mocks
  * Assert result set, not method called
  * clean up spec mocks
  * Add permissions checking, fix some specs, refactor Panoptes update
  * Flake
  * Use os.path for platform independence
  * Undeleting deploy template
* Batch aggregation staging deploy (#786)
  * Add logging statements
  * Update celery task namespace
  * Add staging deployment template
  * Clean up new resource names
  * Build to a single docker image
  * Rename deployment & use Panoptes staging in staging deploy
  * Fix secret name
  * Stringify ID in comparison to value returned from Panoptes
  * Update test
  * Fix mock data type
  * Use client's admin mode
  * Fix a couple filepaths
  * Use UUID as tmpdir path
  * Finish run if Panoptes is unupdateable
  * When the update panoptes resource doesn't exist but the call is successful
  * Use jsonify to set mimetype
  * cast inputs to ints just in case
  * Enable public access to new containers
  * Deploy staging with action
  * hound?
  * test fixes
  * new hound
  * Use correct k8s secret
* Use tag deployment for production (#788)
  * Use tag deployment for production
  * Add batchagg resources to prod template
This PR contains the following:
This is ready for (re-)review. I have included some questions in the BatchAggregation spec file; answering them could improve those tests. This is dependent on zooniverse/panoptes#4303, which provides the API to save run data on a Panoptes resource. Looking for feedback on the whole pipeline, which looks like this:

Request sent to Panoptes --> Panoptes sends run_aggregation request --> celery job starts --> exports downloaded & processed --> extraction, reduction --> create CSV files, zip them --> upload data to storage --> send request back to Panoptes containing the run UUID.

Merging #783 was an accident, as I had intended to keep that branch-of-a-branch separate. I created a new batch-aggregation-staging branch to merge into, so I can add a deployment template and deploy directly from it for testing. cc @lcjohnso on the new PR.