Skip to content

Commit

Permalink
fix: bulk import fixes (#238)
Browse files Browse the repository at this point in the history
* fix: fixing the OneMillionUsersTest

* fix: introduce batchExecute for QueryExecutorTemplate
  • Loading branch information
tamassoltesz authored Dec 20, 2024
1 parent c2ed495 commit d309722
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface QueryExecutorTemplate {

Expand All @@ -44,6 +45,25 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
}
}

static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
throws SQLException, StorageQueryException {
assert setters != null;
assert !setters.isEmpty();
try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
int counter = 0;
for(PreparedStatementValueSetter setter: setters) {
setter.setValues(pst);
pst.addBatch();
counter++;

if(counter % 100 == 0) {
pst.executeBatch();
}
}
pst.executeBatch(); //for the possible remaining ones
}
}

static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
Expand Down Expand Up @@ -125,28 +124,23 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio

public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
throws SQLException {
throws SQLException, StorageQueryException {
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";
List<PreparedStatementValueSetter> setters = new ArrayList<>();

PreparedStatement setErrorStatement = con.prepareStatement(query);

int counter = 0;
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
setErrorStatement.setString(1, errorStatus.toString());
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
setErrorStatement.setLong(3, System.currentTimeMillis());
setErrorStatement.setString(4, appIdentifier.getAppId());
setErrorStatement.setString(5, bulkImportUserId);
setErrorStatement.addBatch();

if(counter % 100 == 0) {
setErrorStatement.executeBatch();
}
setters.add(pst -> {
pst.setString(1, errorStatus.toString());
pst.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
pst.setLong(3, System.currentTimeMillis());
pst.setString(4, appIdentifier.getAppId());
pst.setString(5, bulkImportUserId);
});
}

setErrorStatement.executeBatch();
executeBatch(con, query, setters);
}

public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(Start start,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import static io.supertokens.pluginInterface.RECIPE_ID.EMAIL_PASSWORD;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;
import static io.supertokens.storage.postgresql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -354,59 +353,52 @@ public static void signUpMultipleForBulkImport_Transaction(Start start, Connecti
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);
List<PreparedStatementValueSetter> appIdToUserIdSetters = new ArrayList<>();
List<PreparedStatementValueSetter> allAuthRecipeUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersToTenantSetters = new ArrayList<>();

int counter = 0;
for (EmailPasswordImportUser user : usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if (counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
}
appIdToUserIdSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});

allAuthRecipeUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, user.timeJoinedMSSinceEpoch);
pst.setLong(7, user.timeJoinedMSSinceEpoch);
});

//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
emailPasswordUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, user.email);
pst.setString(4, user.passwordHash);
pst.setLong(5, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersToTenantSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, user.email);
});
}

executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters);
executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters);
executeBatch(sqlCon, emailpassword_users_QUERY, emailPasswordUsersSetters);
executeBatch(sqlCon, emailpassword_users_to_tenant_QUERY, emailPasswordUsersToTenantSetters);
sqlCon.commit();
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.postgresql.PreparedStatementValueSetter;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.*;
import static io.supertokens.storage.postgresql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -130,41 +129,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
boolean isEmailVerified)
throws SQLException, StorageQueryException {

String QUERY;
if (isEmailVerified) {
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
PreparedStatement insertQuery = con.prepareStatement(QUERY);
int counter = 0;
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getKey());
insertQuery.setString(3, emailToUser.getValue());
insertQuery.addBatch();

counter++;
if (counter % 100 == 0) {
insertQuery.executeBatch();
}
}
insertQuery.executeBatch();
} else {
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
int counter = 0;
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
deleteQuery.setString(1, appIdentifier.getAppId());
deleteQuery.setString(2, emailToUser.getValue());
deleteQuery.setString(3, emailToUser.getKey());
deleteQuery.addBatch();

counter++;
if (counter % 100 == 0) {
deleteQuery.executeBatch();
}
}
deleteQuery.executeBatch();
}

List<PreparedStatementValueSetter> setters = new ArrayList<>();

for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, emailToUser.getKey());
pst.setString(3, emailToUser.getValue());
});
}
executeBatch(con, QUERY, setters);
}

public static void deleteAllEmailVerificationTokensForUser_Transaction(Start start, Connection con,
Expand Down Expand Up @@ -610,30 +593,30 @@ public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, A
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query);
PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query);

int counter = 0;
List<PreparedStatementValueSetter> emailVerificationSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emalVerificationTokensSetters = new ArrayList<>();

for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){
updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationQuery.setString(3, supertokensUserId);
updateEmailVerificationQuery.addBatch();

updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationTokensQuery.setString(3, supertokensUserId);
updateEmailVerificationTokensQuery.addBatch();

counter++;
if(counter % 100 == 0) {
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
}
emailVerificationSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});

emalVerificationTokensSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}

if(emailVerificationSetters.isEmpty()){
return null;
}
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();

executeBatch(sqlCon, update_email_verification_table_query, emailVerificationSetters);
executeBatch(sqlCon, update_email_verification_tokens_table_query, emalVerificationTokensSetters);
} catch (SQLException e) {
throw new StorageTransactionLogicException(e);
}
Expand Down
Loading

0 comments on commit d309722

Please sign in to comment.