Skip to content

Commit

Permalink
Merge branch 'main' into xxh/bench-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jan 4, 2024
2 parents aab3160 + 4babcf0 commit 7ae1611
Show file tree
Hide file tree
Showing 27 changed files with 556 additions and 397 deletions.
18 changes: 10 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 11 additions & 10 deletions src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,7 @@ impl UpdateExecutor {
#[cfg(debug_assertions)]
table_dml_handle.check_chunk_schema(&stream_chunk);

let cardinality = stream_chunk.cardinality();
write_handle.write_chunk(stream_chunk).await?;

Result::Ok(cardinality / 2)
write_handle.write_chunk(stream_chunk).await
};

let mut rows_updated = 0;
Expand Down Expand Up @@ -181,17 +178,21 @@ impl UpdateExecutor {
.rows()
.zip_eq_debug(updated_data_chunk.rows())
{
let None = builder.append_one_row(row_delete) else {
unreachable!("no chunk should be yielded when appending the deleted row as the chunk size is always even");
};
if let Some(chunk) = builder.append_one_row(row_insert) {
rows_updated += write_txn_data(chunk).await?;
rows_updated += 1;
// If row_delete == row_insert, we don't need to do a actual update
if row_delete != row_insert {
let None = builder.append_one_row(row_delete) else {
unreachable!("no chunk should be yielded when appending the deleted row as the chunk size is always even");
};
if let Some(chunk) = builder.append_one_row(row_insert) {
write_txn_data(chunk).await?;
}
}
}
}

if let Some(chunk) = builder.consume_all() {
rows_updated += write_txn_data(chunk).await?;
write_txn_data(chunk).await?;
}
write_handle.end().await?;

Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl StreamService for StreamServiceImpl {
}

self.mgr
.send_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
.send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
.await?;

Ok(Response::new(InjectBarrierResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ mysql_common = { version = "0.31", default-features = false, features = [
] }
nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
opendal = { git = "https://github.com/apache/incubator-opendal", rev = "9a222e4d72b328a24d5775b1565292f4636bbe69" }
opendal = "0.44"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
hex = "0.4"
hyper = "0.14"
itertools = "0.12"
maplit = "1.0.2"
memcomparable = { version = "0.2" }
mime_guess = "2"
num-integer = "0.1"
Expand Down Expand Up @@ -95,7 +96,6 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1"
expect-test = "1.4"
maplit = "1.0.2"
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_test_runner = { workspace = true }
Expand Down
28 changes: 27 additions & 1 deletion src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ impl MigrationTrait for Migration {
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
.table(HummockSequence::Table)
.col(
ColumnDef::new(HummockSequence::Name)
.string()
.not_null()
.primary_key(),
)
.col(
ColumnDef::new(HummockSequence::Seq)
.big_integer()
.not_null(),
)
.to_owned(),
)
.await?;

Ok(())
}
Expand Down Expand Up @@ -201,7 +219,8 @@ impl MigrationTrait for Migration {
HummockPinnedVersion,
HummockPinnedSnapshot,
HummockVersionDelta,
HummockVersionStats
HummockVersionStats,
HummockSequence
);
Ok(())
}
Expand Down Expand Up @@ -260,3 +279,10 @@ enum HummockVersionStats {
Id,
Stats,
}

#[derive(DeriveIden)]
enum HummockSequence {
Table,
Name,
Seq,
}
28 changes: 28 additions & 0 deletions src/meta/model_v2/src/hummock_sequence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sea_orm::entity::prelude::*;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)]
#[sea_orm(table_name = "hummock_sequence")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub name: String,
pub seq: i64,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod fragment;
pub mod function;
pub mod hummock_pinned_snapshot;
pub mod hummock_pinned_version;
pub mod hummock_sequence;
pub mod hummock_version_delta;
pub mod hummock_version_stats;
pub mod index;
Expand Down
40 changes: 4 additions & 36 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ use crate::hummock::error::Result;
use crate::hummock::manager::{read_lock, write_lock};
use crate::hummock::metrics_utils::trigger_gc_stat;
use crate::hummock::HummockManager;
use crate::storage::{MetaStore, MetaStoreError, DEFAULT_COLUMN_FAMILY};

const HUMMOCK_INIT_FLAG_KEY: &[u8] = b"hummock_init_flag";

#[derive(Default)]
pub struct HummockVersionCheckpoint {
Expand Down Expand Up @@ -63,9 +60,8 @@ impl HummockVersionCheckpoint {
/// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale
/// objects from those delta logs.
impl HummockManager {
/// # Panics
/// if checkpoint is not found.
pub async fn read_checkpoint(&self) -> Result<HummockVersionCheckpoint> {
/// Returns Ok(None) if not found.
pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
use prost::Message;
let data = match self
.object_store
Expand All @@ -75,16 +71,13 @@ impl HummockManager {
Ok(data) => data,
Err(e) => {
if e.is_object_not_found_error() {
panic!(
"Hummock version checkpoints do not exist in object store, path: {}",
self.version_checkpoint_path
);
return Ok(None);
}
return Err(e.into());
}
};
let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
Ok(HummockVersionCheckpoint::from_protobuf(&ckpt))
Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
}

pub(super) async fn write_checkpoint(
Expand Down Expand Up @@ -173,31 +166,6 @@ impl HummockManager {
Ok(new_checkpoint_id - old_checkpoint_id)
}

pub(super) async fn need_init(&self) -> Result<bool> {
match self
.env
.meta_store()
.get_cf(DEFAULT_COLUMN_FAMILY, HUMMOCK_INIT_FLAG_KEY)
.await
{
Ok(_) => Ok(false),
Err(MetaStoreError::ItemNotFound(_)) => Ok(true),
Err(e) => Err(e.into()),
}
}

pub(super) async fn mark_init(&self) -> Result<()> {
self.env
.meta_store()
.put_cf(
DEFAULT_COLUMN_FAMILY,
HUMMOCK_INIT_FLAG_KEY.to_vec(),
memcomparable::to_vec(&0).unwrap(),
)
.await
.map_err(Into::into)
}

pub fn pause_version_checkpoint(&self) {
self.pause_version_checkpoint.store(true, Ordering::Relaxed);
tracing::info!("hummock version checkpoint is paused.");
Expand Down
14 changes: 8 additions & 6 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub use versioning::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod compaction;
mod sequence;
mod worker;

use compaction::*;
Expand Down Expand Up @@ -479,14 +480,19 @@ impl HummockManager {
.map(|version_delta| (version_delta.id, version_delta))
.collect();

let mut redo_state = if self.need_init().await? {
let checkpoint = self.try_read_checkpoint().await?;
let mut redo_state = if let Some(c) = checkpoint {
versioning_guard.checkpoint = c;
versioning_guard.checkpoint.version.clone()
} else {
let default_compaction_config = self
.compaction_group_manager
.read()
.await
.default_compaction_config();
let checkpoint_version = create_init_version(default_compaction_config);
tracing::info!("init hummock version checkpoint");
// This write to meta store is idempotent. So if `write_checkpoint` fails, restarting meta node is fine.
HummockVersionStats::default()
.insert(self.env.meta_store())
.await?;
Expand All @@ -495,13 +501,9 @@ impl HummockManager {
stale_objects: Default::default(),
};
self.write_checkpoint(&versioning_guard.checkpoint).await?;
self.mark_init().await?;
checkpoint_version
} else {
// Read checkpoint from object store.
versioning_guard.checkpoint = self.read_checkpoint().await?;
versioning_guard.checkpoint.version.clone()
};

versioning_guard.version_stats = HummockVersionStats::list(self.env.meta_store())
.await?
.into_iter()
Expand Down
Loading

0 comments on commit 7ae1611

Please sign in to comment.