From df9df6ba785d9cda7f6785ea333fb02f8bef1e5a Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 09:26:35 +0000 Subject: [PATCH 01/20] feat(inverted_index.create): add read/write for external intermediate files Signed-off-by: Zhenchi --- Cargo.lock | 162 ++++++++++------- src/index/Cargo.toml | 2 + src/index/src/inverted_index.rs | 1 + src/index/src/inverted_index/create.rs | 15 ++ src/index/src/inverted_index/create/sort.rs | 24 +++ .../create/sort/intermediate_rw.rs | 141 +++++++++++++++ .../create/sort/intermediate_rw/codec_v1.rs | 165 ++++++++++++++++++ src/index/src/inverted_index/error.rs | 12 ++ src/index/src/lib.rs | 2 + 9 files changed, 463 insertions(+), 61 deletions(-) create mode 100644 src/index/src/inverted_index/create.rs create mode 100644 src/index/src/inverted_index/create/sort.rs create mode 100644 src/index/src/inverted_index/create/sort/intermediate_rw.rs create mode 100644 src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs diff --git a/Cargo.lock b/Cargo.lock index ffb2589a0dde..d8f8ebce2501 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,7 +70,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "befdff0b4683a0824fc8719ce639a252d9d62cd89c8d0004c39e2417128c1eb8" dependencies = [ "axum", - "bytes", + "bytes 1.5.0", "cfg-if 1.0.0", "http", "indexmap 1.9.3", @@ -326,7 +326,7 @@ version = "47.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fda119225204141138cb0541c692fbfef0e875ba01bfdeaed09e9d354f9d6195" dependencies = [ - "bytes", + "bytes 1.5.0", "half 2.3.1", "num", ] @@ -392,7 +392,7 @@ dependencies = [ "arrow-ipc", "arrow-schema", "base64 0.21.5", - "bytes", + "bytes 1.5.0", "futures", "paste", "prost 0.12.2", @@ -707,7 +707,7 @@ dependencies = [ "async-trait", "axum-core", "bitflags 1.3.2", - "bytes", + "bytes 1.5.0", "futures-util", "headers", "http", @@ -738,7 +738,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" dependencies = [ "async-trait", - "bytes", + "bytes 1.5.0", "futures-util", "http", "http-body", @@ -766,7 +766,7 @@ version = "0.1.1" source = "git+https://github.com/sunng87/axum-test-helper.git?branch=patch-1#5aa7843ce2250144ea1b7f589f274c00cf1af4ab" dependencies = [ "axum", - "bytes", + "bytes 1.5.0", "http", "http-body", "hyper", @@ -785,7 +785,7 @@ checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" dependencies = [ "fastrand 1.9.0", "futures-core", - "pin-project", + "pin-project 1.1.3", "tokio", ] @@ -828,7 +828,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf16bec990f8ea25cab661199904ef452fcf11f565c404ce6cffbdf3f8cbbc47" dependencies = [ - "bytes", + "bytes 1.5.0", "smallvec", ] @@ -1081,6 +1081,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "bytes" version = "1.5.0" @@ -1583,7 +1589,7 @@ version = "0.4.4" dependencies = [ "anymap", "bitvec", - "bytes", + "bytes 1.5.0", "common-error", "common-macro", "paste", @@ -1620,7 +1626,7 @@ dependencies = [ "arrow-schema", "async-compression 0.3.15", "async-trait", - "bytes", + "bytes 1.5.0", "common-error", 
"common-macro", "common-runtime", @@ -1794,7 +1800,7 @@ dependencies = [ "async-stream", "async-trait", "base64 0.21.5", - "bytes", + "bytes 1.5.0", "chrono", "common-catalog", "common-error", @@ -2390,7 +2396,7 @@ dependencies = [ "arrow-schema", "async-compression 0.4.5", "async-trait", - "bytes", + "bytes 1.5.0", "bzip2", "chrono", "dashmap", @@ -2599,7 +2605,7 @@ dependencies = [ "axum", "axum-macros", "axum-test-helper", - "bytes", + "bytes 1.5.0", "catalog", "client", "common-base", @@ -2635,7 +2641,7 @@ dependencies = [ "meta-client", "mito2", "object-store", - "pin-project", + "pin-project 1.1.3", "prometheus", "prost 0.12.2", "query", @@ -3507,6 +3513,18 @@ dependencies = [ "slab", ] +[[package]] +name = "futures_codec" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce54d63f8b0c75023ed920d46fd71d0cbbb830b0ee012726b5b4f506fb6dea5b" +dependencies = [ + "bytes 0.5.6", + "futures", + "memchr", + "pin-project 0.4.30", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -3606,7 +3624,7 @@ version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" dependencies = [ - "bytes", + "bytes 1.5.0", "fnv", "futures-core", "futures-sink", @@ -3693,7 +3711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ "base64 0.21.5", - "bytes", + "bytes 1.5.0", "headers-core", "http", "httpdate", @@ -3790,7 +3808,7 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ - "bytes", + "bytes 1.5.0", "fnv", "itoa", ] @@ -3801,7 +3819,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes", + "bytes 1.5.0", "http", "pin-project-lite", ] @@ -3846,7 +3864,7 @@ version = "0.14.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" dependencies = [ - "bytes", + "bytes 1.5.0", "futures-channel", "futures-core", "futures-util", @@ -3960,11 +3978,13 @@ version = "0.4.4" dependencies = [ "async-trait", "bytemuck", + "bytes 0.5.6", "common-base", "common-error", "common-macro", "fst", "futures", + "futures_codec", "greptime-proto", "mockall", "prost 0.12.2", @@ -4039,7 +4059,7 @@ name = "influxdb_line_protocol" version = "0.1.0" source = "git+https://github.com/evenyag/influxdb_iox?branch=feat/line-protocol#10ef0d0b02705ac7518717390939fa3a9bcfcacc" dependencies = [ - "bytes", + "bytes 1.5.0", "nom", "smallvec", "snafu", @@ -4420,7 +4440,7 @@ dependencies = [ "async-stream", "async-trait", "byteorder", - "bytes", + "bytes 1.5.0", "common-base", "common-config", "common-error", @@ -4633,7 +4653,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "376101dbd964fc502d5902216e180f92b3d003b5cc3d2e40e044eb5470fca677" dependencies = [ - "bytes", + "bytes 1.5.0", "serde", "thiserror", ] @@ -4874,7 +4894,7 @@ dependencies = [ "async-compat", "async-stream", "async-trait", - "bytes", + "bytes 1.5.0", "chrono", "common-base", "common-catalog", @@ -5012,7 +5032,7 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"6750b17ce50f8f112ef1a8394121090d47c596b56a6a17569ca680a9626e2ef2" dependencies = [ - "bytes", + "bytes 1.5.0", "crossbeam", "flate2", "futures-core", @@ -5026,7 +5046,7 @@ dependencies = [ "once_cell", "pem 3.0.2", "percent-encoding", - "pin-project", + "pin-project 1.1.3", "rand", "rustls 0.21.9", "rustls-pemfile 1.0.4", @@ -5056,7 +5076,7 @@ dependencies = [ "bitvec", "btoi", "byteorder", - "bytes", + "bytes 1.5.0", "cc", "chrono", "cmake", @@ -5360,7 +5380,7 @@ version = "0.4.4" dependencies = [ "anyhow", "async-trait", - "bytes", + "bytes 1.5.0", "common-error", "common-macro", "common-runtime", @@ -5384,7 +5404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" dependencies = [ "async-trait", - "bytes", + "bytes 1.5.0", "chrono", "futures", "humantime", @@ -5421,7 +5441,7 @@ dependencies = [ "async-trait", "backon", "base64 0.21.5", - "bytes", + "bytes 1.5.0", "chrono", "flagset", "futures", @@ -5432,7 +5452,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.1", "percent-encoding", - "pin-project", + "pin-project 1.1.3", "quick-xml 0.29.0", "reqsign", "reqwest", @@ -5666,7 +5686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "900310981898f6e3877286f1272b75f5c4a604628594a0a7026311b93a2aa5e6" dependencies = [ "arrow", - "bytes", + "bytes 1.5.0", "chrono", "fallible-streaming-iterator", "flate2", @@ -5838,7 +5858,7 @@ dependencies = [ "arrow-select", "base64 0.21.5", "brotli", - "bytes", + "bytes 1.5.0", "chrono", "flate2", "futures", @@ -6016,7 +6036,7 @@ checksum = "7f7f181d085a224ff2b2ea46bd2066b487b87e83dabbcdfe60bf3f027f5d0593" dependencies = [ "async-trait", "base64 0.21.5", - "bytes", + "bytes 1.5.0", "chrono", "derive-new 0.6.0", "futures", @@ -6084,13 +6104,33 @@ dependencies = [ "uncased", ] +[[package]] +name = "pin-project" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a" +dependencies = [ + "pin-project-internal 0.4.30", +] + [[package]] name = "pin-project" version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ - "pin-project-internal", + "pin-project-internal 1.1.3", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] @@ -6230,7 +6270,7 @@ checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ "base64 0.21.5", "byteorder", - "bytes", + "bytes 1.5.0", "fallible-iterator", "hmac", "md-5", @@ -6247,7 +6287,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" dependencies = [ "array-init", - "bytes", + "bytes 1.5.0", "chrono", "fallible-iterator", "postgres-protocol", @@ -6498,7 +6538,7 @@ version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ - "bytes", + "bytes 1.5.0", "prost-derive 0.11.9", ] @@ -6508,7 +6548,7 @@ version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"5a5a410fc7882af66deb8d01d01737353cf3ad6204c408177ba494291a626312" dependencies = [ - "bytes", + "bytes 1.5.0", "prost-derive 0.12.2", ] @@ -6518,7 +6558,7 @@ version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ - "bytes", + "bytes 1.5.0", "heck", "itertools 0.10.5", "lazy_static", @@ -6540,7 +6580,7 @@ version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fa3d084c8704911bfefb2771be2f9b6c5c0da7343a71e0021ee3c665cada738" dependencies = [ - "bytes", + "bytes 1.5.0", "heck", "itertools 0.11.0", "log", @@ -6606,7 +6646,7 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" dependencies = [ - "bytes", + "bytes 1.5.0", ] [[package]] @@ -6670,7 +6710,7 @@ dependencies = [ "common-macro", "derive_builder 0.12.0", "futures", - "pin-project", + "pin-project 1.1.3", "serde", "serde_json", "snafu", @@ -7147,7 +7187,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64 0.21.5", - "bytes", + "bytes 1.5.0", "encoding_rs", "futures-core", "futures-util", @@ -7462,7 +7502,7 @@ checksum = "06676aec5ccb8fc1da723cc8c0f9a46549f21ebb8753d3915c6c41db1e7f1dc4" dependencies = [ "arrayvec", "borsh", - "bytes", + "bytes 1.5.0", "num-traits", "rand", "rkyv", @@ -8346,7 +8386,7 @@ dependencies = [ "axum-test-helper", "base64 0.21.5", "build-data", - "bytes", + "bytes 1.5.0", "catalog", "chrono", "client", @@ -8388,7 +8428,7 @@ dependencies = [ "opentelemetry-proto 0.3.0", "parking_lot 0.12.1", "pgwire", - "pin-project", + "pin-project 1.1.3", "postgres-types", "pprof", "prometheus", @@ -8823,7 +8863,7 @@ dependencies = [ "base64 0.13.1", "bitflags 1.3.2", "byteorder", - "bytes", + "bytes 1.5.0", "chrono", "crc", "crossbeam-queue", @@ -8954,7 +8994,7 @@ dependencies = [ "aquamarine", "async-stream", "async-trait", - "bytes", + "bytes 1.5.0", "common-base", "common-error", "common-macro", @@ -9091,7 +9131,7 @@ version = "0.4.4" dependencies = [ "async-recursion", "async-trait", - "bytes", + "bytes 1.5.0", "catalog", "common-catalog", "common-error", @@ -9593,7 +9633,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" dependencies = [ "backtrace", - "bytes", + "bytes 1.5.0", "libc", "mio", "num_cpus", @@ -9635,7 +9675,7 @@ checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" dependencies = [ "async-trait", "byteorder", - "bytes", + "bytes 1.5.0", "fallible-iterator", "futures-channel", "futures-util", @@ -9717,7 +9757,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89b3cbabd3ae862100094ae433e1def582cf86451b4e9bf83aa7ac1d8a7d719" dependencies = [ "async-stream", - "bytes", + "bytes 1.5.0", "futures-core", "tokio", "tokio-stream", @@ -9729,7 +9769,7 @@ version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ - "bytes", + "bytes 1.5.0", "futures-core", "futures-io", "futures-sink", @@ -9801,7 +9841,7 @@ dependencies = [ "async-trait", "axum", "base64 0.21.5", - "bytes", + "bytes 1.5.0", "futures-core", "futures-util", "h2", @@ -9810,7 +9850,7 @@ dependencies 
= [ "hyper", "hyper-timeout", "percent-encoding", - "pin-project", + "pin-project 1.1.3", "prost 0.11.9", "tokio", "tokio-stream", @@ -9830,14 +9870,14 @@ dependencies = [ "async-trait", "axum", "base64 0.21.5", - "bytes", + "bytes 1.5.0", "h2", "http", "http-body", "hyper", "hyper-timeout", "percent-encoding", - "pin-project", + "pin-project 1.1.3", "prost 0.12.2", "rustls 0.21.9", "rustls-pemfile 1.0.4", @@ -9899,7 +9939,7 @@ dependencies = [ "futures-util", "hdrhistogram", "indexmap 1.9.3", - "pin-project", + "pin-project 1.1.3", "pin-project-lite", "rand", "slab", @@ -9919,7 +9959,7 @@ dependencies = [ "async-compression 0.3.15", "base64 0.13.1", "bitflags 1.3.2", - "bytes", + "bytes 1.5.0", "futures-core", "futures-util", "http", @@ -10005,7 +10045,7 @@ checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ "futures", "futures-task", - "pin-project", + "pin-project 1.1.3", "tracing", ] @@ -10960,7 +11000,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66534846dec7a11d7c50a74b7cdb208b9a581cad890b7866430d438455847c85" dependencies = [ "bcder", - "bytes", + "bytes 1.5.0", "chrono", "der 0.7.8", "hex", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 800420164ac5..5ac20bcd9465 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -7,11 +7,13 @@ license.workspace = true [dependencies] async-trait.workspace = true bytemuck.workspace = true +bytes = "0.5.6" common-base.workspace = true common-error.workspace = true common-macro.workspace = true fst.workspace = true futures.workspace = true +futures_codec = "0.4.1" greptime-proto.workspace = true mockall.workspace = true prost.workspace = true diff --git a/src/index/src/inverted_index.rs b/src/index/src/inverted_index.rs index 96db32a0cb4f..a793d1a25238 100644 --- a/src/index/src/inverted_index.rs +++ b/src/index/src/inverted_index.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod create; pub mod error; pub mod format; pub mod search; diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs new file mode 100644 index 000000000000..a57c2c36b7e6 --- /dev/null +++ b/src/index/src/inverted_index/create.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod sort; diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs new file mode 100644 index 000000000000..2331ed8dcb81 --- /dev/null +++ b/src/index/src/inverted_index/create/sort.rs @@ -0,0 +1,24 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_base::BitVec;
+use futures::Stream;
+
+use crate::inverted_index::error::Result;
+use crate::inverted_index::Bytes;
+
+mod intermediate_rw;
+
+/// A stream of sorted values along with their associated bitmap
+pub type SortedStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;
diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs
new file mode 100644
index 000000000000..bd800e4add84
--- /dev/null
+++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs
@@ -0,0 +1,141 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod codec_v1;
+
+use std::collections::BTreeMap;
+
+use common_base::BitVec;
+use futures::{stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
+use futures_codec::{FramedRead, FramedWrite};
+use snafu::ResultExt;
+
+use crate::inverted_index::create::sort::SortedStream;
+use crate::inverted_index::error::{
+    CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu,
+};
+use crate::inverted_index::Bytes;
+
+/// `IntermediateWriter` serializes and writes intermediate data to the wrapped `writer`
+pub struct IntermediateWriter<W> {
+    writer: W,
+}
+
+impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
+    /// Creates a new `IntermediateWriter` wrapping an `AsyncWrite`
+    pub fn new(writer: W) -> IntermediateWriter<W> {
+        IntermediateWriter { writer }
+    }
+
+    /// Serializes and writes all provided values to the wrapped writer
+    pub async fn write_all(mut self, values: BTreeMap<Bytes, BitVec>) -> Result<()> {
+        let (codec_magic, encoder) = (codec_v1::MAGIC_CODEC_V1, codec_v1::IntermediateCodecV1);
+
+        self.writer
+            .write_all(codec_magic)
+            .await
+            .context(WriteSnafu)?;
+
+        let value_stream = stream::iter(values.into_iter().map(Ok));
+        let frame_write = FramedWrite::new(&mut self.writer, encoder);
+        value_stream.forward(frame_write).await?;
+
+        self.writer.flush().await.context(FlushSnafu)?;
+        self.writer.close().await.context(CloseSnafu)
+    }
+}
+
+/// Reads intermediate serialized data from an `AsyncRead` source and converts it to a [`SortedStream`]
+pub struct IntermediaReader<R> {
+    reader: R,
+}
+
+impl<R: AsyncRead + Unpin + Send + 'static> IntermediaReader<R> {
+    pub fn new(reader: R) -> IntermediaReader<R> {
+        IntermediaReader { reader }
+    }
+
+    /// Reads the magic header, determines the codec, and returns a stream of deserialized values.
+    pub async fn into_stream(mut self) -> Result<SortedStream> {
+        let mut magic = [0u8; 4];
+        self.reader
+            .read_exact(&mut magic)
+            .await
+            .context(ReadSnafu)?;
+
+        let decoder = match &magic {
+            codec_v1::MAGIC_CODEC_V1 => codec_v1::IntermediateCodecV1,
+            _ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(),
+        };
+
+        Ok(Box::new(FramedRead::new(self.reader, decoder)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::io::Cursor;
+
+    use super::*;
+    use crate::inverted_index::error::Error;
+
+    #[tokio::test]
+    async fn test_intermedia_read_write_basic() {
+        let mut buf = vec![];
+
+        let values = BTreeMap::from_iter([
+            (Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
+            (Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
+        ]);
+
+        let writer = IntermediateWriter::new(&mut buf);
+        writer.write_all(values.clone()).await.unwrap();
+
+        let reader = IntermediaReader::new(Cursor::new(buf));
+        let mut stream = reader.into_stream().await.unwrap();
+
+        let a = stream.next().await.unwrap().unwrap();
+        assert_eq!(a, (Bytes::from("a"), BitVec::from_slice(&[0b10101010])));
+        let b = stream.next().await.unwrap().unwrap();
+        assert_eq!(b, (Bytes::from("b"), BitVec::from_slice(&[0b01010101])));
+        assert!(stream.next().await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_intermedia_read_write_empty() {
+        let mut buf = vec![];
+
+        let values = BTreeMap::new();
+
+        let writer = IntermediateWriter::new(&mut buf);
+        writer.write_all(values.clone()).await.unwrap();
+
+        let reader = IntermediaReader::new(Cursor::new(buf));
+        let mut stream = reader.into_stream().await.unwrap();
+
+        assert!(stream.next().await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_intermedia_read_with_invalid_magic() {
+        let buf = b"invalid".to_vec();
+
+        let reader = IntermediaReader::new(Cursor::new(buf));
+        let result = reader.into_stream().await;
+        assert!(matches!(
+            result,
+            Err(Error::UnknownIntermediateCodecMagic { .. })
+        ))
+    }
+}
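For reference, a complete intermediate file is simply the 4-byte codec magic followed by the encoded frames. A minimal round trip through the two types could look like the sketch below (illustrative only, not part of the patch; it assumes the module's imports plus `futures::io::Cursor`, and uses the type names as introduced here — a later patch in this series renames `IntermediaReader` to `IntermediateReader`):

```rust
async fn round_trip_sketch() -> Result<()> {
    let mut buf = vec![];
    let values = BTreeMap::from_iter([(Bytes::from("a"), BitVec::from_slice(&[0b1000_0000]))]);

    // `write_all` consumes the writer: it writes the magic, then one frame per entry,
    // and finally flushes and closes the underlying `AsyncWrite`.
    IntermediateWriter::new(&mut buf).write_all(values).await?;
    assert_eq!(&buf[..4], &codec_v1::MAGIC_CODEC_V1[..]);

    // The reader validates the magic, picks the matching codec and exposes the
    // remaining frames as a `SortedStream`.
    let mut stream = IntermediaReader::new(Cursor::new(buf)).into_stream().await?;
    let (value, bitmap) = stream.next().await.unwrap()?;
    assert_eq!(value, Bytes::from("a"));
    assert_eq!(bitmap, BitVec::from_slice(&[0b1000_0000]));
    Ok(())
}
```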
diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs
new file mode 100644
index 000000000000..1e0894451ab2
--- /dev/null
+++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs
@@ -0,0 +1,165 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io;
+
+use bytes::{Buf, BufMut};
+use common_base::BitVec;
+use futures_codec::{BytesMut, Decoder, Encoder};
+use snafu::{location, Location};
+
+use crate::inverted_index::error::{Error, Result};
+use crate::inverted_index::Bytes;
+
+const U64_LENGTH: usize = std::mem::size_of::<u64>();
+
+/// Magic number for this intermediate codec version
+pub const MAGIC_CODEC_V1: &[u8; 4] = b"im01";
+
+/// Codec for serializing and deserializing intermediate data for external sorting.
+///
+/// Binary format serialization. The item is laid out as follows:
+/// ```text
+/// [value len][value][bitmap len][bitmap]
+///     [8]      [?]       [8]      [?]
+/// ```
+pub struct IntermediateCodecV1;
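To make the layout concrete: a single item with value `b"a"` and a one-byte bitmap occupies 8 + 1 + 8 + 1 = 18 bytes. A small illustrative check (not part of the patch; it assumes the same `use super::*` imports as the tests at the bottom of this file):

```rust
#[test]
fn frame_layout_sketch() {
    let mut codec = IntermediateCodecV1;
    let mut buf = BytesMut::new();
    codec
        .encode((b"a".to_vec(), BitVec::from_slice(&[0b0000_0001])), &mut buf)
        .unwrap();

    let expected: [u8; 18] = [
        1, 0, 0, 0, 0, 0, 0, 0, // value length as u64, little-endian
        b'a', // value bytes
        1, 0, 0, 0, 0, 0, 0, 0, // bitmap length as u64, little-endian
        0b0000_0001, // raw bitmap byte
    ];
    assert_eq!(&buf[..], &expected[..]);
}
```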
+
+/// [`FramedWrite`] requires the [`Encoder`] trait to be implemented.
+impl Encoder for IntermediateCodecV1 {
+    type Item = (Bytes, BitVec);
+    type Error = Error;
+
+    fn encode(&mut self, item: (Bytes, BitVec), dst: &mut BytesMut) -> Result<()> {
+        let value_bytes = item.0;
+        let bitmap_bytes = item.1.into_vec();
+
+        dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_bytes.len());
+        dst.put_u64_le(value_bytes.len() as u64);
+        dst.extend_from_slice(&value_bytes);
+        dst.put_u64_le(bitmap_bytes.len() as u64);
+        dst.extend_from_slice(&bitmap_bytes);
+        Ok(())
+    }
+}
+
+/// [`FramedRead`] requires the [`Decoder`] trait to be implemented.
+impl Decoder for IntermediateCodecV1 {
+    type Item = (Bytes, BitVec);
+    type Error = Error;
+
+    /// Decodes the buffer into a tuple of bytes and a bitmap. Returns `None` if
+    /// the buffer does not contain enough data for a complete item.
+    ///
+    /// After successful decoding, the buffer is advanced.
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
+        // [value len][value][bitmap len][bitmap]
+        //     [8]      [?]       [8]      [?]
+
+        // decode value len
+        if src.len() < U64_LENGTH {
+            return Ok(None);
+        }
+        let (value_len, buf) = src.split_at(U64_LENGTH);
+        let value_len = u64::from_le_bytes(value_len.try_into().unwrap()) as usize;
+
+        // decode value
+        if buf.len() < value_len {
+            return Ok(None);
+        }
+        let (value_bytes, buf) = buf.split_at(value_len);
+
+        // decode bitmap len
+        if buf.len() < U64_LENGTH {
+            return Ok(None);
+        }
+        let (bitmap_len, buf) = buf.split_at(U64_LENGTH);
+        let bitmap_len = u64::from_le_bytes(bitmap_len.try_into().unwrap()) as usize;
+
+        // decode bitmap
+        if buf.len() < bitmap_len {
+            return Ok(None);
+        }
+        let bitmap_bytes = &buf[..bitmap_len];
+
+        let item = (value_bytes.to_vec(), BitVec::from_slice(bitmap_bytes));
+
+        src.advance(U64_LENGTH * 2 + value_len + bitmap_len);
+        Ok(Some(item))
+    }
+}
+
+/// Required for [`Encoder`] and [`Decoder`] implementations.
+impl From for Error { + fn from(error: io::Error) -> Self { + Error::CommonIoError { + error, + location: location!(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_intermediate_codec_basic() { + let mut codec = IntermediateCodecV1; + let mut buf = BytesMut::new(); + + let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010])); + codec.encode(item.clone(), &mut buf).unwrap(); + assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item); + assert_eq!(codec.decode(&mut buf).unwrap(), None); + + let item1 = (b"world".to_vec(), BitVec::from_slice(&[0b01010101])); + codec.encode(item.clone(), &mut buf).unwrap(); + codec.encode(item1.clone(), &mut buf).unwrap(); + assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item); + assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item1); + assert_eq!(codec.decode(&mut buf).unwrap(), None); + assert!(buf.is_empty()); + } + + #[test] + fn test_intermediate_codec_empty_item() { + let mut codec = IntermediateCodecV1; + let mut buf = BytesMut::new(); + + let item = (b"".to_vec(), BitVec::from_slice(&[])); + codec.encode(item.clone(), &mut buf).unwrap(); + assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item); + assert_eq!(codec.decode(&mut buf).unwrap(), None); + assert!(buf.is_empty()); + } + + #[test] + fn test_intermediate_codec_parital() { + let mut codec = IntermediateCodecV1; + let mut buf = BytesMut::new(); + + let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010])); + codec.encode(item.clone(), &mut buf).unwrap(); + + let partial_length = U64_LENGTH + 3; + let mut partial_bytes = buf.split_to(partial_length); + + assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None); // not enough data + partial_bytes.extend_from_slice(&buf[..]); + assert_eq!(codec.decode(&mut partial_bytes).unwrap().unwrap(), item); + assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None); + assert!(partial_bytes.is_empty()); + } +} diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 44fd77c413de..afb8ae12838b 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -150,6 +150,16 @@ pub enum Error { error: fst::Error, location: Location, }, + + #[snafu(display("Failed to perform IO operation"))] + CommonIoError { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Unknown intermediate codec magic: {magic:?}"))] + UnknownIntermediateCodecMagic { magic: [u8; 4], location: Location }, } impl ErrorExt for Error { @@ -168,6 +178,8 @@ impl ErrorExt for Error { | DecodeProto { .. } | DecodeFst { .. } | KeysApplierUnexpectedPredicates { .. } + | CommonIoError { .. } + | UnknownIntermediateCodecMagic { .. } | FstCompile { .. } => StatusCode::Unexpected, ParseRegex { .. } diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index e7f448c398ef..296efb315d09 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -13,5 +13,7 @@ // limitations under the License. 
#![feature(iter_partition_in_place)] +// TODO(zhongzc): remove once further code is added +#![allow(dead_code)] pub mod inverted_index; From 96b10cc140b4f935724516360927ffc3f9263f86 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 09:43:27 +0000 Subject: [PATCH 02/20] chore: MAGIC_CODEC_V1 -> CODEC_V1_MAGIC Signed-off-by: Zhenchi --- src/index/src/inverted_index/create/sort/intermediate_rw.rs | 4 ++-- .../inverted_index/create/sort/intermediate_rw/codec_v1.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs index bd800e4add84..3074586202a3 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs @@ -40,7 +40,7 @@ impl IntermediateWriter { /// Serializes and writes all provided values to the wrapped writer pub async fn write_all(mut self, values: BTreeMap) -> Result<()> { - let (codec_magic, encoder) = (codec_v1::MAGIC_CODEC_V1, codec_v1::IntermediateCodecV1); + let (codec_magic, encoder) = (codec_v1::CODEC_V1_MAGIC, codec_v1::IntermediateCodecV1); self.writer .write_all(codec_magic) @@ -75,7 +75,7 @@ impl IntermediaReader { .context(ReadSnafu)?; let decoder = match &magic { - codec_v1::MAGIC_CODEC_V1 => codec_v1::IntermediateCodecV1, + codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateCodecV1, _ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(), }; diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs index 1e0894451ab2..32c2bd25f88b 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs @@ -24,8 +24,8 @@ use crate::inverted_index::Bytes; const U64_LENGTH: usize = std::mem::size_of::(); -/// Magic number for this intermediate codec version -pub const MAGIC_CODEC_V1: &[u8; 4] = b"im01"; +/// Magic bytes for this intermediate codec version +pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01"; /// Codec for serializing and deserializing intermediate data for external sorting. /// From 629ce26476080d1dd51f9a1b1cb3fa4d6a266475 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 15:06:45 +0000 Subject: [PATCH 03/20] chore: polish comments Signed-off-by: Zhenchi --- .../inverted_index/create/sort/intermediate_rw/codec_v1.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs index 32c2bd25f88b..d343be46e0c0 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs @@ -59,10 +59,11 @@ impl Decoder for IntermediateCodecV1 { type Item = (Bytes, BitVec); type Error = Error; - /// Decodes the buffer into a tuple of bytes and a bitmap. Returns `None` if - /// the buffer does not contain enough data for a complete item. + /// Decodes the `src` into `(Bytes, BitVec)`. Returns `None` if + /// the `src` does not contain enough data for a complete item. /// - /// After successful decoding, the buffer is advanced. + /// Only after successful decoding, the `src` is advanced. Otherwise, + /// it is left untouched to wait for filling more data and retrying. 
fn decode(&mut self, src: &mut BytesMut) -> Result> { // [value len][value][bitmap len][bitmap] // [8] [?] [8] [?] From 2c7a7374b7722036c8d8634cc72665829abdadde Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 15:30:07 +0000 Subject: [PATCH 04/20] chore: fix typos intermedia -> intermediate Signed-off-by: Zhenchi --- .../create/sort/intermediate_rw.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs index 3074586202a3..e1f625c33283 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs @@ -57,13 +57,13 @@ impl IntermediateWriter { } /// Reads intermediate serialized data from an `AsyncRead` source and converts it to a [`SortedStream`] -pub struct IntermediaReader { +pub struct IntermediateReader { reader: R, } -impl IntermediaReader { - pub fn new(reader: R) -> IntermediaReader { - IntermediaReader { reader } +impl IntermediateReader { + pub fn new(reader: R) -> IntermediateReader { + IntermediateReader { reader } } /// Reads the magic header, determines the codec, and returns a stream of deserialized values. @@ -91,7 +91,7 @@ mod tests { use crate::inverted_index::error::Error; #[tokio::test] - async fn test_intermedia_read_write_basic() { + async fn test_intermediate_read_write_basic() { let mut buf = vec![]; let values = BTreeMap::from_iter([ @@ -102,7 +102,7 @@ mod tests { let writer = IntermediateWriter::new(&mut buf); writer.write_all(values.clone()).await.unwrap(); - let reader = IntermediaReader::new(Cursor::new(buf)); + let reader = IntermediateReader::new(Cursor::new(buf)); let mut stream = reader.into_stream().await.unwrap(); let a = stream.next().await.unwrap().unwrap(); @@ -113,7 +113,7 @@ mod tests { } #[tokio::test] - async fn test_intermedia_read_write_empty() { + async fn test_intermediate_read_write_empty() { let mut buf = vec![]; let values = BTreeMap::new(); @@ -121,17 +121,17 @@ mod tests { let writer = IntermediateWriter::new(&mut buf); writer.write_all(values.clone()).await.unwrap(); - let reader = IntermediaReader::new(Cursor::new(buf)); + let reader = IntermediateReader::new(Cursor::new(buf)); let mut stream = reader.into_stream().await.unwrap(); assert!(stream.next().await.is_none()); } #[tokio::test] - async fn test_intermedia_read_with_invalid_magic() { + async fn test_intermediate_read_with_invalid_magic() { let buf = b"invalid".to_vec(); - let reader = IntermediaReader::new(Cursor::new(buf)); + let reader = IntermediateReader::new(Cursor::new(buf)); let result = reader.into_stream().await; assert!(matches!( result, From f592c18766464e2575a0c9f01fd62e846a3fdf80 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 11:39:11 +0000 Subject: [PATCH 05/20] fix: typos Signed-off-by: Zhenchi --- .../src/inverted_index/create/sort/intermediate_rw/codec_v1.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs index d343be46e0c0..a340c7d01169 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs @@ -147,7 +147,7 @@ mod tests { } #[test] - fn test_intermediate_codec_parital() { + fn test_intermediate_codec_partial() { let mut codec = IntermediateCodecV1; let 
mut buf = BytesMut::new(); From 1efbb49b8ea776d409f47f8c9f4c2e6def8e21f9 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 09:33:12 +0000 Subject: [PATCH 06/20] feat(inverted_index.create): add external sorter Signed-off-by: Zhenchi --- Cargo.lock | 1 + src/index/Cargo.toml | 1 + src/index/src/inverted_index/create/sort.rs | 27 +- .../create/sort/external_provider.rs | 39 ++ .../create/sort/external_sort.rs | 373 ++++++++++++++++++ .../create/sort/merge_stream.rs | 164 ++++++++ 6 files changed, 603 insertions(+), 2 deletions(-) create mode 100644 src/index/src/inverted_index/create/sort/external_provider.rs create mode 100644 src/index/src/inverted_index/create/sort/external_sort.rs create mode 100644 src/index/src/inverted_index/create/sort/merge_stream.rs diff --git a/Cargo.lock b/Cargo.lock index d8f8ebce2501..161ef6884333 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3988,6 +3988,7 @@ dependencies = [ "greptime-proto", "mockall", "prost 0.12.2", + "rand", "regex", "regex-automata 0.1.10", "snafu", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 5ac20bcd9465..b87475e80f04 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -22,5 +22,6 @@ regex.workspace = true snafu.workspace = true [dev-dependencies] +rand.workspace = true tokio-util.workspace = true tokio.workspace = true diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs index 2331ed8dcb81..c05bbcce9299 100644 --- a/src/index/src/inverted_index/create/sort.rs +++ b/src/index/src/inverted_index/create/sort.rs @@ -12,13 +12,36 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod external_provider; +mod external_sort; +mod intermediate_rw; +mod merge_stream; + +use async_trait::async_trait; use common_base::BitVec; use futures::Stream; use crate::inverted_index::error::Result; use crate::inverted_index::Bytes; -mod intermediate_rw; - /// A stream of sorted values along with their associated bitmap pub type SortedStream = Box> + Send + Unpin>; + +/// Output of a sorting operation, encapsulating a bitmap for null values and a stream of sorted items +pub struct SortOutput { + /// Bitmap indicating positions of null values + pub null_bitmap: BitVec, + + /// Stream of sorted items + pub sorted_stream: SortedStream, +} + +/// Handles data sorting, supporting incremental input and retrieval of sorted output +#[async_trait] +pub trait Sorter: Send { + /// Inputs a non-null or null value into the sorter + async fn push(&mut self, value: Option) -> Result<()>; + + /// Completes the sorting process and returns the sorted data + async fn output(&mut self) -> Result; +} diff --git a/src/index/src/inverted_index/create/sort/external_provider.rs b/src/index/src/inverted_index/create/sort/external_provider.rs new file mode 100644 index 000000000000..6205a866f944 --- /dev/null +++ b/src/index/src/inverted_index/create/sort/external_provider.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use futures::{AsyncRead, AsyncWrite}; + +use crate::inverted_index::error::Result; + +/// Trait for managing intermediate files during external sorting for a particular index. +#[mockall::automock] +#[async_trait] +pub trait ExternalTempFileProvider: Send + Sync { + /// Creates and opens a new intermediate file associated with a specific index for writing. + /// Should return an error if the file already exists. + /// + /// - `index_name`: the name of the index for which the file will be associated. + /// - `file_id`: a unique identifier for the new file. + async fn create( + &self, + index_name: &str, + file_id: &str, + ) -> Result>; + + /// Retrieves all intermediate files associated with a specific index for an external sorting operation. + /// + /// `index_name`: the name of the index to retrieve intermediate files for. + async fn read_all(&self, index_name: &str) -> Result>>; +} diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs new file mode 100644 index 000000000000..5e3bced107ed --- /dev/null +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -0,0 +1,373 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::mem; +use std::sync::Arc; + +use async_trait::async_trait; +use common_base::BitVec; +use futures::stream; + +use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider; +use crate::inverted_index::create::sort::intermediate_rw::{IntermediaReader, IntermediateWriter}; +use crate::inverted_index::create::sort::merge_stream::MergeSortedStream; +use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter}; +use crate::inverted_index::error::Result; +use crate::inverted_index::Bytes; + +/// `ExternalSorter` manages the sorting of data using both in-memory structures and external files. +/// It dumps data to external files when the in-memory buffer crosses a certain memory threshold. +pub struct ExternalSorter { + /// The index name associated with the sorting operation + index_name: String, + + /// Manages creation and access to external temporary files + temp_file_provider: Arc, + + /// Bitmap indicating which segments have null values + null_bitmap: BitVec, + + /// In-memory buffer to hold values and their corresponding bitmaps until memory threshold is exceeded + values_buffer: BTreeMap, + + /// Count of all rows ingested so far + total_row_count: usize, + + /// The number of rows per group for bitmap indexing which determines how rows are + /// batched for indexing. It is used to determine which segment a row belongs to. 
+ segment_row_count: usize, + + /// Tracks memory usage of the buffer + current_memory_usage: usize, + + /// The memory usage threshold at which the buffer should be dumped to an external file + memory_usage_threshold: usize, +} + +#[async_trait] +impl Sorter for ExternalSorter { + /// Pushes a value into the sorter, adding it to the in-memory buffer and dumping the buffer to + /// an external file if necessary + async fn push(&mut self, value: Option) -> Result<()> { + self.total_row_count += 1; + let bitmap_offset = self.current_segment_index(); + + if let Some(value) = value { + let memory_diff = self.push_not_null(value, bitmap_offset); + self.may_dump_buffer(memory_diff).await + } else { + set_bit(&mut self.null_bitmap, bitmap_offset); + Ok(()) + } + } + + /// Finalizes the sorting operation, merging data from both memory and external files into a sorted stream + async fn output(&mut self) -> Result { + let readers = self.temp_file_provider.read_all(&self.index_name).await?; + + let buf_values = mem::take(&mut self.values_buffer).into_iter(); + let mut merging_sorted_stream: SortedStream = Box::new(stream::iter(buf_values.map(Ok))); + + // Sequentially merge each intermedia file's stream into the merged stream + for reader in readers { + let intermedia = IntermediaReader::new(reader).into_stream().await?; + merging_sorted_stream = MergeSortedStream::merge(merging_sorted_stream, intermedia); + } + + Ok(SortOutput { + null_bitmap: mem::take(&mut self.null_bitmap), + sorted_stream: merging_sorted_stream, + }) + } +} + +impl ExternalSorter { + /// Constructs a new `ExternalSorter` + pub fn new( + index_name: String, + temp_file_provider: Arc, + segment_row_count: usize, + memory_usage_threshold: usize, + ) -> Self { + Self { + index_name, + temp_file_provider, + + null_bitmap: BitVec::new(), + values_buffer: BTreeMap::new(), + + total_row_count: 0, + segment_row_count, + + current_memory_usage: 0, + memory_usage_threshold, + } + } + + /// Determines the current data segment based on processed rows + fn current_segment_index(&self) -> usize { + (self.total_row_count - 1) / self.segment_row_count + } + + /// Adds a non-null value to the buffer or updates an existing value's bitmap. + /// Returns the memory usage difference of the buffer after the operation. 
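+    /// (For an existing value only the growth of its bitmap is counted; for a new value the
+    /// value's length plus its initial bitmap allocation is counted.)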
+ fn push_not_null(&mut self, value: Bytes, offset: usize) -> usize { + match self.values_buffer.entry(value) { + Entry::Occupied(mut entry) => { + let bitmap = entry.get_mut(); + let old_len = bitmap.as_raw_slice().len(); + set_bit(bitmap, offset); + + bitmap.as_raw_slice().len() - old_len + } + Entry::Vacant(entry) => { + let key_len = entry.key().len(); + + let bitmap = entry.insert(BitVec::default()); + set_bit(bitmap, offset); + + bitmap.as_raw_slice().len() + key_len + } + } + } + + /// Checks if the in-memory buffer exceeds the threshold and offloads it to external storage if necessary + async fn may_dump_buffer(&mut self, memory_diff: usize) -> Result<()> { + self.current_memory_usage += memory_diff; + if self.current_memory_usage < self.memory_usage_threshold { + return Ok(()); + } + + let values = mem::take(&mut self.values_buffer); + let file_id = &format!("{:012}", self.total_row_count); + + let writer = self + .temp_file_provider + .create(&self.index_name, file_id) + .await?; + IntermediateWriter::new(writer).write_all(values).await?; + + self.current_memory_usage = 0; + Ok(()) + } +} + +/// Sets or appends a bit at the specified offset within the bitmap +fn set_bit(bitmap: &mut BitVec, offset: usize) { + if offset >= bitmap.len() { + bitmap.resize(offset + 1, false); + } + bitmap.set(offset, true); +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::iter; + use std::sync::Mutex; + + use futures::{AsyncRead, StreamExt}; + use rand::Rng; + use tokio::io::duplex; + use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + + use super::*; + use crate::inverted_index::create::sort::external_provider::MockExternalTempFileProvider; + + fn generate_random_bytes_option(size: usize) -> Option> { + let mut rng = rand::thread_rng(); + + if rng.gen() { + let mut buffer = vec![0u8; size]; + rng.fill(&mut buffer[..]); + Some(buffer) + } else { + None + } + } + + #[allow(clippy::type_complexity)] + fn shuffle_values_and_sorted_result( + row_count: usize, + ) -> (Vec>>, BTreeMap>, Vec>) { + let mut mock_values = iter::repeat_with(|| generate_random_bytes_option(100)) + .take(row_count) + .collect::>(); + + let mut sorted_result = BTreeMap::new(); + for (i, value) in mock_values.iter_mut().enumerate() { + sorted_result + .entry(value.clone()) + .or_insert_with(Vec::new) + .push(i); + } + + (mock_values, sorted_result) + } + + #[tokio::test] + async fn test_external_sorter_pure_in_memory() { + let memory_usage_threshold = usize::MAX; + + let mut mock_provider = MockExternalTempFileProvider::new(); + mock_provider.expect_create().never(); + mock_provider.expect_read_all().returning(|_| Ok(vec![])); + + let mut sorter = ExternalSorter::new( + "test".to_owned(), + Arc::new(mock_provider), + 1, + memory_usage_threshold, + ); + + let (mock_values, mut sorted_result) = shuffle_values_and_sorted_result(100); + + for value in mock_values { + sorter.push(value).await.unwrap(); + } + + let SortOutput { + null_bitmap, + mut sorted_stream, + } = sorter.output().await.unwrap(); + let n = sorted_result.remove(&None); + assert_eq!( + null_bitmap.iter_ones().collect::>(), + n.unwrap_or_default() + ); + for (value, offsets) in sorted_result { + let item = sorted_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, value.unwrap()); + assert_eq!(item.1.iter_ones().collect::>(), offsets); + } + } + + #[tokio::test] + async fn test_external_sorter_pure_external() { + let memory_usage_threshold = 0; + + let mut mock_provider = MockExternalTempFileProvider::new(); 
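+        // The mock provider below backs every `create` call with an in-memory duplex pipe:
+        // the write half is handed to the sorter, and the read half is stashed so that
+        // `read_all` can later return the dumped data as if it were a real intermediate file.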
+ + let mock_files: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + mock_provider.expect_create().returning({ + let files = Arc::clone(&mock_files); + move |index_name, file_id| { + assert_eq!(index_name, "test"); + let mut files = files.lock().unwrap(); + let (writer, reader) = duplex(8 * 1024); + files.insert(file_id.to_string(), Box::new(reader.compat())); + Ok(Box::new(writer.compat_write())) + } + }); + + mock_provider.expect_read_all().returning({ + let files = Arc::clone(&mock_files); + move |index_name| { + assert_eq!(index_name, "test"); + let mut files = files.lock().unwrap(); + Ok(files.drain().map(|f| f.1).collect::>()) + } + }); + + let mut sorter = ExternalSorter::new( + "test".to_owned(), + Arc::new(mock_provider), + 1, + memory_usage_threshold, + ); + + let (mock_values, mut sorted_result) = shuffle_values_and_sorted_result(100); + + for value in mock_values { + sorter.push(value).await.unwrap(); + } + + let SortOutput { + null_bitmap, + mut sorted_stream, + } = sorter.output().await.unwrap(); + let n = sorted_result.remove(&None); + assert_eq!( + null_bitmap.iter_ones().collect::>(), + n.unwrap_or_default() + ); + for (value, offsets) in sorted_result { + let item = sorted_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, value.unwrap()); + assert_eq!(item.1.iter_ones().collect::>(), offsets); + } + } + + #[tokio::test] + async fn test_external_sorter_mixed() { + let memory_usage_threshold = 1024; + + let mut mock_provider = MockExternalTempFileProvider::new(); + + let mock_files: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + mock_provider.expect_create().times(1..).returning({ + let files = Arc::clone(&mock_files); + move |index_name, file_id| { + assert_eq!(index_name, "test"); + let mut files = files.lock().unwrap(); + let (writer, reader) = duplex(8 * 1024); + files.insert(file_id.to_string(), Box::new(reader.compat())); + Ok(Box::new(writer.compat_write())) + } + }); + + mock_provider.expect_read_all().returning({ + let files = Arc::clone(&mock_files); + move |index_name| { + assert_eq!(index_name, "test"); + let mut files = files.lock().unwrap(); + Ok(files.drain().map(|f| f.1).collect::>()) + } + }); + + let mut sorter = ExternalSorter::new( + "test".to_owned(), + Arc::new(mock_provider), + 1, + memory_usage_threshold, + ); + + let (mock_values, mut sorted_result) = shuffle_values_and_sorted_result(100); + + for value in mock_values { + sorter.push(value).await.unwrap(); + } + + let SortOutput { + null_bitmap, + mut sorted_stream, + } = sorter.output().await.unwrap(); + let n = sorted_result.remove(&None); + assert_eq!( + null_bitmap.iter_ones().collect::>(), + n.unwrap_or_default() + ); + for (value, offsets) in sorted_result { + let item = sorted_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, value.unwrap()); + assert_eq!(item.1.iter_ones().collect::>(), offsets); + } + } +} diff --git a/src/index/src/inverted_index/create/sort/merge_stream.rs b/src/index/src/inverted_index/create/sort/merge_stream.rs new file mode 100644 index 000000000000..247f22aee3e1 --- /dev/null +++ b/src/index/src/inverted_index/create/sort/merge_stream.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use common_base::BitVec; +use futures::{ready, Stream, StreamExt}; + +use crate::inverted_index::create::sort::SortedStream; +use crate::inverted_index::error::Result; +use crate::inverted_index::Bytes; + +/// A [`Stream`] implementation that merges two sorted streams into a single sorted stream +pub struct MergeSortedStream { + stream1: SortedStream, + stream2: SortedStream, + res1: Option<(Bytes, BitVec)>, + res2: Option<(Bytes, BitVec)>, +} + +impl MergeSortedStream { + /// Creates a new `MergeSortedStream` that will return elements from `stream1` and `stream2` + /// in sorted order, merging duplicate items by unioning their bitmaps + pub fn merge(stream1: SortedStream, stream2: SortedStream) -> SortedStream { + Box::new(MergeSortedStream { + stream1, + stream2, + res1: None, + res2: None, + }) + } +} + +impl Stream for MergeSortedStream { + type Item = Result<(Bytes, BitVec)>; + + /// Polls both streams and returns the next item from the stream that has the smaller next item + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.res1.is_none() { + if let Some(item) = ready!(self.stream1.poll_next_unpin(cx)) { + self.res1 = Some(item?); + } + } + if self.res2.is_none() { + if let Some(item) = ready!(self.stream2.poll_next_unpin(cx)) { + self.res2 = Some(item?); + } + } + + Poll::Ready(match (self.res1.take(), self.res2.take()) { + (Some((v1, b1)), Some((v2, b2))) => match v1.cmp(&v2) { + Ordering::Less => { + self.res2 = Some((v2, b2)); // Preserve the rest of stream2 + Some(Ok((v1, b1))) + } + Ordering::Greater => { + self.res1 = Some((v1, b1)); // Preserve the rest of stream1 + Some(Ok((v2, b2))) + } + Ordering::Equal => Some(Ok((v1, merge_bitmaps(b1, b2)))), + }, + (None, Some(item)) | (Some(item), None) => Some(Ok(item)), + (None, None) => None, + }) + } +} + +/// Merges two bitmaps by bit-wise OR'ing them together, preserving all bits from both +fn merge_bitmaps(bitmap1: BitVec, bitmap2: BitVec) -> BitVec { + // make sure longer bitmap is on the left to avoid truncation + #[allow(clippy::if_same_then_else)] + if bitmap1.len() > bitmap2.len() { + bitmap1 | bitmap2 + } else { + bitmap2 | bitmap1 + } +} + +#[cfg(test)] +mod tests { + use futures::stream; + + use super::*; + use crate::inverted_index::error::Error; + + fn sorted_stream_from_vec(vec: Vec<(Bytes, BitVec)>) -> SortedStream { + Box::new(stream::iter(vec.into_iter().map(Ok::<_, Error>))) + } + + #[tokio::test] + async fn test_merge_sorted_stream_non_overlapping() { + let stream1 = sorted_stream_from_vec(vec![ + (Bytes::from("apple"), BitVec::from_slice(&[0b10101010])), + (Bytes::from("orange"), BitVec::from_slice(&[0b01010101])), + ]); + let stream2 = sorted_stream_from_vec(vec![ + (Bytes::from("banana"), BitVec::from_slice(&[0b10101010])), + (Bytes::from("peach"), BitVec::from_slice(&[0b01010101])), + ]); + + let mut merged_stream = MergeSortedStream::merge(stream1, stream2); + + let item = merged_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, Bytes::from("apple")); + 
assert_eq!(item.1, BitVec::from_slice(&[0b10101010])); + let item = merged_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, Bytes::from("banana")); + assert_eq!(item.1, BitVec::from_slice(&[0b10101010])); + let item = merged_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, Bytes::from("orange")); + assert_eq!(item.1, BitVec::from_slice(&[0b01010101])); + let item = merged_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, Bytes::from("peach")); + assert_eq!(item.1, BitVec::from_slice(&[0b01010101])); + assert!(merged_stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_merge_sorted_stream_overlapping() { + let stream1 = sorted_stream_from_vec(vec![ + (Bytes::from("apple"), BitVec::from_slice(&[0b10101010])), + (Bytes::from("orange"), BitVec::from_slice(&[0b10101010])), + ]); + let stream2 = sorted_stream_from_vec(vec![ + (Bytes::from("apple"), BitVec::from_slice(&[0b01010101])), + (Bytes::from("peach"), BitVec::from_slice(&[0b01010101])), + ]); + + let mut merged_stream = MergeSortedStream::merge(stream1, stream2); + + let item = merged_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, Bytes::from("apple")); + assert_eq!(item.1, BitVec::from_slice(&[0b11111111])); + let item = merged_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, Bytes::from("orange")); + assert_eq!(item.1, BitVec::from_slice(&[0b10101010])); + let item = merged_stream.next().await.unwrap().unwrap(); + assert_eq!(item.0, Bytes::from("peach")); + assert_eq!(item.1, BitVec::from_slice(&[0b01010101])); + assert!(merged_stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_merge_sorted_stream_empty_streams() { + let stream1 = sorted_stream_from_vec(vec![]); + let stream2 = sorted_stream_from_vec(vec![]); + + let mut merged_stream = MergeSortedStream::merge(stream1, stream2); + assert!(merged_stream.next().await.is_none()); + } +} From eab60dbf377dd2f7fcc97b9395aeaa43fb1b87fd Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 15:32:43 +0000 Subject: [PATCH 07/20] chore: fix typos intermedia -> intermediate Signed-off-by: Zhenchi --- .../src/inverted_index/create/sort/external_sort.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index 5e3bced107ed..6573c10e1f52 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -22,7 +22,9 @@ use common_base::BitVec; use futures::stream; use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider; -use crate::inverted_index::create::sort::intermediate_rw::{IntermediaReader, IntermediateWriter}; +use crate::inverted_index::create::sort::intermediate_rw::{ + IntermediateReader, IntermediateWriter, +}; use crate::inverted_index::create::sort::merge_stream::MergeSortedStream; use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter}; use crate::inverted_index::error::Result; @@ -81,10 +83,10 @@ impl Sorter for ExternalSorter { let buf_values = mem::take(&mut self.values_buffer).into_iter(); let mut merging_sorted_stream: SortedStream = Box::new(stream::iter(buf_values.map(Ok))); - // Sequentially merge each intermedia file's stream into the merged stream + // Sequentially merge each intermediate file's stream into the merged stream for reader in readers { - let intermedia = IntermediaReader::new(reader).into_stream().await?; - 
merging_sorted_stream = MergeSortedStream::merge(merging_sorted_stream, intermedia); + let intermediate = IntermediateReader::new(reader).into_stream().await?; + merging_sorted_stream = MergeSortedStream::merge(merging_sorted_stream, intermediate); } Ok(SortOutput { From 5bb88faf382533dfe1f257ca4f6c58437cb7c8e4 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 15:41:27 +0000 Subject: [PATCH 08/20] chore: polish comments Signed-off-by: Zhenchi --- src/index/src/inverted_index/create/sort/merge_stream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/index/src/inverted_index/create/sort/merge_stream.rs b/src/index/src/inverted_index/create/sort/merge_stream.rs index 247f22aee3e1..d73d3489191a 100644 --- a/src/index/src/inverted_index/create/sort/merge_stream.rs +++ b/src/index/src/inverted_index/create/sort/merge_stream.rs @@ -47,7 +47,8 @@ impl MergeSortedStream { impl Stream for MergeSortedStream { type Item = Result<(Bytes, BitVec)>; - /// Polls both streams and returns the next item from the stream that has the smaller next item + /// Polls both streams and returns the next item from the stream that has the smaller next item. + /// If both streams have the same next item, the bitmaps are unioned together. fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.res1.is_none() { if let Some(item) = ready!(self.stream1.poll_next_unpin(cx)) { From 36cf2f0fec755f6c692a309f0ccabc01fb42cd59 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 15:50:39 +0000 Subject: [PATCH 09/20] chore: polish comments Signed-off-by: Zhenchi --- .../src/inverted_index/create/sort/external_provider.rs | 8 ++++---- src/index/src/inverted_index/create/sort/external_sort.rs | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/index/src/inverted_index/create/sort/external_provider.rs b/src/index/src/inverted_index/create/sort/external_provider.rs index 6205a866f944..a86f3e06aad4 100644 --- a/src/index/src/inverted_index/create/sort/external_provider.rs +++ b/src/index/src/inverted_index/create/sort/external_provider.rs @@ -22,10 +22,10 @@ use crate::inverted_index::error::Result; #[async_trait] pub trait ExternalTempFileProvider: Send + Sync { /// Creates and opens a new intermediate file associated with a specific index for writing. - /// Should return an error if the file already exists. + /// The implementation should ensure that the file does not already exist. /// - /// - `index_name`: the name of the index for which the file will be associated. - /// - `file_id`: a unique identifier for the new file. + /// - `index_name`: the name of the index for which the file will be associated + /// - `file_id`: a unique identifier for the new file async fn create( &self, index_name: &str, @@ -34,6 +34,6 @@ pub trait ExternalTempFileProvider: Send + Sync { /// Retrieves all intermediate files associated with a specific index for an external sorting operation. /// - /// `index_name`: the name of the index to retrieve intermediate files for. 
+ /// `index_name`: the name of the index to retrieve intermediate files for async fn read_all(&self, index_name: &str) -> Result>>; } diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index 6573c10e1f52..3eff21d528d1 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -76,7 +76,8 @@ impl Sorter for ExternalSorter { } } - /// Finalizes the sorting operation, merging data from both memory and external files into a sorted stream + /// Finalizes the sorting operation, merging data from both in-memory buffer and external files + /// into a sorted stream async fn output(&mut self) -> Result { let readers = self.temp_file_provider.read_all(&self.index_name).await?; From ea982d03a7a871fb60fc2161b5a7723bd4816784 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 16:24:40 +0000 Subject: [PATCH 10/20] refactor: drop the stream as early as possible to avoid recursive calls to poll Signed-off-by: Zhenchi --- .../create/sort/merge_stream.rs | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/index/src/inverted_index/create/sort/merge_stream.rs b/src/index/src/inverted_index/create/sort/merge_stream.rs index d73d3489191a..e476b82d1c8c 100644 --- a/src/index/src/inverted_index/create/sort/merge_stream.rs +++ b/src/index/src/inverted_index/create/sort/merge_stream.rs @@ -25,10 +25,11 @@ use crate::inverted_index::Bytes; /// A [`Stream`] implementation that merges two sorted streams into a single sorted stream pub struct MergeSortedStream { - stream1: SortedStream, - stream2: SortedStream, - res1: Option<(Bytes, BitVec)>, - res2: Option<(Bytes, BitVec)>, + stream1: Option, + peek1: Option<(Bytes, BitVec)>, + + stream2: Option, + peek2: Option<(Bytes, BitVec)>, } impl MergeSortedStream { @@ -36,10 +37,11 @@ impl MergeSortedStream { /// in sorted order, merging duplicate items by unioning their bitmaps pub fn merge(stream1: SortedStream, stream2: SortedStream) -> SortedStream { Box::new(MergeSortedStream { - stream1, - stream2, - res1: None, - res2: None, + stream1: Some(stream1), + peek1: None, + + stream2: Some(stream2), + peek2: None, }) } } @@ -50,25 +52,28 @@ impl Stream for MergeSortedStream { /// Polls both streams and returns the next item from the stream that has the smaller next item. /// If both streams have the same next item, the bitmaps are unioned together. 
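// Illustrative sketch, not part of this patch: the `merge_bitmaps` helper used by this
// file is not shown in this hunk, so its exact implementation is assumed here. An
// equivalent union, consistent with the tests above (0b10101010 merged with 0b01010101
// yields 0b11111111), could look like the following.
use common_base::BitVec;

fn union_bitmaps(mut lhs: BitVec, rhs: BitVec) -> BitVec {
    // Grow the left-hand bitmap so every bit of the right-hand one has a slot.
    if lhs.len() < rhs.len() {
        lhs.resize(rhs.len(), false);
    }
    // Set every bit that is set in the right-hand bitmap.
    for (index, bit) in rhs.iter().enumerate() {
        if *bit {
            lhs.set(index, true);
        }
    }
    lhs
}
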
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.res1.is_none() { - if let Some(item) = ready!(self.stream1.poll_next_unpin(cx)) { - self.res1 = Some(item?); + if let (true, Some(stream1)) = (self.peek1.is_none(), self.stream1.as_mut()) { + match ready!(stream1.poll_next_unpin(cx)) { + Some(item) => self.peek1 = Some(item?), + None => self.stream1 = None, // `stream1` is exhausted, don't poll it next time } } - if self.res2.is_none() { - if let Some(item) = ready!(self.stream2.poll_next_unpin(cx)) { - self.res2 = Some(item?); + + if let (true, Some(stream2)) = (self.peek2.is_none(), self.stream2.as_mut()) { + match ready!(stream2.poll_next_unpin(cx)) { + Some(item) => self.peek2 = Some(item?), + None => self.stream2 = None, // `stream2` is exhausted, don't poll it next time } } - Poll::Ready(match (self.res1.take(), self.res2.take()) { + Poll::Ready(match (self.peek1.take(), self.peek2.take()) { (Some((v1, b1)), Some((v2, b2))) => match v1.cmp(&v2) { Ordering::Less => { - self.res2 = Some((v2, b2)); // Preserve the rest of stream2 + self.peek2 = Some((v2, b2)); // Preserve the rest of `stream2` Some(Ok((v1, b1))) } Ordering::Greater => { - self.res1 = Some((v1, b1)); // Preserve the rest of stream1 + self.peek1 = Some((v1, b1)); // Preserve the rest of `stream1` Some(Ok((v2, b2))) } Ordering::Equal => Some(Ok((v1, merge_bitmaps(b1, b2)))), From 18c5a786d79c39dd742e3c5b381f56b4f844b846 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 08:54:08 +0000 Subject: [PATCH 11/20] refactor: project merge sorted stream Signed-off-by: Zhenchi --- Cargo.lock | 1 + src/index/Cargo.toml | 1 + .../create/sort/merge_stream.rs | 24 +++++++++++-------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 161ef6884333..a3a2aad476e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3987,6 +3987,7 @@ dependencies = [ "futures_codec", "greptime-proto", "mockall", + "pin-project 1.1.3", "prost 0.12.2", "rand", "regex", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index b87475e80f04..06c2087448b3 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -16,6 +16,7 @@ futures.workspace = true futures_codec = "0.4.1" greptime-proto.workspace = true mockall.workspace = true +pin-project.workspace = true prost.workspace = true regex-automata.workspace = true regex.workspace = true diff --git a/src/index/src/inverted_index/create/sort/merge_stream.rs b/src/index/src/inverted_index/create/sort/merge_stream.rs index e476b82d1c8c..84debecb8ada 100644 --- a/src/index/src/inverted_index/create/sort/merge_stream.rs +++ b/src/index/src/inverted_index/create/sort/merge_stream.rs @@ -18,12 +18,14 @@ use std::task::{Context, Poll}; use common_base::BitVec; use futures::{ready, Stream, StreamExt}; +use pin_project::pin_project; use crate::inverted_index::create::sort::SortedStream; use crate::inverted_index::error::Result; use crate::inverted_index::Bytes; /// A [`Stream`] implementation that merges two sorted streams into a single sorted stream +#[pin_project] pub struct MergeSortedStream { stream1: Option, peek1: Option<(Bytes, BitVec)>, @@ -51,29 +53,31 @@ impl Stream for MergeSortedStream { /// Polls both streams and returns the next item from the stream that has the smaller next item. /// If both streams have the same next item, the bitmaps are unioned together. 
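// Illustrative sketch, not part of this patch: `MergeSortedStream::merge` joins exactly
// two streams, so an arbitrary number of sorted inputs (for example one per
// intermediate file) is merged by chaining pairwise merges, mirroring the merge loop in
// `ExternalSorter::output`. The helper below assumes only the public `SortedStream`
// alias and `MergeSortedStream` from this module; its name is hypothetical.
use futures::stream;

use crate::inverted_index::create::sort::merge_stream::MergeSortedStream;
use crate::inverted_index::create::sort::SortedStream;

fn merge_all(streams: Vec<SortedStream>) -> SortedStream {
    // Start from an empty stream so the fold also handles zero inputs.
    let mut merged: SortedStream = Box::new(stream::empty());
    for next in streams {
        merged = MergeSortedStream::merge(merged, next);
    }
    merged
}
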
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let (true, Some(stream1)) = (self.peek1.is_none(), self.stream1.as_mut()) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + if let (None, Some(stream1)) = (&this.peek1, this.stream1.as_mut()) { match ready!(stream1.poll_next_unpin(cx)) { - Some(item) => self.peek1 = Some(item?), - None => self.stream1 = None, // `stream1` is exhausted, don't poll it next time + Some(item) => *this.peek1 = Some(item?), + None => *this.stream1 = None, // `stream1` is exhausted, don't poll it next time } } - if let (true, Some(stream2)) = (self.peek2.is_none(), self.stream2.as_mut()) { + if let (None, Some(stream2)) = (&this.peek2, this.stream2.as_mut()) { match ready!(stream2.poll_next_unpin(cx)) { - Some(item) => self.peek2 = Some(item?), - None => self.stream2 = None, // `stream2` is exhausted, don't poll it next time + Some(item) => *this.peek2 = Some(item?), + None => *this.stream2 = None, // `stream2` is exhausted, don't poll it next time } } - Poll::Ready(match (self.peek1.take(), self.peek2.take()) { + Poll::Ready(match (this.peek1.take(), this.peek2.take()) { (Some((v1, b1)), Some((v2, b2))) => match v1.cmp(&v2) { Ordering::Less => { - self.peek2 = Some((v2, b2)); // Preserve the rest of `stream2` + *this.peek2 = Some((v2, b2)); // Preserve the rest of `stream2` Some(Ok((v1, b1))) } Ordering::Greater => { - self.peek1 = Some((v1, b1)); // Preserve the rest of `stream1` + *this.peek1 = Some((v1, b1)); // Preserve the rest of `stream1` Some(Ok((v2, b2))) } Ordering::Equal => Some(Ok((v1, merge_bitmaps(b1, b2)))), From d2cf989bb5aa038d00c65c6011557171ea9cb21b Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 10:53:42 +0000 Subject: [PATCH 12/20] feat: add total_row_count to SortOutput Signed-off-by: Zhenchi --- src/index/src/inverted_index/create/sort.rs | 3 +++ .../create/sort/external_sort.rs | 7 ++++++ src/index/src/inverted_index/format/writer.rs | 3 ++- .../src/inverted_index/format/writer/blob.rs | 25 ++++++++----------- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs index c05bbcce9299..55edeccb504e 100644 --- a/src/index/src/inverted_index/create/sort.rs +++ b/src/index/src/inverted_index/create/sort.rs @@ -34,6 +34,9 @@ pub struct SortOutput { /// Stream of sorted items pub sorted_stream: SortedStream, + + /// Total number of rows in the sorted data + pub total_row_count: usize, } /// Handles data sorting, supporting incremental input and retrieval of sorted output diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index 3eff21d528d1..4a0a3c9bda8d 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -93,6 +93,7 @@ impl Sorter for ExternalSorter { Ok(SortOutput { null_bitmap: mem::take(&mut self.null_bitmap), sorted_stream: merging_sorted_stream, + total_row_count: self.total_row_count, }) } } @@ -245,7 +246,9 @@ mod tests { let SortOutput { null_bitmap, mut sorted_stream, + total_row_count, } = sorter.output().await.unwrap(); + assert_eq!(total_row_count, 100); let n = sorted_result.remove(&None); assert_eq!( null_bitmap.iter_ones().collect::>(), @@ -303,7 +306,9 @@ mod tests { let SortOutput { null_bitmap, mut sorted_stream, + total_row_count, } = sorter.output().await.unwrap(); + 
assert_eq!(total_row_count, 100); let n = sorted_result.remove(&None); assert_eq!( null_bitmap.iter_ones().collect::>(), @@ -361,7 +366,9 @@ mod tests { let SortOutput { null_bitmap, mut sorted_stream, + total_row_count, } = sorter.output().await.unwrap(); + assert_eq!(total_row_count, 100); let n = sorted_result.remove(&None); assert_eq!( null_bitmap.iter_ones().collect::>(), diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index 8cb9e408df38..752f0561237d 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -37,5 +37,6 @@ pub trait InvertedIndexWriter { S: Stream> + Send + Unpin; /// Finalizes the index writing process, ensuring all data is written. - async fn finish(&mut self) -> Result<()>; + /// `total_row_count` and `segment_row_count` is used to fill in the metadata. + async fn finish(&mut self, total_row_count: u64, segment_row_count: u64) -> Result<()>; } diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index e38319f48a27..fe99f46d77bf 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -58,7 +58,10 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit Ok(()) } - async fn finish(&mut self) -> Result<()> { + async fn finish(&mut self, total_row_count: u64, segment_row_count: u64) -> Result<()> { + self.metas.segment_row_count = segment_row_count; + self.metas.total_row_count = total_row_count; + let metas_bytes = self.metas.encode_to_vec(); self.blob_writer .write_all(&metas_bytes) @@ -78,19 +81,11 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit } impl InvertedIndexBlobWriter { - pub fn new( - blob_writer: W, - total_row_count: u64, - segment_row_count: u64, - ) -> InvertedIndexBlobWriter { + pub fn new(blob_writer: W) -> InvertedIndexBlobWriter { InvertedIndexBlobWriter { blob_writer, written_size: 0, - metas: InvertedIndexMetas { - total_row_count, - segment_row_count, - ..Default::default() - }, + metas: InvertedIndexMetas::default(), } } } @@ -109,8 +104,8 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_empty() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); - writer.finish().await.unwrap(); + let mut writer = InvertedIndexBlobWriter::new(&mut blob); + writer.finish(8, 1).await.unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor); @@ -123,7 +118,7 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_basic() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); + let mut writer = InvertedIndexBlobWriter::new(&mut blob); writer .add_index( "tag0".to_string(), @@ -148,7 +143,7 @@ mod tests { ) .await .unwrap(); - writer.finish().await.unwrap(); + writer.finish(8, 1).await.unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor); From 3bc75de61184728421c9fad7a616080b475b4fea Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 11:05:36 +0000 Subject: [PATCH 13/20] feat: remove change of format Signed-off-by: Zhenchi --- src/index/src/inverted_index/format/writer.rs | 3 +-- .../src/inverted_index/format/writer/blob.rs | 25 +++++++++++-------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/index/src/inverted_index/format/writer.rs 
b/src/index/src/inverted_index/format/writer.rs index 752f0561237d..8cb9e408df38 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -37,6 +37,5 @@ pub trait InvertedIndexWriter { S: Stream> + Send + Unpin; /// Finalizes the index writing process, ensuring all data is written. - /// `total_row_count` and `segment_row_count` is used to fill in the metadata. - async fn finish(&mut self, total_row_count: u64, segment_row_count: u64) -> Result<()>; + async fn finish(&mut self) -> Result<()>; } diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index fe99f46d77bf..e38319f48a27 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -58,10 +58,7 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit Ok(()) } - async fn finish(&mut self, total_row_count: u64, segment_row_count: u64) -> Result<()> { - self.metas.segment_row_count = segment_row_count; - self.metas.total_row_count = total_row_count; - + async fn finish(&mut self) -> Result<()> { let metas_bytes = self.metas.encode_to_vec(); self.blob_writer .write_all(&metas_bytes) @@ -81,11 +78,19 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit } impl InvertedIndexBlobWriter { - pub fn new(blob_writer: W) -> InvertedIndexBlobWriter { + pub fn new( + blob_writer: W, + total_row_count: u64, + segment_row_count: u64, + ) -> InvertedIndexBlobWriter { InvertedIndexBlobWriter { blob_writer, written_size: 0, - metas: InvertedIndexMetas::default(), + metas: InvertedIndexMetas { + total_row_count, + segment_row_count, + ..Default::default() + }, } } } @@ -104,8 +109,8 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_empty() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob); - writer.finish(8, 1).await.unwrap(); + let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); + writer.finish().await.unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor); @@ -118,7 +123,7 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_basic() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob); + let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); writer .add_index( "tag0".to_string(), @@ -143,7 +148,7 @@ mod tests { ) .await .unwrap(); - writer.finish(8, 1).await.unwrap(); + writer.finish().await.unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor); From 71b82eb00e3e76f7bf8307ce14fa0c9cb4163e6e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 09:39:44 +0000 Subject: [PATCH 14/20] feat(inverted_index.create): add index creator Signed-off-by: Zhenchi --- src/index/src/inverted_index/create.rs | 21 ++ .../create/sort/external_sort.rs | 16 ++ .../src/inverted_index/create/sort_create.rs | 201 ++++++++++++++++++ src/index/src/inverted_index/format/writer.rs | 12 +- .../src/inverted_index/format/writer/blob.rs | 19 +- 5 files changed, 257 insertions(+), 12 deletions(-) create mode 100644 src/index/src/inverted_index/create/sort_create.rs diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index a57c2c36b7e6..27d59147e9de 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -13,3 +13,24 @@ // limitations under the License. 
mod sort; +mod sort_create; + +use async_trait::async_trait; + +use crate::inverted_index::error::Result; +use crate::inverted_index::Bytes; + +/// `IndexCreator` provides functionality to construct and finalize an index +#[async_trait] +pub trait IndexCreator { + /// Adds a value to the named index. A `None` value represents an absence of data (null) + /// + /// - `index_name`: Identifier for the index being built + /// - `value`: The data to be indexed, or `None` for a null entry + /// + /// Note: Call this method for each row in the dataset + async fn push_with_name(&mut self, index_name: &str, value: Option) -> Result<()>; + + /// Finalizes the index creation process, ensuring all data is properly indexed and stored + async fn finish(&mut self) -> Result<()>; +} diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index 4a0a3c9bda8d..01303ff04dba 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -27,6 +27,7 @@ use crate::inverted_index::create::sort::intermediate_rw::{ }; use crate::inverted_index::create::sort::merge_stream::MergeSortedStream; use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter}; +use crate::inverted_index::create::sort_create::SorterFactory; use crate::inverted_index::error::Result; use crate::inverted_index::Bytes; @@ -121,6 +122,21 @@ impl ExternalSorter { } } + /// Generates a factory function that creates new `ExternalSorter` instances + pub fn factory( + temp_file_provider: Arc, + memory_usage_threshold: usize, + ) -> SorterFactory { + Box::new(move |index_name, segment_row_count| { + Box::new(Self::new( + index_name, + temp_file_provider.clone(), + segment_row_count, + memory_usage_threshold, + )) + }) + } + /// Determines the current data segment based on processed rows fn current_segment_index(&self) -> usize { (self.total_row_count - 1) / self.segment_row_count diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs new file mode 100644 index 000000000000..b66829373ab1 --- /dev/null +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -0,0 +1,201 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
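// Illustrative sketch, not part of this patch: the factory returned by
// `ExternalSorter::factory` above is meant to be handed to the `SortIndexCreator`
// defined below, so each named index lazily gets its own external sorter. The
// placeholder arguments (`provider`, `writer`, the 64 MiB threshold, 1024 rows per
// segment) and the helper name are assumptions, and this reflects the constructor as
// of this commit only (a later commit in this series drops the writer argument).
use std::sync::Arc;

use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use crate::inverted_index::create::sort::external_sort::ExternalSorter;
use crate::inverted_index::create::sort_create::SortIndexCreator;
use crate::inverted_index::format::writer::InvertedIndexWriter;

fn build_external_sort_creator(
    provider: Arc<dyn ExternalTempFileProvider>,
    writer: Box<dyn InvertedIndexWriter>,
) -> SortIndexCreator {
    let memory_usage_threshold = 64 * 1024 * 1024;
    let segment_row_count = 1024;
    let factory = ExternalSorter::factory(provider, memory_usage_threshold);
    SortIndexCreator::new(factory, writer, segment_row_count)
}
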
+ +use std::collections::HashMap; + +use async_trait::async_trait; + +use crate::inverted_index::create::sort::{SortOutput, Sorter}; +use crate::inverted_index::create::IndexCreator; +use crate::inverted_index::error::Result; +use crate::inverted_index::format::writer::InvertedIndexWriter; +use crate::inverted_index::Bytes; + +type IndexName = String; +type SegmentRowCount = usize; + +/// Factory type to produce `Sorter` instances associated with an index name and segment row count +pub type SorterFactory = Box Box + Send>; + +/// `SortIndexCreator` orchestrates indexing by sorting input data for each named index +/// and writing to an inverted index writer +pub struct SortIndexCreator { + /// Factory for producing `Sorter` instances + sorter_factory: SorterFactory, + + /// Map of index names to sorters + sorters: HashMap>, + + /// Writer for inverted index data + index_writer: Box, + + /// Number of rows in each segment, used to produce sorters + segment_row_count: usize, +} + +#[async_trait] +impl IndexCreator for SortIndexCreator { + /// Inserts a value or null into the sorter for the specified index + async fn push_with_name(&mut self, index_name: &str, value: Option) -> Result<()> { + match self.sorters.get_mut(index_name) { + Some(sorter) => sorter.push(value).await, + None => { + let mut sorter = + (self.sorter_factory)(index_name.to_owned(), self.segment_row_count); + sorter.push(value).await?; + self.sorters.insert(index_name.to_owned(), sorter); + Ok(()) + } + } + } + + /// Finalizes the sorting for all indexes and writes them using the inverted index writer + async fn finish(&mut self) -> Result<()> { + for (index_name, mut sorter) in self.sorters.drain() { + let SortOutput { + null_bitmap, + sorted_stream, + } = sorter.output().await?; + + self.index_writer + .add_index(index_name, null_bitmap, sorted_stream) + .await?; + } + + self.index_writer.finish().await + } +} + +impl SortIndexCreator { + /// Creates a new `SortIndexCreator` with the given sorter factory and index writer + pub fn new( + sorter_factory: SorterFactory, + index_writer: Box, + segment_row_count: usize, + ) -> Self { + Self { + sorter_factory, + sorters: HashMap::new(), + index_writer, + segment_row_count, + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use common_base::BitVec; + use futures::{stream, StreamExt}; + + use super::*; + use crate::inverted_index::create::sort::SortedStream; + use crate::inverted_index::format::writer::MockInvertedIndexWriter; + + fn stream_to_values(stream: SortedStream) -> Vec { + futures::executor::block_on(async { + stream.map(|r| r.unwrap().0).collect::>().await + }) + } + + #[tokio::test] + async fn test_sort_index_creator_basic() { + let mut mock_writer = MockInvertedIndexWriter::new(); + mock_writer + .expect_add_index() + .times(3) + .returning(|name, null_bitmap, stream| { + assert!(null_bitmap.is_empty()); + match name.as_str() { + "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), + "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]), + "c" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), + _ => panic!("unexpected index name: {}", name), + } + Ok(()) + }); + mock_writer.expect_finish().times(1).returning(|| Ok(())); + + let mut creator = SortIndexCreator::new(NaiveSorter::factory(), Box::new(mock_writer), 1); + + let index_values = vec![ + ("a", vec![b"3", b"2", b"1"]), + ("b", vec![b"6", b"5", b"4"]), + ("c", vec![b"1", b"2", b"3"]), + ]; + + for (index_name, values) in index_values { + 
for value in values { + creator + .push_with_name(index_name, Some(value.into())) + .await + .unwrap(); + } + } + + creator.finish().await.unwrap(); + } + + fn set_bit(bit_vec: &mut BitVec, index: usize) { + if index >= bit_vec.len() { + bit_vec.resize(index + 1, false); + } + bit_vec.set(index, true); + } + + struct NaiveSorter { + total_row_count: usize, + segment_row_count: usize, + values: BTreeMap, BitVec>, + } + + impl NaiveSorter { + fn factory() -> SorterFactory { + Box::new(|_index_name, segment_row_count| { + Box::new(NaiveSorter { + total_row_count: 0, + segment_row_count, + values: BTreeMap::new(), + }) + }) + } + } + + #[async_trait] + impl Sorter for NaiveSorter { + async fn push(&mut self, value: Option) -> Result<()> { + let segment_index = self.total_row_count / self.segment_row_count; + self.total_row_count += 1; + + let bitmap = self.values.entry(value).or_default(); + set_bit(bitmap, segment_index); + + Ok(()) + } + + async fn output(&mut self) -> Result { + let null_bitmap = self.values.remove(&None).unwrap_or_default(); + + Ok(SortOutput { + null_bitmap, + sorted_stream: Box::new(stream::iter( + std::mem::take(&mut self.values) + .into_iter() + .map(|(v, b)| Ok((v.unwrap(), b))), + )), + }) + } + } +} diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index 8cb9e408df38..2f240463d23d 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -24,17 +24,21 @@ pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter; use crate::inverted_index::Bytes; /// Trait for writing inverted index data to underlying storage. +#[mockall::automock] #[async_trait] -pub trait InvertedIndexWriter { +pub trait InvertedIndexWriter: Send { /// Adds entries to an index. /// /// * `name` is the index identifier. /// * `null_bitmap` marks positions of null entries. /// * `values` is a stream of values and their locations, yielded lexicographically. /// Errors occur if the values are out of order. - async fn add_index(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()> - where - S: Stream> + Send + Unpin; + async fn add_index( + &mut self, + name: String, + null_bitmap: BitVec, + values: Box> + Send + Unpin>, + ) -> Result<()>; /// Finalizes the index writing process, ensuring all data is written. 
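// Illustrative sketch, not part of this patch: replacing the generic `S: Stream`
// parameter with a boxed stream keeps `add_index` free of method-level type
// parameters, which is presumably what lets `#[mockall::automock]` generate the
// `MockInvertedIndexWriter` used by the `sort_create` tests. Callers then build the
// boxed argument much like the tests in `writer/blob.rs`; the function name below is
// hypothetical and the concrete writer is taken as a trait object.
use common_base::BitVec;
use futures::stream;

use crate::inverted_index::error::Result;
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::Bytes;

async fn write_tag0(writer: &mut dyn InvertedIndexWriter) -> Result<()> {
    writer
        .add_index(
            "tag0".to_string(),
            BitVec::from_slice(&[0b0000_0001]),
            Box::new(stream::iter(vec![
                Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
                Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
            ])),
        )
        .await
}
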
async fn finish(&mut self) -> Result<()>; diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index e38319f48a27..03d47d545007 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -39,10 +39,12 @@ pub struct InvertedIndexBlobWriter { #[async_trait] impl InvertedIndexWriter for InvertedIndexBlobWriter { - async fn add_index(&mut self, name: String, null_bitmap: BitVec, values: S) -> Result<()> - where - S: Stream> + Send + Unpin, - { + async fn add_index( + &mut self, + name: String, + null_bitmap: BitVec, + values: Box> + Send + Unpin>, + ) -> Result<()> { let single_writer = SingleIndexWriter::new( name.clone(), self.written_size, @@ -98,6 +100,7 @@ impl InvertedIndexBlobWriter { #[cfg(test)] mod tests { use futures::io::Cursor; + use futures::stream; use super::*; use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; @@ -128,11 +131,11 @@ mod tests { .add_index( "tag0".to_string(), BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), - futures::stream::iter(vec![ + Box::new(stream::iter(vec![ Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))), Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), - ]), + ])), ) .await .unwrap(); @@ -140,11 +143,11 @@ mod tests { .add_index( "tag1".to_string(), BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), - futures::stream::iter(vec![ + Box::new(stream::iter(vec![ Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))), Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))), Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))), - ]), + ])), ) .await .unwrap(); From 1a02afaec90e535351e11a16385757635a5b8115 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 15 Dec 2023 15:51:31 +0000 Subject: [PATCH 15/20] chore: polish comments Signed-off-by: Zhenchi --- src/index/src/inverted_index/create.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index 27d59147e9de..864d8a707ad1 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -20,7 +20,7 @@ use async_trait::async_trait; use crate::inverted_index::error::Result; use crate::inverted_index::Bytes; -/// `IndexCreator` provides functionality to construct and finalize an index +/// `IndexCreator` provides functionality to construct an inverted index #[async_trait] pub trait IndexCreator { /// Adds a value to the named index. 
A `None` value represents an absence of data (null) @@ -28,7 +28,7 @@ pub trait IndexCreator { /// - `index_name`: Identifier for the index being built /// - `value`: The data to be indexed, or `None` for a null entry /// - /// Note: Call this method for each row in the dataset + /// Note: Caller should call this method for each row in the dataset async fn push_with_name(&mut self, index_name: &str, value: Option) -> Result<()>; /// Finalizes the index creation process, ensuring all data is properly indexed and stored From 1a775551ad90de72015252a3ff73dd0ea6b6c3d6 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 10:56:08 +0000 Subject: [PATCH 16/20] feat: add check for total_row_count Signed-off-by: Zhenchi --- .../src/inverted_index/create/sort_create.rs | 72 ++++++++++++++++++- src/index/src/inverted_index/error.rs | 8 +++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs index b66829373ab1..19e8699655b7 100644 --- a/src/index/src/inverted_index/create/sort_create.rs +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -15,10 +15,11 @@ use std::collections::HashMap; use async_trait::async_trait; +use snafu::ensure; use crate::inverted_index::create::sort::{SortOutput, Sorter}; use crate::inverted_index::create::IndexCreator; -use crate::inverted_index::error::Result; +use crate::inverted_index::error::{InconsistentRowCountSnafu, Result}; use crate::inverted_index::format::writer::InvertedIndexWriter; use crate::inverted_index::Bytes; @@ -62,18 +63,36 @@ impl IndexCreator for SortIndexCreator { /// Finalizes the sorting for all indexes and writes them using the inverted index writer async fn finish(&mut self) -> Result<()> { + let mut row_count = None; + for (index_name, mut sorter) in self.sorters.drain() { let SortOutput { null_bitmap, sorted_stream, + total_row_count, } = sorter.output().await?; + let expected_row_count = *row_count.get_or_insert(total_row_count); + ensure!( + expected_row_count == total_row_count, + InconsistentRowCountSnafu { + index_name, + total_row_count, + expected_row_count, + } + ); + self.index_writer .add_index(index_name, null_bitmap, sorted_stream) .await?; } - self.index_writer.finish().await + self.index_writer + .finish( + row_count.unwrap_or_default() as _, + self.segment_row_count as _, + ) + .await } } @@ -102,6 +121,7 @@ mod tests { use super::*; use crate::inverted_index::create::sort::SortedStream; + use crate::inverted_index::error::Error; use crate::inverted_index::format::writer::MockInvertedIndexWriter; fn stream_to_values(stream: SortedStream) -> Vec { @@ -126,7 +146,14 @@ mod tests { } Ok(()) }); - mock_writer.expect_finish().times(1).returning(|| Ok(())); + mock_writer + .expect_finish() + .times(1) + .returning(|total_row_count, segment_row_count| { + assert_eq!(total_row_count, 3); + assert_eq!(segment_row_count, 1); + Ok(()) + }); let mut creator = SortIndexCreator::new(NaiveSorter::factory(), Box::new(mock_writer), 1); @@ -148,6 +175,44 @@ mod tests { creator.finish().await.unwrap(); } + #[tokio::test] + async fn test_sort_index_creator_inconsistant_row_count() { + let mut mock_writer = MockInvertedIndexWriter::new(); + mock_writer + .expect_add_index() + .returning(|name, null_bitmap, stream| { + assert!(null_bitmap.is_empty()); + match name.as_str() { + "a" => assert_eq!(stream_to_values(stream), vec![b"1", b"2", b"3"]), + "b" => assert_eq!(stream_to_values(stream), vec![b"4", b"5", b"6"]), + "c" => 
assert_eq!(stream_to_values(stream), vec![b"1", b"2"]), + _ => panic!("unexpected index name: {}", name), + } + Ok(()) + }); + mock_writer.expect_finish().never(); + + let mut creator = SortIndexCreator::new(NaiveSorter::factory(), Box::new(mock_writer), 1); + + let index_values = vec![ + ("a", vec![b"3", b"2", b"1"]), + ("b", vec![b"6", b"5", b"4"]), + ("c", vec![b"1", b"2"]), + ]; + + for (index_name, values) in index_values { + for value in values { + creator + .push_with_name(index_name, Some(value.into())) + .await + .unwrap(); + } + } + + let res = creator.finish().await; + assert!(matches!(res, Err(Error::InconsistentRowCount { .. }))); + } + fn set_bit(bit_vec: &mut BitVec, index: usize) { if index >= bit_vec.len() { bit_vec.resize(index + 1, false); @@ -195,6 +260,7 @@ mod tests { .into_iter() .map(|(v, b)| Ok((v.unwrap(), b))), )), + total_row_count: self.total_row_count, }) } } diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index afb8ae12838b..b795e33003b7 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -160,6 +160,13 @@ pub enum Error { #[snafu(display("Unknown intermediate codec magic: {magic:?}"))] UnknownIntermediateCodecMagic { magic: [u8; 4], location: Location }, + + #[snafu(display("Inconsistent row count, index_name: {index_name}, total_row_count: {total_row_count}, expected: {expected_row_count}"))] + InconsistentRowCount { + index_name: String, + total_row_count: usize, + expected_row_count: usize, + }, } impl ErrorExt for Error { @@ -188,6 +195,7 @@ impl ErrorExt for Error { | IntersectionApplierWithInList { .. } | EmptyPredicates { .. } | FstInsert { .. } + | InconsistentRowCount { .. } | IndexNotFound { .. } => StatusCode::InvalidArguments, } } From a6c1f0d3b99dcb52b0a4a54969675614ec1137f5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 11:08:24 +0000 Subject: [PATCH 17/20] feat: lazy set meta of writer This reverts commit 63cb5bdb5c3a08406d978357d8167ca18ed1b83b. --- src/index/src/inverted_index/format/writer.rs | 3 ++- .../src/inverted_index/format/writer/blob.rs | 25 ++++++++----------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index 2f240463d23d..21eec69abacf 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -41,5 +41,6 @@ pub trait InvertedIndexWriter: Send { ) -> Result<()>; /// Finalizes the index writing process, ensuring all data is written. - async fn finish(&mut self) -> Result<()>; + /// `total_row_count` and `segment_row_count` is used to fill in the metadata. 
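// Illustrative sketch, not part of this patch: with the row counts moved from the
// constructor to `finish`, a writer's lifecycle becomes "construct, add indexes, then
// finalize with counts that are only known once sorting has finished". The function
// name is hypothetical; the values 8 and 1 mirror the tests in `writer/blob.rs`.
use crate::inverted_index::error::Result;
use crate::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};

async fn finalize_empty_index() -> Result<()> {
    let mut blob: Vec<u8> = Vec::new();
    let mut writer = InvertedIndexBlobWriter::new(&mut blob);
    // ... `add_index` calls would go here ...
    writer.finish(8, 1).await
}
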
+ async fn finish(&mut self, total_row_count: u64, segment_row_count: u64) -> Result<()>; } diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 03d47d545007..8a0c048f5dcb 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -60,7 +60,10 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit Ok(()) } - async fn finish(&mut self) -> Result<()> { + async fn finish(&mut self, total_row_count: u64, segment_row_count: u64) -> Result<()> { + self.metas.segment_row_count = segment_row_count; + self.metas.total_row_count = total_row_count; + let metas_bytes = self.metas.encode_to_vec(); self.blob_writer .write_all(&metas_bytes) @@ -80,19 +83,11 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit } impl InvertedIndexBlobWriter { - pub fn new( - blob_writer: W, - total_row_count: u64, - segment_row_count: u64, - ) -> InvertedIndexBlobWriter { + pub fn new(blob_writer: W) -> InvertedIndexBlobWriter { InvertedIndexBlobWriter { blob_writer, written_size: 0, - metas: InvertedIndexMetas { - total_row_count, - segment_row_count, - ..Default::default() - }, + metas: InvertedIndexMetas::default(), } } } @@ -112,8 +107,8 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_empty() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); - writer.finish().await.unwrap(); + let mut writer = InvertedIndexBlobWriter::new(&mut blob); + writer.finish(8, 1).await.unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor); @@ -126,7 +121,7 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_writer_write_basic() { let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob, 8, 1); + let mut writer = InvertedIndexBlobWriter::new(&mut blob); writer .add_index( "tag0".to_string(), @@ -151,7 +146,7 @@ mod tests { ) .await .unwrap(); - writer.finish().await.unwrap(); + writer.finish(8, 1).await.unwrap(); let cursor = Cursor::new(blob); let mut reader = InvertedIndexBlobReader::new(cursor); From 63ab226d4553cd652063e63bb1406a4eb214ae3a Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 11:38:39 +0000 Subject: [PATCH 18/20] feat: lazyily provide inverted index writer Signed-off-by: Zhenchi --- src/index/src/inverted_index/create.rs | 8 ++- .../src/inverted_index/create/sort_create.rs | 70 ++++++++----------- 2 files changed, 36 insertions(+), 42 deletions(-) diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index 864d8a707ad1..41f964e04c28 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -18,11 +18,12 @@ mod sort_create; use async_trait::async_trait; use crate::inverted_index::error::Result; +use crate::inverted_index::format::writer::InvertedIndexWriter; use crate::inverted_index::Bytes; -/// `IndexCreator` provides functionality to construct an inverted index +/// `InvertedIndexCreator` provides functionality to construct an inverted index #[async_trait] -pub trait IndexCreator { +pub trait InvertedIndexCreator { /// Adds a value to the named index. 
A `None` value represents an absence of data (null) /// /// - `index_name`: Identifier for the index being built @@ -32,5 +33,6 @@ pub trait IndexCreator { async fn push_with_name(&mut self, index_name: &str, value: Option) -> Result<()>; /// Finalizes the index creation process, ensuring all data is properly indexed and stored - async fn finish(&mut self) -> Result<()>; + /// in the provided writer + async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()>; } diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs index 19e8699655b7..96c3f8dfa136 100644 --- a/src/index/src/inverted_index/create/sort_create.rs +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -18,7 +18,7 @@ use async_trait::async_trait; use snafu::ensure; use crate::inverted_index::create::sort::{SortOutput, Sorter}; -use crate::inverted_index::create::IndexCreator; +use crate::inverted_index::create::InvertedIndexCreator; use crate::inverted_index::error::{InconsistentRowCountSnafu, Result}; use crate::inverted_index::format::writer::InvertedIndexWriter; use crate::inverted_index::Bytes; @@ -38,15 +38,12 @@ pub struct SortIndexCreator { /// Map of index names to sorters sorters: HashMap>, - /// Writer for inverted index data - index_writer: Box, - /// Number of rows in each segment, used to produce sorters segment_row_count: usize, } #[async_trait] -impl IndexCreator for SortIndexCreator { +impl InvertedIndexCreator for SortIndexCreator { /// Inserts a value or null into the sorter for the specified index async fn push_with_name(&mut self, index_name: &str, value: Option) -> Result<()> { match self.sorters.get_mut(index_name) { @@ -62,7 +59,7 @@ impl IndexCreator for SortIndexCreator { } /// Finalizes the sorting for all indexes and writes them using the inverted index writer - async fn finish(&mut self) -> Result<()> { + async fn finish(&mut self, writer: &mut dyn InvertedIndexWriter) -> Result<()> { let mut row_count = None; for (index_name, mut sorter) in self.sorters.drain() { @@ -82,12 +79,12 @@ impl IndexCreator for SortIndexCreator { } ); - self.index_writer + writer .add_index(index_name, null_bitmap, sorted_stream) .await?; } - self.index_writer + writer .finish( row_count.unwrap_or_default() as _, self.segment_row_count as _, @@ -98,15 +95,10 @@ impl IndexCreator for SortIndexCreator { impl SortIndexCreator { /// Creates a new `SortIndexCreator` with the given sorter factory and index writer - pub fn new( - sorter_factory: SorterFactory, - index_writer: Box, - segment_row_count: usize, - ) -> Self { + pub fn new(sorter_factory: SorterFactory, segment_row_count: usize) -> Self { Self { sorter_factory, sorters: HashMap::new(), - index_writer, segment_row_count, } } @@ -132,6 +124,23 @@ mod tests { #[tokio::test] async fn test_sort_index_creator_basic() { + let mut creator = SortIndexCreator::new(NaiveSorter::factory(), 1); + + let index_values = vec![ + ("a", vec![b"3", b"2", b"1"]), + ("b", vec![b"6", b"5", b"4"]), + ("c", vec![b"1", b"2", b"3"]), + ]; + + for (index_name, values) in index_values { + for value in values { + creator + .push_with_name(index_name, Some(value.into())) + .await + .unwrap(); + } + } + let mut mock_writer = MockInvertedIndexWriter::new(); mock_writer .expect_add_index() @@ -155,12 +164,17 @@ mod tests { Ok(()) }); - let mut creator = SortIndexCreator::new(NaiveSorter::factory(), Box::new(mock_writer), 1); + creator.finish(&mut mock_writer).await.unwrap(); + } + + #[tokio::test] + async fn 
test_sort_index_creator_inconsistent_row_count() { + let mut creator = SortIndexCreator::new(NaiveSorter::factory(), 1); let index_values = vec![ ("a", vec![b"3", b"2", b"1"]), ("b", vec![b"6", b"5", b"4"]), - ("c", vec![b"1", b"2", b"3"]), + ("c", vec![b"1", b"2"]), ]; for (index_name, values) in index_values { @@ -172,11 +186,6 @@ mod tests { } } - creator.finish().await.unwrap(); - } - - #[tokio::test] - async fn test_sort_index_creator_inconsistant_row_count() { let mut mock_writer = MockInvertedIndexWriter::new(); mock_writer .expect_add_index() @@ -192,24 +201,7 @@ mod tests { }); mock_writer.expect_finish().never(); - let mut creator = SortIndexCreator::new(NaiveSorter::factory(), Box::new(mock_writer), 1); - - let index_values = vec![ - ("a", vec![b"3", b"2", b"1"]), - ("b", vec![b"6", b"5", b"4"]), - ("c", vec![b"1", b"2"]), - ]; - - for (index_name, values) in index_values { - for value in values { - creator - .push_with_name(index_name, Some(value.into())) - .await - .unwrap(); - } - } - - let res = creator.finish().await; + let res = creator.finish(&mut mock_writer).await; assert!(matches!(res, Err(Error::InconsistentRowCount { .. }))); } From ef240ea37e7af779e241871cb50cf8ada86f775e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 16 Dec 2023 11:46:55 +0000 Subject: [PATCH 19/20] chore: polish readability Signed-off-by: Zhenchi --- src/index/src/inverted_index/create/sort_create.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs index 96c3f8dfa136..adb10775c86b 100644 --- a/src/index/src/inverted_index/create/sort_create.rs +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -84,12 +84,9 @@ impl InvertedIndexCreator for SortIndexCreator { .await?; } - writer - .finish( - row_count.unwrap_or_default() as _, - self.segment_row_count as _, - ) - .await + let total_row_count = row_count.unwrap_or_default() as _; + let segment_row_count = self.segment_row_count as _; + writer.finish(total_row_count, segment_row_count).await } } From 534774fa41f4f1f7eeb486f1f21c2db7f8394ff5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 19 Dec 2023 09:22:17 +0000 Subject: [PATCH 20/20] feat: add push_with_name_n Signed-off-by: Zhenchi --- src/index/src/inverted_index/create.rs | 24 +++++++- .../src/inverted_index/create/sort_create.rs | 60 +++++++++++++++---- src/index/src/inverted_index/format/writer.rs | 4 +- .../src/inverted_index/format/writer/blob.rs | 10 ++-- 4 files changed, 76 insertions(+), 22 deletions(-) diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index 98996765da16..db6bf1ad2595 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -29,9 +29,27 @@ pub trait InvertedIndexCreator { /// - `index_name`: Identifier for the index being built /// - `value`: The data to be indexed, or `None` for a null entry /// - /// Note: Caller should call this method for each row in the dataset - async fn push_with_name(&mut self, index_name: &str, value: Option>) - -> Result<()>; + /// It should be equivalent to calling `push_with_name_n` with `n = 1` + async fn push_with_name( + &mut self, + index_name: &str, + value: Option>, + ) -> Result<()> { + self.push_with_name_n(index_name, value, 1).await + } + + /// Adds `n` identical values to the named index. 
`None` values represent absence of data (null) + /// + /// - `index_name`: Identifier for the index being built + /// - `value`: The data to be indexed, or `None` for a null entry + /// + /// It should be equivalent to calling `push_with_name` `n` times + async fn push_with_name_n( + &mut self, + index_name: &str, + value: Option>, + n: usize, + ) -> Result<()>; /// Finalizes the index creation process, ensuring all data is properly indexed and stored /// in the provided writer diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs index 54274f4a7ac3..b491c0a8b444 100644 --- a/src/index/src/inverted_index/create/sort_create.rs +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -45,19 +45,23 @@ pub struct SortIndexCreator { #[async_trait] impl InvertedIndexCreator for SortIndexCreator { - /// Inserts a value or null into the sorter for the specified index - async fn push_with_name( + /// Inserts `n` values or nulls into the sorter for the specified index. + /// + /// If the index does not exist, a new index is created even if `n` is 0. + /// Caller may leverage this behavior to create indexes with no data. + async fn push_with_name_n( &mut self, index_name: &str, value: Option>, + n: usize, ) -> Result<()> { match self.sorters.get_mut(index_name) { - Some(sorter) => sorter.push(value).await, + Some(sorter) => sorter.push_n(value, n).await, None => { - let mut sorter = - (self.sorter_factory)(index_name.to_owned(), self.segment_row_count); - sorter.push(value).await?; - self.sorters.insert(index_name.to_owned(), sorter); + let index_name = index_name.to_string(); + let mut sorter = (self.sorter_factory)(index_name.clone(), self.segment_row_count); + sorter.push_n(value, n).await?; + self.sorters.insert(index_name, sorter); Ok(()) } } @@ -118,12 +122,6 @@ mod tests { use crate::inverted_index::format::writer::MockInvertedIndexWriter; use crate::inverted_index::Bytes; - fn stream_to_values(stream: SortedStream) -> Vec { - futures::executor::block_on(async { - stream.map(|r| r.unwrap().0).collect::>().await - }) - } - #[tokio::test] async fn test_sort_index_creator_basic() { let mut creator = @@ -209,6 +207,36 @@ mod tests { assert!(matches!(res, Err(Error::InconsistentRowCount { .. 
}))); } + #[tokio::test] + async fn test_sort_index_creator_create_indexes_without_data() { + let mut creator = + SortIndexCreator::new(NaiveSorter::factory(), NonZeroUsize::new(1).unwrap()); + + creator.push_with_name_n("a", None, 0).await.unwrap(); + creator.push_with_name_n("b", None, 0).await.unwrap(); + creator.push_with_name_n("c", None, 0).await.unwrap(); + + let mut mock_writer = MockInvertedIndexWriter::new(); + mock_writer + .expect_add_index() + .returning(|name, null_bitmap, stream| { + assert!(null_bitmap.is_empty()); + assert!(matches!(name.as_str(), "a" | "b" | "c")); + assert!(stream_to_values(stream).is_empty()); + Ok(()) + }); + mock_writer + .expect_finish() + .times(1) + .returning(|total_row_count, segment_row_count| { + assert_eq!(total_row_count, 0); + assert_eq!(segment_row_count.get(), 1); + Ok(()) + }); + + creator.finish(&mut mock_writer).await.unwrap(); + } + fn set_bit(bit_vec: &mut BitVec, index: usize) { if index >= bit_vec.len() { bit_vec.resize(index + 1, false); @@ -267,4 +295,10 @@ mod tests { }) } } + + fn stream_to_values(stream: SortedStream) -> Vec { + futures::executor::block_on(async { + stream.map(|r| r.unwrap().0).collect::>().await + }) + } } diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index 32115ec1816a..176b1f1561f1 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -25,6 +25,8 @@ use crate::inverted_index::error::Result; pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter; use crate::inverted_index::Bytes; +pub type ValueStream = Box> + Send + Unpin>; + /// Trait for writing inverted index data to underlying storage. #[mockall::automock] #[async_trait] @@ -39,7 +41,7 @@ pub trait InvertedIndexWriter: Send { &mut self, name: String, null_bitmap: BitVec, - values: Box> + Send + Unpin>, + values: ValueStream, ) -> Result<()>; /// Finalizes the index writing process, ensuring all data is written. diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 21f45b687827..07f39af46ecc 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -16,15 +16,14 @@ use std::num::NonZeroUsize; use async_trait::async_trait; use common_base::BitVec; -use futures::{AsyncWrite, AsyncWriteExt, Stream}; +use futures::{AsyncWrite, AsyncWriteExt}; use greptime_proto::v1::index::InvertedIndexMetas; use prost::Message; use snafu::ResultExt; -use super::single::SingleIndexWriter; use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu}; -use crate::inverted_index::format::writer::InvertedIndexWriter; -use crate::inverted_index::Bytes; +use crate::inverted_index::format::writer::single::SingleIndexWriter; +use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream}; /// `InvertedIndexBlobWriter`, implemented [`InvertedIndexWriter`], manages /// writing of an inverted index to a blob storage. 
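// Illustrative sketch, not part of this patch: the new `ValueStream` alias makes small
// adapters easy to write, for example feeding an in-memory vector of (value, bitmap)
// pairs to `add_index` in tests. The helper name is hypothetical.
use common_base::BitVec;
use futures::stream;

use crate::inverted_index::format::writer::ValueStream;
use crate::inverted_index::Bytes;

fn value_stream_from_vec(values: Vec<(Bytes, BitVec)>) -> ValueStream {
    Box::new(stream::iter(values.into_iter().map(Ok)))
}
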
@@ -45,7 +44,7 @@ impl InvertedIndexWriter for InvertedIndexBlobWrit &mut self, name: String, null_bitmap: BitVec, - values: Box> + Send + Unpin>, + values: ValueStream, ) -> Result<()> { let single_writer = SingleIndexWriter::new( name.clone(), @@ -105,6 +104,7 @@ mod tests { use super::*; use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; + use crate::inverted_index::Bytes; fn unpack(fst_value: u64) -> [u32; 2] { bytemuck::cast::(fst_value)