ENH: Optimize dask.bag for file sizes + add dask.bag options to config #580
Hi @kalleknast, sorry you're running into this issue. Thank you for taking the time to provide a detailed suggestion on how to deal with it.
This seems like the easiest thing to do that would have the least impact on anyone else. I would be open to adding this option to the [PREP] section:
...
npartitions = 20
A more standard .toml config (as described in #345) could have a section for options like this.
Did this workaround let you finish prep?
Could you point me to what you are looking at for automatic optimization of npartitions? Thanks again, happy to help make this easier for you. I can add the option. |
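As a concrete picture of that opt-in, a hypothetical sketch of the config (the option name and placement are illustrative, not something vak supports yet):

```toml
[PREP]
# hypothetical new option, passed through to dask.bag
npartitions = 20
```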
Dask uses the multiprocessing package (i.e. no extra dependencies). We can get the number of CPUs from that: in audio.py, import multiprocessing and change lines 229-230 to:

```python
import multiprocessing

npartitions = multiprocessing.cpu_count() - 1
bag = db.from_sequence(audio_files, npartitions=npartitions)
```

Preparing 14.7 GB of WAVs to 83.2 GB of spectrograms took 11m 46s. |
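Note that partition count and worker count are separate knobs in dask; a minimal standalone sketch (not vak's code) showing that the "processes" scheduler also accepts num_workers at compute time:

```python
import dask.bag as db

def square(x):
    return x * x

if __name__ == "__main__":
    # 20 partitions of work, executed by 4 worker processes
    bag = db.from_sequence(range(100), npartitions=20)
    result = bag.map(square).compute(scheduler="processes", num_workers=4)
    print(len(result))  # 100
```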
I tried by setting partition_size instead:

```python
import psutil

partition_size = int(.03 * psutil.virtual_memory().available)
bag = db.from_sequence(audio_files, partition_size=partition_size)
```

However, the computation took way longer: 32m 20s instead of 11m 46s. |
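One possible explanation for the slowdown (my reading of the dask docs, not something confirmed in this thread): partition_size in db.from_sequence counts items per partition, not bytes, so a byte count derived from available memory puts all 107 files into a single partition, which then runs serially. A minimal sketch of those semantics:

```python
import dask.bag as db

# partition_size is the number of *items* per partition, not bytes
files = [f"file_{i}.wav" for i in range(107)]

bag = db.from_sequence(files, partition_size=10)
print(bag.npartitions)  # 11 -> up to 10 files per partition

# a byte-sized value (e.g. ~2e9 from psutil) collapses everything
# into one partition, so only one worker has anything to do
bag = db.from_sequence(files, partition_size=2_000_000_000)
print(bag.npartitions)  # 1
```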
Thank you @kalleknast, this is helpful. I am working on understanding what's going on here a little better. Can I ask you to do a couple of things so I can help you?
|
Also, can I ask what operating system you are on? I'm wondering if it's Windows, which in the past is the platform where I have run into more issues like this. Nothing against Windows--I would love the code to work there--I'm just trying to get to the root of the error. |
System: Ubuntu 20.04.5 LTS with 62.7 GB memory
Files: 107 WAV files sampled at 96 kHz, with sizes between 13.4 and 212.6 MB (most around 130 MB)
Traceback:

$ vak prep train_config.toml
2022-10-14 22:59:43,745 - vak.cli.prep - INFO - Determined that purpose of config file is: train.
Will add 'csv_path' option to 'TRAIN' section.
2022-10-14 22:59:43,745 - vak.core.prep - INFO - purpose for dataset: train
2022-10-14 22:59:43,745 - vak.core.prep - INFO - will split dataset
2022-10-14 22:59:44,315 - vak.io.dataframe - INFO - making array files containing spectrograms from audio files in: data/WAV
2022-10-14 22:59:44,319 - vak.io.audio - INFO - creating array files with spectrograms
[ ] | 0% Completed | 12.69 s
Traceback (most recent call last):
File "/home/hjalmar/callclass/bin/vak", line 8, in <module>
sys.exit(main())
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/__main__.py", line 45, in main
cli.cli(command=args.command, config_file=args.configfile)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/cli.py", line 30, in cli
COMMAND_FUNCTION_MAP[command](toml_path=config_file)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/cli/prep.py", line 132, in prep
vak_df, csv_path = core.prep(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/core/prep.py", line 201, in prep
vak_df = dataframe.from_files(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/dataframe.py", line 134, in from_files
spect_files = audio.to_spect(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/vak/io/audio.py", line 236, in to_spect
spect_files = list(bag.map(_spect_file))
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/bag/core.py", line 1480, in __iter__
return iter(self.compute())
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 315, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/base.py", line 600, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/multiprocessing.py", line 233, in get
result = get_async(
File "/home/hjalmar/callclass/lib/python3.8/site-packages/dask/local.py", line 500, in get_async
for key, res_info, failed in queue_get(queue).result():
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending. |
That's perfect, thanks so much. Will keep doing homework and let you know what I find out in the next couple of days |
Just updating that I asked a question in the dask forum here: |
Hi again @kalleknast just checking back -- I never got a reply on the dask forum. Any chance you'd be willing to share data so I could replicate and test some of the different approaches here? I don't have any audio files that are quite that size right at hand, although I could probably find some -- I think some of the USV files end up being pretty big because of the high sampling rate, e.g. https://zenodo.org/record/3428024. You could contact me at nicholdav at gmail if that's easier. Thanks! |
Hi @NickleDave. I just sent you a link to a part of the data set. |
Awesome, thank you @kalleknast! I will download this weekend to verify I can access, and see if I can replicate the issue on my machine. I'm catching up on the backlog here and a bit busy at my day job right now, but hoping to get to this over the Thanksgiving holiday at the latest. I will ask you to test any fix on your machine and of course add you as a contributor if we end up merging something. |
Confirming I was able to download the files and replicate this issue, thank you @kalleknast -- I've got this on the to-do list for features/enhancements now |
Hi @kalleknast just want to let you know we didn't forget about you. @yardencsGitHub has me working on other things (#605) but this affects his group and we know it's an issue for people working with large files due to high sample rates, e.g. bat calls. So I will get to it |
Hi @NickleDave. With import multiprocessing, replace line 229 in the original, bag = db.from_sequence(audio_files), with:

```python
npartitions = multiprocessing.cpu_count()
bag = db.from_sequence(audio_files, npartitions=npartitions)
```

|
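Roughly, the change amounts to something like this (a hypothetical sketch; the helper name and signature are illustrative, not vak's actual API):

```python
import multiprocessing

import dask.bag as db

def make_bag(audio_files, npartitions=None):
    """Build a dask bag over audio files, defaulting npartitions to CPU count.

    Hypothetical helper for illustration only.
    """
    if npartitions is None:
        npartitions = multiprocessing.cpu_count()
    return db.from_sequence(audio_files, npartitions=npartitions)
```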
Thank you @kalleknast I will test this. Should have a chance this weekend. That way we can get things working for you sooner. |
@kalleknast just icymi I did release a version 0.8.0 today that includes this fix. You should be able to upgrade to get it. |
Worked perfectly! Thanks |
Awesome, glad to hear it |
Problem
I'm prepping a big dataset (15 GB of WAV). When computing the spectrograms I run out of memory with the error:
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Suggestion
This can be fixed by limiting the number of partitions used by dask.bag to 20. I.e., changing line 229 in audio.py from

```python
bag = db.from_sequence(audio_files)
```

to

```python
bag = db.from_sequence(audio_files, npartitions=20)
```

fixes the problem (setting partition_size would probably also work). The best would be to automatically figure out the number of workers and memory available and set npartitions/partition_size accordingly. Unfortunately, I cannot see a way to do that without additional dependencies. An alternative would be to add the number of workers to the [PREP] section in config.toml.
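For the automatic approach, here is a heavily hedged sketch of what such a heuristic could look like (psutil is exactly the kind of extra dependency mentioned above, the 5x memory multiplier is a guess rather than a measurement, and the function name is illustrative):

```python
import multiprocessing
import os

import psutil  # extra dependency -- the trade-off noted above

def pick_npartitions(audio_files, mem_multiplier=5):
    """Guess a partition count bounded by core count and available memory.

    Illustrative only: assumes each worker holds roughly mem_multiplier
    times the largest file in memory while making a spectrogram.
    """
    n_cpus = max(1, multiprocessing.cpu_count() - 1)
    available = psutil.virtual_memory().available
    largest = max(os.path.getsize(f) for f in audio_files)
    by_memory = max(1, available // (mem_multiplier * largest))
    return min(n_cpus, by_memory, len(audio_files))
```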