From 9202ee552b08a3d3504d6379f27e386a4c08abac Mon Sep 17 00:00:00 2001 From: Tamas Soltesz Date: Fri, 20 Dec 2024 05:18:17 +0100 Subject: [PATCH] chore: bulk import changelog (#137) * chore: update changelog * chore: fix formatting * fix: integrating bulk execute with QueryExecutorTemplate - fixing tests --- CHANGELOG.md | 26 +++++ .../storage/mysql/QueryExecutorTemplate.java | 20 ++++ .../mysql/queries/BulkImportQueries.java | 29 +++--- .../mysql/queries/EmailPasswordQueries.java | 90 ++++++++--------- .../queries/EmailVerificationQueries.java | 89 +++++++---------- .../storage/mysql/queries/GeneralQueries.java | 98 +++++++++---------- .../mysql/queries/PasswordlessQueries.java | 93 ++++++++---------- .../storage/mysql/queries/TOTPQueries.java | 57 +++++------ .../mysql/queries/ThirdPartyQueries.java | 95 +++++++++--------- .../mysql/queries/UserIdMappingQueries.java | 24 ++--- .../mysql/queries/UserMetadataQueries.java | 29 +++--- .../mysql/queries/UserRolesQueries.java | 26 ++--- .../mysql/test/OneMillionUsersTest.java | 7 +- 13 files changed, 331 insertions(+), 352 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 936c0ca..025e719 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,35 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [7.3.0] + - Adds queries for Bulk Import - Adds support for multithreaded bulk import +### Migration + +```sql +CREATE TABLE IF NOT EXISTS bulk_import_users ( + id CHAR(36), + app_id VARCHAR(64) NOT NULL DEFAULT 'public', + primary_user_id VARCHAR(36), + raw_data TEXT NOT NULL, + status VARCHAR(128) DEFAULT 'NEW', + error_msg TEXT, + created_at BIGINT UNSIGNED NOT NULL, + updated_at BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (app_id, id), + FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE +); + +CREATE INDEX bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at); + +CREATE INDEX bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC, + id DESC); + +CREATE INDEX bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC); +``` + ## [7.2.0] - 2024-10-03 - Compatible with plugin interface version 6.3 diff --git a/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java b/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java index 17affad..ce5280c 100644 --- a/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java +++ b/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java @@ -22,6 +22,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; public interface QueryExecutorTemplate { @@ -44,6 +45,25 @@ static T execute(Connection con, String QUERY, PreparedStatementValueSetter } } + static void executeBatch(Connection connection, String QUERY, List setters) + throws SQLException, StorageQueryException { + assert setters != null; + assert !setters.isEmpty(); + try (PreparedStatement pst = connection.prepareStatement(QUERY)) { + int counter = 0; + for(PreparedStatementValueSetter setter: setters) { + setter.setValues(pst); + pst.addBatch(); + counter++; + + if(counter % 100 == 0) { + pst.executeBatch(); + } + } + pst.executeBatch(); //for the possible remaining ones + } + } + static int update(Start start, String QUERY, PreparedStatementValueSetter setter) throws SQLException, StorageQueryException { try (Connection con = ConnectionPool.getConnection(start)) { diff --git a/src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java index 8414886..5df1cdf 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java @@ -22,6 +22,7 @@ import io.supertokens.pluginInterface.exceptions.StorageQueryException; import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; @@ -29,15 +30,13 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; public class BulkImportQueries { static String getQueryToCreateBulkImportUsersTable(Start start) { @@ -317,28 +316,24 @@ public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentif public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull Map bulkImportUserIdToErrorMessage) - throws SQLException { + throws SQLException, StorageQueryException { BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED; String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?"; - PreparedStatement setErrorStatement = con.prepareStatement(query); + List errorSetters = new ArrayList<>(); - int counter = 0; for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){ - setErrorStatement.setString(1, errorStatus.toString()); - setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId)); - setErrorStatement.setLong(3, System.currentTimeMillis()); - setErrorStatement.setString(4, appIdentifier.getAppId()); - setErrorStatement.setString(5, bulkImportUserId); - setErrorStatement.addBatch(); - - if(counter % 100 == 0) { - setErrorStatement.executeBatch(); - } + errorSetters.add(pst -> { + pst.setString(1, errorStatus.toString()); + pst.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId)); + pst.setLong(3, System.currentTimeMillis()); + pst.setString(4, appIdentifier.getAppId()); + pst.setString(5, bulkImportUserId); + }); } - setErrorStatement.executeBatch(); + executeBatch(con, query, errorSetters); } private static class BulkImportUserRowMapper implements RowMapper { diff --git a/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java index 359d641..9fa03e5 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java @@ -26,20 +26,19 @@ import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; import static io.supertokens.pluginInterface.RECIPE_ID.EMAIL_PASSWORD; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; import static io.supertokens.storage.mysql.config.Config.getConfig; import static java.lang.System.currentTimeMillis; @@ -330,60 +329,53 @@ public static void signUpMultipleForBulkImport_Transaction(Start start, Connecti "INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable() + "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)"; - PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY); - PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY); - PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY); - PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY); + List appIdToUserIdSetters = new ArrayList<>(); + List allAuthRecipeUsersSetters = new ArrayList<>(); + List emailPasswordUsersSetters = new ArrayList<>(); + List emailPasswordUsersToTenantSetters = new ArrayList<>();; - int counter = 0; for (EmailPasswordImportUser user : usersToSignUp) { String userId = user.userId; TenantIdentifier tenantIdentifier = user.tenantIdentifier; - appIdToUserId.setString(1, tenantIdentifier.getAppId()); - appIdToUserId.setString(2, userId); - appIdToUserId.setString(3, userId); - appIdToUserId.setString(4, EMAIL_PASSWORD.toString()); - appIdToUserId.addBatch(); - - - allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId()); - allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId()); - allAuthRecipeUsers.setString(3, userId); - allAuthRecipeUsers.setString(4, userId); - allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString()); - allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch); - allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch); - allAuthRecipeUsers.addBatch(); - - emailPasswordUsers.setString(1, tenantIdentifier.getAppId()); - emailPasswordUsers.setString(2, userId); - emailPasswordUsers.setString(3, user.email); - emailPasswordUsers.setString(4, user.passwordHash); - emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch); - emailPasswordUsers.addBatch(); - - emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId()); - emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId()); - emailPasswordUsersToTenant.setString(3, userId); - emailPasswordUsersToTenant.setString(4, user.email); - emailPasswordUsersToTenant.addBatch(); - counter++; - if (counter % 100 == 0) { - appIdToUserId.executeBatch(); - allAuthRecipeUsers.executeBatch(); - emailPasswordUsers.executeBatch(); - emailPasswordUsersToTenant.executeBatch(); - } + appIdToUserIdSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, userId); + pst.setString(3, userId); + pst.setString(4, EMAIL_PASSWORD.toString()); + }); + + allAuthRecipeUsersSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, tenantIdentifier.getTenantId()); + pst.setString(3, userId); + pst.setString(4, userId); + pst.setString(5, EMAIL_PASSWORD.toString()); + pst.setLong(6, user.timeJoinedMSSinceEpoch); + pst.setLong(7, user.timeJoinedMSSinceEpoch); + }); + + emailPasswordUsersSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, userId); + pst.setString(3, user.email); + pst.setString(4, user.passwordHash); + pst.setLong(5, user.timeJoinedMSSinceEpoch); + }); + + emailPasswordUsersToTenantSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, tenantIdentifier.getTenantId()); + pst.setString(3, userId); + pst.setString(4, user.email); + }); } - //execute the remaining ones - appIdToUserId.executeBatch(); - allAuthRecipeUsers.executeBatch(); - emailPasswordUsers.executeBatch(); - emailPasswordUsersToTenant.executeBatch(); + executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters); + executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters); + executeBatch(sqlCon, emailpassword_users_QUERY, emailPasswordUsersSetters); + executeBatch(sqlCon, emailpassword_users_to_tenant_QUERY, emailPasswordUsersToTenantSetters); - //sqlCon.commit(); } catch (SQLException throwables) { throw new StorageTransactionLogicException(throwables); } diff --git a/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java index 1e96fa5..d1eb10e 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java @@ -23,18 +23,17 @@ import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.sqlStorage.TransactionConnection; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; import static io.supertokens.storage.mysql.config.Config.getConfig; import static java.lang.System.currentTimeMillis; @@ -500,41 +499,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C boolean isEmailVerified) throws SQLException, StorageQueryException { + String QUERY = ""; if (isEmailVerified) { - String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable() + QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable() + "(app_id, user_id, email) VALUES(?, ?, ?)"; - PreparedStatement insertQuery = con.prepareStatement(QUERY); - int counter = 0; - for(Map.Entry emailToUser : emailToUserIds.entrySet()){ - insertQuery.setString(1, appIdentifier.getAppId()); - insertQuery.setString(2, emailToUser.getKey()); - insertQuery.setString(3, emailToUser.getValue()); - insertQuery.addBatch(); - - counter++; - if (counter % 100 == 0) { - insertQuery.executeBatch(); - } - } - insertQuery.executeBatch(); } else { - String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable() + QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable() + " WHERE app_id = ? AND user_id = ? AND email = ?"; - PreparedStatement deleteQuery = con.prepareStatement(QUERY); - int counter = 0; - for (Map.Entry emailToUser : emailToUserIds.entrySet()) { - deleteQuery.setString(1, appIdentifier.getAppId()); - deleteQuery.setString(2, emailToUser.getValue()); - deleteQuery.setString(3, emailToUser.getKey()); - deleteQuery.addBatch(); - - counter++; - if (counter % 100 == 0) { - deleteQuery.executeBatch(); - } - } - deleteQuery.executeBatch(); } + List setters = new ArrayList<>(); + + for(Map.Entry emailToUser : emailToUserIds.entrySet()){ + setters.add(pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, emailToUser.getKey()); + pst.setString(3, emailToUser.getValue()); + }); + } + + executeBatch(con, QUERY, setters); } public static Set findUserIdsBeingUsedForEmailVerification(Start start, AppIdentifier appIdentifier, List userIds) @@ -587,29 +570,29 @@ public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, A + " SET user_id = ? WHERE app_id = ? AND user_id = ?"; String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable() + " SET user_id = ? WHERE app_id = ? AND user_id = ?"; - PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query); - PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query); - int counter = 0; + List emailVerificationSetters = new ArrayList<>(); + List emailVerificationTokensSetters = new ArrayList<>(); + for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){ - updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId)); - updateEmailVerificationQuery.setString(2, appIdentifier.getAppId()); - updateEmailVerificationQuery.setString(3, supertokensUserId); - updateEmailVerificationQuery.addBatch(); - - updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId)); - updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId()); - updateEmailVerificationTokensQuery.setString(3, supertokensUserId); - updateEmailVerificationTokensQuery.addBatch(); - - counter++; - if(counter % 100 == 0) { - updateEmailVerificationQuery.executeBatch(); - updateEmailVerificationTokensQuery.executeBatch(); - } + emailVerificationSetters.add(pst -> { + pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId)); + pst.setString(2, appIdentifier.getAppId()); + pst.setString(3, supertokensUserId); + }); + + emailVerificationTokensSetters.add(pst -> { + pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId)); + pst.setString(2, appIdentifier.getAppId()); + pst.setString(3, supertokensUserId); + }); } - updateEmailVerificationQuery.executeBatch(); - updateEmailVerificationTokensQuery.executeBatch(); + if(emailVerificationSetters.isEmpty()){ + return null; + } + + executeBatch(sqlCon, update_email_verification_table_query, emailVerificationSetters); + executeBatch(sqlCon, update_email_verification_tokens_table_query, emailVerificationTokensSetters); } catch (SQLException e) { throw new StorageTransactionLogicException(e); diff --git a/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java index e3c7e30..c34c3d8 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java @@ -26,6 +26,7 @@ import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.storage.mysql.ConnectionPool; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; @@ -33,15 +34,17 @@ import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; -import java.sql.*; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; import static io.supertokens.storage.mysql.PreparedStatementValueSetter.NO_OP_SETTER; import static io.supertokens.storage.mysql.ProcessState.PROCESS_STATE.CREATING_NEW_TABLE; import static io.supertokens.storage.mysql.ProcessState.getInstance; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; import static io.supertokens.storage.mysql.queries.EmailPasswordQueries.getQueryToCreatePasswordResetTokenExpiryIndex; import static io.supertokens.storage.mysql.queries.EmailPasswordQueries.getQueryToCreatePasswordResetTokensTable; import static io.supertokens.storage.mysql.queries.EmailVerificationQueries.*; @@ -1002,26 +1005,21 @@ public static void makePrimaryUsers_Transaction(Start start, Connection sqlCon, String appid_to_userid_update_QUERY = "UPDATE " + Config.getConfig(start).getAppIdToUserIdTable() + " SET is_linked_or_is_a_primary_user = true WHERE app_id = ? AND user_id = ?"; - PreparedStatement usersUpdateStatement = sqlCon.prepareStatement(users_update_QUERY); - PreparedStatement appIdToUserIdUpdateStatement = sqlCon.prepareStatement(appid_to_userid_update_QUERY); - int counter = 0; + List usersUpdateBatch = new ArrayList<>(); + List appIdToUserIdUpdateBatch = new ArrayList<>(); + for(String userId: userIds){ - usersUpdateStatement.setString(1, appIdentifier.getAppId()); - usersUpdateStatement.setString(2, userId); - usersUpdateStatement.addBatch(); - - appIdToUserIdUpdateStatement.setString(1, appIdentifier.getAppId()); - appIdToUserIdUpdateStatement.setString(2, userId); - appIdToUserIdUpdateStatement.addBatch(); - - counter++; - if(counter % 100 == 0) { - usersUpdateStatement.executeBatch(); - appIdToUserIdUpdateStatement.executeBatch(); - } + usersUpdateBatch.add(pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, userId); + }); + appIdToUserIdUpdateBatch.add(pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, userId); + }); } - usersUpdateStatement.executeBatch(); - appIdToUserIdUpdateStatement.executeBatch(); + executeBatch(sqlCon, users_update_QUERY, usersUpdateBatch); + executeBatch(sqlCon, appid_to_userid_update_QUERY, appIdToUserIdUpdateBatch); } public static void linkAccounts_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, @@ -1070,33 +1068,27 @@ public static void linkMultipleAccounts_Transaction(Start start, Connection sqlC " SET is_linked_or_is_a_primary_user = true, primary_or_recipe_user_id = ? WHERE app_id = ? AND " + "user_id = ?"; - PreparedStatement updateUsers = sqlCon.prepareStatement(update_users_QUERY); - PreparedStatement updateAppIdToUserId = sqlCon.prepareStatement(update_appid_to_userid_QUERY); + List usersUpdateBatch = new ArrayList<>(); + List appIdToUserIdUpdateBatch = new ArrayList<>(); - int counter = 0; for(Map.Entry linkEntry : recipeUserIdToPrimaryUserId.entrySet()) { String primaryUserId = linkEntry.getValue(); String recipeUserId = linkEntry.getKey(); - updateUsers.setString(1, primaryUserId); - updateUsers.setString(2, appIdentifier.getAppId()); - updateUsers.setString(3, recipeUserId); - updateUsers.addBatch(); - - updateAppIdToUserId.setString(1, primaryUserId); - updateAppIdToUserId.setString(2, appIdentifier.getAppId()); - updateAppIdToUserId.setString(3, recipeUserId); - updateAppIdToUserId.addBatch(); - - counter++; - if (counter % 100 == 0) { - updateUsers.executeBatch(); - updateAppIdToUserId.executeBatch(); - } + usersUpdateBatch.add(pst -> { + pst.setString(1, primaryUserId); + pst.setString(2, appIdentifier.getAppId()); + pst.setString(3, recipeUserId); + }); + appIdToUserIdUpdateBatch.add(pst -> { + pst.setString(1, primaryUserId); + pst.setString(2, appIdentifier.getAppId()); + pst.setString(3, recipeUserId); + }); } - updateUsers.executeBatch(); - updateAppIdToUserId.executeBatch(); + executeBatch(sqlCon, update_users_QUERY, usersUpdateBatch); + executeBatch(sqlCon, update_appid_to_userid_QUERY, appIdToUserIdUpdateBatch); updateTimeJoinedForPrimaryUsers_Transaction(start, sqlCon, appIdentifier, new ArrayList<>(recipeUserIdToPrimaryUserId.values())); @@ -1105,20 +1097,22 @@ public static void linkMultipleAccounts_Transaction(Start start, Connection sqlC public static void updateTimeJoinedForPrimaryUsers_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, List primaryUserIds) throws SQLException, StorageQueryException { + String QUERY = "UPDATE " + Config.getConfig(start).getUsersTable() + - " SET primary_or_recipe_user_time_joined = (SELECT MIN(time_joined) FROM " + - Config.getConfig(start).getUsersTable() + " WHERE app_id = ? AND primary_or_recipe_user_id = ?) WHERE " + - " app_id = ? AND primary_or_recipe_user_id = ?"; - PreparedStatement updateStatement = sqlCon.prepareStatement(QUERY); + " JOIN (SELECT primary_or_recipe_user_id, MIN(time_joined) AS min_time_joined FROM " + + Config.getConfig(start).getUsersTable() + " WHERE app_id = ? AND primary_or_recipe_user_id = ? )" + + " AS temp_table ON " + Config.getConfig(start).getUsersTable() + ".primary_or_recipe_user_id = " + + "temp_table.primary_or_recipe_user_id " + + " SET primary_or_recipe_user_time_joined = min_time_joined"; + + List updateBatch = new ArrayList<>(); for(String primaryUserId : primaryUserIds) { - updateStatement.setString(1, appIdentifier.getAppId()); - updateStatement.setString(2, primaryUserId); - updateStatement.setString(3, appIdentifier.getAppId()); - updateStatement.setString(4, primaryUserId); - updateStatement.addBatch(); + updateBatch.add(pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, primaryUserId); + }); } - - updateStatement.executeBatch(); + executeBatch(sqlCon, QUERY, updateBatch); } public static void unlinkAccounts_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, diff --git a/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java index 537fd61..07915ad 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java @@ -29,6 +29,7 @@ import io.supertokens.pluginInterface.passwordless.PasswordlessImportUser; import io.supertokens.pluginInterface.sqlStorage.SQLStorage.TransactionIsolationLevel; import io.supertokens.storage.mysql.ConnectionPool; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; @@ -36,15 +37,13 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; import static io.supertokens.pluginInterface.RECIPE_ID.PASSWORDLESS; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; import static io.supertokens.storage.mysql.config.Config.getConfig; public class PasswordlessQueries { @@ -157,73 +156,65 @@ public static void createDeviceWithCode(Start start, TenantIdentifier tenantIden } public static void importUsers_Transaction(Connection sqlCon, Start start, - Collection users) throws SQLException { + Collection users) + throws StorageQueryException, SQLException { String app_id_to_user_id_QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable() + "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)"; - PreparedStatement appIdToUserIdStatement = sqlCon.prepareStatement(app_id_to_user_id_QUERY); + List appIdToUserIdSetters = new ArrayList<>(); String all_auth_recipe_users_QUERY = "INSERT INTO " + getConfig(start).getUsersTable() + "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, " + "primary_or_recipe_user_time_joined)" + " VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement allAuthRecipeUsersStatement = sqlCon.prepareStatement(all_auth_recipe_users_QUERY); + List allAuthRecipeUsersSetters = new ArrayList<>(); String passwordless_users_QUERY = "INSERT INTO " + getConfig(start).getPasswordlessUsersTable() + "(app_id, user_id, email, phone_number, time_joined)" + " VALUES(?, ?, ?, ?, ?)"; - PreparedStatement passwordlessUsersStatement = sqlCon.prepareStatement(passwordless_users_QUERY); + List passwordlessUsersSetters = new ArrayList<>(); String passwordless_user_to_tenant_QUERY = "INSERT INTO " + getConfig(start).getPasswordlessUserToTenantTable() + "(app_id, tenant_id, user_id, email, phone_number)" + " VALUES(?, ?, ?, ?, ?)"; - PreparedStatement passwordlessUserToTenantStatement = sqlCon.prepareStatement(passwordless_user_to_tenant_QUERY); + List passwordlessUserToTenantSetters = new ArrayList<>(); - int counter = 0; for (PasswordlessImportUser user: users){ TenantIdentifier tenantIdentifier = user.tenantIdentifier; - appIdToUserIdStatement.setString(1, tenantIdentifier.getAppId()); - appIdToUserIdStatement.setString(2, user.userId); - appIdToUserIdStatement.setString(3, user.userId); - appIdToUserIdStatement.setString(4, PASSWORDLESS.toString()); - appIdToUserIdStatement.addBatch(); - - allAuthRecipeUsersStatement.setString(1, tenantIdentifier.getAppId()); - allAuthRecipeUsersStatement.setString(2, tenantIdentifier.getTenantId()); - allAuthRecipeUsersStatement.setString(3, user.userId); - allAuthRecipeUsersStatement.setString(4, user.userId); - allAuthRecipeUsersStatement.setString(5, PASSWORDLESS.toString()); - allAuthRecipeUsersStatement.setLong(6, user.timeJoinedMSSinceEpoch); - allAuthRecipeUsersStatement.setLong(7, user.timeJoinedMSSinceEpoch); - allAuthRecipeUsersStatement.addBatch(); - - passwordlessUsersStatement.setString(1, tenantIdentifier.getAppId()); - passwordlessUsersStatement.setString(2, user.userId); - passwordlessUsersStatement.setString(3, user.email); - passwordlessUsersStatement.setString(4, user.phoneNumber); - passwordlessUsersStatement.setLong(5, user.timeJoinedMSSinceEpoch); - passwordlessUsersStatement.addBatch(); - - passwordlessUserToTenantStatement.setString(1, tenantIdentifier.getAppId()); - passwordlessUserToTenantStatement.setString(2, tenantIdentifier.getTenantId()); - passwordlessUserToTenantStatement.setString(3, user.userId); - passwordlessUserToTenantStatement.setString(4, user.email); - passwordlessUserToTenantStatement.setString(5, user.phoneNumber); - passwordlessUserToTenantStatement.addBatch(); - - counter++; - - if(counter % 100 == 0) { - appIdToUserIdStatement.executeBatch(); - allAuthRecipeUsersStatement.executeBatch(); - passwordlessUsersStatement.executeBatch(); - passwordlessUserToTenantStatement.executeBatch(); - } - } + appIdToUserIdSetters.add(pst -> { + pst.setString(1, user.tenantIdentifier.getAppId()); + pst.setString(2, user.userId); + pst.setString(3, user.userId); + pst.setString(4, PASSWORDLESS.toString()); + }); - appIdToUserIdStatement.executeBatch(); - allAuthRecipeUsersStatement.executeBatch(); - passwordlessUsersStatement.executeBatch(); - passwordlessUserToTenantStatement.executeBatch(); + allAuthRecipeUsersSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, tenantIdentifier.getTenantId()); + pst.setString(3, user.userId); + pst.setString(4, user.userId); + pst.setString(5, PASSWORDLESS.toString()); + pst.setLong(6, user.timeJoinedMSSinceEpoch); + pst.setLong(7, user.timeJoinedMSSinceEpoch); + }); + passwordlessUsersSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, user.userId); + pst.setString(3, user.email); + pst.setString(4, user.phoneNumber); + pst.setLong(5, user.timeJoinedMSSinceEpoch); + }); + passwordlessUserToTenantSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, tenantIdentifier.getTenantId()); + pst.setString(3, user.userId); + pst.setString(4, user.email); + pst.setString(5, user.phoneNumber); + }); + } + executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters); + executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters); + executeBatch(sqlCon, passwordless_users_QUERY, passwordlessUsersSetters); + executeBatch(sqlCon, passwordless_user_to_tenant_QUERY, passwordlessUserToTenantSetters); } public static PasswordlessDevice getDevice_Transaction(Start start, Connection con, TenantIdentifier tenantIdentifier, String deviceIdHash) diff --git a/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java index 5d8cdd4..d22e2da 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java @@ -6,12 +6,12 @@ import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.totp.TOTPDevice; import io.supertokens.pluginInterface.totp.TOTPUsedCode; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; @@ -19,8 +19,7 @@ import java.util.List; import java.util.Map; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; public class TOTPQueries { public static String getQueryToCreateUsersTable(Start start) { @@ -303,38 +302,34 @@ public static void createDevices_Transaction(Start start, Connection sqlCon, App " (app_id, user_id, device_name, secret_key, period, skew, verified, created_at) VALUES (?, ?, ?, ?, " + "?, ?, ?, ?) ON DUPLICATE KEY UPDATE secret_key = ?, period = ?, skew = ?, created_at = ?, verified = ?"; - PreparedStatement insertUserStatement = sqlCon.prepareStatement(insert_user_QUERY); - PreparedStatement insertDeviceStatement = sqlCon.prepareStatement(insert_device_QUERY); + List insertUserSetters = new ArrayList<>(); + List insertDeviceSetters = new ArrayList<>(); - int counter = 0; for(TOTPDevice device : devices){ - insertUserStatement.setString(1, appIdentifier.getAppId()); - insertUserStatement.setString(2, device.userId); - insertUserStatement.addBatch(); - - insertDeviceStatement.setString(1, appIdentifier.getAppId()); - insertDeviceStatement.setString(2, device.userId); - insertDeviceStatement.setString(3, device.deviceName); - insertDeviceStatement.setString(4, device.secretKey); - insertDeviceStatement.setInt(5, device.period); - insertDeviceStatement.setInt(6, device.skew); - insertDeviceStatement.setBoolean(7, device.verified); - insertDeviceStatement.setLong(8, device.createdAt); - insertDeviceStatement.setString(9, device.secretKey); - insertDeviceStatement.setInt(10, device.period); - insertDeviceStatement.setInt(11, device.skew); - insertDeviceStatement.setLong(12, device.createdAt); - insertDeviceStatement.setBoolean(13, device.verified); - insertDeviceStatement.addBatch(); - counter++; - if(counter % 100 == 0) { - insertUserStatement.executeBatch(); - insertDeviceStatement.executeBatch(); - } + insertUserSetters.add(pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, device.userId); + }); + + insertDeviceSetters.add(insertDeviceStatement -> { + insertDeviceStatement.setString(1, appIdentifier.getAppId()); + insertDeviceStatement.setString(2, device.userId); + insertDeviceStatement.setString(3, device.deviceName); + insertDeviceStatement.setString(4, device.secretKey); + insertDeviceStatement.setInt(5, device.period); + insertDeviceStatement.setInt(6, device.skew); + insertDeviceStatement.setBoolean(7, device.verified); + insertDeviceStatement.setLong(8, device.createdAt); + insertDeviceStatement.setString(9, device.secretKey); + insertDeviceStatement.setInt(10, device.period); + insertDeviceStatement.setInt(11, device.skew); + insertDeviceStatement.setLong(12, device.createdAt); + insertDeviceStatement.setBoolean(13, device.verified); + }); } - insertUserStatement.executeBatch(); - insertDeviceStatement.executeBatch(); + executeBatch(sqlCon, insert_user_QUERY, insertUserSetters); + executeBatch(sqlCon, insert_device_QUERY, insertDeviceSetters); } public static Map> getDevicesForMultipleUsers(Start start, AppIdentifier appIdentifier, List userIds) diff --git a/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java index dcb0a1f..56a139f 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java @@ -26,20 +26,19 @@ import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.thirdparty.ThirdPartyImportUser; import io.supertokens.storage.mysql.ConnectionPool; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; import static io.supertokens.pluginInterface.RECIPE_ID.THIRD_PARTY; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; public class ThirdPartyQueries { @@ -585,7 +584,7 @@ private static List fillUserInfoWithTenantIds_transaction(Start public static void importUser_Transaction(Start start, Connection sqlConnection, Collection users) - throws SQLException { + throws SQLException, StorageQueryException { String app_id_userid_QUERY = "INSERT INTO " + Config.getConfig(start).getAppIdToUserIdTable() + "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)"; @@ -604,58 +603,54 @@ public static void importUser_Transaction(Start start, Connection sqlConnection, + "(app_id, tenant_id, user_id, third_party_id, third_party_user_id)" + " VALUES(?, ?, ?, ?, ?)"; - PreparedStatement appIdToUserIdStatement = sqlConnection.prepareStatement(app_id_userid_QUERY); - PreparedStatement allAuthRecipeUsersStatement = sqlConnection.prepareStatement(all_auth_recipe_users_QUERY); - PreparedStatement thirdPartyUsersStatement = sqlConnection.prepareStatement(thirdparty_users_QUERY); - PreparedStatement thirdPartyUsersToTenantStatement = sqlConnection.prepareStatement( - thirdparty_user_to_tenant_QUERY); + + List appIdToUserIdSetters = new ArrayList<>(); + List allAuthRecipeUsersSetters = new ArrayList<>(); + List thirdPartyUsersSetters = new ArrayList<>(); + List thirdPartyUsersToTenantSetters = new ArrayList<>(); int counter = 0; for (ThirdPartyImportUser user : users) { TenantIdentifier tenantIdentifier = user.tenantIdentifier; - appIdToUserIdStatement.setString(1, tenantIdentifier.getAppId()); - appIdToUserIdStatement.setString(2, user.userId); - appIdToUserIdStatement.setString(3, user.userId); - appIdToUserIdStatement.setString(4, THIRD_PARTY.toString()); - appIdToUserIdStatement.addBatch(); - - allAuthRecipeUsersStatement.setString(1, tenantIdentifier.getAppId()); - allAuthRecipeUsersStatement.setString(2, tenantIdentifier.getTenantId()); - allAuthRecipeUsersStatement.setString(3, user.userId); - allAuthRecipeUsersStatement.setString(4, user.userId); - allAuthRecipeUsersStatement.setString(5, THIRD_PARTY.toString()); - allAuthRecipeUsersStatement.setLong(6, user.timeJoinedMSSinceEpoch); - allAuthRecipeUsersStatement.setLong(7, user.timeJoinedMSSinceEpoch); - allAuthRecipeUsersStatement.addBatch(); - - thirdPartyUsersStatement.setString(1, tenantIdentifier.getAppId()); - thirdPartyUsersStatement.setString(2, user.thirdpartyId); - thirdPartyUsersStatement.setString(3, user.thirdpartyUserId); - thirdPartyUsersStatement.setString(4, user.userId); - thirdPartyUsersStatement.setString(5, user.email); - thirdPartyUsersStatement.setLong(6, user.timeJoinedMSSinceEpoch); - thirdPartyUsersStatement.addBatch(); - - thirdPartyUsersToTenantStatement.setString(1, tenantIdentifier.getAppId()); - thirdPartyUsersToTenantStatement.setString(2, tenantIdentifier.getTenantId()); - thirdPartyUsersToTenantStatement.setString(3, user.userId); - thirdPartyUsersToTenantStatement.setString(4, user.thirdpartyId); - thirdPartyUsersToTenantStatement.setString(5, user.thirdpartyUserId); - thirdPartyUsersToTenantStatement.addBatch(); - - counter++; - if(counter % 100 == 0) { - appIdToUserIdStatement.executeBatch(); - allAuthRecipeUsersStatement.executeBatch(); - thirdPartyUsersStatement.executeBatch(); - thirdPartyUsersToTenantStatement.executeBatch(); - } + appIdToUserIdSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, user.userId); + pst.setString(3, user.userId); + pst.setString(4, THIRD_PARTY.toString()); + }); + allAuthRecipeUsersSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, tenantIdentifier.getTenantId()); + pst.setString(3, user.userId); + pst.setString(4, user.userId); + pst.setString(5, THIRD_PARTY.toString()); + pst.setLong(6, user.timeJoinedMSSinceEpoch); + pst.setLong(7, user.timeJoinedMSSinceEpoch); + }); + thirdPartyUsersSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, user.thirdpartyId); + pst.setString(3, user.thirdpartyUserId); + pst.setString(4, user.userId); + pst.setString(5, user.email); + pst.setLong(6, user.timeJoinedMSSinceEpoch); + }); + + thirdPartyUsersToTenantSetters.add(pst -> { + pst.setString(1, tenantIdentifier.getAppId()); + pst.setString(2, tenantIdentifier.getTenantId()); + pst.setString(3, user.userId); + pst.setString(4, user.thirdpartyId); + pst.setString(5, user.thirdpartyUserId); + }); + } - appIdToUserIdStatement.executeBatch(); - allAuthRecipeUsersStatement.executeBatch(); - thirdPartyUsersStatement.executeBatch(); - thirdPartyUsersToTenantStatement.executeBatch(); + executeBatch(sqlConnection, app_id_userid_QUERY, appIdToUserIdSetters); + executeBatch(sqlConnection, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters); + executeBatch(sqlConnection, thirdparty_users_QUERY, thirdPartyUsersSetters); + executeBatch(sqlConnection, thirdparty_user_to_tenant_QUERY, thirdPartyUsersToTenantSetters); + } private static class UserInfoPartial { diff --git a/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java index f0335b3..5e325e9 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java @@ -21,13 +21,13 @@ import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.useridmapping.UserIdMapping; import io.supertokens.storage.mysql.ConnectionPool; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import javax.annotation.Nullable; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; @@ -35,8 +35,7 @@ import java.util.List; import java.util.Map; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; import static io.supertokens.storage.mysql.config.Config.getConfig; public class UserIdMappingQueries { @@ -95,21 +94,16 @@ public static void createBulkUserIdMapping(Start start, AppIdentifier appIdentif + " (app_id, supertokens_user_id, external_user_id)" + " VALUES(?, ?, ?)"; Connection sqlConnection = ConnectionPool.getConnection(start); - PreparedStatement insertStatement = sqlConnection.prepareStatement(QUERY); + List setters = new ArrayList<>(); - int counter = 0; for(String superTokensUserId : superTokensUserIdToExternalUserId.keySet()) { - insertStatement.setString(1, appIdentifier.getAppId()); - insertStatement.setString(2, superTokensUserId); - insertStatement.setString(3, superTokensUserIdToExternalUserId.get(superTokensUserId)); - insertStatement.addBatch(); - - counter++; - if(counter % 100 == 0) { - insertStatement.executeBatch(); - } + setters.add(pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, superTokensUserId); + pst.setString(3, superTokensUserIdToExternalUserId.get(superTokensUserId)); + }); } - insertStatement.executeBatch(); + executeBatch(sqlConnection, QUERY, setters); } public static UserIdMapping getUserIdMappingWithExternalUserId(Start start, AppIdentifier appIdentifier, diff --git a/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java index 5e9a0d5..aa634a2 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java @@ -21,19 +21,19 @@ import io.supertokens.pluginInterface.exceptions.StorageQueryException; import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; import static io.supertokens.storage.mysql.config.Config.getConfig; public class UserMetadataQueries { @@ -132,23 +132,18 @@ public static void setMultipleUsersMetadatas_Transaction(Start start, Connection String QUERY = "INSERT INTO " + getConfig(start).getUserMetadataTable() + " (app_id, user_id, user_metadata) VALUES(?, ?, ?)" + " ON DUPLICATE KEY UPDATE user_metadata = ?"; - PreparedStatement insertStatement = con.prepareStatement(QUERY); - int counter = 0; + List setters = new ArrayList<>(); + for(Map.Entry metadataByUserId : metadatasByUserId.entrySet()){ - insertStatement.setString(1, appIdentifier.getAppId()); - insertStatement.setString(2, metadataByUserId.getKey()); - insertStatement.setString(3, metadataByUserId.getValue().toString()); - insertStatement.setString(4, metadataByUserId.getValue().toString()); - insertStatement.addBatch(); - - counter++; - if(counter % 100 == 0) { - insertStatement.executeBatch(); - } + setters.add(pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, metadataByUserId.getKey()); + pst.setString(3, metadataByUserId.getValue().toString()); + pst.setString(4, metadataByUserId.getValue().toString()); + }); } - - insertStatement.executeBatch(); + executeBatch(con, QUERY, setters); } public static Map getMultipleUsersMetadatas_Transaction(Start start, Connection con, AppIdentifier appIdentifier, diff --git a/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java index 45154b6..ea10349 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java @@ -19,12 +19,12 @@ import io.supertokens.pluginInterface.exceptions.StorageQueryException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; +import io.supertokens.storage.mysql.PreparedStatementValueSetter; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; @@ -32,8 +32,7 @@ import java.util.List; import java.util.Map; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; -import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.*; public class UserRolesQueries { public static String getQueryToCreateRolesTable(Start start) { @@ -375,27 +374,22 @@ public static void addRolesToUsers_Transaction(Start start, Connection connectio throws SQLException, StorageQueryException { String QUERY = "INSERT INTO " + Config.getConfig(start).getUserRolesTable() + "(app_id, tenant_id, user_id, role) VALUES(?, ?, ?, ?);"; - PreparedStatement insertStatement = connection.prepareStatement(QUERY); + List setters = new ArrayList<>(); - int counter = 0; for(Map.Entry>> tenantsEntry : rolesToUserByTenants.entrySet()) { for(Map.Entry> rolesToUser : tenantsEntry.getValue().entrySet()) { for(String roleForUser : rolesToUser.getValue()){ - insertStatement.setString(1, tenantsEntry.getKey().getAppId()); - insertStatement.setString(2, tenantsEntry.getKey().getTenantId()); - insertStatement.setString(3, rolesToUser.getKey()); - insertStatement.setString(4, roleForUser); - insertStatement.addBatch(); - counter++; - - if(counter % 100 == 0) { - insertStatement.executeBatch(); - } + setters.add(pst -> { + pst.setString(1, tenantsEntry.getKey().getAppId()); + pst.setString(2, tenantsEntry.getKey().getTenantId()); + pst.setString(3, rolesToUser.getKey()); + pst.setString(4, roleForUser); + }); } } } - insertStatement.executeBatch(); + executeBatch(connection, QUERY, setters); } public static List doesMultipleRoleExist_transaction(Start start, Connection con, AppIdentifier appIdentifier, diff --git a/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java b/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java index 1a23e3b..73078ea 100644 --- a/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java +++ b/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java @@ -987,6 +987,10 @@ private static long measureTime(Supplier function) { @Test public void testWithOneMillionUsers() throws Exception { + if (System.getenv("ONE_MILLION_USERS_TEST") == null) { + return; + } + Main main = startCronProcess(String.valueOf(NUM_THREADS)); int NUMBER_OF_USERS_TO_UPLOAD = 1000000; // million @@ -1007,6 +1011,7 @@ public void testWithOneMillionUsers() throws Exception { JsonObject request = generateUsersJson(10000, i * 10000); // API allows 10k users upload at once JsonObject response = uploadBulkImportUsersJson(main, request); assertEquals("OK", response.get("status").getAsString()); + System.out.println("Uploaded " + (i + 1) * 10000 + " users"); } } @@ -1031,7 +1036,7 @@ public void testWithOneMillionUsers() throws Exception { int failedUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); count = newUsersNumber + processingUsersNumber; - + System.out.println("Remaining users to process: " + count + " (new: " + newUsersNumber + ", processing: " + processingUsersNumber + ", failed: " + failedUsersNumber + ")"); if (count == 0) { break; }