From 8a9e8726757ab4047ad76b0642c9f902ff4f728e Mon Sep 17 00:00:00 2001
From: exAspArk
Date: Thu, 4 Apr 2024 10:22:07 -0400
Subject: [PATCH] Add more indexes, convert transactionId to bigint, fetch with a timeout

---
 core/src/entities/Change.ts                    |  9 ++--
 core/src/ingestion.ts                          | 19 ++++---
 .../migrations/.snapshot-db_799e557d9277.json  | 52 +++++++++++++------
 .../Migration20240322153919_index_table.ts     | 13 +++++
 ...240322173541_index_operation_and_unique.ts  | 17 ++++++
 ...ation20240322174908_return_unique_index.ts  | 15 ++++++
 ...20240401141659_transaction_id_to_bigint.ts  | 13 +++++
 docs/docs/postgresql/destination-database.md   |  2 +-
 8 files changed, 111 insertions(+), 29 deletions(-)
 create mode 100644 core/src/migrations/Migration20240322153919_index_table.ts
 create mode 100644 core/src/migrations/Migration20240322173541_index_operation_and_unique.ts
 create mode 100644 core/src/migrations/Migration20240322174908_return_unique_index.ts
 create mode 100644 core/src/migrations/Migration20240401141659_transaction_id_to_bigint.ts

diff --git a/core/src/entities/Change.ts b/core/src/entities/Change.ts
index 6b1d016..b45afbf 100644
--- a/core/src/entities/Change.ts
+++ b/core/src/entities/Change.ts
@@ -12,12 +12,15 @@ export enum Operation {
 }
 
 @Entity({ tableName: 'changes' })
-@Index({ properties: ['primaryKey'] })
 @Index({ properties: ['committedAt'] })
+@Index({ properties: ['table'] })
+@Index({ properties: ['primaryKey'] })
+@Index({ properties: ['operation'] })
 @Index({ properties: ['context'], type: 'GIN' })
 @Index({ properties: ['before'], type: 'GIN' })
 @Index({ properties: ['after'], type: 'GIN' })
-@Unique({ properties: ['position', 'operation', 'table', 'schema', 'database'] })
+// CREATE and DELETE changes without a primary key can share the same position
+@Unique({ properties: ['position', 'table', 'schema', 'database', 'operation'] })
 export class Change extends BaseEntity {
 
   @Property({ nullable: true })
@@ -50,7 +53,7 @@ export class Change extends BaseEntity {
   @Property()
   queuedAt: Date;
 
-  @Property({ type: 'integer' })
+  @Property({ type: 'bigint' })
   transactionId: number;
 
   @Property({ type: 'bigint' })
diff --git a/core/src/ingestion.ts b/core/src/ingestion.ts
index 9792cf6..cbb4f0e 100644
--- a/core/src/ingestion.ts
+++ b/core/src/ingestion.ts
@@ -9,6 +9,7 @@ import { ChangeMessagesBuffer } from './change-message-buffer'
 import { stitchChangeMessages } from './stitching'
 
 const INSERT_INTERVAL_MS = 1000 // 1 second to avoid overwhelming the database
+const FETCH_EXPIRES_MS = 30_000 // 30 seconds, the default fetch timeout
 const sleep = (ms: number) => (
   new Promise(resolve => setTimeout(resolve, ms))
 )
@@ -20,7 +21,7 @@ const chunk = <T>(array: T[], size: number): T[][] => (
   )
 )
 
-const persistChangeMessages = async (
+const persistChangeMessages = (
   { orm, changeMessages, insertBatchSize }:
   { orm: MikroORM, changeMessages: ChangeMessage[], insertBatchSize: number }
 ) => {
@@ -39,11 +40,11 @@ const fetchMessages = async (
   const messageBySequence: { [sequence: number]: Message } = {}
   let pendingMessageCount = 0
 
-  const iterator = await consumer.fetch({ max_messages: fetchBatchSize });
+  const iterator = await consumer.fetch({ max_messages: fetchBatchSize, expires: FETCH_EXPIRES_MS });
 
   for await (const message of iterator) {
     const { streamSequence, pending } = message.info;
-    logger.debug(`Stream sequence: ${streamSequence}, pending: ${pending}`)
+    logger.debug(`Fetched stream sequence: ${streamSequence}, pending: ${pending}`)
 
     pendingMessageCount = pending
 
@@ -51,9 +52,6 @@ const fetchMessages = async (
     if (!lastStreamSequence || lastStreamSequence < streamSequence) {
       messageBySequence[streamSequence] = message
     }
-
-    // Exhausted the batch
-    if (pendingMessageCount === 0) break;
   }
 
   return { messageBySequence, pendingMessageCount }
@@ -113,8 +111,13 @@ export const runIngestionLoop = async (
 
     // Persisting and acking
     if (stitchedChangeMessages.length) {
-      await persistChangeMessages({ orm, changeMessages: stitchedChangeMessages, insertBatchSize })
-      await orm.em.flush()
+      persistChangeMessages({ orm, changeMessages: stitchedChangeMessages, insertBatchSize })
+      try {
+        await orm.em.flush()
+      } catch (e) {
+        logger.info(`Error while flushing: ${e}`)
+        throw e
+      }
     }
     if (ackStreamSequence) {
       logger.debug(`Acking ${ackStreamSequence}...`)
diff --git a/core/src/migrations/.snapshot-db_799e557d9277.json b/core/src/migrations/.snapshot-db_799e557d9277.json
index 622a30c..08d6e3a 100644
--- a/core/src/migrations/.snapshot-db_799e557d9277.json
+++ b/core/src/migrations/.snapshot-db_799e557d9277.json
@@ -35,6 +35,16 @@
         "nullable": true,
         "mappedType": "string"
       },
+      "before": {
+        "name": "before",
+        "type": "jsonb",
+        "unsigned": false,
+        "autoincrement": false,
+        "primary": false,
+        "nullable": false,
+        "default": "'{}'",
+        "mappedType": "json"
+      },
       "after": {
         "name": "after",
         "type": "jsonb",
@@ -120,12 +130,12 @@
       },
       "transaction_id": {
         "name": "transaction_id",
-        "type": "int",
+        "type": "bigint",
         "unsigned": false,
         "autoincrement": false,
         "primary": false,
         "nullable": false,
-        "mappedType": "integer"
+        "mappedType": "bigint"
       },
       "position": {
         "name": "position",
@@ -135,16 +145,6 @@
         "primary": false,
         "nullable": false,
         "mappedType": "bigint"
-      },
-      "before": {
-        "name": "before",
-        "type": "jsonb",
-        "unsigned": false,
-        "autoincrement": false,
-        "primary": false,
-        "nullable": false,
-        "default": "'{}'",
-        "mappedType": "json"
       }
     },
     "name": "changes",
@@ -181,9 +181,9 @@
        "type": "GIN"
      },
      {
-       "keyName": "changes_committed_at_index",
+       "keyName": "changes_operation_index",
        "columnNames": [
-         "committed_at"
+         "operation"
        ],
        "composite": false,
        "primary": false,
@@ -199,13 +199,31 @@
        "unique": false
      },
      {
-       "keyName": "changes_position_operation_table_schema_database_unique",
+       "keyName": "changes_table_index",
+       "columnNames": [
+         "table"
+       ],
+       "composite": false,
+       "primary": false,
+       "unique": false
+     },
+     {
+       "keyName": "changes_committed_at_index",
+       "columnNames": [
+         "committed_at"
+       ],
+       "composite": false,
+       "primary": false,
+       "unique": false
+     },
+     {
+       "keyName": "changes_position_table_schema_database_operation_unique",
        "columnNames": [
          "position",
-         "operation",
          "table",
          "schema",
-         "database"
+         "database",
+         "operation"
        ],
        "composite": true,
        "primary": false,
diff --git a/core/src/migrations/Migration20240322153919_index_table.ts b/core/src/migrations/Migration20240322153919_index_table.ts
new file mode 100644
index 0000000..a96652d
--- /dev/null
+++ b/core/src/migrations/Migration20240322153919_index_table.ts
@@ -0,0 +1,13 @@
+import { Migration } from '@mikro-orm/migrations';
+
+export class Migration20240322153919_index_table extends Migration {
+
+  async up(): Promise<void> {
+    this.addSql('CREATE INDEX IF NOT EXISTS "changes_table_index" ON "changes" ("table");');
+  }
+
+  async down(): Promise<void> {
+    this.addSql('drop index "changes_table_index";');
+  }
+
+}
diff --git a/core/src/migrations/Migration20240322173541_index_operation_and_unique.ts b/core/src/migrations/Migration20240322173541_index_operation_and_unique.ts
new file mode 100644
index 0000000..2624540
--- /dev/null
+++ b/core/src/migrations/Migration20240322173541_index_operation_and_unique.ts
@@ -0,0 +1,17 @@
+import { Migration } from '@mikro-orm/migrations';
+
+export class Migration20240322173541_index_operation_and_unique extends Migration {
+
+  async up(): Promise<void> {
+    this.addSql('alter table "changes" drop constraint "changes_position_operation_table_schema_database_unique";');
+    this.addSql('create index "changes_operation_index" on "changes" ("operation");');
+    this.addSql('alter table "changes" add constraint "changes_position_table_schema_database_unique" unique ("position", "table", "schema", "database");');
+  }
+
+  async down(): Promise<void> {
+    this.addSql('drop index "changes_operation_index";');
+    this.addSql('alter table "changes" drop constraint "changes_position_table_schema_database_unique";');
+    this.addSql('alter table "changes" add constraint "changes_position_operation_table_schema_database_unique" unique ("position", "operation", "table", "schema", "database");');
+  }
+
+}
diff --git a/core/src/migrations/Migration20240322174908_return_unique_index.ts b/core/src/migrations/Migration20240322174908_return_unique_index.ts
new file mode 100644
index 0000000..63fc890
--- /dev/null
+++ b/core/src/migrations/Migration20240322174908_return_unique_index.ts
@@ -0,0 +1,15 @@
+import { Migration } from '@mikro-orm/migrations';
+
+export class Migration20240322174908_return_unique_index extends Migration {
+
+  async up(): Promise<void> {
+    this.addSql('alter table "changes" drop constraint "changes_position_table_schema_database_unique";');
+    this.addSql('alter table "changes" add constraint "changes_position_table_schema_database_operation_unique" unique ("position", "table", "schema", "database", "operation");');
+  }
+
+  async down(): Promise<void> {
+    this.addSql('alter table "changes" drop constraint "changes_position_table_schema_database_operation_unique";');
+    this.addSql('alter table "changes" add constraint "changes_position_table_schema_database_unique" unique ("position", "table", "schema", "database");');
+  }
+
+}
diff --git a/core/src/migrations/Migration20240401141659_transaction_id_to_bigint.ts b/core/src/migrations/Migration20240401141659_transaction_id_to_bigint.ts
new file mode 100644
index 0000000..128f6b6
--- /dev/null
+++ b/core/src/migrations/Migration20240401141659_transaction_id_to_bigint.ts
@@ -0,0 +1,13 @@
+import { Migration } from '@mikro-orm/migrations';
+
+export class Migration20240401141659_transaction_id_to_bigint extends Migration {
+
+  async up(): Promise<void> {
+    this.addSql('alter table "changes" alter column "transaction_id" type bigint using ("transaction_id"::bigint);');
+  }
+
+  async down(): Promise<void> {
+    this.addSql('alter table "changes" alter column "transaction_id" type int using ("transaction_id"::int);');
+  }
+
+}
diff --git a/docs/docs/postgresql/destination-database.md b/docs/docs/postgresql/destination-database.md
index 6c204ae..876ae24 100644
--- a/docs/docs/postgresql/destination-database.md
+++ b/docs/docs/postgresql/destination-database.md
@@ -28,7 +28,7 @@ Changes performed by creating, updating, or deleting each row are stored in a ta
 | `committed_at` | `timestamptz(0)` | When the record was changed |
 | `queued_at` | `timestamptz(0)` | When the changed record was ingested from WAL |
 | `created_at` | `timestamptz(0)` | When the change record was stored in the database |
-| `transaction_id` | `integer` | PostgreSQL transaction ID |
+| `transaction_id` | `bigint` | PostgreSQL transaction ID |
 | `position` | `bigint` | PostgreSQL WAL position |
 
 ## Querying Changes
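
Note on the @Unique change in core/src/entities/Change.ts: "operation" is kept in the unique
key because CREATE and DELETE changes for rows without a primary key can share the same
position, table, schema, and database. An illustrative way to spot such pairs in the
destination database (column names as in the migrations above; the query itself is not part
of this patch):

  SELECT "position", "table", "schema", "database", array_agg("operation") AS operations
  FROM changes
  GROUP BY "position", "table", "schema", "database"
  HAVING count(*) > 1;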
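
The new single-column indexes ("changes_table_index", "changes_operation_index") are aimed at
filtered lookups over the changes table documented in docs/docs/postgresql/destination-database.md.
A hypothetical query they would serve (the table name and operation value are illustrative;
EXPLAIN can confirm which index is picked):

  SELECT *
  FROM changes
  WHERE "table" = 'users'
    AND "operation" = 'DELETE'
  ORDER BY "committed_at" DESC
  LIMIT 100;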