Skip to content

Commit

Permalink
Refactor conversionstate cleanup in Hypercore TAM
Browse files Browse the repository at this point in the history
When converting a table to using Hypercore TAM, the table is rewritten
using a tuple sort state. This state was allocated on the
CacheMemoryContext and cleaned up in a transaction callback in case of
abort.

Instead, allocate the conversion state in its own memory context,
which is a child of PortalContext. That makes it possible to do clean
up in a memory context callback that guarantees cleanup when the
conversion command is complete or when a failure occurs.
  • Loading branch information
erimatnor committed Dec 11, 2024
1 parent 7b32f8d commit ce6be27
Showing 1 changed file with 86 additions and 21 deletions.
107 changes: 86 additions & 21 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,8 @@ typedef struct ConversionState
Oid relid;
RelationSize before_size;
Tuplesortstate *tuplesortstate;
MemoryContext mcxt;
MemoryContextCallback cb;
} ConversionState;

static ConversionState *conversionstate = NULL;
Expand Down Expand Up @@ -3391,6 +3393,83 @@ hypercore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
return false;
}

/*
* Cleanup callback for Hypercore conversion state.
*
* The cleanup happens when the conversion state's memory context is
* destroyed. It ensures cleanup of tuplesort state, unless it was already
* performed.
*
* This is to cover two cases:
*
* 1. The tuplesort state is released due to normal processing and a call to
* tuplesort_end(), in which case the tuplesort state is released before the
* conversion state. This callback should therefore not call tuplesort_end().
*
* 2. The tuplesort state is released as a result of an ERROR, in which case
* the tuplesort_end() is called in this callback as a result of the
* conversion state being cleaned up.
*/
static void
conversionstate_cleanup(void *arg)
{
ConversionState *state = arg;

if (state->tuplesortstate)
{
tuplesort_end(state->tuplesortstate);
state->tuplesortstate = NULL;
}

if (conversionstate)
{
Assert(state == conversionstate);
conversionstate = NULL;
}
}

static ConversionState *
conversionstate_create(const HypercoreInfo *hcinfo, const Relation rel)
{
CompressionSettings *settings = ts_compression_settings_get(hcinfo->compressed_relid);
Tuplesortstate *tuplesortstate;
MemoryContext mcxt;
MemoryContext oldmcxt;
ConversionState *state;

oldmcxt = MemoryContextSwitchTo(PortalContext);
/*
* We want to ensure the tuplesort state is cleaned up by calling
* tuplesort_end() in case of failures. This is necessary to release disk
* resources.
*
* A memory context callback is used for this purpose. The callback is
* attached to the Hypercore conversion state. The tuplesort state will
* allocate its own child context, but they cannot be children of the
* conversion memory context because children are freed before the
* parent. Instead, make both the tuplesort and conversion state children
* of PortalContext. Since they are destroyed in reverse order, the memory
* context callback for the conversion state can. in case of error, call
* tuplesort_end() before the tuplesort is freed.
*/
tuplesortstate = compression_create_tuplesort_state(settings, rel);
mcxt = AllocSetContextCreate(PortalContext, "Hypercore conversion", ALLOCSET_DEFAULT_SIZES);

state = MemoryContextAlloc(mcxt, sizeof(ConversionState));
state->mcxt = mcxt;
state->before_size = ts_relation_size_impl(RelationGetRelid(rel));
state->tuplesortstate = tuplesortstate;
Assert(state->tuplesortstate);
state->relid = RelationGetRelid(rel);
state->cb.arg = state;
state->cb.func = conversionstate_cleanup;
conversionstate = state;
MemoryContextRegisterResetCallback(state->mcxt, &state->cb);
MemoryContextSwitchTo(oldmcxt);

return state;
}

/*
* Convert a table to Hypercore.
*
Expand All @@ -3401,7 +3480,7 @@ convert_to_hypercore(Oid relid)
{
Relation relation = table_open(relid, AccessShareLock);
bool compress_chunk_created;
HypercoreInfo *hsinfo = lazy_build_hypercore_info_cache(relation,
HypercoreInfo *hcinfo = lazy_build_hypercore_info_cache(relation,
false /* create constraints */,
&compress_chunk_created);

Expand All @@ -3410,21 +3489,13 @@ convert_to_hypercore(Oid relid)
/* A compressed relation already exists, so converting from legacy
* compression. It is only necessary to create the proxy vacuum
* index. */
create_proxy_vacuum_index(relation, hsinfo->compressed_relid);
create_proxy_vacuum_index(relation, hcinfo->compressed_relid);
table_close(relation, AccessShareLock);
return;
}

MemoryContext oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
ConversionState *state = palloc0(sizeof(ConversionState));
CompressionSettings *settings = ts_compression_settings_get(hsinfo->compressed_relid);
state->before_size = ts_relation_size_impl(relid);
state->tuplesortstate = compression_create_tuplesort_state(settings, relation);
Assert(state->tuplesortstate);
state->relid = relid;
conversionstate = state;
MemoryContextSwitchTo(oldcxt);
table_close(relation, AccessShareLock);
conversionstate = conversionstate_create(hcinfo, relation);
table_close(relation, NoLock);
}

/*
Expand Down Expand Up @@ -3504,14 +3575,6 @@ hypercore_xact_event(XactEvent event, void *arg)
list_free(cleanup_relids);
cleanup_relids = NIL;
}

if (conversionstate)
{
if (conversionstate->tuplesortstate)
tuplesort_end(conversionstate->tuplesortstate);
pfree(conversionstate);
conversionstate = NULL;
}
}

static void
Expand Down Expand Up @@ -3598,7 +3661,9 @@ convert_to_hypercore_finish(Oid relid)
row_compressor.num_compressed_rows,
row_compressor.num_compressed_rows);

conversionstate = NULL;
MemoryContextDelete(conversionstate->mcxt);
/* Deleting the memorycontext should reset the global conversionstate pointer to NULL */
Assert(conversionstate == NULL);
}

/*
Expand Down

0 comments on commit ce6be27

Please sign in to comment.