Skip to content

Commit

Permalink
chore: bulkimport changelog (#1082)
Browse files Browse the repository at this point in the history
* chore: update changelog for bulkimport

* fix: fixing tests for mongo

* fix: fixing tests

---------

Co-authored-by: Sattvik Chakravarthy <[email protected]>
  • Loading branch information
tamassoltesz and sattvikc authored Dec 20, 2024
1 parent 409dbc2 commit ccaf20c
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 124 deletions.
26 changes: 24 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- GET `/bulk-import/users/count`
- POST `/bulk-import/users/remove`
- POST `/bulk-import/users/import`
- POST `/bulk-import/backgroundjob`
- GET `/bulk-import/backgroundjob`
- Adds `ProcessBulkImportUsers` cron job to process bulk import users
- Adds multithreaded worker support for the `ProcessBulkImportUsers` cron job for faster bulk imports
- Adds support for lazy importing users

### Migrations

For PostgreSQL, run the following SQL script:
```sql
"CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
Expand All @@ -47,6 +46,29 @@ CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON bulk_import_us
CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```
For MySQL run the following SQL script:
```sql
CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
primary_user_id VARCHAR(36),
raw_data TEXT NOT NULL,
status VARCHAR(128) DEFAULT 'NEW',
error_msg TEXT,
created_at BIGINT UNSIGNED NOT NULL,
updated_at BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (app_id, id),
FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);
CREATE INDEX bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);
CREATE INDEX bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
id DESC);
CREATE INDEX bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```
## [9.3.1]
- Includes exception class name in 500 error message
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/supertokens/inmemorydb/QueryExecutorTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface QueryExecutorTemplate {

Expand All @@ -44,6 +45,25 @@ public static <T> T execute(Connection con, String QUERY, PreparedStatementValue
}
}

static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
throws SQLException, StorageQueryException {
assert setters != null;
assert !setters.isEmpty();
try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
int counter = 0;
for(PreparedStatementValueSetter setter: setters) {
setter.setValues(pst);
pst.addBatch();
counter++;

if(counter % 100 == 0) {
pst.executeBatch();
}
}
pst.executeBatch(); //for the possible remaining ones
}
}

public static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.supertokens.inmemorydb.queries;

import io.supertokens.inmemorydb.ConnectionWithLocks;
import io.supertokens.inmemorydb.PreparedStatementValueSetter;
import io.supertokens.inmemorydb.Start;
import io.supertokens.inmemorydb.Utils;
import io.supertokens.inmemorydb.config.Config;
Expand All @@ -29,13 +30,11 @@
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static io.supertokens.inmemorydb.QueryExecutorTemplate.execute;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.update;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.*;
import static io.supertokens.inmemorydb.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -109,29 +108,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
boolean isEmailVerified)
throws SQLException, StorageQueryException {

String QUERY;
if (isEmailVerified) {
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
PreparedStatement insertQuery = con.prepareStatement(QUERY);
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getValue());
insertQuery.setString(3, emailToUser.getKey());
insertQuery.addBatch();
}
insertQuery.executeBatch();
} else {
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
deleteQuery.setString(1, appIdentifier.getAppId());
deleteQuery.setString(2, emailToUser.getValue());
deleteQuery.setString(3, emailToUser.getKey());
deleteQuery.addBatch();
}
deleteQuery.executeBatch();
}

List<PreparedStatementValueSetter> setters = new ArrayList<>();
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, emailToUser.getValue());
pst.setString(3, emailToUser.getKey());
});
}

executeBatch(con, QUERY, setters);
}

public static void deleteAllEmailVerificationTokensForUser_Transaction(Start start, Connection con,
Expand Down
97 changes: 43 additions & 54 deletions src/main/java/io/supertokens/inmemorydb/queries/GeneralQueries.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.supertokens.inmemorydb.queries;

import io.supertokens.Main;
import io.supertokens.inmemorydb.ConnectionPool;
import io.supertokens.inmemorydb.ConnectionWithLocks;
import io.supertokens.inmemorydb.Start;
import io.supertokens.inmemorydb.Utils;
import io.supertokens.inmemorydb.*;
import io.supertokens.inmemorydb.config.Config;
import io.supertokens.pluginInterface.KeyValueInfo;
import io.supertokens.pluginInterface.RECIPE_ID;
Expand All @@ -35,15 +32,17 @@
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

import java.sql.*;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import static io.supertokens.ProcessState.PROCESS_STATE.CREATING_NEW_TABLE;
import static io.supertokens.ProcessState.getInstance;
import static io.supertokens.inmemorydb.PreparedStatementValueSetter.NO_OP_SETTER;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.execute;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.update;
import static io.supertokens.inmemorydb.QueryExecutorTemplate.*;
import static io.supertokens.inmemorydb.config.Config.getConfig;
import static io.supertokens.inmemorydb.queries.EmailPasswordQueries.getQueryToCreatePasswordResetTokenExpiryIndex;
import static io.supertokens.inmemorydb.queries.EmailPasswordQueries.getQueryToCreatePasswordResetTokensTable;
Expand Down Expand Up @@ -966,26 +965,21 @@ public static void makePrimaryUsers_Transaction(Start start, Connection sqlCon,
String appid_to_userid_update_QUERY = "UPDATE " + getConfig(start).getAppIdToUserIdTable() +
" SET is_linked_or_is_a_primary_user = true WHERE app_id = ? AND user_id = ?";

PreparedStatement usersUpdateStatement = sqlCon.prepareStatement(users_update_QUERY);
PreparedStatement appIdToUserIdUpdateStatement = sqlCon.prepareStatement(appid_to_userid_update_QUERY);
int counter = 0;
for(String userId: userIds){
usersUpdateStatement.setString(1, appIdentifier.getAppId());
usersUpdateStatement.setString(2, userId);
usersUpdateStatement.addBatch();

appIdToUserIdUpdateStatement.setString(1, appIdentifier.getAppId());
appIdToUserIdUpdateStatement.setString(2, userId);
appIdToUserIdUpdateStatement.addBatch();

counter++;
if(counter % 100 == 0) {
usersUpdateStatement.executeBatch();
appIdToUserIdUpdateStatement.executeBatch();
}
List<PreparedStatementValueSetter> usersSetter = new ArrayList<>();
List<PreparedStatementValueSetter> appIdToUserIdSetter = new ArrayList<>();

for(String userId: userIds) {
usersSetter.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, userId);
});
appIdToUserIdSetter.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, userId);
});
}
usersUpdateStatement.executeBatch();
appIdToUserIdUpdateStatement.executeBatch();
executeBatch(sqlCon, users_update_QUERY, usersSetter);
executeBatch(sqlCon, appid_to_userid_update_QUERY, appIdToUserIdSetter);
}

public static void linkAccounts_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier,
Expand Down Expand Up @@ -1034,34 +1028,27 @@ public static void linkMultipleAccounts_Transaction(Start start, Connection sqlC
" SET is_linked_or_is_a_primary_user = true, primary_or_recipe_user_id = ? WHERE app_id = ? AND " +
"user_id = ?";

PreparedStatement updateUsers = sqlCon.prepareStatement(update_users_QUERY);
PreparedStatement updateAppIdToUserId = sqlCon.prepareStatement(update_appid_to_userid_QUERY);
List<PreparedStatementValueSetter> updateUsersSetter = new ArrayList<>();
List<PreparedStatementValueSetter> updateAppIdToUserIdSetter = new ArrayList<>();

int counter = 0;
for(Map.Entry<String, String> linkEntry : recipeUserIdToPrimaryUserId.entrySet()) {
String primaryUserId = linkEntry.getValue();
String recipeUserId = linkEntry.getKey();

updateUsers.setString(1, primaryUserId);
updateUsers.setString(2, appIdentifier.getAppId());
updateUsers.setString(3, recipeUserId);
updateUsers.addBatch();

updateAppIdToUserId.setString(1, primaryUserId);
updateAppIdToUserId.setString(2, appIdentifier.getAppId());
updateAppIdToUserId.setString(3, recipeUserId);
updateAppIdToUserId.addBatch();

counter++;
if (counter % 100 == 0) {
updateUsers.executeBatch();
updateAppIdToUserId.executeBatch();
}
updateUsersSetter.add(pst -> {
pst.setString(1, primaryUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, recipeUserId);
});
updateUsersSetter.add(pst -> {
pst.setString(1, primaryUserId);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, recipeUserId);
});
}

updateUsers.executeBatch();
updateAppIdToUserId.executeBatch();

executeBatch(sqlCon, update_users_QUERY, updateUsersSetter);
executeBatch(sqlCon, update_appid_to_userid_QUERY, updateAppIdToUserIdSetter);
updateTimeJoinedForPrimaryUsers_Transaction(start, sqlCon, appIdentifier,
new ArrayList<>(recipeUserIdToPrimaryUserId.values()));
}
Expand Down Expand Up @@ -1764,16 +1751,18 @@ public static void updateTimeJoinedForPrimaryUsers_Transaction(Start start, Conn
" SET primary_or_recipe_user_time_joined = (SELECT MIN(time_joined) FROM " +
getConfig(start).getUsersTable() + " WHERE app_id = ? AND primary_or_recipe_user_id = ?) WHERE " +
" app_id = ? AND primary_or_recipe_user_id = ?";
PreparedStatement updateStatement = sqlCon.prepareStatement(QUERY);

List<PreparedStatementValueSetter> setters = new ArrayList<>();
for(String primaryUserId : primaryUserIds) {
updateStatement.setString(1, appIdentifier.getAppId());
updateStatement.setString(2, primaryUserId);
updateStatement.setString(3, appIdentifier.getAppId());
updateStatement.setString(4, primaryUserId);
updateStatement.addBatch();
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, primaryUserId);
pst.setString(3, appIdentifier.getAppId());
pst.setString(4, primaryUserId);
});
}

updateStatement.executeBatch();
executeBatch(sqlCon, QUERY, setters);
}

private static class AllAuthRecipeUsersResultHolder {
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/io/supertokens/test/CronjobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ public void testThatReAddingSameCronTaskDoesNotScheduleMoreExecutors() throws Ex

Thread.sleep(5000);
assertTrue(CounterCronJob.getInstance(process.getProcess()).getCount() > 3 &&
CounterCronJob.getInstance(process.getProcess()).getCount() < 8);
CounterCronJob.getInstance(process.getProcess()).getCount() < 10);

process.kill();
assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STOPPED));
Expand Down Expand Up @@ -964,7 +964,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
{
List<List<List<TenantIdentifier>>> tenantsInfos = Cronjobs.getInstance(process.getProcess())
.getTenantInfos();
assertEquals(11, tenantsInfos.size());
assertEquals(12, tenantsInfos.size());
int count = 0;
for (List<List<TenantIdentifier>> tenantsInfo : tenantsInfos) {
if (tenantsInfo != null) {
Expand All @@ -974,7 +974,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
count++;
}
}
assertEquals(10, count);
assertEquals(11, count);
}

process.kill(false);
Expand All @@ -991,7 +991,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
{
List<List<List<TenantIdentifier>>> tenantsInfos = Cronjobs.getInstance(process.getProcess())
.getTenantInfos();
assertEquals(11, tenantsInfos.size());
assertEquals(12, tenantsInfos.size());
int count = 0;
for (List<List<TenantIdentifier>> tenantsInfo : tenantsInfos) {
if (tenantsInfo != null) {
Expand All @@ -1001,7 +1001,7 @@ public void testThatCronJobsHaveTenantsInfoAfterRestart() throws Exception {
count++;
}
}
assertEquals(10, count);
assertEquals(11, count);
}

process.kill();
Expand Down Expand Up @@ -1048,7 +1048,7 @@ public void testThatThereAreTasksOfAllCronTaskClassesAndHaveCorrectIntervals() t
intervals.put("io.supertokens.cronjobs.telemetry.Telemetry", 86400);
intervals.put("io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys",
86400);
intervals.put("io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers", 60);
intervals.put("io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers", 300);
intervals.put("io.supertokens.cronjobs.cleanupOAuthSessionsAndChallenges.CleanupOAuthSessionsAndChallenges",
86400);

Expand All @@ -1070,7 +1070,7 @@ public void testThatThereAreTasksOfAllCronTaskClassesAndHaveCorrectIntervals() t
0);

List<CronTask> allTasks = Cronjobs.getInstance(process.getProcess()).getTasks();
assertEquals(11, allTasks.size());
assertEquals(12, allTasks.size());

for (CronTask task : allTasks) {
assertEquals(intervals.get(task.getClass().getName()).intValue(), task.getIntervalTimeSeconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void beforeEach() {
public void testWithALotOfUsers() throws Exception {
Main main = startCronProcess("14");

int NUMBER_OF_USERS_TO_UPLOAD = 100000;
int NUMBER_OF_USERS_TO_UPLOAD = 10000;

if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) {
return;
Expand Down Expand Up @@ -115,6 +115,7 @@ public void testWithALotOfUsers() throws Exception {
int failedUsersNumber = loadBulkImportUsersCountWithStatus(main,
BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt();
count = newUsersNumber + processingUsersNumber;
System.out.println("Remaining users: " + count);

if (count == 0) {
break;
Expand Down
Loading

0 comments on commit ccaf20c

Please sign in to comment.