updated to 1.34.1
Carlos Baez committed Oct 2, 2024
1 parent 6ee2dfe commit 22ef40c
Showing 12 changed files with 592 additions and 458 deletions.
787 changes: 403 additions & 384 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -202,7 +202,7 @@ members = [

[workspace.package]
# This version string will be inherited by sui-core, sui-faucet, sui-node, sui-tools, sui-sdk, sui-move-build, and sui crates.
version = "1.34.0"
version = "1.34.1"

[profile.release]
# debug = 1 means line tables only, which is the minimum needed for good stack traces
1 change: 1 addition & 0 deletions crates/sui-config/src/node.rs
@@ -289,6 +289,7 @@ pub fn default_zklogin_oauth_providers() -> BTreeMap<Chain, BTreeSet<String>> {
"Credenza3".to_string(),
"Playtron".to_string(),
"Onefc".to_string(),
"Threedos".to_string(),
]);
map.insert(Chain::Mainnet, providers.clone());
map.insert(Chain::Testnet, providers);
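For context, a minimal sketch of the shape of this config (illustrative provider names and a stubbed Chain enum; the real types live in sui-config and sui-types). The provider set is built once and cloned into both chains, so a single addition such as "Threedos" enables the provider on Mainnet and Testnet alike:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Stub standing in for the real Chain type (assumption for this sketch).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Chain {
    Mainnet,
    Testnet,
}

fn default_zklogin_oauth_providers() -> BTreeMap<Chain, BTreeSet<String>> {
    let mut map = BTreeMap::new();
    // Illustrative subset of the real provider list.
    let providers: BTreeSet<String> = ["Google", "Onefc", "Threedos"]
        .into_iter()
        .map(str::to_string)
        .collect();
    // The same set is cloned into both chains, so one insertion covers both.
    map.insert(Chain::Mainnet, providers.clone());
    map.insert(Chain::Testnet, providers);
    map
}

fn main() {
    let map = default_zklogin_oauth_providers();
    assert!(map[&Chain::Mainnet].contains("Threedos"));
    assert!(map[&Chain::Testnet].contains("Threedos"));
}
```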
62 changes: 29 additions & 33 deletions crates/sui-core/src/execution_cache/object_locks.rs
@@ -9,10 +9,12 @@ use sui_types::error::{SuiError, SuiResult, UserInputError};
use sui_types::object::Object;
use sui_types::storage::ObjectStore;
use sui_types::transaction::VerifiedSignedTransaction;
-use tracing::{debug, info, instrument, trace};
+use tracing::{debug, error, info, instrument, trace};

use super::writeback_cache::WritebackCache;

+type RefCount = usize;

pub(super) struct ObjectLocks {
// When acquiring transaction locks, lock entries are briefly inserted into this map. The map
// exists to provide atomic test-and-set operations on the locks. After all locks have been inserted
@@ -23,7 +25,7 @@ pub(super) struct ObjectLocks {
// those objects. Therefore we do a db read for each object we are locking.
//
// TODO: find a strategy to allow us to avoid db reads for each object.
-locked_transactions: DashMap<ObjectRef, LockDetails>,
+locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
}

impl ObjectLocks {
@@ -38,29 +40,10 @@ impl ObjectLocks {
obj_ref: &ObjectRef,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<Option<LockDetails>> {
-match self.locked_transactions.entry(*obj_ref) {
-DashMapEntry::Vacant(vacant) => {
-let tables = epoch_store.tables()?;
-let lock = tables.get_locked_transaction(obj_ref)?;
-if let Some(lock_details) = lock {
-vacant.insert(lock_details);
-}
-Ok(lock)
-}
-DashMapEntry::Occupied(occupied) => {
-if cfg!(debug_assertions) {
-if let Some(lock_details) = epoch_store
-.tables()
-.unwrap()
-.get_locked_transaction(obj_ref)
-.unwrap()
-{
-assert_eq!(*occupied.get(), lock_details);
-}
-}
-Ok(Some(*occupied.get()))
-}
-}
+// We don't consult the in-memory state here. We are only interested in state that
+// has been committed to the db. This is because in memory state is reverted
+// if the transaction is not successfully locked.
+epoch_store.tables()?.get_locked_transaction(obj_ref)
}

/// Attempts to atomically test-and-set a transaction lock on an object.
@@ -96,15 +79,18 @@
let tables = epoch_store.tables()?;
if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
trace!("read lock from db: {:?}", lock_details);
-vacant.insert(lock_details);
+vacant.insert((1, lock_details));
lock_details
} else {
trace!("set lock: {:?}", new_lock);
-vacant.insert(new_lock);
+vacant.insert((1, new_lock));
new_lock
}
}
-DashMapEntry::Occupied(occupied) => *occupied.get(),
+DashMapEntry::Occupied(mut occupied) => {
+occupied.get_mut().0 += 1;
+occupied.get().1
+}
};

if prev_lock != new_lock {
@@ -156,14 +142,24 @@
fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
for (obj_ref, lock) in locks {
let entry = self.locked_transactions.entry(*obj_ref);
-let occupied = match entry {
-DashMapEntry::Vacant(_) => panic!("lock must exist"),
+let mut occupied = match entry {
+DashMapEntry::Vacant(_) => {
+if cfg!(debug_assertions) {
+panic!("lock must exist");
+} else {
+error!(?obj_ref, "lock should exist");
+}
+continue;
+}
DashMapEntry::Occupied(occupied) => occupied,
};

-if occupied.get() == lock {
-trace!("clearing lock: {:?}", lock);
-occupied.remove();
+if occupied.get().1 == *lock {
+occupied.get_mut().0 -= 1;
+if occupied.get().0 == 0 {
+trace!("clearing lock: {:?}", lock);
+occupied.remove();
+}
} else {
// this is impossible because the only case in which we overwrite a
// lock is when the lock is from a previous epoch. but we are holding
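The net effect of these hunks: each cached lock entry now carries a reference count, so two tasks acquiring locks for the same transaction can share an entry, and cleanup only evicts it once the last holder is done. A self-contained sketch of the pattern (hypothetical u64 keys and lock values standing in for ObjectRef and LockDetails; dashmap as the concurrent map, as in the real code):

```rust
use dashmap::{mapref::entry::Entry, DashMap};

type RefCount = usize;

// Hypothetical stand-in: the real map is DashMap<ObjectRef, (RefCount, LockDetails)>.
struct LockCache {
    locks: DashMap<u64, (RefCount, u64)>,
}

impl LockCache {
    // Test-and-set: the first holder installs its lock with a count of 1;
    // later holders bump the count and read the previously set lock,
    // mirroring the Occupied arm in the diff above.
    fn acquire(&self, key: u64, new_lock: u64) -> u64 {
        match self.locks.entry(key) {
            Entry::Vacant(vacant) => {
                vacant.insert((1, new_lock));
                new_lock
            }
            Entry::Occupied(mut occupied) => {
                occupied.get_mut().0 += 1;
                occupied.get().1
            }
        }
    }

    // Cleanup: decrement, and only remove the entry when the last holder
    // releases it, so one task's cleanup cannot clobber a concurrent holder.
    fn release(&self, key: u64) {
        if let Entry::Occupied(mut occupied) = self.locks.entry(key) {
            occupied.get_mut().0 -= 1;
            if occupied.get().0 == 0 {
                occupied.remove();
            }
        }
    }
}

fn main() {
    let cache = LockCache { locks: DashMap::new() };
    assert_eq!(cache.acquire(7, 42), 42); // first holder sets the lock
    assert_eq!(cache.acquire(7, 99), 42); // second holder sees the first lock
    cache.release(7); // count drops to 1; entry survives
    cache.release(7); // count drops to 0; entry removed
    assert!(cache.locks.get(&7).is_none());
}
```

Without the count, one task's clear_cached_locks could remove an entry another task still relied on, which is the same failure the new error!/continue path above tolerates in release builds instead of panicking.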
@@ -1145,6 +1145,79 @@ async fn test_concurrent_lockers() {
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_concurrent_lockers_same_tx() {
telemetry_subscribers::init_for_testing();

let mut s = Scenario::new(None, Arc::new(AtomicU32::new(0))).await;
let cache = s.cache.clone();
let mut txns = Vec::new();

for i in 0..1000 {
let a = i * 4;
let b = i * 4 + 1;
s.with_created(&[a, b]);
s.do_tx().await;

let a_ref = s.obj_ref(a);
let b_ref = s.obj_ref(b);

let tx1 = s.take_outputs();

let tx1 = s.make_signed_transaction(&tx1.transaction);

txns.push((tx1, a_ref, b_ref));
}

let barrier = Arc::new(tokio::sync::Barrier::new(2));

let t1 = {
let txns = txns.clone();
let cache = cache.clone();
let barrier = barrier.clone();
let epoch_store = s.epoch_store.clone();
tokio::task::spawn(async move {
let mut results = Vec::new();
for (tx1, a_ref, b_ref) in txns {
results.push(
cache
.acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1)
.await,
);
barrier.wait().await;
}
results
})
};

let t2 = {
let txns = txns.clone();
let cache = cache.clone();
let barrier = barrier.clone();
let epoch_store = s.epoch_store.clone();
tokio::task::spawn(async move {
let mut results = Vec::new();
for (tx1, a_ref, b_ref) in txns {
results.push(
cache
.acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1)
.await,
);
barrier.wait().await;
}
results
})
};

let results1 = t1.await.unwrap();
let results2 = t2.await.unwrap();

for (r1, r2) in results1.into_iter().zip(results2) {
assert!(r1.is_ok());
assert!(r2.is_ok());
}
}
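This test drives two workers over the same list of signed transactions in lockstep: both call acquire_transaction_locks for the same transaction concurrently, then rendezvous on the barrier before moving to the next one, and both acquisitions are expected to succeed because they lock the same transaction. A minimal sketch of that barrier pattern in isolation (assumes tokio with the macros, rt-multi-thread, and sync features):

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Two workers advance step by step; neither starts step N+1 until both
    // have finished step N, which forces the contended window at every step.
    let barrier = Arc::new(tokio::sync::Barrier::new(2));
    let mut handles = Vec::new();
    for task in 0..2 {
        let barrier = barrier.clone();
        handles.push(tokio::spawn(async move {
            for step in 0..3 {
                // In the real test, this is the racy call:
                // acquire_transaction_locks(&epoch_store, &refs, tx).
                println!("task {task} entered step {step}");
                barrier.wait().await;
            }
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}
```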

#[tokio::test]
async fn latest_object_cache_race_test() {
let authority = TestAuthorityBuilder::new().build().await;
71 changes: 49 additions & 22 deletions crates/sui-indexer/src/db.rs
@@ -7,9 +7,9 @@ use clap::Args;
use diesel::migration::{Migration, MigrationSource, MigrationVersion};
use diesel::pg::Pg;
use diesel::table;
-use diesel::ExpressionMethods;
use diesel::QueryDsl;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
+use std::collections::BTreeSet;
use std::time::Duration;
use tracing::info;

@@ -111,28 +111,27 @@ async fn check_db_migration_consistency_impl(
// Unfortunately we cannot call applied_migrations() directly on the connection,
// since it implicitly creates the __diesel_schema_migrations table if it doesn't exist,
// which is a write operation that we don't want to do in this function.
-let applied_migrations: Vec<MigrationVersion> = __diesel_schema_migrations::table
-.select(__diesel_schema_migrations::version)
-.order(__diesel_schema_migrations::version.asc())
-.load(conn)
-.await?;

-// We check that the local migrations are a prefix of the applied migrations.
-if local_migrations.len() > applied_migrations.len() {
-return Err(IndexerError::DbMigrationError(format!(
-"The number of local migrations is greater than the number of applied migrations. Local migrations: {:?}, Applied migrations: {:?}",
-local_migrations, applied_migrations
-)));
-}
-for (local_migration, applied_migration) in local_migrations.iter().zip(&applied_migrations) {
-if local_migration != applied_migration {
-return Err(IndexerError::DbMigrationError(format!(
-"The next applied migration `{:?}` diverges from the local migration `{:?}`",
-applied_migration, local_migration
-)));
-}
-}
+let applied_migrations: BTreeSet<MigrationVersion<'_>> = BTreeSet::from_iter(
+__diesel_schema_migrations::table
+.select(__diesel_schema_migrations::version)
+.load(conn)
+.await?,
+);

+// We check that the local migrations are a subset of the applied migrations.
+let unapplied_migrations: Vec<_> = local_migrations
+.into_iter()
+.filter(|m| !applied_migrations.contains(m))
+.collect();

+if unapplied_migrations.is_empty() {
+return Ok(());
+}
}
-Ok(())

+Err(IndexerError::DbMigrationError(format!(
+"This binary expected the following migrations to have been run, and they were not: {:?}",
+unapplied_migrations
+)))
}

pub use setup_postgres::{reset_database, run_migrations};
@@ -314,4 +313,32 @@ mod tests {
.await
.unwrap();
}

#[tokio::test]
async fn db_migration_consistency_subset_test() {
let database = TempDb::new().unwrap();
let pool = ConnectionPool::new(
database.database().url().to_owned(),
ConnectionPoolConfig {
pool_size: 2,
..Default::default()
},
)
.await
.unwrap();

reset_database(pool.dedicated_connection().await.unwrap())
.await
.unwrap();

let migrations: Vec<Box<dyn Migration<Pg>>> = MIGRATIONS.migrations().unwrap();
let mut local_migrations: Vec<_> = migrations.iter().map(|m| m.name().version()).collect();
local_migrations.remove(2);

// Local migrations are missing one record compared to the applied migrations, which should
// still be okay.
check_db_migration_consistency_impl(&mut pool.get().await.unwrap(), local_migrations)
.await
.unwrap();
}
}
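The consistency check thus moves from prefix equality (every local migration must match the applied ones, in order) to set containment (every local migration must appear among the applied ones, in any order, and the database may have extras). A self-contained sketch of the new rule, with plain string versions standing in for MigrationVersion:

```rust
use std::collections::BTreeSet;

// Local migrations must be a subset of applied migrations: extra applied
// migrations are fine, but a shipped migration that was never run is an error.
fn check_consistency(local: Vec<&str>, applied: Vec<&str>) -> Result<(), String> {
    let applied: BTreeSet<&str> = applied.into_iter().collect();
    let unapplied: Vec<&str> = local
        .into_iter()
        .filter(|m| !applied.contains(m))
        .collect();
    if unapplied.is_empty() {
        Ok(())
    } else {
        Err(format!("migrations not yet applied: {unapplied:?}"))
    }
}

fn main() {
    // The binary may know about fewer migrations than the db has applied...
    assert!(check_consistency(vec!["001", "003"], vec!["001", "002", "003"]).is_ok());
    // ...but every migration it ships must already be applied.
    assert!(check_consistency(vec!["001", "004"], vec!["001", "002", "003"]).is_err());
}
```

This is exactly what the new db_migration_consistency_subset_test exercises: it deletes one local migration and expects the check to still pass.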
25 changes: 15 additions & 10 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
@@ -22,7 +22,7 @@ use sui_types::base_types::ObjectID;
use sui_types::dynamic_field::DynamicFieldInfo;
use sui_types::dynamic_field::DynamicFieldName;
use sui_types::dynamic_field::DynamicFieldType;
-use sui_types::effects::TransactionEffectsAPI;
+use sui_types::effects::{ObjectChange, TransactionEffectsAPI};
use sui_types::event::SystemEpochInfoEvent;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
@@ -441,10 +441,7 @@ impl CheckpointHandler {
.map(|display| (display.object_type.clone(), display)),
);

-let objects = input_objects
-.iter()
-.chain(output_objects.iter())
-.collect::<Vec<_>>();
+let objects: Vec<_> = input_objects.iter().chain(output_objects.iter()).collect();

let (balance_change, object_changes) =
TxChangesProcessor::new(&objects, metrics.clone())
@@ -477,14 +474,21 @@
.expect("committed txns have been validated")
.into_iter()
.map(|obj_kind| obj_kind.object_id())
-.collect::<Vec<_>>();
+.collect();

// Changed Objects
let changed_objects = fx
.all_changed_objects()
.into_iter()
.map(|(object_ref, _owner, _write_kind)| object_ref.0)
-.collect::<Vec<_>>();
+.collect();

+// Affected Objects
+let affected_objects = fx
+.object_changes()
+.into_iter()
+.map(|ObjectChange { id, .. }| id)
+.collect();

// Payers
let payers = vec![tx.gas_owner()];
@@ -501,13 +505,13 @@
_ => None,
})
.unique()
-.collect::<Vec<_>>();
+.collect();

// Move Calls
let move_calls = tx
.move_calls()
-.iter()
-.map(|(p, m, f)| (*<&ObjectID>::clone(p), m.to_string(), f.to_string()))
+.into_iter()
+.map(|(p, m, f)| (*p, m.to_string(), f.to_string()))
.collect();

db_tx_indices.push(TxIndex {
@@ -516,6 +520,7 @@
checkpoint_sequence_number: *checkpoint_seq,
input_objects,
changed_objects,
+affected_objects,
sender,
payers,
recipients,
4 changes: 1 addition & 3 deletions crates/sui-indexer/src/models/tx_indices.rs
@@ -144,10 +144,8 @@ impl TxIndex {
.collect();

let tx_affected_objects = self
-.input_objects
+.affected_objects
.iter()
-.chain(self.changed_objects.iter())
-.unique()
.map(|o| StoredTxAffectedObjects {
tx_sequence_number,
affected: o.to_vec(),
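The two indexer changes above are halves of one move: checkpoint_handler.rs now computes affected_objects once from the effects' ObjectChange entries, and tx_indices.rs consumes that field instead of re-deriving it as the deduplicated union of input and changed objects. A small sketch of the derivation being replaced (hypothetical u64 object IDs; itertools for unique(), as in the original code):

```rust
use itertools::Itertools;

fn main() {
    // Hypothetical object IDs; the real code uses ObjectID.
    let input_objects = vec![1u64, 2, 3];
    let changed_objects = vec![2u64, 3, 4];

    // Old derivation in tx_indices.rs: union the two lists and deduplicate,
    // preserving first-occurrence order.
    let derived: Vec<u64> = input_objects
        .iter()
        .chain(changed_objects.iter())
        .unique()
        .copied()
        .collect();
    assert_eq!(derived, vec![1, 2, 3, 4]);

    // New scheme: TxIndex carries a precomputed `affected_objects` list
    // (built in checkpoint_handler.rs from effects.object_changes()), so this
    // union/dedup step disappears from tx_indices.rs.
}
```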
1 change: 1 addition & 0 deletions crates/sui-indexer/src/types.rs
@@ -401,6 +401,7 @@ pub struct TxIndex {
pub checkpoint_sequence_number: u64,
pub input_objects: Vec<ObjectID>,
pub changed_objects: Vec<ObjectID>,
+pub affected_objects: Vec<ObjectID>,
pub payers: Vec<SuiAddress>,
pub sender: SuiAddress,
pub recipients: Vec<SuiAddress>,
2 changes: 1 addition & 1 deletion crates/sui-open-rpc/spec/openrpc.json
@@ -12,7 +12,7 @@
"name": "Apache-2.0",
"url": "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE"
},
"version": "1.34.0"
"version": "1.34.1"
},
"methods": [
{