From 83cf5605a5b986c9b8c0db845ed8b1780b9d7883 Mon Sep 17 00:00:00 2001 From: Sven Klemm Date: Wed, 13 Dec 2023 19:24:52 +0100 Subject: [PATCH] Remove multinode functionality from compression --- tsl/src/compression/api.c | 137 +---------------------------------- tsl/src/compression/create.c | 18 +---- 2 files changed, 5 insertions(+), 150 deletions(-) diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c index ab874768c69..7bb056927f2 100644 --- a/tsl/src/compression/api.c +++ b/tsl/src/compression/api.c @@ -27,7 +27,6 @@ #include #include -#include #include "compat/compat.h" #include "cache.h" #include "chunk.h" @@ -687,74 +686,6 @@ tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed) return compress_chunk_impl(chunk->hypertable_relid, chunk->table_id); } -/* - * Helper for remote invocation of chunk compression and decompression. - */ -static bool -invoke_compression_func_remotely(FunctionCallInfo fcinfo, const Chunk *chunk) -{ - List *datanodes; - DistCmdResult *distres; - bool isnull_result = true; - Size i; - - Assert(chunk->relkind == RELKIND_FOREIGN_TABLE); - Assert(chunk->data_nodes != NIL); - datanodes = ts_chunk_get_data_node_name_list(chunk); - distres = ts_dist_cmd_invoke_func_call_on_data_nodes(fcinfo, datanodes); - - for (i = 0; i < ts_dist_cmd_response_count(distres); i++) - { - const char *node_name; - bool isnull; - Datum PG_USED_FOR_ASSERTS_ONLY d; - - d = ts_dist_cmd_get_single_scalar_result_by_index(distres, i, &isnull, &node_name); - - /* Make sure data nodes either (1) all return NULL, or (2) all return - * a non-null result. */ - if (i > 0 && isnull_result != isnull) - elog(ERROR, "inconsistent result from data node \"%s\"", node_name); - - isnull_result = isnull; - - if (!isnull) - { - Assert(OidIsValid(DatumGetObjectId(d))); - } - } - - ts_dist_cmd_close_response(distres); - - return !isnull_result; -} - -static bool -compress_remote_chunk(FunctionCallInfo fcinfo, const Chunk *chunk, bool if_not_compressed) -{ - bool success = invoke_compression_func_remotely(fcinfo, chunk); - - if (!success) - ereport((if_not_compressed ? NOTICE : ERROR), - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id)))); - - return success; -} - -static bool -decompress_remote_chunk(FunctionCallInfo fcinfo, const Chunk *chunk, bool if_compressed) -{ - bool success = invoke_compression_func_remotely(fcinfo, chunk); - - if (!success) - ereport((if_compressed ? NOTICE : ERROR), - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("chunk \"%s\" is not compressed", get_rel_name(chunk->table_id)))); - - return success; -} - /* * Create a new compressed chunk using existing table with compressed data. * @@ -845,24 +776,7 @@ tsl_compress_chunk(PG_FUNCTION_ARGS) TS_PREVENT_FUNC_IF_READ_ONLY(); Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true); - if (chunk->relkind == RELKIND_FOREIGN_TABLE) - { - /* chunks of distributed hypertables are foreign tables */ - if (!compress_remote_chunk(fcinfo, chunk, if_not_compressed)) - PG_RETURN_NULL(); - - /* - * Updating the chunk compression status of the Access Node AFTER executing remote - * compression. In the event of failure, the compressed status will NOT be set. The - * distributed compression policy will attempt to compress again, which is idempotent, thus - * the metadata are eventually consistent. - */ - ts_chunk_set_compressed_chunk(chunk, INVALID_CHUNK_ID); - } - else - { - uncompressed_chunk_id = tsl_compress_chunk_wrapper(chunk, if_not_compressed); - } + uncompressed_chunk_id = tsl_compress_chunk_wrapper(chunk, if_not_compressed); PG_RETURN_OID(uncompressed_chunk_id); } @@ -881,27 +795,6 @@ tsl_decompress_chunk(PG_FUNCTION_ARGS) if (NULL == uncompressed_chunk) elog(ERROR, "unknown chunk id %d", uncompressed_chunk_id); - if (uncompressed_chunk->relkind == RELKIND_FOREIGN_TABLE) - { - /* - * Updating the chunk compression status of the Access Node BEFORE executing remote - * decompression. In the event of failure, the compressed status will be cleared. The - * distributed compression policy will attempt to compress again, which is idempotent, thus - * the metadata are eventually consistent. - * If CHUNK_STATUS_COMPRESSED is cleared, then it is probable that a remote compress_chunk() - * has not taken place, but not certain. For this above reason, this flag should not be - * assumed to be consistent (when it is cleared) for Access-Nodes. When used in distributed - * hypertables one should take advantage of the idempotent properties of remote - * compress_chunk() and distributed compression policy to make progress. - */ - ts_chunk_clear_compressed_chunk(uncompressed_chunk); - - if (!decompress_remote_chunk(fcinfo, uncompressed_chunk, if_compressed)) - PG_RETURN_NULL(); - - PG_RETURN_OID(uncompressed_chunk_id); - } - if (!decompress_chunk_impl(uncompressed_chunk->hypertable_relid, uncompressed_chunk_id, if_compressed)) @@ -1004,17 +897,9 @@ decompress_segment_changed_group(CompressedSegmentInfo **current_segment, TupleT return changed_segment; } -static bool -recompress_remote_chunk(FunctionCallInfo fcinfo, Chunk *chunk) -{ - return invoke_compression_func_remotely(fcinfo, chunk); -} - /* * This is hacky but it doesn't matter. We just want to check for the existence of such an index - * on the compressed chunk. For distributed hypertables, returning the index oid would raise issues, - * because the Access Node does not see that oid. So we return the oid of the uncompresed chunk - * instead, when an index is found. + * on the compressed chunk. */ extern Datum tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS) @@ -1026,15 +911,6 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS) if (NULL == uncompressed_chunk) elog(ERROR, "unknown chunk id %d", uncompressed_chunk_id); - /* push down to data nodes for distributed case */ - if (uncompressed_chunk->relkind == RELKIND_FOREIGN_TABLE) - { - /* look for index on data nodes */ - if (!(invoke_compression_func_remotely(fcinfo, uncompressed_chunk))) - PG_RETURN_NULL(); - else // don't care what the idx oid is, data node will find it and open it - PG_RETURN_OID(uncompressed_chunk_id); - } int32 srcht_id = uncompressed_chunk->fd.hypertable_id; Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true); @@ -1212,15 +1088,6 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS) TS_PREVENT_FUNC_IF_READ_ONLY(); Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true); - /* need to push down to data nodes if this is an access node */ - if (uncompressed_chunk->relkind == RELKIND_FOREIGN_TABLE) - { - if (!recompress_remote_chunk(fcinfo, uncompressed_chunk)) - PG_RETURN_NULL(); - - PG_RETURN_OID(uncompressed_chunk_id); - } - int32 status = uncompressed_chunk->fd.status; if (status == CHUNK_STATUS_DEFAULT) diff --git a/tsl/src/compression/create.c b/tsl/src/compression/create.c index b020402c5c6..e207aa67534 100644 --- a/tsl/src/compression/create.c +++ b/tsl/src/compression/create.c @@ -976,7 +976,6 @@ disable_compression(Hypertable *ht, WithClauseResult *with_clause_options) /* compression is enabled. can we turn it off? */ check_modify_compression_options(ht, settings, with_clause_options, NIL); - /* distributed hypertables do not have compression table on the access node */ if (TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht)) drop_existing_compression_table(ht); else @@ -1149,20 +1148,9 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht, compresscolinfo_add_metadata_columns(&compress_cols); - if (hypertable_is_distributed(ht)) - { - /* On a distributed hypertable, there's no data locally, so don't - * create local compression tables and data but let the DDL pass on to - * data nodes. */ - ts_hypertable_set_compressed(ht, 0); - return true; - } - else - { - Oid tablespace_oid = get_rel_tablespace(ht->main_table_relid); - compress_htid = create_compression_table(ownerid, &compress_cols, tablespace_oid); - ts_hypertable_set_compressed(ht, compress_htid); - } + Oid tablespace_oid = get_rel_tablespace(ht->main_table_relid); + compress_htid = create_compression_table(ownerid, &compress_cols, tablespace_oid); + ts_hypertable_set_compressed(ht, compress_htid); if (!with_clause_options[CompressChunkTimeInterval].is_default) {