diff --git a/tsl/src/hypercore/hypercore_handler.c b/tsl/src/hypercore/hypercore_handler.c index 42018c9febc..66c94d61e66 100644 --- a/tsl/src/hypercore/hypercore_handler.c +++ b/tsl/src/hypercore/hypercore_handler.c @@ -1509,6 +1509,8 @@ typedef struct ConversionState Oid relid; RelationSize before_size; Tuplesortstate *tuplesortstate; + MemoryContext mcxt; + MemoryContextCallback cb; } ConversionState; static ConversionState *conversionstate = NULL; @@ -3391,6 +3393,83 @@ hypercore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, return false; } +/* + * Cleanup callback for Hypercore conversion state. + * + * The cleanup happens when the conversion state's memory context is + * destroyed. It ensures cleanup of tuplesort state, unless it was already + * performed. + * + * This is to cover two cases: + * + * 1. The tuplesort state is released due to normal processing and a call to + * tuplesort_end(), in which case the tuplesort state is released before the + * conversion state. This callback should therefore not call tuplesort_end(). + * + * 2. The tuplesort state is released as a result of an ERROR, in which case + * the tuplesort_end() is called in this callback as a result of the + * conversion state being cleaned up. + */ +static void +conversionstate_cleanup(void *arg) +{ + ConversionState *state = arg; + + if (state->tuplesortstate) + { + tuplesort_end(state->tuplesortstate); + state->tuplesortstate = NULL; + } + + if (conversionstate) + { + Assert(state == conversionstate); + conversionstate = NULL; + } +} + +static ConversionState * +conversionstate_create(const HypercoreInfo *hcinfo, const Relation rel) +{ + CompressionSettings *settings = ts_compression_settings_get(hcinfo->compressed_relid); + Tuplesortstate *tuplesortstate; + MemoryContext mcxt; + MemoryContext oldmcxt; + ConversionState *state; + + oldmcxt = MemoryContextSwitchTo(PortalContext); + /* + * We want to ensure the tuplesort state is cleaned up by calling + * tuplesort_end() in case of failures. This is necessary to release disk + * resources. + * + * A memory context callback is used for this purpose. The callback is + * attached to the Hypercore conversion state. The tuplesort state will + * allocate its own child context, but they cannot be children of the + * conversion memory context because children are freed before the + * parent. Instead, make both the tuplesort and conversion state children + * of PortalContext. Since they are destroyed in reverse order, the memory + * context callback for the conversion state can. in case of error, call + * tuplesort_end() before the tuplesort is freed. + */ + tuplesortstate = compression_create_tuplesort_state(settings, rel); + mcxt = AllocSetContextCreate(PortalContext, "Hypercore conversion", ALLOCSET_DEFAULT_SIZES); + + state = MemoryContextAlloc(mcxt, sizeof(ConversionState)); + state->mcxt = mcxt; + state->before_size = ts_relation_size_impl(RelationGetRelid(rel)); + state->tuplesortstate = tuplesortstate; + Assert(state->tuplesortstate); + state->relid = RelationGetRelid(rel); + state->cb.arg = state; + state->cb.func = conversionstate_cleanup; + conversionstate = state; + MemoryContextRegisterResetCallback(state->mcxt, &state->cb); + MemoryContextSwitchTo(oldmcxt); + + return state; +} + /* * Convert a table to Hypercore. * @@ -3401,7 +3480,7 @@ convert_to_hypercore(Oid relid) { Relation relation = table_open(relid, AccessShareLock); bool compress_chunk_created; - HypercoreInfo *hsinfo = lazy_build_hypercore_info_cache(relation, + HypercoreInfo *hcinfo = lazy_build_hypercore_info_cache(relation, false /* create constraints */, &compress_chunk_created); @@ -3410,21 +3489,13 @@ convert_to_hypercore(Oid relid) /* A compressed relation already exists, so converting from legacy * compression. It is only necessary to create the proxy vacuum * index. */ - create_proxy_vacuum_index(relation, hsinfo->compressed_relid); + create_proxy_vacuum_index(relation, hcinfo->compressed_relid); table_close(relation, AccessShareLock); return; } - MemoryContext oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - ConversionState *state = palloc0(sizeof(ConversionState)); - CompressionSettings *settings = ts_compression_settings_get(hsinfo->compressed_relid); - state->before_size = ts_relation_size_impl(relid); - state->tuplesortstate = compression_create_tuplesort_state(settings, relation); - Assert(state->tuplesortstate); - state->relid = relid; - conversionstate = state; - MemoryContextSwitchTo(oldcxt); - table_close(relation, AccessShareLock); + conversionstate = conversionstate_create(hcinfo, relation); + table_close(relation, NoLock); } /* @@ -3504,14 +3575,6 @@ hypercore_xact_event(XactEvent event, void *arg) list_free(cleanup_relids); cleanup_relids = NIL; } - - if (conversionstate) - { - if (conversionstate->tuplesortstate) - tuplesort_end(conversionstate->tuplesortstate); - pfree(conversionstate); - conversionstate = NULL; - } } static void @@ -3598,7 +3661,9 @@ convert_to_hypercore_finish(Oid relid) row_compressor.num_compressed_rows, row_compressor.num_compressed_rows); - conversionstate = NULL; + MemoryContextDelete(conversionstate->mcxt); + /* Deleting the memorycontext should reset the global conversionstate pointer to NULL */ + Assert(conversionstate == NULL); } /*