From ddf8596ad5a56d147524cd56944506839c0244c2 Mon Sep 17 00:00:00 2001 From: Cluas Date: Fri, 27 Sep 2024 12:04:56 +0800 Subject: [PATCH] feat: dist support skip_unavailable_shards and custom settings --- lib/db/clickhouse.js | 4 +++- lib/db/maintain/index.js | 24 +++++++++++++----------- lib/db/maintain/scripts.js | 22 +++++++++++----------- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index 5d9bb99c..d8ab954c 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -31,6 +31,8 @@ const { getClickhouseUrl } = require('./clickhouse_options') // External Storage Policy for Tables (S3, MINIO) const storagePolicy = process.env.STORAGE_POLICY || false +// Clickhouse Distributed Engine setting to skip unavailable shards +const skipUnavailableShards = process.env.SKIP_UNAVAILABLE_SHARDS || false const { StringStream, DataStream } = require('scramjet') @@ -188,7 +190,7 @@ const initialize = async function (dbName) { } if (!isOmitTablesCreation()) { const maintain = require('./maintain/index') - await maintain.upgrade({ name: dbName, storage_policy: storagePolicy }) + await maintain.upgrade({ name: dbName, storage_policy: storagePolicy, skip_unavailable_shards: skipUnavailableShards }) await maintain.rotate([{ db: dbName, samples_days: rotationSamples, diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js index b66aaabd..ff4deff1 100644 --- a/lib/db/maintain/index.js +++ b/lib/db/maintain/index.js @@ -14,21 +14,21 @@ const getEnv = () => { /** * - * @param db {{name: string, storage_policy: string}} + * @param db {{name: string, storage_policy: string, skip_unavailable_shards: boolean}} * @returns {Promise} */ module.exports.upgrade = async (db) => { - await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy) - await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy) - await upgradeSingle(db.name, 5, scripts.profiles, db.storage_policy) + await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy, false) + await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy, false) + await upgradeSingle(db.name, 5, scripts.profiles, db.storage_policy, false) if (db.storage_policy) { await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.name) await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.name) } if (clusterName) { - await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy) - await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy) - await upgradeSingle(db.name, 6, scripts.profiles_dist, db.storage_policy) + await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy, db.skip_unavailable_shards) + await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy, db.skip_unavailable_shards) + await upgradeSingle(db.name, 6, scripts.profiles_dist, db.storage_policy, db.skip_unavailable_shards) } } @@ -39,10 +39,11 @@ let isDBCreated = false * @param key {number} * @param scripts {string[]} * @param storagePolicy {string} + * @param skipUnavailableShards {boolean} */ -const upgradeSingle = async (db, key, scripts, storagePolicy) => { +const upgradeSingle = async (db, key, scripts, storagePolicy, skipUnavailableShards) => { const _upgradeRequest = (request, useDefaultDB, updateVer) => { - return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy }, request) + return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy, skip_unavailable_shards: skipUnavailableShards }, request) } if (!isDBCreated) { isDBCreated = true @@ -68,7 +69,7 @@ const upgradeSingle = async (db, key, scripts, storagePolicy) => { } /** - * @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string}} + * @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string, skip_unavailable_shards: boolean}} * @param request {string} database to update * @returns {Promise} */ @@ -83,7 +84,8 @@ const upgradeRequest = async (opts, request) => { MergeTree: clusterName ? 'ReplicatedMergeTree' : 'MergeTree', ReplacingMergeTree: clusterName ? 'ReplicatedReplacingMergeTree' : 'ReplacingMergeTree', AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree', - CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : '' + CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : '', + DIST_CREATE_SETTINGS: opts.skip_unavailable_shards ? `SETTINGS skip_unavailable_shards=1` : '' }) await client.rawRequest(request, null, opts.useDefaultDB ? opts.db : undefined) if (opts.updateVer) { diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js index 8baaf313..aac4b8ad 100644 --- a/lib/db/maintain/scripts.js +++ b/lib/db/maintain/scripts.js @@ -207,7 +207,7 @@ module.exports.overall_dist = [ \`count\` AggregateFunction(count), \`sum\` SimpleAggregateFunction(sum, Float64), \`bytes\` SimpleAggregateFunction(sum, Float64) -) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint)`, +) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3_dist {{{OnCluster}}} ( \`fingerprint\` UInt64, @@ -221,7 +221,7 @@ module.exports.overall_dist = [ \`fingerprint\` UInt64, \`labels\` String, \`name\` String -) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint);`, +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.settings_dist {{{OnCluster}}} ( \`fingerprint\` UInt64, @@ -229,14 +229,14 @@ module.exports.overall_dist = [ \`name\` String, \`value\` String, \`inserted_at\` DateTime64(9, 'UTC') -) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());`, +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand()) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.time_series_gin_dist {{{OnCluster}}} ( date Date, key String, val String, fingerprint UInt64 - ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand());`, + ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand()) {{{DIST_CREATE_SETTINGS}}};`, 'ALTER TABLE {{DB}}.metrics_15s_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `type` UInt8;', @@ -254,7 +254,7 @@ module.exports.traces_dist = [ key String, val_id String, val String -) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));`, +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key)) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_dist {{{OnCluster}}} ( oid String, @@ -267,7 +267,7 @@ module.exports.traces_dist = [ service_name String, payload_type Int8, payload String, -) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id));`, +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin_dist {{{OnCluster}}} ( oid String, @@ -278,7 +278,7 @@ module.exports.traces_dist = [ span_id FixedString(8), timestamp_ns Int64, duration Int64 -) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id));` +) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};` ] module.exports.profiles = [ @@ -440,7 +440,7 @@ module.exports.profiles_dist = [ payload_type LowCardinality(String), payload String, values_agg Array(Tuple(String, Int64, Int32)) - ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint);`, + ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_dist {{{OnCluster}}} ( date Date, @@ -448,7 +448,7 @@ module.exports.profiles_dist = [ service_name LowCardinality(String), fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)), tags Array(Tuple(String, String)) CODEC(ZSTD(1)) - ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint);`, + ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_gin_dist {{{OnCluster}}} ( date Date, @@ -457,14 +457,14 @@ module.exports.profiles_dist = [ type_id LowCardinality(String), service_name LowCardinality(String), fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)) - ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint);`, + ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint) {{{DIST_CREATE_SETTINGS}}};`, `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_keys_dist {{{OnCluster}}} ( date Date, key String, val String, val_id UInt64 - ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`, + ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand()) {{{DIST_CREATE_SETTINGS}}};`, `ALTER TABLE {{DB}}.profiles_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),