diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml
index 4c6514c42..ee54afb6f 100644
--- a/.github/workflows/c-cpp.yml
+++ b/.github/workflows/c-cpp.yml
@@ -2,9 +2,9 @@ name: C/C++ CI
 
 on:
   push:
-    branches: [ master ]
+    branches: [ main ]
   pull_request:
-    branches: [ master ]
+    branches: [ main ]
 
 jobs:
   build:
diff --git a/README.md b/README.md
index 18001d848..0580b1e5b 100644
--- a/README.md
+++ b/README.md
@@ -43,8 +43,8 @@ print(df3)
 ```
 
-Now lets run a parallel version of this program. Here if we create n processes (parallelism), n instances of the
-program will run. They will each load a two DataFrames in their memory and do a distributed join among all the DataFrames.
-The results will be created in the n processes as well.
+Now let's run a parallel version of this program. If we create n processes (parallelism), n instances of the
+program will run. They will each load two DataFrames in their memory and do a distributed join among the DataFrames.
+The results will be created in the parallel processes as well.
 
 ```python
 from pycylon import DataFrame, CylonEnv
@@ -76,7 +76,7 @@ Refer to the documentation on how to compile Cylon
 
 [Compiling on Linux](https://cylondata.org/docs/)
 
-# Licence
+# License
 
-Cylon uses the Apache Lincense Version 2.0
+Cylon uses the Apache License Version 2.0
diff --git a/build.sh b/build.sh
index a5a1c96ff..c46dc3b88 100755
--- a/build.sh
+++ b/build.sh
@@ -175,6 +175,7 @@ build_cpp(){
   mkdir ${BUILD_PATH}
   pushd ${BUILD_PATH} || exit 1
   export ARROW_HOME=${BUILD_PATH}/arrow/install
+  export ARROW_BOOST_URL="https://sourceforge.net/projects/boost/files/boost/1.72.0/boost_1_72_0.tar.gz/download"
   cmake -DPYCYLON_BUILD=${PYTHON_BUILD} -DPYTHON_EXEC_PATH=${PYTHON_ENV_PATH} \
       -DCMAKE_BUILD_TYPE=${BUILD_MODE} -DCYLON_WITH_TEST=${RUN_CPP_TESTS} $CPPLINT_CMD $INSTALL_CMD \
       ${CMAKE_FLAGS} \
@@ -266,6 +267,7 @@ build_pyarrow(){
   echo "Building PyArrow"
   pushd ${BUILD_PATH} || exit 1
   export ARROW_HOME=${BUILD_PATH}/arrow/install
+  export ARROW_BOOST_URL="https://sourceforge.net/projects/boost/files/boost/1.72.0/boost_1_72_0.tar.gz/download"
   popd || exit
   source "${PYTHON_ENV_PATH}"/bin/activate || exit 1
   read_python_requirements
diff --git a/cpp/src/cylon/net/mpi/mpi_communicator.cpp b/cpp/src/cylon/net/mpi/mpi_communicator.cpp
index b93d112dd..0a2e697cd 100644
--- a/cpp/src/cylon/net/mpi/mpi_communicator.cpp
+++ b/cpp/src/cylon/net/mpi/mpi_communicator.cpp
@@ -59,7 +59,11 @@ void MPICommunicator::Init(const std::shared_ptr<CommConfig> &config) {
   MPI_Comm_size(MPI_COMM_WORLD, &this->world_size);
 }
 void MPICommunicator::Finalize() {
-  MPI_Finalize();
+  int finalized;
+  MPI_Finalized(&finalized);
+  if (!finalized) {
+    MPI_Finalize();
+  }
 }
 void MPICommunicator::Barrier() {
   MPI_Barrier(MPI_COMM_WORLD);
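The `MPI_Finalized` guard above makes `MPICommunicator::Finalize()` idempotent, which matters now that the examples call `env.finalize()` explicitly (and `CylonEnv.__del__` is removed later in this diff). A minimal sketch of the calling pattern this protects, assuming `CylonEnv.finalize()` ultimately reaches `MPICommunicator::Finalize()`:

```python
# Sketch (hypothetical driver script); assumes CylonEnv.finalize() routes to
# the guarded MPICommunicator::Finalize() shown above.
from pycylon import CylonEnv
from pycylon.net import MPIConfig

env = CylonEnv(config=MPIConfig(), distributed=True)
# ... distributed work ...
env.finalize()
# Interpreter teardown (or another explicit call) may trigger finalization
# again; with the MPI_Finalized() check, the second attempt is a no-op at the
# MPI level instead of an MPI error/abort.
```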
diff --git a/python/examples/dataframe/concat.py b/python/examples/dataframe/concat.py
index 93a03c259..6aba627f2 100644
--- a/python/examples/dataframe/concat.py
+++ b/python/examples/dataframe/concat.py
@@ -1,29 +1,44 @@
+import random
+
+import pycylon as cn
 from pycylon import DataFrame, CylonEnv
 from pycylon.net import MPIConfig
-import random
 
-df1 = DataFrame([random.sample(range(10, 100), 50),
-                 random.sample(range(10, 100), 50)])
-df2 = DataFrame([random.sample(range(10, 100), 50),
-                 random.sample(range(10, 100), 50)])
-df3 = DataFrame([random.sample(range(10, 100), 50),
-                 random.sample(range(10, 100), 50)])
+df1 = DataFrame([random.sample(range(10, 100), 5),
+                 random.sample(range(10, 100), 5)])
+df2 = DataFrame([random.sample(range(10, 100), 5),
+                 random.sample(range(10, 100), 5)])
+df3 = DataFrame([random.sample(range(10, 100), 10),
+                 random.sample(range(10, 100), 10)])
 
-# local unique
-df4 = df1.concat(axis=0, objs=[df2, df3])
-print("Local Unique")
+# local concat
+df4 = cn.concat(axis=0, objs=[df1, df2, df3])
+print("Local concat axis0")
+print(df4)
+
+df2.rename(['00', '11'])
+df3.rename(['000', '111'])
+df4 = cn.concat(axis=1, objs=[df1, df2, df3])
+print("Local concat axis1")
 print(df4)
 
-# distributed unique
+# distributed concat
 env = CylonEnv(config=MPIConfig())
+
+df1 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5),
+                 random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5)])
+df2 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5),
+                 random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5)])
+df3 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 10),
+                 random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 10)])
+print("Distributed concat axis0", env.rank)
+df4 = cn.concat(axis=0, objs=[df1, df2, df3], env=env)
+print(df4)
 
-df1 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5),
-                 random.sample(range(10*env.rank, 15*(env.rank+1)), 5)])
-df2 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5),
-                 random.sample(range(10*env.rank, 15*(env.rank+1)), 5)])
-df3 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5),
-                 random.sample(range(10*env.rank, 15*(env.rank+1)), 5)])
-print("Distributed Unique", env.rank)
-df4 = df1.concat(axis=0, objs=[df2, df3], env=env)
+df2.rename(['00', '11'])
+df3.rename(['000', '111'])
+df4 = cn.concat(axis=1, objs=[df1, df2, df3], env=env)
+print("Distributed concat axis1", env.rank)
 print(df4)
+
+env.finalize()
diff --git a/python/examples/dataframe/drop_duplicates.py b/python/examples/dataframe/drop_duplicates.py
index 84f3191cf..54deb0d65 100644
--- a/python/examples/dataframe/drop_duplicates.py
+++ b/python/examples/dataframe/drop_duplicates.py
@@ -18,3 +18,5 @@ print("Distributed Unique", env.rank)
 df3 = df1.drop_duplicates(env=env)
 print(df3)
+
+env.finalize()
\ No newline at end of file
diff --git a/python/examples/dataframe/groupby.py b/python/examples/dataframe/groupby.py
index d0e11f62b..daa56f0b4 100644
--- a/python/examples/dataframe/groupby.py
+++ b/python/examples/dataframe/groupby.py
@@ -1,17 +1,13 @@
-from pycylon import DataFrame, CylonEnv
-from pycylon.net import MPIConfig
-import random
+from pycylon import DataFrame
 
 df1 = DataFrame([[0, 0, 1, 1], [1, 10, 1, 5], [10, 20, 30, 40]])
-
 df3 = df1.groupby(by=0).agg({
     "1": "sum",
     "2": "min"
 })
 print(df3)
-
 df4 = df1.groupby(by=0).min()
 print(df4)
diff --git a/python/examples/dataframe/join.py b/python/examples/dataframe/join.py
index 3eb94b842..8e04ae62f 100644
--- a/python/examples/dataframe/join.py
+++ b/python/examples/dataframe/join.py
@@ -25,3 +25,5 @@ print("Distributed Join")
 df3 = df1.join(other=df2, on=[0], env=env)
 print(df3)
+
+env.finalize()
diff --git a/python/examples/dataframe/merge.py b/python/examples/dataframe/merge.py
index 2f2159b63..0f23f352c 100644
--- a/python/examples/dataframe/merge.py
+++ b/python/examples/dataframe/merge.py
@@ -22,3 +22,5 @@ print("Distributed Merge")
 df3 = df1.merge(right=df2, on=[0], env=env)
 print(df3)
+
+env.finalize()
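The examples above now exercise the new module-level `cn.concat` and end with an explicit `env.finalize()`. Per the `Table.concat` docstring later in this diff, `axis=0` stacks rows while `axis=1` joins frames on their index. A minimal local sketch of the two modes; the `'00'` column name follows the renaming convention used in `concat.py` above:

```python
# Local-only sketch of the two concat modes introduced in this diff.
import random

import pycylon as cn
from pycylon import DataFrame

left = DataFrame([random.sample(range(10, 100), 5)])
left2 = DataFrame([random.sample(range(10, 100), 5)])
right = DataFrame([random.sample(range(10, 100), 5)])
right.rename(['00'])  # axis=1 requires distinct column names

# axis=0 stacks rows (a union of same-schema frames);
# axis=1 joins the frames on their index.
stacked = cn.concat(axis=0, objs=[left, left2])
side_by_side = cn.concat(axis=1, objs=[left, right])
print(stacked)
print(side_by_side)
```

A distributed run of any of these examples would be launched under MPI, e.g. `mpirun -n 2 python python/examples/dataframe/concat.py`.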
Sort") @@ -14,8 +14,16 @@ # distributed sort env = CylonEnv(config=MPIConfig()) -df1 = DataFrame([random.sample(range(10*env.rank, 15*(env.rank+1)), 5), - random.sample(range(10*env.rank, 15*(env.rank+1)), 5)]) +df1 = DataFrame([random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5), + random.sample(range(10 * env.rank, 15 * (env.rank + 1)), 5)]) print("Distributed Sort", env.rank) df3 = df1.sort_values(by=[0], env=env) print(df3) + +# distributed sort +print("Distributed Sort with sort options", env.rank) +bins = env.world_size * 2 +df3 = df1.sort_values(by=[0], num_bins=bins, num_samples=bins, env=env) +print(df3) + +env.finalize() diff --git a/python/pycylon/__init__.py b/python/pycylon/__init__.py index ac6959f0e..a8cc41170 100644 --- a/python/pycylon/__init__.py +++ b/python/pycylon/__init__.py @@ -26,6 +26,7 @@ from pycylon.frame import DataFrame from pycylon.frame import CylonEnv from pycylon.frame import read_csv +from pycylon.frame import concat from pycylon.util.logging import log_level, disable_logging import os diff --git a/python/pycylon/common/join_config.pyx b/python/pycylon/common/join_config.pyx index e92234ddc..0b5cfbdd4 100644 --- a/python/pycylon/common/join_config.pyx +++ b/python/pycylon/common/join_config.pyx @@ -36,7 +36,8 @@ StrToJoinType = { 'inner': CJoinType.CINNER, 'left': CJoinType.CLEFT, 'right': CJoinType.CRIGHT, - 'fullouter': CJoinType.COUTER + 'fullouter': CJoinType.COUTER, + 'outer': CJoinType.COUTER } cdef class JoinConfig: diff --git a/python/pycylon/data/table.pyx b/python/pycylon/data/table.pyx index fd9da0624..a4f2aaa80 100644 --- a/python/pycylon/data/table.pyx +++ b/python/pycylon/data/table.pyx @@ -63,10 +63,9 @@ import math import pyarrow as pa import numpy as np import pandas as pd -from typing import List, Any +from typing import List import warnings import operator -import copy ''' Cylon Table definition mapping @@ -342,7 +341,7 @@ cdef class Table: """ cdef shared_ptr[CTable] output cdef shared_ptr[CTable] right = pycylon_unwrap_table(table) - cdef CJoinConfig*jcptr + cdef CJoinConfig *jcptr left_cols, right_cols = self._get_join_column_indices(table=table, **kwargs) left_prefix = kwargs.get('left_prefix') if 'left_prefix' in kwargs else "" @@ -368,7 +367,7 @@ cdef class Table: """ cdef shared_ptr[CTable] output cdef shared_ptr[CTable] right = pycylon_unwrap_table(table) - cdef CJoinConfig*jcptr + cdef CJoinConfig *jcptr left_cols, right_cols = self._get_join_column_indices(table=table, **kwargs) left_prefix = kwargs.get('left_prefix') if 'left_prefix' in kwargs else "" @@ -1169,7 +1168,7 @@ cdef class Table: Sets values for a existing table by means of a column Args: key: (str) column-name - value: (Table) data as a single column table + value: (Table) data as a single column table or a scalar Returns: PyCylon Table @@ -2055,7 +2054,7 @@ cdef class Table: return self.get_index() def set_index(self, key, indexing_type: IndexingType = IndexingType.LINEAR, - drop: bool = False): + drop: bool = False): ''' Set Index Operation takes place inplace. @@ -2366,8 +2365,10 @@ cdef class Table: return PyLocIndexer(self, "iloc") @staticmethod - def concat(tables: List[Table], axis: int = 0, join: str = 'inner', algorithm: str = 'sort', distributed=False): + def concat(tables: List[Table], axis: int = 0, join: str = 'inner', algorithm: str = 'sort'): """ + Concatenate tables. axis=0 (row-wise) concat is independent from local/ distributed execution. 
diff --git a/python/pycylon/data/table.pyx b/python/pycylon/data/table.pyx
index fd9da0624..a4f2aaa80 100644
--- a/python/pycylon/data/table.pyx
+++ b/python/pycylon/data/table.pyx
@@ -63,10 +63,9 @@ import math
 import pyarrow as pa
 import numpy as np
 import pandas as pd
-from typing import List, Any
+from typing import List
 import warnings
 import operator
-import copy
 
 '''
 Cylon Table definition mapping
@@ -342,7 +341,7 @@ cdef class Table:
         """
         cdef shared_ptr[CTable] output
         cdef shared_ptr[CTable] right = pycylon_unwrap_table(table)
-        cdef CJoinConfig*jcptr
+        cdef CJoinConfig *jcptr
 
         left_cols, right_cols = self._get_join_column_indices(table=table, **kwargs)
         left_prefix = kwargs.get('left_prefix') if 'left_prefix' in kwargs else ""
@@ -368,7 +367,7 @@ cdef class Table:
         """
         cdef shared_ptr[CTable] output
         cdef shared_ptr[CTable] right = pycylon_unwrap_table(table)
-        cdef CJoinConfig*jcptr
+        cdef CJoinConfig *jcptr
 
         left_cols, right_cols = self._get_join_column_indices(table=table, **kwargs)
         left_prefix = kwargs.get('left_prefix') if 'left_prefix' in kwargs else ""
@@ -1169,7 +1168,7 @@ cdef class Table:
-        Sets values for a existing table by means of a column
+        Sets values for an existing table by means of a column
         Args:
             key: (str) column-name
-            value: (Table) data as a single column table
+            value: (Table) data as a single column table or a scalar
 
         Returns: PyCylon Table
@@ -2055,7 +2054,7 @@ cdef class Table:
         return self.get_index()
 
     def set_index(self, key, indexing_type: IndexingType = IndexingType.LINEAR,
-                   drop: bool = False):
+                  drop: bool = False):
         '''
         Set Index
         Operation takes place inplace.
@@ -2366,8 +2365,10 @@ cdef class Table:
         return PyLocIndexer(self, "iloc")
 
     @staticmethod
-    def concat(tables: List[Table], axis: int = 0, join: str = 'inner', algorithm: str = 'sort', distributed=False):
+    def concat(tables: List[Table], axis: int = 0, join: str = 'inner', algorithm: str = 'sort'):
         """
+        Concatenate tables. axis=0 (row-wise) concat is independent of local/distributed execution.
+
         Algorithm
         =========
         axis=1 (regular join op considering a column)
         ----------------------------------------------
 
         1. If indexed or not, do a reset_index op (which will add the new column as 'index' in both
         tables)
         2. Do the regular join by considering the 'index' column
         3. Set the index by 'index' in the resultant table
 
         axis=0 (stacking tables or similar to merge function)
         -----------------------------------------------------
         assert: column count must match
         the two tables are stacked upon each other in order
         The index is created by concatenating two indices
         Args:
             tables: List of PyCylon Tables
             axis: 0:row-wise 1:column-wise
             join: 'inner' and 'outer'
             algorithm: 'sort' or 'hash'
 
         Returns: PyCylon Table
 
         """
@@ -2391,46 +2392,45 @@ cdef class Table:
 
         Returns: PyCylon Table
 
         """
-        if axis == 0:
-            res_table = tables[0]
-            if not isinstance(res_table, Table):
-                raise ValueError(f"Invalid object {res_table}, expected Table")
-            formatted_tables = []
-            new_column_names = res_table.column_names
-            for tb_idx in range(len(tables)):
-                tb1 = tables[tb_idx]
-                tb1.reset_index()
-            res_table = Table.merge(tables)
-            res_table.set_index(res_table.column_names[0], drop=True)
-            for tb_idx in range(len(tables)):
-                tb1 = tables[tb_idx]
-                tb1.set_index(tb1.column_names[0], drop=True)
-            return res_table
-        elif axis == 1:
-            if not isinstance(tables[0], Table):
-                raise ValueError(f"Invalid object {tables[0]}, Table expected")
-            ctx = tables[0].context
-            res_table = tables[0]
-            for i in range(1, len(tables)):
-                tb1 = tables[i]
-                if not isinstance(tb1, Table):
-                    raise ValueError(f"Invalid object {tb1}, expected Table")
-                tb1.reset_index()
-                res_table.reset_index()
-                if ctx.get_world_size() > 1 and distributed:
-                    res_table = res_table.distributed_join(table=tb1, join_type=join,
-                                                           algorithm=algorithm,
-                                                           left_on=[res_table.column_names[0]],
-                                                           right_on=[tb1.column_names[0]])
-                else:
-                    res_table = res_table.join(table=tb1, join_type=join, algorithm=algorithm,
-                                               left_on=[res_table.column_names[0]],
-                                               right_on=[tb1.column_names[0]])
-                res_table.set_index(res_table.column_names[0], drop=True)
-                res_table.drop([tb1.column_names[0]], inplace=True)
-                tb1.set_index(tb1.column_names[0], drop=True)
-            tables[0].set_index(tables[0].column_names[0], drop=True)
-            return res_table
+        return _concat_impl(tables=tables, axis=axis, join=join, algorithm=algorithm)
+
+    @staticmethod
+    def distributed_concat(tables: List[Table], axis: int = 0, join: str = 'inner', algorithm: str = 'sort'):
+        """
+        Concatenate tables. axis=0 (row-wise) concat is independent of local/distributed execution.
+
+        Algorithm
+        =========
+        axis=1 (regular join op considering a column)
+        ----------------------------------------------
+
+        1. If indexed or not, do a reset_index op (which will add the new column as 'index' in both
+        tables)
+        2. Do the regular join by considering the 'index' column
+        3. Set the index by 'index' in the resultant table
+
+        axis=0 (stacking tables or similar to merge function)
+        -----------------------------------------------------
+        assert: column count must match
+        the two tables are stacked upon each other in order
+        The index is created by concatenating two indices
+        Args:
+            tables: List of PyCylon Tables
+            axis: 0:row-wise 1:column-wise
+            join: 'inner' or 'outer'
+            algorithm: 'sort' or 'hash'
+
+        Returns: PyCylon Table
+
+        """
+        if not isinstance(tables[0], Table):
+            raise ValueError(f"Invalid object {tables[0]}, Table expected")
+        ctx = tables[0].context
+
+        if axis == 0 or (axis == 1 and ctx.get_world_size() == 1):
+            return _concat_impl(tables=tables, axis=axis, join=join, algorithm=algorithm)
+        elif axis == 1 and ctx.get_world_size() > 1:
+            return _concat_impl(tables=tables, axis=axis, join=join, algorithm=algorithm,
+                                axis1_join_func='distributed_join')
         else:
             raise ValueError(f"Invalid axis {axis}, must 0 or 1")
@@ -2528,6 +2528,48 @@ cdef class Table:
     def __len__(self) -> int:
         return self.row_count
 
+
+def _concat_impl(tables: List[Table], axis: int = 0, join: str = 'inner', algorithm: str = 'sort',
+                 axis1_join_func: str = 'join'):
+    # row-wise concat -> locally stacking up tables
+    if axis == 0:
+        res_table = tables[0]
+        if not isinstance(res_table, Table):
+            raise ValueError(f"Invalid object {res_table}, expected Table")
+        for tb_idx in range(len(tables)):
+            tb1 = tables[tb_idx]
+            tb1.reset_index()
+        res_table = Table.merge(tables)
+        res_table.set_index(res_table.column_names[0], drop=True)
+        for tb_idx in range(len(tables)):
+            tb1 = tables[tb_idx]
+            tb1.set_index(tb1.column_names[0], drop=True)
+        return res_table
+    elif axis == 1:
+        if not isinstance(tables[0], Table):
+            raise ValueError(f"Invalid object {tables[0]}, Table expected")
+        ctx = tables[0].context
+        res_table = tables[0]
+        for i in range(1, len(tables)):
+            tb1 = tables[i]
+            if not isinstance(tb1, Table):
+                raise ValueError(f"Invalid object {tb1}, expected Table")
+            tb1.reset_index()
+            res_table.reset_index()
+            # call method name by string
+            res_table = getattr(res_table, axis1_join_func)(table=tb1, join_type=join,
+                                                            algorithm=algorithm,
+                                                            left_on=[res_table.column_names[0]],
+                                                            right_on=[tb1.column_names[0]])
+            res_table.set_index(res_table.column_names[0], drop=True)
+            res_table.drop([tb1.column_names[0]], inplace=True)
+            tb1.set_index(tb1.column_names[0], drop=True)
+        tables[0].set_index(tables[0].column_names[0], drop=True)
+        return res_table
+    else:
+        raise ValueError(f"Invalid axis {axis}, must be 0 or 1")
+
 
 class EmptyTable(Table):
     '''
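`_concat_impl` above selects the axis=1 join routine by attribute name (`'join'` vs `'distributed_join'`) via `getattr`. A self-contained toy sketch of that dispatch pattern, with illustrative names that are not part of pycylon:

```python
# Toy sketch of getattr-based dispatch; class and method names are
# hypothetical, not part of the pycylon API.
class Joiner:
    def join(self, other):
        return f"local join with {other}"

    def distributed_join(self, other):
        return f"distributed join with {other}"

def run(joiner: Joiner, func_name: str = 'join'):
    # getattr(obj, 'join') is obj.join; the string selects the code path
    return getattr(joiner, func_name)(other="t2")

print(run(Joiner()))                                # local join with t2
print(run(Joiner(), func_name='distributed_join'))  # distributed join with t2
```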
diff --git a/python/pycylon/frame.py b/python/pycylon/frame.py
index 26703c588..e7fec281d 100644
--- a/python/pycylon/frame.py
+++ b/python/pycylon/frame.py
@@ -14,26 +14,31 @@
 
 from __future__ import annotations
 
-from typing import Hashable, List, Dict, Optional, Sequence, Union
-from copy import copy
 from collections.abc import Iterable
-import pycylon as cn
+from copy import copy
+from typing import Hashable, List, Dict, Optional, Sequence, Union
+
 import numpy as np
 import pandas as pd
 import pyarrow as pa
+
+import pycylon as cn
+import pycylon.data as pcd
+from pycylon import CylonContext
 from pycylon import Series
+from pycylon.data.table import SortOptions
 from pycylon.index import RangeIndex, CategoricalIndex
-from pycylon.io import CSVWriteOptions
 from pycylon.io import CSVReadOptions
-import pycylon.data as pcd
-
-from pycylon import CylonContext
+from pycylon.io import CSVWriteOptions
 
 DEVICE_CPU = "cpu"
 
 
 # Data loading Functions
-def read_csv(filepath: str, use_threads=True, names=None, sep=",", block_size: int = 1 << 20, skiprows=0, ignore_emptylines=True, na_values=None):
+def read_csv(filepath: str, use_threads=True, names=None, sep=",", block_size: int = 1 << 20,
+             skiprows=0,
+             ignore_emptylines=True, na_values=None):
     """
     Read a comma-separated values (csv) file into DataFrame.
@@ -105,12 +110,6 @@ def finalize(self):
 
     def barrier(self):
         self._context.barrier()
 
-    def __del__(self):
-        """
-        On destruction of the application, the environment will be automatically finalized
-        """
-        self.finalize()
-
 
 class GroupByDataFrame(object):
     def __init__(self, df: DataFrame, by=None) -> None:
@@ -221,16 +220,18 @@ def is_cpu(self):
     def is_device(self, device):
         return self._device == device
 
-    def _change_context(self, env: CylonEnv):
+    def _change_context(self, env: CylonEnv) -> DataFrame:
         """
         This is a temporary function to make the DataFrame backed by a Cylon Table
         with a different context. This should be removed once C++ support Tables which are
         independent from Contexts
         """
         self._table = self._initialize_dataframe(
-            data=self._table.to_arrow(), index=self._index, columns=self._columns, copy=False, context=env.context)
+            data=self._table.to_arrow(), index=self._index, columns=self._columns, copy=False,
+            context=env.context)
         return self
 
-    def _initialize_dataframe(self, data=None, index=None, columns=None, copy=False, context=CylonContext(config=None, distributed=False)):
+    def _initialize_dataframe(self, data=None, index=None, columns=None, copy=False,
+                              context=CylonContext(config=None, distributed=False)):
         rows = 0
         cols = 0
         self._table = None
@@ -418,7 +419,7 @@ def __setitem__(self, key, value):
-        Sets values for a existing dataframe by means of a column
+        Sets values for an existing dataframe by means of a column
         Args:
             key: (str) column-name
-            value: (DataFrame) data as a single column table
+            value: (DataFrame) data as a single column table or a scalar
 
         Returns: PyCylon DataFrame
@@ -447,8 +448,11 @@ def __setitem__(self, key, value):
             3   4   8  120  1120
 
         '''
-        if isinstance(key, str) and isinstance(value, DataFrame):
-            self._table.__setitem__(key, value.to_table())
+        if isinstance(key, str):
+            if isinstance(value, DataFrame):
+                self._table.__setitem__(key, value.to_table())
+            elif np.isscalar(value):
+                self._table.__setitem__(key, value)
         else:
             raise ValueError(f"Not Implemented __setitem__ option for key Type {type(key)} and "
                              f"value type {type(value)}")
@@ -1145,7 +1149,7 @@ def add_prefix(self, prefix: str) -> DataFrame:
 
     # Indexing
     def set_index(
-        self, keys, drop=True, append=False, inplace=False, verify_integrity=False
+            self, keys, drop=True, append=False, inplace=False, verify_integrity=False
     ):
         """
         Set the DataFrame index using existing columns.
@@ -1243,12 +1247,12 @@ def set_index(
         return new_df
 
     def reset_index(  # type: ignore[misc]
-        self,
-        level: Optional[Union[Hashable, Sequence[Hashable]]] = ...,
-        drop: bool = ...,
-        inplace: False = ...,
-        col_level: Hashable = ...,
-        col_fill=...,
+            self,
+            level: Optional[Union[Hashable, Sequence[Hashable]]] = ...,
+            drop: bool = ...,
+            inplace: False = ...,
+            col_level: Hashable = ...,
+            col_fill=...,
     ) -> DataFrame:
         # todo this is not a final implementation
         self._index_columns = []
@@ -1257,6 +1261,121 @@ def reset_index(  # type: ignore[misc]
 
     # Combining / joining / merging
 
+    def sort_values(
+            self,
+            by,
+            axis=0,
+            ascending=True,
+            inplace=False,
+            kind="quicksort",
+            na_position="last",
+            ignore_index=False,
+            key=None,
+            num_samples: int = 0,
+            num_bins: int = 0,
+            env: CylonEnv = None
+    ) -> DataFrame:
+        """
+        Sort by the values along either axis.
+
+        Parameters
+        ----------
+        axis : int, default 0
+            Axis to be sorted.
+        ascending : bool or list of bool, default True
+            Sort ascending vs. descending. Specify list for multiple sort
+            orders. If this is a list of bools, must match the length of
+            the by.
+        inplace(Unsupported) : bool, default False
+            If True, perform operation in-place.
+        kind(Unsupported) : {'quicksort', 'mergesort', 'heapsort', 'stable'}, default 'quicksort'
+            Choice of sorting algorithm. See also :func:`numpy.sort` for more
+            information. `mergesort` and `stable` are the only stable algorithms. For
+            DataFrames, this option is only applied when sorting on a single
+            column or label.
+        na_position(Unsupported) : {'first', 'last'}, default 'last'
+            Puts NaNs at the beginning if `first`; `last` puts NaNs at the
+            end.
+        ignore_index(Unsupported) : bool, default False
+            If True, the resulting axis will be labeled 0, 1, …, n - 1.
+            .. versionadded:: 1.0.0
+        key(Unsupported) : callable, optional
+            Apply the key function to the values
+            before sorting. This is similar to the `key` argument in the
+            builtin :meth:`sorted` function, with the notable difference that
+            this `key` function should be *vectorized*. It should expect a
+            ``Series`` and return a Series with the same shape as the input.
+            It will be applied to each column in `by` independently.
+            .. versionadded:: 1.1.0
+        num_samples : int, default 0
+            Number of samples used to determine the key distribution. Only used in a
+            distributed env. Pass a deterministic value common to every process. If
+            num_samples == 0, the value is determined internally.
+        num_bins : int, default 0
+            Number of bins in the histogram of the key distribution. Only used in a
+            distributed env. Pass a deterministic value common to every process. If
+            num_bins == 0, the value is determined internally.
+        env : CylonEnv, default None
+            Execution environment used to distinguish between distributed and local
+            operations. default None (local env)
+
+        Returns
+        -------
+        DataFrame or None
+            DataFrame with sorted values or None if ``inplace=True``.
+
+        See Also
+        --------
+        DataFrame.sort_index : Sort a DataFrame by the index.
+        Series.sort_values : Similar method for a Series.
+
+        Examples
+        --------
+        >>> df = DataFrame({
+        ...     'col1': ['A', 'A', 'B', np.nan, 'D', 'C'],
+        ...     'col2': [2, 1, 9, 8, 7, 4],
+        ...     'col3': [0, 1, 9, 4, 2, 3],
+        ...     'col4': ['a', 'B', 'c', 'D', 'e', 'F']
+        ... })
+        >>> df
+            col1  col2  col3  col4
+        0      A     2     0     a
+        1      A     1     1     B
+        2      B     9     9     c
+        3    NaN     8     4     D
+        4      D     7     2     e
+        5      C     4     3     F
+
+        Sort by col1
+
+        >>> df.sort_values(by=['col1'])
+            col1  col2  col3  col4
+        0      A     2     0     a
+        1      A     1     1     B
+        2      B     9     9     c
+        5      C     4     3     F
+        4      D     7     2     e
+        3    NaN     8     4     D
+
+        Sort by multiple columns
+
+        >>> df.sort_values(by=['col1', 'col2'])
+            col1  col2  col3  col4
+        1      A     1     1     B
+        0      A     2     0     a
+        2      B     9     9     c
+        5      C     4     3     F
+        4      D     7     2     e
+        3    NaN     8     4     D
+
+        Sort Descending
+
+        >>> df.sort_values(by='col1', ascending=False)
+            col1  col2  col3  col4
+        4      D     7     2     e
+        5      C     4     3     F
+        2      B     9     9     c
+        0      A     2     0     a
+        1      A     1     1     B
+        3    NaN     8     4     D
+        """
+        if env is None:
+            return DataFrame(self._table.sort(order_by=by, ascending=ascending))
+        else:
+            sort_opts = SortOptions(num_bins=num_bins, num_samples=num_samples)
+            return DataFrame(
+                self._change_context(env)._table.distributed_sort(order_by=by,
+                                                                  ascending=ascending,
+                                                                  sort_options=sort_opts))
+
     def join(self, other: DataFrame, on=None, how='left', lsuffix='l', rsuffix='r', sort=False,
              algorithm="sort", env: CylonEnv = None) -> DataFrame:
         """
@@ -1593,8 +1712,9 @@ def merge(self,
             right_on = right._index_columns
 
         if left_on is None or right_on is None:
-            raise ValueError("Columns to merge is not specified. Expected on or left_index/right_index."
-                             "Make sure dataframes has specified index columns if using left_index/right_index")
+            raise ValueError(
+                "Columns to merge are not specified. Expected 'on' or left_index/right_index. "
+                "Make sure DataFrames have index columns specified if using left_index/right_index.")
 
         if env is None:
             joined_table = self._table.join(table=right._table, join_type=how,
@@ -1608,184 +1728,14 @@ def merge(self,
             joined_table = self._table.distributed_join(table=right._table, join_type=how,
                                                         algorithm=algorithm,
                                                         left_on=left_on, right_on=right_on,
-                                                        left_prefix=suffixes[0], right_prefix=suffixes[1])
+                                                        left_prefix=suffixes[0],
+                                                        right_prefix=suffixes[1])
 
         return DataFrame(joined_table)
 
-    @staticmethod
-    def concat(
-        objs: Union[Iterable["DataFrame"]],
-        axis=0,
-        join="outer",
-        ignore_index: bool = False,
-        keys=None,
-        levels=None,
-        names=None,
-        verify_integrity: bool = False,
-        sort: bool = False,
-        copy: bool = True,
-        env: CylonEnv = None
-    ) -> DataFrame:
-        """
-        Concatenate DataFrames along a particular axis with optional set logic
-        along the other axes.
-        Can also add a layer of hierarchical indexing on the concatenation axis,
-        which may be useful if the labels are the same (or overlapping) on
-        the passed axis number.
-
-        Cylon currently support concat along axis=0, for DataFrames having the same schema(Union).
-
-        Parameters
-        ----------
-        objs : a sequence or mapping of Series or DataFrame objects
-            If a mapping is passed, the sorted keys will be used as the `keys`
-            argument, unless it is passed, in which case the values will be
-            selected (see below). Any None objects will be dropped silently unless
-            they are all None in which case a ValueError will be raised.
-        axis : {0/'index', 1/'columns' (Unsupported)}, default 0
-            The axis to concatenate along.
-        join(Unsupported) : {'inner', 'outer'}, default 'outer'
-            How to handle indexes on other axis (or axes).
-        ignore_index(Unsupported) : bool, default False
-            If True, do not use the index values along the concatenation axis. The
-            resulting axis will be labeled 0, ..., n - 1. This is useful if you are
-            concatenating objects where the concatenation axis does not have
-            meaningful indexing information. Note the index values on the other
-            axes are still respected in the join.
-        keys(Unsupported) : sequence, default None
-            If multiple levels passed, should contain tuples. Construct
-            hierarchical index using the passed keys as the outermost level.
-        levels(Unsupported) : list of sequences, default None
-            Specific levels (unique values) to use for constructing a
-            MultiIndex. Otherwise they will be inferred from the keys.
-        names(Unsupported) : list, default None
-            Names for the levels in the resulting hierarchical index.
-        verify_integrity(Unsupported) : bool, default False
-            Check whether the new concatenated axis contains duplicates. This can
-            be very expensive relative to the actual data concatenation.
-        sort(Unsupported) : bool, default False
-            Sort non-concatenation axis if it is not already aligned when `join`
-            is 'outer'.
-            This has no effect when ``join='inner'``, which already preserves
-            the order of the non-concatenation axis.
-            .. versionchanged:: 1.0.0
-               Changed to not sort by default.
-        copy(Unsupported) : bool, default True
-            If False, do not copy data unnecessarily.
-
-        Returns
-        -------
-        object, type of objs
-            When concatenating along
-            the columns (axis=1) or rows (axis=0), a ``DataFrame`` is returned.
-
-        Examples
-        --------
-        Combine two ``DataFrame`` objects with identical columns.
-
-        >>> df1 = DataFrame([['a', 1], ['b', 2]],
-        ...                 columns=['letter', 'number'])
-        >>> df1
-           letter  number
-        0      a       1
-        1      b       2
-        >>> df2 = DataFrame([['c', 3], ['d', 4]],
-        ...                 columns=['letter', 'number'])
-        >>> df2
-           letter  number
-        0      c       3
-        1      d       4
-        >>> DataFrame.concat([df1, df2])
-           letter  number
-        0      a       1
-        1      b       2
-        0      c       3
-        1      d       4
-
-        (Unsupported) Combine ``DataFrame`` objects with overlapping columns
-        and return everything. Columns outside the intersection will
-        be filled with ``NaN`` values.
-
-        >>> df3 = DataFrame([['c', 3, 'cat'], ['d', 4, 'dog']],
-        ...                 columns=['letter', 'number', 'animal'])
-        >>> df3
-           letter  number animal
-        0      c       3    cat
-        1      d       4    dog
-        >>> DataFrame.concat([df1, df3], sort=False)
-           letter  number animal
-        0      a       1    NaN
-        1      b       2    NaN
-        0      c       3    cat
-        1      d       4    dog
-
-        (Unsupported) Combine ``DataFrame`` objects with overlapping columns
-        and return only those that are shared by passing ``inner`` to
-        the ``join`` keyword argument.
-
-        >>> DataFrame.concat([df1, df3], join="inner")
-           letter  number
-        0      a       1
-        1      b       2
-        0      c       3
-        1      d       4
-
-        (Unsupported) Combine ``DataFrame`` objects horizontally along the x axis by
-        passing in ``axis=1``.
-
-        >>> df4 = DataFrame([['bird', 'polly'], ['monkey', 'george']],
-        ...                 columns=['animal', 'name'])
-        >>> DataFrame.concat([df1, df4], axis=1)
-           letter  number  animal    name
-        0      a       1    bird   polly
-        1      b       2  monkey  george
-
-        (Unsupported) Prevent the result from including duplicate index values with the
-        ``verify_integrity`` option.
-
-        >>> df5 = DataFrame([1], index=['a'])
-        >>> df5
-           0
-        a  1
-        >>> df6 = DataFrame([2], index=['a'])
-        >>> df6
-           0
-        a  2
-        >>> DataFrame.concat([df5, df6], verify_integrity=True)
-        Traceback (most recent call last):
-            ...
-        ValueError: Indexes have overlapping values: ['a']
-        """
-
-        if len(objs) == 0:
-            raise ValueError("objs can't be empty")
-
-        if axis == 0:
-            if env is None:
-                current_table = objs[0]._table
-                for i in range(1, len(objs)):
-                    current_table = current_table.union(objs[i]._table)
-
-                return DataFrame(current_table)
-            else:
-                # todo not optimum for distributed
-                current_table = objs[0]._change_context(env)._table
-                for i in range(1, len(objs)):
-                    current_table = current_table.union(
-                        objs[i]._change_context(env)._table)
-
-                return DataFrame(current_table)
-        else:
-            raise NotImplementedError("Unsupported operation")
-
-    def drop_duplicates(
-        self,
-        subset: Optional[Union[Hashable, Sequence[Hashable]]] = None,
-        keep: Union[str, bool] = "first",
-        inplace: bool = False,
-        ignore_index: bool = False,
-        env: CylonEnv = None
-    ) -> DataFrame:
+    def drop_duplicates(self, subset: Optional[Union[Hashable, Sequence[Hashable]]] = None,
+                        keep: Union[str, bool] = "first", inplace: bool = False,
+                        ignore_index: bool = False,
+                        env: CylonEnv = None) -> DataFrame:
         """
         Return DataFrame with duplicate rows removed.
         Considering certain columns is optional. Indexes, including time indexes
@@ -1804,7 +1754,7 @@ def drop_duplicates(
             Whether to drop duplicates in place or to return a copy.
         ignore_index (Unsupported) : bool, default False
             If True, the resulting axis will be labeled 0, 1, …, n - 1.
-            .. versionadded:: 1.0.0
+        env: Execution environment used to distinguish between distributed and local operations. default None (local env)
 
         Returns
         -------
         DataFrame or None
@@ -1852,108 +1802,7 @@ def drop_duplicates(
         return DataFrame(self._change_context(env)._table.distributed_unique(columns=subset,
                                                                              inplace=inplace))
 
-    def sort_values(
-        self,
-        by,
-        axis=0,
-        ascending=True,
-        inplace=False,
-        kind="quicksort",
-        na_position="last",
-        ignore_index=False,
-        key=None,
-        env: CylonEnv = None
-    ) -> DataFrame:
-        """
-        Sort by the values along either axis.
-        Parameters
-        ----------
-
-        axis : %(axes_single_arg)s, default 0
-            Axis to be sorted.
-        ascending : bool or list of bool, default True
-            Sort ascending vs. descending. Specify list for multiple sort
-            orders. If this is a list of bools, must match the length of
-            the by.
-        inplace(Unsupported) : bool, default False
-            If True, perform operation in-place.
-        kind(Unsupported) : {'quicksort', 'mergesort', 'heapsort', 'stable'}, default 'quicksort'
-            Choice of sorting algorithm. See also :func:`numpy.sort` for more
-            information. `mergesort` and `stable` are the only stable algorithms. For
-            DataFrames, this option is only applied when sorting on a single
-            column or label.
-        na_position(Unsupported) : {'first', 'last'}, default 'last'
-            Puts NaNs at the beginning if `first`; `last` puts NaNs at the
-            end.
-        ignore_index(Unsupported) : bool, default False
-            If True, the resulting axis will be labeled 0, 1, …, n - 1.
-            .. versionadded:: 1.0.0
-        key(Unsupported) : callable, optional
-            Apply the key function to the values
-            before sorting. This is similar to the `key` argument in the
-            builtin :meth:`sorted` function, with the notable difference that
-            this `key` function should be *vectorized*. It should expect a
-            ``Series`` and return a Series with the same shape as the input.
-            It will be applied to each column in `by` independently.
-            .. versionadded:: 1.1.0
-        Returns
-        -------
-        DataFrame or None
-            DataFrame with sorted values or None if ``inplace=True``.
-        See Also
-        --------
-        DataFrame.sort_index : Sort a DataFrame by the index.
-        Series.sort_values : Similar method for a Series.
-        Examples
-        --------
-        >>> df = DataFrame({
-        ...     'col1': ['A', 'A', 'B', np.nan, 'D', 'C'],
-        ...     'col2': [2, 1, 9, 8, 7, 4],
-        ...     'col3': [0, 1, 9, 4, 2, 3],
-        ...     'col4': ['a', 'B', 'c', 'D', 'e', 'F']
-        ... })
-        >>> df
-            col1  col2  col3  col4
-        0      A     2     0     a
-        1      A     1     1     B
-        2      B     9     9     c
-        3    NaN     8     4     D
-        4      D     7     2     e
-        5      C     4     3     F
-        Sort by col1
-        >>> df.sort_values(by=['col1'])
-            col1  col2  col3  col4
-        0      A     2     0     a
-        1      A     1     1     B
-        2      B     9     9     c
-        5      C     4     3     F
-        4      D     7     2     e
-        3    NaN     8     4     D
-        Sort by multiple columns
-        >>> df.sort_values(by=['col1', 'col2'])
-            col1  col2  col3  col4
-        1      A     1     1     B
-        0      A     2     0     a
-        2      B     9     9     c
-        5      C     4     3     F
-        4      D     7     2     e
-        3    NaN     8     4     D
-        Sort Descending
-        >>> df.sort_values(by='col1', ascending=False)
-            col1  col2  col3  col4
-        4      D     7     2     e
-        5      C     4     3     F
-        2      B     9     9     c
-        0      A     2     0     a
-        1      A     1     1     B
-        3    NaN     8     4     D
-        """
-        if env is None:
-            return DataFrame(self._table.sort(order_by=by, ascending=ascending))
-        else:
-            return DataFrame(self._change_context(env)._table.distributed_sort(order_by=by, ascending=ascending))
-
-    def groupby(self, by: Union([int, str, List]), env: CylonEnv = None) -> GroupByDataFrame:
+    def groupby(self, by: Union[int, str, List], env: CylonEnv = None) -> GroupByDataFrame:
         """
         A groupby operation involves some combination of splitting the object, applying a function,
         and combining the results. This can be used to group large amounts of data and compute
         operations on these groups.
@@ -2000,7 +1849,7 @@ def groupby(self, by: Union([int, str, List]), env: CylonEnv = None) -> GroupByD
             elif isinstance(by, str):
                 if by not in self.columns:
                     raise ValueError(
-                        str+" is not a column of this table. Expected one of "+str(by))
+                        by + " is not a column of this table. Expected one of " + str(self.columns))
                 by_list.append(by)
             elif isinstance(by, list):
                 if len(by) == 0:
@@ -2020,3 +1869,133 @@ def groupby(self, by: Union[int, str, List], env: CylonEnv = None) -> GroupByD
             return GroupByDataFrame(self, by_list)
         else:
             return GroupByDataFrame(self._change_context(env), by_list)
+
+
+# -------------------- module-level functions ---------------------------
+
+def concat(objs: Union[Iterable["DataFrame"]], axis=0, join="outer",
+           env: CylonEnv = None) -> DataFrame:
+    """
+    Concatenate DataFrames along a particular axis with optional set logic
+    along the other axes.
+    Can also add a layer of hierarchical indexing on the concatenation axis,
+    which may be useful if the labels are the same (or overlapping) on
+    the passed axis number.
+
+    Cylon currently supports concat along axis=0 (row-wise, for DataFrames having the
+    same schema (union)) and axis=1 (column-wise, as a join on the index).
+
+    Parameters
+    ----------
+    objs : a sequence or mapping of Series or DataFrame objects
+        If a mapping is passed, the sorted keys will be used as the `keys`
+        argument, unless it is passed, in which case the values will be
+        selected (see below). Any None objects will be dropped silently unless
+        they are all None in which case a ValueError will be raised.
+    axis : {0/'index', 1/'columns'}, default 0
+        The axis to concatenate along.
+    join : {'inner', 'outer'}, default 'outer'
+        How to handle indexes on other axis (or axes).
+    env : CylonEnv, default None
+        Execution environment used to distinguish between distributed and local operations.
+        default None (local env)
+
+    Returns
+    -------
+    object, type of objs
+        When concatenating along
+        the columns (axis=1) or rows (axis=0), a ``DataFrame`` is returned.
+
+    Examples
+    --------
+
+    Combine two ``DataFrame`` objects with identical columns.
+
+    >>> df1 = DataFrame([['a', 1], ['b', 2]],
+    ...                 columns=['letter', 'number'])
+    >>> df1
+       letter  number
+    0      a       1
+    1      b       2
+    >>> df2 = DataFrame([['c', 3], ['d', 4]],
+    ...                 columns=['letter', 'number'])
+    >>> df2
+       letter  number
+    0      c       3
+    1      d       4
+    >>> concat([df1, df2])
+       letter  number
+    0      a       1
+    1      b       2
+    0      c       3
+    1      d       4
+
+    (Unsupported) Combine ``DataFrame`` objects with overlapping columns
+    and return everything. Columns outside the intersection will
+    be filled with ``NaN`` values.
+
+    >>> df3 = DataFrame([['c', 3, 'cat'], ['d', 4, 'dog']],
+    ...                 columns=['letter', 'number', 'animal'])
+    >>> df3
+       letter  number animal
+    0      c       3    cat
+    1      d       4    dog
+    >>> concat([df1, df3])
+       letter  number animal
+    0      a       1    NaN
+    1      b       2    NaN
+    0      c       3    cat
+    1      d       4    dog
+
+    (Unsupported) Combine ``DataFrame`` objects with overlapping columns
+    and return only those that are shared by passing ``inner`` to
+    the ``join`` keyword argument.
+
+    >>> concat([df1, df3], join="inner")
+       letter  number
+    0      a       1
+    1      b       2
+    0      c       3
+    1      d       4
+
+    Combine ``DataFrame`` objects horizontally along the x axis by
+    passing in ``axis=1``.
+
+    >>> df4 = DataFrame([['bird', 'polly'], ['monkey', 'george']],
+    ...                 columns=['animal', 'name'])
+    >>> concat([df1, df4], axis=1)
+       letter  number  animal    name
+    0      a       1    bird   polly
+    1      b       2  monkey  george
+
+    (Unsupported) Prevent the result from including duplicate index values with the
+    ``verify_integrity`` option.
+
+    >>> df5 = DataFrame([1], index=['a'])
+    >>> df5
+       0
+    a  1
+    >>> df6 = DataFrame([2], index=['a'])
+    >>> df6
+       0
+    a  2
+    >>> concat([df5, df6])
+    Traceback (most recent call last):
+        ...
+    ValueError: Indexes have overlapping values: ['a']
+    """
+    # ignore_index: bool = False,
+    # keys=None,
+    # levels=None,
+    # names=None,
+    # verify_integrity: bool = False,
+    # sort: bool = False,
+    # copy: bool = True,
+
+    if len(objs) == 0:
+        raise ValueError("objs can't be empty")
+
+    if env is None:
+        res_table = cn.Table.concat(tables=[df.to_table() for df in objs], axis=axis, join=join)
+    else:
+        res_table = cn.Table.distributed_concat(
+            tables=[df._change_context(env).to_table() for df in objs],
+            axis=axis, join=join)
+    return DataFrame(res_table)
diff --git a/python/test/test_all.py b/python/test/test_all.py
index 486e9e6a9..9b4079eee 100644
--- a/python/test/test_all.py
+++ b/python/test/test_all.py
@@ -216,6 +216,12 @@ def test_df_dist_sorting():
                                "-q python/test/test_df_dist_sorting.py"))
     assert responses[-1] == 0
 
+
+def test_pd_read_csv():
+    print("29. pandas read_csv")
+    responses.append(os.system("pytest -q python/test/test_pd_read_csv.py"))
+    assert responses[-1] == 0
+
 
 def test_all():
     ar = np.array(responses)
diff --git a/python/test/test_df_dist_sorting.py b/python/test/test_df_dist_sorting.py
index b6df29bb6..203d49361 100644
--- a/python/test/test_df_dist_sorting.py
+++ b/python/test/test_df_dist_sorting.py
@@ -16,12 +16,13 @@
 Run test:
 >> mpirun -n 4 python -m pytest -q python/test/test_df_dist_sorting.py
 """
-
+import pytest
 from pycylon import DataFrame, CylonEnv
 from pycylon.net import MPIConfig
 
 import random
 
 
+@pytest.mark.mpi
 def test_df_dist_sorting():
     df1 = DataFrame([random.sample(range(10, 30), 5),
                      random.sample(range(10, 30), 5)])
@@ -42,7 +43,7 @@ def check_sort(df, col, ascending):
     check_sort(df, '0', False)
 
     # distributed sort
-    env = CylonEnv(config=MPIConfig())
+    env = CylonEnv(config=MPIConfig(), distributed=True)
 
     print("Distributed Sort", env.rank, env.world_size)
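Taken together, the diff replaces the `DataFrame.concat` staticmethod with a module-level `concat`, adds distributed sort options, and makes environment teardown explicit. An end-to-end sketch under those assumptions; the file name is hypothetical and the flow mirrors the updated examples rather than prescribing an authoritative recipe:

```python
# End-to-end sketch of the APIs this diff introduces; run under MPI, e.g.:
#   mpirun -n 2 python concat_sort_demo.py
import random

import pycylon as cn
from pycylon import DataFrame, CylonEnv
from pycylon.net import MPIConfig

env = CylonEnv(config=MPIConfig(), distributed=True)

df1 = DataFrame([random.sample(range(10, 100), 5),
                 random.sample(range(10, 100), 5)])
df2 = DataFrame([random.sample(range(10, 100), 5),
                 random.sample(range(10, 100), 5)])

# module-level concat replaces the old DataFrame.concat staticmethod
df3 = cn.concat(axis=0, objs=[df1, df2], env=env)

# distributed sort with explicit sample/bin counts (same value on every rank)
bins = env.world_size * 2
df3 = df3.sort_values(by=[0], num_samples=bins, num_bins=bins, env=env)
print(df3)

# __del__ no longer finalizes the environment; do it explicitly
env.finalize()
```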