Skip to content

Commit

Permalink
Bug fixes after 0.4.0 release (#421)
Browse files Browse the repository at this point in the history
* Update README.md

(cherry picked from commit 5609535)

* adding missing test case

(cherry picked from commit 016c5c9)

* Update README.md

(cherry picked from commit 149919c)

* adding finalize check and removing destructor finalize call. (#412)

* adding finalize check and removing destructor finalize call.

* adding env.finalize for dataframe examples

* finalize checks.

(cherry picked from commit 0200c02)

* Update c-cpp.yml

adding github CI again

(cherry picked from commit e0ba964)

* fixing #415 and #416

* changes to concat operation

* unwrapping sort options onto sort method

* Enabling scalars in df set_item (#425)

* fixing minor bug in select

* adding select tests

* accommodating comments

* minor improvements to #189

* enabling scalars in df set_item

* fixing boost error

(cherry picked from commit 10f5a6a)

Co-authored-by: Supun Kamburugamuve <[email protected]>
Co-authored-by: Vibhatha Lakmal Abeykoon <[email protected]>
  • Loading branch information
3 people authored May 4, 2021
1 parent 76d150e commit 1476926
Show file tree
Hide file tree
Showing 16 changed files with 452 additions and 391 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/c-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: C/C++ CI

on:
push:
branches: [ master ]
branches: [ main ]
pull_request:
branches: [ master ]
branches: [ main ]

jobs:
build:
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ print(df3)
```

Now let's run a parallel version of this program. Here if we create n processes (parallelism), n instances of the
program will run. They will each load a two DataFrames in their memory and do a distributed join among all the DataFrames.
The results will be created in the n processes as well.
program will run. They will each load two DataFrames in their memory and do a distributed join among the DataFrames.
The results will be created in the parallel processes as well.

```python
from pycylon import DataFrame, CylonEnv
Expand Down Expand Up @@ -76,7 +76,7 @@ Refer to the documentation on how to compile Cylon

[Compiling on Linux](https://cylondata.org/docs/)

# Licence
# License

Cylon uses the Apache License Version 2.0

Expand Down
2 changes: 2 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ build_cpp(){
mkdir ${BUILD_PATH}
pushd ${BUILD_PATH} || exit 1
export ARROW_HOME=${BUILD_PATH}/arrow/install
export ARROW_BOOST_URL="https://sourceforge.net/projects/boost/files/boost/1.72.0/boost_1_72_0.tar.gz/download"
cmake -DPYCYLON_BUILD=${PYTHON_BUILD} -DPYTHON_EXEC_PATH=${PYTHON_ENV_PATH} \
-DCMAKE_BUILD_TYPE=${BUILD_MODE} -DCYLON_WITH_TEST=${RUN_CPP_TESTS} $CPPLINT_CMD $INSTALL_CMD \
${CMAKE_FLAGS} \
Expand Down Expand Up @@ -266,6 +267,7 @@ build_pyarrow(){
echo "Building PyArrow"
pushd ${BUILD_PATH} || exit 1
export ARROW_HOME=${BUILD_PATH}/arrow/install
export ARROW_BOOST_URL="https://sourceforge.net/projects/boost/files/boost/1.72.0/boost_1_72_0.tar.gz/download"
popd || exit
source "${PYTHON_ENV_PATH}"/bin/activate || exit 1
read_python_requirements
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/cylon/net/mpi/mpi_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ void MPICommunicator::Init(const std::shared_ptr<CommConfig> &config) {
MPI_Comm_size(MPI_COMM_WORLD, &this->world_size);
}
void MPICommunicator::Finalize() {
MPI_Finalize();
int finalized;
MPI_Finalized(&finalized);
if (!finalized) {
MPI_Finalize();
}
}
void MPICommunicator::Barrier() {
MPI_Barrier(MPI_COMM_WORLD);
Expand Down
49 changes: 32 additions & 17 deletions python/examples/dataframe/concat.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
import random

import pycylon as cn
from pycylon import DataFrame, CylonEnv
from pycylon.net import MPIConfig
import random

df1 = DataFrame([random.sample(range(10, 100), 50),
random.sample(range(10, 100), 50)])
df2 = DataFrame([random.sample(range(10, 100), 50),
random.sample(range(10, 100), 50)])
df3 = DataFrame([random.sample(range(10, 100), 50),
random.sample(range(10, 100), 50)])
df1 = DataFrame([random.sample(range(10, 100), 5),
random.sample(range(10, 100), 5)])
df2 = DataFrame([random.sample(range(10, 100), 5),
random.sample(range(10, 100), 5)])
df3 = DataFrame([random.sample(range(10, 100), 10),
random.sample(range(10, 100), 10)])

# local unique
df4 = df1.concat(axis=0, objs=[df2, df3])
print("Local Unique")
df4 = cn.concat(axis=0, objs=[df1, df2, df3])
print("Local concat axis0")
print(df4)

df2.rename(['00', '11'])
df3.rename(['000', '111'])
df4 = cn.concat(axis=1, objs=[df1, df2, df3])
print("Local concat axis1")
print(df4)

# distributed unique
env = CylonEnv(config=MPIConfig())

df1 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5),
random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5)])
df2 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5),
random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5)])
df3 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 10),
random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 10)])
print("Distributed concat axis0", env.rank)
df4 = cn.concat(axis=0, objs=[df1, df2, df3], env=env)
print(df4)

df1 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5),
random.sample(range(10*env.rank, 15*(env.rank+1)), 5)])
df2 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5),
random.sample(range(10*env.rank, 15*(env.rank+1)), 5)])
df3 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5),
random.sample(range(10*env.rank, 15*(env.rank+1)), 5)])
print("Distributed Unique", env.rank)
df4 = df1.concat(axis=0, objs=[df2, df3], env=env)
df2.rename(['00', '11'])
df3.rename(['000', '111'])
df4 = cn.concat(axis=1, objs=[df1, df2, df3], env=env)
print("Distributed concat axis1", env.rank)
print(df4)

env.finalize()
2 changes: 2 additions & 0 deletions python/examples/dataframe/drop_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@
print("Distributed Unique", env.rank)
df3 = df1.drop_duplicates(env=env)
print(df3)

env.finalize()
6 changes: 1 addition & 5 deletions python/examples/dataframe/groupby.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from pycylon import DataFrame, CylonEnv
from pycylon.net import MPIConfig
import random
from pycylon import DataFrame

df1 = DataFrame([[0, 0, 1, 1], [1, 10, 1, 5], [10, 20, 30, 40]])


df3 = df1.groupby(by=0).agg({
"1": "sum",
"2": "min"
})
print(df3)


df4 = df1.groupby(by=0).min()
print(df4)

Expand Down
2 changes: 2 additions & 0 deletions python/examples/dataframe/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@
print("Distributed Join")
df3 = df1.join(other=df2, on=[0], env=env)
print(df3)

env.finalize()
2 changes: 2 additions & 0 deletions python/examples/dataframe/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@
print("Distributed Merge")
df3 = df1.merge(right=df2, on=[0], env=env)
print(df3)

env.finalize()
16 changes: 12 additions & 4 deletions python/examples/dataframe/sort.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import random

from pycylon import DataFrame, CylonEnv
from pycylon.net import MPIConfig
import random

df1 = DataFrame([random.sample(range(10, 100), 50),
random.sample(range(10, 100), 50)])


# local sort
df3 = df1.sort_values(by=[0])
print("Local Sort")
Expand All @@ -14,8 +14,16 @@
# distributed sort
env = CylonEnv(config=MPIConfig())

df1 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5),
random.sample(range(10*env.rank, 15*(env.rank+1)), 5)])
df1 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5),
random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5)])
print("Distributed Sort", env.rank)
df3 = df1.sort_values(by=[0], env=env)
print(df3)

# distributed sort
print("Distributed Sort with sort options", env.rank)
bins = env.world_size * 2
df3 = df1.sort_values(by=[0], num_bins=bins, num_samples=bins, env=env)
print(df3)

env.finalize()
1 change: 1 addition & 0 deletions python/pycylon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pycylon.frame import DataFrame
from pycylon.frame import CylonEnv
from pycylon.frame import read_csv
from pycylon.frame import concat
from pycylon.util.logging import log_level, disable_logging

import os
Expand Down
3 changes: 2 additions & 1 deletion python/pycylon/common/join_config.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ StrToJoinType = {
'inner': CJoinType.CINNER,
'left': CJoinType.CLEFT,
'right': CJoinType.CRIGHT,
'fullouter': CJoinType.COUTER
'fullouter': CJoinType.COUTER,
'outer': CJoinType.COUTER
}

cdef class JoinConfig:
Expand Down
Loading

0 comments on commit 1476926

Please sign in to comment.