Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Upgrade to Diesel 2.2 #1647

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,656 changes: 966 additions & 690 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ actix-web = "4"
docopt = "1.1"
base64 = "0.22"

# Updating to 2.* requires changes to the Connection code for logging.
# (Adding an `instrumentation()` and `set_instrumentation()` method.)
# More investigation required.
diesel = "1.4"
diesel_migrations = "1.4"
diesel_logger = "0.1"
diesel = "2.2.6"
diesel_migrations = "2.2.0"
diesel_logger = "0.4.0"

cadence = "1.3"
backtrace = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions syncserver-db-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ enum SqlErrorKind {
Pool(diesel::r2d2::PoolError),

#[error("Error migrating the database: {}", _0)]
Migration(diesel_migrations::RunMigrationsError),
Migration(diesel_migrations::MigrationError),
}

impl From<SqlErrorKind> for SqlError {
Expand Down Expand Up @@ -75,7 +75,7 @@ from_error!(
);
from_error!(diesel::r2d2::PoolError, SqlError, SqlErrorKind::Pool);
from_error!(
diesel_migrations::RunMigrationsError,
diesel_migrations::MigrationError,
SqlError,
SqlErrorKind::Migration
);
4 changes: 2 additions & 2 deletions syncserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ tokenserver-db = { path = "../tokenserver-db" }
tokenserver-settings = { path = "../tokenserver-settings" }
tokio = { workspace = true, features = ["macros", "sync"] }
urlencoding = "2.1"
validator = "0.18"
validator_derive = "0.18"
validator = "0.19"
validator_derive = "0.19"
woothee = "0.13"

[features]
Expand Down
10 changes: 5 additions & 5 deletions syncstorage-db-common/src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ pub struct GetQuotaUsage {

#[derive(Debug, Default, Deserialize, Queryable, QueryableByName, Serialize)]
pub struct GetBso {
#[sql_type = "Text"]
#[diesel(sql_type = Text)]
pub id: String,
#[sql_type = "BigInt"]
#[diesel(sql_type = BigInt)]
pub modified: SyncTimestamp,
#[sql_type = "Text"]
#[diesel(sql_type = Text)]
pub payload: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[sql_type = "Nullable<Integer>"]
#[diesel(sql_type = Nullable<Integer>)]
pub sortindex: Option<i32>,
// NOTE: expiry (ttl) is never rendered to clients and only loaded for
// tests: this and its associated queries/loading could be wrapped in
// #[cfg(test)]
#[serde(skip_serializing)]
#[serde(skip_deserializing)]
#[sql_type = "BigInt"]
#[diesel(sql_type = BigInt)]
pub expiry: i64,
}

Expand Down
2 changes: 1 addition & 1 deletion syncstorage-db-common/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where
i64: FromSql<BigInt, DB>,
DB: Backend,
{
fn from_sql(value: Option<&<DB as Backend>::RawValue>) -> deserialize::Result<Self> {
fn from_sql(value: <DB as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
let i64_value = <i64 as FromSql<BigInt, DB>>::from_sql(value)?;
SyncTimestamp::from_i64(i64_value)
.map_err(|e| format!("Invalid SyncTimestamp i64 {}", e).into())
Expand Down
1 change: 0 additions & 1 deletion syncstorage-mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ slog-scope.workspace = true
thiserror.workspace = true

async-trait = "0.1.40"
# There appears to be a compilation error with diesel
diesel = { workspace = true, features = ["mysql", "r2d2"] }
diesel_logger = { workspace = true }
diesel_migrations = { workspace = true, features = ["mysql"] }
Expand Down
37 changes: 20 additions & 17 deletions syncstorage-mysql/src/batch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use base64::Engine;
use std::collections::HashSet;
use std::{
collections::HashSet,
sync::atomic::{AtomicU32, Ordering},
};

use diesel::{
self,
Expand All @@ -21,6 +24,8 @@ use super::{

const MAXTTL: i32 = 2_100_000_000;

static COUNTER: AtomicU32 = AtomicU32::new(0);

pub fn create(db: &MysqlDb, params: params::CreateBatch) -> DbResult<results::CreateBatch> {
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
Expand All @@ -32,18 +37,16 @@ pub fn create(db: &MysqlDb, params: params::CreateBatch) -> DbResult<results::Cr
// sharding writes via (batchid % num_tables), and leaving it as zero would
// skew the sharding distribution.
//
// So we mix in the lowest digit of the uid to improve the distribution
// while still letting us treat these ids as millisecond timestamps. It's
// yuck, but it works and it keeps the weirdness contained to this single
// line of code.
let batch_id = db.timestamp().as_i64() + (user_id % 10);
// We mix in a per-process counter, reduced modulo 10 so it only occupies the
// lowest digit of the millisecond timestamp, to make batch IDs (more) unique
// within a timestamp. Note IDs can still collide once the counter wraps past 10
// in the same millisecond.
let batch_id = db.timestamp().as_i64() + COUNTER.fetch_add(1, Ordering::Relaxed) as i64 % 10;
insert_into(batch_uploads::table)
.values((
batch_uploads::batch_id.eq(&batch_id),
batch_uploads::user_id.eq(&user_id),
batch_uploads::collection_id.eq(&collection_id),
))
.execute(&db.conn)
.execute(&mut *db.conn.write()?)
.map_err(|e| -> DbError {
match e {
// The user tried to create two batches with the same timestamp
Expand Down Expand Up @@ -74,7 +77,7 @@ pub fn validate(db: &MysqlDb, params: params::ValidateBatch) -> DbResult<bool> {
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.get_result::<i32>(&db.conn)
.get_result::<i32>(&mut *db.conn.write()?)
.optional()?;
Ok(exists.is_some())
}
Expand Down Expand Up @@ -124,11 +127,11 @@ pub fn delete(db: &MysqlDb, params: params::DeleteBatch) -> DbResult<()> {
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.execute(&db.conn)?;
.execute(&mut *db.conn.write()?)?;
diesel::delete(batch_upload_items::table)
.filter(batch_upload_items::batch_id.eq(&batch_id))
.filter(batch_upload_items::user_id.eq(&user_id))
.execute(&db.conn)?;
.execute(&mut *db.conn.write()?)?;
Ok(())
}

Expand All @@ -148,9 +151,9 @@ pub fn commit(db: &MysqlDb, params: params::CommitBatch) -> DbResult<results::Co
.bind::<BigInt, _>(user_id)
.bind::<BigInt, _>(&db.timestamp().as_i64())
.bind::<BigInt, _>(&db.timestamp().as_i64())
.execute(&db.conn)?;
.execute(&mut *db.conn.write()?)?;

db.update_collection(user_id as u32, collection_id)?;
db.update_collection(user_id as u32, collection_id, None)?;

delete(
db,
Expand Down Expand Up @@ -186,14 +189,14 @@ pub fn do_append(
// values contain a key that's already in the database, less so if the
// duplicate is in the value set we're inserting.
#[derive(Debug, QueryableByName)]
#[table_name = "batch_upload_items"]
#[diesel(table_name = batch_upload_items)]
struct ExistsResult {
batch_id: i64,
id: String,
}

#[derive(AsChangeset)]
#[table_name = "batch_upload_items"]
#[diesel(table_name = batch_upload_items)]
struct UpdateBatches {
payload: Option<String>,
payload_size: Option<i64>,
Expand All @@ -208,7 +211,7 @@ pub fn do_append(
)
.bind::<BigInt, _>(user_id.legacy_id as i64)
.bind::<BigInt, _>(batch_id)
.get_results::<ExistsResult>(&db.conn)?
.get_results::<ExistsResult>(&mut *db.conn.write()?)?
{
existing.insert(exist_idx(
user_id.legacy_id,
Expand All @@ -232,7 +235,7 @@ pub fn do_append(
payload_size,
ttl_offset: bso.ttl.map(|ttl| ttl as i32),
})
.execute(&db.conn)?;
.execute(&mut *db.conn.write()?)?;
} else {
diesel::insert_into(batch_upload_items::table)
.values((
Expand All @@ -244,7 +247,7 @@ pub fn do_append(
batch_upload_items::payload_size.eq(payload_size),
batch_upload_items::ttl_offset.eq(bso.ttl.map(|ttl| ttl as i32)),
))
.execute(&db.conn)?;
.execute(&mut *db.conn.write()?)?;
// make sure to include the key into our table check.
existing.insert(exist_idx);
}
Expand Down
52 changes: 43 additions & 9 deletions syncstorage-mysql/src/diesel_ext.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{fmt::Debug, marker::PhantomData};

use diesel::{
backend::Backend,
insertable::CanInsertInSingleQuery,
mysql::Mysql,
query_builder::{AstPass, InsertStatement, QueryFragment, QueryId},
query_dsl::methods::LockingDsl,
result::QueryResult,
Expression, RunQueryDsl, Table,
Expression, QuerySource, RunQueryDsl,
};

/// Emit MySQL <= 5.7's `LOCK IN SHARE MODE`
Expand All @@ -32,36 +34,68 @@ where
pub struct LockInShareMode;

impl QueryFragment<Mysql> for LockInShareMode {
fn walk_ast(&self, mut out: AstPass<'_, Mysql>) -> QueryResult<()> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Mysql>) -> QueryResult<()> {
out.push_sql(" LOCK IN SHARE MODE");
Ok(())
}
}

#[derive(Debug, Clone)]
pub struct OnDuplicateKeyUpdate<T, U, Op, Ret, X>(Box<InsertStatement<T, U, Op, Ret>>, X);
pub struct OnDuplicateKeyUpdate<T, U, Op, Ret, DB, X>(
Box<InsertStatement<T, U, Op, Ret>>,
X,
PhantomData<DB>,
)
where
DB: Backend,
T: QuerySource,
T::FromClause: QueryFragment<DB> + Clone + Debug,
U: QueryFragment<DB> + CanInsertInSingleQuery<DB>,
Op: QueryFragment<DB>,
Ret: QueryFragment<DB>,
X: Expression;

impl<T, U, Op, Ret, DB, X> QueryFragment<DB> for OnDuplicateKeyUpdate<T, U, Op, Ret, X>
impl<T, U, Op, Ret, DB, X> QueryFragment<DB> for OnDuplicateKeyUpdate<T, U, Op, Ret, DB, X>
where
DB: Backend,
T: Table,
T::FromClause: QueryFragment<DB>,
T: QuerySource,
T::FromClause: QueryFragment<DB> + Clone + Debug,
InsertStatement<T, U, Op, Ret>: QueryFragment<DB>,
U: QueryFragment<DB> + CanInsertInSingleQuery<DB>,
Op: QueryFragment<DB>,
Ret: QueryFragment<DB>,
X: Expression,
{
fn walk_ast(&self, mut out: AstPass<'_, DB>) -> QueryResult<()> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
self.0.walk_ast(out.reborrow())?;
out.push_sql(" ON DUPLICATE KEY UPDATE ");
//self.1.walk_ast(out.reborrow())?;
Ok(())
}
}

impl<T, U, Op, Ret, DB, X> RunQueryDsl<DB> for OnDuplicateKeyUpdate<T, U, Op, Ret, X> {}
impl<T, U, Op, Ret, DB, X> RunQueryDsl<DB> for OnDuplicateKeyUpdate<T, U, Op, Ret, DB, X>
where
DB: Backend,
T: QuerySource,
T::FromClause: QueryFragment<DB> + Clone + Debug,
U: QueryFragment<DB> + CanInsertInSingleQuery<DB>,
Op: QueryFragment<DB>,
Ret: QueryFragment<DB>,
X: Expression,
{
}

impl<T, U, Op, Ret, X> QueryId for OnDuplicateKeyUpdate<T, U, Op, Ret, X> {
impl<T, U, Op, Ret, DB, X> QueryId for OnDuplicateKeyUpdate<T, U, Op, Ret, DB, X>
where
DB: Backend,
T: QuerySource,
T::FromClause: QueryFragment<DB> + Clone + Debug,
U: QueryFragment<DB> + CanInsertInSingleQuery<DB>,
Op: QueryFragment<DB>,
Ret: QueryFragment<DB>,
X: Expression,
{
type QueryId = ();

const HAS_STATIC_QUERY_ID: bool = false;
Expand Down
19 changes: 15 additions & 4 deletions syncstorage-mysql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,20 @@ from_error!(
|error: diesel::r2d2::PoolError| DbError::from(DbErrorKind::Mysql(SqlError::from(error)))
);
from_error!(
diesel_migrations::RunMigrationsError,
diesel_migrations::MigrationError,
DbError,
|error: diesel_migrations::RunMigrationsError| DbError::from(DbErrorKind::Mysql(
SqlError::from(error)
))
|error: diesel_migrations::MigrationError| DbError::from(DbErrorKind::Mysql(SqlError::from(
error
)))
);
from_error!(
std::boxed::Box<dyn std::error::Error + std::marker::Send + Sync>,
DbError,
|error: std::boxed::Box<dyn std::error::Error>| DbError::internal_error(error.to_string())
);

impl<Guard> From<std::sync::PoisonError<Guard>> for DbError {
fn from(inner: std::sync::PoisonError<Guard>) -> DbError {
DbError::internal_error(inner.to_string())
}
}
1 change: 0 additions & 1 deletion syncstorage-mysql/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
#[macro_use]
extern crate slog_scope;
Expand Down
Loading