Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{WIP} Use SQLite as the file format for CDC streaming. #715

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/include/stream-sentinel-get.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
--apply Get only the apply value
--write-lsn Get only the write LSN value
--flush-lsn Get only the flush LSN value
--transform-lsn Get only the transform LSN value
--replay-lsn Get only the replay LSN value

1 change: 0 additions & 1 deletion docs/include/stream-sentinel-set-endpos.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@
pgcopydb stream sentinel set endpos: Set the sentinel end position LSN
usage: pgcopydb stream sentinel set endpos [ --source ... ] [ <end lsn> | --current ]

--source Postgres URI to the source database
--current Use pg_current_wal_flush_lsn() as the endpos

122 changes: 116 additions & 6 deletions src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,14 @@ static char *sourceDBcreateDDLs[] = {
"create table sentinel("
" id integer primary key check (id = 1), "
" startpos pg_lsn, endpos pg_lsn, apply bool, "
" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)",
" write_lsn pg_lsn, flush_lsn pg_lsn, "
" transform_lsn pg_lsn, "
" replay_lsn pg_lsn)",

"create table cdc_files("
" id integer primary key, filename text unique, timeline integer, "
" startpos pg_lsn, endpos pg_lsn, "
" start_time_epoch integer, done_time_epoch integer)",

"create table timeline_history("
" tli integer primary key, startpos pg_lsn, endpos pg_lsn)"
Expand Down Expand Up @@ -413,6 +420,28 @@ static char *targetDBcreateDDLs[] = {
};


/*
 * DDLs for the replay database: raw logical decoding output, de-duplicated
 * SQL statements, and the replay entries that reference them.
 *
 * Keep this list in sync with replayDBdropDDLs.
 */
static char *replayDBcreateDDLs[] = {
	/* one row per logical decoding message, unique per (action, lsn) */
	"create table output("
	" id integer primary key, "
	" action text, xid integer, lsn integer, timestamp text, "
	" message text)",

	"create unique index o_a_lsn on output(action, lsn)",
	"create index o_a_xid on output(action, xid)",

	/*
	 * Statements are de-duplicated by hash. Because "hash" is the PRIMARY
	 * KEY, SQLite already maintains an automatic unique index on it: a
	 * separate "create unique index stmt_hash" would be redundant.
	 */
	"create table stmt(hash text primary key, sql text)",

	"create table replay("
	" id integer primary key, "
	" action text, xid integer, lsn integer, endlsn integer, timestamp text, "
	" stmt_hash text references stmt(hash), stmt_args jsonb)",

	"create index r_xid on replay(xid)",
	"create index r_lsn on replay(lsn)",
};


static char *sourceDBdropDDLs[] = {
"drop table if exists setup",
"drop table if exists section",
Expand Down Expand Up @@ -480,6 +509,12 @@ static char *targetDBdropDDLs[] = {
};


/*
 * Drop DDLs for the replay database. Every table created in
 * replayDBcreateDDLs must be dropped here; the previous version was missing
 * "stmt", leaving stale statements behind after a schema drop. Drop "replay"
 * before "stmt" because replay.stmt_hash references stmt(hash).
 */
static char *replayDBdropDDLs[] = {
	"drop table if exists output",
	"drop table if exists replay",
	"drop table if exists stmt"
};


/*
* catalog_init_from_specs initializes our internal catalog database file from
* a specification.
Expand Down Expand Up @@ -804,6 +839,8 @@ catalog_init(DatabaseCatalog *catalog)

if (sqlite3_open(catalog->dbfile, &(catalog->db)) != SQLITE_OK)
{
/* ensure the db handle is NULL unless it was successfully opened */
/* NOTE(review): clearing db before the sqlite3_errmsg(catalog->db) call below discards the real open error — capture the message first, then reset */
catalog->db = NULL;
log_error("Failed to open \"%s\": %s",
catalog->dbfile,
sqlite3_errmsg(catalog->db));
Expand Down Expand Up @@ -952,6 +989,13 @@ catalog_create_schema(DatabaseCatalog *catalog)
break;
}

case DATABASE_CATALOG_TYPE_REPLAY:
{
createDDLs = replayDBcreateDDLs;
count = sizeof(replayDBcreateDDLs) / sizeof(replayDBcreateDDLs[0]);
break;
}

default:
{
log_error("BUG: called catalog_init for unknown type %d",
Expand Down Expand Up @@ -1012,6 +1056,13 @@ catalog_drop_schema(DatabaseCatalog *catalog)
break;
}

case DATABASE_CATALOG_TYPE_REPLAY:
{
dropDDLs = replayDBdropDDLs;
count = sizeof(replayDBdropDDLs) / sizeof(replayDBdropDDLs[0]);
break;
}

default:
{
log_error("BUG: called catalog_drop_schema for unknown type %d",
Expand Down Expand Up @@ -8051,7 +8102,9 @@ catalog_sql_bind(SQLiteQuery *query, BindParam *params, int count)
{
if (!catalog_bind_parameters(query->db, query->ppStmt, params, count))
{
/* errors have already been logged */
log_error("[SQLite] Failed to bind parameters in query: %s",
query->sql);

(void) sqlite3_clear_bindings(query->ppStmt);
(void) sqlite3_finalize(query->ppStmt);
return false;
Expand Down Expand Up @@ -8097,7 +8150,19 @@ catalog_sql_execute(SQLiteQuery *query)
if (rc != SQLITE_DONE)
{
log_error("Failed to execute statement: %s", query->sql);
log_error("[SQLite %d] %s", rc, sqlite3_errstr(rc));

int offset = sqlite3_error_offset(query->db);

if (offset != -1)
{
/* NOTE(review): the message above is "Failed to execute statement: %s" (29-char prefix), not the 34-char "step through" one — the caret width below should match */
log_error("%34s%*s^", " ", offset, " ");
}

log_error("[SQLite %d: %s]: %s",
rc,
sqlite3_errstr(rc),
sqlite3_errmsg(query->db));

(void) sqlite3_clear_bindings(query->ppStmt);
(void) sqlite3_finalize(query->ppStmt);
Expand All @@ -8123,7 +8188,19 @@ catalog_sql_execute(SQLiteQuery *query)
if (rc != SQLITE_ROW)
{
log_error("Failed to step through statement: %s", query->sql);
log_error("[SQLite %d] %s", rc, sqlite3_errstr(rc));

int offset = sqlite3_error_offset(query->db);

if (offset != -1)
{
/* "Failed to step through statement: %s" is 34 chars of prefix */
log_error("%34s%*s^", " ", offset, " ");
}

log_error("[SQLite %d: %s]: %s",
rc,
sqlite3_errstr(rc),
sqlite3_errmsg(query->db));

(void) sqlite3_clear_bindings(query->ppStmt);
(void) sqlite3_finalize(query->ppStmt);
Expand All @@ -8144,7 +8221,19 @@ catalog_sql_execute(SQLiteQuery *query)
if (catalog_sql_step(query) != SQLITE_DONE)
{
log_error("Failed to execute statement: %s", query->sql);
log_error("[SQLite %d] %s", rc, sqlite3_errstr(rc));

int offset = sqlite3_error_offset(query->db);

if (offset != -1)
{
/* NOTE(review): the message above is "Failed to execute statement: %s" (29-char prefix), not the 34-char "step through" one — the caret width below should match */
log_error("%34s%*s^", " ", offset, " ");
}

log_error("[SQLite %d: %s]: %s",
rc,
sqlite3_errstr(rc),
sqlite3_errmsg(query->db));

(void) sqlite3_clear_bindings(query->ppStmt);
(void) sqlite3_finalize(query->ppStmt);
Expand Down Expand Up @@ -8206,7 +8295,7 @@ catalog_sql_step(SQLiteQuery *query)
int sleepTimeMs =
pgsql_compute_connection_retry_sleep_time(&retryPolicy);

log_sqlite("[SQLite %d]: %s, try again in %dms",
log_notice("[SQLite %d]: %s, try again in %dms",
rc,
sqlite3_errmsg(query->db),
sleepTimeMs);
Expand Down Expand Up @@ -8269,6 +8358,27 @@ catalog_bind_parameters(sqlite3 *db,

switch (p->type)
{
case BIND_PARAMETER_TYPE_NULL:
{
int rc = sqlite3_bind_null(ppStmt, n);

if (rc != SQLITE_OK)
{
log_error("[SQLite %d] Failed to bind \"%s\" to NULL: %s",
rc,
p->name,
sqlite3_errstr(rc));
return false;
}

if (logSQL)
{
appendPQExpBuffer(debugParameters, "%s", "null");
}

break;
}

case BIND_PARAMETER_TYPE_INT:
{
int rc = sqlite3_bind_int(ppStmt, n, p->intVal);
Expand Down
1 change: 1 addition & 0 deletions src/bin/pgcopydb/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ bool catalog_timeline_history_fetch(SQLiteQuery *query);
typedef enum
{
BIND_PARAMETER_TYPE_UNKNOWN = 0,
BIND_PARAMETER_TYPE_NULL,
BIND_PARAMETER_TYPE_INT,
BIND_PARAMETER_TYPE_INT64,
BIND_PARAMETER_TYPE_TEXT
Expand Down
2 changes: 2 additions & 0 deletions src/bin/pgcopydb/cli_clone_follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ clone_and_follow(CopyDataSpec *copySpecs)
copyDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs->catalogs.source),
&(copySpecs->catalogs.replay),
copyDBoptions.stdIn,
copyDBoptions.stdOut,
logSQL))
Expand Down Expand Up @@ -388,6 +389,7 @@ cli_follow(int argc, char **argv)
copyDBoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
copyDBoptions.stdIn,
copyDBoptions.stdOut,
logSQL))
Expand Down
30 changes: 23 additions & 7 deletions src/bin/pgcopydb/cli_sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ CommandLine sentinel_get_command =
" --apply Get only the apply value\n"
" --write-lsn Get only the write LSN value\n"
" --flush-lsn Get only the flush LSN value\n"
" --transform-lsn Get only the tranform LSN value\n"
" --replay-lsn Get only the replay LSN value\n",
cli_sentinel_getopts,
cli_sentinel_get);
Expand Down Expand Up @@ -127,6 +128,7 @@ cli_sentinel_getopts(int argc, char **argv)
{ "endpos", no_argument, NULL, 'e' },
{ "apply", no_argument, NULL, 'a' },
{ "write-lsn", no_argument, NULL, 'w' },
{ "transform-lsn", no_argument, NULL, 't' },
{ "flush-lsn", no_argument, NULL, 'f' },
{ "replay-lsn", no_argument, NULL, 'r' },
{ "current", no_argument, NULL, 'C' },
Expand Down Expand Up @@ -209,6 +211,14 @@ cli_sentinel_getopts(int argc, char **argv)
break;
}

case 't':
{
++sentinelOptionsCount;
options.sentinelOptions.transformLSN = true;
log_trace("--transform-lsn");
break;
}

case 'f':
{
++sentinelOptionsCount;
Expand Down Expand Up @@ -316,7 +326,7 @@ cli_sentinel_getopts(int argc, char **argv)
if (sentinelOptionsCount > 1)
{
log_fatal("Please choose only one of --startpos --endpos --apply "
"--write-lsn --flush-lsn --replay-lsn");
"--write-lsn --transform-lsn --flush-lsn --replay-lsn");
++errors;
}

Expand Down Expand Up @@ -653,6 +663,7 @@ cli_sentinel_get(int argc, char **argv)
char startpos[PG_LSN_MAXLENGTH] = { 0 };
char endpos[PG_LSN_MAXLENGTH] = { 0 };
char write_lsn[PG_LSN_MAXLENGTH] = { 0 };
char transform_lsn[PG_LSN_MAXLENGTH] = { 0 };
char flush_lsn[PG_LSN_MAXLENGTH] = { 0 };
char replay_lsn[PG_LSN_MAXLENGTH] = { 0 };

Expand All @@ -662,6 +673,8 @@ cli_sentinel_get(int argc, char **argv)
LSN_FORMAT_ARGS(sentinel.endpos));
sformat(write_lsn, PG_LSN_MAXLENGTH, "%X/%X",
LSN_FORMAT_ARGS(sentinel.write_lsn));
sformat(transform_lsn, PG_LSN_MAXLENGTH, "%X/%X",
LSN_FORMAT_ARGS(sentinel.transform_lsn));
sformat(flush_lsn, PG_LSN_MAXLENGTH, "%X/%X",
LSN_FORMAT_ARGS(sentinel.flush_lsn));
sformat(replay_lsn, PG_LSN_MAXLENGTH, "%X/%X",
Expand All @@ -671,6 +684,7 @@ cli_sentinel_get(int argc, char **argv)
json_object_set_string(jsobj, "endpos", startpos);
json_object_set_boolean(jsobj, "apply", sentinel.apply);
json_object_set_string(jsobj, "write_lsn", write_lsn);
json_object_set_string(jsobj, "transform_lsn", transform_lsn);
json_object_set_string(jsobj, "flush_lsn", flush_lsn);
json_object_set_string(jsobj, "replay_lsn", replay_lsn);

Expand All @@ -682,17 +696,19 @@ cli_sentinel_get(int argc, char **argv)
}
else
{
fformat(stdout, "%-10s %X/%X\n", "startpos",
fformat(stdout, "%-15s %X/%X\n", "startpos",
LSN_FORMAT_ARGS(sentinel.startpos));
fformat(stdout, "%-10s %X/%X\n", "endpos",
fformat(stdout, "%-15s %X/%X\n", "endpos",
LSN_FORMAT_ARGS(sentinel.endpos));
fformat(stdout, "%-10s %s\n", "apply",
fformat(stdout, "%-15s %s\n", "apply",
sentinel.apply ? "enabled" : "disabled");
fformat(stdout, "%-10s %X/%X\n", "write_lsn",
fformat(stdout, "%-15s %X/%X\n", "write_lsn",
LSN_FORMAT_ARGS(sentinel.write_lsn));
fformat(stdout, "%-10s %X/%X\n", "flush_lsn",
fformat(stdout, "%-15s %X/%X\n", "transform_lsn",
LSN_FORMAT_ARGS(sentinel.transform_lsn));
fformat(stdout, "%-15s %X/%X\n", "flush_lsn",
LSN_FORMAT_ARGS(sentinel.flush_lsn));
fformat(stdout, "%-10s %X/%X\n", "replay_lsn",
fformat(stdout, "%-15s %X/%X\n", "replay_lsn",
LSN_FORMAT_ARGS(sentinel.replay_lsn));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/bin/pgcopydb/cli_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ cli_create_snapshot(int argc, char **argv)
createSNoptions.endpos,
STREAM_MODE_CATCHUP,
&(copySpecs.catalogs.source),
&(copySpecs.catalogs.replay),
createSNoptions.stdIn,
createSNoptions.stdOut,
logSQL))
Expand Down
Loading