Merge pull request ClickHouse#70520 from ClickHouse/vdimir/max_parts_to_move

Add setting max_parts_to_move
vdimir authored Oct 10, 2024
2 parents c8117d2 + 9d2dff2 commit 9876841
Showing 4 changed files with 47 additions and 24 deletions.
49 changes: 28 additions & 21 deletions docs/en/operations/settings/settings.md
@@ -21,7 +21,7 @@ Write add http CORS header.

Type: String

Default value:

An additional filter expression to apply to the result of `SELECT` query.
This setting is not applied to any subquery.
@@ -1389,7 +1389,7 @@ The engine family allowed in Cloud. 0 - allow everything, 1 - rewrite DDLs to us

Type: String

Default value:

Cluster for a shard in which current server is located

@@ -1413,7 +1413,7 @@ Enable collecting hash table statistics to optimize memory allocation

Type: String

Default value:

The `compatibility` setting causes ClickHouse to use the default settings of a previous version of ClickHouse, where the previous version is provided as the setting.

@@ -3119,7 +3119,7 @@ The setting is used by the server itself to support distributed queries. Do not

Type: String

Default value:

Disables query execution if passed data skipping indices wasn't used.

@@ -3183,7 +3183,7 @@ Possible values:

Type: String

Default value:

If it is set to a non-empty string, check that this projection is used in the query at least once.

@@ -3277,7 +3277,7 @@ It makes sense to disable it if the server has millions of tiny tables that are

Type: String

Default value:

Choose function implementation for specific target or variant (experimental). If empty enable all of them.

@@ -3770,7 +3770,7 @@ Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries u

Type: String

Default value:

Ignores the skipping indexes specified if used by the query.

@@ -3945,7 +3945,7 @@ For not replicated tables see [non_replicated_deduplication_window](merge-tree-s

Type: String

Default value:

The setting allows a user to provide own deduplication semantic in MergeTree/ReplicatedMergeTree
For example, by providing a unique value for the setting in each INSERT statement,
@@ -4623,7 +4623,7 @@ Possible values:

Type: String

Default value:

Specifies the value for the `log_comment` field of the [system.query_log](../system-tables/query_log.md) table and comment text for the server log.

@@ -5533,6 +5533,12 @@ Default value: -1

Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited.

## max_parts_to_move {#max_parts_to_move}

Limit the number of parts that can be moved in one query. Zero means unlimited.

Default value: `1000`.

## max_query_size {#max_query_size}

Type: UInt64
@@ -6081,7 +6087,7 @@ If enabled, some of the perf events will be measured throughout queries' executi

Type: String

Default value:

Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.

@@ -6371,7 +6377,7 @@ Possible values:

Type: MySQLDataTypesSupport

Default value:

Defines how MySQL types are converted to corresponding ClickHouse types. A comma separated list in any combination of `decimal`, `datetime64`, `date2Date32` or `date2String`.
- `decimal`: convert `NUMERIC` and `DECIMAL` types to `Decimal` when precision allows it.
@@ -6725,15 +6731,15 @@ Type: UInt64

Default value: 3

The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization

## optimize_min_inequality_conjunction_chain_length {#optimize_min_inequality_conjunction_chain_length}

Type: UInt64

Default value: 3

The minimum length of the expression `expr <> x1 AND ... expr <> xN` for optimization

## optimize_move_to_prewhere {#optimize_move_to_prewhere}

@@ -7245,7 +7251,7 @@ This is internal setting that should not be used directly and represents an impl

Type: String

Default value:

An arbitrary integer expression that can be used to split work between replicas for a specific table.
The value can be any integer expression.
@@ -7592,7 +7598,7 @@ Limit on max column size in block while reading. Helps to decrease cache misses

Type: String

Default value:

If it is set to a non-empty string, ClickHouse will try to apply specified projection in query.

@@ -7774,7 +7780,7 @@ Possible values:

Type: String

Default value:

A string which acts as a label for [query cache](../query-cache.md) entries.
The same queries with different tags are considered different by the query cache.
@@ -8340,7 +8346,7 @@ Min bytes required for remote read (url, s3) to do seek, instead of read with ig

Type: String

Default value:

- **Type:** String

@@ -8827,7 +8833,7 @@ Send server text logs with specified minimum level to client. Valid values: 'tra

Type: String

Default value:

Send server text logs with specified regexp to match log source name. Empty means all sources.

@@ -8858,7 +8864,7 @@ Timeout for sending data to the network, in seconds. If a client needs to send s

Type: Timezone

Default value:

Sets the implicit time zone of the current session or query.
The implicit time zone is the time zone applied to values of type DateTime/DateTime64 which have no explicitly specified time zone.
@@ -9111,7 +9117,7 @@ Allow direct SELECT query for Kafka, RabbitMQ, FileLog, Redis Streams, and NATS

Type: String

Default value:

When stream-like engine reads from multiple queues, the user will need to select one queue to insert into when writing. Used by Redis Streams and NATS.

@@ -9382,7 +9388,7 @@ Traverse shadow directory when query system.remote_data_paths

Type: SetOperationMode

Default value:

Sets a mode for combining `SELECT` query results. The setting is only used when shared with [UNION](../../sql-reference/statements/select/union.md) without explicitly specifying the `UNION ALL` or `UNION DISTINCT`.

@@ -9747,4 +9753,5 @@ Default value: 0

Allows you to select the max window log of ZSTD (it will not be used for MergeTree family)

Default value: `true`.

1 change: 1 addition & 0 deletions src/Core/Settings.cpp
@@ -3067,6 +3067,7 @@ Possible values:
M(Bool, allow_drop_detached, false, R"(
Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries
)", 0) \
M(UInt64, max_parts_to_move, 1000, "Limit the number of parts that can be moved in one query. Zero means unlimited.", 0) \
\
M(UInt64, max_table_size_to_drop, 50000000000lu, R"(
Restriction on deleting tables in query time. The value 0 means that you can delete all tables without any restrictions.
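The new `M(UInt64, max_parts_to_move, 1000, ...)` entry is one line of a macro list. Lists of this shape are typically consumed with the X-macro pattern: the same list is expanded several times, once for each macro passed in as `M`. The sketch below illustrates that pattern under simplifying assumptions: a four-argument `M` (the entries above also carry a fifth argument, the trailing `0`, which is omitted here) and plain C++ types instead of ClickHouse's setting types. `APPLY_FOR_DEMO_SETTINGS`, `DemoSettings` and `renderDocs` are hypothetical names.

```cpp
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical settings list in X-macro style: M(TYPE, NAME, DEFAULT, DESCRIPTION).
// The real entries take a fifth argument that this sketch omits.
#define APPLY_FOR_DEMO_SETTINGS(M) \
    M(bool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries") \
    M(std::uint64_t, max_parts_to_move, 1000, "Limit the number of parts that can be moved in one query. Zero means unlimited.")

// Expansion 1: one struct field per setting, initialised to its default value.
struct DemoSettings
{
#define DECLARE_FIELD(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    APPLY_FOR_DEMO_SETTINGS(DECLARE_FIELD)
#undef DECLARE_FIELD
};

// Expansion 2: one documentation line per setting, generated from the same list.
std::string renderDocs()
{
    std::ostringstream out;
    out << std::boolalpha; // print bool defaults as true/false
#define RENDER_DOC(TYPE, NAME, DEFAULT, DESCRIPTION) \
    out << #NAME << " (default: " << (DEFAULT) << "): " << DESCRIPTION << '\n';
    APPLY_FOR_DEMO_SETTINGS(RENDER_DOC)
#undef RENDER_DOC
    return out.str();
}
```

Because every expansion reads the same list, adding a setting in this style is a one-line change, which matches the single added line in `Settings.cpp` above.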
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -97,6 +97,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"distributed_cache_read_alignment", 0, 0, "A setting for ClickHouse Cloud"},
{"distributed_cache_max_unacked_inflight_packets", 10, 10, "A setting for ClickHouse Cloud"},
{"distributed_cache_data_packet_ack_window", 5, 5, "A setting for ClickHouse Cloud"},
{"max_parts_to_move", 1000, 1000, "New setting"},
}
},
{"24.9",
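Each row in `SettingsChangesHistory.cpp` records a setting name, its previous value, its new value and a reason, grouped under a release (the next group above starts at `{"24.9"`). That is the kind of information the `compatibility` setting described in `settings.md` relies on to restore the defaults of an older version. The sketch below shows one way to apply such a table, under simplifying assumptions: values are plain integers, versions are `{major, minor}` pairs, and `SettingChange`, `ChangesHistory` and `defaultsForCompatibility` are hypothetical names rather than ClickHouse APIs.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified history entry mirroring
// {"name", previous_value, new_value, "reason"}; values are integers only here.
struct SettingChange
{
    std::string name;
    std::uint64_t previous_value;
    std::uint64_t new_value;
    std::string reason;
};

// Release version as {major, minor}, compared lexicographically.
using Version = std::pair<int, int>;
using ChangesHistory = std::map<Version, std::vector<SettingChange>>;

// Returns the defaults to use for `compatibility = target`: every change made in
// a release newer than `target` is reverted to its previous value. Releases are
// visited from newest to oldest, so the value recorded by the oldest newer
// release is the one that remains.
std::map<std::string, std::uint64_t> defaultsForCompatibility(
    std::map<std::string, std::uint64_t> defaults,
    const ChangesHistory & history,
    const Version & target)
{
    for (auto it = history.rbegin(); it != history.rend(); ++it)
    {
        if (it->first <= target)
            break; // this release and all older ones are not newer than the target
        for (const auto & change : it->second)
            defaults[change.name] = change.previous_value;
    }
    return defaults;
}
```

Because the `max_parts_to_move` row records the same value on both sides (`1000, 1000`), reverting it in this model leaves the setting at 1000, so selecting an older `compatibility` version would not, under this model, disable the new limit.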
20 changes: 17 additions & 3 deletions src/Storages/StorageMergeTree.cpp
@@ -57,6 +57,7 @@ namespace Setting
extern const SettingsBool optimize_throw_if_noop;
extern const SettingsBool parallel_replicas_for_non_replicated_merge_tree;
extern const SettingsBool throw_on_unsupported_query_inside_transaction;
extern const SettingsUInt64 max_parts_to_move;
}

namespace MergeTreeSetting
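In `StorageMergeTree.cpp` the setting is reached through a typed handle: `extern const SettingsUInt64 max_parts_to_move;` is declared in `namespace Setting`, and the code reads it as `settings[Setting::max_parts_to_move]`. This diff does not show how `SettingsUInt64` is defined. One plausible design, assumed here, is a pointer to a member of a settings struct, which lets a single templated `operator[]` return a value of the right type for each handle. Every name in the sketch is hypothetical.

```cpp
#include <cstdint>

// Hypothetical private storage with one field per setting.
struct DemoSettingsImpl
{
    std::uint64_t max_parts_to_move = 1000;
    bool allow_drop_detached = false;
};

// A typed handle is a pointer to a member of the storage struct, so the handle's
// type also fixes the value type returned by operator[].
using DemoSettingsUInt64 = std::uint64_t DemoSettingsImpl::*;
using DemoSettingsBool = bool DemoSettingsImpl::*;

namespace DemoSetting
{
    // The diff declares its handles with `extern const` and defines them
    // elsewhere; `inline constexpr` keeps this sketch in one file.
    inline constexpr DemoSettingsUInt64 max_parts_to_move = &DemoSettingsImpl::max_parts_to_move;
    inline constexpr DemoSettingsBool allow_drop_detached = &DemoSettingsImpl::allow_drop_detached;
}

class DemoSettings
{
public:
    // Any handle of type `T DemoSettingsImpl::*` yields a reference to a `T`.
    template <typename T>
    const T & operator[](T DemoSettingsImpl::* handle) const { return impl.*handle; }

    template <typename T>
    T & operator[](T DemoSettingsImpl::* handle) { return impl.*handle; }

private:
    DemoSettingsImpl impl;
};
```

A design like this makes a type mismatch, such as reading a `UInt64` setting through a `Bool` handle, a compile-time error instead of a runtime lookup failure.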
@@ -89,6 +90,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int SUPPORT_IS_DISABLED;
extern const int TABLE_IS_READ_ONLY;
extern const int TOO_MANY_PARTS;
}

namespace ActionLocks
@@ -2344,9 +2346,9 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const

// Use the same back-pressure (delay/throw) logic as for INSERTs to be consistent and avoid possibility of exceeding part limits using MOVE PARTITION queries
dest_table_storage->delayInsertOrThrowIfNeeded(nullptr, local_context, true);

- auto lock1 = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef()[Setting::lock_acquire_timeout]);
- auto lock2 = dest_table->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef()[Setting::lock_acquire_timeout]);
+ const auto & settings = local_context->getSettingsRef();
+ auto lock1 = lockForShare(local_context->getCurrentQueryId(), settings[Setting::lock_acquire_timeout]);
+ auto lock2 = dest_table->lockForShare(local_context->getCurrentQueryId(), settings[Setting::lock_acquire_timeout]);
auto merges_blocker = stopMergesAndWait();

auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr();
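`auto merges_blocker = stopMergesAndWait();` appears to return a guard that keeps merges on this table stopped for the rest of the function; the comment in the next hunk relies on this when it describes how a long MOVE and blocked merges can end up waiting on each other. The scoping idea can be sketched as an RAII guard over an atomic counter. `DemoActionBlocker` and its methods are hypothetical, and the sketch leaves out the "and wait" part (waiting for merges that are already running).

```cpp
#include <atomic>

// Hypothetical blocker that background merges would check before starting.
class DemoActionBlocker
{
public:
    // RAII guard: the action stays blocked while at least one guard is alive.
    class Guard
    {
    public:
        explicit Guard(std::atomic<int> & counter_) : counter(&counter_) { counter->fetch_add(1); }
        Guard(Guard && other) noexcept : counter(other.counter) { other.counter = nullptr; }
        Guard(const Guard &) = delete;
        Guard & operator=(const Guard &) = delete;
        Guard & operator=(Guard &&) = delete;
        ~Guard()
        {
            if (counter) // a moved-from guard no longer owns a block
                counter->fetch_sub(1);
        }

    private:
        std::atomic<int> * counter;
    };

    // Returns a guard; the action is blocked until the guard goes out of scope.
    Guard cancel() { return Guard(counter); }

    // A merge scheduler would call this and skip work while it returns true.
    bool isCancelled() const { return counter.load() > 0; }

private:
    std::atomic<int> counter{0};
};
```

Tying the block to a scope means every exit path, including the `throw` added in the next hunk, releases it, so merges resume after the MOVE is aborted and can reduce the part count before the user retries.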
@@ -2358,6 +2360,18 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
String partition_id = getPartitionIDFromQuery(partition, local_context);

DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
if (src_parts.size() > settings[Setting::max_parts_to_move])
{
/// Moving a large number of parts at once can take a long time or get stuck in a retry loop in case of an S3 error, for example.
/// Since merging is blocked, it can lead to a kind of deadlock:
/// MOVE cannot be done because of the number of parts, and merges are not executed because of the MOVE.
/// So abort the operation until parts are merged and user should retry
throw Exception(ErrorCodes::TOO_MANY_PARTS,
"Cannot move {} parts at once, the limit is {}. "
"Wait until some parts are merged and retry, move smaller partitions, or increase the setting 'max_parts_to_move'.",
src_parts.size(), settings[Setting::max_parts_to_move]);
}

MutableDataPartsVector dst_parts;
std::vector<scope_guard> dst_parts_locks;
