Skip to content

Commit

Permalink
Tranlog block-lsn and test
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Hannum <[email protected]>
  • Loading branch information
markhannum committed Dec 5, 2024
1 parent ed43f89 commit 1b72251
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 13 deletions.
59 changes: 47 additions & 12 deletions sqlite/ext/comdb2/tranlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@
#define TRANLOG_COLUMN_STOP 1
#define TRANLOG_COLUMN_FLAGS 2
#define TRANLOG_COLUMN_TIMEOUT 3
#define TRANLOG_COLUMN_LSN 4
#define TRANLOG_COLUMN_RECTYPE 5
#define TRANLOG_COLUMN_GENERATION 6
#define TRANLOG_COLUMN_TIMESTAMP 7
#define TRANLOG_COLUMN_LOG 8
#define TRANLOG_COLUMN_TXNID 9
#define TRANLOG_COLUMN_UTXNID 10
#define TRANLOG_COLUMN_MAXUTXNID 11
#define TRANLOG_COLUMN_CHILDUTXNID 12
#define TRANLOG_COLUMN_LSN_FILE 13 /* Useful for sorting records by LSN */
#define TRANLOG_COLUMN_LSN_OFFSET 14
#define TRANLOG_COLUMN_BLOCKLSN 4
#define TRANLOG_COLUMN_LSN 5
#define TRANLOG_COLUMN_RECTYPE 6
#define TRANLOG_COLUMN_GENERATION 7
#define TRANLOG_COLUMN_TIMESTAMP 8
#define TRANLOG_COLUMN_LOG 9
#define TRANLOG_COLUMN_TXNID 10
#define TRANLOG_COLUMN_UTXNID 11
#define TRANLOG_COLUMN_MAXUTXNID 12
#define TRANLOG_COLUMN_CHILDUTXNID 13
#define TRANLOG_COLUMN_LSN_FILE 14 /* Useful for sorting records by LSN */
#define TRANLOG_COLUMN_LSN_OFFSET 15

extern int gbl_apprec_gen;
int gbl_tranlog_default_timeout = 30;
Expand All @@ -56,9 +57,11 @@ struct tranlog_cursor {
DB_LSN curLsn; /* Current LSN */
DB_LSN minLsn; /* Minimum LSN */
DB_LSN maxLsn; /* Maximum LSN */
DB_LSN blockLsn; /* Block until this LSN */
char *minLsnStr;
char *maxLsnStr;
char *curLsnStr;
char *blockLsnStr;
int flags; /* 1 if we should block */
int hitLast;
int notDurable;
Expand All @@ -81,7 +84,7 @@ static int tranlogConnect(
int rc;

rc = sqlite3_declare_vtab(db,
"CREATE TABLE x(minlsn hidden,maxlsn hidden,flags hidden,timeout hidden,lsn,rectype integer,generation integer,timestamp integer,payload,txnid integer,utxnid integer,maxutxnid hidden, childutxnid hidden, lsnfile hidden, lsnoffset hidden)");
"CREATE TABLE x(minlsn hidden,maxlsn hidden,flags hidden,timeout hidden,blocklsn hidden,lsn,rectype integer,generation integer,timestamp integer,payload,txnid integer,utxnid integer,maxutxnid hidden, childutxnid hidden, lsnfile hidden, lsnoffset hidden)");
if( rc==SQLITE_OK ){
pNew = *ppVtab = sqlite3_malloc( sizeof(*pNew) );
if( pNew==0 ) return SQLITE_NOMEM;
Expand Down Expand Up @@ -123,6 +126,8 @@ static int tranlogClose(sqlite3_vtab_cursor *cur){
sqlite3_free(pCur->maxLsnStr);
if (pCur->curLsnStr)
sqlite3_free(pCur->curLsnStr);
if (pCur->blockLsnStr)
sqlite3_free(pCur->blockLsnStr);
sqlite3_free(pCur);
return SQLITE_OK;
}
Expand Down Expand Up @@ -218,6 +223,7 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)

if (pCur->timeout > 0 && (comdb2_time_epoch() - pCur->starttime) > pCur->timeout) {
pCur->hitLast = 1;
pCur->notDurable = 1;
return SQLITE_OK;
}

Expand Down Expand Up @@ -251,6 +257,12 @@ static int tranlogNext(sqlite3_vtab_cursor *cur)
if ((rc = comdb2_sql_tick()) != 0)
return rc;

if (pCur->blockLsn.file > 0 &&
log_compare(&pCur->curLsn, &pCur->blockLsn) >= 0) {
pCur->hitLast = 1;
return SQLITE_OK;
}

if (pCur->timeout > 0 && (comdb2_time_epoch() - pCur->starttime) > pCur->timeout) {
pCur->hitLast = 1;
return SQLITE_OK;
Expand Down Expand Up @@ -572,6 +584,13 @@ static int tranlogColumn(
case TRANLOG_COLUMN_TIMEOUT:
sqlite3_result_int64(ctx, pCur->timeout);
break;
case TRANLOG_COLUMN_BLOCKLSN:
if (!pCur->blockLsnStr) {
pCur->blockLsnStr = sqlite3_malloc(32);
}
tranlog_lsn_to_str(pCur->blockLsnStr, &pCur->blockLsn);
sqlite3_result_text(ctx, pCur->blockLsnStr, -1, NULL);
break;
case TRANLOG_COLUMN_LSN:
if (!pCur->curLsnStr) {
pCur->curLsnStr = sqlite3_malloc(32);
Expand Down Expand Up @@ -755,6 +774,13 @@ static int tranlogFilter(
int64_t timeout = sqlite3_value_int64(argv[i++]);
pCur->timeout = timeout;
}
bzero(&pCur->blockLsn, sizeof(pCur->blockLsn));
if( idxNum & 16 ){
const unsigned char *blockLsn = sqlite3_value_text(argv[i++]);
if (blockLsn && parse_lsn(blockLsn, &pCur->blockLsn)) {
return SQLITE_CONV_ERROR;
}
}
pCur->iRowid = 1;
return SQLITE_OK;
}
Expand All @@ -769,6 +795,7 @@ static int tranlogBestIndex(
int stopIdx = -1; /* Index of the stop= constraint, or -1 if none */
int flagsIdx = -1; /* Index of the block= constraint, block waiting if set */
int timeoutIdx = -1;
int blockLsnIdx = -1;
int nArg = 0; /* Number of arguments that seriesFilter() expects */

const struct sqlite3_index_constraint *pConstraint;
Expand All @@ -793,6 +820,10 @@ static int tranlogBestIndex(
timeoutIdx = i;
idxNum |= 8;
break;
case TRANLOG_COLUMN_BLOCKLSN:
blockLsnIdx = i;
idxNum |= 16;
break;
}
}
if( startIdx>=0 ){
Expand All @@ -811,6 +842,10 @@ static int tranlogBestIndex(
pIdxInfo->aConstraintUsage[timeoutIdx].argvIndex = ++nArg;
pIdxInfo->aConstraintUsage[timeoutIdx].omit = 1;
}
if( blockLsnIdx>=0 ){
pIdxInfo->aConstraintUsage[blockLsnIdx].argvIndex = ++nArg;
pIdxInfo->aConstraintUsage[blockLsnIdx].omit = 1;
}
if( (idxNum & 3)==3 ){
/* Both start= and stop= boundaries are available. This is the
** the preferred case */
Expand Down
49 changes: 48 additions & 1 deletion tests/phys_rep_tiered.test/runit
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,48 @@ function run_generated_tests()
done
}

function tranlog_blocklsn()
{
echo "== tranlog blocklsn test"
local master=$(getmaster)
local currentlsn=$($CDB2SQL_EXE -tabs ${CDB2_OPTIONS} $DBNAME default "select lsn from comdb2_transaction_logs(NULL, NULL, 0x4) limit 1")
local lsn=$(echo $currentlsn | tr -d {})
local file=${lsn%:*}
local offset=${lsn#*:}
local waitfile=$(( file + 1 ))
local waitlsn="{$waitfile:$offset}"

echo "Current lsn is $currentlsn, blocking until $waitlsn"
$CDB2SQL_EXE -tabs ${CDB2_OPTIONS} $DBNAME default "select * from comdb2_transaction_logs('${currentlsn}', NULL, 0x1, 0, '${waitlsn}')" > /dev/null &
bpid=$!

echo "Sleeping for 10"
sleep 10

if kill -0 $bpid 2>/dev/null ; then
echo "Blocking transaction still blocking"
else
kill -9 $bpid
cleanFailExit "Blocking transaction returned "
fi

echo "Pushing transaction log forward 3 logs"
$CDB2SQL_EXE -tabs ${CDB2_OPTIONS} $DBNAME --host $master "exec procedure sys.cmd.send('pushlogs 3')"

local exited=0

while [[ "$exited" -eq 0 ]]; do
sleep 1
if kill -0 $bpid 2>/dev/null ; then
echo "Blocking transaction still blocking"
currentlsn=$($CDB2SQL_EXE -tabs ${CDB2_OPTIONS} $DBNAME default "select lsn from comdb2_transaction_logs(NULL, NULL, 0x4) limit 1")
echo "Current lsn is $currentlsn, blocking until $waitlsn"
else
exited=1
fi
done
}

function tranlog_timeout()
{
typeset begin
Expand Down Expand Up @@ -1469,7 +1511,12 @@ function run_tests

testcase="tranlog_timeout"
testcase_preamble $testcase
tranlog_timeout ${REPL_META_DBNAME} ${REPL_META_HOST}
tranlog_timeout
testcase_finish $testcase

testcase="tranlog_blocklsn"
testcase_preamble $testcase
tranlog_blocklsn
testcase_finish $testcase

testcase="revconn_latency $lastNode"
Expand Down

0 comments on commit 1b72251

Please sign in to comment.