Skip to content

Commit

Permalink
Refactor ld_transform with the ld_store idea.
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitri committed Jul 2, 2024
1 parent 7998f27 commit bdb7593
Show file tree
Hide file tree
Showing 13 changed files with 909 additions and 318 deletions.
20 changes: 16 additions & 4 deletions src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,18 +420,18 @@ static char *targetDBcreateDDLs[] = {
static char *replayDBcreateDDLs[] = {
"create table output("
" id integer primary key, "
" action text, xid integer, lsn pg_lsn, timestamp text, "
" action text, xid integer, lsn integer, timestamp text, "
" message text)",

"create unique index o_a_lsn on output(action, lsn)",
"create unique index o_a_xid on output(action, xid)",
"create index o_a_xid on output(action, xid)",

"create table stmt(hash text primary key, sql text)",
"create unique index stmt_hash on stmt(hash)",

"create table replay("
" id integer primary key, "
" action text, xid integer, lsn pg_lsn, timestamp text, "
" action text, xid integer, lsn integer, endlsn integer, timestamp text, "
" stmt_hash text references stmt(hash), stmt_args jsonb)",

"create index r_xid on replay(xid)",
Expand Down Expand Up @@ -7904,7 +7904,19 @@ catalog_sql_execute(SQLiteQuery *query)
if (rc != SQLITE_ROW)
{
log_error("Failed to step through statement: %s", query->sql);
log_error("[SQLite %d] %s", rc, sqlite3_errstr(rc));

int offset = sqlite3_error_offset(query->db);

if (offset != -1)
{
/* "Failed to step through statement: %s" is 34 chars of prefix */
log_error("%34s%*s^", " ", offset, " ");
}

log_error("[SQLite %d: %s]: %s",
rc,
sqlite3_errstr(rc),
sqlite3_errmsg(query->db));

(void) sqlite3_clear_bindings(query->ppStmt);
(void) sqlite3_finalize(query->ppStmt);
Expand Down
18 changes: 12 additions & 6 deletions src/bin/pgcopydb/cli_sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ cli_sentinel_get(int argc, char **argv)
char startpos[PG_LSN_MAXLENGTH] = { 0 };
char endpos[PG_LSN_MAXLENGTH] = { 0 };
char write_lsn[PG_LSN_MAXLENGTH] = { 0 };
char transform_lsn[PG_LSN_MAXLENGTH] = { 0 };
char flush_lsn[PG_LSN_MAXLENGTH] = { 0 };
char replay_lsn[PG_LSN_MAXLENGTH] = { 0 };

Expand All @@ -625,6 +626,8 @@ cli_sentinel_get(int argc, char **argv)
LSN_FORMAT_ARGS(sentinel.endpos));
sformat(write_lsn, PG_LSN_MAXLENGTH, "%X/%X",
LSN_FORMAT_ARGS(sentinel.write_lsn));
sformat(transform_lsn, PG_LSN_MAXLENGTH, "%X/%X",
LSN_FORMAT_ARGS(sentinel.transform_lsn));
sformat(flush_lsn, PG_LSN_MAXLENGTH, "%X/%X",
LSN_FORMAT_ARGS(sentinel.flush_lsn));
sformat(replay_lsn, PG_LSN_MAXLENGTH, "%X/%X",
Expand All @@ -634,6 +637,7 @@ cli_sentinel_get(int argc, char **argv)
json_object_set_string(jsobj, "endpos", startpos);
json_object_set_boolean(jsobj, "apply", sentinel.apply);
json_object_set_string(jsobj, "write_lsn", write_lsn);
json_object_set_string(jsobj, "transform_lsn", transform_lsn);
json_object_set_string(jsobj, "flush_lsn", flush_lsn);
json_object_set_string(jsobj, "replay_lsn", replay_lsn);

Expand All @@ -645,17 +649,19 @@ cli_sentinel_get(int argc, char **argv)
}
else
{
fformat(stdout, "%-10s %X/%X\n", "startpos",
fformat(stdout, "%-15s %X/%X\n", "startpos",
LSN_FORMAT_ARGS(sentinel.startpos));
fformat(stdout, "%-10s %X/%X\n", "endpos",
fformat(stdout, "%-15s %X/%X\n", "endpos",
LSN_FORMAT_ARGS(sentinel.endpos));
fformat(stdout, "%-10s %s\n", "apply",
fformat(stdout, "%-15s %s\n", "apply",
sentinel.apply ? "enabled" : "disabled");
fformat(stdout, "%-10s %X/%X\n", "write_lsn",
fformat(stdout, "%-15s %X/%X\n", "write_lsn",
LSN_FORMAT_ARGS(sentinel.write_lsn));
fformat(stdout, "%-10s %X/%X\n", "flush_lsn",
fformat(stdout, "%-15s %X/%X\n", "transform_lsn",
LSN_FORMAT_ARGS(sentinel.transform_lsn));
fformat(stdout, "%-15s %X/%X\n", "flush_lsn",
LSN_FORMAT_ARGS(sentinel.flush_lsn));
fformat(stdout, "%-10s %X/%X\n", "replay_lsn",
fformat(stdout, "%-15s %X/%X\n", "replay_lsn",
LSN_FORMAT_ARGS(sentinel.replay_lsn));
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ stream_apply_catchup(StreamSpecs *specs)
{
StreamApplyContext context = { 0 };

return true;

if (!stream_apply_setup(specs, &context))
{
log_error("Failed to setup for catchup, see above for details");
Expand Down
Loading

0 comments on commit bdb7593

Please sign in to comment.