Skip to content

Commit

Permalink
[kv store] add watermark table to bigtable (#20705)
Browse files Browse the repository at this point in the history
## Description 

Different implementation. Here each update is represented as a separate
row. Old entries are expected to get eventually GCed


---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
phoenix-o authored Dec 23, 2024
1 parent ad58cb9 commit 4d4af19
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 19 additions & 2 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sui_data_ingestion::{
};
use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions};
use sui_data_ingestion_core::{IndexerExecutor, WorkerPool};
use sui_kvstore::{BigTableClient, KvWorker};
use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker};
use tokio::signal;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -119,12 +119,30 @@ async fn main() -> Result<()> {
mysten_metrics::init_metrics(&registry);
let metrics = DataIngestionMetrics::new(&registry);

let mut bigtable_store = None;
for task in &config.tasks {
if let Task::BigTableKV(kv_config) = &task.task {
std::env::set_var(
"GOOGLE_APPLICATION_CREDENTIALS",
kv_config.credentials.clone(),
);
let bigtable_client = BigTableClient::new_remote(
kv_config.instance_id.clone(),
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?;
bigtable_store = Some(BigTableProgressStore::new(bigtable_client));
}
}

let progress_store = DynamoDBProgressStore::new(
&config.progress_store.aws_access_key_id,
&config.progress_store.aws_secret_access_key,
config.progress_store.aws_region,
config.progress_store.table_name,
config.is_backfill,
bigtable_store,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
Expand Down Expand Up @@ -160,7 +178,6 @@ async fn main() -> Result<()> {
executor.register(worker_pool).await?;
}
Task::BigTableKV(kv_config) => {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", kv_config.credentials);
let client = BigTableClient::new_remote(
kv_config.instance_id,
false,
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-data-ingestion/src/progress_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ use aws_sdk_s3::config::{Credentials, Region};
use std::str::FromStr;
use std::time::Duration;
use sui_data_ingestion_core::ProgressStore;
use sui_kvstore::BigTableProgressStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct DynamoDBProgressStore {
client: Client,
table_name: String,
is_backfill: bool,
bigtable_store: Option<BigTableProgressStore>,
}

impl DynamoDBProgressStore {
Expand All @@ -26,6 +28,7 @@ impl DynamoDBProgressStore {
aws_region: String,
table_name: String,
is_backfill: bool,
bigtable_store: Option<BigTableProgressStore>,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
Expand All @@ -50,6 +53,7 @@ impl DynamoDBProgressStore {
client,
table_name,
is_backfill,
bigtable_store,
}
}
}
Expand Down Expand Up @@ -79,6 +83,13 @@ impl ProgressStore for DynamoDBProgressStore {
if self.is_backfill && checkpoint_number % 1000 != 0 {
return Ok(());
}
if task_name == "bigtable" {
if let Some(ref mut bigtable_store) = self.bigtable_store {
bigtable_store
.save(task_name.clone(), checkpoint_number)
.await?;
}
}
let backoff = backoff::ExponentialBackoff::default();
backoff::future::retry(backoff, || async {
let result = self
Expand Down
1 change: 1 addition & 0 deletions crates/sui-kvstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde.workspace = true
sui-data-ingestion-core.workspace = true
sui-types.workspace = true
telemetry-subscribers.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["full"] }
tonic = {version = "0.12.2",features = ["tls", "transport"] }
tracing.workspace = true
12 changes: 11 additions & 1 deletion crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const OBJECTS_TABLE: &str = "objects";
const TRANSACTIONS_TABLE: &str = "transactions";
const CHECKPOINTS_TABLE: &str = "checkpoints";
const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest";
const WATERMARK_TABLE: &str = "watermark";

const COLUMN_FAMILY_NAME: &str = "sui";
const DEFAULT_COLUMN_QUALIFIER: &str = "";
Expand Down Expand Up @@ -131,6 +132,15 @@ impl KeyValueStoreWriter for BigTableClient {
)
.await
}

async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()> {
let key = watermark.to_be_bytes().to_vec();
self.multi_set(
WATERMARK_TABLE,
[(key, vec![(DEFAULT_COLUMN_QUALIFIER, vec![])])],
)
.await
}
}

#[async_trait]
Expand Down Expand Up @@ -239,7 +249,7 @@ impl KeyValueStoreReader for BigTableClient {
async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber> {
let upper_limit = u64::MAX.to_be_bytes().to_vec();
match self
.reversed_scan(CHECKPOINTS_TABLE, upper_limit)
.reversed_scan(WATERMARK_TABLE, upper_limit)
.await?
.pop()
{
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-kvstore/src/bigtable/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
command+=(-project emulator)
fi

for table in objects transactions checkpoints checkpoints_by_digest; do
for table in objects transactions checkpoints checkpoints_by_digest watermark; do
(
set -x
"${command[@]}" createtable $table
"${command[@]}" createfamily $table sui
"${command[@]}" setgcpolicy $table sui maxversions=1
)
done
"${command[@]}" setgcpolicy watermark sui maxage=2d
1 change: 1 addition & 0 deletions crates/sui-kvstore/src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod client;
pub(crate) mod progress_store;
mod proto;
pub(crate) mod worker;
29 changes: 29 additions & 0 deletions crates/sui-kvstore/src/bigtable/progress_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{BigTableClient, KeyValueStoreReader, KeyValueStoreWriter};
use anyhow::Result;
use async_trait::async_trait;
use sui_data_ingestion_core::ProgressStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct BigTableProgressStore {
client: BigTableClient,
}

impl BigTableProgressStore {
pub fn new(client: BigTableClient) -> Self {
Self { client }
}
}

#[async_trait]
impl ProgressStore for BigTableProgressStore {
async fn load(&mut self, _: String) -> Result<CheckpointSequenceNumber> {
self.client.get_latest_checkpoint().await
}

async fn save(&mut self, _: String, checkpoint_number: CheckpointSequenceNumber) -> Result<()> {
self.client.save_watermark(checkpoint_number).await
}
}
2 changes: 2 additions & 0 deletions crates/sui-kvstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod bigtable;
use anyhow::Result;
use async_trait::async_trait;
pub use bigtable::client::BigTableClient;
pub use bigtable::progress_store::BigTableProgressStore;
pub use bigtable::worker::KvWorker;
use sui_types::base_types::ObjectID;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
Expand Down Expand Up @@ -41,6 +42,7 @@ pub trait KeyValueStoreWriter {
async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>;
async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>;
async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>;
async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()>;
}

#[derive(Clone, Debug)]
Expand Down
33 changes: 21 additions & 12 deletions crates/sui-kvstore/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use sui_data_ingestion_core::setup_single_workflow;
use sui_kvstore::BigTableClient;
use sui_kvstore::KvWorker;
use prometheus::Registry;
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker};
use telemetry_subscribers::TelemetryConfig;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -20,16 +21,24 @@ async fn main() -> Result<()> {
network == "mainnet" || network == "testnet",
"Invalid network name"
);
let client = BigTableClient::new_local(instance_id).await?;

let client = BigTableClient::new_remote(instance_id, false, None).await?;
let (executor, _term_sender) = setup_single_workflow(
KvWorker { client },
format!("https://checkpoints.{}.sui.io", network),
0,
let (_exit_sender, exit_receiver) = oneshot::channel();
let mut executor = IndexerExecutor::new(
BigTableProgressStore::new(client.clone()),
1,
None,
)
.await?;
executor.await?;
DataIngestionMetrics::new(&Registry::new()),
);
let worker_pool = WorkerPool::new(KvWorker { client }, "bigtable".to_string(), 50);
executor.register(worker_pool).await?;
executor
.run(
tempfile::tempdir()?.into_path(),
Some(format!("https://checkpoints.{}.sui.io", network)),
vec![],
ReaderOptions::default(),
exit_receiver,
)
.await?;
Ok(())
}

0 comments on commit 4d4af19

Please sign in to comment.