Skip to content

Commit

Permalink
Event V2 Translation
Browse files Browse the repository at this point in the history
  • Loading branch information
junkil-park committed Oct 4, 2024
1 parent 6140801 commit c202506
Show file tree
Hide file tree
Showing 17 changed files with 422 additions and 29 deletions.
73 changes: 57 additions & 16 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use aptos_types::{
account_address::AccountAddress,
account_config::{AccountResource, NewBlockEvent},
chain_id::ChainId,
contract_event::EventWithVersion,
contract_event::{ContractEvent, EventWithVersion},
event::EventKey,
indexer::indexer_db_reader::IndexerReader,
ledger_info::LedgerInfoWithSignatures,
Expand Down Expand Up @@ -818,12 +818,17 @@ impl Context {
.into_iter()
.zip(infos)
.enumerate()
.map(|(i, ((txn, txn_output), info))| {
let version = start_version + i as u64;
let (write_set, events, _, _, _) = txn_output.unpack();
self.get_accumulator_root_hash(version)
.map(|h| (version, txn, info, events, h, write_set).into())
})
.map(
|(i, ((txn, txn_output), info))| -> Result<TransactionOnChainData> {
let version = start_version + i as u64;
let (write_set, mut events, _, _, _) = txn_output.unpack();
if self.node_config.indexer_db_config.enable_event_translation {
self.translate_v2_to_v1_events_for_version(version, &mut events)?;
}
let h = self.get_accumulator_root_hash(version)?;
Ok((version, txn, info, events, h, write_set).into())
},
)
.collect()
}

Expand Down Expand Up @@ -878,7 +883,14 @@ impl Context {
})?;
txns.into_inner()
.into_iter()
.map(|t| self.convert_into_transaction_on_chain_data(t))
.map(|t| -> Result<TransactionOnChainData> {
let mut txn = self.convert_into_transaction_on_chain_data(t)?;
if self.node_config.indexer_db_config.enable_event_translation {
let _ =
self.translate_v2_to_v1_events_for_version(txn.version, &mut txn.events);
}
Ok(txn)
})
.collect::<Result<Vec<_>>>()
.context("Failed to parse account transactions")
.map_err(|err| E::internal_with_code(err, AptosErrorCode::InternalError, ledger_info))
Expand All @@ -889,10 +901,18 @@ impl Context {
hash: HashValue,
ledger_version: u64,
) -> Result<Option<TransactionOnChainData>> {
self.db
if let Some(t) = self
.db
.get_transaction_by_hash(hash, ledger_version, true)?
.map(|t| self.convert_into_transaction_on_chain_data(t))
.transpose()
{
let mut txn: TransactionOnChainData = self.convert_into_transaction_on_chain_data(t)?;
if self.node_config.indexer_db_config.enable_event_translation {
let _ = self.translate_v2_to_v1_events_for_version(txn.version, &mut txn.events);
}
Ok(Some(txn))
} else {
Ok(None)
}
}

pub async fn get_pending_transaction_by_hash(
Expand All @@ -915,11 +935,32 @@ impl Context {
version: u64,
ledger_version: u64,
) -> Result<TransactionOnChainData> {
self.convert_into_transaction_on_chain_data(self.db.get_transaction_by_version(
version,
ledger_version,
true,
)?)
let mut txn = self.convert_into_transaction_on_chain_data(
self.db
.get_transaction_by_version(version, ledger_version, true)?,
)?;
if self.node_config.indexer_db_config.enable_event_translation {
self.translate_v2_to_v1_events_for_version(version, &mut txn.events)?;
}
Ok(txn)
}

fn translate_v2_to_v1_events_for_version(
&self,
version: u64,
events: &mut [ContractEvent],
) -> Result<()> {
for (idx, event) in events.iter_mut().enumerate() {
let translated_event = self
.indexer_reader
.as_ref()
.ok_or(anyhow!("Internal indexer reader doesn't exist"))?
.get_translated_v1_event_by_version_and_index(version, idx as u64);
if let Ok(translated_event) = translated_event {
*event = ContractEvent::V1(translated_event);
}
}
Ok(())
}

pub fn get_accumulator_root_hash(&self, version: u64) -> Result<HashValue> {
Expand Down
4 changes: 2 additions & 2 deletions api/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn new_test_context_with_config(test_name: String, node_config: NodeConfig) -> T
fn new_test_context_with_db_sharding_and_internal_indexer(test_name: String) -> TestContext {
let mut node_config = NodeConfig::default();
node_config.storage.rocksdb_configs.enable_storage_sharding = true;
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10);
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, true, 10);
super_new_test_context(test_name, node_config, false, None)
}

Expand All @@ -45,6 +45,6 @@ fn new_test_context_with_sharding_and_delayed_internal_indexer(
) -> TestContext {
let mut node_config = NodeConfig::default();
node_config.storage.rocksdb_configs.enable_storage_sharding = true;
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 1);
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, true, 1);
super_new_test_context(test_name, node_config, false, end_version)
}
8 changes: 8 additions & 0 deletions config/src/config/internal_indexer_db_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
pub struct InternalIndexerDBConfig {
pub enable_transaction: bool,
pub enable_event: bool,
pub enable_event_translation: bool,
pub enable_statekeys: bool,
pub batch_size: usize,
}
Expand All @@ -20,12 +21,14 @@ impl InternalIndexerDBConfig {
pub fn new(
enable_transaction: bool,
enable_event: bool,
enable_event_translation: bool,
enable_statekeys: bool,
batch_size: usize,
) -> Self {
Self {
enable_transaction,
enable_event,
enable_event_translation,
enable_statekeys,
batch_size,
}
Expand All @@ -39,6 +42,10 @@ impl InternalIndexerDBConfig {
self.enable_event
}

pub fn enable_event_translation(&self) -> bool {
self.enable_event_translation
}

pub fn enable_statekeys(&self) -> bool {
self.enable_statekeys
}
Expand All @@ -57,6 +64,7 @@ impl Default for InternalIndexerDBConfig {
Self {
enable_transaction: false,
enable_event: false,
enable_event_translation: false,
enable_statekeys: false,
batch_size: 10_000,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ impl InternalIndexerDBService {
.expect("Failed to open internal indexer db"),
);

let internal_indexer_db_config = InternalIndexerDBConfig::new(false, false, true, 10_000);
let internal_indexer_db_config =
InternalIndexerDBConfig::new(false, false, false, true, 10_000);
Some(InternalIndexerDB::new(arc_db, internal_indexer_db_config))
}

Expand Down
3 changes: 2 additions & 1 deletion storage/aptosdb/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use aptos_crypto::{
};
use aptos_db_indexer_schemas::schema::{
event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
translated_v1_event::TranslatedV1EventSchema,
};
use aptos_schemadb::{iterator::SchemaIterator, schema::ValueCodec, ReadOptions, SchemaBatch, DB};
use aptos_storage_interface::{db_ensure as ensure, db_other_bail, AptosDbError, Result};
use aptos_types::{
account_address::AccountAddress,
account_config::{new_block_event_key, NewBlockEvent},
contract_event::ContractEvent,
contract_event::{ContractEvent, ContractEventV1},
event::EventKey,
proof::position::Position,
transaction::Version,
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub(crate) struct StateStore {
buffered_state: Mutex<BufferedState>,
buffered_state_target_items: usize,
smt_ancestors: Mutex<SmtAncestors<StateValue>>,
internal_indexer_db: Option<InternalIndexerDB>,
pub internal_indexer_db: Option<InternalIndexerDB>,
}

impl Deref for StateStore {
Expand Down
Loading

0 comments on commit c202506

Please sign in to comment.