Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bulkimport changelog #1082

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- GET `/bulk-import/users/count`
- POST `/bulk-import/users/remove`
- POST `/bulk-import/users/import`
- POST `/bulk-import/backgroundjob`
- GET `/bulk-import/backgroundjob`
- Adds `ProcessBulkImportUsers` cron job to process bulk import users
- Adds multithreaded worker support for the `ProcessBulkImportUsers` cron job for faster bulk imports
- Adds support for lazy importing users

### Migrations

For PostgreSQL, run the following SQL script:
```sql
CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
Expand All @@ -47,6 +46,29 @@ CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON bulk_import_us
CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```

For MySQL, run the following SQL script:
```sql
CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
primary_user_id VARCHAR(36),
raw_data TEXT NOT NULL,
status VARCHAR(128) DEFAULT 'NEW',
error_msg TEXT,
created_at BIGINT UNSIGNED NOT NULL,
updated_at BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (app_id, id),
FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);

CREATE INDEX bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);

CREATE INDEX bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
id DESC);

CREATE INDEX bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```

## [9.3.1]

- Includes exception class name in 500 error message
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/supertokens/inmemorydb/QueryExecutorTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface QueryExecutorTemplate {

Expand All @@ -44,6 +45,25 @@ public static <T> T execute(Connection con, String QUERY, PreparedStatementValue
}
}

/**
 * Executes {@code QUERY} once per entry of {@code setters} as a JDBC batch on the given connection.
 *
 * Each setter binds the parameters for one batch entry. The batch is flushed every
 * {@code batchSize} entries, and once more at the end only if entries are still pending, so no
 * empty batch is sent when the number of setters is an exact multiple of {@code batchSize}.
 *
 * Only the prepared statement is closed here; the connection is left open for the caller.
 *
 * @param connection connection on which the statement is prepared
 * @param QUERY      parameterised SQL statement to run for every setter
 * @param setters    one parameter binder per batch entry; asserted to be non-null and non-empty
 * @throws SQLException          if preparing, binding or executing the statement fails
 * @throws StorageQueryException if a setter reports a failure while binding values
 */
static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
        throws SQLException, StorageQueryException {
    assert setters != null;
    assert !setters.isEmpty();
    // Number of entries accumulated before the batch is sent to the database.
    final int batchSize = 100;
    try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
        int pending = 0;
        for (PreparedStatementValueSetter setter : setters) {
            setter.setValues(pst);
            pst.addBatch();
            pending++;

            if (pending == batchSize) {
                pst.executeBatch();
                pending = 0;
            }
        }
        // Flush the remainder; skipped when the last full chunk already emptied the batch.
        if (pending > 0) {
            pst.executeBatch();
        }
    }
}

public static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.supertokens.inmemorydb.queries;

import io.supertokens.inmemorydb.ConnectionWithLocks;
import io.supertokens.inmemorydb.PreparedStatementValueSetter;
import io.supertokens.inmemorydb.Start;
import io.supertokens.inmemorydb.Utils;
import io.supertokens.inmemorydb.config.Config;
Expand All @@ -29,13 +30,11 @@
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static io.supertokens.inmemorydb.QueryExecutorTemplate.execute;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.update;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.*;
import static io.supertokens.inmemorydb.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -109,29 +108,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
boolean isEmailVerified)
throws SQLException, StorageQueryException {

String QUERY;
if (isEmailVerified) {
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
PreparedStatement insertQuery = con.prepareStatement(QUERY);
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getValue());
insertQuery.setString(3, emailToUser.getKey());
insertQuery.addBatch();
}
insertQuery.executeBatch();
} else {
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
deleteQuery.setString(1, appIdentifier.getAppId());
deleteQuery.setString(2, emailToUser.getValue());
deleteQuery.setString(3, emailToUser.getKey());
deleteQuery.addBatch();
}
deleteQuery.executeBatch();
}

List<PreparedStatementValueSetter> setters = new ArrayList<>();
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, emailToUser.getValue());
pst.setString(3, emailToUser.getKey());
});
}

executeBatch(con, QUERY, setters);
}

public static void deleteAllEmailVerificationTokensForUser_Transaction(Start start, Connection con,
Expand Down
97 changes: 43 additions & 54 deletions src/main/java/io/supertokens/inmemorydb/queries/GeneralQueries.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.supertokens.inmemorydb.queries;

import io.supertokens.Main;
import io.supertokens.inmemorydb.ConnectionPool;
import io.supertokens.inmemorydb.ConnectionWithLocks;
import io.supertokens.inmemorydb.Start;
import io.supertokens.inmemorydb.Utils;
import io.supertokens.inmemorydb.*;
import io.supertokens.inmemorydb.config.Config;
import io.supertokens.pluginInterface.KeyValueInfo;
import io.supertokens.pluginInterface.RECIPE_ID;
Expand All @@ -35,15 +32,17 @@
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

import java.sql.*;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import static io.supertokens.ProcessState.PROCESS_STATE.CREATING_NEW_TABLE;
import static io.supertokens.ProcessState.getInstance;
import static io.supertokens.inmemorydb.PreparedStatementValueSetter.NO_OP_SETTER;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.execute;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.update;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.*;
import static io.supertokens.inmemorydb.config.Config.getConfig;
import static io.supertokens.inmemorydb.queries.EmailPasswordQueries.getQueryToCreatePasswordResetTokenExpiryIndex;
import static io.supertokens.inmemorydb.queries.EmailPasswordQueries.getQueryToCreatePasswordResetTokensTable;
Expand Down Expand Up @@ -966,26 +965,21 @@ public static void makePrimaryUsers_Transaction(Start start, Connection sqlCon,
String appid_to_userid_update_QUERY = "UPDATE " + getConfig(start).getAppIdToUserIdTable() +
" SET is_linked_or_is_a_primary_user = true WHERE app_id = ? AND user_id = ?";

PreparedStatement usersUpdateStatement = sqlCon.prepareStatement(users_update_QUERY);
PreparedStatement appIdToUserIdUpdateStatement = sqlCon.prepareStatement(appid_to_userid_update_QUERY);
int counter = 0;
for(String userId: userIds){
usersUpdateStatement.setString(1, appIdentifier.getAppId());
usersUpdateStatement.setString(2, userId);
usersUpdateStatement.addBatch();

appIdToUserIdUpdateStatement.setString(1, appIdentifier.getAppId());
appIdToUserIdUpdateStatement.setString(2, userId);
appIdToUserIdUpdateStatement.addBatch();

counter++;
if(counter % 100 == 0) {
usersUpdateStatement.executeBatch();
appIdToUserIdUpdateStatement.executeBatch();
}
List<PreparedStatementValueSetter> usersSetter = new ArrayList<>();
List<PreparedStatementValueSetter> appIdToUserIdSetter = new ArrayList<>();

for(String userId: userIds) {
usersSetter.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, userId);
});
appIdToUserIdSetter.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, userId);
});
}
usersUpdateStatement.executeBatch();
appIdToUserIdUpdateStatement.executeBatch();
executeBatch(sqlCon, users_update_QUERY, usersSetter);
executeBatch(sqlCon, appid_to_userid_update_QUERY, appIdToUserIdSetter);
}

public static void linkAccounts_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier,
Expand Down Expand Up @@ -1034,34 +1028,27 @@ public static void linkMultipleAccounts_Transaction(Start start, Connection sqlC
" SET is_linked_or_is_a_primary_user = true, primary_or_recipe_user_id = ? WHERE app_id = ? AND " +
"user_id = ?";

PreparedStatement updateUsers = sqlCon.prepareStatement(update_users_QUERY);
PreparedStatement updateAppIdToUserId = sqlCon.prepareStatement(update_appid_to_userid_QUERY);
List<PreparedStatementValueSetter> updateUsersSetter = new ArrayList<>();
List<PreparedStatementValueSetter> updateAppIdToUserIdSetter = new ArrayList<>();

int counter = 0;
for(Map.Entry<String, String> linkEntry : recipeUserIdToPrimaryUserId.entrySet()) {
String primaryUserId = linkEntry.getValue();
String recipeUserId = linkEntry.getKey();

updateUsers.setString(1, primaryUserId);
updateUsers.setString(2, appIdentifier.getAppId());
updateUsers.setString(3, recipeUserId);
updateUsers.addBatch();

updateAppIdToUserId.setString(1, primaryUserId);
updateAppIdToUserId.setString(2, appIdentifier.getAppId());
updateAppIdToUserId.setString(3, recipeUserId);
updateAppIdToUserId.addBatch();

counter++;
if (counter % 100 == 0) {
updateUsers.executeBatch();
updateAppIdToUserId.executeBatch();
}
// One binder per query: both statements take (primaryUserId, appId, recipeUserId) in this order,
// but each must be registered in its own list so that both updates are actually executed.
updateUsersSetter.add(pst -> {
    pst.setString(1, primaryUserId);
    pst.setString(2, appIdentifier.getAppId());
    pst.setString(3, recipeUserId);
});
updateAppIdToUserIdSetter.add(pst -> {
    pst.setString(1, primaryUserId);
    pst.setString(2, appIdentifier.getAppId());
    pst.setString(3, recipeUserId);
});
}

updateUsers.executeBatch();
updateAppIdToUserId.executeBatch();

executeBatch(sqlCon, update_users_QUERY, updateUsersSetter);
executeBatch(sqlCon, update_appid_to_userid_QUERY, updateAppIdToUserIdSetter);
updateTimeJoinedForPrimaryUsers_Transaction(start, sqlCon, appIdentifier,
new ArrayList<>(recipeUserIdToPrimaryUserId.values()));
}
Expand Down Expand Up @@ -1764,16 +1751,18 @@ public static void updateTimeJoinedForPrimaryUsers_Transaction(Start start, Conn
" SET primary_or_recipe_user_time_joined = (SELECT MIN(time_joined) FROM " +
getConfig(start).getUsersTable() + " WHERE app_id = ? AND primary_or_recipe_user_id = ?) WHERE " +
" app_id = ? AND primary_or_recipe_user_id = ?";
PreparedStatement updateStatement = sqlCon.prepareStatement(QUERY);

List<PreparedStatementValueSetter> setters = new ArrayList<>();
for(String primaryUserId : primaryUserIds) {
updateStatement.setString(1, appIdentifier.getAppId());
updateStatement.setString(2, primaryUserId);
updateStatement.setString(3, appIdentifier.getAppId());
updateStatement.setString(4, primaryUserId);
updateStatement.addBatch();
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, primaryUserId);
pst.setString(3, appIdentifier.getAppId());
pst.setString(4, primaryUserId);
});
}

updateStatement.executeBatch();
executeBatch(sqlCon, QUERY, setters);
}

private static class AllAuthRecipeUsersResultHolder {
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/io/supertokens/test/CronjobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ public void testThatReAddingSameCronTaskDoesNotScheduleMoreExecutors() throws Ex

Thread.sleep(5000);
assertTrue(CounterCronJob.getInstance(process.getProcess()).getCount() > 3 &&
CounterCronJob.getInstance(process.getProcess()).getCount() < 8);
CounterCronJob.getInstance(process.getProcess()).getCount() < 10);

process.kill();
assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED));
Expand Down Expand Up @@ -964,7 +964,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
{
List<List<List<TenantIdentifier>>> tenantsInfos = Cronjobs.getInstance(process.getProcess())
.getTenantInfos();
assertEquals(11, tenantsInfos.size());
assertEquals(12, tenantsInfos.size());
int count = 0;
for (List<List<TenantIdentifier>> tenantsInfo : tenantsInfos) {
if (tenantsInfo != null) {
Expand All @@ -974,7 +974,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
count++;
}
}
assertEquals(10, count);
assertEquals(11, count);
}

process.kill(false);
Expand All @@ -991,7 +991,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
{
List<List<List<TenantIdentifier>>> tenantsInfos = Cronjobs.getInstance(process.getProcess())
.getTenantInfos();
assertEquals(11, tenantsInfos.size());
assertEquals(12, tenantsInfos.size());
int count = 0;
for (List<List<TenantIdentifier>> tenantsInfo : tenantsInfos) {
if (tenantsInfo != null) {
Expand All @@ -1001,7 +1001,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
count++;
}
}
assertEquals(10, count);
assertEquals(11, count);
}

process.kill();
Expand Down Expand Up @@ -1048,7 +1048,7 @@ public void testThatThereAreTasksOfAllCronTaskClassesAndHaveCorrectIntervals() t
intervals.put("io.supertokens.cronjobs.telemetry.Telemetry", 86400);
intervals.put("io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys",
86400);
intervals.put("io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers", 60);
intervals.put("io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers", 300);
intervals.put("io.supertokens.cronjobs.cleanupOAuthSessionsAndChallenges.CleanupOAuthSessionsAndChallenges",
86400);

Expand All @@ -1070,7 +1070,7 @@ public void testThatThereAreTasksOfAllCronTaskClassesAndHaveCorrectIntervals() t
0);

List<CronTask> allTasks = Cronjobs.getInstance(process.getProcess()).getTasks();
assertEquals(11, allTasks.size());
assertEquals(12, allTasks.size());

for (CronTask task : allTasks) {
assertEquals(intervals.get(task.getClass().getName()).intValue(), task.getIntervalTimeSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void beforeEach() {
public void testWithALotOfUsers() throws Exception {
Main main = startCronProcess("14");

int NUMBER_OF_USERS_TO_UPLOAD = 100000;
int NUMBER_OF_USERS_TO_UPLOAD = 10000;

if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) {
return;
Expand Down Expand Up @@ -115,6 +115,7 @@ public void testWithALotOfUsers() throws Exception {
int failedUsersNumber = loadBulkImportUsersCountWithStatus(main,
BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt();
count = newUsersNumber + processingUsersNumber;
System.out.println("Remaining users: " + count);

if (count == 0) {
break;
Expand Down
Loading
Loading