Inner Join Memory Usage Shoots Up leading to Exception: CUDA ERROR #2268
Comments
It looks like your data is very uniform, with the same value repeated for many rows on each side. My guess is that merging two such tables will result in a quadratic explosion of data. For example, if you had two modestly sized chunks of 1e6 rows each, where each row had the same value in the join column, then the resulting chunk would have 1e12 rows, which would likely blow up memory all on its own.
To generate a random dataset for joins you may be interested in this notebook: https://gist.github.com/mrocklin/6e2c33c33b32bc324e3965212f202f66
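For illustration, here is a minimal sketch of that blow-up using pandas (assuming the same inner-join semantics as cudf; the sizes below are made up):

import pandas as pd

# Every row carries the same value in the join column "A".
left = pd.DataFrame({"A": [1] * 1_000, "x": range(1_000)})
right = pd.DataFrame({"A": [1] * 1_000, "y": range(1_000)})

# An inner join pairs every matching left row with every matching right row,
# so a single repeated key value yields 1_000 * 1_000 = 1_000_000 output rows.
merged = left.merge(right, on="A", how="inner")
print(len(merged))  # 1000000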
…On Sat, Mar 9, 2019 at 7:58 PM Vibhu Jawa ***@***.***> wrote:
Inner Join Memory Usage Shoots Up Leading to Exception: CUDA ERROR.
CUDA_ERROR_INVALID_VALUE
I was attempting to inner join two tables, one with 10mil+ rows (3.6 G) and one with 500mil+ rows (18 G), and am facing CUDA memory errors.
I tried changing chunk_size to see if smaller partitions would help, but could not get it to work. Any pointers would be super helpful.
Code to recreate the error
import dask_cudf
import dask
from collections import OrderedDict
import os
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(ip='0.0.0.0')
client = Client(cluster)
def generate_file(output_file, rows=100, column_string='A,B,C,D,E,F,G,H,I,J,K\n'):
    '''
    Creates and writes a csv to disk
    '''
    with open(output_file, 'wb') as f:
        f.write(column_string.encode('utf-8'))
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n' * rows)
    print("Generated File {}".format(output_file))
def dask_cudf_load_test_csv(csv_path, chunksize, cols, dtypes):
    """
    Loads repl data from csv into a cudf
    Returns
    -------
    GPU DataFrame
    """
    print("Chunk Size is {}".format(chunksize))
    cwd = os.getcwd()
    new_path = 'file://' + cwd + '/' + csv_path
    print("Loading Table from {}".format(new_path))
    return dask_cudf.read_csv(new_path, names=cols, delimiter=',',
                              dtype=list(dtypes.values()),
                              chunksize=chunksize, skiprows=1)
rows_table_1 = 500_000_000
output_file_1 = 'test_{}.csv'.format(rows_table_1)
generate_file(output_file_1, rows = rows_table_1, column_string = 'A,B,C,D_1,E_1,F_1,G_1,H_1,I_1,J_1,K_1\n')
rows_table_2 = 10_00_000
output_file_2 = 'test_{}.csv'.format(rows_table_2)
generate_file(output_file_2, rows = rows_table_2, column_string = 'A,B,C,D_2,E_2,F_2,G_2,H_2,I_2,J_2,K_2\n')
chunk_size = "3500 MiB"
cols_1 = ['A','B','C','D_1','E_1','F_1','G_1','H_1','I_1','J_1','K_1']
dtypes_1 = OrderedDict([("A", "int64"),
("B", "int64"),
("C","int64"),
("D_1", "float64"),
("E_1", "float64"),
("F_1","float64"),
("G_1","float64"),
("H_1","float64"),
("I_!","float64"),
("J_1","int64"),
("K_1","int64")])
cols_2 = ['A','B','C','D_2','E_2','F_2','G_2','H_2','I_2','J_2','K_2']
dtypes_2 = OrderedDict([("A", "int64"),
("B", "int64"),
("C","int64"),
("D_2", "float64"),
("E_2", "float64"),
("F_2","float64"),
("G_2","float64"),
("H_2","float64"),
("I_2","float64"),
("J_2","int64"),
("K_2","int64")])
ddf_1 = dask_cudf_load_test_csv(output_file_1,chunksize = chunk_size,cols=cols_1,dtypes=dtypes_1)
ddf_2 = dask_cudf_load_test_csv(output_file_2,chunksize = chunk_size,cols=cols_2,dtypes=dtypes_2)
merged_ddf = ddf_1.merge(ddf_2,on=['A','B','C'],how='inner')
len(merged_ddf)
Error Trace
---------------------------------------------------------------------------
CudaAPIError Traceback (most recent call last)
<ipython-input-3-7f5e77f95606> in <module>
----> 1 len(merged_ddf)
/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in __len__(self)
453 def __len__(self):
454 return self.reduction(len, np.sum, token='len', meta=int,
--> 455 split_every=False).compute()
456
457 def __bool__(self):
/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2329 try:
2330 results = self.gather(packed, asynchronous=asynchronous,
-> 2331 direct=direct)
2332 finally:
2333 for f in futures.values():
/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1653 return self.sync(self._gather, futures, errors=errors,
1654 direct=direct, local_worker=local_worker,
-> 1655 asynchronous=asynchronous)
1656
1657 @gen.coroutine
/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
673 return future
674 else:
--> 675 return sync(self.loop, func, *args, **kwargs)
676
677 def __repr__(self):
/conda/envs/rapids/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
275 e.wait(10)
276 if error[0]:
--> 277 six.reraise(*error[0])
278 else:
279 return result[0]
/conda/envs/rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/conda/envs/rapids/lib/python3.6/site-packages/distributed/utils.py in f()
260 if timeout is not None:
261 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262 result[0] = yield future
263 except Exception as exc:
264 error[0] = sys.exc_info()
/conda/envs/rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
727
728 try:
--> 729 value = future.result()
730 except Exception:
731 exc_info = sys.exc_info()
/conda/envs/rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
734 if exc_info is not None:
735 try:
--> 736 yielded = self.gen.throw(*exc_info) # type: ignore
737 finally:
738 # Break up a reference to itself
/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1494 six.reraise(type(exception),
1495 exception,
-> 1496 traceback)
1497 if errors == 'skip':
1498 bad_keys.add(key)
/conda/envs/rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
/conda/envs/rapids/lib/python3.6/site-packages/dask_cudf-0+untagged.1.ge3d3350-py3.6.egg/dask_cudf/join_impl.py in local_shuffle()
9 """Regroup the frame based on the key column(s)
10 """
---> 11 partitions = frame.partition_by_hash(columns=key_columns, nparts=num_new_parts)
12 return dict(enumerate(partitions))
13
/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/dataframe.py in partition_by_hash()
1899 key_indices = [names.index(k) for k in columns]
1900 # Allocate output buffers
-> 1901 outputs = [col.copy() for col in cols]
1902 # Call hash_partition
1903 offsets = _gdf.hash_partition(cols, key_indices, nparts, outputs)
/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/dataframe.py in <listcomp>()
1899 key_indices = [names.index(k) for k in columns]
1900 # Allocate output buffers
-> 1901 outputs = [col.copy() for col in cols]
1902 # Call hash_partition
1903 offsets = _gdf.hash_partition(cols, key_indices, nparts, outputs)
/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/column.py in copy()
325 """
326 if(deep):
--> 327 deep = self.copy_data()
328 if self.has_null_mask:
329 return deep.set_mask(mask=self.mask.copy(),
/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/column.py in copy_data()
317 which is shared by the new column.
318 """
--> 319 return self.replace(data=self.data.copy())
320
321 def copy(self, deep=True):
/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/buffer.py in copy()
182 """Deep copy the buffer
183 """
--> 184 return Buffer(mem=cudautils.copy_array(self.mem),
185 size=self.size, capacity=self.capacity)
186
/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/utils/cudautils.py in copy_array()
125 assert out.size == arr.size
126 if arr.is_c_contiguous() and out.is_c_contiguous():
--> 127 out.copy_to_device(arr)
128 else:
129 if out.size > 0:
/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/devices.py in _require_cuda_context()
210 def _require_cuda_context(*args, **kws):
211 get_context()
--> 212 return fn(*args, **kws)
213
214 return _require_cuda_context
/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/devicearray.py in copy_to_device()
186 sentry_contiguous(ary)
187 check_array_compatibility(self_core, ary_core)
--> 188 _driver.device_to_device(self, ary, self.alloc_size, stream=stream)
189 else:
190 # Ensure same contiguity. Only makes a host-side copy if necessary
/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py in device_to_device()
1865 fn = driver.cuMemcpyDtoD
1866
-> 1867 fn(device_pointer(dst), device_pointer(src), size, *varargs)
1868
1869
/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py in safe_cuda_api_call()
288 _logger.debug('call driver api: %s', libfn.__name__)
289 retcode = libfn(*args)
--> 290 self._check_error(fname, retcode)
291 return safe_cuda_api_call
292
/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py in _check_error()
323 _logger.critical(msg, _getpid(), self.pid)
324 raise CudaDriverError("CUDA initialized before forking")
--> 325 raise CudaAPIError(retcode, msg)
326
327 def get_device(self, devnum=0):
CudaAPIError: [1] Call to cuMemcpyDtoD results in CUDA_ERROR_INVALID_VALUE
nvidia-smi output
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
+-----------------------------------------------------------------------------+
Sun Mar 10 03:48:18 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 396.44 Driver Version: 396.44 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla P100-SXM2... On | 00000000:06:00.0 Off | 0 |
| N/A 34C P0 42W / 300W | 9580MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla P100-SXM2... On | 00000000:07:00.0 Off | 0 |
| N/A 34C P0 43W / 300W | 8545MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla P100-SXM2... On | 00000000:0A:00.0 Off | 0 |
| N/A 31C P0 40W / 300W | 8545MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 3 Tesla P100-SXM2... On | 00000000:0B:00.0 Off | 0 |
| N/A 32C P0 38W / 300W | 309MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 4 Tesla P100-SXM2... On | 00000000:85:00.0 Off | 0 |
| N/A 33C P0 41W / 300W | 8545MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 5 Tesla P100-SXM2... On | 00000000:86:00.0 Off | 0 |
| N/A 31C P0 43W / 300W | 309MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 6 Tesla P100-SXM2... On | 00000000:89:00.0 Off | 0 |
| N/A 31C P0 35W / 300W | 10MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 7 Tesla P100-SXM2... On | 00000000:8A:00.0 Off | 0 |
| N/A 32C P0 39W / 300W | 9283MiB / 16280MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
+-----------------------------------------------------------------------------+
Sun Mar 10 03:48:20 2019
+-----------------------------------------------------------------------------+
Aah. This was a toy example for raising the error. In my real-life use case, I have 2200 right rows for each left row, so the output is 2200x the input, and I still face the error. Thanks for the random dataset join example. I will post my results with a better simulation.
The output of the cudf.merge operation will need to fit comfortably in GPU memory (there may be a few there at once). Is this likely the case for your computation? If not, then that would explain things.
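For a rough sense of scale, here is a back-of-the-envelope sketch; the row count, fan-out, and column widths below are assumptions pieced together from the comments above, not measurements:

# Rough sizing sketch: ~2200 matching right rows per left row (from the
# comment above) and a merged schema of roughly 19 eight-byte columns
# (3 int64 keys plus 8 numeric payload columns from each side).
left_rows = 10_000_000            # assumed left-table size, per the issue text
fanout = 2_200                    # matches per left row
bytes_per_row = 19 * 8            # 19 int64/float64 columns

out_bytes = left_rows * fanout * bytes_per_row
print("merged output ~ {:.1f} TB".format(out_bytes / 1e12))  # ~3.3 TB, far beyond a 16 GB P100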
Sorry, super new to RAPIDS and cuDF, so I have a few clarifying follow-ups.
Thanks for being so responsive. 👍
Dask is just calling a bunch of cudf functions. cudf expects the inputs and outputs to fit in memory. Dask compounds this expectation, because it may be managing many of these inputs and outputs floating around at any given time on a single GPU, so it's good to keep things relatively small. Something like a tenth of GPU memory would probably be as much as I would recommend. On the CPU, at least, we find that people often choose chunk sizes of something like 128MB or so on systems that have 30+GB of RAM. Things may be different on the GPU though, I don't know.
The counter-pressure is that if you make them too small, then overhead gets you. There is something like 1ms of overhead per chunk. On the GPU there may also be concern about not saturating the full GPU. You probably know more about that than I do, though.
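For concreteness, here is a sketch of how a smaller chunk size could be plugged into the repro above, reusing the dask_cudf_load_test_csv helper defined earlier; the "256 MiB" figure is an assumption for illustration, not a recommendation from this thread:

# Smaller partitions leave headroom for the merge output on each GPU, at the
# cost of more per-chunk scheduling overhead (roughly 1 ms per chunk).
chunk_size = "256 MiB"  # assumed value for illustration

ddf_1 = dask_cudf_load_test_csv(output_file_1, chunksize=chunk_size,
                                cols=cols_1, dtypes=dtypes_1)
ddf_2 = dask_cudf_load_test_csv(output_file_2, chunksize=chunk_size,
                                cols=cols_2, dtypes=dtypes_2)
merged_ddf = ddf_1.merge(ddf_2, on=['A', 'B', 'C'], how='inner')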
Closing; this should be fixed once spill-over is fixed.