Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multithreaded bulk import (#1077) #1079

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ highlighting the necessary changes)
- If no such branch exists, then create one from the latest released branch.
- [ ] If added a foreign key constraint on `app_id_to_user_id` table, make sure to delete from this table when deleting
the user as well if `deleteUserIdMappingToo` is false.
- [ ] If added a new recipe, then make sure to update the bulk import API to include the new recipe.

## Remaining TODOs for this PR

Expand Down
40 changes: 40 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,46 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [9.4.0]

### Added
- Adds property `bulk_migration_parallelism` for fine-tuning the worker threads number
- Adds APIs to bulk import users
- GET `/bulk-import/users`
- POST `/bulk-import/users`
- GET `/bulk-import/users/count`
- POST `/bulk-import/users/remove`
- POST `/bulk-import/users/import`
- POST `/bulk-import/backgroundjob`
- GET `/bulk-import/backgroundjob`
- Adds `ProcessBulkImportUsers` cron job to process bulk import users
- Adds multithreaded worker support for the `ProcessBulkImportUsers` cron job for faster bulk imports
- Adds support for lazy importing users

### Migrations

```sql
"CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
primary_user_id VARCHAR(36),
raw_data TEXT NOT NULL,
status VARCHAR(128) DEFAULT 'NEW',
error_msg TEXT,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
CONSTRAINT bulk_import_users_pkey PRIMARY KEY(app_id, id),
CONSTRAINT bulk_import_users__app_id_fkey FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);

CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
id DESC);

CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```

## [9.3.1]

- Includes exception class name in 500 error message
Expand Down
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ compileTestJava { options.encoding = "UTF-8" }
// }
//}

version = "9.3.1"

version = "9.4.0"

repositories {
mavenCentral()
Expand Down
4 changes: 4 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,7 @@ core_config_version: 0

# (Optional | Default: null) string value. The encryption key used for saving OAuth client secret on the database.
# oauth_client_secret_encryption_key:

# (DIFFERENT_ACROSS_APPS | OPTIONAL | Default: number of available processor cores) int value. If specified,
# the supertokens core will use the specified number of threads to complete the migration of users.
# bulk_migration_parallelism:
5 changes: 5 additions & 0 deletions devConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,8 @@ disable_telemetry: true

# (Optional | Default: null) string value. The encryption key used for saving OAuth client secret on the database.
# oauth_client_secret_encryption_key:

# (DIFFERENT_ACROSS_APPS | OPTIONAL | Default: number of available processor cores) int value. If specified,
# the supertokens core will use the specified number of threads to complete the migration of users.
# bulk_migration_parallelism:

6 changes: 6 additions & 0 deletions src/main/java/io/supertokens/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.supertokens.config.Config;
import io.supertokens.config.CoreConfig;
import io.supertokens.cronjobs.Cronjobs;
import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers;
import io.supertokens.cronjobs.cleanupOAuthSessionsAndChallenges.CleanupOAuthSessionsAndChallenges;
import io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys;
import io.supertokens.cronjobs.deleteExpiredDashboardSessions.DeleteExpiredDashboardSessions;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class Main {

// this is a special variable that will be set to true by TestingProcessManager
public static boolean isTesting = false;
// this flag is used in ProcessBulkImportUsersCronJobTest to skip the user validation
public static boolean isTesting_skipBulkImportUserValidationInCronJob = false;

// this is a special variable that will be set to true by TestingProcessManager
public static boolean makeConsolePrintSilent = false;
Expand Down Expand Up @@ -257,6 +260,9 @@ private void init() throws IOException, StorageQueryException {
// starts DeleteExpiredAccessTokenSigningKeys cronjob if the access token signing keys can change
Cronjobs.addCronjob(this, DeleteExpiredAccessTokenSigningKeys.init(this, uniqueUserPoolIdsTenants));

// initializes ProcessBulkImportUsers cronjob to process bulk import users
Cronjobs.addCronjob(this, ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants));

Cronjobs.addCronjob(this, CleanupOAuthSessionsAndChallenges.init(this, uniqueUserPoolIdsTenants));

// this is to ensure tenantInfos are in sync for the new cron job as well
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens;

import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.useridmapping.UserIdMapping;

public class StorageAndUserIdMappingForBulkImport extends StorageAndUserIdMapping {

public String userIdInQuestion;

public StorageAndUserIdMappingForBulkImport(Storage storage,
UserIdMapping userIdMapping, String userIdInQuestion) {
super(storage, userIdMapping);
this.userIdInQuestion = userIdInQuestion;
}
}
Loading
Loading