Skip to content

Commit

Permalink
feat(meta): compress the encoded model if it's too large in kv meta s…
Browse files Browse the repository at this point in the history
…tore (#17315)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored and kwannoel committed Aug 14, 2024
1 parent 3149e4c commit 7bd8bd6
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ either = "1"
enum-as-inner = "0.6"
etcd-client = { workspace = true }
fail = "0.5"
flate2 = "1"
function_name = "0.3.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hex = "0.4"
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/hummock/model/compact_task_assignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::CompactTaskAssignment;

Expand All @@ -32,10 +31,6 @@ impl MetadataModel for CompactTaskAssignment {
self.clone()
}

fn to_protobuf_encoded_vec(&self) -> Vec<u8> {
self.encode_to_vec()
}

fn from_protobuf(prost: Self::PbType) -> Self {
prost
}
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/hummock/model/pinned_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message;
use risingwave_hummock_sdk::HummockContextId;
use risingwave_pb::hummock::HummockPinnedSnapshot;

Expand All @@ -32,10 +31,6 @@ impl MetadataModel for HummockPinnedSnapshot {
self.clone()
}

fn to_protobuf_encoded_vec(&self) -> Vec<u8> {
self.encode_to_vec()
}

fn from_protobuf(prost: Self::PbType) -> Self {
prost
}
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/hummock/model/pinned_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message;
use risingwave_hummock_sdk::HummockContextId;
use risingwave_pb::hummock::HummockPinnedVersion;

Expand All @@ -32,10 +31,6 @@ impl MetadataModel for HummockPinnedVersion {
self.clone()
}

fn to_protobuf_encoded_vec(&self) -> Vec<u8> {
self.encode_to_vec()
}

fn from_protobuf(prost: Self::PbType) -> Self {
prost
}
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/hummock/model/version_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::PbHummockVersionDelta;
Expand All @@ -33,10 +32,6 @@ impl MetadataModel for HummockVersionDelta {
self.to_protobuf()
}

fn to_protobuf_encoded_vec(&self) -> Vec<u8> {
self.to_protobuf().encode_to_vec()
}

fn from_protobuf(prost: Self::PbType) -> Self {
Self::from_persisted_protobuf(&prost)
}
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/hummock/model/version_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::HummockVersionStats;

Expand All @@ -33,10 +32,6 @@ impl MetadataModel for HummockVersionStats {
self.clone()
}

fn to_protobuf_encoded_vec(&self) -> Vec<u8> {
self.encode_to_vec()
}

fn from_protobuf(prost: Self::PbType) -> Self {
prost
}
Expand Down
146 changes: 120 additions & 26 deletions src/meta/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ use std::collections::btree_map::{Entry, VacantEntry};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::io::{Read, Write};
use std::ops::{Deref, DerefMut};

use anyhow::Context as _;
use async_trait::async_trait;
pub use cluster::*;
pub use error::*;
pub use migration_plan::*;
pub use notification::*;
use prost::Message;
pub use stream::*;

use crate::hummock::model::ext::Transaction as TransactionV2;
Expand Down Expand Up @@ -59,31 +60,83 @@ mod private {
pub trait MetadataModelMarker {}
}

/// Compress the value if it's larger than the threshold to avoid hitting the limit of etcd.
///
/// By default, the maximum size of any request to etcd is 1.5 MB. So we use a slightly
/// smaller value here. However, note that this is still a best-effort approach, as the
/// compressed size may still exceed the limit, in which case we should set the parameter
/// `--max-request-bytes` of etcd to a larger value.
const MODEL_COMPRESSION_THRESHOLD: usize = 1 << 20;

/// `MetadataModel` defines basic model operations in CRUD.
// TODO: better to move the methods that we don't want implementors to override to a separate
// extension trait.
#[async_trait]
pub trait MetadataModel: std::fmt::Debug + Sized + private::MetadataModelMarker {
/// Serialized prost message type.
type PbType: Message + Default;
type PbType: prost::Message + Default;
/// Serialized key type.
type KeyType: Message;
type KeyType: prost::Message;

/// Column family for this model.
fn cf_name() -> String;

/// Serialize to protobuf.
fn to_protobuf(&self) -> Self::PbType;

/// Serialize to protobuf encoded byte vector.
fn to_protobuf_encoded_vec(&self) -> Vec<u8> {
self.to_protobuf().encode_to_vec()
}

/// Deserialize from protobuf.
fn from_protobuf(prost: Self::PbType) -> Self;

/// Current record key.
fn key(&self) -> MetadataModelResult<Self::KeyType>;

/// Encode key to bytes. Should not be overridden.
fn encode_key(key: &Self::KeyType) -> Vec<u8> {
use prost::Message;
key.encode_to_vec()
}

/// Encode value to bytes. Should not be overridden.
///
/// The value is protobuf-encoded first. If the encoded form is longer than
/// `MODEL_COMPRESSION_THRESHOLD` bytes, it is additionally gzip-compressed with the default
/// compression level; otherwise the protobuf bytes are returned unchanged.
fn encode_value(value: &Self::PbType) -> Vec<u8> {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use prost::Message;

    let pb_encoded = value.encode_to_vec();

    // Compress the value if it's larger than the threshold to avoid hitting the limit of etcd.
    if pb_encoded.len() > MODEL_COMPRESSION_THRESHOLD {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        // The sink is an in-memory `Vec`, so neither writing nor finishing can hit an I/O error.
        encoder
            .write_all(&pb_encoded)
            .expect("writing to an in-memory `Vec` should never fail");
        encoder
            .finish()
            .expect("finishing a gzip stream backed by an in-memory `Vec` should never fail")
    } else {
        pb_encoded
    }
}

/// Decode value from bytes. Should not be overridden.
///
/// Accepts either plain protobuf bytes or gzip-compressed protobuf bytes and returns the
/// decoded message. Fails if a gzip stream cannot be fully read or if the (possibly
/// decompressed) bytes are not a valid protobuf message.
fn decode_value(value: &[u8]) -> MetadataModelResult<Self::PbType> {
    use flate2::bufread::GzDecoder;
    use prost::Message;

    let mut decoder = GzDecoder::new(value);

    // No gzip header means the bytes are plain protobuf.
    // This works because a protobuf-encoded message is never a valid gzip stream.
    // https://stackoverflow.com/questions/63621784/can-a-protobuf-message-begin-with-a-gzip-magic-number
    if decoder.header().is_none() {
        return Self::PbType::decode(value).map_err(Into::into);
    }

    // The value is compressed: inflate it fully before handing it to protobuf.
    let mut decompressed = Vec::new();
    decoder
        .read_to_end(&mut decompressed)
        .context("failed to decode gzipped value")?;

    Self::PbType::decode(decompressed.as_slice()).map_err(Into::into)
}

/// `list` returns all records in this model.
async fn list<S>(store: &S) -> MetadataModelResult<Vec<Self>>
where
Expand All @@ -92,11 +145,7 @@ pub trait MetadataModel: std::fmt::Debug + Sized + private::MetadataModelMarker
let bytes_vec = store.list_cf(&Self::cf_name()).await?;
bytes_vec
.iter()
.map(|(_k, v)| {
Self::PbType::decode(v.as_slice())
.map(Self::from_protobuf)
.map_err(Into::into)
})
.map(|(_k, v)| Self::decode_value(v.as_slice()).map(Self::from_protobuf))
.collect()
}

Expand All @@ -107,11 +156,7 @@ pub trait MetadataModel: std::fmt::Debug + Sized + private::MetadataModelMarker
let bytes_vec = snapshot.list_cf(&Self::cf_name()).await?;
bytes_vec
.iter()
.map(|(_k, v)| {
Self::PbType::decode(v.as_slice())
.map(Self::from_protobuf)
.map_err(Into::into)
})
.map(|(_k, v)| Self::decode_value(v.as_slice()).map(Self::from_protobuf))
.collect()
}

Expand All @@ -123,8 +168,8 @@ pub trait MetadataModel: std::fmt::Debug + Sized + private::MetadataModelMarker
store
.put_cf(
&Self::cf_name(),
self.key()?.encode_to_vec(),
self.to_protobuf().encode_to_vec(),
Self::encode_key(&self.key()?),
Self::encode_value(&self.to_protobuf()),
)
.await
.map_err(Into::into)
Expand All @@ -136,7 +181,7 @@ pub trait MetadataModel: std::fmt::Debug + Sized + private::MetadataModelMarker
S: MetaStore,
{
store
.delete_cf(&Self::cf_name(), &key.encode_to_vec())
.delete_cf(&Self::cf_name(), &Self::encode_key(key))
.await
.map_err(Into::into)
}
Expand All @@ -146,7 +191,7 @@ pub trait MetadataModel: std::fmt::Debug + Sized + private::MetadataModelMarker
where
S: MetaStore,
{
let byte_vec = match store.get_cf(&Self::cf_name(), &key.encode_to_vec()).await {
let byte_vec = match store.get_cf(&Self::cf_name(), &Self::encode_key(key)).await {
Ok(byte_vec) => byte_vec,
Err(err) => {
if !matches!(err, MetaStoreError::ItemNotFound(_)) {
Expand All @@ -155,7 +200,7 @@ pub trait MetadataModel: std::fmt::Debug + Sized + private::MetadataModelMarker
return Ok(None);
}
};
let model = Self::from_protobuf(Self::PbType::decode(byte_vec.as_slice())?);
let model = Self::from_protobuf(Self::decode_value(byte_vec.as_slice())?);
Ok(Some(model))
}
}
Expand Down Expand Up @@ -210,14 +255,14 @@ where
async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
trx.put(
Self::cf_name(),
self.key()?.encode_to_vec(),
self.to_protobuf_encoded_vec(),
Self::encode_key(&self.key()?),
Self::encode_value(&self.to_protobuf()),
);
Ok(())
}

async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> {
trx.delete(Self::cf_name(), self.key()?.encode_to_vec());
trx.delete(Self::cf_name(), Self::encode_key(&self.key()?));
Ok(())
}
}
Expand Down Expand Up @@ -930,6 +975,8 @@ impl<'a, T: Transactional<Transaction> + Transactional<TransactionV2> + Clone> D

#[cfg(test)]
mod tests {
use itertools::Itertools;

use super::*;
use crate::storage::Operation;

Expand Down Expand Up @@ -958,6 +1005,53 @@ mod tests {
}
}

#[tokio::test]
async fn test_compress_decompress() {
    use prost::Message;
    use risingwave_pb::catalog::Database;

    use crate::storage::MemStore;

    /// Stores a `Database` whose name is `len` bytes long in a fresh `MemStore` and checks
    /// that both `list` and `select` give back a model equal to the one inserted.
    async fn do_test(len: usize) {
        // `Database` stands in as the model under test.
        type Model = Database;

        let model = Model {
            name: "t".repeat(len),
            ..Default::default()
        };

        // The encoded form is at least as long as the name itself, so a `len` above the
        // threshold is guaranteed to take the compression branch.
        let encoded_len = model.encoded_len();
        assert!(encoded_len >= len, "encoded_len: {encoded_len}, len: {len}");

        let store = MemStore::new();
        model.insert(&store).await.unwrap();

        // Read back through `list`.
        let listed = Model::list(&store).await.unwrap();
        let decoded = listed.into_iter().exactly_one().unwrap();
        assert_eq!(model, decoded);

        // Read back through `select`.
        let key = model.key().unwrap();
        let selected = Model::select(&store, &key).await.unwrap();
        let decoded = selected.into_iter().exactly_one().unwrap();
        assert_eq!(model, decoded);
    }

    // One length well below the compression threshold and one just above it.
    for len in [1, MODEL_COMPRESSION_THRESHOLD + 1] {
        do_test(len).await;
    }
}

#[tokio::test]
async fn test_simple_var_transaction_commit() {
let mut kv = TestTransactional {
Expand Down

0 comments on commit 7bd8bd6

Please sign in to comment.