Inner Join Memory Usage Shoots Up leading to Exception: CUDA ERROR #2268

Closed
VibhuJawa opened this issue Mar 10, 2019 · 7 comments
Labels
dask Dask issue

Comments

@VibhuJawa
Member

Inner Join Memory Usage Shoots Up Leading to Exception: CUDA ERROR. CUDA_ERROR_INVALID_VALUE

I was attempting to inner join two tables, one with 10M+ rows (3.6 GB) and one with 500M+ rows (18 GB), and am hitting CUDA memory errors.

I tried changing chunk_size to see if smaller partitions would help, but could not get it to work. Any pointers would be super helpful.

Code to recreate the error

import dask_cudf
import dask
from collections import OrderedDict
import os
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(ip='0.0.0.0')
client = Client(cluster) 

def generate_file(output_file, rows=100, column_string='A,B,C,D,E,F,G,H,I,J,K\n'):
    '''
        Creates a csv file and writes it to disk
    '''
    with open(output_file, 'wb') as f:
        f.write(column_string.encode('utf-8'))
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n'*rows)
    print("Generated File {}".format(output_file))


def dask_cudf_load_test_csv(csv_path, chunksize, cols, dtypes):
    """
    Loads test data from a csv into a dask_cudf DataFrame

    Returns
    -------
    GPU DataFrame
    """
    print("Chunk Size is {}".format(chunksize))

    cwd = os.getcwd()
    new_path = 'file://' + cwd + '/' + csv_path

    print("Loading Table from {}".format(new_path))

    return dask_cudf.read_csv(new_path, names=cols, delimiter=',',
                              dtype=list(dtypes.values()), chunksize=chunksize,
                              skiprows=1)

rows_table_1 = 500_000_000
output_file_1 = 'test_{}.csv'.format(rows_table_1)
generate_file(output_file_1, rows = rows_table_1, column_string = 'A,B,C,D_1,E_1,F_1,G_1,H_1,I_1,J_1,K_1\n')


rows_table_2 = 10_00_000
output_file_2 = 'test_{}.csv'.format(rows_table_2)
generate_file(output_file_2, rows = rows_table_2, column_string = 'A,B,C,D_2,E_2,F_2,G_2,H_2,I_2,J_2,K_2\n')


chunk_size = "3500 MiB"


cols_1 = ['A','B','C','D_1','E_1','F_1','G_1','H_1','I_1','J_1','K_1']
    
dtypes_1 = OrderedDict([("A", "int64"),
        ("B", "int64"),
        ("C","int64"),
        ("D_1", "float64"),
        ("E_1", "float64"),
        ("F_1","float64"),
        ("G_1","float64"),
        ("H_1","float64"),
        ("I_!","float64"),
        ("J_1","int64"),
        ("K_1","int64")])



cols_2 = ['A','B','C','D_2','E_2','F_2','G_2','H_2','I_2','J_2','K_2']
    
dtypes_2 = OrderedDict([("A", "int64"),
        ("B", "int64"),
        ("C","int64"),
        ("D_2", "float64"),
        ("E_2", "float64"),
        ("F_2","float64"),
        ("G_2","float64"),
        ("H_2","float64"),
        ("I_2","float64"),
        ("J_2","int64"),
        ("K_2","int64")])


ddf_1 = dask_cudf_load_test_csv(output_file_1,chunksize = chunk_size,cols=cols_1,dtypes=dtypes_1)
ddf_2 = dask_cudf_load_test_csv(output_file_2,chunksize = chunk_size,cols=cols_2,dtypes=dtypes_2)

merged_ddf = ddf_1.merge(ddf_2,on=['A','B','C'],how='inner')

len(merged_ddf)

Error Trace

---------------------------------------------------------------------------
CudaAPIError                              Traceback (most recent call last)
<ipython-input-3-7f5e77f95606> in <module>
----> 1 len(merged_ddf)

/conda/envs/rapids/lib/python3.6/site-packages/dask/dataframe/core.py in __len__(self)
    453     def __len__(self):
    454         return self.reduction(len, np.sum, token='len', meta=int,
--> 455                               split_every=False).compute()
    456 
    457     def __bool__(self):

/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/conda/envs/rapids/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2329             try:
   2330                 results = self.gather(packed, asynchronous=asynchronous,
-> 2331                                       direct=direct)
   2332             finally:
   2333                 for f in futures.values():

/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1653             return self.sync(self._gather, futures, errors=errors,
   1654                              direct=direct, local_worker=local_worker,
-> 1655                              asynchronous=asynchronous)
   1656 
   1657     @gen.coroutine

/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    673             return future
    674         else:
--> 675             return sync(self.loop, func, *args, **kwargs)
    676 
    677     def __repr__(self):

/conda/envs/rapids/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

/conda/envs/rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/conda/envs/rapids/lib/python3.6/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

/conda/envs/rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

/conda/envs/rapids/lib/python3.6/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1494                             six.reraise(type(exception),
   1495                                         exception,
-> 1496                                         traceback)
   1497                     if errors == 'skip':
   1498                         bad_keys.add(key)

/conda/envs/rapids/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/conda/envs/rapids/lib/python3.6/site-packages/dask_cudf-0+untagged.1.ge3d3350-py3.6.egg/dask_cudf/join_impl.py in local_shuffle()
      9     """Regroup the frame based on the key column(s)
     10     """
---> 11     partitions = frame.partition_by_hash(columns=key_columns, nparts=num_new_parts)
     12     return dict(enumerate(partitions))
     13 

/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/dataframe.py in partition_by_hash()
   1899         key_indices = [names.index(k) for k in columns]
   1900         # Allocate output buffers
-> 1901         outputs = [col.copy() for col in cols]
   1902         # Call hash_partition
   1903         offsets = _gdf.hash_partition(cols, key_indices, nparts, outputs)

/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/dataframe.py in <listcomp>()
   1899         key_indices = [names.index(k) for k in columns]
   1900         # Allocate output buffers
-> 1901         outputs = [col.copy() for col in cols]
   1902         # Call hash_partition
   1903         offsets = _gdf.hash_partition(cols, key_indices, nparts, outputs)

/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/column.py in copy()
    325         """
    326         if(deep):
--> 327             deep = self.copy_data()
    328             if self.has_null_mask:
    329                 return deep.set_mask(mask=self.mask.copy(),

/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/column.py in copy_data()
    317         which is shared by the new column.
    318         """
--> 319         return self.replace(data=self.data.copy())
    320 
    321     def copy(self, deep=True):

/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/dataframe/buffer.py in copy()
    182         """Deep copy the buffer
    183         """
--> 184         return Buffer(mem=cudautils.copy_array(self.mem),
    185                       size=self.size, capacity=self.capacity)
    186 

/conda/envs/rapids/lib/python3.6/site-packages/cudf-0+unknown-py3.6-linux-x86_64.egg/cudf/utils/cudautils.py in copy_array()
    125     assert out.size == arr.size
    126     if arr.is_c_contiguous() and out.is_c_contiguous():
--> 127         out.copy_to_device(arr)
    128     else:
    129         if out.size > 0:

/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/devices.py in _require_cuda_context()
    210     def _require_cuda_context(*args, **kws):
    211         get_context()
--> 212         return fn(*args, **kws)
    213 
    214     return _require_cuda_context

/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/devicearray.py in copy_to_device()
    186             sentry_contiguous(ary)
    187             check_array_compatibility(self_core, ary_core)
--> 188             _driver.device_to_device(self, ary, self.alloc_size, stream=stream)
    189         else:
    190             # Ensure same contiguity. Only makes a host-side copy if necessary

/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py in device_to_device()
   1865         fn = driver.cuMemcpyDtoD
   1866 
-> 1867     fn(device_pointer(dst), device_pointer(src), size, *varargs)
   1868 
   1869 

/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py in safe_cuda_api_call()
    288             _logger.debug('call driver api: %s', libfn.__name__)
    289             retcode = libfn(*args)
--> 290             self._check_error(fname, retcode)
    291         return safe_cuda_api_call
    292 

/conda/envs/rapids/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py in _check_error()
    323                     _logger.critical(msg, _getpid(), self.pid)
    324                     raise CudaDriverError("CUDA initialized before forking")
--> 325             raise CudaAPIError(retcode, msg)
    326 
    327     def get_device(self, devnum=0):

CudaAPIError: [1] Call to cuMemcpyDtoD results in CUDA_ERROR_INVALID_VALUE

nvidia-smi output

Sun Mar 10 03:48:18 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 396.44                 Driver Version: 396.44                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P100-SXM2...  On   | 00000000:06:00.0 Off |                    0 |
| N/A   34C    P0    42W / 300W |   9580MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  Tesla P100-SXM2...  On   | 00000000:07:00.0 Off |                    0 |
| N/A   34C    P0    43W / 300W |   8545MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   2  Tesla P100-SXM2...  On   | 00000000:0A:00.0 Off |                    0 |
| N/A   31C    P0    40W / 300W |   8545MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   3  Tesla P100-SXM2...  On   | 00000000:0B:00.0 Off |                    0 |
| N/A   32C    P0    38W / 300W |    309MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   4  Tesla P100-SXM2...  On   | 00000000:85:00.0 Off |                    0 |
| N/A   33C    P0    41W / 300W |   8545MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   5  Tesla P100-SXM2...  On   | 00000000:86:00.0 Off |                    0 |
| N/A   31C    P0    43W / 300W |    309MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   6  Tesla P100-SXM2...  On   | 00000000:89:00.0 Off |                    0 |
| N/A   31C    P0    35W / 300W |     10MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   7  Tesla P100-SXM2...  On   | 00000000:8A:00.0 Off |                    0 |
| N/A   32C    P0    39W / 300W |   9283MiB / 16280MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
+-----------------------------------------------------------------------------+

@mrocklin
Collaborator

mrocklin commented Mar 10, 2019 via email

@VibhuJawa
Member Author

Ah, this was a toy example for raising the error. In my real-life use case I have 2200 right rows for each left row, so the output is roughly 2200x the input, and I still face the error.

Thanks for the random dataset join example. I will post my results with a better simulation.
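
For reference, a minimal sketch of what a random-key join simulation could look like (the key ranges, row counts, and helper name here are illustrative assumptions, not values from this thread). Random keys avoid the degenerate case in the repro above, where every row shares the same key and the inner join explodes into a huge many-to-many match:

import numpy as np
import pandas as pd
import cudf
import dask_cudf

def random_table(nrows, suffix, npartitions=16):
    # Random join keys so matches are spread out instead of every
    # left row matching every right row (as in the identical-row repro).
    pdf = pd.DataFrame({
        "A": np.random.randint(0, 10_000, size=nrows),
        "B": np.random.randint(0, 10, size=nrows),
        "C": np.random.randint(0, 5, size=nrows),
        "D_" + suffix: np.random.random(nrows),
    })
    return dask_cudf.from_cudf(cudf.from_pandas(pdf), npartitions=npartitions)

left = random_table(1_000_000, "1")
right = random_table(100_000, "2")

# Same join as the repro, but with ~500k distinct (A, B, C) keys the
# expected output is on the order of 200k rows rather than a blow-up.
merged = left.merge(right, on=["A", "B", "C"], how="inner")
print(len(merged))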

@mrocklin
Collaborator

mrocklin commented Mar 10, 2019 via email

@VibhuJawa
Member Author

Sorry, I'm quite new to RAPIDS and cudf, so I have a few clarifying follow-ups:

  1. Are you talking about a single GPU's memory or the total memory across GPUs (because it won't fit in one GPU)?
  2. I don't understand what "comfortably" means (1/2 of a GPU? 3/4 of a GPU?). Any ballpark would be super helpful, as I will decide on infrastructure accordingly.

Thanks for being so responsive. 👍

@mrocklin
Collaborator

Dask is just calling a bunch of cudf functions. cudf expects the inputs and outputs to fit in memory.

Dask compounds this expectation because it may have many of these inputs and outputs floating around on a single GPU at any given time, so it's good to keep things relatively small. Something like a tenth of GPU memory is probably as much as I would recommend. On the CPU, at least, we find that people often choose chunk sizes of around 128 MB on systems that have 30+ GB of RAM. Things may be different on the GPU though, I don't know.
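
Applied to the repro above, that advice would look something like this (the 256 MiB value is only an illustrative guess in the spirit of the comment above, not a tested recommendation):

# Roughly a tenth of a 16 GB P100 would be ~1.6 GiB per chunk; erring
# smaller, e.g. a few hundred MiB, leaves headroom for the join's
# intermediate copies.
chunk_size = "256 MiB"  # instead of "3500 MiB" in the original repro

ddf_1 = dask_cudf_load_test_csv(output_file_1, chunksize=chunk_size,
                                cols=cols_1, dtypes=dtypes_1)
ddf_2 = dask_cudf_load_test_csv(output_file_2, chunksize=chunk_size,
                                cols=cols_2, dtypes=dtypes_2)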

@mrocklin
Collaborator

The counter-pressure is that if you make chunks too small, then overhead gets you. There is something like 1 ms of overhead per chunk.

On the GPU there may also be a concern about not saturating the full GPU, though you probably know more about that than I do.
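
As a rough back-of-the-envelope check on that trade-off (all numbers are approximations based on the sizes quoted in this thread):

# Estimate partition count and per-task scheduling overhead for the
# ~18 GB table at a few chunk sizes, assuming ~1 ms of overhead per chunk.
table_bytes = 18 * 1024**3

for mib in (3500, 256, 128):
    nparts = table_bytes // (mib * 1024**2)
    overhead_s = nparts * 1e-3
    print(f"{mib} MiB chunks -> ~{nparts} partitions, "
          f"~{overhead_s:.3f} s of scheduling overhead")

Even at 128 MiB chunks this works out to well under a second of scheduling overhead for a table of this size, so here the bigger risk is chunks that are too large rather than too small.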

@mike-wendt transferred this issue from rapidsai/dask-cudf Jul 15, 2019
@VibhuJawa
Member Author

Closing, because this should be fixed once device memory spill-over is fixed:
rapidsai/dask-cuda#57
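
For anyone landing here later, a minimal sketch of what that spill-over workaround could look like, assuming a dask-cuda release that includes the device_memory_limit option from the linked work:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Ask each worker to start spilling device memory to host once usage
# crosses the limit, instead of failing with a CUDA out-of-memory error.
cluster = LocalCUDACluster(device_memory_limit="10GB")
client = Client(cluster)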

@vyasr added the dask label and removed the dask-cudf label Feb 23, 2024