Skip to content

Commit

Permalink
Optimize conflict do update (upserts) on Hypercore TAM
Browse files Browse the repository at this point in the history
When a conflict-do-update statement executes on a Hypercore TAM table,
the conflicting TID (and slot) can point to a "row" in a compressed
segment. Therefore, it is possible to directly decompress the
conflicting segment before executing the update statement.

In contrast, the existing on-conflict handling for compression doesn't
have a unique index that points to the conflicting tuple, so it first
needs to scan the compressed chunk for segments that might conflict,
which in some cases means a full sequential scan of the compressed
chunk, and then decompress those segments.

On a synthetic test case, the improvements to upserts are significant:
about 32x faster execution time for a single-row upsert, while the
number of buffers read is reduced by 2220x. This makes the upsert
performance on par with that of non-compressed hypertables.
  • Loading branch information
erimatnor committed Dec 18, 2024
1 parent 1a43266 commit 93a5a92
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ typedef struct CrossModuleFunctions
PGFunction decompress_chunk;
void (*decompress_batches_for_insert)(const ChunkInsertState *state, TupleTableSlot *slot);
bool (*decompress_target_segments)(HypertableModifyState *ht_state);
int (*hypercore_decompress_update_segment)(Relation relation, const ItemPointer ctid,
TupleTableSlot *slot, Snapshot snapshot,
ItemPointer new_tid);
/* The compression functions below are not installed in SQL as part of create extension;
* They are installed and tested during testing scripts. They are exposed in cross-module
* functions because they may be very useful for debugging customer problems if the sql
Expand Down
15 changes: 5 additions & 10 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,16 @@ ts_chunk_dispatch_decompress_batches_for_insert(ChunkDispatch *dispatch, ChunkIn
{
if (cis->chunk_compressed)
{
OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);

if (cis->use_tam && onconflict_action != ONCONFLICT_UPDATE)
{
/* With our own TAM, a unique index covers both the compressed and
* non-compressed data, so there is no need to decompress anything
* when doing inserts. */
}
/*
* If this is an INSERT into a compressed chunk with UNIQUE or
* PRIMARY KEY constraints we need to make sure any batches that could
* potentially lead to a conflict are in the decompressed chunk so
* postgres can do proper constraint checking.
*/
else if (ts_cm_functions->decompress_batches_for_insert)
if (ts_cm_functions->decompress_batches_for_insert)
{
OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);

ts_cm_functions->decompress_batches_for_insert(cis, slot);

/* mark rows visible */
Expand Down Expand Up @@ -445,7 +439,8 @@ chunk_dispatch_exec(CustomScanState *node)
on_chunk_insert_state_changed,
state);

ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot);
if (!cis->use_tam)
ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot);

MemoryContextSwitchTo(old);

Expand Down
34 changes: 34 additions & 0 deletions src/nodes/hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
#include <postgres.h>
#include <access/tupdesc.h>
#include <access/xact.h>
#include <catalog/pg_attribute.h>
#include <catalog/pg_type.h>
#include <executor/execPartition.h>
Expand All @@ -31,6 +32,7 @@
#include "hypertable_modify.h"
#include "nodes/chunk_append/chunk_append.h"
#include "nodes/chunk_dispatch/chunk_dispatch.h"
#include "utils.h"

static void fireASTriggers(ModifyTableState *node);
static void fireBSTriggers(ModifyTableState *node);
Expand Down Expand Up @@ -2391,6 +2393,38 @@ ExecOnConflictUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
ExecWithCheckOptions(WCO_RLS_CONFLICT_CHECK, resultRelInfo, existing, mtstate->ps.state);
}

/*
* If the target relation is using Hypercore TAM, the conflict resolution
* index might point to a compressed segment containing the conflicting
* row. It is possible to decompress the segment immediately so that the
* update can proceed on the decompressed row.
*/
if (ts_is_hypercore_am(resultRelInfo->ri_RelationDesc->rd_rel->relam))
{
ItemPointerData new_tid;
int ntuples =
ts_cm_functions->hypercore_decompress_update_segment(resultRelInfo->ri_RelationDesc,
conflictTid,
existing,
context->estate->es_snapshot,
&new_tid);

if (ntuples > 0)
{
/*
* The conflicting row was decompressed, so must update the
* conflictTid to point to the decompressed row.
*/
ItemPointerCopy(&new_tid, conflictTid);
/*
* Since data was decompressed, the command counter was
* incremented to make it visible. Make sure the executor uses the
* latest command ID to see the changes.
*/
context->estate->es_output_cid = GetCurrentCommandId(true);
}
}

/* Project the new tuple version */
ExecProject(resultRelInfo->ri_onConflict->oc_ProjInfo);

Expand Down
78 changes: 78 additions & 0 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <access/attnum.h>
#include <access/heapam.h>
#include <access/hio.h>
#include <access/htup_details.h>
#include <access/rewriteheap.h>
#include <access/sdir.h>
#include <access/skey.h>
Expand Down Expand Up @@ -49,6 +50,7 @@
#include <utils/palloc.h>
#include <utils/rel.h>
#include <utils/sampling.h>
#include <utils/snapmgr.h>
#include <utils/syscache.h>
#include <utils/tuplesort.h>
#include <utils/typcache.h>
Expand Down Expand Up @@ -1787,6 +1789,82 @@ hypercore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapsh
return result;
}

/*
 * Decompress the segment that contains the row given by ctid.
 *
 * This function is called during an upsert (ON CONFLICT DO UPDATE), where the
 * conflicting row points to a compressed segment that needs to be
 * decompressed before the update can take place. The compressed segment is
 * deleted from the compressed relation and its rows are inserted as
 * individual rows into the non-compressed region.
 *
 * Parameters:
 *   relation - the Hypercore relation being updated
 *   ctid     - TID of the conflicting row (compressed or regular)
 *   slot     - arrow slot holding the conflicting row; its TID must equal ctid
 *   snapshot - snapshot passed to the delete of the compressed segment
 *   new_ctid - output: TID of the decompressed conflicting row
 *
 * Returns the number of rows in the segment that were decompressed, or 0 if
 * the TID pointed to a regular (non-compressed) tuple. If any rows are
 * decompressed, the TID of the decompressed conflicting row is returned via
 * "new_ctid" and the command counter has been incremented so that the
 * changes are visible. If no rows were decompressed, "new_ctid" is not
 * written and its value is undefined.
 */
int
hypercore_decompress_update_segment(Relation relation, const ItemPointer ctid, TupleTableSlot *slot,
									Snapshot snapshot, ItemPointer new_ctid)
{
	HypercoreInfo *hcinfo;
	Relation crel;
	TupleTableSlot *cslot;
	HeapTuple tuple;
	ItemPointerData decoded_tid;
	TM_Result result;
	TM_FailureData tmfd;
	int n_batch_rows = 0;
	uint16 tuple_index;
	bool should_free = false;

	/* Nothing to do if this is not a compressed segment */
	if (!is_compressed_tid(ctid))
		return 0;

	Assert(TTS_IS_ARROWTUPLE(slot));
	Assert(!TTS_EMPTY(slot));
	Assert(ItemPointerEquals(ctid, &slot->tts_tid));

	hcinfo = RelationGetHypercoreInfo(relation);
	crel = table_open(hcinfo->compressed_relid, RowExclusiveLock);
	tuple_index = hypercore_tid_decode(&decoded_tid, ctid);
	cslot = arrow_slot_get_compressed_slot(slot, NULL);

	/*
	 * Fetch the compressed tuple without forcing materialization. If the
	 * slot cannot hand out its own tuple, a copy is returned and should_free
	 * is set; that copy is released at the end of this function.
	 */
	tuple = ExecFetchSlotHeapTuple(cslot, false, &should_free);

	RowDecompressor decompressor = build_decompressor(crel, relation);

	/*
	 * The deformed datums may point into "tuple", so the tuple must stay
	 * allocated until the decompressor is done with them.
	 */
	heap_deform_tuple(tuple,
					  RelationGetDescr(crel),
					  decompressor.compressed_datums,
					  decompressor.compressed_is_nulls);

	/* Must delete the segment before calling the decompression function below
	 * or otherwise index updates will lead to conflicts */
	result = table_tuple_delete(decompressor.in_rel,
								&cslot->tts_tid,
								decompressor.mycid,
								snapshot,
								InvalidSnapshot,
								true,
								&tmfd,
								false);

	Ensure(result == TM_Ok, "could not delete compressed segment, result: %u", result);

	n_batch_rows = row_decompressor_decompress_row_to_table(&decompressor);

	/* Return the TID of the decompressed conflicting tuple. Tuple index is
	 * 1-indexed, so subtract 1. */
	slot = decompressor.decompressed_slots[tuple_index - 1];
	ItemPointerCopy(&slot->tts_tid, new_ctid);

	/* Need to make decompressed (and deleted segment) visible */
	CommandCounterIncrement();
	row_decompressor_close(&decompressor);

	/*
	 * Release the tuple copy, if one was made. This happens only after the
	 * decompressor is closed since its datum array referenced the tuple.
	 */
	if (should_free)
		heap_freetuple(tuple);

	table_close(crel, NoLock);

	return n_batch_rows;
}

#if PG16_LT
typedef bool TU_UpdateIndexes;
#endif
Expand Down
3 changes: 3 additions & 0 deletions tsl/src/hypercore/hypercore_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ extern void hypercore_xact_event(XactEvent event, void *arg);
extern bool hypercore_set_truncate_compressed(bool onoff);
extern void hypercore_scan_set_skip_compressed(TableScanDesc scan, bool skip);
extern void hypercore_skip_compressed_data_for_relation(Oid relid);
extern int hypercore_decompress_update_segment(Relation relation, const ItemPointer ctid,
TupleTableSlot *slot, Snapshot snapshot,
ItemPointer new_ctid);

typedef struct ColumnCompressionSettings
{
Expand Down
1 change: 1 addition & 0 deletions tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ CrossModuleFunctions tsl_cm_functions = {
.decompress_target_segments = decompress_target_segments,
.hypercore_handler = hypercore_handler,
.hypercore_proxy_handler = hypercore_proxy_handler,
.hypercore_decompress_update_segment = hypercore_decompress_update_segment,
.is_compressed_tid = tsl_is_compressed_tid,
.ddl_command_start = tsl_ddl_command_start,
.ddl_command_end = tsl_ddl_command_end,
Expand Down

0 comments on commit 93a5a92

Please sign in to comment.