Skip to content

Commit

Permalink
db: Support read-only mode
Browse files Browse the repository at this point in the history
## Description

Make it possible to create a database connection pool that defaults to
read-only transactions. This will be used by RPCs to make sure the RPC
doesn't accidentally write to the database it should be reading from.

## Test plan

New unit test for `Db::for_read`:

```
sui$ cargo nextest run -p sui-pg-db -- test_read_only
```
  • Loading branch information
amnn committed Dec 19, 2024
1 parent ac4f3ac commit 034bfa7
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 19 deletions.
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl Indexer {
metrics_address,
} = indexer_args;

let db = Db::new(db_args)
let db = Db::for_write(db_args)
.await
.context("Failed to connect to database")?;

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub async fn start_rpc(args: Args) -> anyhow::Result<()> {
.await
.context("Failed to start RPC service")?;

let db = Db::new(db_args)
let db = Db::for_read(db_args)
.await
.context("Failed to connect to database")?;

Expand Down
119 changes: 102 additions & 17 deletions crates/sui-pg-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct DbArgs {

#[derive(Clone)]
pub struct Db {
read_only: bool,
pool: Pool<AsyncPgConnection>,
}

Expand All @@ -48,24 +49,35 @@ impl DbArgs {
}

impl Db {
/// Construct a new DB connection pool. Instances of [Db] can be cloned to share access to the
/// same pool.
pub async fn new(config: DbArgs) -> anyhow::Result<Self> {
let manager = AsyncDieselConnectionManager::new(config.database_url.as_str());

let pool = Pool::builder()
.max_size(config.connection_pool_size)
.connection_timeout(config.connection_timeout())
.build(manager)
.await?;
/// Construct a new DB connection pool that supports write and reads. Instances of [Db] can be
/// cloned to share access to the same pool.
pub async fn for_write(config: DbArgs) -> anyhow::Result<Self> {
Ok(Self {
read_only: false,
pool: pool(config).await?,
})
}

Ok(Self { pool })
/// Construct a new DB connection pool that defaults to read-only transactions. Instances of
/// [Db] can be cloned to share access to the same pool.
pub async fn for_read(config: DbArgs) -> anyhow::Result<Self> {
Ok(Self {
read_only: true,
pool: pool(config).await?,
})
}

/// Retrieves a connection from the pool. Can fail with a timeout if a connection cannot be
/// established before the [DbArgs::connection_timeout] has elapsed.
pub async fn connect(&self) -> anyhow::Result<Connection<'_>> {
Ok(self.pool.get().await?)
let mut conn = self.pool.get().await?;
if self.read_only {
diesel::sql_query("SET default_transaction_read_only = 'on'")
.execute(&mut conn)
.await?;
}

Ok(conn)
}

/// Statistics about the connection pool
Expand Down Expand Up @@ -168,7 +180,7 @@ pub async fn reset_database<S: MigrationSource<Pg> + Send + Sync + 'static>(
db_config: DbArgs,
migrations: Option<S>,
) -> anyhow::Result<()> {
let db = Db::new(db_config).await?;
let db = Db::for_write(db_config).await?;
db.clear_database().await?;
if let Some(migrations) = migrations {
db.run_migrations(migrations).await?;
Expand All @@ -177,6 +189,16 @@ pub async fn reset_database<S: MigrationSource<Pg> + Send + Sync + 'static>(
Ok(())
}

async fn pool(args: DbArgs) -> anyhow::Result<Pool<AsyncPgConnection>> {
let manager = AsyncDieselConnectionManager::new(args.database_url.as_str());

Ok(Pool::builder()
.max_size(args.connection_pool_size)
.connection_timeout(args.connection_timeout())
.build(manager)
.await?)
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -196,7 +218,7 @@ mod tests {
..Default::default()
};

let db = Db::new(db_args).await.unwrap();
let db = Db::for_write(db_args).await.unwrap();
let mut conn = db.connect().await.unwrap();

// Run a simple query to verify the db can properly be queried
Expand Down Expand Up @@ -224,7 +246,7 @@ mod tests {
..Default::default()
};

let db = Db::new(db_args.clone()).await.unwrap();
let db = Db::for_write(db_args.clone()).await.unwrap();
let mut conn = db.connect().await.unwrap();
diesel::sql_query("CREATE TABLE test_table (id INTEGER PRIMARY KEY)")
.execute(&mut conn)
Expand All @@ -243,12 +265,75 @@ mod tests {
.unwrap();

let mut conn = db.connect().await.unwrap();
let cnt = diesel::sql_query(
let cnt: CountResult = diesel::sql_query(
"SELECT COUNT(*) as cnt FROM information_schema.tables WHERE table_name = 'test_table'",
)
.get_result::<CountResult>(&mut conn)
.get_result(&mut conn)
.await
.unwrap();
assert_eq!(cnt.cnt, 0);
}

#[tokio::test]
async fn test_read_only() {
let temp_db = temp::TempDb::new().unwrap();
let url = temp_db.database().url();

let db_args = DbArgs {
database_url: url.clone(),
..Default::default()
};

let writer = Db::for_write(db_args.clone()).await.unwrap();
let reader = Db::for_read(db_args).await.unwrap();

{
// Create a table
let mut conn = writer.connect().await.unwrap();
diesel::sql_query("CREATE TABLE test_table (id INTEGER PRIMARY KEY)")
.execute(&mut conn)
.await
.unwrap();
}

{
// Try an insert into it using the read-only connection, which should fail
let mut conn = reader.connect().await.unwrap();
let result = diesel::sql_query("INSERT INTO test_table (id) VALUES (1)")
.execute(&mut conn)
.await;
assert!(result.is_err());
}

{
// Try and select from it using the read-only connection, which should succeed, but
// return no results.
let mut conn = reader.connect().await.unwrap();
let cnt: CountResult = diesel::sql_query("SELECT COUNT(*) as cnt FROM test_table")
.get_result(&mut conn)
.await
.unwrap();
assert_eq!(cnt.cnt, 0);
}

{
// Then try to write to it using the write connection, which should succeed
let mut conn = writer.connect().await.unwrap();
diesel::sql_query("INSERT INTO test_table (id) VALUES (1)")
.execute(&mut conn)
.await
.unwrap();
}

{
// Finally, try to read from it using the read-only connection, which should now return
// results.
let mut conn = reader.connect().await.unwrap();
let cnt: CountResult = diesel::sql_query("SELECT COUNT(*) as cnt FROM test_table")
.get_result(&mut conn)
.await
.unwrap();
assert_eq!(cnt.cnt, 1);
}
}
}

0 comments on commit 034bfa7

Please sign in to comment.