Skip to content

Commit

Permalink
fix: integrating bulk execute with QueryExecutorTemplate - fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Dec 19, 2024
1 parent bae00e2 commit 4902c1f
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface QueryExecutorTemplate {

Expand All @@ -44,6 +45,25 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
}
}

static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
throws SQLException, StorageQueryException {
assert setters != null;
assert !setters.isEmpty();
try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
int counter = 0;
for(PreparedStatementValueSetter setter: setters) {
setter.setValues(pst);
pst.addBatch();
counter++;

if(counter % 100 == 0) {
pst.executeBatch();
}
}
pst.executeBatch(); //for the possible remaining ones
}
}

static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
Expand Down Expand Up @@ -317,28 +316,24 @@ public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentif

public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
throws SQLException {
throws SQLException, StorageQueryException {
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";

PreparedStatement setErrorStatement = con.prepareStatement(query);
List<PreparedStatementValueSetter> errorSetters = new ArrayList<>();

int counter = 0;
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
setErrorStatement.setString(1, errorStatus.toString());
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
setErrorStatement.setLong(3, System.currentTimeMillis());
setErrorStatement.setString(4, appIdentifier.getAppId());
setErrorStatement.setString(5, bulkImportUserId);
setErrorStatement.addBatch();

if(counter % 100 == 0) {
setErrorStatement.executeBatch();
}
errorSetters.add(pst -> {
pst.setString(1, errorStatus.toString());
pst.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
pst.setLong(3, System.currentTimeMillis());
pst.setString(4, appIdentifier.getAppId());
pst.setString(5, bulkImportUserId);
});
}

setErrorStatement.executeBatch();
executeBatch(con, query, errorSetters);
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import static io.supertokens.pluginInterface.RECIPE_ID.EMAIL_PASSWORD;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;
import static io.supertokens.storage.mysql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -330,60 +329,53 @@ public static void signUpMultipleForBulkImport_Transaction(Start start, Connecti
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);
List<PreparedStatementValueSetter> appIdToUserIdSetters = new ArrayList<>();
List<PreparedStatementValueSetter> allAuthRecipeUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersToTenantSetters = new ArrayList<>();;

int counter = 0;
for (EmailPasswordImportUser user : usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if (counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
appIdToUserIdSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});

allAuthRecipeUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, user.timeJoinedMSSinceEpoch);
pst.setLong(7, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, user.email);
pst.setString(4, user.passwordHash);
pst.setLong(5, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersToTenantSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, user.email);
});
}

//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters);
executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters);
executeBatch(sqlCon, emailpassword_users_QUERY, emailPasswordUsersSetters);
executeBatch(sqlCon, emailpassword_users_to_tenant_QUERY, emailPasswordUsersToTenantSetters);

//sqlCon.commit();
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;
import static io.supertokens.storage.mysql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -500,41 +499,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
boolean isEmailVerified)
throws SQLException, StorageQueryException {

String QUERY = "";
if (isEmailVerified) {
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
PreparedStatement insertQuery = con.prepareStatement(QUERY);
int counter = 0;
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getKey());
insertQuery.setString(3, emailToUser.getValue());
insertQuery.addBatch();

counter++;
if (counter % 100 == 0) {
insertQuery.executeBatch();
}
}
insertQuery.executeBatch();
} else {
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
int counter = 0;
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
deleteQuery.setString(1, appIdentifier.getAppId());
deleteQuery.setString(2, emailToUser.getValue());
deleteQuery.setString(3, emailToUser.getKey());
deleteQuery.addBatch();

counter++;
if (counter % 100 == 0) {
deleteQuery.executeBatch();
}
}
deleteQuery.executeBatch();
}
List<PreparedStatementValueSetter> setters = new ArrayList<>();

for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, emailToUser.getKey());
pst.setString(3, emailToUser.getValue());
});
}

executeBatch(con, QUERY, setters);
}

public static Set<String> findUserIdsBeingUsedForEmailVerification(Start start, AppIdentifier appIdentifier, List<String> userIds)
Expand Down Expand Up @@ -587,29 +570,29 @@ public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, A
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query);
PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query);

int counter = 0;
List<PreparedStatementValueSetter> emailVerificationSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailVerificationTokensSetters = new ArrayList<>();

for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){
updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationQuery.setString(3, supertokensUserId);
updateEmailVerificationQuery.addBatch();

updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationTokensQuery.setString(3, supertokensUserId);
updateEmailVerificationTokensQuery.addBatch();

counter++;
if(counter % 100 == 0) {
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
}
emailVerificationSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});

emailVerificationTokensSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
if(emailVerificationSetters.isEmpty()){
return null;
}

executeBatch(sqlCon, update_email_verification_table_query, emailVerificationSetters);
executeBatch(sqlCon, update_email_verification_tokens_table_query, emailVerificationTokensSetters);

} catch (SQLException e) {
throw new StorageTransactionLogicException(e);
Expand Down
Loading

0 comments on commit 4902c1f

Please sign in to comment.