diff --git a/include/pisa/block_inverted_index.hpp b/include/pisa/block_inverted_index.hpp new file mode 100644 index 00000000..775d273d --- /dev/null +++ b/include/pisa/block_inverted_index.hpp @@ -0,0 +1,307 @@ +#pragma once + +#include "bit_vector.hpp" +#include "codec/block_codec.hpp" +#include "codec/block_codecs.hpp" +#include "codec/compact_elias_fano.hpp" +#include "concepts.hpp" +#include "concepts/inverted_index.hpp" +#include "global_parameters.hpp" +#include "mappable/mappable_vector.hpp" +#include "mappable/mapper.hpp" +#include "memory_source.hpp" + +namespace pisa { + +class BlockInvertedIndexCursor { + public: + BlockInvertedIndexCursor(BlockCodec const* block_codec, std::uint8_t const* data, std::uint64_t universe) + : m_base(TightVariableByte::decode(data, &m_n, 1)), + m_blocks(ceil_div(m_n, block_codec->block_size())), + m_block_maxs(m_base), + m_block_endpoints(m_block_maxs + 4 * m_blocks), + m_blocks_data(m_block_endpoints + 4 * (m_blocks - 1)), + m_universe(universe), + m_block_codec(block_codec), + m_block_size(block_codec->block_size()) { + PISA_ASSERT_CONCEPT( + (concepts::FrequencyPostingCursor + && concepts::SortedPostingCursor) + ); + + m_docs_buf.resize(m_block_size); + m_freqs_buf.resize(m_block_size); + reset(); + } + + void reset() { decode_docs_block(0); } + + void PISA_ALWAYSINLINE next() { + ++m_pos_in_block; + if PISA_UNLIKELY (m_pos_in_block == m_cur_block_size) { + if (m_cur_block + 1 == m_blocks) { + m_cur_docid = m_universe; + return; + } + decode_docs_block(m_cur_block + 1); + } else { + m_cur_docid += m_docs_buf[m_pos_in_block] + 1; + } + } + + /** + * Moves to the next document, counting from the current position, + * with the ID equal to or greater than `lower_bound`. + * + * In particular, if called with a value that is less than or equal + * to the current document ID, the position will not change. + */ + void PISA_ALWAYSINLINE next_geq(uint64_t lower_bound) { + if PISA_UNLIKELY (lower_bound > m_cur_block_max) { + // binary search seems to perform worse here + if (lower_bound > block_max(m_blocks - 1)) { + m_cur_docid = m_universe; + return; + } + + uint64_t block = m_cur_block + 1; + while (block_max(block) < lower_bound) { + ++block; + } + + decode_docs_block(block); + } + + while (docid() < lower_bound) { + m_cur_docid += m_docs_buf[++m_pos_in_block] + 1; + assert(m_pos_in_block < m_cur_block_size); + } + } + + void PISA_ALWAYSINLINE move(uint64_t pos) { + assert(pos >= position()); + uint64_t block = pos / m_block_size; + if PISA_UNLIKELY (block != m_cur_block) { + decode_docs_block(block); + } + while (position() < pos) { + m_cur_docid += m_docs_buf[++m_pos_in_block] + 1; + } + } + + uint64_t docid() const { return m_cur_docid; } + + uint64_t PISA_ALWAYSINLINE freq() { + if (!m_freqs_decoded) { + decode_freqs_block(); + } + return m_freqs_buf[m_pos_in_block] + 1; + } + + uint64_t PISA_ALWAYSINLINE value() { return freq(); } + + uint64_t position() const { return m_cur_block * m_block_size + m_pos_in_block; } + + uint64_t size() const noexcept { return m_n; } + + uint64_t num_blocks() const { return m_blocks; } + + uint64_t stats_freqs_size() const { + // XXX rewrite in terms of get_blocks() + uint64_t bytes = 0; + uint8_t const* ptr = m_blocks_data; + static const uint64_t block_size = m_block_size; + std::vector buf(block_size); + for (size_t b = 0; b < m_blocks; ++b) { + uint32_t cur_block_size = + ((b + 1) * block_size <= size()) ? block_size : (size() % block_size); + + uint32_t cur_base = (b != 0U ? block_max(b - 1) : uint32_t(-1)) + 1; + uint8_t const* freq_ptr = m_block_codec->decode( + ptr, buf.data(), block_max(b) - cur_base - (cur_block_size - 1), cur_block_size + ); + ptr = m_block_codec->decode(freq_ptr, buf.data(), uint32_t(-1), cur_block_size); + bytes += ptr - freq_ptr; + } + + return bytes; + } + + struct block_data { + uint32_t index; + uint32_t max; + uint32_t size; + uint32_t doc_gaps_universe; + uint8_t const* docs_begin; + uint8_t const* freqs_begin; + uint8_t const* end; + BlockCodec const* block_codec; + + void append_docs_block(std::vector& out) const { + out.insert(out.end(), docs_begin, freqs_begin); + } + + void append_freqs_block(std::vector& out) const { + out.insert(out.end(), freqs_begin, end); + } + + void decode_doc_gaps(std::vector& out) const { + out.resize(size); + block_codec->decode(docs_begin, out.data(), doc_gaps_universe, size); + } + + void decode_freqs(std::vector& out) const { + out.resize(size); + block_codec->decode(freqs_begin, out.data(), uint32_t(-1), size); + } + }; + + std::vector get_blocks() { + std::vector blocks; + + uint8_t const* ptr = m_blocks_data; + static const uint64_t block_size = m_block_size; + std::vector buf(block_size); + for (size_t b = 0; b < m_blocks; ++b) { + blocks.emplace_back(); + uint32_t cur_block_size = + ((b + 1) * block_size <= size()) ? block_size : (size() % block_size); + + uint32_t cur_base = (b != 0U ? block_max(b - 1) : uint32_t(-1)) + 1; + uint32_t gaps_universe = block_max(b) - cur_base - (cur_block_size - 1); + + blocks.back().index = b; + blocks.back().size = cur_block_size; + blocks.back().docs_begin = ptr; + blocks.back().doc_gaps_universe = gaps_universe; + blocks.back().max = block_max(b); + blocks.back().block_codec = m_block_codec; + + uint8_t const* freq_ptr = + m_block_codec->decode(ptr, buf.data(), gaps_universe, cur_block_size); + blocks.back().freqs_begin = freq_ptr; + ptr = m_block_codec->decode(freq_ptr, buf.data(), uint32_t(-1), cur_block_size); + blocks.back().end = ptr; + } + + assert(blocks.size() == num_blocks()); + return blocks; + } + + private: + uint32_t block_max(uint32_t block) const { return ((uint32_t const*)m_block_maxs)[block]; } + + void PISA_NOINLINE decode_docs_block(uint64_t block) { + static const uint64_t block_size = m_block_size; + uint32_t endpoint = block != 0U ? ((uint32_t const*)m_block_endpoints)[block - 1] : 0; + uint8_t const* block_data = m_blocks_data + endpoint; + m_cur_block_size = ((block + 1) * block_size <= size()) ? block_size : (size() % block_size); + uint32_t cur_base = (block != 0U ? block_max(block - 1) : uint32_t(-1)) + 1; + m_cur_block_max = block_max(block); + m_freqs_block_data = m_block_codec->decode( + block_data, m_docs_buf.data(), m_cur_block_max - cur_base - (m_cur_block_size - 1), m_cur_block_size + ); + intrinsics::prefetch(m_freqs_block_data); + + m_docs_buf[0] += cur_base; + + m_cur_block = block; + m_pos_in_block = 0; + m_cur_docid = m_docs_buf[0]; + m_freqs_decoded = false; + } + + void PISA_NOINLINE decode_freqs_block() { + uint8_t const* next_block = m_block_codec->decode( + m_freqs_block_data, m_freqs_buf.data(), uint32_t(-1), m_cur_block_size + ); + intrinsics::prefetch(next_block); + m_freqs_decoded = true; + } + + uint32_t m_n{0}; + uint8_t const* m_base; + uint32_t m_blocks; + uint8_t const* m_block_maxs; + uint8_t const* m_block_endpoints; + uint8_t const* m_blocks_data; + uint64_t m_universe; + + uint32_t m_cur_block{0}; + uint32_t m_pos_in_block{0}; + uint32_t m_cur_block_max{0}; + uint32_t m_cur_block_size{0}; + uint32_t m_cur_docid{0}; + + uint8_t const* m_freqs_block_data{nullptr}; + bool m_freqs_decoded{false}; + + std::vector m_docs_buf; + std::vector m_freqs_buf; + BlockCodec const* m_block_codec; + std::size_t m_block_size; +}; + +class BlockInvertedIndex { + private: + global_parameters m_params; + std::size_t m_size{0}; + std::size_t m_num_docs{0}; + bit_vector m_endpoints; + mapper::mappable_vector m_lists; + MemorySource m_source; + std::unique_ptr m_block_codec; + + public: + using document_enumerator = BlockInvertedIndexCursor; + + explicit BlockInvertedIndex(MemorySource source, std::unique_ptr block_codec) + : m_source(std::move(source)), m_block_codec(std::move(block_codec)) { + PISA_ASSERT_CONCEPT( + (concepts::SortedInvertedIndex) + ); + mapper::map(*this, m_source.data(), mapper::map_flags::warmup); + } + + template + void map(Visitor& visit) { + visit(m_params, "m_params")(m_size, "m_size")(m_num_docs, "m_num_docs")( + m_endpoints, "m_endpoints")(m_lists, "m_lists"); + } + + [[nodiscard]] auto operator[](std::size_t term_id) const -> BlockInvertedIndexCursor { + // check_term_range(term_id); + compact_elias_fano::enumerator endpoints(m_endpoints, 0, m_lists.size(), m_size, m_params); + auto endpoint = endpoints.move(term_id).second; + return BlockInvertedIndexCursor(m_block_codec.get(), m_lists.data() + endpoint, num_docs()); + } + + /** + * \returns The size of the index, i.e., the number of terms (posting lists). + */ + [[nodiscard]] std::size_t size() const noexcept { return m_size; } + + /** + * \returns The number of distinct documents in the index. + */ + [[nodiscard]] std::uint64_t num_docs() const noexcept { return m_num_docs; } + + void warmup(std::size_t term_id) const { + // check_term_range(term_id); + compact_elias_fano::enumerator endpoints(m_endpoints, 0, m_lists.size(), m_size, m_params); + + auto begin = endpoints.move(term_id).second; + auto end = m_lists.size(); + if (term_id + 1 != size()) { + end = endpoints.move(term_id + 1).second; + } + + volatile std::uint32_t tmp; + for (std::size_t i = begin; i != end; ++i) { + tmp = m_lists[i]; + } + (void)tmp; + } +}; + +}; // namespace pisa diff --git a/include/pisa/codec/block_codec.hpp b/include/pisa/codec/block_codec.hpp new file mode 100644 index 00000000..feee0429 --- /dev/null +++ b/include/pisa/codec/block_codec.hpp @@ -0,0 +1,59 @@ +#pragma once + +#include +#include + +#include "codec/block_codecs.hpp" +#include "util/util.hpp" + +extern "C" { +#include "simdcomp/include/simdbitpacking.h" +} + +namespace pisa { + +class BlockCodec { + public: + virtual void encode( + std::uint32_t const* in, std::uint32_t sum_of_values, std::size_t n, std::vector& out + ) const = 0; + + virtual std::uint8_t const* decode( + std::uint8_t const* in, std::uint32_t* out, std::uint32_t sum_of_values, std::size_t n + ) const = 0; + + [[nodiscard]] virtual auto block_size() const noexcept -> std::size_t = 0; +}; + +class SimdBpBlockCodec: public BlockCodec { + static constexpr std::uint64_t m_block_size = 128; + + void encode(uint32_t const* in, uint32_t sum_of_values, size_t n, std::vector& out) const { + assert(n <= m_block_size); + auto* src = const_cast(in); + if (n < m_block_size) { + interpolative_block::encode(src, sum_of_values, n, out); + return; + } + uint32_t b = maxbits(in); + thread_local std::vector buf(8 * n); + uint8_t* buf_ptr = buf.data(); + *buf_ptr++ = b; + simdpackwithoutmask(src, (__m128i*)buf_ptr, b); + out.insert(out.end(), buf.data(), buf.data() + b * sizeof(__m128i) + 1); + } + + uint8_t const* decode(uint8_t const* in, uint32_t* out, uint32_t sum_of_values, size_t n) const { + assert(n <= m_block_size); + if PISA_UNLIKELY (n < m_block_size) { + return interpolative_block::decode(in, out, sum_of_values, n); + } + uint32_t b = *in++; + simdunpack((const __m128i*)in, out, b); + return in + b * sizeof(__m128i); + } + + auto block_size() const noexcept -> std::size_t { return m_block_size; } +}; + +}; // namespace pisa diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index b60dc377..9007819e 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -10,6 +10,7 @@ target_link_libraries(app pisa CLI11) add_tool(compress_inverted_index compress_inverted_index.cpp) add_tool(create_wand_data create_wand_data.cpp) add_tool(queries queries.cpp) +add_tool(queries_dynamic queries_dynamic.cpp) add_tool(evaluate_queries evaluate_queries.cpp) add_tool(thresholds thresholds.cpp) add_tool(profile_queries profile_queries.cpp) diff --git a/tools/queries_dynamic.cpp b/tools/queries_dynamic.cpp new file mode 100644 index 00000000..282fabf4 --- /dev/null +++ b/tools/queries_dynamic.cpp @@ -0,0 +1,366 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "accumulator/lazy_accumulator.hpp" +#include "accumulator/simple_accumulator.hpp" +#include "app.hpp" +#include "block_inverted_index.hpp" +#include "codec/block_codec.hpp" +#include "cursor/block_max_scored_cursor.hpp" +#include "cursor/cursor.hpp" +#include "cursor/max_scored_cursor.hpp" +#include "cursor/scored_cursor.hpp" +#include "index_types.hpp" +#include "memory_source.hpp" +#include "query/algorithm/and_query.hpp" +#include "query/algorithm/block_max_maxscore_query.hpp" +#include "query/algorithm/block_max_ranked_and_query.hpp" +#include "query/algorithm/block_max_wand_query.hpp" +#include "query/algorithm/maxscore_query.hpp" +#include "query/algorithm/or_query.hpp" +#include "query/algorithm/ranked_and_query.hpp" +#include "query/algorithm/ranked_or_query.hpp" +#include "query/algorithm/ranked_or_taat_query.hpp" +#include "query/algorithm/wand_query.hpp" +#include "scorer/scorer.hpp" +#include "timer.hpp" +#include "topk_queue.hpp" +#include "type_alias.hpp" +#include "util/do_not_optimize_away.hpp" +#include "util/util.hpp" +#include "wand_data.hpp" +#include "wand_data_compressed.hpp" +#include "wand_data_raw.hpp" + +using namespace pisa; +using ranges::views::enumerate; + +template +void extract_times( + Fn fn, + std::vector const& queries, + std::vector const& thresholds, + std::string const& index_type, + std::string const& query_type, + size_t runs, + std::ostream& os +) { + std::vector times(runs); + for (auto&& [qid, query]: enumerate(queries)) { + do_not_optimize_away(fn(query, thresholds[qid])); + std::generate(times.begin(), times.end(), [&fn, &q = query, &t = thresholds[qid]]() { + return run_with_timer( + [&]() { do_not_optimize_away(fn(q, t)); } + ).count(); + }); + auto mean = std::accumulate(times.begin(), times.end(), std::size_t{0}, std::plus<>()) / runs; + os << fmt::format("{}\t{}\n", query.id().value_or(std::to_string(qid)), mean); + } +} + +template +void op_perftest( + Functor query_func, + std::vector const& queries, + std::vector const& thresholds, + std::string const& index_type, + std::string const& query_type, + size_t runs, + std::uint64_t k, + bool safe +) { + std::vector query_times; + std::size_t num_reruns = 0; + spdlog::info("Safe: {}", safe); + + for (size_t run = 0; run <= runs; ++run) { + size_t idx = 0; + for (auto const& query: queries) { + auto usecs = run_with_timer([&]() { + uint64_t result = query_func(query, thresholds[idx]); + if (safe && result < k) { + num_reruns += 1; + result = query_func(query, 0); + } + do_not_optimize_away(result); + }); + if (run != 0) { // first run is not timed + query_times.push_back(usecs.count()); + } + idx += 1; + } + } + + if (false) { + for (auto t: query_times) { + std::cout << (t / 1000) << std::endl; + } + } else { + std::sort(query_times.begin(), query_times.end()); + double avg = + std::accumulate(query_times.begin(), query_times.end(), double()) / query_times.size(); + double q50 = query_times[query_times.size() / 2]; + double q90 = query_times[90 * query_times.size() / 100]; + double q95 = query_times[95 * query_times.size() / 100]; + double q99 = query_times[99 * query_times.size() / 100]; + + spdlog::info("---- {} {}", index_type, query_type); + spdlog::info("Mean: {}", avg); + spdlog::info("50% quantile: {}", q50); + spdlog::info("90% quantile: {}", q90); + spdlog::info("95% quantile: {}", q95); + spdlog::info("99% quantile: {}", q99); + spdlog::info("Num. reruns: {}", num_reruns); + + stats_line()("type", index_type)("query", query_type)("avg", avg)("q50", q50)("q90", q90)( + "q95", q95)("q99", q99); + } +} + +template +void perftest( + const std::string& index_filename, + const std::optional& wand_data_filename, + const std::vector& queries, + const std::optional& thresholds_filename, + std::string const& type, + std::string const& query_type, + uint64_t k, + const ScorerParams& scorer_params, + const bool weighted, + bool extract, + bool safe +) { + spdlog::info("Loading index from {}", index_filename); + BlockInvertedIndex index( + MemorySource::mapped_file(index_filename), std::make_unique() + ); + + spdlog::info("Warming up posting lists"); + std::unordered_set warmed_up; + for (auto const& q: queries) { + for (auto [t, _]: q.terms()) { + if (!warmed_up.count(t)) { + index.warmup(t); + warmed_up.insert(t); + } + } + } + + WandType const wdata = [&] { + if (wand_data_filename) { + return WandType(MemorySource::mapped_file(*wand_data_filename)); + } + return WandType{}; + }(); + + std::vector thresholds(queries.size(), 0.0); + if (thresholds_filename) { + std::string t; + std::ifstream tin(*thresholds_filename); + size_t idx = 0; + while (std::getline(tin, t)) { + thresholds[idx] = std::stof(t); + idx += 1; + } + if (idx != queries.size()) { + throw std::invalid_argument("Invalid thresholds file."); + } + } + + auto scorer = scorer::from_params(scorer_params, wdata); + + spdlog::info("Performing {} queries", type); + spdlog::info("K: {}", k); + + std::vector query_types; + boost::algorithm::split(query_types, query_type, boost::is_any_of(":")); + + for (auto&& t: query_types) { + spdlog::info("Query type: {}", t); + std::function query_fun; + if (t == "and") { + query_fun = [&](Query query, Score) { + and_query and_q; + return and_q(make_cursors(index, query), index.num_docs()).size(); + }; + } else if (t == "or") { + query_fun = [&](Query query, Score) { + or_query or_q; + return or_q(make_cursors(index, query), index.num_docs()); + }; + } else if (t == "or_freq") { + query_fun = [&](Query query, Score) { + or_query or_q; + return or_q(make_cursors(index, query), index.num_docs()); + }; + } else if (t == "wand" && wand_data_filename) { + query_fun = [&](Query query, Score threshold) { + topk_queue topk(k, threshold); + wand_query wand_q(topk); + wand_q( + make_max_scored_cursors(index, wdata, *scorer, query, weighted), index.num_docs() + ); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "block_max_wand" && wand_data_filename) { + query_fun = [&](Query query, Score threshold) { + topk_queue topk(k, threshold); + block_max_wand_query block_max_wand_q(topk); + block_max_wand_q( + make_block_max_scored_cursors(index, wdata, *scorer, query, weighted), + index.num_docs() + ); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "block_max_maxscore" && wand_data_filename) { + query_fun = [&](Query query, Score threshold) { + topk_queue topk(k, threshold); + block_max_maxscore_query block_max_maxscore_q(topk); + block_max_maxscore_q( + make_block_max_scored_cursors(index, wdata, *scorer, query, weighted), + index.num_docs() + ); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "ranked_and" && wand_data_filename) { + query_fun = [&](Query query, Score threshold) { + topk_queue topk(k, threshold); + ranked_and_query ranked_and_q(topk); + ranked_and_q(make_scored_cursors(index, *scorer, query, weighted), index.num_docs()); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "block_max_ranked_and" && wand_data_filename) { + query_fun = [&](Query query, Score threshold) { + topk_queue topk(k, threshold); + block_max_ranked_and_query block_max_ranked_and_q(topk); + block_max_ranked_and_q( + make_block_max_scored_cursors(index, wdata, *scorer, query, weighted), + index.num_docs() + ); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "ranked_or" && wand_data_filename) { + query_fun = [&](Query query, Score threshold) { + topk_queue topk(k, threshold); + ranked_or_query ranked_or_q(topk); + ranked_or_q(make_scored_cursors(index, *scorer, query, weighted), index.num_docs()); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "maxscore" && wand_data_filename) { + query_fun = [&](Query query, Score threshold) { + topk_queue topk(k, threshold); + maxscore_query maxscore_q(topk); + maxscore_q( + make_max_scored_cursors(index, wdata, *scorer, query, weighted), index.num_docs() + ); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "ranked_or_taat" && wand_data_filename) { + SimpleAccumulator accumulator(index.num_docs()); + topk_queue topk(k); + query_fun = [&, topk, accumulator](Query query, Score threshold) mutable { + ranked_or_taat_query ranked_or_taat_q(topk); + topk.clear(threshold); + ranked_or_taat_q( + make_scored_cursors(index, *scorer, query, weighted), index.num_docs(), accumulator + ); + topk.finalize(); + return topk.topk().size(); + }; + } else if (t == "ranked_or_taat_lazy" && wand_data_filename) { + LazyAccumulator<4> accumulator(index.num_docs()); + topk_queue topk(k); + query_fun = [&, topk, accumulator](Query query, Score threshold) mutable { + ranked_or_taat_query ranked_or_taat_q(topk); + topk.clear(threshold); + ranked_or_taat_q( + make_scored_cursors(index, *scorer, query, weighted), index.num_docs(), accumulator + ); + topk.finalize(); + return topk.topk().size(); + }; + } else { + spdlog::error("Unsupported query type: {}", t); + break; + } + if (extract) { + extract_times(query_fun, queries, thresholds, type, t, 2, std::cout); + } else { + op_perftest(query_fun, queries, thresholds, type, t, 2, k, safe); + } + } +} + +using wand_raw_index = wand_data; +using wand_uniform_index = wand_data>; +using wand_uniform_index_quantized = wand_data>; + +int main(int argc, const char** argv) { + bool extract = false; + bool safe = false; + bool quantized = false; + + App, + arg::Query, + arg::Algorithm, + arg::Scorer, + arg::Thresholds, + arg::LogLevel> + app{"Benchmarks queries on a given index."}; + app.add_flag("--quantized", quantized, "Quantized scores"); + app.add_flag("--extract", extract, "Extract individual query times"); + app.add_flag("--safe", safe, "Rerun if not enough results with pruning.") + ->needs(app.thresholds_option()); + CLI11_PARSE(app, argc, argv); + + spdlog::set_default_logger(spdlog::stderr_color_mt("stderr")); + spdlog::set_level(app.log_level()); + if (extract) { + std::cout << "qid\tusec\n"; + } + + auto params = std::make_tuple( + app.index_filename(), + app.wand_data_path(), + app.queries(), + app.thresholds_file(), + app.index_encoding(), + app.algorithm(), + app.k(), + app.scorer_params(), + app.weighted(), + extract, + safe + ); + + /**/ + if (app.is_wand_compressed()) { + if (quantized) { + std::apply(perftest, params); + } else { + std::apply(perftest, params); + } + } else { + std::apply(perftest, params); + } +}