# aerobulk-python performance #28
See #27 (comment) for some work on the 'convergence' aspect.
As suggested in xgcm/aerobulk-python#32 (comment) I checked the scaling of execution time with domain size:

```python
import time

import numpy as np
import matplotlib.pyplot as plt
from aerobulk import noskin, skin
# create_data is the test helper shown later in this thread

multipliers = [1, 5, 10, 25, 100, 1000, 5000]
times_noskin = []
times_skin = []
elements = []
for m in multipliers:
    print(m)
    shape = (m, m, 1)

    noskinargs = create_data(shape, chunks=None)
    tic = time.time()
    noskin(*noskinargs)
    toc = time.time() - tic
    times_noskin.append(toc)

    skinargs = create_data(shape, chunks=None, skin_correction=True)
    tic = time.time()
    skin(*skinargs)
    toc = time.time() - tic
    times_skin.append(toc)

    elements.append(m**2)

plt.plot(elements, times_noskin, label='noskin')
plt.plot(elements, times_skin, label='skin')
plt.xlabel('elements in array')
plt.ylabel('execution time [s]')
plt.legend()
```

This seems like it scales very linearly indeed.
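To make "very linearly" quantitative, one option is to fit the slope of time versus element count in log-log space; a slope near 1 indicates linear scaling. A small sketch using the lists built in the loop above:

```python
import numpy as np

# fit log(time) = a * log(elements) + b; a close to 1 means linear scaling
for label, times in [("noskin", times_noskin), ("skin", times_skin)]:
    slope, _ = np.polyfit(np.log(elements), np.log(times), 1)
    print(f"{label}: scaling exponent ~ {slope:.2f}")
```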
I have further investigated how aerobulk-python scales when parallelizing with dask. To test this I wanted to see how it behaves under 'soft scaling': the problem size (here the number of timesteps) is increased proportionally to the number of workers, so under ideal circumstances the execution time stays exactly the same. I tried this:

```python
import time

import numpy as np
from dask.distributed import Client, LocalCluster
# create_data and noskin as above

results = {}
cores = range(1, 17, 2)  # we got 16 cores on the 'huge' option
for n in [10, 100]:
    print('setup')
    cluster = LocalCluster(n_workers=0, threads_per_worker=1)
    client = Client(cluster)
    nx = ny = n
    times = []
    for i in cores:
        print(i)
        shape = (nx, ny, i)
        args = create_data(shape, chunks={'dim_2': 1})
        # set up dask
        cluster.scale(i)
        client.wait_for_workers(i)
        # multiple passes to get rid of random variability
        single_time = []
        for p in range(1, 8):
            print(f'executing calculation Pass:{p}')
            tic = time.time()
            out_args = noskin(*args)
            # compute outputs
            out_args[0].load()  # this should be enough to call the computation?
            toc = time.time() - tic
            single_time.append(toc)
        times.append(np.median(single_time))
    results[n] = times
    # teardown dask
    print('teardown')
    client.close()
    cluster.close()
```

and for the results I am getting:

```python
import matplotlib.pyplot as plt

for n, times in results.items():
    normalized_times = [t / times[0] for t in times]
    plt.plot(cores, normalized_times, label=n)
plt.legend()
```

These results are consistent across different horizontal problem sizes (e.g. here 10x10 and 100x100), and they are normalized to the runtime with 1 core. I do not have much experience with these benchmarks, but this seems pretty bad to me? It means that for a 16x increase in workers we only get a 5.3x (16/3) speedup? @rabernat what do you think about this test setup and the results?
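For reference, the 16/3 figure corresponds to the effective speedup under soft scaling: with N workers doing N times the work, the effective speedup is N*t1/tN and the parallel efficiency is t1/tN. A small sketch, assuming the `results` dict and `cores` range from the loop above:

```python
import numpy as np

workers = np.array(list(cores))
for n, times in results.items():
    times = np.asarray(times)
    efficiency = times[0] / times      # 1.0 would be perfect soft scaling
    speedup = workers * efficiency     # effective speedup relative to one worker
    print(n, efficiency.round(2), speedup.round(1))
```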
I wonder how much influence the gigantic flood of print messages has on this. This sort of message is printed for every chunk.
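One way to check that would be to silence the messages and re-time the call. Since they appear to come from the compiled Fortran code, `contextlib.redirect_stdout` would not catch them; redirecting the stdout file descriptor does. A sketch (not from this thread; on a distributed cluster it would have to run inside each worker process):

```python
import os
from contextlib import contextmanager

@contextmanager
def suppress_fortran_stdout():
    """Temporarily redirect file descriptor 1 to /dev/null.

    Unlike contextlib.redirect_stdout, this also silences writes coming
    from compiled (Fortran/C) code, which bypass Python's sys.stdout.
    """
    devnull = os.open(os.devnull, os.O_WRONLY)
    saved = os.dup(1)
    try:
        os.dup2(devnull, 1)
        yield
    finally:
        os.dup2(saved, 1)
        os.close(saved)
        os.close(devnull)

# hypothetical usage: time the call without the per-chunk print overhead
# with suppress_fortran_stdout():
#     out_args = noskin(*noskinargs)
```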
This looks pretty similar to ocean-eddy-cpt/gcm-filters#45 (comment). Over there, we diagnosed the culprit for the poor scaling as memory allocation. However, that should really not be an issue with the Fortran. Here's one idea: since we ended up not trying to implement error handling, you should be able to add the `threadsafe` directive.
Two more thoughts after discussing this today with @paigem and @cmdupuis3
I took a stab today at using snakeviz. Some baseline sanity checks that I would expect (thanks @TomNicholas for the quick chat on these):
- **Snakeviz on the cloud did not work:** I was able to install on Pangeo Cloud via
- **Snakeviz locally:** See the notebook in this PR. I used the
- **CM2.6-sized input array:**
- **Very small**
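For readers reproducing the profiling: the usual SnakeViz workflow is to write a cProfile dump and open it in the browser. A minimal sketch with illustrative names and shapes (not necessarily what the notebook above did):

```python
import cProfile

# profile a single eager (non-dask) call and write the stats to disk
args = create_data((100, 100, 10), chunks=None)
cProfile.runctx("noskin(*args)", globals(), locals(), "noskin.prof")

# then, from a shell:
#   snakeviz noskin.prof
```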
I just repeated (part of) the scaling test with a gateway cluster (installing this on the fly is extremely slow): it seems that the problem persists, unfortunately. I will give adding the 'threadsafe' a shot now.
As a baseline, you should look at the scaling of some other built-in operation, like just taking the mean or something. These results might not be as bad as you think. Also, use a larger problem size.
Good suggestions, I'll try that. In the meantime the 'threadsafe' addition shows some promising results when running on the threaded scheduler: xgcm/aerobulk-python#34 (comment)
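For anyone reproducing the threaded-scheduler runs referenced here: dask can be pointed at its thread-based scheduler per call, without a cluster. A sketch, assuming dask-backed outputs as in the earlier snippets (the worker count is illustrative):

```python
out_args = noskin(*args)
# use dask's local threaded scheduler; this only helps if the wrapped
# Fortran routine releases the GIL (the point of the 'threadsafe' change)
result = out_args[0].compute(scheduler="threads", num_workers=8)
```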
Still thinking about this: in order to compare this in a fair manner, I would have to do some operation that pulls together the same variables, does something with them and puts them out, right? Something like:

```python
def dummy(sst, t_zt, hum_zt, u_zu, v_zu, slp):
    return sst + t_zt + hum_zt + u_zu + v_zu + slp
```

or should I take the mean of each variable separately?
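For comparison, the per-variable alternative mentioned here would look roughly like this (a sketch, not from the thread):

```python
def dummy_separate(sst, t_zt, hum_zt, u_zu, v_zu, slp):
    # reduce each variable on its own; unlike the combined version above,
    # no task ever needs to bring all six inputs together
    return [var.mean() for var in (sst, t_zt, hum_zt, u_zu, v_zu, slp)]
```

The combined version is arguably the fairer baseline because, like `noskin`, every output chunk then depends on all six input chunks.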
Ok so I have been really frustrated with my attempts to test the scaling with dask gateway. This is what I am trying:

```python
from typing import Dict, Tuple
import time

import numpy as np
import pytest
import xarray as xr
from aerobulk import noskin, skin

"""Tests for the xarray wrapper"""


def create_data(
    shape: Tuple[int, ...],
    chunks: Dict[str, int] = {},
    skin_correction: bool = False,
    order: str = "F",
):
    def _arr(value, chunks, order):
        arr = xr.DataArray(np.full(shape, value, order=order))
        # adds random noise scaled by a percentage of the value
        randomize_factor = 0.001
        randomize_range = value * randomize_factor
        arr = arr + np.random.rand(*shape) * randomize_range
        if chunks:
            arr = arr.chunk(chunks)
        return arr

    sst = _arr(290.0, chunks, order)
    t_zt = _arr(280.0, chunks, order)
    hum_zt = _arr(0.001, chunks, order)
    u_zu = _arr(1.0, chunks, order)
    v_zu = _arr(-1.0, chunks, order)
    slp = _arr(101000.0, chunks, order)
    rad_sw = _arr(0.000001, chunks, order)
    rad_lw = _arr(350, chunks, order)
    if skin_correction:
        return sst, t_zt, hum_zt, u_zu, v_zu, rad_sw, rad_lw, slp
    else:
        return sst, t_zt, hum_zt, u_zu, v_zu, slp


from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()
options.worker_cores = 1
options.worker_memory = 8

# stop all existing clusters
print(gateway.list_clusters())
for c in gateway.list_clusters():
    cluster = gateway.connect(c.name)
    cluster.shutdown()


def dummy_wrapper(a, b, c, d, e, f):
    return (a + b + c + d + e + f).mean(['dim_0', 'dim_1'])


results = {}
results_dummy = {}
cores = [1, 2, 4, 8]  # we can get up to 30? workers on dask gateway?
for n in [1500]:
    nx = ny = n
    results[n] = []
    results_dummy[n] = []
    for i in cores:
        print(f'set up gateway from scratch for {i} workers')
        cluster = gateway.new_cluster(options, shutdown_on_close=True)
        client = cluster.get_client()
        print('https://us-central1-b.gcp.pangeo.io/' + client.dashboard_link)
        # MambaPlugin is a worker plugin (defined elsewhere) that installs
        # aerobulk-python on the workers at startup
        plugin = MambaPlugin(['aerobulk-python'])
        client.register_worker_plugin(plugin)
        cluster.scale(i)
        client.wait_for_workers(i)
        print(f"Start calculation for {i} workers")
        shape = (nx, ny, i * 3)
        args = create_data(shape, chunks={'dim_2': 1})  # the time dimension is the one that is chunked
        # multiple passes to get rid of random variability
        single_time = []
        single_dummy_time = []
        for p in range(5):
            print(f'executing calculation Pass:{p}')
            tic = time.time()
            out_args = noskin(*args)
            # compute outputs
            out_args[0].load()  # this should be enough to call the computation?
            toc = time.time() - tic
            single_time.append(toc)
            # reference experiment
            tic = time.time()
            dummy = dummy_wrapper(*args)
            dummy.load()
            toc = time.time() - tic
            single_dummy_time.append(toc)
        results[n].append(np.median(single_time))
        results_dummy[n].append(np.median(single_dummy_time))
        cluster.shutdown()
        client.close()
```

At some point I always get this sort of error:
As far as I can tell it is not deterministic (it sometimes happens at 4 workers, sometimes at e.g. 8). Is there anything stupid I am doing in my setup?
I am planning to make this a longstanding issue to evaluate and discuss the performance of our package.
I have investigated some aspects that came up in the past, and will summarize them below. The full workflow can be found in this notebook.
**How does the computation time scale with the number of time iterations**
From this plot I draw two relevant conclusions for our work:
**How the order of dimensions (e.g. longest to shortest or vice versa) influences the execution time of aerobulk-python**
For this one I looped through a bunch of different array shapes (all with a total of 1e6 elements) and then plotted the execution time:
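A rough sketch of the kind of sweep described above, using illustrative shapes that each contain 1e6 elements (the exact shapes used in the notebook may differ):

```python
import time

# every shape below has 1_000_000 elements in total
shapes = [
    (1_000_000, 1, 1),
    (10_000, 100, 1),
    (1_000, 1_000, 1),
    (1_000, 100, 10),
    (100, 100, 100),
]
timings = {}
for shape in shapes:
    args = create_data(shape, chunks=None)
    tic = time.time()
    noskin(*args)
    timings[shape] = time.time() - tic
print(timings)
```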
The results honestly confuse me a bit. I would have expected the arrays with the smallest number at the end, `(..., ..., 1)`, to clearly outperform the others, but that does not seem to be the case. In fact it seems that in the big picture it matters more whether an array is elongated rather than square. Maybe @rabernat has some more insight here.

**How does the computation time scale with `niter`**
For this I simply timed the computation for the same array shape but with increasing values for `niter`.
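If `noskin` exposes the iteration count as a keyword argument (an assumption here; the exact interface is defined by the wrapper), the timing sweep described above would look roughly like:

```python
import time

# hypothetical sweep; assumes noskin accepts an `niter` keyword argument
niter_values = [2, 4, 6, 8, 10, 15, 20]
args = create_data((200, 200, 10), chunks=None)
timings = []
for niter in niter_values:
    tic = time.time()
    noskin(*args, niter=niter)
    timings.append(time.time() - tic)
print(dict(zip(niter_values, timings)))
```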
So from my POV, we should focus on finding a minimal value for `niter` that is still 'converged' (#27) for now, since that seems to have the largest impact on practical performance? I will work on that aspect next.