diff --git a/tsl/src/compression/CMakeLists.txt b/tsl/src/compression/CMakeLists.txt index e0719823484..75e8486ddd5 100644 --- a/tsl/src/compression/CMakeLists.txt +++ b/tsl/src/compression/CMakeLists.txt @@ -1,12 +1,11 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/api.c - ${CMAKE_CURRENT_SOURCE_DIR}/array.c ${CMAKE_CURRENT_SOURCE_DIR}/compression.c + ${CMAKE_CURRENT_SOURCE_DIR}/compression_dml.c + ${CMAKE_CURRENT_SOURCE_DIR}/compression_scankey.c ${CMAKE_CURRENT_SOURCE_DIR}/compression_storage.c ${CMAKE_CURRENT_SOURCE_DIR}/create.c - ${CMAKE_CURRENT_SOURCE_DIR}/datum_serialize.c - ${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c - ${CMAKE_CURRENT_SOURCE_DIR}/dictionary.c - ${CMAKE_CURRENT_SOURCE_DIR}/gorilla.c ${CMAKE_CURRENT_SOURCE_DIR}/segment_meta.c) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) + +add_subdirectory(algorithms) diff --git a/tsl/src/compression/algorithms/CMakeLists.txt b/tsl/src/compression/algorithms/CMakeLists.txt new file mode 100644 index 00000000000..aabea5ad60c --- /dev/null +++ b/tsl/src/compression/algorithms/CMakeLists.txt @@ -0,0 +1,7 @@ +set(SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/array.c + ${CMAKE_CURRENT_SOURCE_DIR}/datum_serialize.c + ${CMAKE_CURRENT_SOURCE_DIR}/deltadelta.c + ${CMAKE_CURRENT_SOURCE_DIR}/dictionary.c + ${CMAKE_CURRENT_SOURCE_DIR}/gorilla.c) +target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/compression/array.c b/tsl/src/compression/algorithms/array.c similarity index 99% rename from tsl/src/compression/array.c rename to tsl/src/compression/algorithms/array.c index fbe82618d18..69852d6e114 100644 --- a/tsl/src/compression/array.c +++ b/tsl/src/compression/algorithms/array.c @@ -14,11 +14,11 @@ #include #include -#include "compression/array.h" +#include "array.h" #include "compression/compression.h" -#include "compression/simple8b_rle.h" -#include "compression/simple8b_rle_bitmap.h" #include "datum_serialize.h" +#include "simple8b_rle.h" +#include "simple8b_rle_bitmap.h" #include "compression/arrow_c_data_interface.h" diff --git a/tsl/src/compression/array.h b/tsl/src/compression/algorithms/array.h similarity index 100% rename from tsl/src/compression/array.h rename to tsl/src/compression/algorithms/array.h diff --git a/tsl/src/compression/datum_serialize.c b/tsl/src/compression/algorithms/datum_serialize.c similarity index 99% rename from tsl/src/compression/datum_serialize.c rename to tsl/src/compression/algorithms/datum_serialize.c index 8cdaa8ff1cc..18d79e76634 100644 --- a/tsl/src/compression/datum_serialize.c +++ b/tsl/src/compression/algorithms/datum_serialize.c @@ -17,10 +17,9 @@ #include #include -#include "compat/compat.h" +#include #include "datum_serialize.h" - -#include "compression.h" +#include typedef struct DatumSerializer { diff --git a/tsl/src/compression/datum_serialize.h b/tsl/src/compression/algorithms/datum_serialize.h similarity index 100% rename from tsl/src/compression/datum_serialize.h rename to tsl/src/compression/algorithms/datum_serialize.h diff --git a/tsl/src/compression/deltadelta.c b/tsl/src/compression/algorithms/deltadelta.c similarity index 99% rename from tsl/src/compression/deltadelta.c rename to tsl/src/compression/algorithms/deltadelta.c index ebdc257c266..d44b305b712 100644 --- a/tsl/src/compression/deltadelta.c +++ b/tsl/src/compression/algorithms/deltadelta.c @@ -4,7 +4,7 @@ * LICENSE-TIMESCALE for a copy of the license. */ -#include "compression/deltadelta.h" +#include "deltadelta.h" #include #include @@ -22,8 +22,8 @@ #include "compression/arrow_c_data_interface.h" #include "compression/compression.h" -#include "compression/simple8b_rle.h" -#include "compression/simple8b_rle_bitmap.h" +#include "simple8b_rle.h" +#include "simple8b_rle_bitmap.h" static uint64 zig_zag_encode(uint64 value); static uint64 zig_zag_decode(uint64 value); diff --git a/tsl/src/compression/deltadelta.h b/tsl/src/compression/algorithms/deltadelta.h similarity index 100% rename from tsl/src/compression/deltadelta.h rename to tsl/src/compression/algorithms/deltadelta.h diff --git a/tsl/src/compression/deltadelta_impl.c b/tsl/src/compression/algorithms/deltadelta_impl.c similarity index 100% rename from tsl/src/compression/deltadelta_impl.c rename to tsl/src/compression/algorithms/deltadelta_impl.c diff --git a/tsl/src/compression/dictionary.c b/tsl/src/compression/algorithms/dictionary.c similarity index 99% rename from tsl/src/compression/dictionary.c rename to tsl/src/compression/algorithms/dictionary.c index 7393f88de34..c7cf51d981f 100644 --- a/tsl/src/compression/dictionary.c +++ b/tsl/src/compression/algorithms/dictionary.c @@ -19,14 +19,14 @@ #include #include -#include "compression/array.h" +#include "array.h" #include "compression/arrow_c_data_interface.h" #include "compression/compression.h" -#include "compression/datum_serialize.h" -#include "compression/dictionary.h" -#include "compression/dictionary_hash.h" -#include "compression/simple8b_rle.h" -#include "compression/simple8b_rle_bitmap.h" +#include "datum_serialize.h" +#include "dictionary.h" +#include "dictionary_hash.h" +#include "simple8b_rle.h" +#include "simple8b_rle_bitmap.h" /* * A compression bitmap is stored as diff --git a/tsl/src/compression/dictionary.h b/tsl/src/compression/algorithms/dictionary.h similarity index 100% rename from tsl/src/compression/dictionary.h rename to tsl/src/compression/algorithms/dictionary.h diff --git a/tsl/src/compression/dictionary_hash.h b/tsl/src/compression/algorithms/dictionary_hash.h similarity index 100% rename from tsl/src/compression/dictionary_hash.h rename to tsl/src/compression/algorithms/dictionary_hash.h diff --git a/tsl/src/compression/float_utils.h b/tsl/src/compression/algorithms/float_utils.h similarity index 100% rename from tsl/src/compression/float_utils.h rename to tsl/src/compression/algorithms/float_utils.h diff --git a/tsl/src/compression/gorilla.c b/tsl/src/compression/algorithms/gorilla.c similarity index 99% rename from tsl/src/compression/gorilla.c rename to tsl/src/compression/algorithms/gorilla.c index 83b9eed6e23..bcc7a543fbf 100644 --- a/tsl/src/compression/gorilla.c +++ b/tsl/src/compression/algorithms/gorilla.c @@ -14,14 +14,14 @@ #include #include -#include "compression/gorilla.h" +#include "gorilla.h" #include "adts/bit_array.h" #include "compression/arrow_c_data_interface.h" #include "compression/compression.h" -#include "compression/simple8b_rle.h" -#include "compression/simple8b_rle_bitmap.h" #include "float_utils.h" +#include "simple8b_rle.h" +#include "simple8b_rle_bitmap.h" /* * Gorilla compressed data is stored as diff --git a/tsl/src/compression/gorilla.h b/tsl/src/compression/algorithms/gorilla.h similarity index 100% rename from tsl/src/compression/gorilla.h rename to tsl/src/compression/algorithms/gorilla.h diff --git a/tsl/src/compression/gorilla_impl.c b/tsl/src/compression/algorithms/gorilla_impl.c similarity index 100% rename from tsl/src/compression/gorilla_impl.c rename to tsl/src/compression/algorithms/gorilla_impl.c diff --git a/tsl/src/compression/simple8b_rle.h b/tsl/src/compression/algorithms/simple8b_rle.h similarity index 100% rename from tsl/src/compression/simple8b_rle.h rename to tsl/src/compression/algorithms/simple8b_rle.h diff --git a/tsl/src/compression/simple8b_rle_bitmap.h b/tsl/src/compression/algorithms/simple8b_rle_bitmap.h similarity index 99% rename from tsl/src/compression/simple8b_rle_bitmap.h rename to tsl/src/compression/algorithms/simple8b_rle_bitmap.h index 921f876a3c9..11f245ee781 100644 --- a/tsl/src/compression/simple8b_rle_bitmap.h +++ b/tsl/src/compression/algorithms/simple8b_rle_bitmap.h @@ -10,7 +10,7 @@ * elements are only 0 and 1. It also counts the number of ones. */ -#include "compression/simple8b_rle.h" +#include "simple8b_rle.h" typedef struct Simple8bRleBitmap { diff --git a/tsl/src/compression/simple8b_rle_decompress_all.h b/tsl/src/compression/algorithms/simple8b_rle_decompress_all.h similarity index 100% rename from tsl/src/compression/simple8b_rle_decompress_all.h rename to tsl/src/compression/algorithms/simple8b_rle_decompress_all.h diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 75c9d7696f3..5db42ada1d9 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -4,68 +4,35 @@ * LICENSE-TIMESCALE for a copy of the license. */ #include -#include -#include -#include -#include -#include #include -#include -#include #include -#include -#include #include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include #include -#include -#include -#include -#include -#include -#include #include #include -#include #include #include "compat/compat.h" -#include "array.h" +#include "algorithms/array.h" +#include "algorithms/deltadelta.h" +#include "algorithms/dictionary.h" +#include "algorithms/gorilla.h" #include "chunk.h" #include "compression.h" #include "create.h" #include "custom_type_cache.h" #include "debug_assert.h" #include "debug_point.h" -#include "deltadelta.h" -#include "dictionary.h" -#include "expression_utils.h" -#include "gorilla.h" #include "guc.h" -#include "indexing.h" #include "nodes/chunk_dispatch/chunk_insert_state.h" -#include "nodes/hypertable_modify.h" #include "segment_meta.h" #include "ts_catalog/array_utils.h" #include "ts_catalog/catalog.h" #include "ts_catalog/compression_chunk_size.h" #include "ts_catalog/compression_settings.h" -#include "wal_utils.h" StaticAssertDecl(GLOBAL_MAX_ROWS_PER_COMPRESSION >= TARGET_COMPRESSED_BATCH_SIZE, "max row numbers must be harmonized"); @@ -127,16 +94,7 @@ static void row_compressor_append_row(RowCompressor *row_compressor, TupleTableS static void row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool changed_groups); -static int create_segment_filter_scankey(Relation in_rel, char *segment_filter_col_name, - StrategyNumber strategy, Oid subtype, - ScanKeyData *scankeys, int num_scankeys, - Bitmapset **null_columns, Datum value, bool is_null_check, - bool is_array_op); static void create_per_compressed_column(RowDecompressor *decompressor); -static ScanKeyData *build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings, - Relation out_rel, Bitmapset *key_columns, - TupleTableSlot *slot, int *num_scankeys); -static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys); /******************** ** compress_chunk ** @@ -1589,7 +1547,7 @@ create_per_compressed_column(RowDecompressor *decompressor) * Decompresses the current compressed batch into decompressed_slots, and returns * the number of rows in batch. */ -static int +int decompress_batch(RowDecompressor *decompressor) { if (decompressor->unprocessed_tuples) @@ -2024,1719 +1982,9 @@ compression_get_default_algorithm(Oid typeoid) } } -/* - * Build scankeys for decompression of specific batches. key_columns references the - * columns of the uncompressed chunk. - */ -static ScanKeyData * -build_heap_scankeys(Oid hypertable_relid, Relation in_rel, Relation out_rel, - CompressionSettings *settings, Bitmapset *key_columns, Bitmapset **null_columns, - TupleTableSlot *slot, int *num_scankeys) -{ - int key_index = 0; - ScanKeyData *scankeys = NULL; - - if (!bms_is_empty(key_columns)) - { - scankeys = palloc0(bms_num_members(key_columns) * 2 * sizeof(ScanKeyData)); - int i = -1; - while ((i = bms_next_member(key_columns, i)) > 0) - { - AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber; - char *attname = get_attname(out_rel->rd_id, attno, false); - bool isnull; - AttrNumber ht_attno = get_attnum(hypertable_relid, attname); - - /* - * This is a not very precise but easy assertion to detect attno - * mismatch at least in some cases. The mismatch might happen if the - * hypertable and chunk layout are different because of dropped - * columns, and we're using a wrong slot type here. - */ - PG_USED_FOR_ASSERTS_ONLY Oid ht_atttype = get_atttype(hypertable_relid, ht_attno); - PG_USED_FOR_ASSERTS_ONLY Oid slot_atttype = - slot->tts_tupleDescriptor->attrs[AttrNumberGetAttrOffset(ht_attno)].atttypid; - Assert(ht_atttype == slot_atttype); - - Datum value = slot_getattr(slot, ht_attno, &isnull); - - /* - * There are 3 possible scenarios we have to consider - * when dealing with columns which are part of unique - * constraints. - * - * 1. Column is segmentby-Column - * In this case we can add a single ScanKey with an - * equality check for the value. - * 2. Column is orderby-Column - * In this we can add 2 ScanKeys with range constraints - * utilizing batch metadata. - * 3. Column is neither segmentby nor orderby - * In this case we cannot utilize this column for - * batch filtering as the values are compressed and - * we have no metadata. - */ - if (ts_array_is_member(settings->fd.segmentby, attname)) - { - key_index = create_segment_filter_scankey(in_rel, - attname, - BTEqualStrategyNumber, - InvalidOid, - scankeys, - key_index, - null_columns, - value, - isnull, - false); - } - if (ts_array_is_member(settings->fd.orderby, attname)) - { - /* Cannot optimize orderby columns with NULL values since those - * are not visible in metadata - */ - if (isnull) - continue; - - int16 index = ts_array_position(settings->fd.orderby, attname); - - key_index = create_segment_filter_scankey(in_rel, - column_segment_min_name(index), - BTLessEqualStrategyNumber, - InvalidOid, - scankeys, - key_index, - null_columns, - value, - false, - false); /* is_null_check */ - key_index = create_segment_filter_scankey(in_rel, - column_segment_max_name(index), - BTGreaterEqualStrategyNumber, - InvalidOid, - scankeys, - key_index, - null_columns, - value, - false, - false); /* is_null_check */ - } - } - } - - *num_scankeys = key_index; - return scankeys; -} - -static int -create_segment_filter_scankey(Relation in_rel, char *segment_filter_col_name, - StrategyNumber strategy, Oid subtype, ScanKeyData *scankeys, - int num_scankeys, Bitmapset **null_columns, Datum value, - bool is_null_check, bool is_array_op) -{ - AttrNumber cmp_attno = get_attnum(in_rel->rd_id, segment_filter_col_name); - Assert(cmp_attno != InvalidAttrNumber); - /* This should never happen but if it does happen, we can't generate a scan key for - * the filter column so just skip it */ - if (cmp_attno == InvalidAttrNumber) - return num_scankeys; - - int flags = is_array_op ? SK_SEARCHARRAY : 0; - - /* - * In PG versions <= 14 NULL values are always considered distinct - * from other NULL values and therefore NULLABLE multi-columnn - * unique constraints might expose unexpected behaviour in the - * presence of NULL values. - * Since SK_SEARCHNULL is not supported by heap scans we cannot - * build a ScanKey for NOT NULL and instead have to do those - * checks manually. - */ - if (is_null_check) - { - *null_columns = bms_add_member(*null_columns, cmp_attno); - return num_scankeys; - } - - Oid atttypid = in_rel->rd_att->attrs[AttrNumberGetAttrOffset(cmp_attno)].atttypid; - - TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY); - if (!OidIsValid(tce->btree_opf)) - elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid)); - - Oid opr = get_opfamily_member(tce->btree_opf, atttypid, atttypid, strategy); - - /* - * Fall back to btree operator input type when it is binary compatible with - * the column type and no operator for column type could be found. - */ - if (!OidIsValid(opr) && IsBinaryCoercible(atttypid, tce->btree_opintype)) - { - opr = - get_opfamily_member(tce->btree_opf, tce->btree_opintype, tce->btree_opintype, strategy); - } - - /* No operator could be found so we can't create the scankey. */ - if (!OidIsValid(opr)) - return num_scankeys; - - opr = get_opcode(opr); - Assert(OidIsValid(opr)); - /* We should never end up here but: no opcode, no optimization */ - if (!OidIsValid(opr)) - return num_scankeys; - - ScanKeyEntryInitialize(&scankeys[num_scankeys++], - flags, - cmp_attno, - strategy, - subtype, - in_rel->rd_att->attrs[AttrNumberGetAttrOffset(cmp_attno)].attcollation, - opr, - value); - - return num_scankeys; -} - -/* - * For insert into compressed chunks with unique index determine the - * columns which can be used for INSERT batch filtering. - * The passed in relation is the uncompressed chunk. - * - * In case of multiple unique indexes we have to return the shared columns. - * For expression indexes we ignore the columns with expressions, for partial - * indexes we ignore predicate. - */ -static Bitmapset * -compressed_insert_key_columns(Relation relation) -{ - Bitmapset *shared_attrs = NULL; /* indexed columns */ - ListCell *l; - - /* Fast path if definitely no indexes */ - if (!RelationGetForm(relation)->relhasindex) - return NULL; - - List *indexoidlist = RelationGetIndexList(relation); - - /* Fall out if no indexes (but relhasindex was set) */ - if (indexoidlist == NIL) - return NULL; - - foreach (l, indexoidlist) - { - Oid indexOid = lfirst_oid(l); - Relation indexDesc = index_open(indexOid, AccessShareLock); - - /* - * We are only interested in unique indexes. PRIMARY KEY indexes also have - * indisunique set to true so we do not need to check for them separately. - */ - if (!indexDesc->rd_index->indislive || !indexDesc->rd_index->indisvalid || - !indexDesc->rd_index->indisunique) - { - index_close(indexDesc, AccessShareLock); - continue; - } - - Bitmapset *idx_attrs = NULL; - /* - * Collect attributes of current index. - * For covering indexes we need to ignore the included columns. - */ - for (int i = 0; i < indexDesc->rd_index->indnkeyatts; i++) - { - int attrnum = indexDesc->rd_index->indkey.values[i]; - /* We are not interested in expression columns which will have attrnum = 0 */ - if (!attrnum) - continue; - - idx_attrs = bms_add_member(idx_attrs, attrnum - FirstLowInvalidHeapAttributeNumber); - } - index_close(indexDesc, AccessShareLock); - - shared_attrs = shared_attrs ? bms_intersect(idx_attrs, shared_attrs) : idx_attrs; - - if (!shared_attrs) - return NULL; - } - - return shared_attrs; -} - -/* This method is used to find matching index on compressed chunk - * and build scan keys from the slot data - */ -static ScanKeyData * -build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation out_rel, - Bitmapset *key_columns, TupleTableSlot *slot, - Relation *result_index_rel, Bitmapset **index_columns, - int *num_scan_keys) -{ - List *index_oids; - ListCell *lc; - ScanKeyData *scankeys = NULL; - /* get list of indexes defined on compressed chunk */ - index_oids = RelationGetIndexList(in_rel); - *num_scan_keys = 0; - - foreach (lc, index_oids) - { - Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock); - IndexInfo *index_info = BuildIndexInfo(index_rel); - - /* Can't use partial or expression indexes */ - if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL) - { - index_close(index_rel, AccessShareLock); - continue; - } - - /* Can only use Btree indexes */ - if (index_info->ii_Am != BTREE_AM_OID) - { - index_close(index_rel, AccessShareLock); - continue; - } - - /* - * Must have at least two attributes, index we are looking for contains - * at least one segmentby column and a sequence number. - */ - if (index_rel->rd_index->indnatts < 2) - { - index_close(index_rel, AccessShareLock); - continue; - } - - scankeys = palloc0((index_rel->rd_index->indnatts) * sizeof(ScanKeyData)); - - /* - * Using only key attributes to exclude covering columns - * only interested in filtering here - */ - for (int i = 0; i < index_rel->rd_index->indnkeyatts; i++) - { - AttrNumber idx_attnum = AttrOffsetGetAttrNumber(i); - AttrNumber in_attnum = index_rel->rd_index->indkey.values[i]; - const NameData *attname = attnumAttName(in_rel, in_attnum); - - /* Make sure we find columns in key columns in order to select the right index */ - if (!bms_is_member(get_attnum(out_rel->rd_id, NameStr(*attname)), key_columns)) - { - break; - } - - bool isnull; - AttrNumber ht_attno = get_attnum(hypertable_relid, NameStr(*attname)); - Datum value = slot_getattr(slot, ht_attno, &isnull); - - if (isnull) - { - ScanKeyEntryInitialize(&scankeys[(*num_scan_keys)++], - SK_ISNULL | SK_SEARCHNULL, - idx_attnum, - InvalidStrategy, /* no strategy */ - InvalidOid, /* no strategy subtype */ - InvalidOid, /* no collation */ - InvalidOid, /* no reg proc for this */ - (Datum) 0); /* constant */ - continue; - } - - Oid atttypid = attnumTypeId(index_rel, idx_attnum); - - TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY); - if (!OidIsValid(tce->btree_opf)) - elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid)); - - Oid opr = - get_opfamily_member(tce->btree_opf, atttypid, atttypid, BTEqualStrategyNumber); - - /* - * Fall back to btree operator input type when it is binary compatible with - * the column type and no operator for column type could be found. - */ - if (!OidIsValid(opr) && IsBinaryCoercible(atttypid, tce->btree_opintype)) - { - opr = get_opfamily_member(tce->btree_opf, - tce->btree_opintype, - tce->btree_opintype, - BTEqualStrategyNumber); - } - - /* No operator could be found so we can't create the scankey. */ - if (!OidIsValid(opr)) - continue; - - Oid opcode = get_opcode(opr); - Ensure(OidIsValid(opcode), - "no opcode found for column operator of a hypertable column"); - - ScanKeyEntryInitialize(&scankeys[(*num_scan_keys)++], - 0, /* flags */ - idx_attnum, - BTEqualStrategyNumber, - InvalidOid, /* No strategy subtype. */ - attnumCollationId(index_rel, idx_attnum), - opcode, - value); - } - - if (*num_scan_keys > 0) - { - *result_index_rel = index_rel; - break; - } - else - { - index_close(index_rel, AccessShareLock); - pfree(scankeys); - scankeys = NULL; - } - } - - return scankeys; -} - -static void -report_error(TM_Result result) -{ - switch (result) - { - case TM_Deleted: - { - if (IsolationUsesXactSnapshot()) - { - /* For Repeatable Read isolation level report error */ - ereport(ERROR, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("could not serialize access due to concurrent update"))); - } - } - break; - /* - * If another transaction is updating the compressed data, - * we have to abort the transaction to keep consistency. - */ - case TM_Updated: - { - elog(ERROR, "tuple concurrently updated"); - } - break; - case TM_Invisible: - { - elog(ERROR, "attempted to lock invisible tuple"); - } - break; - case TM_Ok: - break; - default: - { - elog(ERROR, "unexpected tuple operation result: %d", result); - } - break; - } -} - -static inline TM_Result -delete_compressed_tuple(RowDecompressor *decompressor, Snapshot snapshot, - HeapTuple compressed_tuple) -{ - TM_FailureData tmfd; - TM_Result result; - result = table_tuple_delete(decompressor->in_rel, - &compressed_tuple->t_self, - decompressor->mycid, - snapshot, - InvalidSnapshot, - true, - &tmfd, - false); - return result; -} - -struct decompress_batches_stats -{ - int64 batches_decompressed; - int64 tuples_decompressed; -}; - -/* - * This method will: - * 1.Scan the index created with SEGMENT BY columns. - * 2.Fetch matching rows and decompress the row - * 3.insert decompressed rows to uncompressed chunk - * 4.delete this row from compressed chunk - * - * Returns whether we decompressed anything. - */ -static struct decompress_batches_stats -decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_rel, - Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys, - ScanKeyData *heap_scankeys, int num_heap_scankeys, - ScanKeyData *mem_scankeys, int num_mem_scankeys, - Bitmapset *null_columns, List *is_nulls) -{ - HeapTuple compressed_tuple; - RowDecompressor decompressor; - bool decompressor_initialized = false; - bool valid = false; - int num_segmentby_filtered_rows = 0; - int num_heap_filtered_rows = 0; - - struct decompress_batches_stats stats = { - .batches_decompressed = 0, - .tuples_decompressed = 0, - }; - - /* TODO: Optimization by reusing the index scan while working on a single chunk */ - IndexScanDesc scan = index_beginscan(in_rel, index_rel, snapshot, num_index_scankeys, 0); - TupleTableSlot *slot = table_slot_create(in_rel, NULL); - index_rescan(scan, index_scankeys, num_index_scankeys, NULL, 0); - while (index_getnext_slot(scan, ForwardScanDirection, slot)) - { - TM_Result result; - /* Deconstruct the tuple */ - Assert(slot->tts_ops->get_heap_tuple); - compressed_tuple = slot->tts_ops->get_heap_tuple(slot); - num_segmentby_filtered_rows++; - if (num_heap_scankeys) - { - /* filter tuple based on compress_orderby columns */ - valid = false; -#if PG16_LT - HeapKeyTest(compressed_tuple, - RelationGetDescr(in_rel), - num_heap_scankeys, - heap_scankeys, - valid); -#else - valid = HeapKeyTest(compressed_tuple, - RelationGetDescr(in_rel), - num_heap_scankeys, - heap_scankeys); -#endif - if (!valid) - { - num_heap_filtered_rows++; - continue; - } - } - - int attrno = bms_next_member(null_columns, -1); - int pos = 0; - bool is_null_condition = 0; - bool seg_col_is_null = false; - valid = true; - for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) - { - is_null_condition = list_nth_int(is_nulls, pos); - seg_col_is_null = slot_attisnull(slot, attrno); - if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition)) - { - /* - * if segment by column in the scanned tuple has non null value - * and IS NULL is specified, OR segment by column has null value - * and IS NOT NULL is specified then skip this tuple - */ - valid = false; - break; - } - pos++; - } - if (!valid) - { - num_heap_filtered_rows++; - continue; - } - - if (!decompressor_initialized) - { - decompressor = build_decompressor(in_rel, out_rel); - decompressor_initialized = true; - } - - heap_deform_tuple(compressed_tuple, - decompressor.in_desc, - decompressor.compressed_datums, - decompressor.compressed_is_nulls); - - write_logical_replication_msg_decompression_start(); - result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple); - /* skip reporting error if isolation level is < Repeatable Read - * since somebody decompressed the data concurrently, we need to take - * that data into account as well when in Read Committed level - */ - if (result == TM_Deleted && !IsolationUsesXactSnapshot()) - { - write_logical_replication_msg_decompression_end(); - stats.batches_decompressed++; - continue; - } - if (result != TM_Ok) - { - write_logical_replication_msg_decompression_end(); - row_decompressor_close(&decompressor); - index_endscan(scan); - index_close(index_rel, AccessShareLock); - report_error(result); - return stats; - } - stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor); - stats.batches_decompressed++; - write_logical_replication_msg_decompression_end(); - } - - if (ts_guc_debug_compression_path_info) - { - elog(INFO, - "Number of compressed rows fetched from index: %d. " - "Number of compressed rows filtered by heap filters: %d.", - num_segmentby_filtered_rows, - num_heap_filtered_rows); - } - - ExecDropSingleTupleTableSlot(slot); - index_endscan(scan); - if (decompressor_initialized) - { - row_decompressor_close(&decompressor); - } - CommandCounterIncrement(); - return stats; -} -/* - * This method will: - * 1.scan compressed chunk - * 2.decompress the row - * 3.delete this row from compressed chunk - * 4.insert decompressed rows to uncompressed chunk - * - * Return value: - * if all 4 steps defined above pass return whether we decompressed anything. - * if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on - * same chunk. - */ -static struct decompress_batches_stats -decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot, - ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys, - int num_mem_scankeys, Bitmapset *null_columns, List *is_nulls) -{ - RowDecompressor decompressor; - bool decompressor_initialized = false; - - TupleTableSlot *slot = table_slot_create(in_rel, NULL); - TableScanDesc scan = table_beginscan(in_rel, snapshot, num_scankeys, scankeys); - int num_scanned_rows = 0; - int num_filtered_rows = 0; - struct decompress_batches_stats stats = { - .batches_decompressed = 0, - .tuples_decompressed = 0, - }; - - while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) - { - num_scanned_rows++; - bool skip_tuple = false; - int attrno = bms_next_member(null_columns, -1); - int pos = 0; - bool is_null_condition = 0; - bool seg_col_is_null = false; - /* - * Since the heap scan API does not support SK_SEARCHNULL we have to check - * for NULL values manually when those are part of the constraints. - */ - for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) - { - /* Treat all conditions as IS NULL if the list is empty */ - is_null_condition = is_nulls == NIL || list_nth_int(is_nulls, pos); - seg_col_is_null = slot_attisnull(slot, attrno); - if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition)) - { - /* - * if segment by column in the scanned tuple has non null value - * and IS NULL is specified, OR segment by column has null value - * and IS NOT NULL is specified then skip this tuple - */ - skip_tuple = true; - break; - } - pos++; - } - if (skip_tuple) - { - num_filtered_rows++; - continue; - } - - TM_Result result; - Assert(slot->tts_ops->get_heap_tuple); - HeapTuple compressed_tuple = slot->tts_ops->get_heap_tuple(slot); - - if (!decompressor_initialized) - { - decompressor = build_decompressor(in_rel, out_rel); - decompressor_initialized = true; - } - - heap_deform_tuple(compressed_tuple, - decompressor.in_desc, - decompressor.compressed_datums, - decompressor.compressed_is_nulls); - - if (num_mem_scankeys && !batch_matches(&decompressor, mem_scankeys, num_mem_scankeys)) - { - row_decompressor_reset(&decompressor); - continue; - } - - write_logical_replication_msg_decompression_start(); - result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple); - /* skip reporting error if isolation level is < Repeatable Read - * since somebody decompressed the data concurrently, we need to take - * that data into account as well when in Read Committed level - */ - if (result == TM_Deleted && !IsolationUsesXactSnapshot()) - { - write_logical_replication_msg_decompression_end(); - stats.batches_decompressed++; - continue; - } - if (result != TM_Ok) - { - table_endscan(scan); - report_error(result); - } - stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor); - stats.batches_decompressed++; - write_logical_replication_msg_decompression_end(); - } - if (scankeys) - pfree(scankeys); - ExecDropSingleTupleTableSlot(slot); - table_endscan(scan); - if (decompressor_initialized) - { - row_decompressor_close(&decompressor); - } - - if (ts_guc_debug_compression_path_info) - { - elog(INFO, - "Number of compressed rows fetched from table scan: %d. " - "Number of compressed rows filtered: %d.", - num_scanned_rows, - num_filtered_rows); - } - return stats; -} - -void -decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot) -{ - /* - * This is supposed to be called with the actual tuple that is being - * inserted, so it cannot be empty. - */ - Assert(!TTS_EMPTY(slot)); - - Relation out_rel = cis->rel; - - if (!ts_indexing_relation_has_primary_or_unique_index(out_rel)) - { - /* - * If there are no unique constraints there is nothing to do here. - */ - return; - } - - if (!ts_guc_enable_dml_decompression) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("inserting into compressed chunk with unique constraints disabled"), - errhint("Set timescaledb.enable_dml_decompression to TRUE."))); - - Assert(OidIsValid(cis->compressed_chunk_table_id)); - Relation in_rel = relation_open(cis->compressed_chunk_table_id, RowExclusiveLock); - CompressionSettings *settings = ts_compression_settings_get(cis->compressed_chunk_table_id); - Assert(settings); - - Bitmapset *key_columns = compressed_insert_key_columns(out_rel); - Bitmapset *index_columns = NULL; - Bitmapset *null_columns = NULL; - struct decompress_batches_stats stats; - - /* the scan keys used for in memory tests of the decompressed tuples */ - int num_mem_scankeys; - ScanKeyData *mem_scankeys = build_scankeys_for_uncompressed(cis->hypertable_relid, - settings, - out_rel, - key_columns, - slot, - &num_mem_scankeys); - - int num_index_scankeys; - Relation index_rel = NULL; - ScanKeyData *index_scankeys = build_index_scankeys_using_slot(cis->hypertable_relid, - in_rel, - out_rel, - key_columns, - slot, - &index_rel, - &index_columns, - &num_index_scankeys); - - if (index_rel) - { - /* - * Prepare the heap scan keys for all - * key columns not found in the index - */ - key_columns = bms_difference(key_columns, index_columns); - - int num_heap_scankeys; - ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid, - in_rel, - out_rel, - settings, - key_columns, - &null_columns, - slot, - &num_heap_scankeys); - bms_free(key_columns); - - /* - * Using latest snapshot to scan the heap since we are doing this to build - * the index on the uncompressed chunks in order to do speculative insertion - * which is always built from all tuples (even in higher levels of isolation). - */ - stats = decompress_batches_indexscan(in_rel, - out_rel, - index_rel, - GetLatestSnapshot(), - index_scankeys, - num_index_scankeys, - heap_scankeys, - num_heap_scankeys, - mem_scankeys, - num_mem_scankeys, - NULL, /* no null column check for non-segmentby - columns */ - NIL); - index_close(index_rel, AccessShareLock); - } - else - { - int num_heap_scankeys; - ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid, - in_rel, - out_rel, - settings, - key_columns, - &null_columns, - slot, - &num_heap_scankeys); - - stats = decompress_batches_seqscan(in_rel, - out_rel, - GetLatestSnapshot(), - heap_scankeys, - num_heap_scankeys, - mem_scankeys, - num_mem_scankeys, - null_columns, - NIL); - bms_free(key_columns); - } - - Assert(cis->cds != NULL); - cis->cds->batches_decompressed += stats.batches_decompressed; - cis->cds->tuples_decompressed += stats.tuples_decompressed; - - CommandCounterIncrement(); - table_close(in_rel, NoLock); -} - -const CompressionAlgorithmDefinition * -algorithm_definition(CompressionAlgorithm algo) +const CompressionAlgorithmDefinition * +algorithm_definition(CompressionAlgorithm algo) { Assert(algo > 0 && algo < _END_COMPRESSION_ALGORITHMS); return &definitions[algo]; } - -static BatchFilter * -make_batchfilter(char *column_name, StrategyNumber strategy, Oid collation, RegProcedure opcode, - Const *value, bool is_null_check, bool is_null, bool is_array_op) -{ - BatchFilter *segment_filter = palloc0(sizeof(*segment_filter)); - - *segment_filter = (BatchFilter){ - .strategy = strategy, - .collation = collation, - .opcode = opcode, - .value = value, - .is_null_check = is_null_check, - .is_null = is_null, - .is_array_op = is_array_op, - }; - namestrcpy(&segment_filter->column_name, column_name); - - return segment_filter; -} - -/* - * A compressed chunk can have multiple indexes. For a given list - * of columns in index_filters, find the matching index which has - * the most columns based on index_filters and adjust the filters - * if necessary. - * Return matching index if found else return NULL. - * - * Note: This method will find the best matching index based on - * number of filters it matches. If an index matches all the filters, - * it will be chosen. Otherwise, it will try to select the index - * which has most matches. If there are multiple indexes have - * the same number of matches, it will pick the first one it finds. - * For example - * for a given condition like "WHERE X = 10 AND Y = 8" - * if there are multiple indexes like - * 1. index (a,b,c,x) - * 2. index (a,x,y) - * 3. index (x) - * In this case 2nd index is returned. If that one didn't exist, - * it would return the 1st index. - */ -static Relation -find_matching_index(Relation comp_chunk_rel, List **index_filters, List **heap_filters) -{ - List *index_oids; - ListCell *lc; - int total_filters = list_length(*index_filters); - int max_match_count = 0; - Relation result_rel = NULL; - - /* get list of indexes defined on compressed chunk */ - index_oids = RelationGetIndexList(comp_chunk_rel); - foreach (lc, index_oids) - { - int match_count = 0; - Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock); - IndexInfo *index_info = BuildIndexInfo(index_rel); - - /* Can't use partial or expression indexes */ - if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL) - { - index_close(index_rel, AccessShareLock); - continue; - } - - /* Can only use Btree indexes */ - if (index_info->ii_Am != BTREE_AM_OID) - { - index_close(index_rel, AccessShareLock); - continue; - } - - ListCell *li; - foreach (li, *index_filters) - { - for (int i = 0; i < index_rel->rd_index->indnatts; i++) - { - AttrNumber attnum = index_rel->rd_index->indkey.values[i]; - char *attname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, false); - BatchFilter *sf = lfirst(li); - /* ensure column exists in index relation */ - if (!strcmp(attname, NameStr(sf->column_name))) - { - match_count++; - break; - } - } - } - if (match_count == total_filters) - { - /* found index which has all columns specified in WHERE */ - if (result_rel) - index_close(result_rel, AccessShareLock); - if (ts_guc_debug_compression_path_info) - elog(INFO, "Index \"%s\" is used for scan. ", RelationGetRelationName(index_rel)); - return index_rel; - } - - if (match_count > max_match_count) - { - max_match_count = match_count; - result_rel = index_rel; - continue; - } - index_close(index_rel, AccessShareLock); - } - - /* No matching index whatsoever */ - if (!result_rel) - { - *heap_filters = list_concat(*heap_filters, *index_filters); - *index_filters = list_truncate(*index_filters, 0); - return NULL; - } - - /* We found an index which matches partially. - * It can be used but we need to transfer the unmatched - * filters from index_filters to heap filters. - */ - for (int i = 0; i < list_length(*index_filters); i++) - { - BatchFilter *sf = list_nth(*index_filters, i); - bool match = false; - for (int j = 0; j < result_rel->rd_index->indnatts; j++) - { - AttrNumber attnum = result_rel->rd_index->indkey.values[j]; - char *attname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, false); - /* ensure column exists in index relation */ - if (!strcmp(attname, NameStr(sf->column_name))) - { - match = true; - break; - } - } - - if (!match) - { - *heap_filters = lappend(*heap_filters, sf); - *index_filters = list_delete_nth_cell(*index_filters, i); - } - } - if (ts_guc_debug_compression_path_info) - elog(INFO, "Index \"%s\" is used for scan. ", RelationGetRelationName(result_rel)); - return result_rel; -} - -/* - * This method will evaluate the predicates, extract - * left and right operands, check if any of the operands - * can be used for batch filtering and if so, it will - * create a BatchFilter object and add it to the corresponding - * list. - * Any segmentby filter is put into index_filters list other - * filters are put into heap_filters list. - */ -static void -process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, List **heap_filters, - List **index_filters, List **is_null) -{ - ListCell *lc; - /* - * We dont want to forward boundParams from the execution state here - * as we dont want to constify join params in the predicates. - * Constifying JOIN params would not be safe as we don't redo - * this part in rescan. - */ - PlannerGlobal glob = { .boundParams = NULL }; - PlannerInfo root = { .glob = &glob }; - - foreach (lc, predicates) - { - Node *node = copyObject(lfirst(lc)); - Var *var; - Expr *expr; - Oid collation, opno; - RegProcedure opcode; - char *column_name; - - switch (nodeTag(node)) - { - case T_OpExpr: - { - OpExpr *opexpr = castNode(OpExpr, node); - collation = opexpr->inputcollid; - Const *arg_value; - - if (!ts_extract_expr_args(&opexpr->xpr, &var, &expr, &opno, &opcode)) - continue; - - if (!IsA(expr, Const)) - { - expr = (Expr *) estimate_expression_value(&root, (Node *) expr); - - if (!IsA(expr, Const)) - continue; - } - - arg_value = castNode(Const, expr); - - column_name = get_attname(ch->table_id, var->varattno, false); - TypeCacheEntry *tce = lookup_type_cache(var->vartype, TYPECACHE_BTREE_OPFAMILY); - int op_strategy = get_op_opfamily_strategy(opno, tce->btree_opf); - if (ts_array_is_member(settings->fd.segmentby, column_name)) - { - switch (op_strategy) - { - case BTEqualStrategyNumber: - case BTLessStrategyNumber: - case BTLessEqualStrategyNumber: - case BTGreaterStrategyNumber: - case BTGreaterEqualStrategyNumber: - { - /* save segment by column name and its corresponding value specified in - * WHERE */ - *index_filters = lappend(*index_filters, - make_batchfilter(column_name, - op_strategy, - collation, - opcode, - arg_value, - false, /* is_null_check */ - false, /* is_null */ - false /* is_array_op */ - )); - } - } - continue; - } - - int min_attno = compressed_column_metadata_attno(settings, - ch->table_id, - var->varattno, - settings->fd.relid, - "min"); - int max_attno = compressed_column_metadata_attno(settings, - ch->table_id, - var->varattno, - settings->fd.relid, - "max"); - - if (min_attno != InvalidAttrNumber && max_attno != InvalidAttrNumber) - { - switch (op_strategy) - { - case BTEqualStrategyNumber: - { - /* orderby col = value implies min <= value and max >= value */ - *heap_filters = lappend(*heap_filters, - make_batchfilter(get_attname(settings->fd.relid, - min_attno, - false), - BTLessEqualStrategyNumber, - collation, - opcode, - arg_value, - false, /* is_null_check */ - false, /* is_null */ - false /* is_array_op */ - )); - *heap_filters = lappend(*heap_filters, - make_batchfilter(get_attname(settings->fd.relid, - max_attno, - false), - BTGreaterEqualStrategyNumber, - collation, - opcode, - arg_value, - false, /* is_null_check */ - false, /* is_null */ - false /* is_array_op */ - )); - } - break; - case BTLessStrategyNumber: - case BTLessEqualStrategyNumber: - { - /* orderby col <[=] value implies min <[=] value */ - *heap_filters = lappend(*heap_filters, - make_batchfilter(get_attname(settings->fd.relid, - min_attno, - false), - op_strategy, - collation, - opcode, - arg_value, - false, /* is_null_check */ - false, /* is_null */ - false /* is_array_op */ - )); - } - break; - case BTGreaterStrategyNumber: - case BTGreaterEqualStrategyNumber: - { - /* orderby col >[=] value implies max >[=] value */ - *heap_filters = lappend(*heap_filters, - make_batchfilter(get_attname(settings->fd.relid, - max_attno, - false), - op_strategy, - collation, - opcode, - arg_value, - false, /* is_null_check */ - false, /* is_null */ - false /* is_array_op */ - )); - } - } - } - } - break; - case T_ScalarArrayOpExpr: - { - ScalarArrayOpExpr *sa_expr = castNode(ScalarArrayOpExpr, node); - if (!ts_extract_expr_args(&sa_expr->xpr, &var, &expr, &opno, &opcode)) - continue; - - if (!IsA(expr, Const)) - { - expr = (Expr *) estimate_expression_value(&root, (Node *) expr); - if (!IsA(expr, Const)) - continue; - } - - Const *arg_value = castNode(Const, expr); - collation = sa_expr->inputcollid; - - column_name = get_attname(ch->table_id, var->varattno, false); - TypeCacheEntry *tce = lookup_type_cache(var->vartype, TYPECACHE_BTREE_OPFAMILY); - int op_strategy = get_op_opfamily_strategy(opno, tce->btree_opf); - if (ts_array_is_member(settings->fd.segmentby, column_name)) - { - switch (op_strategy) - { - case BTEqualStrategyNumber: - case BTLessStrategyNumber: - case BTLessEqualStrategyNumber: - case BTGreaterStrategyNumber: - case BTGreaterEqualStrategyNumber: - { - /* save segment by column name and its corresponding value specified in - * WHERE */ - *index_filters = lappend(*index_filters, - make_batchfilter(column_name, - op_strategy, - collation, - opcode, - arg_value, - false, /* is_null_check */ - false, /* is_null */ - true /* is_array_op */ - )); - } - } - continue; - } - - break; - } - case T_NullTest: - { - NullTest *ntest = (NullTest *) node; - if (IsA(ntest->arg, Var)) - { - var = (Var *) ntest->arg; - /* ignore system-defined attributes */ - if (var->varattno <= 0) - continue; - column_name = get_attname(ch->table_id, var->varattno, false); - if (ts_array_is_member(settings->fd.segmentby, column_name)) - { - *index_filters = - lappend(*index_filters, - make_batchfilter(column_name, - InvalidStrategy, - InvalidOid, - InvalidOid, - NULL, - true, /* is_null_check */ - ntest->nulltesttype == IS_NULL, /* is_null */ - false /* is_array_op */ - )); - if (ntest->nulltesttype == IS_NULL) - *is_null = lappend_int(*is_null, 1); - else - *is_null = lappend_int(*is_null, 0); - } - /* We cannot optimize filtering decompression using ORDERBY - * metadata and null check qualifiers. We could possibly do that by checking the - * compressed data in combination with the ORDERBY nulls first setting and - * verifying that the first or last tuple of a segment contains a NULL value. - * This is left for future optimization */ - } - } - break; - default: - break; - } - } -} - -/* - * Get the subtype for an indexscan from the provided filter. We also - * need to handle array constants appropriately. - */ -static Oid -deduce_filter_subtype(BatchFilter *filter, Oid att_typoid) -{ - Oid subtype = InvalidOid; - - if (!filter->value) - return InvalidOid; - - /* - * Check if the filter type is different from the att type. If yes, the - * subtype needs to be set appropriately. - */ - if (att_typoid != filter->value->consttype) - { - /* For an array type get its element type */ - if (filter->is_array_op) - subtype = get_element_type(filter->value->consttype); - else - subtype = filter->value->consttype; - } - - return subtype; -} - -/* - * This method will build scan keys for predicates including - * SEGMENT BY column with attribute number from compressed chunk - * if condition is like = , else - * OUT param null_columns is saved with column attribute number. - */ -static ScanKeyData * -build_update_delete_scankeys(Relation in_rel, List *heap_filters, int *num_scankeys, - Bitmapset **null_columns) -{ - ListCell *lc; - BatchFilter *filter; - int key_index = 0; - - ScanKeyData *scankeys = palloc0(heap_filters->length * sizeof(ScanKeyData)); - - foreach (lc, heap_filters) - { - filter = lfirst(lc); - AttrNumber attno = get_attnum(in_rel->rd_id, NameStr(filter->column_name)); - Oid typoid = get_atttype(in_rel->rd_id, attno); - if (attno == InvalidAttrNumber) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - NameStr(filter->column_name), - RelationGetRelationName(in_rel)))); - - key_index = create_segment_filter_scankey(in_rel, - NameStr(filter->column_name), - filter->strategy, - deduce_filter_subtype(filter, typoid), - scankeys, - key_index, - null_columns, - filter->value ? filter->value->constvalue : 0, - filter->is_null_check, - filter->is_array_op); - } - *num_scankeys = key_index; - return scankeys; -} - -/* - * This method will build scan keys required to do index - * scans on compressed chunks. - */ -static ScanKeyData * -build_index_scankeys(Relation index_rel, List *index_filters, int *num_scankeys) -{ - ListCell *lc; - BatchFilter *filter = NULL; - *num_scankeys = list_length(index_filters); - ScanKeyData *scankey = palloc0(sizeof(ScanKeyData) * (*num_scankeys)); - int idx = 0; - int flags; - - /* Order scankeys based on index attribute order */ - for (int idx_attno = 1; idx_attno <= index_rel->rd_index->indnkeyatts && idx < *num_scankeys; - idx_attno++) - { - AttrNumber attno = index_rel->rd_index->indkey.values[AttrNumberGetAttrOffset(idx_attno)]; - char *attname = get_attname(index_rel->rd_index->indrelid, attno, false); - Oid typoid = attnumTypeId(index_rel, idx_attno); - foreach (lc, index_filters) - { - filter = lfirst(lc); - if (!strcmp(attname, NameStr(filter->column_name))) - { - flags = 0; - if (filter->is_null_check) - { - flags = SK_ISNULL | (filter->is_null ? SK_SEARCHNULL : SK_SEARCHNOTNULL); - } - if (filter->is_array_op) - { - flags |= SK_SEARCHARRAY; - } - - ScanKeyEntryInitialize(&scankey[idx++], - flags, - idx_attno, - filter->strategy, - deduce_filter_subtype(filter, typoid), /* subtype */ - filter->collation, - filter->opcode, - filter->value ? filter->value->constvalue : 0); - break; - } - } - } - - Assert(idx == *num_scankeys); - return scankey; -} - -/* - * This method will: - * 1. Evaluate WHERE clauses and check if SEGMENT BY columns - * are specified or not. - * 2. Build scan keys for SEGMENT BY columns. - * 3. Move scanned rows to staging area. - * 4. Update catalog table to change status of moved chunk. - * - * Returns true if it decompresses any data. - */ -static bool -decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk, - List *predicates, EState *estate) -{ - /* process each chunk with its corresponding predicates */ - - List *heap_filters = NIL; - List *index_filters = NIL; - List *is_null = NIL; - ListCell *lc = NULL; - Relation chunk_rel; - Relation comp_chunk_rel; - Relation matching_index_rel = NULL; - Chunk *comp_chunk; - BatchFilter *filter; - - ScanKeyData *scankeys = NULL; - Bitmapset *null_columns = NULL; - int num_scankeys = 0; - ScanKeyData *index_scankeys = NULL; - int num_index_scankeys = 0; - struct decompress_batches_stats stats; - - comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true); - CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id); - - process_predicates(chunk, settings, predicates, &heap_filters, &index_filters, &is_null); - - chunk_rel = table_open(chunk->table_id, RowExclusiveLock); - comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock); - - if (index_filters) - { - matching_index_rel = find_matching_index(comp_chunk_rel, &index_filters, &heap_filters); - } - - if (heap_filters) - { - scankeys = build_update_delete_scankeys(comp_chunk_rel, - heap_filters, - &num_scankeys, - &null_columns); - } - if (matching_index_rel) - { - index_scankeys = - build_index_scankeys(matching_index_rel, index_filters, &num_index_scankeys); - stats = decompress_batches_indexscan(comp_chunk_rel, - chunk_rel, - matching_index_rel, - GetTransactionSnapshot(), - index_scankeys, - num_index_scankeys, - scankeys, - num_scankeys, - NULL, - 0, - null_columns, - is_null); - /* close the selected index */ - index_close(matching_index_rel, AccessShareLock); - } - else - { - stats = decompress_batches_seqscan(comp_chunk_rel, - chunk_rel, - GetTransactionSnapshot(), - scankeys, - num_scankeys, - NULL, - 0, - null_columns, - is_null); - } - - /* - * tuples from compressed chunk has been decompressed and moved - * to staging area, thus mark this chunk as partially compressed - */ - if (stats.batches_decompressed > 0) - ts_chunk_set_partial(chunk); - - table_close(chunk_rel, NoLock); - table_close(comp_chunk_rel, NoLock); - - foreach (lc, heap_filters) - { - filter = lfirst(lc); - pfree(filter); - } - foreach (lc, index_filters) - { - filter = lfirst(lc); - pfree(filter); - } - ht_state->batches_decompressed += stats.batches_decompressed; - ht_state->tuples_decompressed += stats.tuples_decompressed; - - return stats.batches_decompressed > 0; -} - -/* - * Traverse the plan tree to look for Scan nodes on uncompressed chunks. - * Once Scan node is found check if chunk is compressed, if so then - * decompress those segments which match the filter conditions if present. - */ - -struct decompress_chunk_context -{ - List *relids; - HypertableModifyState *ht_state; - /* indicates decompression actually occurred */ - bool batches_decompressed; -}; - -static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx); - -bool -decompress_target_segments(HypertableModifyState *ht_state) -{ - ModifyTableState *ps = - linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps); - - struct decompress_chunk_context ctx = { - .ht_state = ht_state, - .relids = castNode(ModifyTable, ps->ps.plan)->resultRelations, - }; - Assert(ctx.relids); - - decompress_chunk_walker(&ps->ps, &ctx); - return ctx.batches_decompressed; -} - -static bool -decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx) -{ - RangeTblEntry *rte = NULL; - bool needs_decompression = false; - bool should_rescan = false; - bool batches_decompressed = false; - List *predicates = NIL; - Chunk *current_chunk; - if (ps == NULL) - return false; - - switch (nodeTag(ps)) - { - /* Note: IndexOnlyScans will never be selected for target - * tables because system columns are necessary in order to modify the - * data and those columns cannot be a part of the index - */ - case T_IndexScanState: - { - /* Get the index quals on the original table and also include - * any filters that are used for filtering heap tuples - */ - predicates = list_union(((IndexScan *) ps->plan)->indexqualorig, ps->plan->qual); - needs_decompression = true; - break; - } - case T_BitmapHeapScanState: - predicates = list_union(((BitmapHeapScan *) ps->plan)->bitmapqualorig, ps->plan->qual); - needs_decompression = true; - should_rescan = true; - break; - case T_SeqScanState: - case T_SampleScanState: - case T_TidScanState: - case T_TidRangeScanState: - { - predicates = list_copy(ps->plan->qual); - needs_decompression = true; - break; - } - default: - break; - } - if (needs_decompression) - { - /* - * We are only interested in chunk scans of chunks that are the - * target of the DML statement not chunk scan on joined hypertables - * even when it is a self join - */ - int scanrelid = ((Scan *) ps->plan)->scanrelid; - if (list_member_int(ctx->relids, scanrelid)) - { - rte = rt_fetch(scanrelid, ps->state->es_range_table); - current_chunk = ts_chunk_get_by_relid(rte->relid, false); - if (current_chunk && ts_chunk_is_compressed(current_chunk)) - { - if (!ts_guc_enable_dml_decompression) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("UPDATE/DELETE is disabled on compressed chunks"), - errhint("Set timescaledb.enable_dml_decompression to TRUE."))); - - batches_decompressed = decompress_batches_for_update_delete(ctx->ht_state, - current_chunk, - predicates, - ps->state); - ctx->batches_decompressed |= batches_decompressed; - - /* This is a workaround specifically for bitmap heap scans: - * during node initialization, initialize the scan state with the active snapshot - * but since we are inserting data to be modified during the same query, they end up - * missing that data by using a snapshot which doesn't account for this decompressed - * data. To circumvent this issue, we change the internal scan state to use the - * transaction snapshot and execute a rescan so the scan state is set correctly and - * includes the new data. - */ - if (should_rescan) - { - ScanState *ss = ((ScanState *) ps); - if (ss && ss->ss_currentScanDesc) - { - ss->ss_currentScanDesc->rs_snapshot = GetTransactionSnapshot(); - ExecReScan(ps); - } - } - } - } - } - - if (predicates) - pfree(predicates); - - return planstate_tree_walker(ps, decompress_chunk_walker, ctx); -} - -/* - * Build scankeys for decompressed tuple to check if it is part of the batch. - * - * The key_columns are the columns of the uncompressed chunk. - */ -static ScanKeyData * -build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings, Relation out_rel, - Bitmapset *key_columns, TupleTableSlot *slot, int *num_scankeys) -{ - ScanKeyData *scankeys = NULL; - int key_index = 0; - TupleDesc out_desc = RelationGetDescr(out_rel); - - if (bms_is_empty(key_columns)) - { - *num_scankeys = key_index; - return scankeys; - } - - scankeys = palloc(sizeof(ScanKeyData) * bms_num_members(key_columns)); - - int i = -1; - while ((i = bms_next_member(key_columns, i)) > 0) - { - AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber; - bool isnull; - - /* - * slot has the physical layout of the hypertable, so we need to - * get the attribute number of the hypertable for the column. - */ - char *attname = get_attname(out_rel->rd_id, attno, false); - - /* - * We can skip any segmentby columns here since they have already been - * checked during batch filtering. - */ - if (ts_array_is_member(settings->fd.segmentby, attname)) - { - continue; - } - - AttrNumber ht_attno = get_attnum(ht_relid, attname); - Datum value = slot_getattr(slot, ht_attno, &isnull); - - Oid atttypid = out_desc->attrs[AttrNumberGetAttrOffset(attno)].atttypid; - TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY); - - /* - * Should never happen since the column is part of unique constraint - * and should therefore have the required opfamily - */ - if (!OidIsValid(tce->btree_opf)) - elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid)); - - Oid opr = get_opfamily_member(tce->btree_opf, atttypid, atttypid, BTEqualStrategyNumber); - - /* - * Fall back to btree operator input type when it is binary compatible with - * the column type and no operator for column type could be found. - */ - if (!OidIsValid(opr) && IsBinaryCoercible(atttypid, tce->btree_opintype)) - { - opr = get_opfamily_member(tce->btree_opf, - tce->btree_opintype, - tce->btree_opintype, - BTEqualStrategyNumber); - } - - if (!OidIsValid(opr)) - elog(ERROR, "no operator found for type \"%s\"", format_type_be(atttypid)); - - ScanKeyEntryInitialize(&scankeys[key_index++], - isnull ? SK_ISNULL | SK_SEARCHNULL : 0, - attno, - BTEqualStrategyNumber, - InvalidOid, - out_desc->attrs[AttrNumberGetAttrOffset(attno)].attcollation, - get_opcode(opr), - isnull ? 0 : value); - } - - *num_scankeys = key_index; - return scankeys; -} - -static bool -batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys) -{ - int num_tuples = decompress_batch(decompressor); - - bool valid = false; - - for (int row = 0; row < num_tuples; row++) - { - TupleTableSlot *decompressed_slot = decompressor->decompressed_slots[row]; - HeapTuple tuple = decompressed_slot->tts_ops->get_heap_tuple(decompressed_slot); -#if PG16_LT - HeapKeyTest(tuple, decompressor->out_desc, num_scankeys, scankeys, valid); -#else - valid = HeapKeyTest(tuple, decompressor->out_desc, num_scankeys, scankeys); -#endif - if (valid) - { - return true; - } - } - - return false; -} diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h index d0c88d2ed22..7d400ffe5ed 100644 --- a/tsl/src/compression/compression.h +++ b/tsl/src/compression/compression.h @@ -372,6 +372,7 @@ extern RowDecompressor build_decompressor(Relation in_rel, Relation out_rel); extern void row_decompressor_reset(RowDecompressor *decompressor); extern void row_decompressor_close(RowDecompressor *decompressor); extern enum CompressionAlgorithms compress_get_default_algorithm(Oid typeoid); +extern int decompress_batch(RowDecompressor *decompressor); /* * A convenience macro to throw an error about the corrupted compressed data, if * the argument is false. When fuzzing is enabled, we don't show the message not @@ -406,3 +407,9 @@ consumeCompressedData(StringInfo si, int bytes) #define GLOBAL_MAX_ROWS_PER_COMPRESSION INT16_MAX const CompressionAlgorithmDefinition *algorithm_definition(CompressionAlgorithm algo); + +struct decompress_batches_stats +{ + int64 batches_decompressed; + int64 tuples_decompressed; +}; diff --git a/tsl/src/compression/compression_dml.c b/tsl/src/compression/compression_dml.c new file mode 100644 index 00000000000..00a6943ce65 --- /dev/null +++ b/tsl/src/compression/compression_dml.c @@ -0,0 +1,1241 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static struct decompress_batches_stats +decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_rel, + Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys, + ScanKeyData *heap_scankeys, int num_heap_scankeys, + ScanKeyData *mem_scankeys, int num_mem_scankeys, + Bitmapset *null_columns, List *is_nulls); +static struct decompress_batches_stats +decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot, + ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys, + int num_mem_scankeys, Bitmapset *null_columns, List *is_nulls); + +static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys); +static void process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, + List **heap_filters, List **index_filters, List **is_null); +static Relation find_matching_index(Relation comp_chunk_rel, List **index_filters, + List **heap_filters); +static Bitmapset *compressed_insert_key_columns(Relation relation); +static BatchFilter *make_batchfilter(char *column_name, StrategyNumber strategy, Oid collation, + RegProcedure opcode, Const *value, bool is_null_check, + bool is_null, bool is_array_op); +static inline TM_Result delete_compressed_tuple(RowDecompressor *decompressor, Snapshot snapshot, + HeapTuple compressed_tuple); +static void report_error(TM_Result result); + +void +decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot) +{ + /* + * This is supposed to be called with the actual tuple that is being + * inserted, so it cannot be empty. + */ + Assert(!TTS_EMPTY(slot)); + + Relation out_rel = cis->rel; + + if (!ts_indexing_relation_has_primary_or_unique_index(out_rel)) + { + /* + * If there are no unique constraints there is nothing to do here. + */ + return; + } + + if (!ts_guc_enable_dml_decompression) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("inserting into compressed chunk with unique constraints disabled"), + errhint("Set timescaledb.enable_dml_decompression to TRUE."))); + + Assert(OidIsValid(cis->compressed_chunk_table_id)); + Relation in_rel = relation_open(cis->compressed_chunk_table_id, RowExclusiveLock); + CompressionSettings *settings = ts_compression_settings_get(cis->compressed_chunk_table_id); + Assert(settings); + + Bitmapset *key_columns = compressed_insert_key_columns(out_rel); + Bitmapset *index_columns = NULL; + Bitmapset *null_columns = NULL; + struct decompress_batches_stats stats; + + /* the scan keys used for in memory tests of the decompressed tuples */ + int num_mem_scankeys; + ScanKeyData *mem_scankeys = build_scankeys_for_uncompressed(cis->hypertable_relid, + settings, + out_rel, + key_columns, + slot, + &num_mem_scankeys); + + int num_index_scankeys; + Relation index_rel = NULL; + ScanKeyData *index_scankeys = build_index_scankeys_using_slot(cis->hypertable_relid, + in_rel, + out_rel, + key_columns, + slot, + &index_rel, + &index_columns, + &num_index_scankeys); + + if (index_rel) + { + /* + * Prepare the heap scan keys for all + * key columns not found in the index + */ + key_columns = bms_difference(key_columns, index_columns); + + int num_heap_scankeys; + ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid, + in_rel, + out_rel, + settings, + key_columns, + &null_columns, + slot, + &num_heap_scankeys); + bms_free(key_columns); + + /* + * Using latest snapshot to scan the heap since we are doing this to build + * the index on the uncompressed chunks in order to do speculative insertion + * which is always built from all tuples (even in higher levels of isolation). + */ + stats = decompress_batches_indexscan(in_rel, + out_rel, + index_rel, + GetLatestSnapshot(), + index_scankeys, + num_index_scankeys, + heap_scankeys, + num_heap_scankeys, + mem_scankeys, + num_mem_scankeys, + NULL, /* no null column check for non-segmentby + columns */ + NIL); + index_close(index_rel, AccessShareLock); + } + else + { + int num_heap_scankeys; + ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid, + in_rel, + out_rel, + settings, + key_columns, + &null_columns, + slot, + &num_heap_scankeys); + + stats = decompress_batches_seqscan(in_rel, + out_rel, + GetLatestSnapshot(), + heap_scankeys, + num_heap_scankeys, + mem_scankeys, + num_mem_scankeys, + null_columns, + NIL); + bms_free(key_columns); + } + + Assert(cis->cds != NULL); + cis->cds->batches_decompressed += stats.batches_decompressed; + cis->cds->tuples_decompressed += stats.tuples_decompressed; + + CommandCounterIncrement(); + table_close(in_rel, NoLock); +} + +/* + * This method will: + * 1. Evaluate WHERE clauses and check if SEGMENT BY columns + * are specified or not. + * 2. Build scan keys for SEGMENT BY columns. + * 3. Move scanned rows to staging area. + * 4. Update catalog table to change status of moved chunk. + * + * Returns true if it decompresses any data. + */ +static bool +decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk, + List *predicates, EState *estate) +{ + /* process each chunk with its corresponding predicates */ + + List *heap_filters = NIL; + List *index_filters = NIL; + List *is_null = NIL; + ListCell *lc = NULL; + Relation chunk_rel; + Relation comp_chunk_rel; + Relation matching_index_rel = NULL; + Chunk *comp_chunk; + BatchFilter *filter; + + ScanKeyData *scankeys = NULL; + Bitmapset *null_columns = NULL; + int num_scankeys = 0; + ScanKeyData *index_scankeys = NULL; + int num_index_scankeys = 0; + struct decompress_batches_stats stats; + + comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true); + CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id); + + process_predicates(chunk, settings, predicates, &heap_filters, &index_filters, &is_null); + + chunk_rel = table_open(chunk->table_id, RowExclusiveLock); + comp_chunk_rel = table_open(comp_chunk->table_id, RowExclusiveLock); + + if (index_filters) + { + matching_index_rel = find_matching_index(comp_chunk_rel, &index_filters, &heap_filters); + } + + if (heap_filters) + { + scankeys = build_update_delete_scankeys(comp_chunk_rel, + heap_filters, + &num_scankeys, + &null_columns); + } + if (matching_index_rel) + { + index_scankeys = + build_index_scankeys(matching_index_rel, index_filters, &num_index_scankeys); + stats = decompress_batches_indexscan(comp_chunk_rel, + chunk_rel, + matching_index_rel, + GetTransactionSnapshot(), + index_scankeys, + num_index_scankeys, + scankeys, + num_scankeys, + NULL, + 0, + null_columns, + is_null); + /* close the selected index */ + index_close(matching_index_rel, AccessShareLock); + } + else + { + stats = decompress_batches_seqscan(comp_chunk_rel, + chunk_rel, + GetTransactionSnapshot(), + scankeys, + num_scankeys, + NULL, + 0, + null_columns, + is_null); + } + + /* + * tuples from compressed chunk has been decompressed and moved + * to staging area, thus mark this chunk as partially compressed + */ + if (stats.batches_decompressed > 0) + ts_chunk_set_partial(chunk); + + table_close(chunk_rel, NoLock); + table_close(comp_chunk_rel, NoLock); + + foreach (lc, heap_filters) + { + filter = lfirst(lc); + pfree(filter); + } + foreach (lc, index_filters) + { + filter = lfirst(lc); + pfree(filter); + } + ht_state->batches_decompressed += stats.batches_decompressed; + ht_state->tuples_decompressed += stats.tuples_decompressed; + + return stats.batches_decompressed > 0; +} + +/* + * This method will: + * 1.Scan the index created with SEGMENT BY columns. + * 2.Fetch matching rows and decompress the row + * 3.insert decompressed rows to uncompressed chunk + * 4.delete this row from compressed chunk + * + * Returns whether we decompressed anything. + */ +static struct decompress_batches_stats +decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_rel, + Snapshot snapshot, ScanKeyData *index_scankeys, int num_index_scankeys, + ScanKeyData *heap_scankeys, int num_heap_scankeys, + ScanKeyData *mem_scankeys, int num_mem_scankeys, + Bitmapset *null_columns, List *is_nulls) +{ + HeapTuple compressed_tuple; + RowDecompressor decompressor; + bool decompressor_initialized = false; + bool valid = false; + int num_segmentby_filtered_rows = 0; + int num_heap_filtered_rows = 0; + + struct decompress_batches_stats stats = { + .batches_decompressed = 0, + .tuples_decompressed = 0, + }; + + /* TODO: Optimization by reusing the index scan while working on a single chunk */ + IndexScanDesc scan = index_beginscan(in_rel, index_rel, snapshot, num_index_scankeys, 0); + TupleTableSlot *slot = table_slot_create(in_rel, NULL); + index_rescan(scan, index_scankeys, num_index_scankeys, NULL, 0); + while (index_getnext_slot(scan, ForwardScanDirection, slot)) + { + TM_Result result; + /* Deconstruct the tuple */ + Assert(slot->tts_ops->get_heap_tuple); + compressed_tuple = slot->tts_ops->get_heap_tuple(slot); + num_segmentby_filtered_rows++; + if (num_heap_scankeys) + { + /* filter tuple based on compress_orderby columns */ + valid = false; +#if PG16_LT + HeapKeyTest(compressed_tuple, + RelationGetDescr(in_rel), + num_heap_scankeys, + heap_scankeys, + valid); +#else + valid = HeapKeyTest(compressed_tuple, + RelationGetDescr(in_rel), + num_heap_scankeys, + heap_scankeys); +#endif + if (!valid) + { + num_heap_filtered_rows++; + continue; + } + } + + int attrno = bms_next_member(null_columns, -1); + int pos = 0; + bool is_null_condition = 0; + bool seg_col_is_null = false; + valid = true; + for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) + { + is_null_condition = list_nth_int(is_nulls, pos); + seg_col_is_null = slot_attisnull(slot, attrno); + if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition)) + { + /* + * if segment by column in the scanned tuple has non null value + * and IS NULL is specified, OR segment by column has null value + * and IS NOT NULL is specified then skip this tuple + */ + valid = false; + break; + } + pos++; + } + if (!valid) + { + num_heap_filtered_rows++; + continue; + } + + if (!decompressor_initialized) + { + decompressor = build_decompressor(in_rel, out_rel); + decompressor_initialized = true; + } + + heap_deform_tuple(compressed_tuple, + decompressor.in_desc, + decompressor.compressed_datums, + decompressor.compressed_is_nulls); + + write_logical_replication_msg_decompression_start(); + result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple); + /* skip reporting error if isolation level is < Repeatable Read + * since somebody decompressed the data concurrently, we need to take + * that data into account as well when in Read Committed level + */ + if (result == TM_Deleted && !IsolationUsesXactSnapshot()) + { + write_logical_replication_msg_decompression_end(); + stats.batches_decompressed++; + continue; + } + if (result != TM_Ok) + { + write_logical_replication_msg_decompression_end(); + row_decompressor_close(&decompressor); + index_endscan(scan); + index_close(index_rel, AccessShareLock); + report_error(result); + return stats; + } + stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor); + stats.batches_decompressed++; + write_logical_replication_msg_decompression_end(); + } + + if (ts_guc_debug_compression_path_info) + { + elog(INFO, + "Number of compressed rows fetched from index: %d. " + "Number of compressed rows filtered by heap filters: %d.", + num_segmentby_filtered_rows, + num_heap_filtered_rows); + } + + ExecDropSingleTupleTableSlot(slot); + index_endscan(scan); + if (decompressor_initialized) + { + row_decompressor_close(&decompressor); + } + CommandCounterIncrement(); + return stats; +} + +/* + * This method will: + * 1.scan compressed chunk + * 2.decompress the row + * 3.delete this row from compressed chunk + * 4.insert decompressed rows to uncompressed chunk + * + * Return value: + * if all 4 steps defined above pass return whether we decompressed anything. + * if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on + * same chunk. + */ +static struct decompress_batches_stats +decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot, + ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys, + int num_mem_scankeys, Bitmapset *null_columns, List *is_nulls) +{ + RowDecompressor decompressor; + bool decompressor_initialized = false; + + TupleTableSlot *slot = table_slot_create(in_rel, NULL); + TableScanDesc scan = table_beginscan(in_rel, snapshot, num_scankeys, scankeys); + int num_scanned_rows = 0; + int num_filtered_rows = 0; + struct decompress_batches_stats stats = { + .batches_decompressed = 0, + .tuples_decompressed = 0, + }; + + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) + { + num_scanned_rows++; + bool skip_tuple = false; + int attrno = bms_next_member(null_columns, -1); + int pos = 0; + bool is_null_condition = 0; + bool seg_col_is_null = false; + /* + * Since the heap scan API does not support SK_SEARCHNULL we have to check + * for NULL values manually when those are part of the constraints. + */ + for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) + { + /* Treat all conditions as IS NULL if the list is empty */ + is_null_condition = is_nulls == NIL || list_nth_int(is_nulls, pos); + seg_col_is_null = slot_attisnull(slot, attrno); + if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition)) + { + /* + * if segment by column in the scanned tuple has non null value + * and IS NULL is specified, OR segment by column has null value + * and IS NOT NULL is specified then skip this tuple + */ + skip_tuple = true; + break; + } + pos++; + } + if (skip_tuple) + { + num_filtered_rows++; + continue; + } + + TM_Result result; + Assert(slot->tts_ops->get_heap_tuple); + HeapTuple compressed_tuple = slot->tts_ops->get_heap_tuple(slot); + + if (!decompressor_initialized) + { + decompressor = build_decompressor(in_rel, out_rel); + decompressor_initialized = true; + } + + heap_deform_tuple(compressed_tuple, + decompressor.in_desc, + decompressor.compressed_datums, + decompressor.compressed_is_nulls); + + if (num_mem_scankeys && !batch_matches(&decompressor, mem_scankeys, num_mem_scankeys)) + { + row_decompressor_reset(&decompressor); + continue; + } + + write_logical_replication_msg_decompression_start(); + result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple); + /* skip reporting error if isolation level is < Repeatable Read + * since somebody decompressed the data concurrently, we need to take + * that data into account as well when in Read Committed level + */ + if (result == TM_Deleted && !IsolationUsesXactSnapshot()) + { + write_logical_replication_msg_decompression_end(); + stats.batches_decompressed++; + continue; + } + if (result != TM_Ok) + { + table_endscan(scan); + report_error(result); + } + stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor); + stats.batches_decompressed++; + write_logical_replication_msg_decompression_end(); + } + if (scankeys) + pfree(scankeys); + ExecDropSingleTupleTableSlot(slot); + table_endscan(scan); + if (decompressor_initialized) + { + row_decompressor_close(&decompressor); + } + + if (ts_guc_debug_compression_path_info) + { + elog(INFO, + "Number of compressed rows fetched from table scan: %d. " + "Number of compressed rows filtered: %d.", + num_scanned_rows, + num_filtered_rows); + } + return stats; +} + +static bool +batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys) +{ + int num_tuples = decompress_batch(decompressor); + + bool valid = false; + + for (int row = 0; row < num_tuples; row++) + { + TupleTableSlot *decompressed_slot = decompressor->decompressed_slots[row]; + HeapTuple tuple = decompressed_slot->tts_ops->get_heap_tuple(decompressed_slot); +#if PG16_LT + HeapKeyTest(tuple, decompressor->out_desc, num_scankeys, scankeys, valid); +#else + valid = HeapKeyTest(tuple, decompressor->out_desc, num_scankeys, scankeys); +#endif + if (valid) + { + return true; + } + } + + return false; +} + +/* + * Traverse the plan tree to look for Scan nodes on uncompressed chunks. + * Once Scan node is found check if chunk is compressed, if so then + * decompress those segments which match the filter conditions if present. + */ + +struct decompress_chunk_context +{ + List *relids; + HypertableModifyState *ht_state; + /* indicates decompression actually occurred */ + bool batches_decompressed; +}; + +static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx); + +bool +decompress_target_segments(HypertableModifyState *ht_state) +{ + ModifyTableState *ps = + linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps); + + struct decompress_chunk_context ctx = { + .ht_state = ht_state, + .relids = castNode(ModifyTable, ps->ps.plan)->resultRelations, + }; + Assert(ctx.relids); + + decompress_chunk_walker(&ps->ps, &ctx); + return ctx.batches_decompressed; +} + +static bool +decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx) +{ + RangeTblEntry *rte = NULL; + bool needs_decompression = false; + bool should_rescan = false; + bool batches_decompressed = false; + List *predicates = NIL; + Chunk *current_chunk; + if (ps == NULL) + return false; + + switch (nodeTag(ps)) + { + /* Note: IndexOnlyScans will never be selected for target + * tables because system columns are necessary in order to modify the + * data and those columns cannot be a part of the index + */ + case T_IndexScanState: + { + /* Get the index quals on the original table and also include + * any filters that are used for filtering heap tuples + */ + predicates = list_union(((IndexScan *) ps->plan)->indexqualorig, ps->plan->qual); + needs_decompression = true; + break; + } + case T_BitmapHeapScanState: + predicates = list_union(((BitmapHeapScan *) ps->plan)->bitmapqualorig, ps->plan->qual); + needs_decompression = true; + should_rescan = true; + break; + case T_SeqScanState: + case T_SampleScanState: + case T_TidScanState: + case T_TidRangeScanState: + { + predicates = list_copy(ps->plan->qual); + needs_decompression = true; + break; + } + default: + break; + } + if (needs_decompression) + { + /* + * We are only interested in chunk scans of chunks that are the + * target of the DML statement not chunk scan on joined hypertables + * even when it is a self join + */ + int scanrelid = ((Scan *) ps->plan)->scanrelid; + if (list_member_int(ctx->relids, scanrelid)) + { + rte = rt_fetch(scanrelid, ps->state->es_range_table); + current_chunk = ts_chunk_get_by_relid(rte->relid, false); + if (current_chunk && ts_chunk_is_compressed(current_chunk)) + { + if (!ts_guc_enable_dml_decompression) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("UPDATE/DELETE is disabled on compressed chunks"), + errhint("Set timescaledb.enable_dml_decompression to TRUE."))); + + batches_decompressed = decompress_batches_for_update_delete(ctx->ht_state, + current_chunk, + predicates, + ps->state); + ctx->batches_decompressed |= batches_decompressed; + + /* This is a workaround specifically for bitmap heap scans: + * during node initialization, initialize the scan state with the active snapshot + * but since we are inserting data to be modified during the same query, they end up + * missing that data by using a snapshot which doesn't account for this decompressed + * data. To circumvent this issue, we change the internal scan state to use the + * transaction snapshot and execute a rescan so the scan state is set correctly and + * includes the new data. + */ + if (should_rescan) + { + ScanState *ss = ((ScanState *) ps); + if (ss && ss->ss_currentScanDesc) + { + ss->ss_currentScanDesc->rs_snapshot = GetTransactionSnapshot(); + ExecReScan(ps); + } + } + } + } + } + + if (predicates) + pfree(predicates); + + return planstate_tree_walker(ps, decompress_chunk_walker, ctx); +} + +/* + * For insert into compressed chunks with unique index determine the + * columns which can be used for INSERT batch filtering. + * The passed in relation is the uncompressed chunk. + * + * In case of multiple unique indexes we have to return the shared columns. + * For expression indexes we ignore the columns with expressions, for partial + * indexes we ignore predicate. + */ +static Bitmapset * +compressed_insert_key_columns(Relation relation) +{ + Bitmapset *shared_attrs = NULL; /* indexed columns */ + ListCell *l; + + /* Fast path if definitely no indexes */ + if (!RelationGetForm(relation)->relhasindex) + return NULL; + + List *indexoidlist = RelationGetIndexList(relation); + + /* Fall out if no indexes (but relhasindex was set) */ + if (indexoidlist == NIL) + return NULL; + + foreach (l, indexoidlist) + { + Oid indexOid = lfirst_oid(l); + Relation indexDesc = index_open(indexOid, AccessShareLock); + + /* + * We are only interested in unique indexes. PRIMARY KEY indexes also have + * indisunique set to true so we do not need to check for them separately. + */ + if (!indexDesc->rd_index->indislive || !indexDesc->rd_index->indisvalid || + !indexDesc->rd_index->indisunique) + { + index_close(indexDesc, AccessShareLock); + continue; + } + + Bitmapset *idx_attrs = NULL; + /* + * Collect attributes of current index. + * For covering indexes we need to ignore the included columns. + */ + for (int i = 0; i < indexDesc->rd_index->indnkeyatts; i++) + { + int attrnum = indexDesc->rd_index->indkey.values[i]; + /* We are not interested in expression columns which will have attrnum = 0 */ + if (!attrnum) + continue; + + idx_attrs = bms_add_member(idx_attrs, attrnum - FirstLowInvalidHeapAttributeNumber); + } + index_close(indexDesc, AccessShareLock); + + shared_attrs = shared_attrs ? bms_intersect(idx_attrs, shared_attrs) : idx_attrs; + + if (!shared_attrs) + return NULL; + } + + return shared_attrs; +} + +/* + * This method will evaluate the predicates, extract + * left and right operands, check if any of the operands + * can be used for batch filtering and if so, it will + * create a BatchFilter object and add it to the corresponding + * list. + * Any segmentby filter is put into index_filters list other + * filters are put into heap_filters list. + */ +static void +process_predicates(Chunk *ch, CompressionSettings *settings, List *predicates, List **heap_filters, + List **index_filters, List **is_null) +{ + ListCell *lc; + /* + * We dont want to forward boundParams from the execution state here + * as we dont want to constify join params in the predicates. + * Constifying JOIN params would not be safe as we don't redo + * this part in rescan. + */ + PlannerGlobal glob = { .boundParams = NULL }; + PlannerInfo root = { .glob = &glob }; + + foreach (lc, predicates) + { + Node *node = copyObject(lfirst(lc)); + Var *var; + Expr *expr; + Oid collation, opno; + RegProcedure opcode; + char *column_name; + + switch (nodeTag(node)) + { + case T_OpExpr: + { + OpExpr *opexpr = castNode(OpExpr, node); + collation = opexpr->inputcollid; + Const *arg_value; + + if (!ts_extract_expr_args(&opexpr->xpr, &var, &expr, &opno, &opcode)) + continue; + + if (!IsA(expr, Const)) + { + expr = (Expr *) estimate_expression_value(&root, (Node *) expr); + + if (!IsA(expr, Const)) + continue; + } + + arg_value = castNode(Const, expr); + + column_name = get_attname(ch->table_id, var->varattno, false); + TypeCacheEntry *tce = lookup_type_cache(var->vartype, TYPECACHE_BTREE_OPFAMILY); + int op_strategy = get_op_opfamily_strategy(opno, tce->btree_opf); + if (ts_array_is_member(settings->fd.segmentby, column_name)) + { + switch (op_strategy) + { + case BTEqualStrategyNumber: + case BTLessStrategyNumber: + case BTLessEqualStrategyNumber: + case BTGreaterStrategyNumber: + case BTGreaterEqualStrategyNumber: + { + /* save segment by column name and its corresponding value specified in + * WHERE */ + *index_filters = lappend(*index_filters, + make_batchfilter(column_name, + op_strategy, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false, /* is_null */ + false /* is_array_op */ + )); + } + } + continue; + } + + int min_attno = compressed_column_metadata_attno(settings, + ch->table_id, + var->varattno, + settings->fd.relid, + "min"); + int max_attno = compressed_column_metadata_attno(settings, + ch->table_id, + var->varattno, + settings->fd.relid, + "max"); + + if (min_attno != InvalidAttrNumber && max_attno != InvalidAttrNumber) + { + switch (op_strategy) + { + case BTEqualStrategyNumber: + { + /* orderby col = value implies min <= value and max >= value */ + *heap_filters = lappend(*heap_filters, + make_batchfilter(get_attname(settings->fd.relid, + min_attno, + false), + BTLessEqualStrategyNumber, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false, /* is_null */ + false /* is_array_op */ + )); + *heap_filters = lappend(*heap_filters, + make_batchfilter(get_attname(settings->fd.relid, + max_attno, + false), + BTGreaterEqualStrategyNumber, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false, /* is_null */ + false /* is_array_op */ + )); + } + break; + case BTLessStrategyNumber: + case BTLessEqualStrategyNumber: + { + /* orderby col <[=] value implies min <[=] value */ + *heap_filters = lappend(*heap_filters, + make_batchfilter(get_attname(settings->fd.relid, + min_attno, + false), + op_strategy, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false, /* is_null */ + false /* is_array_op */ + )); + } + break; + case BTGreaterStrategyNumber: + case BTGreaterEqualStrategyNumber: + { + /* orderby col >[=] value implies max >[=] value */ + *heap_filters = lappend(*heap_filters, + make_batchfilter(get_attname(settings->fd.relid, + max_attno, + false), + op_strategy, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false, /* is_null */ + false /* is_array_op */ + )); + } + } + } + } + break; + case T_ScalarArrayOpExpr: + { + ScalarArrayOpExpr *sa_expr = castNode(ScalarArrayOpExpr, node); + if (!ts_extract_expr_args(&sa_expr->xpr, &var, &expr, &opno, &opcode)) + continue; + + if (!IsA(expr, Const)) + { + expr = (Expr *) estimate_expression_value(&root, (Node *) expr); + if (!IsA(expr, Const)) + continue; + } + + Const *arg_value = castNode(Const, expr); + collation = sa_expr->inputcollid; + + column_name = get_attname(ch->table_id, var->varattno, false); + TypeCacheEntry *tce = lookup_type_cache(var->vartype, TYPECACHE_BTREE_OPFAMILY); + int op_strategy = get_op_opfamily_strategy(opno, tce->btree_opf); + if (ts_array_is_member(settings->fd.segmentby, column_name)) + { + switch (op_strategy) + { + case BTEqualStrategyNumber: + case BTLessStrategyNumber: + case BTLessEqualStrategyNumber: + case BTGreaterStrategyNumber: + case BTGreaterEqualStrategyNumber: + { + /* save segment by column name and its corresponding value specified in + * WHERE */ + *index_filters = lappend(*index_filters, + make_batchfilter(column_name, + op_strategy, + collation, + opcode, + arg_value, + false, /* is_null_check */ + false, /* is_null */ + true /* is_array_op */ + )); + } + } + continue; + } + + break; + } + case T_NullTest: + { + NullTest *ntest = (NullTest *) node; + if (IsA(ntest->arg, Var)) + { + var = (Var *) ntest->arg; + /* ignore system-defined attributes */ + if (var->varattno <= 0) + continue; + column_name = get_attname(ch->table_id, var->varattno, false); + if (ts_array_is_member(settings->fd.segmentby, column_name)) + { + *index_filters = + lappend(*index_filters, + make_batchfilter(column_name, + InvalidStrategy, + InvalidOid, + InvalidOid, + NULL, + true, /* is_null_check */ + ntest->nulltesttype == IS_NULL, /* is_null */ + false /* is_array_op */ + )); + if (ntest->nulltesttype == IS_NULL) + *is_null = lappend_int(*is_null, 1); + else + *is_null = lappend_int(*is_null, 0); + } + /* We cannot optimize filtering decompression using ORDERBY + * metadata and null check qualifiers. We could possibly do that by checking the + * compressed data in combination with the ORDERBY nulls first setting and + * verifying that the first or last tuple of a segment contains a NULL value. + * This is left for future optimization */ + } + } + break; + default: + break; + } + } +} + +static BatchFilter * +make_batchfilter(char *column_name, StrategyNumber strategy, Oid collation, RegProcedure opcode, + Const *value, bool is_null_check, bool is_null, bool is_array_op) +{ + BatchFilter *segment_filter = palloc0(sizeof(*segment_filter)); + + *segment_filter = (BatchFilter){ + .strategy = strategy, + .collation = collation, + .opcode = opcode, + .value = value, + .is_null_check = is_null_check, + .is_null = is_null, + .is_array_op = is_array_op, + }; + namestrcpy(&segment_filter->column_name, column_name); + + return segment_filter; +} + +/* + * A compressed chunk can have multiple indexes. For a given list + * of columns in index_filters, find the matching index which has + * the most columns based on index_filters and adjust the filters + * if necessary. + * Return matching index if found else return NULL. + * + * Note: This method will find the best matching index based on + * number of filters it matches. If an index matches all the filters, + * it will be chosen. Otherwise, it will try to select the index + * which has most matches. If there are multiple indexes have + * the same number of matches, it will pick the first one it finds. + * For example + * for a given condition like "WHERE X = 10 AND Y = 8" + * if there are multiple indexes like + * 1. index (a,b,c,x) + * 2. index (a,x,y) + * 3. index (x) + * In this case 2nd index is returned. If that one didn't exist, + * it would return the 1st index. + */ +static Relation +find_matching_index(Relation comp_chunk_rel, List **index_filters, List **heap_filters) +{ + List *index_oids; + ListCell *lc; + int total_filters = list_length(*index_filters); + int max_match_count = 0; + Relation result_rel = NULL; + + /* get list of indexes defined on compressed chunk */ + index_oids = RelationGetIndexList(comp_chunk_rel); + foreach (lc, index_oids) + { + int match_count = 0; + Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock); + IndexInfo *index_info = BuildIndexInfo(index_rel); + + /* Can't use partial or expression indexes */ + if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL) + { + index_close(index_rel, AccessShareLock); + continue; + } + + /* Can only use Btree indexes */ + if (index_info->ii_Am != BTREE_AM_OID) + { + index_close(index_rel, AccessShareLock); + continue; + } + + ListCell *li; + foreach (li, *index_filters) + { + for (int i = 0; i < index_rel->rd_index->indnatts; i++) + { + AttrNumber attnum = index_rel->rd_index->indkey.values[i]; + char *attname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, false); + BatchFilter *sf = lfirst(li); + /* ensure column exists in index relation */ + if (!strcmp(attname, NameStr(sf->column_name))) + { + match_count++; + break; + } + } + } + if (match_count == total_filters) + { + /* found index which has all columns specified in WHERE */ + if (result_rel) + index_close(result_rel, AccessShareLock); + if (ts_guc_debug_compression_path_info) + elog(INFO, "Index \"%s\" is used for scan. ", RelationGetRelationName(index_rel)); + return index_rel; + } + + if (match_count > max_match_count) + { + max_match_count = match_count; + result_rel = index_rel; + continue; + } + index_close(index_rel, AccessShareLock); + } + + /* No matching index whatsoever */ + if (!result_rel) + { + *heap_filters = list_concat(*heap_filters, *index_filters); + *index_filters = list_truncate(*index_filters, 0); + return NULL; + } + + /* We found an index which matches partially. + * It can be used but we need to transfer the unmatched + * filters from index_filters to heap filters. + */ + for (int i = 0; i < list_length(*index_filters); i++) + { + BatchFilter *sf = list_nth(*index_filters, i); + bool match = false; + for (int j = 0; j < result_rel->rd_index->indnatts; j++) + { + AttrNumber attnum = result_rel->rd_index->indkey.values[j]; + char *attname = get_attname(RelationGetRelid(comp_chunk_rel), attnum, false); + /* ensure column exists in index relation */ + if (!strcmp(attname, NameStr(sf->column_name))) + { + match = true; + break; + } + } + + if (!match) + { + *heap_filters = lappend(*heap_filters, sf); + *index_filters = list_delete_nth_cell(*index_filters, i); + } + } + if (ts_guc_debug_compression_path_info) + elog(INFO, "Index \"%s\" is used for scan. ", RelationGetRelationName(result_rel)); + return result_rel; +} + +static inline TM_Result +delete_compressed_tuple(RowDecompressor *decompressor, Snapshot snapshot, + HeapTuple compressed_tuple) +{ + TM_FailureData tmfd; + TM_Result result; + result = table_tuple_delete(decompressor->in_rel, + &compressed_tuple->t_self, + decompressor->mycid, + snapshot, + InvalidSnapshot, + true, + &tmfd, + false); + return result; +} + +static void +report_error(TM_Result result) +{ + switch (result) + { + case TM_Deleted: + { + if (IsolationUsesXactSnapshot()) + { + /* For Repeatable Read isolation level report error */ + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent update"))); + } + } + break; + /* + * If another transaction is updating the compressed data, + * we have to abort the transaction to keep consistency. + */ + case TM_Updated: + { + elog(ERROR, "tuple concurrently updated"); + } + break; + case TM_Invisible: + { + elog(ERROR, "attempted to lock invisible tuple"); + } + break; + case TM_Ok: + break; + default: + { + elog(ERROR, "unexpected tuple operation result: %d", result); + } + break; + } +} diff --git a/tsl/src/compression/compression_dml.h b/tsl/src/compression/compression_dml.h new file mode 100644 index 00000000000..34add5e284f --- /dev/null +++ b/tsl/src/compression/compression_dml.h @@ -0,0 +1,29 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#pragma once + +#include +#include + +#include "ts_catalog/compression_settings.h" + +ScanKeyData *build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings, + Relation out_rel, Bitmapset *key_columns, + TupleTableSlot *slot, int *num_scankeys); +ScanKeyData *build_index_scankeys(Relation index_rel, List *index_filters, int *num_scankeys); +ScanKeyData *build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, + Relation out_rel, Bitmapset *key_columns, + TupleTableSlot *slot, Relation *result_index_rel, + Bitmapset **index_columns, int *num_scan_keys); +ScanKeyData *build_heap_scankeys(Oid hypertable_relid, Relation in_rel, Relation out_rel, + CompressionSettings *settings, Bitmapset *key_columns, + Bitmapset **null_columns, TupleTableSlot *slot, int *num_scankeys); +ScanKeyData *build_update_delete_scankeys(Relation in_rel, List *heap_filters, int *num_scankeys, + Bitmapset **null_columns); +int create_segment_filter_scankey(Relation in_rel, char *segment_filter_col_name, + StrategyNumber strategy, Oid subtype, ScanKeyData *scankeys, + int num_scankeys, Bitmapset **null_columns, Datum value, + bool is_null_check, bool is_array_op); diff --git a/tsl/src/compression/compression_scankey.c b/tsl/src/compression/compression_scankey.c new file mode 100644 index 00000000000..c119555591c --- /dev/null +++ b/tsl/src/compression/compression_scankey.c @@ -0,0 +1,536 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#include +#include +#include +#include +#include + +#include "compression.h" +#include "compression_dml.h" +#include "create.h" +#include "ts_catalog/array_utils.h" + +static Oid deduce_filter_subtype(BatchFilter *filter, Oid att_typoid); + +/* + * Build scankeys for decompressed tuple to check if it is part of the batch. + * + * The key_columns are the columns of the uncompressed chunk. + */ +ScanKeyData * +build_scankeys_for_uncompressed(Oid ht_relid, CompressionSettings *settings, Relation out_rel, + Bitmapset *key_columns, TupleTableSlot *slot, int *num_scankeys) +{ + ScanKeyData *scankeys = NULL; + int key_index = 0; + TupleDesc out_desc = RelationGetDescr(out_rel); + + if (bms_is_empty(key_columns)) + { + *num_scankeys = key_index; + return scankeys; + } + + scankeys = palloc(sizeof(ScanKeyData) * bms_num_members(key_columns)); + + int i = -1; + while ((i = bms_next_member(key_columns, i)) > 0) + { + AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber; + bool isnull; + + /* + * slot has the physical layout of the hypertable, so we need to + * get the attribute number of the hypertable for the column. + */ + char *attname = get_attname(out_rel->rd_id, attno, false); + + /* + * We can skip any segmentby columns here since they have already been + * checked during batch filtering. + */ + if (ts_array_is_member(settings->fd.segmentby, attname)) + { + continue; + } + + AttrNumber ht_attno = get_attnum(ht_relid, attname); + Datum value = slot_getattr(slot, ht_attno, &isnull); + + Oid atttypid = out_desc->attrs[AttrNumberGetAttrOffset(attno)].atttypid; + TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY); + + /* + * Should never happen since the column is part of unique constraint + * and should therefore have the required opfamily + */ + if (!OidIsValid(tce->btree_opf)) + elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid)); + + Oid opr = get_opfamily_member(tce->btree_opf, atttypid, atttypid, BTEqualStrategyNumber); + + /* + * Fall back to btree operator input type when it is binary compatible with + * the column type and no operator for column type could be found. + */ + if (!OidIsValid(opr) && IsBinaryCoercible(atttypid, tce->btree_opintype)) + { + opr = get_opfamily_member(tce->btree_opf, + tce->btree_opintype, + tce->btree_opintype, + BTEqualStrategyNumber); + } + + if (!OidIsValid(opr)) + elog(ERROR, "no operator found for type \"%s\"", format_type_be(atttypid)); + + ScanKeyEntryInitialize(&scankeys[key_index++], + isnull ? SK_ISNULL | SK_SEARCHNULL : 0, + attno, + BTEqualStrategyNumber, + InvalidOid, + out_desc->attrs[AttrNumberGetAttrOffset(attno)].attcollation, + get_opcode(opr), + isnull ? 0 : value); + } + + *num_scankeys = key_index; + return scankeys; +} + +/* + * Build scankeys for decompression of specific batches. key_columns references the + * columns of the uncompressed chunk. + */ +ScanKeyData * +build_heap_scankeys(Oid hypertable_relid, Relation in_rel, Relation out_rel, + CompressionSettings *settings, Bitmapset *key_columns, Bitmapset **null_columns, + TupleTableSlot *slot, int *num_scankeys) +{ + int key_index = 0; + ScanKeyData *scankeys = NULL; + + if (!bms_is_empty(key_columns)) + { + scankeys = palloc0(bms_num_members(key_columns) * 2 * sizeof(ScanKeyData)); + int i = -1; + while ((i = bms_next_member(key_columns, i)) > 0) + { + AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber; + char *attname = get_attname(out_rel->rd_id, attno, false); + bool isnull; + AttrNumber ht_attno = get_attnum(hypertable_relid, attname); + + /* + * This is a not very precise but easy assertion to detect attno + * mismatch at least in some cases. The mismatch might happen if the + * hypertable and chunk layout are different because of dropped + * columns, and we're using a wrong slot type here. + */ + PG_USED_FOR_ASSERTS_ONLY Oid ht_atttype = get_atttype(hypertable_relid, ht_attno); + PG_USED_FOR_ASSERTS_ONLY Oid slot_atttype = + slot->tts_tupleDescriptor->attrs[AttrNumberGetAttrOffset(ht_attno)].atttypid; + Assert(ht_atttype == slot_atttype); + + Datum value = slot_getattr(slot, ht_attno, &isnull); + + /* + * There are 3 possible scenarios we have to consider + * when dealing with columns which are part of unique + * constraints. + * + * 1. Column is segmentby-Column + * In this case we can add a single ScanKey with an + * equality check for the value. + * 2. Column is orderby-Column + * In this we can add 2 ScanKeys with range constraints + * utilizing batch metadata. + * 3. Column is neither segmentby nor orderby + * In this case we cannot utilize this column for + * batch filtering as the values are compressed and + * we have no metadata. + */ + if (ts_array_is_member(settings->fd.segmentby, attname)) + { + key_index = create_segment_filter_scankey(in_rel, + attname, + BTEqualStrategyNumber, + InvalidOid, + scankeys, + key_index, + null_columns, + value, + isnull, + false); + } + if (ts_array_is_member(settings->fd.orderby, attname)) + { + /* Cannot optimize orderby columns with NULL values since those + * are not visible in metadata + */ + if (isnull) + continue; + + int16 index = ts_array_position(settings->fd.orderby, attname); + + key_index = create_segment_filter_scankey(in_rel, + column_segment_min_name(index), + BTLessEqualStrategyNumber, + InvalidOid, + scankeys, + key_index, + null_columns, + value, + false, + false); /* is_null_check */ + key_index = create_segment_filter_scankey(in_rel, + column_segment_max_name(index), + BTGreaterEqualStrategyNumber, + InvalidOid, + scankeys, + key_index, + null_columns, + value, + false, + false); /* is_null_check */ + } + } + } + + *num_scankeys = key_index; + return scankeys; +} + +/* + * This method will build scan keys required to do index + * scans on compressed chunks. + */ +ScanKeyData * +build_index_scankeys(Relation index_rel, List *index_filters, int *num_scankeys) +{ + ListCell *lc; + BatchFilter *filter = NULL; + *num_scankeys = list_length(index_filters); + ScanKeyData *scankey = palloc0(sizeof(ScanKeyData) * (*num_scankeys)); + int idx = 0; + int flags; + + /* Order scankeys based on index attribute order */ + for (int idx_attno = 1; idx_attno <= index_rel->rd_index->indnkeyatts && idx < *num_scankeys; + idx_attno++) + { + AttrNumber attno = index_rel->rd_index->indkey.values[AttrNumberGetAttrOffset(idx_attno)]; + char *attname = get_attname(index_rel->rd_index->indrelid, attno, false); + Oid typoid = attnumTypeId(index_rel, idx_attno); + foreach (lc, index_filters) + { + filter = lfirst(lc); + if (!strcmp(attname, NameStr(filter->column_name))) + { + flags = 0; + if (filter->is_null_check) + { + flags = SK_ISNULL | (filter->is_null ? SK_SEARCHNULL : SK_SEARCHNOTNULL); + } + if (filter->is_array_op) + { + flags |= SK_SEARCHARRAY; + } + + ScanKeyEntryInitialize(&scankey[idx++], + flags, + idx_attno, + filter->strategy, + deduce_filter_subtype(filter, typoid), /* subtype */ + filter->collation, + filter->opcode, + filter->value ? filter->value->constvalue : 0); + break; + } + } + } + + Assert(idx == *num_scankeys); + return scankey; +} + +/* This method is used to find matching index on compressed chunk + * and build scan keys from the slot data + */ +ScanKeyData * +build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation out_rel, + Bitmapset *key_columns, TupleTableSlot *slot, + Relation *result_index_rel, Bitmapset **index_columns, + int *num_scan_keys) +{ + List *index_oids; + ListCell *lc; + ScanKeyData *scankeys = NULL; + /* get list of indexes defined on compressed chunk */ + index_oids = RelationGetIndexList(in_rel); + *num_scan_keys = 0; + + foreach (lc, index_oids) + { + Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock); + IndexInfo *index_info = BuildIndexInfo(index_rel); + + /* Can't use partial or expression indexes */ + if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL) + { + index_close(index_rel, AccessShareLock); + continue; + } + + /* Can only use Btree indexes */ + if (index_info->ii_Am != BTREE_AM_OID) + { + index_close(index_rel, AccessShareLock); + continue; + } + + /* + * Must have at least two attributes, index we are looking for contains + * at least one segmentby column and a sequence number. + */ + if (index_rel->rd_index->indnatts < 2) + { + index_close(index_rel, AccessShareLock); + continue; + } + + scankeys = palloc0((index_rel->rd_index->indnatts) * sizeof(ScanKeyData)); + + /* + * Using only key attributes to exclude covering columns + * only interested in filtering here + */ + for (int i = 0; i < index_rel->rd_index->indnkeyatts; i++) + { + AttrNumber idx_attnum = AttrOffsetGetAttrNumber(i); + AttrNumber in_attnum = index_rel->rd_index->indkey.values[i]; + const NameData *attname = attnumAttName(in_rel, in_attnum); + + /* Make sure we find columns in key columns in order to select the right index */ + if (!bms_is_member(get_attnum(out_rel->rd_id, NameStr(*attname)), key_columns)) + { + break; + } + + bool isnull; + AttrNumber ht_attno = get_attnum(hypertable_relid, NameStr(*attname)); + Datum value = slot_getattr(slot, ht_attno, &isnull); + + if (isnull) + { + ScanKeyEntryInitialize(&scankeys[(*num_scan_keys)++], + SK_ISNULL | SK_SEARCHNULL, + idx_attnum, + InvalidStrategy, /* no strategy */ + InvalidOid, /* no strategy subtype */ + InvalidOid, /* no collation */ + InvalidOid, /* no reg proc for this */ + (Datum) 0); /* constant */ + continue; + } + + Oid atttypid = attnumTypeId(index_rel, idx_attnum); + + TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY); + if (!OidIsValid(tce->btree_opf)) + elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid)); + + Oid opr = + get_opfamily_member(tce->btree_opf, atttypid, atttypid, BTEqualStrategyNumber); + + /* + * Fall back to btree operator input type when it is binary compatible with + * the column type and no operator for column type could be found. + */ + if (!OidIsValid(opr) && IsBinaryCoercible(atttypid, tce->btree_opintype)) + { + opr = get_opfamily_member(tce->btree_opf, + tce->btree_opintype, + tce->btree_opintype, + BTEqualStrategyNumber); + } + + /* No operator could be found so we can't create the scankey. */ + if (!OidIsValid(opr)) + continue; + + Oid opcode = get_opcode(opr); + Ensure(OidIsValid(opcode), + "no opcode found for column operator of a hypertable column"); + + ScanKeyEntryInitialize(&scankeys[(*num_scan_keys)++], + 0, /* flags */ + idx_attnum, + BTEqualStrategyNumber, + InvalidOid, /* No strategy subtype. */ + attnumCollationId(index_rel, idx_attnum), + opcode, + value); + } + + if (*num_scan_keys > 0) + { + *result_index_rel = index_rel; + break; + } + else + { + index_close(index_rel, AccessShareLock); + pfree(scankeys); + scankeys = NULL; + } + } + + return scankeys; +} + +/* + * This method will build scan keys for predicates including + * SEGMENT BY column with attribute number from compressed chunk + * if condition is like = , else + * OUT param null_columns is saved with column attribute number. + */ +ScanKeyData * +build_update_delete_scankeys(Relation in_rel, List *heap_filters, int *num_scankeys, + Bitmapset **null_columns) +{ + ListCell *lc; + BatchFilter *filter; + int key_index = 0; + + ScanKeyData *scankeys = palloc0(heap_filters->length * sizeof(ScanKeyData)); + + foreach (lc, heap_filters) + { + filter = lfirst(lc); + AttrNumber attno = get_attnum(in_rel->rd_id, NameStr(filter->column_name)); + Oid typoid = get_atttype(in_rel->rd_id, attno); + if (attno == InvalidAttrNumber) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + NameStr(filter->column_name), + RelationGetRelationName(in_rel)))); + + key_index = create_segment_filter_scankey(in_rel, + NameStr(filter->column_name), + filter->strategy, + deduce_filter_subtype(filter, typoid), + scankeys, + key_index, + null_columns, + filter->value ? filter->value->constvalue : 0, + filter->is_null_check, + filter->is_array_op); + } + *num_scankeys = key_index; + return scankeys; +} + +int +create_segment_filter_scankey(Relation in_rel, char *segment_filter_col_name, + StrategyNumber strategy, Oid subtype, ScanKeyData *scankeys, + int num_scankeys, Bitmapset **null_columns, Datum value, + bool is_null_check, bool is_array_op) +{ + AttrNumber cmp_attno = get_attnum(in_rel->rd_id, segment_filter_col_name); + Assert(cmp_attno != InvalidAttrNumber); + /* This should never happen but if it does happen, we can't generate a scan key for + * the filter column so just skip it */ + if (cmp_attno == InvalidAttrNumber) + return num_scankeys; + + int flags = is_array_op ? SK_SEARCHARRAY : 0; + + /* + * In PG versions <= 14 NULL values are always considered distinct + * from other NULL values and therefore NULLABLE multi-columnn + * unique constraints might expose unexpected behaviour in the + * presence of NULL values. + * Since SK_SEARCHNULL is not supported by heap scans we cannot + * build a ScanKey for NOT NULL and instead have to do those + * checks manually. + */ + if (is_null_check) + { + *null_columns = bms_add_member(*null_columns, cmp_attno); + return num_scankeys; + } + + Oid atttypid = in_rel->rd_att->attrs[AttrNumberGetAttrOffset(cmp_attno)].atttypid; + + TypeCacheEntry *tce = lookup_type_cache(atttypid, TYPECACHE_BTREE_OPFAMILY); + if (!OidIsValid(tce->btree_opf)) + elog(ERROR, "no btree opfamily for type \"%s\"", format_type_be(atttypid)); + + Oid opr = get_opfamily_member(tce->btree_opf, atttypid, atttypid, strategy); + + /* + * Fall back to btree operator input type when it is binary compatible with + * the column type and no operator for column type could be found. + */ + if (!OidIsValid(opr) && IsBinaryCoercible(atttypid, tce->btree_opintype)) + { + opr = + get_opfamily_member(tce->btree_opf, tce->btree_opintype, tce->btree_opintype, strategy); + } + + /* No operator could be found so we can't create the scankey. */ + if (!OidIsValid(opr)) + return num_scankeys; + + opr = get_opcode(opr); + Assert(OidIsValid(opr)); + /* We should never end up here but: no opcode, no optimization */ + if (!OidIsValid(opr)) + return num_scankeys; + + ScanKeyEntryInitialize(&scankeys[num_scankeys++], + flags, + cmp_attno, + strategy, + subtype, + in_rel->rd_att->attrs[AttrNumberGetAttrOffset(cmp_attno)].attcollation, + opr, + value); + + return num_scankeys; +} + +/* + * Get the subtype for an indexscan from the provided filter. We also + * need to handle array constants appropriately. + */ +static Oid +deduce_filter_subtype(BatchFilter *filter, Oid att_typoid) +{ + Oid subtype = InvalidOid; + + if (!filter->value) + return InvalidOid; + + /* + * Check if the filter type is different from the att type. If yes, the + * subtype needs to be set appropriately. + */ + if (att_typoid != filter->value->consttype) + { + /* For an array type get its element type */ + if (filter->is_array_op) + subtype = get_element_type(filter->value->consttype); + else + subtype = filter->value->consttype; + } + + return subtype; +} diff --git a/tsl/src/compression/segment_meta.c b/tsl/src/compression/segment_meta.c index 388556acfdd..f51fceb3145 100644 --- a/tsl/src/compression/segment_meta.c +++ b/tsl/src/compression/segment_meta.c @@ -11,7 +11,6 @@ #include #include -#include "datum_serialize.h" #include "segment_meta.h" SegmentMetaMinMaxBuilder * diff --git a/tsl/src/init.c b/tsl/src/init.c index 758f1aa9346..6d567c81d63 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -16,13 +16,13 @@ #include "bgw_policy/retention_api.h" #include "chunk.h" #include "chunk_api.h" +#include "compression/algorithms/array.h" +#include "compression/algorithms/deltadelta.h" +#include "compression/algorithms/dictionary.h" +#include "compression/algorithms/gorilla.h" #include "compression/api.h" -#include "compression/array.h" #include "compression/compression.h" #include "compression/create.h" -#include "compression/deltadelta.h" -#include "compression/dictionary.h" -#include "compression/gorilla.h" #include "compression/segment_meta.h" #include "config.h" #include "continuous_aggs/create.h" diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index a0805aa8401..c59bcfba072 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -21,7 +21,6 @@ #include #include "compat/compat.h" -#include "compression/array.h" #include "compression/arrow_c_data_interface.h" #include "compression/compression.h" #include "guc.h" diff --git a/tsl/test/src/compression_unit_test.c b/tsl/test/src/compression_unit_test.c index b08b2a5cb7f..fa78055fdc0 100644 --- a/tsl/test/src/compression_unit_test.c +++ b/tsl/test/src/compression_unit_test.c @@ -23,12 +23,12 @@ #include "ts_catalog/catalog.h" #include -#include "compression/array.h" +#include "compression/algorithms/array.h" +#include "compression/algorithms/deltadelta.h" +#include "compression/algorithms/dictionary.h" +#include "compression/algorithms/float_utils.h" +#include "compression/algorithms/gorilla.h" #include "compression/arrow_c_data_interface.h" -#include "compression/deltadelta.h" -#include "compression/dictionary.h" -#include "compression/float_utils.h" -#include "compression/gorilla.h" #include "compression/segment_meta.h" #define TEST_ELEMENTS 1015