Skip to content

Commit

Permalink
feat: mysql implementation for bulk import
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Dec 4, 2024
1 parent 519561e commit 7033f8b
Show file tree
Hide file tree
Showing 15 changed files with 1,283 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,7 @@

package io.supertokens.storage.mysql;

import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@

package io.supertokens.storage.mysql;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

import com.google.gson.JsonObject;

import io.supertokens.pluginInterface.LOG_LEVEL;
import io.supertokens.pluginInterface.exceptions.DbInitException;
import io.supertokens.pluginInterface.exceptions.InvalidConfigException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

/**
* BulkImportProxyStorage is a class extending Start, serving as a Storage instance in the bulk import user cronjob.
* This cronjob extensively utilizes existing queries to import users, all of which internally operate within transactions.
Expand All @@ -56,7 +52,7 @@ public synchronized Connection getTransactionConnection() throws SQLException, S

@Override
protected <T> T startTransactionHelper(TransactionLogic<T> logic, TransactionIsolationLevel isolationLevel)
throws StorageQueryException, StorageTransactionLogicException, SQLException {
throws StorageQueryException, StorageTransactionLogicException, SQLException, TenantOrAppNotFoundException {
return logic.mainLogicAndCommit(new TransactionConnection(getTransactionConnection()));
}

Expand Down
426 changes: 416 additions & 10 deletions src/main/java/io/supertokens/storage/mysql/Start.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;
import org.jetbrains.annotations.TestOnly;

import java.sql.Connection;
import java.sql.SQLException;

import org.jetbrains.annotations.TestOnly;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
Expand Down Expand Up @@ -201,4 +204,28 @@ public static int countUsersThatHaveMoreThanOneLoginMethodOrTOTPEnabledAndActive
return result.next() ? result.getInt("c") : 0;
});
}

public static Map<String, Long> getLastActiveByMultipleUserIds(Start start, AppIdentifier appIdentifier, List<String> userIds)
throws StorageQueryException {
String QUERY = "SELECT user_id, last_active_time FROM " + Config.getConfig(start).getUserLastActiveTable()
+ " WHERE app_id = ? AND user_id IN ( " + Utils.generateCommaSeperatedQuestionMarks(userIds.size())+ " )";

try {
return execute(start, QUERY, pst -> {
pst.setString(1, appIdentifier.getAppId());
for (int i = 0; i < userIds.size(); i++) {
pst.setString(2+i, userIds.get(i));
}
}, res -> {
Map<String, Long> lastActiveByUserIds = new HashMap<>();
if (res.next()) {
String userId = res.getString("user_id");
lastActiveByUserIds.put(userId, res.getLong("last_active_time"));
}
return lastActiveByUserIds;
});
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,6 @@

package io.supertokens.storage.mysql.queries;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.RowMapper;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
Expand All @@ -38,6 +26,19 @@
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
String tableName = Config.getConfig(start).getBulkImportUsersTable();
Expand Down Expand Up @@ -134,7 +135,7 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
// "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows.
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (UNIX_TIMESTAMP() * 1000) - 10 * 60 * 1000))" /* 10 mins */
+ " AND (status = 'NEW' OR status = 'PROCESSING')" /* 10 mins */
+ " LIMIT ? FOR UPDATE SKIP LOCKED";


Expand Down Expand Up @@ -314,6 +315,32 @@ public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentif
});
}

public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
throws SQLException {
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";

PreparedStatement setErrorStatement = con.prepareStatement(query);

int counter = 0;
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
setErrorStatement.setString(1, errorStatus.toString());
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
setErrorStatement.setLong(3, System.currentTimeMillis());
setErrorStatement.setString(4, appIdentifier.getAppId());
setErrorStatement.setString(5, bulkImportUserId);
setErrorStatement.addBatch();

if(counter % 100 == 0) {
setErrorStatement.executeBatch();
}
}

setErrorStatement.executeBatch();
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.supertokens.pluginInterface.RowMapper;
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.emailpassword.EmailPasswordImportUser;
import io.supertokens.pluginInterface.emailpassword.PasswordResetTokenInfo;
import io.supertokens.pluginInterface.emailpassword.exceptions.UnknownUserIdException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
Expand All @@ -30,6 +31,7 @@
import io.supertokens.storage.mysql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
Expand Down Expand Up @@ -310,6 +312,83 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden
});
}

public static void signUpMultipleForBulkImport_Transaction(Start start, Connection sqlCon, List<EmailPasswordImportUser> usersToSignUp)
throws StorageQueryException, StorageTransactionLogicException, SQLException {
try {
String app_id_to_user_id_QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable()
+ "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)";

String all_auth_recipe_users_QUERY = "INSERT INTO " + getConfig(start).getUsersTable() +
"(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, " +
"primary_or_recipe_user_time_joined)" +
" VALUES(?, ?, ?, ?, ?, ?, ?)";

String emailpassword_users_QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable()
+ "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)";

String emailpassword_users_to_tenant_QUERY =
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);

int counter = 0;
for (EmailPasswordImportUser user : usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if (counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
}

//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();

//sqlCon.commit();
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
}

public static void deleteUser_Transaction(Connection sqlCon, Start start, AppIdentifier appIdentifier,
String userId, boolean deleteUserIdMappingToo)
throws StorageQueryException, SQLException {
Expand Down
Loading

0 comments on commit 7033f8b

Please sign in to comment.