Skip to content

Commit

Permalink
Add more indexes, convert transactionId to bigint, fetch with a timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Apr 4, 2024
1 parent 47cb49a commit 8a9e872
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 29 deletions.
9 changes: 6 additions & 3 deletions core/src/entities/Change.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ export enum Operation {

@Entity({ tableName: 'changes' })

@Index({ properties: ['primaryKey'] })
@Index({ properties: ['committedAt'] })
@Index({ properties: ['table'] })
@Index({ properties: ['primaryKey'] })
@Index({ properties: ['operation'] })
@Index({ properties: ['context'], type: 'GIN' })
@Index({ properties: ['before'], type: 'GIN' })
@Index({ properties: ['after'], type: 'GIN' })
@Unique({ properties: ['position', 'operation', 'table', 'schema', 'database'] })
// There could be CREATE and DELETE changes with the same position without primary key
@Unique({ properties: ['position', 'table', 'schema', 'database', 'operation'] })

export class Change extends BaseEntity {
@Property({ nullable: true })
Expand Down Expand Up @@ -50,7 +53,7 @@ export class Change extends BaseEntity {
@Property()
queuedAt: Date;

@Property({ type: 'integer' })
@Property({ type: 'bigint' })
transactionId: number;

@Property({ type: 'bigint' })
Expand Down
19 changes: 11 additions & 8 deletions core/src/ingestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ChangeMessagesBuffer } from './change-message-buffer'
import { stitchChangeMessages } from './stitching'

const INSERT_INTERVAL_MS = 1000 // 1 second to avoid overwhelming the database
const FETCH_EXPIRES_MS = 30_000 // 30 seconds, default

const sleep = (ms: number) => (
new Promise(resolve => setTimeout(resolve, ms))
Expand All @@ -20,7 +21,7 @@ const chunk = <T>(array: T[], size: number): T[][] => (
)
)

const persistChangeMessages = async (
const persistChangeMessages = (
{ orm, changeMessages, insertBatchSize }:
{ orm: MikroORM<PostgreSqlDriver>, changeMessages: ChangeMessage[], insertBatchSize: number }
) => {
Expand All @@ -39,21 +40,18 @@ const fetchMessages = async (
const messageBySequence: { [sequence: number]: Message } = {}
let pendingMessageCount = 0

const iterator = await consumer.fetch({ max_messages: fetchBatchSize });
const iterator = await consumer.fetch({ max_messages: fetchBatchSize, expires: FETCH_EXPIRES_MS });

for await (const message of iterator) {
const { streamSequence, pending } = message.info;
logger.debug(`Stream sequence: ${streamSequence}, pending: ${pending}`)
logger.debug(`Fetched stream sequence: ${streamSequence}, pending: ${pending}`)

pendingMessageCount = pending

// Accumulate the batch
if (!lastStreamSequence || lastStreamSequence < streamSequence) {
messageBySequence[streamSequence] = message
}

// Exhausted the batch
if (pendingMessageCount === 0) break;
}

return { messageBySequence, pendingMessageCount }
Expand Down Expand Up @@ -113,8 +111,13 @@ export const runIngestionLoop = async (

// Persisting and acking
if (stitchedChangeMessages.length) {
await persistChangeMessages({ orm, changeMessages: stitchedChangeMessages, insertBatchSize })
await orm.em.flush()
persistChangeMessages({ orm, changeMessages: stitchedChangeMessages, insertBatchSize })
try {
await orm.em.flush()
} catch (e) {
logger.info(`Error while flushing: ${e}`)
throw e
}
}
if (ackStreamSequence) {
logger.debug(`Acking ${ackStreamSequence}...`)
Expand Down
52 changes: 35 additions & 17 deletions core/src/migrations/.snapshot-db_799e557d9277.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@
"nullable": true,
"mappedType": "string"
},
"before": {
"name": "before",
"type": "jsonb",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"default": "'{}'",
"mappedType": "json"
},
"after": {
"name": "after",
"type": "jsonb",
Expand Down Expand Up @@ -120,12 +130,12 @@
},
"transaction_id": {
"name": "transaction_id",
"type": "int",
"type": "bigint",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "integer"
"mappedType": "bigint"
},
"position": {
"name": "position",
Expand All @@ -135,16 +145,6 @@
"primary": false,
"nullable": false,
"mappedType": "bigint"
},
"before": {
"name": "before",
"type": "jsonb",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"default": "'{}'",
"mappedType": "json"
}
},
"name": "changes",
Expand Down Expand Up @@ -181,9 +181,9 @@
"type": "GIN"
},
{
"keyName": "changes_committed_at_index",
"keyName": "changes_operation_index",
"columnNames": [
"committed_at"
"operation"
],
"composite": false,
"primary": false,
Expand All @@ -199,13 +199,31 @@
"unique": false
},
{
"keyName": "changes_position_operation_table_schema_database_unique",
"keyName": "changes_table_index",
"columnNames": [
"table"
],
"composite": false,
"primary": false,
"unique": false
},
{
"keyName": "changes_committed_at_index",
"columnNames": [
"committed_at"
],
"composite": false,
"primary": false,
"unique": false
},
{
"keyName": "changes_position_table_schema_database_operation_unique",
"columnNames": [
"position",
"operation",
"table",
"schema",
"database"
"database",
"operation"
],
"composite": true,
"primary": false,
Expand Down
13 changes: 13 additions & 0 deletions core/src/migrations/Migration20240322153919_index_table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Migration } from '@mikro-orm/migrations';

export class Migration20240322153919_index_table extends Migration {

async up(): Promise<void> {
this.addSql('CREATE INDEX IF NOT EXISTS "changes_table_index" ON "changes" ("table");');
}

async down(): Promise<void> {
this.addSql('drop index "changes_table_index";');
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Migration } from '@mikro-orm/migrations';

export class Migration20240322173541_index_operation_and_unique extends Migration {

async up(): Promise<void> {
this.addSql('alter table "changes" drop constraint "changes_position_operation_table_schema_database_unique";');
this.addSql('create index "changes_operation_index" on "changes" ("operation");');
this.addSql('alter table "changes" add constraint "changes_position_table_schema_database_unique" unique ("position", "table", "schema", "database");');
}

async down(): Promise<void> {
this.addSql('drop index "changes_operation_index";');
this.addSql('alter table "changes" drop constraint "changes_position_table_schema_database_unique";');
this.addSql('alter table "changes" add constraint "changes_position_operation_table_schema_database_unique" unique ("position", "operation", "table", "schema", "database");');
}

}
15 changes: 15 additions & 0 deletions core/src/migrations/Migration20240322174908_return_unique_index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Migration } from '@mikro-orm/migrations';

export class Migration20240322174908_return_unique_index extends Migration {

async up(): Promise<void> {
this.addSql('alter table "changes" drop constraint "changes_position_table_schema_database_unique";');
this.addSql('alter table "changes" add constraint "changes_position_table_schema_database_operation_unique" unique ("position", "table", "schema", "database", "operation");');
}

async down(): Promise<void> {
this.addSql('alter table "changes" drop constraint "changes_position_table_schema_database_operation_unique";');
this.addSql('alter table "changes" add constraint "changes_position_table_schema_database_unique" unique ("position", "table", "schema", "database");');
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Migration } from '@mikro-orm/migrations';

export class Migration20240401141659_transaction_id_to_bigint extends Migration {

async up(): Promise<void> {
this.addSql('alter table "changes" alter column "transaction_id" type bigint using ("transaction_id"::bigint);');
}

async down(): Promise<void> {
this.addSql('alter table "changes" alter column "transaction_id" type int using ("transaction_id"::int);');
}

}
2 changes: 1 addition & 1 deletion docs/docs/postgresql/destination-database.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Changes performed by creating, updating, or deleting each row are stored in a ta
| `committed_at` | `timestamptz(0)` | When the record was changed |
| `queued_at` | `timestamptz(0)` | When the changed record was ingested from WAL |
| `created_at` | `timestamptz(0)` | When the change record was stored in the database |
| `transaction_id` | `integer` | PostgreSQL transaction ID |
| `transaction_id` | `bigint` | PostgreSQL transaction ID |
| `position` | `bigint` | PostgreSQL WAL position |

## Querying Changes
Expand Down

0 comments on commit 8a9e872

Please sign in to comment.