
Dask (xgboost and lightgbm with Dask) -- likely WRONG results #49

Closed

szilard opened this issue Jan 20, 2021 · 8 comments


szilard commented Jan 20, 2021

UPDATE: I suspect data leakage in Dask when lumping train and test together to get a consistent label encoding and then splitting back into train and test. The leakage might be caused by the partitioning (??), and it could be responsible for the higher AUC. So instead of this issue, see the new GitHub issue #50, where the analysis is redone with the integer encoding done outside of Dask.


m5.4xlarge, 16 cores (8 physical + 8 hyperthreads)

1M rows

integer encoding for simplicity


szilard commented Jan 20, 2021

xgboost setup:

docker run --rm  -ti -p 8787:8787 continuumio/anaconda3 /bin/bash

pip3 install -U dask-ml xgboost 
ipython

regular XGBoost (without Dask):

import pandas as pd
import numpy as np
from sklearn import preprocessing 
from sklearn import metrics

import xgboost as xgb


d_train = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/train-1m.csv")
d_test = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/test.csv")


d_all = pd.concat([d_train,d_test])

vars_cat = ["Month","DayofMonth","DayOfWeek","UniqueCarrier", "Origin", "Dest"]
vars_num = ["DepTime","Distance"]
for col in vars_cat:
  d_all[col] = preprocessing.LabelEncoder().fit_transform(d_all[col])
  
X_all = d_all[vars_cat+vars_num].to_numpy()      
y_all = np.where(d_all["dep_delayed_15min"]=="Y",1,0)    

X_train = X_all[0:d_train.shape[0],]
y_train = y_all[0:d_train.shape[0]]
X_test = X_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0]),]
y_test = y_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0])]


dxgb_train = xgb.DMatrix(X_train, label = y_train)
dxgb_test = xgb.DMatrix(X_test)

param = {'max_depth':10, 'eta':0.1, 'objective':'binary:logistic', 'tree_method':'hist'}             
%time md = xgb.train(param, dxgb_train, num_boost_round = 100)

y_pred = md.predict(dxgb_test)   
print(metrics.roc_auc_score(y_test, y_pred))

Results:

Wall time: 3.29 s
0.7524071455769888

With Dask:

import pandas as pd
from sklearn import metrics

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import dask.array as da
from dask_ml import preprocessing

import xgboost as xgb


cluster = LocalCluster(n_workers=16, threads_per_worker=1)
client = Client(cluster)

d_train = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/train-1m.csv")
d_test = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/test.csv")
d_all = pd.concat([d_train,d_test])

dx_all = dd.from_pandas(d_all, npartitions=16)

vars_cat = ["Month","DayofMonth","DayOfWeek","UniqueCarrier", "Origin", "Dest"]
vars_num = ["DepTime","Distance"]
for col in vars_cat:
  dx_all[col] = preprocessing.LabelEncoder().fit_transform(dx_all[col])
  
X_all = dx_all[vars_cat+vars_num].to_dask_array(lengths=True)      
y_all = da.where((dx_all["dep_delayed_15min"]=="Y").to_dask_array(lengths=True),1,0)  

X_train = X_all[0:d_train.shape[0],]
y_train = y_all[0:d_train.shape[0]]
X_test = X_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0]),]
y_test = y_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0])]

# persist() returns a new collection; reassign it so the data actually stays in memory
X_train = X_train.persist()
y_train = y_train.persist()

client.has_what()


dxgb_train = xgb.dask.DaskDMatrix(client, X_train, y_train)
dxgb_test = xgb.dask.DaskDMatrix(client, X_test)


param = {'objective':'binary:logistic', 'tree_method':'hist', 'max_depth':10, 'eta':0.1}             
%time md = xgb.dask.train(client, param, dxgb_train, num_boost_round = 100)


y_pred = xgb.dask.predict(client, md, dxgb_test)
y_pred_loc = y_pred.compute()
y_test_loc = y_test.compute()
print(metrics.roc_auc_score(y_test_loc, y_pred_loc))

Results:

Wall time: 20.5 s
0.7958538649110775
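
A quick sanity check on the encoding itself (a minimal diagnostic sketch, assuming d_train and d_test loaded as above; not one of the runs reported here) is to encode one column with both sklearn and dask_ml and compare the codes row by row:

import pandas as pd
import dask.dataframe as dd
from sklearn import preprocessing as sk_preprocessing
from dask_ml import preprocessing as dml_preprocessing

# Encode the same column with sklearn and with dask_ml and compare row by row.
# If these differ, the train/test split of the encoded matrix is inconsistent.
s = pd.concat([d_train, d_test])["Origin"]
codes_sk = sk_preprocessing.LabelEncoder().fit_transform(s)
codes_dask = dml_preprocessing.LabelEncoder().fit_transform(
    dd.from_pandas(s, npartitions=16)
).compute()
print("codes identical:", bool((codes_sk == codes_dask).all()))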


szilard commented Jan 20, 2021

Changing number of workers, threads, partitions:

n_workers   n_threads   n_partitions   Time (s)   AUC
16          1           16             20.5       0.795853
1           16          16             4.66       0.796310
1           16          1              4.65       0.796310
1           1           1              20.9       0.796310
4           4           16             10.9       0.796428
no dask     16          -              3.29       0.752407
no dask     1           -              19.4       0.752407

UPDATE: I suspect data leakage in Dask when lumping train and test together to get a consistent label encoding and then splitting back into train and test. The leakage might be caused by the partitioning (??), and it could be responsible for the higher AUC. So instead of this issue, see the new GitHub issue #50, where the analysis is redone with the integer encoding done outside of Dask.


szilard commented Jan 20, 2021

10M rows:

n_workers   n_threads   n_partitions   Time (s)
16          1           16             44.4
4           4           16             40.2
no dask     -           -              27.2


szilard commented Jan 20, 2021

RAM usage, 1M rows:

regular XGBoost: ~1 GB (data + training)

Dask, 16 workers (1 thread each): ~3 GB for the data, ~5 GB while training
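
For reference, a minimal sketch of reading each worker's resident memory from the client (this assumes the client from the Dask setup above and psutil, which distributed depends on; it is not how the numbers above were collected):

import psutil

# Ask every worker for its resident set size (RSS) and print it in GB,
# keyed by worker address
rss = client.run(lambda: psutil.Process().memory_info().rss)
print({worker: round(mem / 1e9, 2) for worker, mem in rss.items()})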


szilard commented Jan 20, 2021

lightgbm without Dask:

pip3 install -U lightgbm

import pandas as pd
import numpy as np
from sklearn import preprocessing 
from sklearn import metrics

import lightgbm as lgb


d_train = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/train-1m.csv")
d_test = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/test.csv")


d_all = pd.concat([d_train,d_test])

vars_cat = ["Month","DayofMonth","DayOfWeek","UniqueCarrier", "Origin", "Dest"]
vars_num = ["DepTime","Distance"]
for col in vars_cat:
  d_all[col] = preprocessing.LabelEncoder().fit_transform(d_all[col])
  
X_all = d_all[vars_cat+vars_num].to_numpy()      
y_all = np.where(d_all["dep_delayed_15min"]=="Y",1,0)    

X_train = X_all[0:d_train.shape[0],]
y_train = y_all[0:d_train.shape[0]]
X_test = X_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0]),]
y_test = y_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0])]


md = lgb.LGBMClassifier(num_leaves=512, learning_rate=0.1, n_estimators=100)
%time md.fit(X_train, y_train)


y_pred = md.predict_proba(X_test)[:,1]
print(metrics.roc_auc_score(y_test, y_pred))

Results:

Wall time: 3.76 s
0.7635078895240786


szilard commented Jan 20, 2021

lightgbm with Dask:

Dask support in lightgbm is so far in development, so use this setup:

wget https://raw.githubusercontent.com/jameslamb/talks/main/recent-developments-in-lightgbm/Dockerfile
sudo docker build -t dasklgbm .
sudo docker run --rm -p 8787:8787 dasklgbm
sudo docker ps -a
sudo docker exec -ti ... /bin/bash   # substitute the container ID from `docker ps -a`
pip3 install -U dask-ml
ipython


szilard commented Jan 20, 2021

import pandas as pd
from sklearn import metrics

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import dask.array as da
from dask_ml import preprocessing

from lightgbm.dask import DaskLGBMClassifier


cluster = LocalCluster(n_workers=16, threads_per_worker=1)
client = Client(cluster)

d_train = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/train-1m.csv")
d_test = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/test.csv")
d_all = pd.concat([d_train,d_test])

dx_all = dd.from_pandas(d_all, npartitions=16)

vars_cat = ["Month","DayofMonth","DayOfWeek","UniqueCarrier", "Origin", "Dest"]
vars_num = ["DepTime","Distance"]
for col in vars_cat:
  dx_all[col] = preprocessing.LabelEncoder().fit_transform(dx_all[col])
  
X_all = dx_all[vars_cat+vars_num].to_dask_array(lengths=True)      
y_all = da.where((dx_all["dep_delayed_15min"]=="Y").to_dask_array(lengths=True),1,0)  

X_train = X_all[0:d_train.shape[0],]
y_train = y_all[0:d_train.shape[0]]
X_test = X_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0]),]
y_test = y_all[d_train.shape[0]:(d_train.shape[0]+d_test.shape[0])]

# persist() returns a new collection; reassign it so the data actually stays in memory
X_train = X_train.persist()
y_train = y_train.persist()

client.has_what()


md = DaskLGBMClassifier(num_leaves=512, learning_rate=0.1, n_estimators=100, tree_learner="data")
%time md.fit(client=client, X=X_train, y=y_train)

md_loc = md.to_local()
X_test_loc = X_test.compute()
y_test_loc = y_test.compute()

# score with the local model on the materialized test data
y_pred = md_loc.predict_proba(X_test_loc)[:,1]
print(metrics.roc_auc_score(y_test_loc, y_pred))

Results:

Wall time: 34min 4s
0.5345739251687514

szilard changed the title from "Dask (xgboost and lightgbm with Dask)" to "Dask (xgboost and lightgbm with Dask) -- likely WRONG results" on Jan 21, 2021

szilard commented Jan 21, 2021

UPDATE: I suspect data leakage in Dask when lumping train and test together to get a consistent label encoding and then splitting back into train and test. The leakage might be caused by the partitioning (??), and it could be responsible for the higher AUC. So instead of this issue, see the new GitHub issue #50, where the analysis is redone with the integer encoding done outside of Dask.
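
For reference, a minimal sketch of that approach (my assumption of what the redone analysis looks like, not copied from issue #50): fit one integer mapping on the combined data in plain pandas, so the encoding cannot depend on Dask partitions, and only then hand the frames to Dask:

import pandas as pd
import dask.dataframe as dd

d_train = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/train-1m.csv")
d_test = pd.read_csv("https://s3.amazonaws.com/benchm-ml--main/test.csv")

vars_cat = ["Month","DayofMonth","DayOfWeek","UniqueCarrier","Origin","Dest"]

# Fit one consistent integer mapping on the combined data, in plain pandas,
# so the codes are identical in both splits
d_all = pd.concat([d_train, d_test])
for col in vars_cat:
    codes = d_all[col].astype("category").cat.codes.to_numpy()
    d_train[col] = codes[:len(d_train)]
    d_test[col] = codes[len(d_train):]

# only the already-encoded frames go into Dask
dx_train = dd.from_pandas(d_train, npartitions=16)
dx_test = dd.from_pandas(d_test, npartitions=16)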

szilard closed this as completed on Jan 21, 2021