From 4d4af198fe2a2be8ac557b602af060789e708898 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Mon, 23 Dec 2024 10:11:53 -0500 Subject: [PATCH] [kv store] add watermark table to bigtable (#20705) ## Description Different implementation. Here each update is represented as a separate row. Old entries are expected to get eventually GCed --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- Cargo.lock | 1 + crates/sui-data-ingestion/src/main.rs | 21 ++++++++++-- .../sui-data-ingestion/src/progress_store.rs | 11 +++++++ crates/sui-kvstore/Cargo.toml | 1 + crates/sui-kvstore/src/bigtable/client.rs | 12 ++++++- crates/sui-kvstore/src/bigtable/init.sh | 3 +- crates/sui-kvstore/src/bigtable/mod.rs | 1 + .../src/bigtable/progress_store.rs | 29 ++++++++++++++++ crates/sui-kvstore/src/lib.rs | 2 ++ crates/sui-kvstore/src/main.rs | 33 ++++++++++++------- 10 files changed, 98 insertions(+), 16 deletions(-) create mode 100644 crates/sui-kvstore/src/bigtable/progress_store.rs diff --git a/Cargo.lock b/Cargo.lock index a4984024d8bf6..98dca2e4b7814 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14287,6 +14287,7 @@ dependencies = [ "sui-data-ingestion-core", "sui-types", "telemetry-subscribers", + "tempfile", "tokio", "tonic 0.12.3", "tracing", diff --git a/crates/sui-data-ingestion/src/main.rs b/crates/sui-data-ingestion/src/main.rs index cd2c00694a07d..e82a503588864 100644 --- a/crates/sui-data-ingestion/src/main.rs +++ b/crates/sui-data-ingestion/src/main.rs @@ -13,7 +13,7 @@ use sui_data_ingestion::{ }; use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions}; use sui_data_ingestion_core::{IndexerExecutor, WorkerPool}; -use sui_kvstore::{BigTableClient, KvWorker}; +use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker}; use tokio::signal; use tokio::sync::oneshot; @@ -119,12 +119,30 @@ async fn main() -> Result<()> { mysten_metrics::init_metrics(®istry); let metrics = DataIngestionMetrics::new(®istry); + let mut bigtable_store = None; + for task in &config.tasks { + if let Task::BigTableKV(kv_config) = &task.task { + std::env::set_var( + "GOOGLE_APPLICATION_CREDENTIALS", + kv_config.credentials.clone(), + ); + let bigtable_client = BigTableClient::new_remote( + kv_config.instance_id.clone(), + false, + Some(Duration::from_secs(kv_config.timeout_secs as u64)), + ) + .await?; + bigtable_store = Some(BigTableProgressStore::new(bigtable_client)); + } + } + let progress_store = DynamoDBProgressStore::new( &config.progress_store.aws_access_key_id, &config.progress_store.aws_secret_access_key, config.progress_store.aws_region, config.progress_store.table_name, config.is_backfill, + bigtable_store, ) .await; let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics); @@ -160,7 +178,6 @@ async fn main() -> Result<()> { executor.register(worker_pool).await?; } Task::BigTableKV(kv_config) => { - std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", kv_config.credentials); let client = BigTableClient::new_remote( kv_config.instance_id, false, diff --git a/crates/sui-data-ingestion/src/progress_store.rs b/crates/sui-data-ingestion/src/progress_store.rs index 02857becfc626..2bc699d93bc18 100644 --- a/crates/sui-data-ingestion/src/progress_store.rs +++ b/crates/sui-data-ingestion/src/progress_store.rs @@ -11,12 +11,14 @@ use aws_sdk_s3::config::{Credentials, Region}; use std::str::FromStr; use std::time::Duration; use sui_data_ingestion_core::ProgressStore; +use sui_kvstore::BigTableProgressStore; use sui_types::messages_checkpoint::CheckpointSequenceNumber; pub struct DynamoDBProgressStore { client: Client, table_name: String, is_backfill: bool, + bigtable_store: Option, } impl DynamoDBProgressStore { @@ -26,6 +28,7 @@ impl DynamoDBProgressStore { aws_region: String, table_name: String, is_backfill: bool, + bigtable_store: Option, ) -> Self { let credentials = Credentials::new( aws_access_key_id, @@ -50,6 +53,7 @@ impl DynamoDBProgressStore { client, table_name, is_backfill, + bigtable_store, } } } @@ -79,6 +83,13 @@ impl ProgressStore for DynamoDBProgressStore { if self.is_backfill && checkpoint_number % 1000 != 0 { return Ok(()); } + if task_name == "bigtable" { + if let Some(ref mut bigtable_store) = self.bigtable_store { + bigtable_store + .save(task_name.clone(), checkpoint_number) + .await?; + } + } let backoff = backoff::ExponentialBackoff::default(); backoff::future::retry(backoff, || async { let result = self diff --git a/crates/sui-kvstore/Cargo.toml b/crates/sui-kvstore/Cargo.toml index a06bacd9ace66..dd78b873ce808 100644 --- a/crates/sui-kvstore/Cargo.toml +++ b/crates/sui-kvstore/Cargo.toml @@ -20,6 +20,7 @@ serde.workspace = true sui-data-ingestion-core.workspace = true sui-types.workspace = true telemetry-subscribers.workspace = true +tempfile.workspace = true tokio = { workspace = true, features = ["full"] } tonic = {version = "0.12.2",features = ["tls", "transport"] } tracing.workspace = true diff --git a/crates/sui-kvstore/src/bigtable/client.rs b/crates/sui-kvstore/src/bigtable/client.rs index 5c85447622827..44e5b9d01a698 100644 --- a/crates/sui-kvstore/src/bigtable/client.rs +++ b/crates/sui-kvstore/src/bigtable/client.rs @@ -35,6 +35,7 @@ const OBJECTS_TABLE: &str = "objects"; const TRANSACTIONS_TABLE: &str = "transactions"; const CHECKPOINTS_TABLE: &str = "checkpoints"; const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest"; +const WATERMARK_TABLE: &str = "watermark"; const COLUMN_FAMILY_NAME: &str = "sui"; const DEFAULT_COLUMN_QUALIFIER: &str = ""; @@ -131,6 +132,15 @@ impl KeyValueStoreWriter for BigTableClient { ) .await } + + async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()> { + let key = watermark.to_be_bytes().to_vec(); + self.multi_set( + WATERMARK_TABLE, + [(key, vec![(DEFAULT_COLUMN_QUALIFIER, vec![])])], + ) + .await + } } #[async_trait] @@ -239,7 +249,7 @@ impl KeyValueStoreReader for BigTableClient { async fn get_latest_checkpoint(&mut self) -> Result { let upper_limit = u64::MAX.to_be_bytes().to_vec(); match self - .reversed_scan(CHECKPOINTS_TABLE, upper_limit) + .reversed_scan(WATERMARK_TABLE, upper_limit) .await? .pop() { diff --git a/crates/sui-kvstore/src/bigtable/init.sh b/crates/sui-kvstore/src/bigtable/init.sh index f96ac5c1e9827..5d314b275fe37 100755 --- a/crates/sui-kvstore/src/bigtable/init.sh +++ b/crates/sui-kvstore/src/bigtable/init.sh @@ -10,7 +10,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then command+=(-project emulator) fi -for table in objects transactions checkpoints checkpoints_by_digest; do +for table in objects transactions checkpoints checkpoints_by_digest watermark; do ( set -x "${command[@]}" createtable $table @@ -18,3 +18,4 @@ for table in objects transactions checkpoints checkpoints_by_digest; do "${command[@]}" setgcpolicy $table sui maxversions=1 ) done +"${command[@]}" setgcpolicy watermark sui maxage=2d diff --git a/crates/sui-kvstore/src/bigtable/mod.rs b/crates/sui-kvstore/src/bigtable/mod.rs index 9be9541c15ec4..58985d241ca94 100644 --- a/crates/sui-kvstore/src/bigtable/mod.rs +++ b/crates/sui-kvstore/src/bigtable/mod.rs @@ -2,5 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) mod client; +pub(crate) mod progress_store; mod proto; pub(crate) mod worker; diff --git a/crates/sui-kvstore/src/bigtable/progress_store.rs b/crates/sui-kvstore/src/bigtable/progress_store.rs new file mode 100644 index 0000000000000..e03ee86f667da --- /dev/null +++ b/crates/sui-kvstore/src/bigtable/progress_store.rs @@ -0,0 +1,29 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{BigTableClient, KeyValueStoreReader, KeyValueStoreWriter}; +use anyhow::Result; +use async_trait::async_trait; +use sui_data_ingestion_core::ProgressStore; +use sui_types::messages_checkpoint::CheckpointSequenceNumber; + +pub struct BigTableProgressStore { + client: BigTableClient, +} + +impl BigTableProgressStore { + pub fn new(client: BigTableClient) -> Self { + Self { client } + } +} + +#[async_trait] +impl ProgressStore for BigTableProgressStore { + async fn load(&mut self, _: String) -> Result { + self.client.get_latest_checkpoint().await + } + + async fn save(&mut self, _: String, checkpoint_number: CheckpointSequenceNumber) -> Result<()> { + self.client.save_watermark(checkpoint_number).await + } +} diff --git a/crates/sui-kvstore/src/lib.rs b/crates/sui-kvstore/src/lib.rs index 5d3ab55a3f64a..dd0724a48c733 100644 --- a/crates/sui-kvstore/src/lib.rs +++ b/crates/sui-kvstore/src/lib.rs @@ -4,6 +4,7 @@ mod bigtable; use anyhow::Result; use async_trait::async_trait; pub use bigtable::client::BigTableClient; +pub use bigtable::progress_store::BigTableProgressStore; pub use bigtable::worker::KvWorker; use sui_types::base_types::ObjectID; use sui_types::crypto::AuthorityStrongQuorumSignInfo; @@ -41,6 +42,7 @@ pub trait KeyValueStoreWriter { async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>; async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>; async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>; + async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()>; } #[derive(Clone, Debug)] diff --git a/crates/sui-kvstore/src/main.rs b/crates/sui-kvstore/src/main.rs index 82f9ceac61315..b25768216d51f 100644 --- a/crates/sui-kvstore/src/main.rs +++ b/crates/sui-kvstore/src/main.rs @@ -1,10 +1,11 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use sui_data_ingestion_core::setup_single_workflow; -use sui_kvstore::BigTableClient; -use sui_kvstore::KvWorker; +use prometheus::Registry; +use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool}; +use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker}; use telemetry_subscribers::TelemetryConfig; +use tokio::sync::oneshot; #[tokio::main] async fn main() -> Result<()> { @@ -20,16 +21,24 @@ async fn main() -> Result<()> { network == "mainnet" || network == "testnet", "Invalid network name" ); + let client = BigTableClient::new_local(instance_id).await?; - let client = BigTableClient::new_remote(instance_id, false, None).await?; - let (executor, _term_sender) = setup_single_workflow( - KvWorker { client }, - format!("https://checkpoints.{}.sui.io", network), - 0, + let (_exit_sender, exit_receiver) = oneshot::channel(); + let mut executor = IndexerExecutor::new( + BigTableProgressStore::new(client.clone()), 1, - None, - ) - .await?; - executor.await?; + DataIngestionMetrics::new(&Registry::new()), + ); + let worker_pool = WorkerPool::new(KvWorker { client }, "bigtable".to_string(), 50); + executor.register(worker_pool).await?; + executor + .run( + tempfile::tempdir()?.into_path(), + Some(format!("https://checkpoints.{}.sui.io", network)), + vec![], + ReaderOptions::default(), + exit_receiver, + ) + .await?; Ok(()) }