diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index e471a867b25..3be10e468bc 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -132,6 +132,9 @@ typedef struct CrossModuleFunctions PGFunction decompress_chunk; void (*decompress_batches_for_insert)(const ChunkInsertState *state, TupleTableSlot *slot); bool (*decompress_target_segments)(HypertableModifyState *ht_state); + int (*hypercore_decompress_conflict_segment)(Relation relation, const ItemPointer ctid, + TupleTableSlot *slot, Snapshot snapshot, + ItemPointer new_tid); /* The compression functions below are not installed in SQL as part of create extension; * They are installed and tested during testing scripts. They are exposed in cross-module * functions because they may be very useful for debugging customer problems if the sql diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.c b/src/nodes/chunk_dispatch/chunk_dispatch.c index 407c66fee91..9d0839a7849 100644 --- a/src/nodes/chunk_dispatch/chunk_dispatch.c +++ b/src/nodes/chunk_dispatch/chunk_dispatch.c @@ -168,22 +168,16 @@ ts_chunk_dispatch_decompress_batches_for_insert(ChunkDispatch *dispatch, ChunkIn { if (cis->chunk_compressed) { - OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch); - - if (cis->use_tam && onconflict_action != ONCONFLICT_UPDATE) - { - /* With our own TAM, a unique index covers both the compressed and - * non-compressed data, so there is no need to decompress anything - * when doing inserts. */ - } /* * If this is an INSERT into a compressed chunk with UNIQUE or * PRIMARY KEY constraints we need to make sure any batches that could * potentially lead to a conflict are in the decompressed chunk so * postgres can do proper constraint checking. */ - else if (ts_cm_functions->decompress_batches_for_insert) + if (ts_cm_functions->decompress_batches_for_insert) { + OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch); + ts_cm_functions->decompress_batches_for_insert(cis, slot); /* mark rows visible */ @@ -445,7 +439,8 @@ chunk_dispatch_exec(CustomScanState *node) on_chunk_insert_state_changed, state); - ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot); + if (!cis->use_tam) + ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot); MemoryContextSwitchTo(old); diff --git a/src/nodes/hypertable_modify.c b/src/nodes/hypertable_modify.c index 255bd7d129c..eeeded15cb5 100644 --- a/src/nodes/hypertable_modify.c +++ b/src/nodes/hypertable_modify.c @@ -5,6 +5,7 @@ */ #include #include +#include #include #include #include @@ -31,6 +32,7 @@ #include "hypertable_modify.h" #include "nodes/chunk_append/chunk_append.h" #include "nodes/chunk_dispatch/chunk_dispatch.h" +#include "utils.h" static void fireASTriggers(ModifyTableState *node); static void fireBSTriggers(ModifyTableState *node); @@ -2391,6 +2393,38 @@ ExecOnConflictUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ExecWithCheckOptions(WCO_RLS_CONFLICT_CHECK, resultRelInfo, existing, mtstate->ps.state); } + /* + * If the target relation is using Hypercore TAM, the conflict resolution + * index might point to a compressed segment containing the conflicting + * row. It is possible to decompress the segment immediately so that the + * update can proceed on the decompressed row. + */ + if (ts_is_hypercore_am(resultRelInfo->ri_RelationDesc->rd_rel->relam)) + { + ItemPointerData new_tid; + int ntuples = + ts_cm_functions->hypercore_decompress_conflict_segment(resultRelInfo->ri_RelationDesc, + conflictTid, + existing, + context->estate->es_snapshot, + &new_tid); + + if (ntuples > 0) + { + /* + * The conflicting row was decompressed, so must update the + * conflictTid to point to the decompressed row. + */ + ItemPointerCopy(&new_tid, conflictTid); + /* + * Since data was decompressed, the command counter was + * incremented to make it visible. Make sure the executor uses the + * latest command ID to see the changes. + */ + context->estate->es_output_cid = GetCurrentCommandId(true); + } + } + /* Project the new tuple version */ ExecProject(resultRelInfo->ri_onConflict->oc_ProjInfo); diff --git a/tsl/src/hypercore/hypercore_handler.c b/tsl/src/hypercore/hypercore_handler.c index 66c94d61e66..3114dedb64b 100644 --- a/tsl/src/hypercore/hypercore_handler.c +++ b/tsl/src/hypercore/hypercore_handler.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -49,6 +50,7 @@ #include #include #include +#include #include #include #include @@ -1787,6 +1789,87 @@ hypercore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapsh return result; } +/* + * Decompress a segment that contains the row given by ctid. + * + * This function is typically called during an upsert (ON CONFLICT DO UPDATE), + * where the conflicting row points to a compressed segment that needs to be + * decompressed before the update can take place. This function is used to + * decompress that segment into a set of individual rows. + * + * Returns the number of rows in the segment that were decompressed, or 0 if + * the TID pointed to a regular (non-compressed) tuple. The TID of the + * de-compressed conflicting row is returned via "new_ctid". + */ +int +hypercore_decompress_conflict_segment(Relation relation, const ItemPointer ctid, + TupleTableSlot *slot, Snapshot snapshot, ItemPointer new_ctid) +{ + HypercoreInfo *hcinfo; + Relation crel; + TupleTableSlot *cslot; + ItemPointerData decoded_tid; + TM_Result result; + TM_FailureData tmfd; + int n_batch_rows = 0; + uint16 tuple_index; + bool should_free; + + /* Nothing to do if this is not a compressed segment */ + if (!is_compressed_tid(ctid)) + return 0; + + Assert(TTS_IS_ARROWTUPLE(slot)); + + hcinfo = RelationGetHypercoreInfo(relation); + crel = table_open(hcinfo->compressed_relid, RowExclusiveLock); + tuple_index = hypercore_tid_decode(&decoded_tid, ctid); + + /* Is it guaranteed that "slot" is populated with the conflicting segment + * tuple? Otherwise we need to fetch the compressed segment... Unclear if + * this is ever needed. */ + if (TTS_EMPTY(slot) && !table_tuple_fetch_row_version(relation, ctid, snapshot, slot)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("conflicting compressed segment not found"))); + + Assert(ItemPointerEquals(ctid, &slot->tts_tid)); + cslot = arrow_slot_get_compressed_slot(slot, NULL); + HeapTuple tuple = ExecFetchSlotHeapTuple(cslot, false, &should_free); + + RowDecompressor decompressor = build_decompressor(crel, relation); + heap_deform_tuple(tuple, + RelationGetDescr(crel), + decompressor.compressed_datums, + decompressor.compressed_is_nulls); + + /* Must delete the segment before calling the decompression function below + * or otherwise index updates will lead to conflicts */ + result = table_tuple_delete(decompressor.in_rel, + &cslot->tts_tid, + decompressor.mycid, + snapshot, + InvalidSnapshot, + true, + &tmfd, + false); + + Ensure(result == TM_Ok, "could not delete compressed segment, result: %u", result); + + n_batch_rows = row_decompressor_decompress_row_to_table(&decompressor); + /* Return the TID of the decompressed conflicting tuple. Tuple index is + * 1-indexed, so subtract 1. */ + slot = decompressor.decompressed_slots[tuple_index - 1]; + ItemPointerCopy(&slot->tts_tid, new_ctid); + + /* Need to make decompressed (and deleted segment) visible */ + CommandCounterIncrement(); + row_decompressor_close(&decompressor); + table_close(crel, NoLock); + + return n_batch_rows; +} + #if PG16_LT typedef bool TU_UpdateIndexes; #endif diff --git a/tsl/src/hypercore/hypercore_handler.h b/tsl/src/hypercore/hypercore_handler.h index 71fe74f099c..a2037604d8a 100644 --- a/tsl/src/hypercore/hypercore_handler.h +++ b/tsl/src/hypercore/hypercore_handler.h @@ -30,6 +30,9 @@ extern void hypercore_xact_event(XactEvent event, void *arg); extern bool hypercore_set_truncate_compressed(bool onoff); extern void hypercore_scan_set_skip_compressed(TableScanDesc scan, bool skip); extern void hypercore_skip_compressed_data_for_relation(Oid relid); +extern int hypercore_decompress_conflict_segment(Relation relation, const ItemPointer ctid, + TupleTableSlot *slot, Snapshot snapshot, + ItemPointer new_tid); typedef struct ColumnCompressionSettings { diff --git a/tsl/src/init.c b/tsl/src/init.c index 24f5c917515..d8952e186f2 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -175,6 +175,7 @@ CrossModuleFunctions tsl_cm_functions = { .decompress_target_segments = decompress_target_segments, .hypercore_handler = hypercore_handler, .hypercore_proxy_handler = hypercore_proxy_handler, + .hypercore_decompress_conflict_segment = hypercore_decompress_conflict_segment, .is_compressed_tid = tsl_is_compressed_tid, .ddl_command_start = tsl_ddl_command_start, .ddl_command_end = tsl_ddl_command_end,