Skip to content

Commit

Permalink
chore: bulk import changelog (#137)
Browse files Browse the repository at this point in the history
* chore: update changelog

* chore: fix formatting

* fix: integrating bulk execute with QueryExecutorTemplate - fixing tests
  • Loading branch information
tamassoltesz authored Dec 20, 2024
1 parent 5d7c0a0 commit 9202ee5
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 352 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,35 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [7.3.0]

- Adds queries for Bulk Import
- Adds support for multithreaded bulk import

### Migration

```sql
CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
primary_user_id VARCHAR(36),
raw_data TEXT NOT NULL,
status VARCHAR(128) DEFAULT 'NEW',
error_msg TEXT,
created_at BIGINT UNSIGNED NOT NULL,
updated_at BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (app_id, id),
FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);

CREATE INDEX bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);

CREATE INDEX bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
id DESC);

CREATE INDEX bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```

## [7.2.0] - 2024-10-03

- Compatible with plugin interface version 6.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface QueryExecutorTemplate {

Expand All @@ -44,6 +45,25 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
}
}

static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
throws SQLException, StorageQueryException {
assert setters != null;
assert !setters.isEmpty();
try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
int counter = 0;
for(PreparedStatementValueSetter setter: setters) {
setter.setValues(pst);
pst.addBatch();
counter++;

if(counter % 100 == 0) {
pst.executeBatch();
}
}
pst.executeBatch(); //for the possible remaining ones
}
}

static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
Expand Down Expand Up @@ -317,28 +316,24 @@ public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentif

public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
throws SQLException {
throws SQLException, StorageQueryException {
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";

PreparedStatement setErrorStatement = con.prepareStatement(query);
List<PreparedStatementValueSetter> errorSetters = new ArrayList<>();

int counter = 0;
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
setErrorStatement.setString(1, errorStatus.toString());
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
setErrorStatement.setLong(3, System.currentTimeMillis());
setErrorStatement.setString(4, appIdentifier.getAppId());
setErrorStatement.setString(5, bulkImportUserId);
setErrorStatement.addBatch();

if(counter % 100 == 0) {
setErrorStatement.executeBatch();
}
errorSetters.add(pst -> {
pst.setString(1, errorStatus.toString());
pst.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
pst.setLong(3, System.currentTimeMillis());
pst.setString(4, appIdentifier.getAppId());
pst.setString(5, bulkImportUserId);
});
}

setErrorStatement.executeBatch();
executeBatch(con, query, errorSetters);
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import static io.supertokens.pluginInterface.RECIPE_ID.EMAIL_PASSWORD;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;
import static io.supertokens.storage.mysql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -330,60 +329,53 @@ public static void signUpMultipleForBulkImport_Transaction(Start start, Connecti
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);
List<PreparedStatementValueSetter> appIdToUserIdSetters = new ArrayList<>();
List<PreparedStatementValueSetter> allAuthRecipeUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersToTenantSetters = new ArrayList<>();;

int counter = 0;
for (EmailPasswordImportUser user : usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if (counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
appIdToUserIdSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});

allAuthRecipeUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, user.timeJoinedMSSinceEpoch);
pst.setLong(7, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, user.email);
pst.setString(4, user.passwordHash);
pst.setLong(5, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersToTenantSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, user.email);
});
}

//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters);
executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters);
executeBatch(sqlCon, emailpassword_users_QUERY, emailPasswordUsersSetters);
executeBatch(sqlCon, emailpassword_users_to_tenant_QUERY, emailPasswordUsersToTenantSetters);

//sqlCon.commit();
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;
import static io.supertokens.storage.mysql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -500,41 +499,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
boolean isEmailVerified)
throws SQLException, StorageQueryException {

String QUERY = "";
if (isEmailVerified) {
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
PreparedStatement insertQuery = con.prepareStatement(QUERY);
int counter = 0;
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getKey());
insertQuery.setString(3, emailToUser.getValue());
insertQuery.addBatch();

counter++;
if (counter % 100 == 0) {
insertQuery.executeBatch();
}
}
insertQuery.executeBatch();
} else {
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
int counter = 0;
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
deleteQuery.setString(1, appIdentifier.getAppId());
deleteQuery.setString(2, emailToUser.getValue());
deleteQuery.setString(3, emailToUser.getKey());
deleteQuery.addBatch();

counter++;
if (counter % 100 == 0) {
deleteQuery.executeBatch();
}
}
deleteQuery.executeBatch();
}
List<PreparedStatementValueSetter> setters = new ArrayList<>();

for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, emailToUser.getKey());
pst.setString(3, emailToUser.getValue());
});
}

executeBatch(con, QUERY, setters);
}

public static Set<String> findUserIdsBeingUsedForEmailVerification(Start start, AppIdentifier appIdentifier, List<String> userIds)
Expand Down Expand Up @@ -587,29 +570,29 @@ public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, A
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query);
PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query);

int counter = 0;
List<PreparedStatementValueSetter> emailVerificationSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailVerificationTokensSetters = new ArrayList<>();

for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){
updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationQuery.setString(3, supertokensUserId);
updateEmailVerificationQuery.addBatch();

updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationTokensQuery.setString(3, supertokensUserId);
updateEmailVerificationTokensQuery.addBatch();

counter++;
if(counter % 100 == 0) {
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
}
emailVerificationSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});

emailVerificationTokensSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
if(emailVerificationSetters.isEmpty()){
return null;
}

executeBatch(sqlCon, update_email_verification_table_query, emailVerificationSetters);
executeBatch(sqlCon, update_email_verification_tokens_table_query, emailVerificationTokensSetters);

} catch (SQLException e) {
throw new StorageTransactionLogicException(e);
Expand Down
Loading

0 comments on commit 9202ee5

Please sign in to comment.