refactor: simplify the PaginationStream (#3787)
* refactor: simplify the `PaginationStream`

* refactor: refactor decode fn

* fix: fix clippy
WenyXu authored Apr 24, 2024
1 parent b619950 commit 20a933e
Showing 6 changed files with 32 additions and 41 deletions.
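The heart of the change: `PaginationStream` used to be generic over a `(K, V)` pair, which forced decoders to pad one side with `()` and callers to strip it back off. Below is a standalone sketch of the before/after decoder shape; the `KeyValue` struct and string-based `Result` here are mock stand-ins so the sketch compiles on its own, not the crate's real types.

```rust
use std::sync::Arc;

// Mock stand-ins so the sketch compiles on its own; the real types live in common_meta.
#[allow(dead_code)]
struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}
type Result<T> = std::result::Result<T, String>;

// Old shape: every decoder produced a pair, even when one side carried nothing.
type OldKeyValueDecoderFn<K, V> = dyn Fn(KeyValue) -> Result<(K, V)> + Send + Sync;
// New shape: a single generic item type.
type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;

fn main() {
    let make_kv = || KeyValue {
        key: b"greptime".to_vec(),
        value: vec![],
    };

    // Before: pack a dummy unit in the decoder, then strip it again at the call site.
    let old: Arc<OldKeyValueDecoderFn<String, ()>> =
        Arc::new(|kv: KeyValue| Ok((String::from_utf8(kv.key).map_err(|e| e.to_string())?, ())));
    let (key, _unit) = (old)(make_kv()).unwrap();

    // After: the decoder returns the item directly.
    let new: Arc<KeyValueDecoderFn<String>> =
        Arc::new(|kv: KeyValue| String::from_utf8(kv.key).map_err(|e| e.to_string()));
    let key2 = (new)(make_kv()).unwrap();

    assert_eq!(key, key2);
}
```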
8 changes: 4 additions & 4 deletions src/cmd/src/cli/upgrade.rs
@@ -192,10 +192,10 @@ impl MigrateTableMetadata {
                 let key = v1SchemaKey::parse(key_str)
                     .unwrap_or_else(|e| panic!("schema key is corrupted: {e}, key: {key_str}"));

-                Ok((key, ()))
+                Ok(key)
             }),
         );
-        while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? {
+        while let Some(key) = stream.try_next().await.context(error::IterStreamSnafu)? {
             let _ = self.migrate_schema_key(&key).await;
             keys.push(key.to_string().as_bytes().to_vec());
         }
@@ -244,10 +244,10 @@ impl MigrateTableMetadata {
                 let key = v1CatalogKey::parse(key_str)
                     .unwrap_or_else(|e| panic!("catalog key is corrupted: {e}, key: {key_str}"));

-                Ok((key, ()))
+                Ok(key)
             }),
         );
-        while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? {
+        while let Some(key) = stream.try_next().await.context(error::IterStreamSnafu)? {
             let _ = self.migrate_catalog_key(&key).await;
             keys.push(key.to_string().as_bytes().to_vec());
         }
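The migration loops above drive the stream with `futures::TryStreamExt::try_next`; since each item is now the key itself, there is no `(key, _)` tuple left to destructure. A standalone sketch of that consumption pattern, with `futures::stream::iter` standing in for the real `PaginationStream` and a string error type as an assumption:

```rust
use futures::stream::{self, BoxStream};
use futures::TryStreamExt;

type Result<T> = std::result::Result<T, String>;

fn main() -> Result<()> {
    futures::executor::block_on(async {
        // Any stream of Result<key> is consumed the same way as the real PaginationStream.
        let items: Vec<Result<String>> = vec![
            Ok("greptime.public".to_string()),
            Ok("greptime.private".to_string()),
        ];
        let mut stream: BoxStream<'static, Result<String>> = Box::pin(stream::iter(items));

        let mut keys = Vec::new();
        // Same shape as the loops above: the item is already the key.
        while let Some(key) = stream.try_next().await? {
            keys.push(key.as_bytes().to_vec());
        }
        assert_eq!(keys.len(), 2);
        Ok(())
    })
}
```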
7 changes: 3 additions & 4 deletions src/common/meta/src/key/catalog_name.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;

 use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use futures::stream::BoxStream;
-use futures::StreamExt;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};

@@ -84,11 +83,11 @@ impl<'a> TryFrom<&'a str> for CatalogNameKey<'a> {
 }

 /// Decoder `KeyValue` to ({catalog},())
-pub fn catalog_decoder(kv: KeyValue) -> Result<(String, ())> {
+pub fn catalog_decoder(kv: KeyValue) -> Result<String> {
     let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
     let catalog_name = CatalogNameKey::try_from(str)?;

-    Ok((catalog_name.catalog.to_string(), ()))
+    Ok(catalog_name.catalog.to_string())
 }

 pub struct CatalogManager {
@@ -134,7 +133,7 @@ impl CatalogManager {
             Arc::new(catalog_decoder),
         );

-        Box::pin(stream.map(|kv| kv.map(|kv| kv.0)))
+        Box::pin(stream)
     }
 }

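After the change, `catalog_decoder` hands back the catalog name itself, so the manager can return `Box::pin(stream)` with no trailing `.map` adapter. A hedged, standalone sketch of the decoder shape: the `__catalog_name/` key layout is an illustrative assumption, and string errors stand in for the crate's error type.

```rust
// Mock stand-ins so the sketch compiles on its own.
#[allow(dead_code)]
struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}
type Result<T> = std::result::Result<T, String>;

/// Decodes a `KeyValue` into the catalog name it encodes,
/// returning the name directly instead of the old `(String, ())` pair.
fn catalog_decoder(kv: KeyValue) -> Result<String> {
    let s = std::str::from_utf8(&kv.key).map_err(|e| e.to_string())?;
    // Illustrative key layout: "__catalog_name/{catalog}".
    s.strip_prefix("__catalog_name/")
        .map(|catalog| catalog.to_string())
        .ok_or_else(|| format!("invalid catalog key: {s}"))
}

fn main() {
    let kv = KeyValue {
        key: b"__catalog_name/greptime".to_vec(),
        value: vec![],
    };
    assert_eq!(catalog_decoder(kv).unwrap(), "greptime");
}
```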
9 changes: 3 additions & 6 deletions src/common/meta/src/key/datanode_table.rs
@@ -16,7 +16,6 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use futures::stream::BoxStream;
-use futures::StreamExt;
 use serde::{Deserialize, Serialize};
 use snafu::OptionExt;
 use store_api::storage::RegionNumber;
@@ -126,10 +125,8 @@ impl DatanodeTableValue {
 }

 /// Decodes `KeyValue` to ((),`DatanodeTableValue`)
-pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<((), DatanodeTableValue)> {
-    let value = DatanodeTableValue::try_from_raw_value(&kv.value)?;
-
-    Ok(((), value))
+pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
+    DatanodeTableValue::try_from_raw_value(&kv.value)
 }

 pub struct DatanodeTableManager {
@@ -163,7 +160,7 @@ impl DatanodeTableManager {
             Arc::new(datanode_table_value_decoder),
         );

-        Box::pin(stream.map(|kv| kv.map(|kv| kv.1)))
+        Box::pin(stream)
     }

     /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes.
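The value-side decoders shrink the same way: `datanode_table_value_decoder` becomes a plain forwarding call to `try_from_raw_value`. A standalone sketch with `serde_json` standing in for the crate's raw-value codec; the `DatanodeTableValue` fields and the key bytes here are illustrative, not the real definitions (requires the serde and serde_json crates).

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct DatanodeTableValue {
    table_id: u32,
    regions: Vec<u32>,
}

#[allow(dead_code)]
struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}
type Result<T> = std::result::Result<T, String>;

/// Decodes a `KeyValue` into a `DatanodeTableValue` directly,
/// instead of the old `((), DatanodeTableValue)` pair.
fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
    serde_json::from_slice(&kv.value).map_err(|e| e.to_string())
}

fn main() {
    let kv = KeyValue {
        key: b"__dn_table/1/1024".to_vec(),
        value: br#"{"table_id":1024,"regions":[0,1,2]}"#.to_vec(),
    };
    let value = datanode_table_value_decoder(kv).unwrap();
    assert_eq!(value.table_id, 1024);
    assert_eq!(value.regions.len(), 3);
}
```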
7 changes: 3 additions & 4 deletions src/common/meta/src/key/schema_name.rs
@@ -19,7 +19,6 @@ use std::time::Duration;

 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use futures::stream::BoxStream;
-use futures::StreamExt;
 use humantime_serde::re::humantime;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
@@ -103,11 +102,11 @@ impl TableMetaKey for SchemaNameKey<'_> {
 }

 /// Decodes `KeyValue` to ({schema},())
-pub fn schema_decoder(kv: KeyValue) -> Result<(String, ())> {
+pub fn schema_decoder(kv: KeyValue) -> Result<String> {
     let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?;
     let schema_name = SchemaNameKey::try_from(str)?;

-    Ok((schema_name.schema.to_string(), ()))
+    Ok(schema_name.schema.to_string())
 }

 impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
@@ -193,7 +192,7 @@ impl SchemaManager {
             Arc::new(schema_decoder),
         );

-        Box::pin(stream.map(|kv| kv.map(|kv| kv.0)))
+        Box::pin(stream)
     }
 }

33 changes: 17 additions & 16 deletions src/common/meta/src/range_stream.rs
@@ -28,13 +28,13 @@ use crate::rpc::store::{RangeRequest, RangeResponse};
 use crate::rpc::KeyValue;
 use crate::util::get_next_prefix_key;

-pub type KeyValueDecoderFn<K, V> = dyn Fn(KeyValue) -> Result<(K, V)> + Send + Sync;
+pub type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;

-enum PaginationStreamState<K, V> {
+enum PaginationStreamState<T> {
     /// At the start of reading.
     Init,
     /// Decoding key value pairs.
-    Decoding(SimpleKeyValueDecoder<K, V>),
+    Decoding(SimpleKeyValueDecoder<T>),
     /// Retrieving data from backend.
     Reading(BoxFuture<'static, Result<(PaginationStreamFactory, Option<RangeResponse>)>>),
     /// Error
@@ -77,7 +77,7 @@ struct PaginationStreamFactory {
 }

 impl PaginationStreamFactory {
-    pub fn new(
+    fn new(
         kv: &KvBackendRef,
         key: Vec<u8>,
         range_end: Vec<u8>,
@@ -137,7 +137,7 @@ impl PaginationStreamFactory {
         }
     }

-    pub async fn read_next(mut self) -> Result<(Self, Option<RangeResponse>)> {
+    async fn read_next(mut self) -> Result<(Self, Option<RangeResponse>)> {
         if self.more {
             let resp = self
                 .adaptive_range(RangeRequest {
@@ -174,18 +174,19 @@ impl PaginationStreamFactory {
     }
 }

-pub struct PaginationStream<K, V> {
-    state: PaginationStreamState<K, V>,
-    decoder_fn: Arc<KeyValueDecoderFn<K, V>>,
+pub struct PaginationStream<T> {
+    state: PaginationStreamState<T>,
+    decoder_fn: Arc<KeyValueDecoderFn<T>>,
     factory: Option<PaginationStreamFactory>,
 }

-impl<K, V> PaginationStream<K, V> {
+impl<T> PaginationStream<T> {
+    /// Returns a new [PaginationStream].
     pub fn new(
         kv: KvBackendRef,
         req: RangeRequest,
         page_size: usize,
-        decoder_fn: Arc<KeyValueDecoderFn<K, V>>,
+        decoder_fn: Arc<KeyValueDecoderFn<T>>,
     ) -> Self {
         Self {
             state: PaginationStreamState::Init,
@@ -202,13 +203,13 @@ impl<K, V> PaginationStream<K, V> {
     }
 }

-struct SimpleKeyValueDecoder<K, V> {
+struct SimpleKeyValueDecoder<T> {
     kv: VecDeque<KeyValue>,
-    decoder: Arc<KeyValueDecoderFn<K, V>>,
+    decoder: Arc<KeyValueDecoderFn<T>>,
 }

-impl<K, V> Iterator for SimpleKeyValueDecoder<K, V> {
-    type Item = Result<(K, V)>;
+impl<T> Iterator for SimpleKeyValueDecoder<T> {
+    type Item = Result<T>;

     fn next(&mut self) -> Option<Self::Item> {
         if let Some(kv) = self.kv.pop_front() {
@@ -219,8 +220,8 @@ impl<K, V> Iterator for SimpleKeyValueDecoder<K, V> {
     }
 }

-impl<K, V> Stream for PaginationStream<K, V> {
-    type Item = Result<(K, V)>;
+impl<T> Stream for PaginationStream<T> {
+    type Item = Result<T>;

     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         loop {
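The stream keeps its page-at-a-time state machine; only the item type collapses from `(K, V)` to a single `T`. A condensed, standalone sketch of the same idea, a pluggable decoder applied page by page, with an in-memory vector standing in for the kv backend and eager collection standing in for the lazy `Stream`:

```rust
use std::sync::Arc;

#[allow(dead_code)]
struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}
type Result<T> = std::result::Result<T, String>;
type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;

/// Walks `data` one `page_size` chunk at a time and decodes every entry.
/// The real PaginationStream issues one range request per page and yields
/// items lazily; this sketch collects eagerly just to show the decoder plumbing.
fn paginate_and_decode<T>(
    data: Vec<(Vec<u8>, Vec<u8>)>,
    page_size: usize,
    decoder: Arc<KeyValueDecoderFn<T>>,
) -> Result<Vec<T>> {
    let mut out = Vec::new();
    for page in data.chunks(page_size) {
        for (key, value) in page {
            let kv = KeyValue {
                key: key.clone(),
                value: value.clone(),
            };
            out.push((decoder)(kv)?);
        }
    }
    Ok(out)
}

fn main() {
    let data = vec![
        (b"__catalog_name/greptime".to_vec(), vec![]),
        (b"__catalog_name/system".to_vec(), vec![]),
        (b"__catalog_name/test".to_vec(), vec![]),
    ];
    // The decoder yields the catalog name itself; no (name, ()) pair, no trailing `.map`.
    let decoder: Arc<KeyValueDecoderFn<String>> = Arc::new(|kv: KeyValue| {
        std::str::from_utf8(&kv.key)
            .map_err(|e| e.to_string())
            .map(|s| s.trim_start_matches("__catalog_name/").to_string())
    });
    let names = paginate_and_decode(data, 2, decoder).unwrap();
    assert_eq!(names, ["greptime", "system", "test"]);
}
```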
9 changes: 2 additions & 7 deletions src/meta-srv/src/service/store/cached_kv.rs
@@ -102,15 +102,10 @@ impl LeaderCachedKvBackend {
             self.store.clone(),
             RangeRequest::new().with_prefix(prefix.as_bytes()),
             DEFAULT_PAGE_SIZE,
-            Arc::new(|kv| Ok((kv, ()))),
+            Arc::new(Ok),
         );

-        let kvs = stream
-            .try_collect::<Vec<_>>()
-            .await?
-            .into_iter()
-            .map(|(kv, _)| kv)
-            .collect();
+        let kvs = stream.try_collect::<Vec<_>>().await?.into_iter().collect();

         self.cache
             .batch_put(BatchPutRequest {
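Passing `Arc::new(Ok)` above works because `Ok` is itself a function of type `fn(KeyValue) -> Result<KeyValue, E>`, so under the new single-item signature it serves as the identity decoder with `T = KeyValue`, and both the hand-written `|kv| Ok((kv, ()))` closure and the tuple-stripping `.map` disappear. A tiny standalone illustration with mock types:

```rust
use std::sync::Arc;

struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}
type Result<T> = std::result::Result<T, String>;
type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;

fn main() {
    // `Ok` used directly as the identity decoder: T = KeyValue.
    let decoder: Arc<KeyValueDecoderFn<KeyValue>> = Arc::new(Ok);

    let kv = KeyValue {
        key: b"k".to_vec(),
        value: b"v".to_vec(),
    };
    let decoded = (decoder)(kv).unwrap();
    assert_eq!(decoded.key, b"k");
    assert_eq!(decoded.value, b"v");
}
```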
