Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metric engine): label mismatch in metric engine #3927

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 173 additions & 1 deletion src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,24 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder {

#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use datatypes::vectors::Int64Vector;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;

use super::*;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};

#[test]
fn test_memtable_sorted_input() {
Expand Down Expand Up @@ -562,4 +572,166 @@ mod tests {
assert!(config.dedup);
assert_eq!(PartitionTreeConfig::default(), config);
}

fn metadata_for_metric_engine() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"__table_id",
ConcreteDataType::uint32_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 2147483652,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"__ts_id",
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
ConcreteDataType::uint64_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 2147483651,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"test_label",
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
ConcreteDataType::float64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 1,
})
.primary_key(vec![2147483652, 2147483651, 2]);
let region_metadata = builder.build().unwrap();
Arc::new(region_metadata)
}

fn build_key_values(
metadata: RegionMetadataRef,
labels: &[&str],
table_id: &[u32],
ts_id: &[u64],
ts: &[i64],
values: &[f64],
sequence: u64,
) -> KeyValues {
let column_schema = region_metadata_to_row_schema(&metadata);

let rows = ts
.iter()
.zip(table_id.iter())
.zip(ts_id.iter())
.zip(labels.iter())
.zip(values.iter())
.map(|((((ts, table_id), ts_id), label), val)| Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::U32Value(*table_id)),
},
api::v1::Value {
value_data: Some(ValueData::U64Value(*ts_id)),
},
api::v1::Value {
value_data: Some(ValueData::StringValue(label.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
},
api::v1::Value {
value_data: Some(ValueData::F64Value(*val)),
},
],
})
.collect();
let mutation = api::v1::Mutation {
op_type: 1,
sequence,
rows: Some(Rows {
schema: column_schema,
rows,
}),
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

#[test]
fn test_write_freeze() {
let metadata = metadata_for_metric_engine();
let memtable = PartitionTreeMemtableBuilder::new(
PartitionTreeConfig {
index_max_keys_per_shard: 40,
..Default::default()
},
None,
)
.build(1, &metadata);

let codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);

memtable
.write(&build_key_values(
metadata.clone(),
&["daily", "10min", "daily", "10min"],
&[1025, 1025, 1025, 1025],
&[
16442255374049317291,
5686004715529701024,
16442255374049317291,
5686004715529701024,
],
&[1712070000000, 1712717731000, 1712761200000, 1712761200000],
&[0.0, 0.0, 0.0, 0.0],
1,
))
.unwrap();

memtable.freeze().unwrap();
let new_memtable = memtable.fork(2, &metadata);

new_memtable
.write(&build_key_values(
metadata.clone(),
&["10min"],
&[1025],
&[5686004715529701024],
&[1714643131000],
&[0.1],
2,
))
.unwrap();

let mut reader = new_memtable.iter(None, None).unwrap();
let batch = reader.next().unwrap().unwrap();
let pk = codec.decode(batch.primary_key()).unwrap();
if let Value::String(s) = &pk[2] {
assert_eq!("10min", s.as_utf8());
} else {
unreachable!()
}
}
}
74 changes: 48 additions & 26 deletions src/mito2/src/memtable/partition_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use crate::metrics::MEMTABLE_DICT_BYTES;
/// Maximum keys in a [DictBlock].
const MAX_KEYS_PER_BLOCK: u16 = 256;

type PkIndexMap = BTreeMap<Vec<u8>, PkIndex>;
/// The key is mcmp-encoded primary keys, while the values are the pk index and
/// optionally sparsely encoded primary keys.
type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved

/// Builder to build a key dictionary.
pub struct KeyDictBuilder {
Expand Down Expand Up @@ -66,10 +68,15 @@ impl KeyDictBuilder {
///
/// # Panics
/// Panics if the builder is full.
pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex {
pub fn insert_key(
&mut self,
full_primary_key: &[u8],
sparse_key: Option<&[u8]>,
metrics: &mut WriteMetrics,
) -> PkIndex {
assert!(!self.is_full());

if let Some(pk_index) = self.pk_to_index.get(key).copied() {
if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
// Already in the builder.
return pk_index;
}
Expand All @@ -81,16 +88,22 @@ impl KeyDictBuilder {
}

// Safety: we have checked the buffer length.
let pk_index = self.key_buffer.push_key(key);
self.pk_to_index.insert(key.to_vec(), pk_index);
let pk_index = self.key_buffer.push_key(full_primary_key);
let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
(Some(sparse_key.to_vec()), sparse_key.len())
} else {
(None, 0)
};
self.pk_to_index
.insert(full_primary_key.to_vec(), (pk_index, sparse_key));
self.num_keys += 1;

// Since we store the key twice so the bytes usage doubled.
metrics.key_bytes += key.len() * 2;
self.key_bytes_in_index += key.len();
metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
self.key_bytes_in_index += full_primary_key.len();

// Adds key size of index to the metrics.
MEMTABLE_DICT_BYTES.add(key.len() as i64);
MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);

pk_index
}
Expand All @@ -108,37 +121,46 @@ impl KeyDictBuilder {
}

/// Finishes the builder.
pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> {
pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
if self.key_buffer.is_empty() {
return None;
}
let mut pk_to_index_map = BTreeMap::new();

// Finishes current dict block and resets the pk index.
let dict_block = self.key_buffer.finish(true);
self.dict_blocks.push(dict_block);
// Computes key position and then alter pk index.
let mut key_positions = vec![0; self.pk_to_index.len()];
for (i, pk_index) in self.pk_to_index.values_mut().enumerate() {

for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
.into_iter()
.enumerate()
{
// The position of the i-th key is the old pk index.
key_positions[i] = *pk_index;
// Overwrites the pk index.
*pk_index = i as PkIndex;
key_positions[i] = pk_index;
if let Some(sparse_key) = sparse_key {
pk_to_index_map.insert(sparse_key, i as PkIndex);
}
}

self.num_keys = 0;
let key_bytes_in_index = self.key_bytes_in_index;
self.key_bytes_in_index = 0;
*pk_to_index = std::mem::take(&mut self.pk_to_index);

Some(KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
})
Some((
KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
},
pk_to_index_map,
))
}

/// Reads the builder.
pub fn read(&self) -> DictBuilderReader {
let sorted_pk_indices = self.pk_to_index.values().copied().collect();
let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
let block = self.key_buffer.finish_cloned();
let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
blocks.extend_from_slice(&self.dict_blocks);
Expand Down Expand Up @@ -394,7 +416,7 @@ mod tests {
let mut metrics = WriteMetrics::default();
for key in &keys {
assert!(!builder.is_full());
let pk_index = builder.insert_key(key, &mut metrics);
let pk_index = builder.insert_key(key, None, &mut metrics);
last_pk_index = Some(pk_index);
}
assert_eq!(num_keys - 1, last_pk_index.unwrap());
Expand Down Expand Up @@ -426,14 +448,14 @@ mod tests {
for i in 0..num_keys {
// Each key is 5 bytes.
let key = format!("{i:05}");
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
let key_bytes = num_keys as usize * 5;
assert_eq!(key_bytes * 2, metrics.key_bytes);
assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());

let dict = builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = builder.finish().unwrap();
assert_eq!(0, builder.key_bytes_in_index);
assert_eq!(key_bytes, dict.key_bytes_in_index);
assert!(dict.shared_memory_size() > key_bytes);
Expand All @@ -446,12 +468,12 @@ mod tests {
for i in 0..MAX_KEYS_PER_BLOCK * 2 {
let key = format!("{i:010}");
assert!(!builder.is_full());
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
assert!(builder.is_full());
builder.finish(&mut BTreeMap::new());
builder.finish();

assert!(!builder.is_full());
assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
}
}
11 changes: 7 additions & 4 deletions src/mito2/src/memtable/partition_tree/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,18 @@ impl Partition {
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
&key_value,
metrics,
);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
evenyag marked this conversation as resolved.
Show resolved Hide resolved
} else {
// `primary_key` is already the full primary key.
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
.write_with_key(primary_key, None, &key_value, metrics);
inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
};

Expand Down
5 changes: 2 additions & 3 deletions src/mito2/src/memtable/partition_tree/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ impl Node for ShardNode {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;

use super::*;
Expand Down Expand Up @@ -488,10 +487,10 @@ mod tests {
encode_keys(&metadata, kvs, &mut keys);
}
for key in &keys {
dict_builder.insert_key(key, &mut metrics);
dict_builder.insert_key(key, None, &mut metrics);
}

let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = dict_builder.finish().unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

Shard::new(
Expand Down
Loading