
Batch Aggregation #785

Merged
merged 38 commits into batch-aggregation-staging
Jun 10, 2024

Conversation


@zwolf zwolf commented May 28, 2024

This PR contains the following:

  • Celery backbone for background job processing
  • New routes for starting a new run and checking the status of running tasks
  • BatchAggregation Celery task
  • BatchAggregation lib that handles logistics
  • Associated specs

This is ready for (re-)review. I've included some questions in the BatchAggregation spec file; answers to them could improve the specs. This PR depends on zooniverse/panoptes#4303, which provides the API to save run data on a Panoptes resource. Looking for feedback on the whole pipeline, which looks like this:

Request sent to Panoptes --> Panoptes sends run_aggregation request --> celery job starts --> exports downloaded & processed --> extraction, reduction --> create csv files, zip them --> upload data to storage --> send request back to Panoptes containing run UUID.
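The stage order above can be sketched as a linear pipeline. All names here are hypothetical stand-ins for illustration, not the actual BatchAggregator API:

```python
# Minimal sketch of the pipeline order described above; each stage is a
# hypothetical callable supplied by the caller, not the real implementation.
def run_pipeline(download, extract, reduce_, package, upload, notify):
    exports = download()                      # exports downloaded
    extracts = extract(exports)               # extraction
    reductions = reduce_(extracts)            # reduction
    archive = package(extracts, reductions)   # create csv files, zip them
    url = upload(archive)                     # upload data to storage
    return notify(url)                        # report back to Panoptes with run UUID
```

In the real code this runs inside a Celery task, so each request returns immediately and the work proceeds in the background.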

Merging #783 was an accident, as I had intended to keep that branch-of-a-branch separate. I created a new batch-aggregation-staging branch to merge into, so that I can add a deployment template and deploy directly from it for testing. cc @lcjohnso on the new PR.

@zwolf zwolf requested a review from CKrawczyk May 28, 2024 23:42

@zwolf zwolf changed the base branch from master to batch-aggregation-staging May 29, 2024 00:48

@CKrawczyk CKrawczyk left a comment


This all looks good. I have left some feedback and comments throughout the code. One thing: make sure os.path.join is used for all file paths to ensure compatibility across operating systems (I know that we know the server OS we are running this on, but the tests might be run on a Windows computer by someone downloading the package).
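The suggestion above can be sketched with `os.path.join`; the path components here are illustrative:

```python
import os

# os.path.join picks the separator for the current OS, so the same code
# works on a Linux server and on a Windows machine running the tests.
output_path = os.path.join('tmp', 'aggregation')                    # illustrative path
reductions_file = os.path.join(output_path, '1234_reductions.csv')  # illustrative name
```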

Let me know if you need more clarification on the testing questions and I can dig into the specifics a bit more. When I am struggling with tests I also try to check I have not entered into "over testing" the code. I want to make sure the main goal of the code is tested, but not necessarily the specific implementation to get that result (i.e. if I refactor the code in the future to do the same thing in a different way, the test should still pass).


for reducer in reducer_list:
# This is an override. The workflow_reducer_config method returns a config object
# that is incompatible with the batch_utils batch_reduce method
Collaborator

If it helps the defaults are all taken from this dictionary: https://github.com/zooniverse/aggregation-for-caesar/blob/master/panoptes_aggregation/workflow_config.py#L4-L41

It can be used as a basis for batch_standard_reducers when we want to extend beyond the initial tests.

Member Author

I used those as a starting point. The goal was to have multiple reducers run in certain cases (like question & question consensus). The batch_standard_reducers dict can be moved wherever makes sense later.

This override is because the output of workflow_config.workflow_reducer_config currently causes an error when passed to batch_utils.parse_reducer_config. Possibly a bug, or just a discrepancy between the function and yaml outputs. Since batch_utils is now used to process csvs, making changes to batch_utils felt out of scope for this PR.

}

for task_type, extract_df in extracted_data.items():
extract_df.to_csv(f'{ba.output_path}/{ba.workflow_id}_{task_type}.csv')
Collaborator

Minor point, it might be worth using os.path.join when creating file paths to keep these functions as OS independent as possible.

Member Author

It is done~~

# that is incompatible with the batch_utils batch_reduce method
reducer_config = {'reducer_config': {reducer: {}}}
reduced_data[reducer] = batch_utils.batch_reduce(extract_df, reducer_config)
filename = f'{ba.output_path}/{ba.workflow_id}_reductions.csv'
Collaborator

If multiple reducers are in reducer_list will this keep overwriting the same file?

Member Author

I'm using mode=a in the to_csv call to append to the file (if it exists). The ask here was for a single, concatenated reductions file.
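A sketch of that append pattern (column values are illustrative). Guarding `header` on each write is one way to avoid repeating the header row per reducer, which plain `mode='a'` otherwise produces:

```python
import os
import tempfile
import pandas as pd

# Two hypothetical reducer outputs sharing the standard reduction columns.
question = pd.DataFrame({'subject_id': [1], 'reducer': ['question'], 'data': ['{}']})
consensus = pd.DataFrame({'subject_id': [2], 'reducer': ['consensus'], 'data': ['{}']})

path = os.path.join(tempfile.mkdtemp(), 'reductions.csv')
for df in (question, consensus):
    # mode='a' appends; write the header only when the file doesn't exist yet
    df.to_csv(path, mode='a', index=False, header=not os.path.exists(path))
```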

Collaborator

What happens when reducers have different column names (I don't know the Pandas default behaviour)? Does it do an outer join?

Member Author

It just concats at the end right now, so it'll have a new header row and then the reduction rows. However, reductions all come out with subject_id,workflow_id,task,reducer,data and should just glue together, no different columns expected. This is the current behavior (as opposed to one file per reducer).

Collaborator

Oh, I see, the batch reducer does not flatten out the data column the same way reduce_panoptes_csv does. This should not be an issue here.


def upload_files(self):
self.connect_blob_storage()
reductions_file = f'{self.output_path}/{self.workflow_id}_reductions.csv'
Collaborator

See the comment above about multiple reducers/reducer files.

r = http.request('GET', url, preload_content=False)
with open(filepath, 'wb') as out:
while True:
data = r.read(65536)
Collaborator

Out of curiosity, what does 65536 refer to in this case?

Member Author

This is the chunk size (64 KiB in this case) in which the data is read. Exports can be big, and I'm avoiding reading the whole file into memory at once.
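The streaming loop can be sketched against any file-like object (the snippet in question reads an urllib3 response opened with `preload_content=False`; the helper name below is illustrative):

```python
CHUNK_SIZE = 65536  # 64 KiB per read

def save_stream(stream, out, chunk_size=CHUNK_SIZE):
    """Copy `stream` to `out` chunk by chunk, never holding the whole
    payload in memory at once."""
    while True:
        data = stream.read(chunk_size)
        if not data:  # read() returns b'' at end of stream
            break
        out.write(data)
```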

self.assertEqual(mock_reducer.call_count, 2)

# The reducer's call list includes subsequent calls to to_csv, but the args are methods called on the mock
# rather than use the set values i.e. "<MagicMock name='BatchAggregator().output_path' id='140281634764400'>"
Collaborator

You might be able to put a mock directly on the to_csv function from the pandas to see if that allows you to intercept the calls.

From https://stackoverflow.com/a/65593458/1052418

You could mock out the entire DataFrame class using mock.patch("pandas.DataFrame", ...).

or more specifically mock.patch("pandas.DataFrame.to_csv", ...)
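A sketch of that suggestion, patching `to_csv` at the class level so calls from any DataFrame are intercepted (filename is illustrative):

```python
from unittest import mock
import pandas as pd

df = pd.DataFrame({'subject_id': [1], 'data': ['{}']})

# Patch the method on the class; every DataFrame's to_csv hits the mock
# while the patch is active, so nothing is written to disk.
with mock.patch('pandas.DataFrame.to_csv') as mock_to_csv:
    df.to_csv('reductions.csv')  # intercepted, the file is never created

mock_to_csv.assert_called_once_with('reductions.csv')
```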

Collaborator

Also, if mocking too far up is making some tests difficult to write, you can split this into multiple tests that place the mocks at different levels for checking different things.

Member Author

Mostly this was confusion about why the to_csv was in the same call stack but as a different mock. As with the other issue, seems like it was just a different mock of the same thing. With all the other spec changes, I just removed this block and I'm satisfied if you are.


# Why do these mocked methods called in __init__ not get counted as called?
# They are def getting called as the attributes are set
# mock_uuidgen.assert_called_once()
Collaborator

Isn't it enough to test that ba.id is not None to ensure the method has run? (i.e. test for the desired result rather than the specific way of getting that result).
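That is, the test can assert on the attribute the constructor is supposed to set. The class below is a hypothetical stand-in for BatchAggregator, not the real code:

```python
import uuid

class AggregatorSketch:
    """Hypothetical stand-in: __init__ sets an id the way the real
    constructor does via its uuid helper."""
    def __init__(self):
        self.id = uuid.uuid4().hex

ba = AggregatorSketch()
# Assert the desired result rather than the specific call that produced it:
assert ba.id is not None
```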

Member Author

It probably is, yeah. That'll work for UUID and I'll just remove the client check, but I must be mocking something wrong to see ba.id despite the assert failing.

# Why do these mocked methods called in __init__ not get counted as called?
# They are def getting called as the attributes are set
# mock_uuidgen.assert_called_once()
# mock_client.assert_called_once()
Collaborator

Again you can just check mock_client.assert_called_once() to check that the method ran, no need for a check on the specific method.

Member Author

Same with this. The way I had it, mock_client.assert_called_once() fails with AssertionError: Expected 'connect' to have been called once. Called 0 times. I mocked that, but still expected that method to have been called to make that assert pass. ¯\_(ツ)_/¯

# ])

# How do I test the specific instance of BatchAggregator rather than the mocked class?
# mock_aggregator.upload_files.assert_called_once()
Collaborator

You can write a test that rather than mock the entire class just mocks the method you wish to test

@patch("panoptes_aggregation.batch_aggregation.BatchAggregator.upload_files")
@patch(...)

Member Author

I'm already mocking the whole BatchAggregator, but I think my issue is that it's mocking a class method when what I really want to do is set a mock on a method of the specific instance of BatchAggregator that the run_aggregation method instantiates. Mocking BatchAggregator.upload_files like above also doesn't work. I put one PDB breakpoint in the method itself and one in the spec:

> /usr/src/aggregation/panoptes_aggregation/batch_aggregation.py(55)run_aggregation()
-> ba.update_panoptes()
(Pdb) ba
<MagicMock name='BatchAggregator()' id='139923834173280'>
(Pdb) ba.upload_files
<MagicMock name='BatchAggregator().upload_files' id='139923833924432'>
(Pdb) ba.upload_files()
<MagicMock name='BatchAggregator().upload_files()' id='139923833994928'>
(Pdb) continue
>> PDB continue (IO-capturing resumed)
>> PDB set_trace (IO-capturing turned off) 
/usr/src/aggregation/panoptes_aggregation/tests/batch_aggregation/test_batch_aggregation.py(45)test_run_aggregation()
-> mock_uploader.assert_called_once()
(Pdb) mock_uploader
<MagicMock name='upload_files' id='139923834625088'>
(Pdb) mock_uploader()
<MagicMock name='upload_files()' id='139923834173328'>

Maybe because the thing I'm mocking isn't actually exactly the thing I'm calling?

Collaborator

That is likely it. I will have to put some more thought into the tests for this. Most of the code is tested, which is good enough for me. I am happy to loop back to these at a later time rather than have it block this feature.

Member Author

Figured it out! You can use mocked_class_instance = mocked_class.return_value to get access to an "instance" of the class, then set mocks on instance methods. Updated a few tests with this syntax.
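A minimal sketch of that pattern with `unittest.mock` (function and method names are illustrative):

```python
from unittest import mock

def run_aggregation(aggregator_cls):
    """Code under test: instantiates the class, then calls an instance method."""
    ba = aggregator_cls()
    ba.upload_files()

mock_cls = mock.MagicMock()
mock_instance = mock_cls.return_value  # the "instance" every call to the class returns

run_aggregation(mock_cls)

# The call landed on the instance's method, not on the class attribute:
mock_cls.upload_files.assert_not_called()
mock_instance.upload_files.assert_called_once()
```

This works because calling a MagicMock always returns its `return_value`, so mocks set on that object are the ones the code under test actually hits.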



zwolf commented Jun 4, 2024

@CKrawczyk Thanks for all the feedback!! I implemented everything I could and commented, lmk if the changes/new stuff are good 2 go!

I plan on merging this PR into the batch-aggregation-staging branch, then creating a PR off of that branch to add a new deploy template for a staging instance that deploys from that branch alone. I'll test everything (adding redis and volumes, celery, API, etc) without affecting the existing app.

@zwolf zwolf merged commit f48cb40 into batch-aggregation-staging Jun 10, 2024
8 checks passed
zwolf added a commit that referenced this pull request Jul 2, 2024
* Batch Aggregation (#785)

* Add Celery and a test route

* Add new dependencies

* Test task tests

* Docker updates

* Scripts folder

* Setup deploy to test env

* Link redis container via docker

* Modify test task

* Add redis service to test workflow

* Hook up services

* Fix test arguments

* flake8

* newline

* rename and refactor

* Taking a swing at extraction

* oops

* update .gitignore

* Remove deploy files

* Update .gitignore

* Clean up test tests

* Add router tests

* Extremely placeholder BA lib tests

* Only override local import

* First few batch agg specs

* Updates to BatchAggregation & tests

* less flake8y

* Add final POST message to Panoptes

* Flake

* flake

* Pull etag before atempting update

* Remove unnecessary mocks

* Assert result set, not method called

* clean up spec mocks

* Add permissions checking, fix some specs, refactor Panoptes update

* Flake

* Use os.path for platform independence

* Undeleting deploy template

* Batch aggregation staging deploy (#786)

* Add logging statements

* Update celery task namespace

* Add staging deployment template

* Clean up new resource names

* Build to a single docker image

* Rename deployment & use Panoptes staging in staging deploy

* Fix secret name

* Sringify ID in comparison to value returned from Panoptes

* Update test

* Fix mock data type

* Use client's admin mode

* Fix a couple filepaths

* Use UUID as tmpdir path

* Finish run if Panoptes is unupdateable

* When the update panoptes resource doesn't exist but the call is successful

* Use jsonify to set mimetype

* cast inputs to ints just in case

* Enable public access to new containers

* Deploy staging with action

* hound?

* test fixes

* new hound

* Use correct k8s secret

* Use tag deployment for production (#788)

* Use tag deployment for production

* Add batchagg resources to prod template