ENH: new function zip_sp_matmul_topn that can zip matrices zip_j A.dot(B_j)

The function returns a zipped matrix Z in CSR format, Z = zip_j C_j, where
Z = [sorted top-n results > lower_bound for each row of C_j], with C_j = A.dot(B_j)
and B split row-wise into sub-matrices B_j.

Only the sorted variant of the zip function is supported; the unsorted variant
(ordered by insertion) cannot be made equal to the unsorted result on the full matrices.
zip_sp_matmul_topn therefore sorts by value by default.

Also added a Python function to zip split matrices, plus two unit tests covering the functionality.

NB: the unit test test_stack_zip_sp_matmul_topn is skipped for Python 3.8, due to a bug in
scipy's vstack function there; it does not support all data types.
kx79wq authored and mbaak committed Mar 1, 2024
1 parent f66d530 commit ce7239d
Showing 12 changed files with 518 additions and 6 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -42,6 +42,7 @@ set(SDTN_SRC_FILES
    ${SDTN_SRC_PREF}/extension.cpp
    ${SDTN_SRC_PREF}/sp_matmul_bindings.cpp
    ${SDTN_SRC_PREF}/sp_matmul_topn_bindings.cpp
    ${SDTN_SRC_PREF}/zip_sp_matmul_topn_bindings.cpp
)

include(FindDependencies)
42 changes: 42 additions & 0 deletions README.md
@@ -80,6 +80,47 @@ pip install sparse_dot_topn --no-binary sparse_dot_topn
Building from source can enable architecture-specific optimisations and is recommended for those who have a C++ compiler installed.
See INSTALLATION.md for details.

## Distributing the top-n multiplication of two large O(10M+) sparse matrices over a cluster

The top-n multiplication of two large O(10M+) sparse matrices can be broken down into smaller chunks.
For example, one may want to split the sparse matrices into matrices of just 1M rows, and do
the (top-n) multiplication of all those matrix pairs.
Reasons to do this are to reduce the memory footprint of each pair, and to employ available distributed computing power.

The pairs can be distributed and calculated over a cluster (e.g., we use a Spark cluster).
The resulting matrix products are then zipped and stacked in order to reproduce the full matrix product.

Here's an example of how to do this, where we match 1000 rows of sparse matrix A against 600 rows of sparse matrix B,
with both A and B split into chunks.

```python
import numpy as np
import scipy.sparse as sparse
from sparse_dot_topn import sp_matmul_topn, zip_sp_matmul_topn

# 1a. Example matching 1000 rows in sparse matrix A against 600 rows in sparse matrix B.
rng = np.random.default_rng()
A = sparse.random(1000, 2000, density=0.1, format="csr", dtype=np.float32, random_state=rng)
B = sparse.random(600, 2000, density=0.1, format="csr", dtype=np.float32, random_state=rng)

# 1b. Reference full matrix product with top-n
C_ref = sp_matmul_topn(A, B.T, top_n=10, threshold=0.01, sort=True)

# 2a. Split the sparse matrices. Here A is split into five parts, and B into three parts.
As = [A[i*200:(i+1)*200] for i in range(5)]
Bs = [B[:100], B[100:300], B[300:]]

# 2b. Perform the top-n multiplication of all sub-matrix pairs, here in a double loop.
# E.g. all sub-matrix pairs could be distributed over a cluster and multiplied there.
Cs = [[sp_matmul_topn(Aj, Bi.T, top_n=10, threshold=0.01, sort=True) for Bi in Bs] for Aj in As]

# 2c. top-n zipping of the C-matrices, done over the index of the B sub-matrices.
Czip = [zip_sp_matmul_topn(top_n=10, C_mats=Cis) for Cis in Cs]

# 2d. Stacking of the zipped C-matrices, done over the index of the A sub-matrices.
# The resulting matrix C equals C_ref.
C = sparse.vstack(Czip, dtype=np.float32)
```
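
You can check that the zipped-and-stacked result indeed reproduces the reference product computed on the full matrices:

```python
# C and C_ref were computed with the same top_n, threshold and sorting,
# so the reconstruction should match the reference exactly.
assert (C != C_ref).nnz == 0
```

Step 2b is the part you would distribute. Below is a minimal, illustrative sketch of how this could look with PySpark; it assumes an existing `SparkContext` named `sc`, and the helper names are ours, not part of the sparse_dot_topn API:

```python
# Illustrative sketch: compute the sub-matrix products on a Spark cluster and
# reduce them per A-chunk with zip_sp_matmul_topn (assumes a SparkContext `sc`).
bc_As = sc.broadcast(As)  # ship the chunks to the workers once
bc_Bs = sc.broadcast(Bs)

def matmul_pair(ji):
    j, i = ji
    C_ji = sp_matmul_topn(bc_As.value[j], bc_Bs.value[i].T, top_n=10, threshold=0.01, sort=True)
    return j, (i, C_ji)

pairs = [(j, i) for j in range(len(As)) for i in range(len(Bs))]
Czip = (
    sc.parallelize(pairs)
    .map(matmul_pair)
    .groupByKey()  # gather all C_ji that share the same A-chunk j
    .mapValues(lambda Cis: zip_sp_matmul_topn(top_n=10, C_mats=[c for _, c in sorted(Cis)]))
    .collect()
)
C = sparse.vstack([c for _, c in sorted(Czip)], dtype=np.float32)
```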

## Migrating to v1.

**sparse\_dot\_topn** v1 is a significant change from `v0.*`, with new bindings and a new API.
@@ -147,4 +188,5 @@ You can read about it in a [blog](https://medium.com/@ingwbaa/https-medium-com-i
* [Stephane Collet](https://github.com/stephanecollot)
* [Particular Miner](https://github.com/ParticularMiner) (no ING affiliation)
* [Ralph Urlus](https://github.com/RUrlus)
* [Max Baak](https://github.com/mbaak)

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -135,8 +135,8 @@ select = [
"RSE",
# flynt
"FLY",
"CPY001"
]

ignore = [
"E501", # line length
"PLR0913", # too many arguments
12 changes: 10 additions & 2 deletions src/sparse_dot_topn/__init__.py
@@ -2,8 +2,16 @@
import importlib.metadata

__version__ = importlib.metadata.version("sparse_dot_topn")
from sparse_dot_topn.api import awesome_cossim_topn, sp_matmul, sp_matmul_topn
from sparse_dot_topn.api import awesome_cossim_topn, sp_matmul, sp_matmul_topn, zip_sp_matmul_topn
from sparse_dot_topn.lib import _sparse_dot_topn_core as _core
from sparse_dot_topn.lib._sparse_dot_topn_core import _has_openmp_support

__all__ = ["awesome_cossim_topn", "sp_matmul", "sp_matmul_topn", "_core", "__version__", "_has_openmp_support"]
__all__ = [
    "awesome_cossim_topn",
    "sp_matmul",
    "sp_matmul_topn",
    "zip_sp_matmul_topn",
    "_core",
    "__version__",
    "_has_openmp_support",
]
57 changes: 57 additions & 0 deletions src/sparse_dot_topn/api.py
@@ -282,3 +282,60 @@ def sp_matmul_topn(
msg = "sparse_dot_topn: extension was compiled without parallelisation (OpenMP) support, ignoring ``n_threads``"
warnings.warn(msg, stacklevel=1)
return csr_matrix(func(**kwargs), shape=(A_nrows, B_ncols))


def zip_sp_matmul_topn(top_n: int, C_mats: list[csr_matrix]) -> csr_matrix:
"""Compute zip-matrix C = zip_i C_i = zip_i A * B_i = A * B whilst only storing the `top_n` elements.
Combine the sub-matrices together and keep only the `top_n` elements per row.
Pre-calling this function, matrix B has been split row-wise into chunks B_i, and C_i = A * B_i have been calculated.
This function computes C = zip_i C_i, which is equivalent to A * B when only keeping the `top_n` elements.
It allows very large matrices to be split and multiplied with a limited memory footprint.
Args:
top_n: the number of results to retain; should be smaller or equal to top_n used to obtain C_mats.
C_mats: a list with each C_i sub-matrix, with format csr_matrix.
Returns:
C: zipped result matrix
Raises:
TypeError: when not all elements of `C_mats` is a csr_matrix or trivially convertable
ValueError: when not all elements of `C_mats` has the same number of rows
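
    Example:
        A minimal usage sketch; the sub-matrices ``C1`` and ``C2`` are illustrative
        and would come from e.g. ``sp_matmul_topn(A, B1.T, top_n=10)``:

        >>> C = zip_sp_matmul_topn(top_n=10, C_mats=[C1, C2])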
"""
    _nrows = []
    ncols = []
    data = []
    indptr = []
    indices = []
    for C in C_mats:
        # check correct type of each C
        if isinstance(C, (coo_matrix, csc_matrix)):
            C = C.tocsr(copy=False)
        elif not isinstance(C, csr_matrix):
            msg = f"type of `C` must be one of `csr_matrix`, `csc_matrix` or `coo_matrix`, got `{type(C)}`"
            raise TypeError(msg)

        nrows, c_nc = C.shape
        _nrows.append(nrows)
        ncols.append(c_nc)
        data.append(C.data)
        indptr.append(C.indptr)
        indices.append(C.indices)

    if not np.all(np.diff(_nrows) == 0):
        msg = "Each `C` in `C_mats` should have the same number of rows."
        raise ValueError(msg)

    return csr_matrix(
        _core.zip_sp_matmul_topn(
            top_n=top_n,
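            # Z_max_nnz is an upper bound on the zipped nnz: at most top_n entries per row.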
            Z_max_nnz=nrows * top_n,
            nrows=nrows,
            B_ncols=np.asarray(ncols, int),
            data=data,
            indptr=indptr,
            indices=indices,
        )
    )
4 changes: 2 additions & 2 deletions src/sparse_dot_topn_core/include/sparse_dot_topn/maxheap.hpp
@@ -96,7 +96,7 @@ }
}

/**
* \brief Sort the heap accoring to the insertion order.
* \brief Sort the heap according to the insertion order.
*
* \details Note that calling `insertion_sort` invalidates the heap.
* Calls should be followed by a call to `reset`.
@@ -106,7 +106,7 @@
}

/**
* \brief Sort the heap accoring to values.
* \brief Sort the heap according to values.
*
* \details Note that calling `value_sort` invalidates the heap.
* Calls should be followed by a call to `reset`.
@@ -19,6 +19,7 @@

#include <algorithm>
#include <cstring>
#include <limits>
#include <memory>
#include <numeric>
#include <tuple>
@@ -0,0 +1,117 @@
/* Copyright (c) 2023 ING Analytics Wholesale Banking
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <limits>
#include <vector>

#include <sparse_dot_topn/common.hpp>
#include <sparse_dot_topn/maxheap.hpp>

namespace sdtn::core {

/**
* \brief Zip and compute Z = zip_j C_j = zip_j A.dot(B_j) keeping only the
* top-n of the zipped results.
*
* \details This function will return a zipped matrix Z in CSR format, zip_j
* C_j, where Z = [sorted top n results > lower_bound for each row of C_j],
* where C_j = A.dot(B_j) and where B has been split row-wise into sub-matrices
* B_j. Note that `C_j` must be `CSR` format where the nonzero elements of the
* `i`th row are located in ``data[indptr[i]:indptr[i+1]]``. The column indices
* for row `i` are stored in
* ``indices[indptr[i]:indptr[i+1]]``.
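 * For example, the 2x3 matrix [[0, 5, 0], [7, 0, 2]] in CSR has
 * ``data=[5, 7, 2]``, ``indices=[1, 0, 2]`` and ``indptr=[0, 1, 3]``.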
*
 * \tparam eT element type of the matrices
 * \tparam idxT integer type of the index arrays, must be at least 32 bit int
 * \param[in] top_n the top n values to store
 * \param[in] nrows the number of rows in A
 * \param[in] B_ncols the number of columns of each B_j sub-matrix
 * \param[in] C_data vector with the nonzero elements of each C_j sub-matrix
 * \param[in] C_indptrs vector of arrays containing the row pointers of the
 * C_j sub-matrices
 * \param[in] C_indices vector of arrays containing the column indices of the
 * C_j sub-matrices
 * \param[out] Z_data the nonzero elements of the zipped matrix Z
 * \param[out] Z_indptr array containing the row pointers of the zipped `Z_data`
 * \param[out] Z_indices array containing the zipped column indices
*/
template <typename eT, typename idxT, iffInt<idxT> = true>
inline void zip_sp_matmul_topn(
    const idxT top_n,
    const idxT nrows,
    const idxT* B_ncols,
    const std::vector<const eT*>& C_data,
    const std::vector<const idxT*>& C_indptrs,
    const std::vector<const idxT*>& C_indices,
    eT* __restrict Z_data,
    idxT* __restrict Z_indptr,
    idxT* __restrict Z_indices
) {
    idxT nnz = 0;
    Z_indptr[0] = 0;
    eT* Z_data_head = Z_data;
    idxT* Z_indices_head = Z_indices;
    const int n_mat = C_data.size();

    // threshold is already consistent between matrices, so accept every line.
    auto max_heap = MaxHeap<eT, idxT>(top_n, std::numeric_limits<eT>::min());

    // offset the index when concatenating the C sub-matrices (split by row)
    std::vector<idxT> offset(n_mat, idxT(0));
    for (int i = 0; i < n_mat - 1; ++i) {
        for (int j = i; j < n_mat - 1; ++j) {
            offset[j + 1] += B_ncols[i];
        }
    }
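    // offset[j] now holds the prefix sum B_ncols[0] + ... + B_ncols[j-1],
    // i.e. the column offset of sub-matrix C_j in the concatenated matrix.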

    // concatenate the results of each row, apply top_n, and add those results
    // to the zipped matrix Z
    for (idxT i = 0; i < nrows; ++i) {
        eT min = max_heap.reset();

        // keep the top-n of the stacked lines for each row; insert in reverse
        // order, similar to the reverse linked list in sp_matmul_topn
        for (int j = n_mat - 1; j >= 0; --j) {
            const idxT* C_indptr_j = C_indptrs[j];
            const idxT* C_indices_j = C_indices[j];
            for (idxT k = C_indptr_j[i]; k < C_indptr_j[i + 1]; ++k) {
                eT val = (C_data[j])[k];
                if (val > min) {
                    min = max_heap.push_pop(offset[j] + C_indices_j[k], val);
                }
            }
        }

        // sort the heap s.t. the first value is the largest
        max_heap.value_sort();

        // fill the zipped sparse matrix Z
        int n_set = max_heap.get_n_set();
        for (int ii = 0; ii < n_set; ++ii) {
            *Z_indices_head = max_heap.heap[ii].idx;
            *Z_data_head = max_heap.heap[ii].val;
            Z_indices_head++;
            Z_data_head++;
        }
        nnz += n_set;
        Z_indptr[i + 1] = nnz;
    }
}

} // namespace sdtn::core
@@ -0,0 +1,88 @@
/* Copyright (c) 2023 ING Analytics Wholesale Banking
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <nanobind/nanobind.h>
#include <nanobind/ndarray.h>
#include <nanobind/stl/vector.h>

#include <memory>
#include <numeric>
#include <vector>

#include <sparse_dot_topn/common.hpp>
#include <sparse_dot_topn/maxheap.hpp>
#include <sparse_dot_topn/zip_sp_matmul_topn.hpp>

namespace sdtn {
namespace nb = nanobind;

namespace api {

template <typename eT, typename idxT, core::iffInt<idxT> = true>
inline nb::tuple zip_sp_matmul_topn(
    const int top_n,
    const idxT Z_max_nnz,
    const idxT nrows,
    const nb_vec<idxT>& B_ncols,
    const std::vector<nb_vec<eT>>& data,
    const std::vector<nb_vec<idxT>>& indptr,
    const std::vector<nb_vec<idxT>>& indices
) {
    const int n_mats = B_ncols.size();
    std::vector<const eT*> data_ptrs;
    data_ptrs.reserve(n_mats);
    std::vector<const idxT*> indptr_ptrs;
    indptr_ptrs.reserve(n_mats);
    std::vector<const idxT*> indices_ptrs;
    indices_ptrs.reserve(n_mats);

    for (int i = 0; i < n_mats; ++i) {
        data_ptrs.push_back(data[i].data());
        indptr_ptrs.push_back(indptr[i].data());
        indices_ptrs.push_back(indices[i].data());
    }

    auto Z_indptr = std::unique_ptr<idxT[]>(new idxT[nrows + 1]);
    auto Z_indices = std::unique_ptr<idxT[]>(new idxT[Z_max_nnz]);
    auto Z_data = std::unique_ptr<eT[]>(new eT[Z_max_nnz]);

    core::zip_sp_matmul_topn<eT, idxT>(
        top_n,
        nrows,
        B_ncols.data(),
        data_ptrs,
        indptr_ptrs,
        indices_ptrs,
        Z_data.get(),
        Z_indptr.get(),
        Z_indices.get()
    );

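    // Hand ownership of the output buffers to the returned nanobind arrays;
    // release() prevents the unique_ptrs from freeing them here.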
    return nb::make_tuple(
        to_nbvec<eT>(Z_data.release(), Z_max_nnz),
        to_nbvec<idxT>(Z_indices.release(), Z_max_nnz),
        to_nbvec<idxT>(Z_indptr.release(), nrows + 1)
    );
}
} // namespace api

namespace bindings {
void bind_zip_sp_matmul_topn(nb::module_& m);
}

} // namespace sdtn