Skip to content

Commit

Permalink
feat: dist support skip_unavailable_shards and custom settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Cluas committed Sep 27, 2024
1 parent 312885d commit ddf8596
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 23 deletions.
4 changes: 3 additions & 1 deletion lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const { getClickhouseUrl } = require('./clickhouse_options')

// External Storage Policy for Tables (S3, MINIO)
const storagePolicy = process.env.STORAGE_POLICY || false
// Clickhouse Distributed Engine setting to skip unavailable shards.
// Environment variables always arrive as strings, so a plain truthiness check
// would treat SKIP_UNAVAILABLE_SHARDS=false (or =0) as "enabled". Explicit
// "off" spellings are mapped to false; any other non-empty value enables the
// setting, exactly as before.
/**
 * Converts an environment variable value into a boolean flag.
 * @param value {string | undefined} raw value read from process.env
 * @returns {boolean} false when unset, empty or one of '0', 'false', 'no', 'off'
 *   (case-insensitive, surrounding whitespace ignored); true otherwise
 */
const parseBooleanEnv = (value) => {
  if (value == null) {
    return false
  }
  return !['', '0', 'false', 'no', 'off'].includes(String(value).trim().toLowerCase())
}
const skipUnavailableShards = parseBooleanEnv(process.env.SKIP_UNAVAILABLE_SHARDS)

const { StringStream, DataStream } = require('scramjet')

Expand Down Expand Up @@ -188,7 +190,7 @@ const initialize = async function (dbName) {
}
if (!isOmitTablesCreation()) {
const maintain = require('./maintain/index')
await maintain.upgrade({ name: dbName, storage_policy: storagePolicy })
await maintain.upgrade({ name: dbName, storage_policy: storagePolicy, skip_unavailable_shards: skipUnavailableShards })
await maintain.rotate([{
db: dbName,
samples_days: rotationSamples,
Expand Down
24 changes: 13 additions & 11 deletions lib/db/maintain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ const getEnv = () => {

/**
*
* @param db {{name: string, storage_policy: string}}
* @param db {{name: string, storage_policy: string, skip_unavailable_shards: boolean}}
* @returns {Promise<void>}
*/
module.exports.upgrade = async (db) => {
await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy)
await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy)
await upgradeSingle(db.name, 5, scripts.profiles, db.storage_policy)
await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy, false)
await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy, false)
await upgradeSingle(db.name, 5, scripts.profiles, db.storage_policy, false)
if (db.storage_policy) {
await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.name)
await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.name)
}
if (clusterName) {
await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy)
await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy)
await upgradeSingle(db.name, 6, scripts.profiles_dist, db.storage_policy)
await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy, db.skip_unavailable_shards)
await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy, db.skip_unavailable_shards)
await upgradeSingle(db.name, 6, scripts.profiles_dist, db.storage_policy, db.skip_unavailable_shards)
}
}

Expand All @@ -39,10 +39,11 @@ let isDBCreated = false
* @param key {number}
* @param scripts {string[]}
* @param storagePolicy {string}
* @param skipUnavailableShards {boolean}
*/
const upgradeSingle = async (db, key, scripts, storagePolicy) => {
const upgradeSingle = async (db, key, scripts, storagePolicy, skipUnavailableShards) => {
const _upgradeRequest = (request, useDefaultDB, updateVer) => {
return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy }, request)
return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy, skip_unavailable_shards: skipUnavailableShards }, request)
}
if (!isDBCreated) {
isDBCreated = true
Expand All @@ -68,7 +69,7 @@ const upgradeSingle = async (db, key, scripts, storagePolicy) => {
}

/**
* @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string}}
* @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string, skip_unavailable_shards: boolean}}
* @param request {string} request (query text) to execute against the database
* @returns {Promise<void>}
*/
Expand All @@ -83,7 +84,8 @@ const upgradeRequest = async (opts, request) => {
MergeTree: clusterName ? 'ReplicatedMergeTree' : 'MergeTree',
ReplacingMergeTree: clusterName ? 'ReplicatedReplacingMergeTree' : 'ReplacingMergeTree',
AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree',
CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : ''
CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : '',
DIST_CREATE_SETTINGS: opts.skip_unavailable_shards ? `SETTINGS skip_unavailable_shards=1` : ''
})
await client.rawRequest(request, null, opts.useDefaultDB ? opts.db : undefined)
if (opts.updateVer) {
Expand Down
22 changes: 11 additions & 11 deletions lib/db/maintain/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ module.exports.overall_dist = [
\`count\` AggregateFunction(count),
\`sum\` SimpleAggregateFunction(sum, Float64),
\`bytes\` SimpleAggregateFunction(sum, Float64)
) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint)`,
) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3_dist {{{OnCluster}}} (
\`fingerprint\` UInt64,
Expand All @@ -221,22 +221,22 @@ module.exports.overall_dist = [
\`fingerprint\` UInt64,
\`labels\` String,
\`name\` String
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.settings_dist {{{OnCluster}}} (
\`fingerprint\` UInt64,
\`type\` String,
\`name\` String,
\`value\` String,
\`inserted_at\` DateTime64(9, 'UTC')
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand()) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.time_series_gin_dist {{{OnCluster}}} (
date Date,
key String,
val String,
fingerprint UInt64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand());`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand()) {{{DIST_CREATE_SETTINGS}}};`,

'ALTER TABLE {{DB}}.metrics_15s_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS `type` UInt8;',

Expand All @@ -254,7 +254,7 @@ module.exports.traces_dist = [
key String,
val_id String,
val String
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key)) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_dist {{{OnCluster}}} (
oid String,
Expand All @@ -267,7 +267,7 @@ module.exports.traces_dist = [
service_name String,
payload_type Int8,
payload String,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id));`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin_dist {{{OnCluster}}} (
oid String,
Expand All @@ -278,7 +278,7 @@ module.exports.traces_dist = [
span_id FixedString(8),
timestamp_ns Int64,
duration Int64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id));`
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};`
]

module.exports.profiles = [
Expand Down Expand Up @@ -440,15 +440,15 @@ module.exports.profiles_dist = [
payload_type LowCardinality(String),
payload String,
values_agg Array(Tuple(String, Int64, Int32))
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_dist {{{OnCluster}}} (
date Date,
type_id LowCardinality(String),
service_name LowCardinality(String),
fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)),
tags Array(Tuple(String, String)) CODEC(ZSTD(1))
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_gin_dist {{{OnCluster}}} (
date Date,
Expand All @@ -457,14 +457,14 @@ module.exports.profiles_dist = [
type_id LowCardinality(String),
service_name LowCardinality(String),
fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1))
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint);`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint) {{{DIST_CREATE_SETTINGS}}};`,

`CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_keys_dist {{{OnCluster}}} (
date Date,
key String,
val String,
val_id UInt64
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`,
) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand()) {{{DIST_CREATE_SETTINGS}}};`,

`ALTER TABLE {{DB}}.profiles_dist {{{OnCluster}}}
ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
Expand Down

0 comments on commit ddf8596

Please sign in to comment.