From 9cae357679f53d2013934fbeffb3b80a64cc0297 Mon Sep 17 00:00:00 2001 From: rongtianyang Date: Thu, 14 Nov 2024 15:14:38 +0800 Subject: [PATCH 1/2] init kaiwudb support --- Makefile | 6 +- .../databases/kaiwudb/common.go | 62 +++ .../databases/kaiwudb/devops.go | 220 +++++++++++ .../databases/kaiwudb/iot.go | 252 ++++++++++++ cmd/tsbs_load_kaiwudb/main.go | 56 +++ cmd/tsbs_run_queries_kaiwudb/main.go | 171 ++++++++ go.mod | 2 + go.sum | 57 ++- pkg/query/factories/init_factories.go | 7 + pkg/query/kaiwudb.go | 59 +++ pkg/targets/constants/constants.go | 2 + .../initializers/target_initializers.go | 3 + pkg/targets/kaiwudb/benchmark.go | 64 +++ pkg/targets/kaiwudb/commonpool/pool.go | 148 +++++++ pkg/targets/kaiwudb/creator.go | 64 +++ pkg/targets/kaiwudb/file_data_source.go | 69 ++++ pkg/targets/kaiwudb/implemented_target.go | 48 +++ pkg/targets/kaiwudb/parse_prepare.go | 313 +++++++++++++++ pkg/targets/kaiwudb/process_insert.go | 241 ++++++++++++ pkg/targets/kaiwudb/process_prepare.go | 372 ++++++++++++++++++ pkg/targets/kaiwudb/program_options.go | 13 + pkg/targets/kaiwudb/scan.go | 89 +++++ pkg/targets/kaiwudb/serializer.go | 290 ++++++++++++++ pkg/targets/kaiwudb/thread/locker.go | 33 ++ pkg/targets/kaiwudb/thread/locker_test.go | 41 ++ 25 files changed, 2667 insertions(+), 15 deletions(-) create mode 100644 cmd/tsbs_generate_queries/databases/kaiwudb/common.go create mode 100644 cmd/tsbs_generate_queries/databases/kaiwudb/devops.go create mode 100644 cmd/tsbs_generate_queries/databases/kaiwudb/iot.go create mode 100644 cmd/tsbs_load_kaiwudb/main.go create mode 100644 cmd/tsbs_run_queries_kaiwudb/main.go create mode 100644 pkg/query/kaiwudb.go create mode 100644 pkg/targets/kaiwudb/benchmark.go create mode 100644 pkg/targets/kaiwudb/commonpool/pool.go create mode 100644 pkg/targets/kaiwudb/creator.go create mode 100644 pkg/targets/kaiwudb/file_data_source.go create mode 100644 pkg/targets/kaiwudb/implemented_target.go create mode 100644 
pkg/targets/kaiwudb/parse_prepare.go create mode 100644 pkg/targets/kaiwudb/process_insert.go create mode 100644 pkg/targets/kaiwudb/process_prepare.go create mode 100644 pkg/targets/kaiwudb/program_options.go create mode 100644 pkg/targets/kaiwudb/scan.go create mode 100644 pkg/targets/kaiwudb/serializer.go create mode 100644 pkg/targets/kaiwudb/thread/locker.go create mode 100644 pkg/targets/kaiwudb/thread/locker_test.go diff --git a/Makefile b/Makefile index 1f7388f65..1a67d4793 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,8 @@ loaders: tsbs_load \ tsbs_load_timescaledb \ tsbs_load_victoriametrics \ tsbs_load_questdb \ - tsbs_load_iotdb + tsbs_load_iotdb \ + tsbs_load_kaiwudb runners: tsbs_run_queries_akumuli \ tsbs_run_queries_cassandra \ @@ -40,7 +41,8 @@ runners: tsbs_run_queries_akumuli \ tsbs_run_queries_timestream \ tsbs_run_queries_victoriametrics \ tsbs_run_queries_questdb \ - tsbs_run_queries_iotdb + tsbs_run_queries_iotdb \ + tsbs_run_queries_kaiwudb test: $(GOTEST) -v ./... diff --git a/cmd/tsbs_generate_queries/databases/kaiwudb/common.go b/cmd/tsbs_generate_queries/databases/kaiwudb/common.go new file mode 100644 index 000000000..5d8d4fa1d --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/kaiwudb/common.go @@ -0,0 +1,62 @@ +package kaiwudb + +import ( + "time" + + "github.com/benchant/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/benchant/tsbs/cmd/tsbs_generate_queries/uses/iot" + "github.com/benchant/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/benchant/tsbs/pkg/query" +) + +// BaseGenerator contains settings specific for the KaiwuDB database. +type BaseGenerator struct { + ReadingDBName string + DiagnosticsDBName string + CPUDBName string +} + +func (g *BaseGenerator) GenerateEmptyQuery() query.Query { + return query.NewKaiwudb() +} + +// fillInQuery fills the query struct with data. 
+func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, table, sql string) { + q := qi.(*query.Kaiwudb) + q.HumanLabel = []byte(humanLabel) + q.HumanDescription = []byte(humanDesc) + q.Hypertable = []byte(table) + q.SqlQuery = []byte(sql) +} + +// NewDevops creates a new devops use case query generator. +func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) { + core, err := devops.NewCore(start, end, scale) + + if err != nil { + return nil, err + } + + devops := &Devops{ + BaseGenerator: g, + Core: core, + } + + return devops, nil +} + +// NewIoT creates a new iot use case query generator. +func (g *BaseGenerator) NewIoT(start, end time.Time, scale int) (utils.QueryGenerator, error) { + core, err := iot.NewCore(start, end, scale) + + if err != nil { + return nil, err + } + + iot := &IoT{ + BaseGenerator: g, + Core: core, + } + + return iot, nil +} diff --git a/cmd/tsbs_generate_queries/databases/kaiwudb/devops.go b/cmd/tsbs_generate_queries/databases/kaiwudb/devops.go new file mode 100644 index 000000000..58ca438a7 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/kaiwudb/devops.go @@ -0,0 +1,220 @@ +package kaiwudb + +import ( + "fmt" + "strings" + "time" + + "github.com/benchant/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/benchant/tsbs/pkg/query" +) + +// TODO: Remove the need for this by continuing to bubble up errors +func panicIfErr(err error) { + if err != nil { + panic(err.Error()) + } +} + +// Devops produces KaiwuDB-specific queries for all the devops query types. +type Devops struct { + *BaseGenerator + *devops.Core +} + +// getHostWhereWithHostnames creates WHERE SQL statement for multiple hostnames. 
+// NOTE 'WHERE' itself is not included, just hostname filter clauses, ready to concatenate to 'WHERE' string +func (d *Devops) getHostWhereWithHostnames(hostnames []string) string { + var hostnameClauses []string + for _, s := range hostnames { + hostnameClauses = append(hostnameClauses, fmt.Sprintf("'%s'", s)) + } + return fmt.Sprintf("satisfying device.hostname in (%s)", strings.Join(hostnameClauses, ",")) +} + +// getHostWhereString gets multiple random hostnames and creates a WHERE SQL statement for these hostnames. +func (d *Devops) getHostWhereString(nHosts int) string { + hostnames, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + return d.getHostWhereWithHostnames(hostnames) +} + +func (d *Devops) getSelectClausesAggMetrics(agg string, metrics []string) []string { + selectClauses := make([]string, len(metrics)) + for i, m := range metrics { + selectClauses[i] = fmt.Sprintf("%s(%s)", agg, m) + } + + return selectClauses +} + +func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { + interval := d.Interval.MustRandWindow(timeRange) + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + selectClauses := d.getSelectClausesAggMetrics("max", metrics) + if len(selectClauses) < 1 { + panic(fmt.Sprintf("invalid number of select clauses: got %d", len(selectClauses))) + } + hostnames, err := d.GetRandomHosts(nHosts) + if nil != err { + panic(fmt.Sprintf("get randam host error %s", err.Error())) + } else if len(hostnames) < 1 { + panic(fmt.Sprintf("invalid number of host: got %d", len(hostnames))) + } + + sql := "" + if nHosts == 1 { + sql = fmt.Sprintf(`SELECT time_bucket('60s', k_timestamp) as k_timestamp, %s FROM %s.cpu WHERE hostname = '%s' AND k_timestamp >= '%s' AND k_timestamp < '%s' GROUP BY time_bucket('60s', k_timestamp) ORDER BY time_bucket('60s', k_timestamp)`, + strings.Join(selectClauses, ", "), + d.CPUDBName, + hostnames[0], + // 
int(time.UnixMilli(interval.StartUnixMillis()).UTC().UnixMilli()), + // int(time.UnixMilli(interval.EndUnixMillis()).UTC().UnixMilli())) + parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), + parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + } else { + sql = fmt.Sprintf(`SELECT time_bucket('60s', k_timestamp) as k_timestamp, %s + FROM %s.cpu + WHERE hostname IN (%s) AND k_timestamp >= '%s' AND k_timestamp < '%s' + GROUP BY time_bucket('60s', k_timestamp) + ORDER BY time_bucket('60s', k_timestamp)`, + strings.Join(selectClauses, ", "), + d.CPUDBName, + "'"+strings.Join(hostnames, "', '")+"'", + parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), + parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + } + + humanLabel := fmt.Sprintf("KaiwuDB %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +func (d *Devops) GroupByOrderByLimit(qi query.Query) { + interval := d.Interval.MustRandWindow(time.Hour) + sql := fmt.Sprintf(`SELECT time_bucket('60s', k_timestamp) as k_timestamp, max(usage_user) + FROM %s.cpu + WHERE k_timestamp < '%s' + GROUP BY time_bucket('60s', k_timestamp) + ORDER BY time_bucket('60s', k_timestamp) + LIMIT 5`, + d.CPUDBName, + parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + + humanLabel := "KaiwuDB max cpu over last 5 min-intervals (random end)" + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day, +func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) + + selectClauses := 
d.getSelectClausesAggMetrics("avg", metrics) + sql := fmt.Sprintf(`SELECT time_bucket('3600s', k_timestamp) as k_timestamp, hostname, %s + FROM %s.cpu + WHERE k_timestamp >= '%s' AND k_timestamp < '%s' + GROUP BY hostname, time_bucket('3600s', k_timestamp) + ORDER BY hostname, time_bucket('3600s', k_timestamp)`, + strings.Join(selectClauses, ", "), + d.CPUDBName, + parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), + parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + + humanLabel := devops.GetDoubleGroupByLabel("KaiwuDB", numMetrics) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +func (d *Devops) MaxAllCPU(qi query.Query, nHosts int, duration time.Duration) { + interval := d.Interval.MustRandWindow(duration) + + metrics := devops.GetAllCPUMetrics() + selectClauses := d.getSelectClausesAggMetrics("max", metrics) + if len(selectClauses) < 1 { + panic(fmt.Sprintf("invalid number of select clauses: got %d", len(selectClauses))) + } + hostnames, err := d.GetRandomHosts(nHosts) + if nil != err { + panic(fmt.Sprintf("get randam host error %s", err.Error())) + } else if len(hostnames) < 1 { + panic(fmt.Sprintf("invalid number of host: got %d", len(hostnames))) + } + sql := "" + if nHosts == 1 { + sql = fmt.Sprintf(`SELECT time_bucket('3600s', k_timestamp) as k_timestamp, %s + FROM %s.cpu + WHERE hostname = '%s' AND k_timestamp >= '%s' AND k_timestamp < '%s' + GROUP BY time_bucket('3600s', k_timestamp) + ORDER BY time_bucket('3600s', k_timestamp)`, + strings.Join(selectClauses, ", "), + d.CPUDBName, + hostnames[0], + parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), + parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + } else { + sql = fmt.Sprintf(`SELECT time_bucket('3600s', k_timestamp) as k_timestamp, %s + FROM %s.cpu + WHERE hostname IN (%s) AND k_timestamp >= '%s' AND k_timestamp < '%s' + GROUP BY time_bucket('3600s', k_timestamp) + 
ORDER BY time_bucket('3600s', k_timestamp)`, + strings.Join(selectClauses, ", "), + d.CPUDBName, + "'"+strings.Join(hostnames, "', '")+"'", + parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), + parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + } + humanLabel := devops.GetMaxAllLabel("KaiwuDB", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +func (d *Devops) LastPointPerHost(qi query.Query) { + metrics, err := devops.GetCPUMetricsSlice(devops.GetCPUMetricsLen()) + panicIfErr(err) + selectClauses := d.getSelectClausesAggMetrics("last", metrics) + if len(selectClauses) != devops.GetCPUMetricsLen() { + panic(fmt.Sprintf("invalid number of select clauses: got %d", len(selectClauses))) + } + + sql := fmt.Sprintf(`SELECT last(k_timestamp), %s, hostname FROM %s.cpu GROUP BY hostname`, + strings.Join(selectClauses, ", "), + d.CPUDBName) + + humanLabel := "KaiwuDB last row per host" + humanDesc := humanLabel + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} + +func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + //var hostWhereClause string + //if nHosts == 0 { + // hostWhereClause = "" + //} else { + // hostWhereClause = fmt.Sprintf("AND %s", d.getHostWhereString(nHosts)) + //} + + sql := "" + if nHosts == 1 { + hostnames, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + sql = fmt.Sprintf(`SELECT * FROM %s.cpu WHERE hostname='%s' AND usage_user > 90.0 AND k_timestamp >= '%s' AND k_timestamp < '%s'`, + d.CPUDBName, + hostnames[0], + parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), + parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + } else { + sql = fmt.Sprintf(`SELECT * FROM %s.cpu WHERE usage_user > 90.0 AND k_timestamp >= '%s' AND k_timestamp < '%s'`, + d.CPUDBName, + parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), + 
parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + } + humanLabel, err := devops.GetHighCPULabel("KaiwuDB", nHosts) + panicIfErr(err) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, devops.TableName, sql) +} diff --git a/cmd/tsbs_generate_queries/databases/kaiwudb/iot.go b/cmd/tsbs_generate_queries/databases/kaiwudb/iot.go new file mode 100644 index 000000000..86065f204 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/kaiwudb/iot.go @@ -0,0 +1,252 @@ +package kaiwudb + +import ( + "fmt" + "strings" + "time" + + "github.com/benchant/tsbs/cmd/tsbs_generate_queries/uses/iot" + "github.com/benchant/tsbs/pkg/query" +) + +// IoT produces KaiwuDB-specific queries for all the iot query types. +type IoT struct { + *iot.Core + *BaseGenerator +} + +//last-loc +//single-last-loc +//low-fuel +//avg-vs-projected-fuel-consumption +//avg-daily-driving-duration +//daily-activity + +func (i *IoT) getTrucksWhereWithNames(names []string) string { + var nameClauses []string + + for _, s := range names { + nameClauses = append(nameClauses, fmt.Sprintf("'%s'", s)) + } + return fmt.Sprintf("name IN (%s)", strings.Join(nameClauses, ",")) +} + +// getTruckWhereString gets multiple random truck names and creates a WHERE SQL statement for these trucks. +func (i *IoT) getTruckWhereString(nTrucks int) string { + names, err := i.GetRandomTrucks(nTrucks) + panicIfErr(err) + return i.getTrucksWhereWithNames(names) +} + +// LastLocByTruck finds the truck location for nTrucks. 
+func (i *IoT) LastLocByTruck(qi query.Query, nTrucks int) { + // sql := fmt.Sprintf(`SELECT last_row(ts),last_row(latitude),last_row(longitude) FROM readings WHERE %s GROUP BY name`, + sql := fmt.Sprintf(`SELECT last(k_timestamp), last(latitude), last(longitude) FROM benchmark.readings WHERE %s GROUP BY name`, + i.getTruckWhereString(nTrucks)) + + humanLabel := "KaiwuDB last location by specific truck" + humanDesc := fmt.Sprintf("%s: random %4d trucks", humanLabel, nTrucks) + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// LastLocPerTruck finds all the truck locations along with truck and driver names. +func (i *IoT) LastLocPerTruck(qi query.Query) { + //SELECT last_row(ts),name,driver,latitude,longitude FROM readings WHERE fleet='South' and name IS NOT NULL partition BY name,driver; + // sql := fmt.Sprintf(`SELECT last(k_timestamp) as k_timestamp,name,driver,last(latitude) as latitude,last(longitude) as longitude unionallfrom %s.* satisfying device.fleet='%s' group by name,driver`, + // i.ReadingDBName, i.GetRandomFleet()) + sql := fmt.Sprintf(`SELECT last(k_timestamp), name, driver, last(latitude) as latitude, last(longitude) as longitude FROM benchmark.readings WHERE fleet='%s' and name IS NOT NULL GROUP BY name, driver`, + i.GetRandomFleet()) + + humanLabel := "KaiwuDB last location per truck" + humanDesc := humanLabel + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// TrucksWithLowFuel finds all trucks with low fuel (less than 10%). 
+func (i *IoT) TrucksWithLowFuel(qi query.Query) { + //SELECT last_row(ts),name,driver,fuel_state FROM diagnostics WHERE fuel_state <= 0.1 AND fleet = 'South' and name IS NOT NULL GROUP BY name,driver ; + // sql := fmt.Sprintf(`SELECT last(k_timestamp) as k_timestamp,name,driver,last(fuel_state) as fuel_state unionallfrom %s.* satisfying device.fleet = '%s' WHERE fuel_state <= 0.1 GROUP BY name,driver`, + // i.DiagnosticsDBName, i.GetRandomFleet()) + sql := fmt.Sprintf(`SELECT last(k_timestamp) as k_timestamp,name, driver,last(fuel_state) as fuel_state FROM benchmark.diagnostics where fleet = '%s' and fuel_state <= 0.1 GROUP BY name, driver`, + i.GetRandomFleet()) + + humanLabel := "KaiwuDB trucks with low fuel" + humanDesc := fmt.Sprintf("%s: under 10 percent", humanLabel) + + i.fillInQuery(qi, humanLabel, humanDesc, iot.DiagnosticsTableName, sql) +} + +// TrucksWithHighLoad finds all trucks that have load over 90%. +func (i *IoT) TrucksWithHighLoad(qi query.Query) { + //SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last_row(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity); + //pre sql := fmt.Sprintf("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last_row(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = '%s' partition by name,driver) WHERE current_load>= (0.9 * load_capacity)", i.GetRandomFleet()) + // sql := fmt.Sprintf("SELECT k_timestamp,name,driver,current_load,load_capacity FROM (SELECT last(k_timestamp) as k_timestamp,name,driver, last(current_load) as current_load,load_capacity unionallfrom %s.* satisfying device.fleet = '%s' group by name,driver,load_capacity) WHERE current_load>= (0.9 * cast(load_capacity as float))", i.DiagnosticsDBName, i.GetRandomFleet()) + sql := fmt.Sprintf("SELECT last(k_timestamp) as k_timestamp,name,driver, last(current_load) as current_load,load_capacity from 
benchmark.diagnostics where fleet = '%s' and current_load>= (0.9 * cast(load_capacity as float)) group by name,driver,load_capacity", i.GetRandomFleet()) + + humanLabel := "KaiwuDB trucks with high load" + humanDesc := fmt.Sprintf("%s: over 90 percent", humanLabel) + + i.fillInQuery(qi, humanLabel, humanDesc, iot.DiagnosticsTableName, sql) +} + +// StationaryTrucks finds all trucks that have low average velocity in a time window. +func (i *IoT) StationaryTrucks(qi query.Query) { + interval := i.Interval.MustRandWindow(iot.StationaryDuration) + //select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1 ; + // sql := fmt.Sprintf("select name,driver from (SELECT name,driver,avg(velocity) as mean_velocity unionallfrom %s.* satisfying device.fleet = '%s' WHERE k_timestamp > '%s' AND k_timestamp <= '%s' group BY name,driver,time_bucket(k_timestamp,'600s') order by name, driver, time_bucket(k_timestamp,'600s') LIMIT 1) WHERE mean_velocity < 1", i.ReadingDBName, i.GetRandomFleet(), parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + sql := fmt.Sprintf("select name,driver from (SELECT name,driver,avg(velocity) as mean_velocity from benchmark.readings where fleet = '%s' and k_timestamp > '%s' AND k_timestamp <= '%s' group BY name,driver) WHERE mean_velocity < 1", i.GetRandomFleet(), parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC())) + //sql := fmt.Sprintf("SELECT name,driver FROM readings WHERE ts > '%s' AND ts <= '%s' and fleet = '%s' partition BY name,driver,fleet interval(10m) having (avg(velocity) < 1) LIMIT 1;", interval.StartUnixMillis(), interval.EndUnixMillis(), i.GetRandomFleet()) + humanLabel := "KaiwuDB stationary trucks" + 
humanDesc := fmt.Sprintf("%s: with low avg velocity in last 10 minutes", humanLabel) + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// TrucksWithLongDrivingSessions finds all trucks that have not stopped at least 20 mins in the last 4 hours. +func (i *IoT) TrucksWithLongDrivingSessions(qi query.Query) { + interval := i.Interval.MustRandWindow(iot.LongDrivingSessionDuration) + //SELECT name,driver FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ="West" AND ts > '2016-01-03T13:46:34Z' AND ts <= '2016-01-03T17:46:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver having count(*) > 22 + //pre sql := fmt.Sprintf("SELECT name,driver FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet =\"%s\" AND ts > '%s' AND ts <= '%s' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver having count(*) > '%s'", i.GetRandomFleet(), interval.StartUnixMillis(), interval.EndUnixMillis(), tenMinutePeriods(5, iot.LongDrivingSessionDuration)) + // sql := fmt.Sprintf("SELECT name,driver FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet =\"%s\" AND ts > '%s' AND ts <= '%s' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver having count(*) > '%s'", i.GetRandomFleet(), interval.StartUnixMillis(), interval.EndUnixMillis(), tenMinutePeriods(5, iot.LongDrivingSessionDuration)) + sql := fmt.Sprintf("SELECT name,driver FROM (SELECT name,driver,avg(velocity) as mean_velocity FROM benchmark.readings WHERE fleet = '%s' AND k_timestamp > '%s' AND k_timestamp <= '%s' group BY name,driver, time_bucket(k_timestamp, '10m')) WHERE mean_velocity > 1 GROUP BY name,driver having count(*) > %d", i.GetRandomFleet(), parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC()), tenMinutePeriods(5, 
iot.LongDrivingSessionDuration)) + humanLabel := "KaiwuDB trucks with longer driving sessions" + humanDesc := fmt.Sprintf("%s: stopped less than 20 mins in 4 hour period", humanLabel) + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// TrucksWithLongDailySessions finds all trucks that have driven more than 10 hours in the last 24 hours. +func (i *IoT) TrucksWithLongDailySessions(qi query.Query) { + //SELECT name,driver FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='West' AND ts > '2016-01-01T12:31:37Z' AND ts <= '2016-01-05T12:31:37Z' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver having count(*) > 60 + + interval := i.Interval.MustRandWindow(iot.DailyDrivingDuration) + // sql := fmt.Sprintf("SELECT name,driver FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='%s' AND ts > '%s' AND ts <= '%s' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver having count(*) > '%s'", i.GetRandomFleet(), interval.StartUnixMillis(), interval.EndUnixMillis(), tenMinutePeriods(35, iot.DailyDrivingDuration)) + sql := fmt.Sprintf("SELECT name,driver FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM benchmark.readings WHERE fleet ='%s' AND k_timestamp > '%s' AND k_timestamp <= '%s' group BY name,driver, time_bucket(k_timestamp, '10m')) WHERE mean_velocity > 1 GROUP BY name,driver having count(*) > %d", i.GetRandomFleet(), parseTime(time.UnixMilli(interval.StartUnixMillis()).UTC()), parseTime(time.UnixMilli(interval.EndUnixMillis()).UTC()), tenMinutePeriods(35, iot.DailyDrivingDuration)) + + humanLabel := "KaiwuDB trucks with longer daily sessions" + humanDesc := fmt.Sprintf("%s: drove more than 10 hours in the last 24 hours", humanLabel) + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// AvgVsProjectedFuelConsumption calculates average and projected fuel consumption per fleet. 
+func (i *IoT) AvgVsProjectedFuelConsumption(qi query.Query) { + //select avg(fuel_consumption) as avg_fuel_consumption,avg(nominal_fuel_consumption) as nominal_fuel_consumption from readings where velocity > 1 group by fleet + // sql := fmt.Sprintf("select avg(fuel_consumption) as avg_fuel_consumption,avg(nominal_fuel_consumption) as nominal_fuel_consumption from readings where velocity > 1 group by fleet") + sql := fmt.Sprintf("select fleet, avg(fuel_consumption) as avg_fuel_consumption,avg(nominal_fuel_consumption) as nominal_fuel_consumption from benchmark.readings where velocity > 1 group by fleet") + humanLabel := "KaiwuDB average vs projected fuel consumption per fleet" + humanDesc := humanLabel + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// AvgDailyDrivingDuration finds the average driving duration per driver. +func (i *IoT) AvgDailyDrivingDuration(qi query.Query) { + //select fleet,name,driver,avg(hours_driven) as avg_daily_hours from( select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,tbname,name,driver,avg(velocity) as mv from readings where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,tbname,name,driver interval(10m) ) where mv >1 partition by fleet,name,driver interval(1d) )partition by fleet,name,driver ; + // sql := fmt.Sprintf("select fleet,name,driver,avg(hours_driven) as avg_daily_hours from( select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,tbname,name,driver,avg(velocity) as mv from readings where ts > '%s' and ts < '%s' partition by fleet,tbname,name,driver interval(10m) ) where mv >1 partition by fleet,name,driver interval(1d) )partition by fleet,name,driver ;", i.Interval.StartUnixMillis(), i.Interval.EndUnixMillis()) + sql := fmt.Sprintf("select fleet,name,driver,avg(hours_driven) as avg_daily_hours from (select k_timestamp,fleet,name,driver,count(avg_v)/6 as hours_driven 
from (select k_timestamp,fleet,name,driver,avg(velocity) as avg_v from benchmark.readings where k_timestamp > '%s' AND k_timestamp <= '%s' group by k_timestamp,fleet,name,driver, time_bucket(k_timestamp, '10m') ) where avg_v >1 group by k_timestamp,fleet,name,driver, time_bucket(k_timestamp, '1d') ) group by fleet,name,driver", parseTime(time.UnixMilli(i.Interval.StartUnixMillis()).UTC()), parseTime(time.UnixMilli(i.Interval.EndUnixMillis()).UTC())) + + humanLabel := "KaiwuDB average driver driving duration per day" + humanDesc := humanLabel + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// AvgDailyDrivingSession finds the average driving session without stopping per driver per day. +func (i *IoT) AvgDailyDrivingSession(qi query.Query) { + // select _wstart as ts,name,avg(ela) from (select ts,name,ela from (SELECT ts,name, diff(difka) as dif, diff(cast(ts as bigint)) as ela FROM (SELECT ts,name,difka FROM (SELECT ts,name,diff(mv) AS difka FROM (SELECT _wstart as ts,name,cast(cast(floor(avg(velocity)/5) as bool) as int) AS mv FROM readings WHERE name is not null AND ts > 1451637149138 AND ts < 1451637749138 partition by name interval(10m))partition BY name ) WHERE difka!=0 order by ts) partition BY name order by ts ) WHERE dif = -2 partition BY name order by ts ) partition BY name interval(1d); + //interval := i.Interval + sql := fmt.Sprintf(`WITH driver_status + AS ( + SELECT name, time_bucket(k_timestamp, '600s') AS ten_minutes, avg(velocity) > 5 AS driving + FROM %s.readings + GROUP BY name, ten_minutes + ORDER BY name, ten_minutes + ), driver_status_change + AS ( + SELECT name, ten_minutes AS start, lead(ten_minutes) OVER (PARTITION BY name ORDER BY ten_minutes) AS stop, driving + FROM ( + SELECT name, ten_minutes, driving, lag(driving) OVER (PARTITION BY name ORDER BY ten_minutes) AS prev_driving + FROM driver_status + ) x + WHERE x.driving <> x.prev_driving + ) + SELECT name, time_bucket(start, '86400s') AS day, avg(stop-start) 
AS duration + FROM driver_status_change + WHERE name IS NOT NULL + AND driving = true + GROUP BY name, day + ORDER BY name, day`, + i.ReadingDBName) + humanLabel := "KaiwuDB average driver driving session without stopping per day" + humanDesc := humanLabel + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// AvgLoad finds the average load per truck model per fleet. +func (i *IoT) AvgLoad(qi query.Query) { + //select fleet,model,load_capacity,avg(ml/load_capacity) from(SELECT fleet, model,tbname,load_capacity ,avg(current_load) AS ml FROM diagnostics where name is not null partition BY fleet, model,tbname,load_capacity) partition BY fleet, model,load_capacity; + // sql := fmt.Sprintf("select fleet,model,load_capacity,avg(ml/load_capacity) from(SELECT fleet, model,tbname,load_capacity ,avg(current_load) AS ml FROM diagnostics where name is not null partition BY fleet, model,tbname,load_capacity) partition BY fleet, model,load_capacity") + sql := fmt.Sprintf("select fleet,model,load_capacity,avg(avg_load/load_capacity) from(SELECT fleet, model,load_capacity ,avg(current_load) AS avg_load FROM benchmark.diagnostics where name is not null group BY fleet, model, load_capacity) group BY fleet, model, load_capacity") + + humanLabel := "KaiwuDB average load per truck model per fleet" + humanDesc := humanLabel + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// DailyTruckActivity returns the number of hours trucks has been active (not out-of-commission) per day per fleet per model. 
+func (i *IoT) DailyTruckActivity(qi query.Query) { + //SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 partition by model, fleet, tbname interval(10m)) WHERE ms1<1 partition by model, fleet interval(1d) + // sql := fmt.Sprintf("SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '%s' AND ts < '%s' partition by model, fleet, tbname interval(10m)) WHERE ms1<1 partition by model, fleet interval(1d)", i.Interval.StartUnixMillis(), i.Interval.EndUnixMillis()) + sql := fmt.Sprintf("SELECT k_timestamp, model,fleet,count(avg_status)/144 FROM (SELECT k_timestamp, model, fleet,avg(status) AS avg_status FROM benchmark.diagnostics WHERE k_timestamp > '%s' AND k_timestamp <= '%s' group by k_timestamp, model, fleet, time_bucket(k_timestamp, '10m')) WHERE avg_status<1 group by k_timestamp, model, fleet, time_bucket(k_timestamp, '1d')", parseTime(time.UnixMilli(i.Interval.StartUnixMillis()).UTC()), parseTime(time.UnixMilli(i.Interval.EndUnixMillis()).UTC())) + humanLabel := "KaiwuDB daily truck activity per fleet per model" + humanDesc := humanLabel + + i.fillInQuery(qi, humanLabel, humanDesc, iot.ReadingsTableName, sql) +} + +// TruckBreakdownFrequency calculates the amount of times a truck model broke down in the last period. 
+func (i *IoT) TruckBreakdownFrequency(qi query.Query) { + // SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2023-01-05T00:00:01Z' partition BY tbname,model interval(10m)) order by ts ) partition BY tb,model ) WHERE state_changed = 1 partition BY model ; + // sql := fmt.Sprintf("SELECT model,count(state_changed) FROM (SELECT _rowts,model,diff(broken_down) AS state_changed FROM (SELECT ts,model,tb,cast(cast(floor(2*(nzs)) as bool) as int) AS broken_down FROM (SELECT _wstart as ts,model,tbname as tb, sum(cast(cast(status as bool) as int))/count(cast(cast(status as bool) as int)) AS nzs FROM diagnostics WHERE ts >= '%s' AND ts < '%s' partition BY tbname,model interval(10m)) order by ts) partition BY tb,model) WHERE state_changed = 1 partition BY model", i.Interval.StartUnixMillis(), i.Interval.EndUnixMillis()) + sql := fmt.Sprintf(`WITH breakdown_per_truck_per_ten_minutes + AS ( + SELECT time_bucket(k_timestamp, '600s') AS ten_minutes, name, count(STATUS = 0) / count(*) >= 0.5 AS broken_down + FROM %s.diagnostics + GROUP BY ten_minutes, name + ), breakdowns_per_truck + AS ( + SELECT ten_minutes, name, broken_down, lead(broken_down) OVER ( + PARTITION BY name ORDER BY ten_minutes + ) AS next_broken_down + FROM breakdown_per_truck_per_ten_minutes + ) + SELECT d.model as model, count(*) + FROM %s.diagnostics d + INNER JOIN breakdowns_per_truck b ON d.name = b.name + WHERE b.name IS NOT NULL + AND broken_down = false AND next_broken_down = true + GROUP BY model`, + i.DiagnosticsDBName, + i.DiagnosticsDBName) + + humanLabel := "KaiwuDB truck breakdown frequency per model" + humanDesc := humanLabel + + i.fillInQuery(qi, humanLabel, humanDesc, 
iot.ReadingsTableName, sql) +} + +func tenMinutePeriods(minutesPerHour float64, duration time.Duration) int { + durationMinutes := duration.Minutes() + leftover := minutesPerHour * duration.Hours() + return int((durationMinutes - leftover) / 10) +} + +func parseTime(time time.Time) string { + timeStr := strings.Split(time.String(), " ") + return fmt.Sprintf("%s %s", timeStr[0], timeStr[1]) +} diff --git a/cmd/tsbs_load_kaiwudb/main.go b/cmd/tsbs_load_kaiwudb/main.go new file mode 100644 index 000000000..12cd16db5 --- /dev/null +++ b/cmd/tsbs_load_kaiwudb/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + + kaiwudb "github.com/benchant/tsbs/pkg/targets/kaiwudb" + + "github.com/benchant/tsbs/internal/utils" + "github.com/benchant/tsbs/load" + "github.com/benchant/tsbs/pkg/data/source" + "github.com/blagojts/viper" + "github.com/spf13/pflag" +) + +func initProgramOptions() (*kaiwudb.LoadingOptions, load.BenchmarkRunner, *load.BenchmarkRunnerConfig) { + target := kaiwudb.NewTarget() + loaderConf := load.BenchmarkRunnerConfig{} + loaderConf.AddToFlagSet(pflag.CommandLine) + target.TargetSpecificFlags("", pflag.CommandLine) + pflag.Parse() + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&loaderConf); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + opts := kaiwudb.LoadingOptions{} + viper.SetTypeByDefaultValue(true) + opts.User = viper.GetString("user") + opts.Pass = viper.GetString("pass") + opts.Host = viper.GetStringSlice("host") + opts.Port = viper.GetIntSlice("port") + opts.DBName = viper.GetString("db-name") + opts.Workers = viper.GetInt("workers") + opts.DoCreate = viper.GetBool("do-create-db") + opts.Type = viper.GetString("insert-type") + + loaderConf.NoFlowControl = true + loaderConf.ChannelCapacity = 50 + loader := load.GetBenchmarkRunner(loaderConf) + return &opts, loader, &loaderConf +} +func main() { + opts, loader, loaderConf 
:= initProgramOptions() + benchmark, err := kaiwudb.NewBenchmark(loaderConf.DBName, opts, &source.DataSourceConfig{ + Type: source.FileDataSourceType, + File: &source.FileDataSourceConfig{Location: loaderConf.FileName}, + }) + if err != nil { + panic(err) + } + loader.RunBenchmark(benchmark) +} diff --git a/cmd/tsbs_run_queries_kaiwudb/main.go b/cmd/tsbs_run_queries_kaiwudb/main.go new file mode 100644 index 000000000..be7e024b6 --- /dev/null +++ b/cmd/tsbs_run_queries_kaiwudb/main.go @@ -0,0 +1,171 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/benchant/tsbs/internal/utils" + "github.com/benchant/tsbs/pkg/query" + "github.com/benchant/tsbs/pkg/targets/kaiwudb" + "github.com/benchant/tsbs/pkg/targets/kaiwudb/commonpool" + "github.com/blagojts/viper" + "github.com/jackc/pgx/v5" + "github.com/pkg/errors" + "github.com/spf13/pflag" +) + +var ( + user string + pass string + host string + port int + format int + mode int + runner *query.BenchmarkRunner +) + +func init() { + var config query.BenchmarkRunnerConfig + config.AddToFlagSet(pflag.CommandLine) + + pflag.String("user", "root", "User to connect to kaiwudb") + pflag.String("pass", "", "Password for the user connecting to kaiwudb") + pflag.String("host", "", "kaiwudb host") + pflag.Int("port", 36257, "kaiwudb Port") + pflag.Int("query-mode", int(pgx.QueryExecModeSimpleProtocol), "kaiwudb pgx query mode") + pflag.Parse() + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + user = viper.GetString("user") + pass = viper.GetString("pass") + host = viper.GetString("host") + port = viper.GetInt("port") + mode = viper.GetInt("query-mode") + runner = query.NewBenchmarkRunner(config) + format = 0 +} +func main() { + runner.Run(&query.KaiwudbPool, newProcessor) +} + +type queryExecutorOptions struct { + 
debug bool + printResponse bool +} + +type processor struct { + db *commonpool.Conn + opts *queryExecutorOptions + queryMode pgx.QueryExecMode + parse *kaiwudb.ParsePrepare +} + +func (p *processor) Init(workerNum int) { + db, err := commonpool.GetConnection(user, pass, host, port, format) + if err != nil { + panic(err) + } + p.db = db + p.opts = &queryExecutorOptions{ + debug: runner.DebugLevel() > 0, + printResponse: runner.DoPrintResponses(), + } + + p.queryMode = pgx.QueryExecMode(mode) +} + +func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { + tq := q.(*query.Kaiwudb) + + start := time.Now() + qry := string(tq.SqlQuery) + ctx := context.Background() + + var rows pgx.Rows + var err error + switch p.queryMode { + case pgx.QueryExecModeCacheStatement: + sql, args := kaiwudb.HostRangeFast(qry, p.opts.debug) + args = append([]interface{}{p.queryMode}, args...) + rows, err = p.db.Connection.Query(ctx, sql, args...) + case pgx.QueryExecModeCacheDescribe, pgx.QueryExecModeDescribeExec: + if p.parse == nil { + p.parse = kaiwudb.NewParsePrepare(string(tq.HumanLabel)) + } + sql, args := p.parse.Parse(qry, p.opts.debug) + args = append([]interface{}{p.queryMode}, args...) + rows, err = p.db.Connection.Query(ctx, sql, args...) 
+ default: + rows, err = p.db.Connection.Query(ctx, qry, p.queryMode) + } + + if err != nil { + log.Println("Error running query: '", qry, "'") + return nil, err + } + + if p.opts.printResponse { + prettyPrintResponse(rows, qry) + } + for rows.Next() { + + } + rows.Close() + + took := float64(time.Since(start).Nanoseconds()) / 1e6 + stat := query.GetStat() + stat.Init(q.HumanLabelName(), took) + + if p.opts.debug { + fmt.Println(qry, took) + } + + return []*query.Stat{stat}, nil +} + +func newProcessor() query.Processor { return &processor{} } + +func prettyPrintResponse(rows pgx.Rows, query string) { + resp := make(map[string]interface{}) + resp["query"] = query + resp["results"] = mapRows(rows) + + line, err := json.MarshalIndent(resp, "", " ") + if err != nil { + panic(err) + } + + fmt.Println(string(line) + "\n") +} + +func mapRows(r pgx.Rows) []map[string]interface{} { + rows := []map[string]interface{}{} + cols := r.FieldDescriptions() + for r.Next() { + row := make(map[string]interface{}) + values := make([]interface{}, len(cols)) + for i := range values { + values[i] = new(interface{}) + } + + err := r.Scan(values...) 
+ if err != nil { + panic(errors.Wrap(err, "error while reading values")) + } + + for i, column := range cols { + row[column.Name] = *values[i].(*interface{}) + } + rows = append(rows, row) + } + return rows +} diff --git a/go.mod b/go.mod index 110b0ce1f..e8d56d10a 100644 --- a/go.mod +++ b/go.mod @@ -18,12 +18,14 @@ require ( github.com/google/flatbuffers v1.11.0 github.com/google/go-cmp v0.5.2 github.com/jackc/pgx/v4 v4.8.0 + github.com/jackc/pgx/v5 v5.3.1 github.com/jmoiron/sqlx v1.2.1-0.20190826204134-d7d95172beb5 github.com/kshvakov/clickhouse v1.3.11 github.com/lib/pq v1.3.0 github.com/pkg/errors v0.9.1 github.com/prometheus/common v0.13.0 github.com/shirou/gopsutil v3.21.3+incompatible + github.com/silenceper/pool v1.0.0 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index ec32a1e42..b57a7b966 100644 --- a/go.sum +++ b/go.sum @@ -536,8 +536,9 @@ github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype 
v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -554,10 +555,13 @@ github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6 github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg= github.com/jackc/pgx/v4 v4.8.0 h1:xO3bPvwr0MJSoDfb4yeeWZIxSZ2VFBm5axPnaNEnGUQ= github.com/jackc/pgx/v4 v4.8.0/go.mod h1:AjqYcDmEyst6GF8jJi/RF73Gla9d7/HLZzJEZj2uwpM= +github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= +github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jamiealquiza/envy v1.1.0/go.mod h1:MP36BriGCLwEHhi1OU8E9569JNZrjWfCvzG7RsPnHus= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jingyugao/rowserrcheck v0.0.0-20191204022205-72ab7603b68a/go.mod h1:xRskid8CManxVta/ALEhJha/pweKBaVG6fWgc0yH25s= @@ -607,8 +611,10 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.0 
h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= @@ -704,7 +710,6 @@ github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/exhaustive v0.0.0-20200811152831-6cf413ae40e0/go.mod h1:wBEpHwM2OdmeNpdCvRPUlkEbBuaFmcK4Wv8Q7FuGW3c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -748,7 +753,6 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= 
github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= @@ -815,6 +819,8 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= @@ -849,6 +855,8 @@ github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/shurcooL/vfsgen v0.0.0-20200627165143-92b8a710ab6c/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/silenceper/pool v1.0.0 h1:JTCaA+U6hJAA0P8nCx+JfsRCHMwLTfatsm5QXelffmU= +github.com/silenceper/pool v1.0.0/go.mod h1:3DN13bqAbq86Lmzf6iUXWEPIWFPOSYVfaoceFvilKKI= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= @@ -898,8 +906,10 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify 
v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= @@ -907,6 +917,7 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= github.com/testcontainers/testcontainers-go v0.5.1/go.mod h1:Oc/G02bjZiX0p3lzyh6b1GCELP0e4/6Cg3ciU/LnFvU= github.com/tetafro/godot v0.4.8/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/timakin/bodyclose v0.0.0-20190930140734-f7f2e9bca95e/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/timescale/promscale v0.0.0-20201006153045-6a66a36f5c84 h1:jdJdzLyz0SNBuvt5rYyBxDqhgZ2EcbA7eWVBMqcyEHc= @@ -952,6 +963,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -960,7 +972,6 @@ go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qL go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.3.0/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= -go.mongodb.org/mongo-driver v1.3.2 h1:IYppNjEV/C+/3VPbhHVxQ4t04eVW0cLp0/pNdW++6Ug= go.mongodb.org/mongo-driver v1.3.2/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= go.mongodb.org/mongo-driver v1.4.6 h1:rh7GdYmDrb8AQSkF8yteAus8qYOgOASWDOv1BWqBXkU= go.mongodb.org/mongo-driver v1.4.6/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc= @@ -1004,8 +1015,10 @@ golang.org/x/crypto v0.0.0-20191202143827-86a70503ff7e/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod 
h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1040,8 +1053,9 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1081,8 +1095,11 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA= 
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1096,8 +1113,10 @@ golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1156,15 +1175,25 @@ golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200821140526-fda516888d29/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200908134130-d2e65c121b96/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa h1:ZYxPR6aca/uhfRJyaOAtflSHjJYiktO7QnJC5ut7iY4= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text 
v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1246,8 +1275,9 @@ golang.org/x/tools v0.0.0-20200701041122-1837592efa10/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200724022722-7017fd6b1305/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200812195022-5ae4c3c160a0/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200822203824-307de81be3f4/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= -golang.org/x/tools v0.0.0-20200908211811-12e1bf57a112 h1:DmrRJy1qn9VDMf4+GSpRlwfZ51muIF7r96MFBFP4bPM= golang.org/x/tools v0.0.0-20200908211811-12e1bf57a112/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1340,8 +1370,9 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/pkg/query/factories/init_factories.go b/pkg/query/factories/init_factories.go index f99c4a348..4e9967275 100644 --- a/pkg/query/factories/init_factories.go +++ b/pkg/query/factories/init_factories.go @@ -7,6 +7,7 @@ import ( "github.com/benchant/tsbs/cmd/tsbs_generate_queries/databases/cratedb" "github.com/benchant/tsbs/cmd/tsbs_generate_queries/databases/influx" "github.com/benchant/tsbs/cmd/tsbs_generate_queries/databases/iotdb" + "github.com/benchant/tsbs/cmd/tsbs_generate_queries/databases/kaiwudb" "github.com/benchant/tsbs/cmd/tsbs_generate_queries/databases/mongo" "github.com/benchant/tsbs/cmd/tsbs_generate_queries/databases/questdb" "github.com/benchant/tsbs/cmd/tsbs_generate_queries/databases/siridb" @@ -44,5 +45,11 @@ func InitQueryFactories(config *config.QueryGeneratorConfig) map[string]interfac BasicPath: "root", BasicPathLevel: 0, } + factories[constants.FormatKaiwuDB] = &kaiwudb.BaseGenerator{ + CPUDBName: config.DbName, + 
ReadingDBName: config.DbName, + DiagnosticsDBName: config.DbName, + } + return factories } diff --git a/pkg/query/kaiwudb.go b/pkg/query/kaiwudb.go new file mode 100644 index 000000000..643e87707 --- /dev/null +++ b/pkg/query/kaiwudb.go @@ -0,0 +1,59 @@ +package query + +import ( + "fmt" + "sync" +) + +type Kaiwudb struct { + id uint64 + HumanLabel []byte + HumanDescription []byte + Hypertable []byte + SqlQuery []byte +} + +var KaiwudbPool = sync.Pool{ + New: func() interface{} { + return &Kaiwudb{ + HumanLabel: make([]byte, 0, 1024), + HumanDescription: make([]byte, 0, 1024), + Hypertable: make([]byte, 0, 1024), + SqlQuery: make([]byte, 0, 1024), + } + }, +} + +func NewKaiwudb() *Kaiwudb { + return KaiwudbPool.Get().(*Kaiwudb) +} + +func (q *Kaiwudb) Release() { + q.HumanLabel = q.HumanLabel[:0] + q.HumanDescription = q.HumanDescription[:0] + q.id = 0 + + q.Hypertable = q.Hypertable[:0] + q.SqlQuery = q.SqlQuery[:0] + KaiwudbPool.Put(q) +} + +func (q *Kaiwudb) HumanLabelName() []byte { + return q.HumanLabel +} + +func (q *Kaiwudb) HumanDescriptionName() []byte { + return q.HumanDescription +} + +func (q *Kaiwudb) GetID() uint64 { + return q.id +} + +func (q *Kaiwudb) SetID(n uint64) { + q.id = n +} + +func (q *Kaiwudb) String() string { + return fmt.Sprintf("HumanLabel: %s, HumanDescription: %s, Hypertable: %s, Query: %s", q.HumanLabel, q.HumanDescription, q.Hypertable, q.SqlQuery) +} diff --git a/pkg/targets/constants/constants.go b/pkg/targets/constants/constants.go index 7bfe4423b..0e76dfd40 100644 --- a/pkg/targets/constants/constants.go +++ b/pkg/targets/constants/constants.go @@ -15,6 +15,7 @@ const ( FormatTimestream = "timestream" FormatQuestDB = "questdb" FormatIoTDB = "iotdb" + FormatKaiwuDB = "kaiwudb" ) func SupportedFormats() []string { @@ -32,5 +33,6 @@ func SupportedFormats() []string { FormatTimestream, FormatQuestDB, FormatIoTDB, + FormatKaiwuDB, } } diff --git a/pkg/targets/initializers/target_initializers.go 
b/pkg/targets/initializers/target_initializers.go index a3286c85e..61274e546 100644 --- a/pkg/targets/initializers/target_initializers.go +++ b/pkg/targets/initializers/target_initializers.go @@ -10,6 +10,7 @@ import ( "github.com/benchant/tsbs/pkg/targets/crate" "github.com/benchant/tsbs/pkg/targets/influx" "github.com/benchant/tsbs/pkg/targets/iotdb" + "github.com/benchant/tsbs/pkg/targets/kaiwudb" "github.com/benchant/tsbs/pkg/targets/mongo" "github.com/benchant/tsbs/pkg/targets/prometheus" "github.com/benchant/tsbs/pkg/targets/questdb" @@ -48,6 +49,8 @@ func GetTarget(format string) targets.ImplementedTarget { return questdb.NewTarget() case constants.FormatIoTDB: return iotdb.NewTarget() + case constants.FormatKaiwuDB: + return kaiwudb.NewTarget() } supportedFormatsStr := strings.Join(constants.SupportedFormats(), ",") diff --git a/pkg/targets/kaiwudb/benchmark.go b/pkg/targets/kaiwudb/benchmark.go new file mode 100644 index 000000000..7e7def178 --- /dev/null +++ b/pkg/targets/kaiwudb/benchmark.go @@ -0,0 +1,64 @@ +package kaiwudb + +import ( + "github.com/benchant/tsbs/pkg/data/source" + "github.com/benchant/tsbs/pkg/targets" +) + +var ( + KAIWUINSERT = "insert" + KAIWUPREPARE = "prepare" + KAIWUPREPAREIOT = "prepareiot" +) + +func NewBenchmark(dbName string, opts *LoadingOptions, dataSourceConfig *source.DataSourceConfig) (targets.Benchmark, error) { + var ds targets.DataSource + if dataSourceConfig.Type == source.FileDataSourceType { + ds = newFileDataSource(dataSourceConfig.File.Location) + } else { + panic("not implement") + } + + return &benchmark{ + opts: opts, + ds: ds, + dbName: dbName, + }, nil +} + +type benchmark struct { + opts *LoadingOptions + ds targets.DataSource + dbName string +} + +func (b *benchmark) GetDataSource() targets.DataSource { + return b.ds +} + +func (b *benchmark) GetBatchFactory() targets.BatchFactory { + return &factory{} +} + +func (b *benchmark) GetPointIndexer(maxPartitions uint) targets.PointIndexer { + if maxPartitions > 
1 { + + return &indexer{partitions: maxPartitions, tmp: map[string]uint{}, index: 0, Buffer: b.opts.Buffer} + } + return &targets.ConstantIndexer{} +} + +func (b *benchmark) GetProcessor() targets.Processor { + switch b.opts.Type { + case KAIWUINSERT: + return newProcessorInsert(b.opts, b.dbName) + case KAIWUPREPARE: + return newProcessorPrepare(b.opts, b.dbName) + default: + return nil + } +} + +func (b *benchmark) GetDBCreator() targets.DBCreator { + return &dbCreator{opts: b.opts, ds: b.ds} +} diff --git a/pkg/targets/kaiwudb/commonpool/pool.go b/pkg/targets/kaiwudb/commonpool/pool.go new file mode 100644 index 000000000..5e5e588fe --- /dev/null +++ b/pkg/targets/kaiwudb/commonpool/pool.go @@ -0,0 +1,148 @@ +package commonpool + +import ( + "context" + "fmt" + "sync" + + "github.com/benchant/tsbs/pkg/targets/kaiwudb/thread" + pgx "github.com/jackc/pgx/v5" + "github.com/silenceper/pool" +) + +type ConnectorPool struct { + host string + user string + password string + port int + format int + pool pool.Pool +} + +func NewConnectorPool(user string, password string, host string, port int, format int) (*ConnectorPool, error) { + a := &ConnectorPool{user: user, password: password, host: host, port: port, format: format} + poolConfig := &pool.Config{ + InitialCap: 1, + MaxCap: 10000, + MaxIdle: 10000, + Factory: a.factory, + Close: a.close, + IdleTimeout: -1, + } + p, err := pool.NewChannelPool(poolConfig) + if err != nil { + return nil, err + } + a.pool = p + return a, nil +} + +func (a *ConnectorPool) factory() (interface{}, error) { + thread.Lock() + defer thread.Unlock() + url := fmt.Sprintf("dbname=defaultdb host=%s port=%d user=%s password=%s pg_format=%d sslmode=disable default_query_exec_mode=simple_protocol standard_conforming_strings=on client_encoding=UTF8", a.host, a.port, a.user, a.password, a.format) + + return pgx.Connect(context.Background(), url) +} + +func (a *ConnectorPool) close(v interface{}) error { + if v != nil { + thread.Lock() + defer 
thread.Unlock() + conn := v.(*pgx.Conn) + a.Close(conn) + } + return nil +} + +func (a *ConnectorPool) Get() (*pgx.Conn, error) { + v, err := a.pool.Get() + if err != nil { + return nil, err + } + return v.(*pgx.Conn), nil +} + +func (a *ConnectorPool) Put(c *pgx.Conn) error { + return a.pool.Put(c) +} + +func (a *ConnectorPool) Close(c *pgx.Conn) error { + return a.pool.Close(c) +} + +func (a *ConnectorPool) Release() { + a.pool.Release() +} + +func (a *ConnectorPool) verifyPassword(password string) bool { + return password == a.password +} + +var connectionMap = sync.Map{} + +type Conn struct { + Connection *pgx.Conn + pool *ConnectorPool +} + +func (c *Conn) Put() error { + return c.pool.Put(c.Connection) +} + +func GetConnection(user string, password string, host string, port int, format int) (*Conn, error) { + p, exist := connectionMap.Load(user) + if exist { + connectionPool := p.(*ConnectorPool) + if !connectionPool.verifyPassword(password) { + newPool, err := NewConnectorPool(user, password, host, port, format) + if err != nil { + return nil, err + } + connectionPool.Release() + connectionMap.Store(user, newPool) + c, err := newPool.Get() + if err != nil { + return nil, err + } + return &Conn{ + Connection: c, + pool: newPool, + }, nil + } else { + c, err := connectionPool.Get() + if err != nil { + return nil, err + } + if c == nil { + newPool, err := NewConnectorPool(user, password, host, port, format) + if err != nil { + return nil, err + } + connectionMap.Store(user, newPool) + c, err = newPool.Get() + if err != nil { + return nil, err + } + } + return &Conn{ + Connection: c, + pool: connectionPool, + }, nil + } + } else { + newPool, err := NewConnectorPool(user, password, host, port, format) + if err != nil { + return nil, err + } + connectionMap.Store(user, newPool) + c, err := newPool.Get() + if err != nil { + return nil, err + } + return &Conn{ + Connection: c, + pool: newPool, + }, nil + } +} diff --git a/pkg/targets/kaiwudb/creator.go 
b/pkg/targets/kaiwudb/creator.go new file mode 100644 index 000000000..8d5e4d099 --- /dev/null +++ b/pkg/targets/kaiwudb/creator.go @@ -0,0 +1,64 @@ +package kaiwudb + +import "C" +import ( + "context" + "fmt" + "log" + "strings" + + "github.com/benchant/tsbs/pkg/targets" + "github.com/benchant/tsbs/pkg/targets/kaiwudb/commonpool" +) + +var fatal = log.Fatalf + +type dbCreator struct { + opts *LoadingOptions + ds targets.DataSource + db *commonpool.Conn +} + +var IOTPRE = []string{"readings", "diagnostics"} +var DEVOPSPRE = []string{"cpu", "diskio", "disk", "kernel", "mem", "net", "nginx", "postgresl", "redis"} + +func (d *dbCreator) Init() { + db, err := commonpool.GetConnection(d.opts.User, d.opts.Pass, d.opts.Host[0], d.opts.Port[0], 0) + if err != nil { + panic(fmt.Sprintf("kaiwudb can not get connection %s", err.Error())) + } + d.db = db +} + +func (d *dbCreator) DBExists(dbName string) bool { + return true +} + +func (d *dbCreator) CreateDB(dbName string) error { + ctx := context.Background() + // 创建时序数据库 + sql := fmt.Sprintf("create schema %s ", dbName) + _, err := d.db.Connection.Exec(ctx, sql) + if err != nil && !strings.Contains(err.Error(), "already exists") { + panic(fmt.Sprintf("kaiwudb create schema failed, err :%s", err)) + } + + return nil +} + +func (d *dbCreator) RemoveOldDB(dbName string) error { + ctx := context.Background() + + sql := fmt.Sprintf("drop schema %s CASCADE", dbName) + _, err := d.db.Connection.Exec(ctx, sql) + if err != nil && !strings.Contains(err.Error(), "does not exist") { + panic(fmt.Sprintf("kaiwudb drop schema failed, err :%s", err)) + } + return nil +} + +func (d *dbCreator) Close() { + if d.db != nil { + d.db.Put() + } +} diff --git a/pkg/targets/kaiwudb/file_data_source.go b/pkg/targets/kaiwudb/file_data_source.go new file mode 100644 index 000000000..1c38d986b --- /dev/null +++ b/pkg/targets/kaiwudb/file_data_source.go @@ -0,0 +1,69 @@ +package kaiwudb + +import ( + "bufio" + "strconv" + "strings" + + 
"github.com/benchant/tsbs/load" + "github.com/benchant/tsbs/pkg/data" + "github.com/benchant/tsbs/pkg/data/usecases/common" + "github.com/benchant/tsbs/pkg/targets" +) + +func newFileDataSource(fileName string) targets.DataSource { + br := load.GetBufferedReader(fileName) + + return &fileDataSource{scanner: bufio.NewScanner(br)} +} + +type fileDataSource struct { + scanner *bufio.Scanner + headers *common.GeneratedDataHeaders +} + +func (d *fileDataSource) Headers() *common.GeneratedDataHeaders { + return nil +} + +func (d *fileDataSource) NextItem() data.LoadedPoint { + ok := d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + return data.LoadedPoint{} + } else if !ok { + fatal("scan error: %v", d.scanner.Err()) + return data.LoadedPoint{} + } + p := &point{} + line := d.scanner.Text() + p.sqlType = line[0] + switch line[0] { + case InsertMetric: + parts := strings.SplitN(line, ",", 5) + p.table = parts[1] // cpu + p.device = parts[2] // host_0 + // p.tag = parts[2] + p.fieldCount, _ = strconv.Atoi(parts[3]) + p.sql = strings.TrimSpace(parts[4]) + + case CreateTable: + parts := strings.SplitN(line, ",", 4) + p.table = parts[1] //cpu + // p.device = parts[2] //host_0 + p.cols = strings.Replace(parts[2], ":", ",", -1) + p.sql = parts[3] //(column) tags (tagStr) + case InsertMetricAndTag: + parts := strings.SplitN(line, ",", 4) + p.table = parts[1] //cpu + p.device = parts[2] //host_0 + p.sql = parts[3] //metrics + tags + //case Modify: + // parts := strings.SplitN(line, ",", 4) + // p.superTable = parts[1] + // p.subTable = parts[2] + // p.sql = parts[3] + default: + panic(line) + } + return data.NewLoadedPoint(p) +} diff --git a/pkg/targets/kaiwudb/implemented_target.go b/pkg/targets/kaiwudb/implemented_target.go new file mode 100644 index 000000000..b6a17da75 --- /dev/null +++ b/pkg/targets/kaiwudb/implemented_target.go @@ -0,0 +1,48 @@ +package kaiwudb + +import ( + "bytes" + + "github.com/benchant/tsbs/pkg/data/serialize" 
+ "github.com/benchant/tsbs/pkg/data/source" + "github.com/benchant/tsbs/pkg/targets" + "github.com/benchant/tsbs/pkg/targets/constants" + "github.com/blagojts/viper" + "github.com/spf13/pflag" +) + +func NewTarget() targets.ImplementedTarget { + return &kaiwudbTarget{} +} + +type kaiwudbTarget struct { +} + +func (t *kaiwudbTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.FlagSet) { + flagSet.String(flagPrefix+"user", "root", "User to connect to KaiwuDB") + flagSet.String(flagPrefix+"pass", "", "Password for user connecting to KaiwuDB") + flagSet.StringSlice(flagPrefix+"host", []string{"", "", ""}, "KaiwuDB host") + flagSet.IntSlice(flagPrefix+"port", []int{26257, 26258, 26259}, "KaiwuDB client Port") + flagSet.String(flagPrefix+"insert-type", "prepare", "KaiwuDB insert type") +} + +func (t *kaiwudbTarget) TargetName() string { + return constants.FormatKaiwuDB +} + +func (t *kaiwudbTarget) Serializer() serialize.PointSerializer { + return &Serializer{ + tableMap: map[string]map[string]struct{}{}, + superTable: map[string]*Table{}, + tmpBuf: &bytes.Buffer{}, + } +} + +func (t *kaiwudbTarget) Benchmark(targetDB string, dataSourceConfig *source.DataSourceConfig, v *viper.Viper, +) (targets.Benchmark, error) { + var loadingOptions LoadingOptions + if err := v.Unmarshal(&loadingOptions); err != nil { + return nil, err + } + return NewBenchmark(targetDB, &loadingOptions, dataSourceConfig) +} diff --git a/pkg/targets/kaiwudb/parse_prepare.go b/pkg/targets/kaiwudb/parse_prepare.go new file mode 100644 index 000000000..0c2cb5a41 --- /dev/null +++ b/pkg/targets/kaiwudb/parse_prepare.go @@ -0,0 +1,313 @@ +package kaiwudb + +import ( + "fmt" + "regexp" + "strings" + "time" + + "github.com/benchant/tsbs/cmd/tsbs_generate_queries/uses/devops" +) + +const layout = "2006-01-02 15:04:05.000" + +type ParsePrepare struct { + typ string +} + +func NewParsePrepare(label string) *ParsePrepare { + dbName := "KaiwuDB" + var typ string + switch label { + case 
fmt.Sprintf("KaiwuDB %d cpu metric(s), random %4d hosts, random %s by 1m", 1, 1, "1h0m0s"):
		typ = devops.LabelSingleGroupby + "-1-1-1"
	case fmt.Sprintf("KaiwuDB %d cpu metric(s), random %4d hosts, random %s by 1m", 1, 1, "12h0m0s"):
		typ = devops.LabelSingleGroupby + "-1-1-12"
	case fmt.Sprintf("KaiwuDB %d cpu metric(s), random %4d hosts, random %s by 1m", 1, 8, "1h0m0s"):
		typ = devops.LabelSingleGroupby + "-1-8-1"
	case fmt.Sprintf("KaiwuDB %d cpu metric(s), random %4d hosts, random %s by 1m", 5, 1, "1h0m0s"):
		typ = devops.LabelSingleGroupby + "-5-1-1"
	case fmt.Sprintf("KaiwuDB %d cpu metric(s), random %4d hosts, random %s by 1m", 5, 1, "12h0m0s"):
		typ = devops.LabelSingleGroupby + "-5-1-12"
	// BUG FIX: Parse matches devops.LabelDoubleGroupby + "-1"/"-5"/"-all"
	// (with a hyphen); the original stored "1"/"5"/"all", so every
	// double-groupby query silently fell through to the NotChange default
	// and was never parameterized.
	case devops.GetDoubleGroupByLabel(dbName, 1):
		typ = devops.LabelDoubleGroupby + "-1"
	case devops.GetDoubleGroupByLabel(dbName, 5):
		typ = devops.LabelDoubleGroupby + "-5"
	case devops.GetDoubleGroupByLabel(dbName, devops.GetCPUMetricsLen()):
		typ = devops.LabelDoubleGroupby + "-all"
	case devops.GetMaxAllLabel(dbName, 1):
		typ = devops.LabelMaxAll + "1"
	case devops.GetMaxAllLabel(dbName, 8):
		typ = devops.LabelMaxAll + "8"
	case "KaiwuDB max cpu over last 5 min-intervals (random end)":
		typ = devops.LabelGroupbyOrderbyLimit
	case getHighCPULabel(dbName, 1):
		typ = devops.LabelHighCPU + "-1"
	case getHighCPULabel(dbName, 0):
		typ = devops.LabelHighCPU + "-all"
	case "KaiwuDB last row per host":
		typ = devops.LabelLastpoint
	default:
		typ = "not-support"
	}

	return &ParsePrepare{typ: typ}
}

// Parse rewrites a generated SQL text into a parameterized statement plus its
// bind arguments, dispatching on the query type detected in NewParsePrepare.
// Unrecognized types are passed through unchanged.
func (p *ParsePrepare) Parse(qry string, debug bool) (string, []interface{}) {
	switch p.typ {
	case devops.LabelSingleGroupby + "-1-1-1":
		return HostRangeFast(qry, debug)
	case devops.LabelSingleGroupby + "-5-1-1",
		devops.LabelSingleGroupby + "-1-1-12",
		devops.LabelSingleGroupby + "-5-1-12",
		devops.LabelHighCPU + "-1",
		devops.LabelMaxAll + "1":
		return p.HostRange(qry, debug)
	case devops.LabelSingleGroupby + "-1-8-1",
devops.LabelSingleGroupby + "-5-8-1", + devops.LabelMaxAll + "8": + return p.HostsRange(qry, debug) + case devops.LabelDoubleGroupby + "-1", + devops.LabelDoubleGroupby + "-5", + devops.LabelDoubleGroupby + "-all", + devops.LabelHighCPU + "-all": + return p.Range(qry, debug) + case devops.LabelGroupbyOrderbyLimit: + return p.Less(qry, debug) + case devops.LabelLastpoint: + return p.NotChange(qry, debug) + default: + return p.NotChange(qry, debug) + } +} + +func HostRangeFast(qry string, debug bool) (string, []interface{}) { + arr := strings.Split(qry, " ") + + hostname := strings.Trim(arr[11], "'") + arr[11] = "$1" + + t := strings.Trim(arr[16], "'") + if len(t) == 11 { + t += "0" + } + if len(t) == 10 { + t += "00" + } + if len(t) == 8 { + t += ".000" + } + + str := fmt.Sprintf("%s %s", strings.Trim(arr[15], "'"), t) + s, err := time.Parse(layout, str) + if err != nil { + panic(err) + } + arr[15] = "" + arr[16] = "$2" + + t = strings.Trim(arr[21], "'") + if len(t) == 11 { + t += "0" + } + if len(t) == 10 { + t += "00" + } + if len(t) == 8 { + t += ".000" + } + str = fmt.Sprintf("%s %s", strings.Trim(arr[20], "'"), t) + e, err := time.Parse(layout, str) + if err != nil { + panic(err) + } + arr[20] = "" + arr[21] = "$3" + + var b strings.Builder + b.Grow(len(arr)) + b.WriteString(arr[0]) + for _, ss := range arr[1:] { + if ss != "" { + b.WriteString(" ") + } + b.WriteString(ss) + } + sql := b.String() + if debug { + fmt.Println("args:", hostname, s, e) + fmt.Println("sql:", sql) + } + + return sql, []interface{}{hostname, s, e} +} + +func (p *ParsePrepare) HostRange(qry string, debug bool) (string, []interface{}) { + var args []interface{} + + host := getHost(qry) + args = append(args, strings.Trim(host, "'")) + qry = strings.ReplaceAll(qry, host, fmt.Sprintf("$%d", len(args))) + + start, s := getStart(qry) + if start != "" { + args = append(args, s) + qry = strings.ReplaceAll(qry, start, fmt.Sprintf("$%d", len(args))) + } + + end, e := getEnd(qry) + if end != "" { 
+ args = append(args, e) + qry = strings.ReplaceAll(qry, end, fmt.Sprintf("$%d", len(args))) + } + + if debug { + fmt.Println("sql:", qry, args) + } + + return qry, args +} + +func (p *ParsePrepare) HostsRange(qry string, debug bool) (string, []interface{}) { + var args []interface{} + + str, hosts := getHosts(qry) + args = append(args, hosts) + qry = strings.ReplaceAll(qry, str, fmt.Sprintf("$%d", len(args))) + + start, s := getStart(qry) + if start != "" { + args = append(args, s) + qry = strings.ReplaceAll(qry, start, fmt.Sprintf("$%d", len(args))) + } + + end, e := getEnd(qry) + if end != "" { + args = append(args, e) + qry = strings.ReplaceAll(qry, end, fmt.Sprintf("$%d", len(args))) + } + + if debug { + fmt.Println("sql:", qry, args) + } + return qry, args +} + +func (p *ParsePrepare) Range(qry string, debug bool) (string, []interface{}) { + var args []interface{} + start, s := getStart(qry) + if start != "" { + args = append(args, s) + qry = strings.ReplaceAll(qry, start, fmt.Sprintf("$%d", len(args)+1)) + } + + end, e := getEnd(qry) + if end != "" { + args = append(args, e) + qry = strings.ReplaceAll(qry, end, fmt.Sprintf("$%d", len(args)+1)) + } + + if debug { + fmt.Println("sql:", qry, args) + } + return qry, args +} + +func (p *ParsePrepare) Less(qry string, debug bool) (string, []interface{}) { + var args []interface{} + + end, e := getEnd(qry) + if end != "" { + args = append(args, e) + qry = strings.ReplaceAll(qry, end, fmt.Sprintf("$%d", len(args)+1)) + } + + if debug { + fmt.Println("sql:", qry, args) + } + + return qry, args +} + +func (p *ParsePrepare) NotChange(qry string, debug bool) (string, []interface{}) { + var args []interface{} + + return qry, args +} + +func getHighCPULabel(dbName string, nHosts int) string { + humanLabel, _ := devops.GetHighCPULabel(dbName, nHosts) + + return humanLabel +} + +func getHost(qry string) string { + regex := regexp.MustCompile(`hostname = ('host_\d+')`) + matches := regex.FindStringSubmatch(qry) + if 
len(matches) > 0 { + return matches[1] + } + + return "" +} + +func getHosts(qry string) (string, []string) { + regex := regexp.MustCompile(`hostname IN (\([^()]*\))`) + matches := regex.FindStringSubmatch(qry) + if len(matches) > 0 { + hosts := strings.Trim(matches[1], "()'") + return matches[1], strings.Split(hosts, ",") + } + + return "", []string{} +} + +func getStart(qry string) (string, time.Time) { + regex := regexp.MustCompile(`>= ('[^']*')`) + matches := regex.FindStringSubmatch(qry) + if len(matches) > 0 { + str := strings.Trim(matches[1], "'") + if len(str) == 22 { + str += "0" + } + if len(str) == 21 { + str += "00" + } + if len(str) == 19 { + str += ".000" + } + s, err := time.Parse(layout, str) + if err != nil { + panic(err) + } + + return matches[1], s + } + + return "", time.Time{} +} + +func getEnd(qry string) (string, time.Time) { + regex := regexp.MustCompile(`< ('[^']*')`) + matches := regex.FindStringSubmatch(qry) + if len(matches) > 0 { + str := strings.Trim(matches[1], "'") + if len(str) == 22 { + str += "0" + } + if len(str) == 21 { + str += "00" + } + if len(str) == 19 { + str += ".000" + } + e, err := time.Parse(layout, str) + if err != nil { + panic(err) + } + + return matches[1], e + } + return "", time.Time{} +} diff --git a/pkg/targets/kaiwudb/process_insert.go b/pkg/targets/kaiwudb/process_insert.go new file mode 100644 index 000000000..50eb6b60e --- /dev/null +++ b/pkg/targets/kaiwudb/process_insert.go @@ -0,0 +1,241 @@ +package kaiwudb + +import ( + "bytes" + "context" + "fmt" + "strings" + "sync" + + "github.com/benchant/tsbs/pkg/targets" + "github.com/benchant/tsbs/pkg/targets/kaiwudb/commonpool" +) + +type syncCSI struct { + m sync.Map +} + +const Size1M = 1 * 1024 * 1024 + +type Ctx struct { + c context.Context + cancel context.CancelFunc +} + +var globalSCI = &syncCSI{} + +type processorInsert struct { + opts *LoadingOptions + dbName string + sci *syncCSI + _db *commonpool.Conn + wg *sync.WaitGroup + buf *bytes.Buffer +} + 
+func newProcessorInsert(opts *LoadingOptions, dbName string) *processorInsert { + return &processorInsert{opts: opts, dbName: dbName, sci: globalSCI, wg: &sync.WaitGroup{}, buf: &bytes.Buffer{}} +} + +func (p *processorInsert) Init(proNum int, doLoad, _ bool) { + if !doLoad { + return + } + p.buf.Grow(Size1M) + var err error + l := len(p.opts.Port) + idx := proNum % l + + p._db, err = commonpool.GetConnection(p.opts.User, p.opts.Pass, p.opts.Host[idx], p.opts.Port[idx], 0) + if err != nil { + panic(err) + } + + // _, err = p._db.Connection.Exec(context.Background(), "use "+p.dbName) + // if err != nil { + // panic(err) + //} +} + +func (p *processorInsert) ProcessBatch(b targets.Batch, doLoad bool) (metricCount, rowCount uint64) { + batches := b.(*hypertableArr) + rowCnt := uint64(0) + + metricCnt := batches.totalMetric + if !doLoad { + if len(batches.createSql) != 0 { + for _, row := range batches.createSql { + switch row.sqlType { + case CreateTable: + case InsertMetricAndTag: + rowCnt += 1 + metricCnt += uint64(batches.cols[row.table] - 2) + } + } + } + + for _, sqls := range batches.m { + rowCnt += uint64(len(sqls)) + } + return metricCnt, rowCnt + } + p.buf.Reset() + + metricAndTagSql := map[string][]string{} + // create table and insert first record including tags + for _, row := range batches.createSql { + switch row.sqlType { + /** type 2 sample: + * 2,cpu,create table cpu (ts timestamp not null,usage_user int not null,usage_system int not null, + * usage_idle int not null,usage_nice int not null,usage_iowait int not null,usage_irq int not null, + * usage_softirq int not null,usage_steal int not null,usage_guest int not null,usage_guest_nice int not null) + * tags (hostname char(30) not null,region char(30),datacenter char(30),rack char(30),os char(30),arch char(30), + * team char(30),service char(30),service_version char(30),service_environment char(30)) primary tags (hostname) + */ + case CreateTable: + c, cancel := 
context.WithCancel(context.Background()) + ctx := &Ctx{ + c: c, + cancel: cancel, + } + actual, _ := p.sci.m.LoadOrStore(row.table, ctx) + sql := strings.ReplaceAll( + row.sql, + fmt.Sprintf("create table %s", row.table), + fmt.Sprintf("create table %s.%s", p.opts.DBName, row.table)) + + _, err := p._db.Connection.Exec(context.Background(), sql) + if err != nil { + panic(fmt.Sprintf("kaiwudb insert data failed,err :%s", err)) + } + GlobalTable.Store(row.table, row.cols) + actual.(*Ctx).cancel() + + /** type 3 sampe: + * 3,cpu,host_0,(1451606400000,58,2,24,61,22,63,6,44,80,38,'host_0','eu-west-1','eu-west-1c','87', + * 'Ubuntu16.04LTS','x64','NYC','18','1','production') + */ + case InsertMetricAndTag: + rowCnt += 1 + metricCnt += uint64(batches.cols[row.table] - 2) + c, cancel := context.WithCancel(context.Background()) + ctx := &Ctx{ + c: c, + cancel: cancel, + } + actual, _ := p.sci.m.LoadOrStore(row.table+row.device, ctx) + + //check if table created + _, ok := GlobalTable.Load(row.table) + if !ok { + v, ok := p.sci.m.Load(row.table) + if ok { + <-v.(*Ctx).c.Done() + + sql := "insert into " + p.opts.DBName + "." + row.table + " values " + row.sql + _, err := p._db.Connection.Exec(context.Background(), sql) + if err != nil { + panic(fmt.Sprintf("kaiwudb insert data failed,err :%s", err)) + } + + metricAndTagSql[row.table] = append(metricAndTagSql[row.table], row.sql) + GlobalTable.Store(row.table+row.device, nothing) + actual.(*Ctx).cancel() + continue + } + // wait for table created + tableC, tableCancel := context.WithCancel(context.Background()) + tableCtx := &Ctx{ + c: tableC, + cancel: tableCancel, + } + tableActual, _ := p.sci.m.LoadOrStore(row.table, tableCtx) + <-tableActual.(*Ctx).c.Done() + } + + sql := "insert into " + p.opts.DBName + "." 
+ row.table + " values " + row.sql + _, err := p._db.Connection.Exec(context.Background(), sql) + if err != nil { + panic(fmt.Sprintf("kaiwudb insert data failed,err :%s", err)) + } + metricAndTagSql[row.table] = append(metricAndTagSql[row.table], row.sql) + GlobalTable.Store(row.table+row.device, nothing) + actual.(*Ctx).cancel() + default: + panic("impossible") + } + } + + // make sure table created and first rerord inserted into devices + p.buf.Reset() + p.wg.Add(len(batches.devices) * len(batches.m)) + for name := range batches.m { + // tableName := name + tableName := strings.Split(name, ":")[0] + for deviceName := range batches.devices { + device := deviceName + go func() { + defer p.wg.Done() + _, ok := GlobalTable.Load(tableName + device) + if ok { + return + } + v, ok := p.sci.m.Load(tableName + device) + if ok { + <-v.(*Ctx).c.Done() + return + } + c, cancel := context.WithCancel(context.Background()) + ctx := &Ctx{ + c: c, + cancel: cancel, + } + actual, _ := p.sci.m.LoadOrStore(tableName+device, ctx) + <-actual.(*Ctx).c.Done() + return + }() + } + } + p.wg.Wait() + + /** type 1 sample: + * 1,cpu,host_0,11,(1451606400000,58,2,24,61,22,63,6,44,80,38,'host_0') + */ + for name, sqls := range batches.m { + tableName := strings.Split(name, ":")[0] + + v, ok := GlobalTable.Load(tableName) + if !ok { + panic("table does not exists!") + } + + cols := v.(string) + + rowCnt += uint64(len(sqls)) + p.buf.WriteString("insert into ") + p.buf.WriteString(p.opts.DBName + "." 
+ tableName + cols) + p.buf.WriteString(" values") + for i := 0; i < len(sqls); i++ { + p.buf.WriteString(sqls[i]) + if i < len(sqls)-1 { + p.buf.WriteString(" , ") + } + } + sql := p.buf.String() + + _, err := p._db.Connection.Exec(context.Background(), sql) + if err != nil { + panic(fmt.Sprintf("kaiwudb insert data failed,err :%s", err)) + } + p.buf.Reset() + } + + batches.Reset() + return metricCnt, rowCnt +} + +func (p *processorInsert) Close(doLoad bool) { + if doLoad { + p._db.Put() + } +} diff --git a/pkg/targets/kaiwudb/process_prepare.go b/pkg/targets/kaiwudb/process_prepare.go new file mode 100644 index 000000000..6e6b35787 --- /dev/null +++ b/pkg/targets/kaiwudb/process_prepare.go @@ -0,0 +1,372 @@ +package kaiwudb + +import ( + "context" + "encoding/binary" + "fmt" + "strconv" + "strings" + "sync" + + "github.com/benchant/tsbs/pkg/targets" + "github.com/benchant/tsbs/pkg/targets/kaiwudb/commonpool" +) + +const microSecFromUnixEpochToY2K = 946684800 * 1000000 +const bufferRows = 50 + +type fixedArgList struct { + args [][]byte + capacity int + writePos int +} + +func newFixedArgList(capacity int) *fixedArgList { + return &fixedArgList{ + args: make([][]byte, capacity), + capacity: capacity, + writePos: 0, + } +} + +func (fa *fixedArgList) Init() { + for i := 0; i < fa.capacity; i++ { + fa.args[i] = make([]byte, 8) + } +} + +func (fa *fixedArgList) Reset() { + fa.writePos = 0 +} + +func (fa *fixedArgList) Append(value []byte) { + fa.args[fa.writePos] = value + fa.writePos++ +} + +func (fa *fixedArgList) Emplace(value uint64) { + binary.BigEndian.PutUint64(fa.args[fa.writePos], value) + fa.writePos++ +} + +func (fa *fixedArgList) Capacity() int { + return fa.capacity +} + +func (fa *fixedArgList) Length() int { + return fa.writePos +} + +func (fa *fixedArgList) Args() [][]byte { + return fa.args[:fa.writePos] +} + +type prepareProcessor struct { + opts *LoadingOptions + dbName string + sci *syncCSI + _db *commonpool.Conn + preparedSql map[string]struct{} + 
workerIndex int + + // prepare buff + buffer map[string]*fixedArgList // tableName, fixedArgList + buffInited bool + wg *sync.WaitGroup +} + +func newProcessorPrepare(opts *LoadingOptions, dbName string) *prepareProcessor { + return &prepareProcessor{ + opts: opts, + dbName: dbName, + sci: globalSCI, + preparedSql: make(map[string]struct{}), + buffer: make(map[string]*fixedArgList), + wg: &sync.WaitGroup{}, + } +} + +func (p *prepareProcessor) Init(workerNum int, doLoad, _ bool) { + if !doLoad { + return + } + + p.workerIndex = workerNum + + var err error + l := len(p.opts.Port) + idx := workerNum % l + p._db, err = commonpool.GetConnection(p.opts.User, p.opts.Pass, p.opts.Host[idx], p.opts.Port[idx], 0) + if err != nil { + panic(err) + } +} + +func (p *prepareProcessor) ProcessBatch(b targets.Batch, doLoad bool) (metricCount, rowCount uint64) { + batches := b.(*hypertableArr) + rowCnt := uint64(0) + var deviceNum, deviceMetric uint64 + metricCnt := batches.totalMetric + if !doLoad { + if len(batches.createSql) != 0 { + for _, row := range batches.createSql { + switch row.sqlType { + case CreateTable: + case InsertMetricAndTag: + deviceNum += 1 + rowCnt += 1 + metricCnt += uint64(batches.cols[row.table] - 2) + } + } + } + + for _, sqls := range batches.m { + rowCnt += uint64(len(sqls)) + } + return metricCnt, rowCnt + } + + // create table + if p.opts.DoCreate && len(batches.createSql) != 0 { + deviceNum, deviceMetric = p.createDeviceAndAttribute(batches) + rowCnt += deviceNum + metricCnt += deviceMetric + } else { + deviceNum = 0 + } + + p.wg.Add(len(batches.devices)) + for deviceName := range batches.devices { + device := deviceName + go func() { + defer p.wg.Done() + _, ok := GlobalTable.Load(device) + if ok { + return + } + v, ok := p.sci.m.Load(device) + if ok { + <-v.(*Ctx).c.Done() + return + } + c, cancel := context.WithCancel(context.Background()) + ctx := &Ctx{ + c: c, + cancel: cancel, + } + actual, _ := p.sci.m.LoadOrStore(device, ctx) + 
<-actual.(*Ctx).c.Done() + return + }() + } + p.wg.Wait() + + // join args and execute + for name, args := range batches.m { + rowCnt += uint64(len(args)) + tableName := strings.Split(name, ":")[0] + // init buffer for every table + tableBuffer, ok := p.buffer[tableName] + if !ok { + bufferSize := bufferRows * batches.cols[tableName] + tableBuffer = newFixedArgList(bufferSize) + tableBuffer.Init() + p.buffer[tableName] = tableBuffer + } + + var formatBuf []int16 + for _, s := range args { + s = s[1 : len(s)-1] + values := strings.Split(s, ",") + + // Emplace + for i, v := range values { + if i < (batches.cols[tableName] - 1) { + num, _ := strconv.ParseInt(v, 10, 64) + if i == 0 { + // timestamp: UTC+8 Time Zone + tableBuffer.Emplace(uint64(num*1000) - microSecFromUnixEpochToY2K) + } else { + // row data + tableBuffer.Emplace(uint64(num)) + } + } else { + v = strings.TrimSpace(v) + vv := strings.Split(v, "'") + tableBuffer.Append([]byte(vv[1])) + } + formatBuf = append(formatBuf, 1) + } + if tableBuffer.Length() == tableBuffer.Capacity() { + key := p.createPrepareSql(tableName, tableBuffer.Capacity()) + p.execPrepareStmt(key, tableBuffer.args, formatBuf) + // reuse buffer: reset tableBuffer's write position + tableBuffer.Reset() + formatBuf = []int16{} + } + } + // check buffer is full + if tableBuffer.Length() > 0 { + key := p.createPrepareSql(tableName, tableBuffer.Length()) + p.execPrepareStmt(key, tableBuffer.Args(), formatBuf) + // reuse buffer: reset tableBuffer's write position + tableBuffer.Reset() + formatBuf = []int16{} + } + } + + batches.Reset() + return metricCnt, rowCnt +} + +func (p *prepareProcessor) Close(doLoad bool) { + if doLoad { + p._db.Put() + } +} + +func (p *prepareProcessor) createDeviceAndAttribute(batches *hypertableArr) (uint64, uint64) { + var deviceNum uint64 = 0 + var deviceMetricNum uint64 = 0 + for _, row := range batches.createSql { + switch row.sqlType { + case CreateTable: + c, cancel := context.WithCancel(context.Background()) + 
ctx := &Ctx{ + c: c, + cancel: cancel, + } + actual, _ := p.sci.m.LoadOrStore(row.table, ctx) + sql := strings.ReplaceAll( + row.sql, + fmt.Sprintf("create table %s", row.table), + fmt.Sprintf("create table %s.%s", p.opts.DBName, row.table)) + + _, err := p._db.Connection.Exec(ctx.c, sql) + if err != nil { + panic(fmt.Sprintf("kaiwudb2.0 create device failed,err :%s", err)) + } + + if err != nil && !strings.Contains(err.Error(), "already exists") { + panic(fmt.Sprintf("kaiwudb2.0 create device failed,err :%s", err)) + } + GlobalTable.Store(row.table, row.cols) + actual.(*Ctx).cancel() + case InsertMetricAndTag: + deviceNum += 1 + deviceMetricNum += uint64(batches.cols[row.table] - 2) + c, cancel := context.WithCancel(context.Background()) + ctx := &Ctx{ + c: c, + cancel: cancel, + } + actual, _ := p.sci.m.LoadOrStore(row.device, ctx) + //check if table created + _, ok := GlobalTable.Load(row.table) + if !ok { + v, ok := p.sci.m.Load(row.table) + if ok { + <-v.(*Ctx).c.Done() + sql := "insert into " + p.dbName + "." 
+ row.table + " values " + row.sql
+				fmt.Println(sql)
+				_, err := p._db.Connection.Exec(context.Background(), sql)
+				if err != nil {
+					panic(fmt.Sprintf("kaiwudb2.0 insert data failed,err :%s", err))
+				}
+
+				GlobalTable.Store(row.device, nothing)
+				actual.(*Ctx).cancel()
+				continue
+			}
+			// wait for table created
+			tableC, tableCancel := context.WithCancel(context.Background())
+			tableCtx := &Ctx{
+				c:      tableC,
+				cancel: tableCancel,
+			}
+			tableActual, _ := p.sci.m.LoadOrStore(row.table, tableCtx)
+			<-tableActual.(*Ctx).c.Done()
+		}
+
+		sql := fmt.Sprintf("insert into %s.%s values %s", p.dbName, row.table, row.sql)
+		_, err := p._db.Connection.Exec(context.Background(), sql)
+		if err != nil {
+			panic(fmt.Sprintf("kaiwudb2.0 insert data failed,err :%s", err))
+		}
+		GlobalTable.Store(row.device, nothing)
+		actual.(*Ctx).cancel()
+		continue
+
+		default:
+			panic("impossible")
+		}
+	}
+	return deviceNum, deviceMetricNum
+}
+
+// createPrepareSql builds (and caches) a server-side prepared INSERT statement
+// for tableName carrying `args` bound parameters, and returns the statement
+// name (cache key). If the target table has not been created yet, it blocks on
+// the per-table Ctx until creation is signalled, then retries via recursion.
+func (p *prepareProcessor) createPrepareSql(tableName string, args int) string {
+	key := fmt.Sprintf("tsbs-insert-%s-%d", tableName, args)
+	_, ok := p.preparedSql[key]
+	if ok {
+		// Already prepared on this connection; reuse it.
+		return key
+	}
+
+	v, ok := GlobalTable.Load(tableName)
+	if !ok {
+		v1, ok1 := p.sci.m.Load(tableName)
+		if ok1 {
+			<-v1.(*Ctx).c.Done()
+		}
+		// wait for table created
+		tableC, tableCancel := context.WithCancel(context.Background())
+		tableCtx := &Ctx{
+			c:      tableC,
+			cancel: tableCancel,
+		}
+		tableActual, _ := p.sci.m.LoadOrStore(tableName, tableCtx)
+		<-tableActual.(*Ctx).c.Done()
+		return p.createPrepareSql(tableName, args)
+	}
+	cols := v.(string)
+
+	// Number of columns per row; drives where '(' ',' ')' are placed between
+	// the $n placeholders below.
+	// NOTE(review): when simpleColumns == 1, i%1 is never 1, so the opening
+	// '(' would never be written -- TODO confirm single-column tables cannot
+	// occur here.
+	simpleColumns := len(strings.Split(cols, ","))
+	var insertsql strings.Builder
+
+	query := fmt.Sprintf("insert into %s.%s %s values ", p.opts.DBName, tableName, cols)
+	insertsql.WriteString(query)
+
+	for i := 1; i <= args; i++ {
+		if i%(simpleColumns) == 1 {
+			insertsql.WriteString("(")
+		}
+		insertsql.WriteString(fmt.Sprintf("$%d", i))
+
+		if i%(simpleColumns) == 0 && i < args {
+			insertsql.WriteString("),")
+		} else if i%(simpleColumns) != 0 && i < args {
+			insertsql.WriteString(",")
+		}
+		if i == args {
+			insertsql.WriteString(");")
+		}
+	}
+
+	sql := insertsql.String()
+
+	// fmt.Println(sql)
+
+	_, err1 := p._db.Connection.Prepare(context.Background(), key, sql)
+	if err1 != nil {
+		panic(fmt.Sprintf("kaiwudb Prepare failed,err :%s, sql :%s", err1, sql))
+	}
+
+	p.preparedSql[key] = struct{}{}
+	return key
+}
+
+// execPrepareStmt executes a previously prepared statement through the
+// underlying pgx PgConn, panicking on any execution error.
+func (p *prepareProcessor) execPrepareStmt(key string, args [][]byte, formatBuf []int16) {
+	res := p._db.Connection.PgConn().ExecPrepared(context.Background(), key, args, formatBuf, []int16{}).Read()
+	if res.Err != nil {
+		panic(res.Err)
+	}
+}
diff --git a/pkg/targets/kaiwudb/program_options.go b/pkg/targets/kaiwudb/program_options.go
new file mode 100644
index 000000000..ba7b3a464
--- /dev/null
+++ b/pkg/targets/kaiwudb/program_options.go
@@ -0,0 +1,13 @@
+package kaiwudb
+
+// LoadingOptions holds the connection and loader settings for the KaiwuDB
+// load target (presumably populated from command-line flags -- confirm
+// against the flag-parsing code in implemented_target.go).
+type LoadingOptions struct {
+	User     string
+	Pass     string
+	Host     []string
+	Port     []int
+	Buffer   int
+	DBName   string
+	Workers  int
+	DoCreate bool
+	Type     string
+}
diff --git a/pkg/targets/kaiwudb/scan.go b/pkg/targets/kaiwudb/scan.go
new file mode 100644
index 000000000..4f5938202
--- /dev/null
+++ b/pkg/targets/kaiwudb/scan.go
@@ -0,0 +1,89 @@
+package kaiwudb
+
+import (
+	"sync"
+
+	"github.com/benchant/tsbs/pkg/data"
+	"github.com/benchant/tsbs/pkg/targets"
+)
+
+// indexer routes InsertMetric points to workers round-robin; every other
+// point type (table creation / tag inserts) goes to worker 0.
+type indexer struct {
+	partitions uint
+	tmp        map[string]uint
+	index      uint
+	Buffer     int
+}
+
+// GetIndex returns the worker index for item: 0 for non-metric operations,
+// otherwise the next round-robin slot in [0, partitions).
+func (i *indexer) GetIndex(item data.LoadedPoint) uint {
+	p := item.Data.(*point)
+	if p.sqlType != InsertMetric {
+		return 0
+	}
+	index := i.index
+	i.index++
+	if i.index == i.partitions {
+		i.index = 0
+	}
+	//fmt.Println(key, index)
+	return index
+}
+
+// point is a single row of data keyed by which superTable it belongs
+type point struct {
+	sqlType    byte   // operation code: one of InsertMetric/CreateTable/InsertMetricAndTag/InsertTag
+	table      string // super (measurement) table name
+	device     string // child-table / device identifier
+	tag        string
+	fieldCount int
+	sql        string // pre-rendered values fragment
+	cols       string
+}
+
+// GlobalTable tracks which devices/tables are known to exist, shared between
+// the creator and the insert/prepare processors.
+var GlobalTable = sync.Map{}
+
+// hypertableArr batches loaded points: metric rows grouped per
+// "table:device" key, plus any pending DDL/tag-insert points.
+type hypertableArr struct {
+	createSql   []*point
+	m           map[string][]string
+	devices     map[string]bool
+	cols        map[string]int
+	totalMetric uint64
+	cnt         uint
+}
+
+// Len reports the number of metric rows buffered in this batch.
+func (ha *hypertableArr) Len() uint {
+	return ha.cnt
+}
+
+// Append adds a loaded point: metric rows are grouped by table:device and
+// counted; all other operation types are queued as createSql work.
+func (ha *hypertableArr) Append(item data.LoadedPoint) {
+	that := item.Data.(*point)
+	if that.sqlType == InsertMetric {
+		ha.m[that.table+":"+that.device] = append(ha.m[that.table+":"+that.device], that.sql)
+		// ha.m[that.table] = append(ha.m[that.table], that.sql)
+		ha.devices[that.device] = true
+		// +2 accounts for the timestamp and primary-tag columns alongside the fields.
+		ha.cols[that.table] = that.fieldCount + 2
+		ha.totalMetric += uint64(that.fieldCount)
+		ha.cnt++
+	} else {
+		ha.createSql = append(ha.createSql, that)
+	}
+}
+
+// Reset clears the batch for reuse, keeping the createSql backing array.
+func (ha *hypertableArr) Reset() {
+	ha.m = map[string][]string{}
+	ha.devices = map[string]bool{}
+	ha.cols = map[string]int{}
+	ha.cnt = 0
+	ha.createSql = ha.createSql[:0]
+}
+
+// factory builds empty hypertableArr batches for the scanner.
+type factory struct{}
+
+func (f *factory) New() targets.Batch {
+	return &hypertableArr{
+		m:       map[string][]string{},
+		devices: map[string]bool{},
+		cols:    map[string]int{},
+		cnt:     0,
+	}
+}
diff --git a/pkg/targets/kaiwudb/serializer.go b/pkg/targets/kaiwudb/serializer.go
new file mode 100644
index 000000000..0c63f04b7
--- /dev/null
+++ b/pkg/targets/kaiwudb/serializer.go
@@ -0,0 +1,290 @@
+package kaiwudb
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+
+	"github.com/benchant/tsbs/pkg/data"
+)
+
+// Serializer writes TSBS data points in KaiwuDB's "pseudo-CSV" load format,
+// remembering which super tables and child tables it has already emitted.
+type Serializer struct {
+	tmpBuf     *bytes.Buffer
+	tableMap   map[string]map[string]struct{} // super table -> set of child tables already seen
+	superTable map[string]*Table              // super table -> column/tag schema
+}
+
+// nothing is the zero-size set member used throughout this package.
+var nothing = struct{}{}
+
+// Table records a super table's column and tag schema as first observed.
+type Table struct {
+	columns    map[string]struct{}
+	tags       map[string]struct{}
+	columnsStr string
+	tagsStr    string
+}
+
+// FastFormat writes v's SQL literal representation into buf and returns the
+// KaiwuDB column type name used when generating CREATE TABLE statements.
+// NOTE(review): float64 maps to "float" (not a double type) and strings to a
+// fixed "char(30)" -- confirm these widths/precisions are intended.
+func FastFormat(buf *bytes.Buffer, v interface{}) string {
+	switch v.(type) {
+	case int:
+		buf.WriteString(strconv.Itoa(v.(int)))
+		return "bigint"
+	case int64:
+		buf.WriteString(strconv.FormatInt(v.(int64), 10))
+		return "bigint"
+	case float64:
+		buf.WriteString(strconv.FormatFloat(v.(float64), 'f', -1, 64))
+		return "float"
+	case float32:
+		buf.WriteString(strconv.FormatFloat(float64(v.(float32)), 'f', -1, 32))
+		return "float"
+	case bool:
+		buf.WriteString(strconv.FormatBool(v.(bool)))
+		return "bool"
+	case []byte:
+		buf.WriteByte('\'')
+		buf.WriteString(string(v.([]byte)))
+		buf.WriteByte('\'')
+		return "char(30)"
+	case string:
+		buf.WriteByte('\'')
+		buf.WriteString(v.(string))
+		buf.WriteByte('\'')
+		return "char(30)"
+	case nil:
+		buf.WriteString("null")
+		return "null"
+	default:
+		panic(fmt.Sprintf("unknown field type for %#v", v))
+	}
+}
+
+// tmpMD5 memoizes tag-set keys to generated table aliases; tmpIndex is the
+// next alias number. NOTE(review): plain map + counter, not goroutine-safe --
+// assumes serialization runs single-threaded; confirm.
+var tmpMD5 = map[string]string{}
+var tmpIndex = 0
+
+// calculateTable returns a stable short alias ("t_<n>") for the given
+// tag-set key, allocating a new alias on first sight.
+func calculateTable(src []byte) string {
+	key := string(src)
+	v, exist := tmpMD5[key]
+	if exist {
+		return v
+	}
+	tmpIndex += 1
+	v = fmt.Sprintf("t_%d", tmpIndex)
+	tmpMD5[key] = v
+	return v
+}
+
+// Operation codes emitted as the first field of each serialized line.
+const (
+	InsertMetric       = '1' // insert data only
+	CreateTable        = '2' // create super table
+	InsertMetricAndTag = '3' // insert data together with tags (first row per child table)
+	InsertTag          = '4'
+	NotNull            = " not null"
+)
+
+// tbNameRule describes how a measurement derives its child-table name from
+// one designated tag.
+type tbNameRule struct {
+	tag      string // tag whose value names the child table
+	prefix   string // optional prefix prepended to the tag value
+	nilValue string // fallback name when the tag is absent or not a string
+}
+
+// tbRuleMap maps a measurement (super table) name to its naming rule.
+// NOTE(review): "postgresl" is kept as-is; presumably it matches the upstream
+// TSBS measurement spelling -- do not "correct" it without checking.
+var tbRuleMap = map[string]*tbNameRule{
+	"cpu": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"diskio": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"disk": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"kernel": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"mem": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"net": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"nginx": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"postgresl": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"redis": {
+		tag:      "hostname",
+		nilValue: "host_null",
+	},
+	"readings": {
+		tag:      "name",
+		nilValue: "truck_null",
+	},
+	"diagnostics": {
+		tag:      "name",
+		nilValue: "truck_null",
+	},
+}
+
+// Serialize writes p to w in the pseudo-CSV load format: a CreateTable line
+// the first time a measurement is seen, an InsertMetricAndTag line the first
+// time a child table is seen, and an InsertMetric line otherwise.
+func (s *Serializer) Serialize(p *data.Point, w io.Writer) error {
+	var fieldKeys []string
+	var fieldValues []string
+	var fieldTypes []string
+	var tagValues []string
+	var tagKeys []string
+	var tagTypes []string
+	tKeys := p.TagKeys()
+	tValues := p.TagValues()
+	fKeys := p.FieldKeys()
+	fValues := p.FieldValues()
+	superTable := string(p.MeasurementName())
+	// Render every field value once, collecting name/type/literal in parallel.
+	for i, value := range fValues {
+		fType := FastFormat(s.tmpBuf, value)
+		fieldKeys = append(fieldKeys, convertKeywords(string(fKeys[i])))
+		fieldTypes = append(fieldTypes, fType)
+		fieldValues = append(fieldValues, s.tmpBuf.String())
+		s.tmpBuf.Reset()
+	}
+
+	// rule decides which tag provides the child-table name (nil => alias-based name).
+	rule := tbRuleMap[superTable]
+	fixedName := ""
+	for i, value := range tValues {
+		tType := FastFormat(s.tmpBuf, value)
+		tagKeys = append(tagKeys, convertKeywords(string(tKeys[i])))
+		tagTypes = append(tagTypes, tType)
+		if rule != nil && len(fixedName) == 0 && string(tKeys[i]) == rule.tag {
+			str, is := value.(string)
+			if is {
+				fixedName = str
+				tagValues = append(tagValues, s.tmpBuf.String())
+			} else {
+				// Non-string naming tag: fall back to the rule's placeholder name.
+				fixedName = rule.nilValue
+				tagValues = append(tagValues, "'"+rule.nilValue+"'")
+			}
+		} else {
+			tagValues = append(tagValues, s.tmpBuf.String())
+		}
+
+		s.tmpBuf.Reset()
+	}
+
+	// Derive the child-table name: from the naming tag when a rule exists,
+	// otherwise from a memoized alias of the full tag set.
+	subTable := ""
+	if rule != nil {
+		if len(fixedName) != 0 {
+			if len(rule.prefix) == 0 {
+				subTable = fixedName
+			} else {
+				s.tmpBuf.WriteString(rule.prefix)
+				s.tmpBuf.WriteString(fixedName)
+				subTable = s.tmpBuf.String()
+				s.tmpBuf.Reset()
+			}
+		} else {
+			subTable = rule.nilValue
+		}
+	} else {
+		s.tmpBuf.WriteString(superTable)
+		for i, v := range tagValues {
+			s.tmpBuf.WriteByte(',')
+			s.tmpBuf.WriteString(tagKeys[i])
+			s.tmpBuf.WriteByte('=')
+			s.tmpBuf.WriteString(v)
+		}
+		subTable = calculateTable(s.tmpBuf.Bytes())
+		s.tmpBuf.Reset()
+	}
+
+	// First time this measurement is seen: emit its CreateTable line and
+	// remember the schema.
+	_, exist := s.superTable[superTable]
+	if !exist {
+		for i := 0; i < len(fieldTypes); i++ {
+			s.tmpBuf.WriteByte(',')
+			s.tmpBuf.WriteString(fieldKeys[i])
+			s.tmpBuf.WriteByte(' ')
+			s.tmpBuf.WriteString(fieldTypes[i])
+			s.tmpBuf.WriteString(NotNull)
+		}
+		columnsStr := s.tmpBuf.String()
+		s.tmpBuf.Reset()
+		for i := 0; i < len(tagTypes); i++ {
+			s.tmpBuf.WriteString(tagKeys[i])
+			s.tmpBuf.WriteByte(' ')
+			s.tmpBuf.WriteString(tagTypes[i])
+			if i == 0 {
+				// The first tag is the primary tag and must be not null.
+				s.tmpBuf.WriteString(" not null")
+			}
+			if i != len(tagTypes)-1 {
+				s.tmpBuf.WriteByte(',')
+			}
+		}
+		tagsStr := s.tmpBuf.String()
+		// NOTE(review): rule is dereferenced unconditionally below; a
+		// measurement missing from tbRuleMap would panic here even though the
+		// subTable logic above tolerates rule == nil -- confirm all supported
+		// use cases have an entry in tbRuleMap.
+		columnsForInsertMetrics := "(k_timestamp:" + strings.Join(fieldKeys, ":") + ":" + rule.tag + ")"
+		fmt.Fprintf(w, "%c,%s,%s,create table %s (k_timestamp timestamp not null%s) tags (%s) primary tags (%s)\n",
+			CreateTable, superTable, columnsForInsertMetrics, superTable, columnsStr, tagsStr, rule.tag)
+		s.tmpBuf.Reset()
+		table := &Table{
+			columns:    map[string]struct{}{},
+			tags:       map[string]struct{}{},
+			columnsStr: columnsStr,
+			tagsStr:    tagsStr,
+		}
+		for _, key := range fieldKeys {
+			table.columns[key] = nothing
+		}
+		for _, key := range tagKeys {
+			table.tags[key] = nothing
+		}
+		s.superTable[superTable] = table
+		s.tableMap[superTable] = map[string]struct{}{}
+	}
+	// First row for this child table carries the full tag set; later rows
+	// carry only the primary tag value.
+	_, exist = s.tableMap[superTable][subTable]
+	if !exist {
+		fmt.Fprintf(w, "%c,%s,%s,(%d,%s,%s)\n", InsertMetricAndTag, superTable, subTable, p.TimestampInUnixMs(), strings.Join(fieldValues, ","), strings.Join(tagValues, ","))
+		s.tableMap[superTable][subTable] = nothing
+	} else {
+		fmt.Fprintf(w, "%c,%s,%s,%d,(%d,%s,%s)\n", InsertMetric, superTable, subTable, len(fieldValues), p.TimestampInUnixMs(), strings.Join(fieldValues, ","), tagValues[0])
+	}
+
+	return nil
+}
+
+// keyWords lists identifiers that must be backquoted in generated SQL.
+// NOTE(review): currently empty -- confirm whether reserved words need adding.
+var keyWords = map[string]bool{}
+
+// convertKeywords backquotes s when it is a registered reserved word.
+func convertKeywords(s string) string {
+	if is := keyWords[s]; is {
+		return fmt.Sprintf("`%s`", s)
+	}
+	return s
+}
+
+// trimString returns s with every occurrence of the byte cutset removed.
+// NOTE(review): rebuilds the string with Sprintf per byte (quadratic);
+// a strings.Builder would be linear if this ever sits on a hot path.
+func trimString(s string, cutset uint8) string {
+	result := ""
+	for i := 0; i < len(s); i++ {
+		if s[i] != cutset {
+			result = fmt.Sprintf("%s%c", result, s[i])
+		}
+	}
+	return result
+}
+
+// trimColumnName truncates each column name in a comma-separated
+// "name type" list to at most 20 characters, leaving the types intact.
+func trimColumnName(s string) string {
+	columnNameAndTypes := strings.Split(s, ",")
+	columnResult := make([]string, 0)
+	for _, columnNameAndType := range columnNameAndTypes {
+		columnName := strings.Split(columnNameAndType, " ")
+		if len(columnName[0]) > 20 {
+			columnName[0] = columnName[0][:20]
+		}
+		columnResult = append(columnResult,
			strings.Join(columnName, " "))
+	}
+	return strings.Join(columnResult, ",")
+}
diff --git a/pkg/targets/kaiwudb/thread/locker.go b/pkg/targets/kaiwudb/thread/locker.go
new file mode 100644
index 000000000..2ad485075
--- /dev/null
+++ b/pkg/targets/kaiwudb/thread/locker.go
@@ -0,0 +1,33 @@
+package thread
+
+import "runtime"
+
+// Locker is a counting semaphore built on a buffered channel: up to `count`
+// holders may be inside Lock/Unlock at once.
+type Locker struct {
+	c chan struct{}
+}
+
+// NewLocker returns a Locker admitting at most count concurrent holders.
+func NewLocker(count int) *Locker {
+	return &Locker{c: make(chan struct{}, count)}
+}
+
+// Lock acquires a slot, blocking while the semaphore is full.
+func (l *Locker) Lock() {
+	l.c <- struct{}{}
+}
+
+// Unlock releases a previously acquired slot.
+func (l *Locker) Unlock() {
+	<-l.c
+}
+
+// c is the package-level default semaphore used by Lock/Unlock.
+var c chan struct{}
+
+// Lock acquires a slot in the package-level semaphore.
+func Lock() {
+	c <- struct{}{}
+}
+
+// Unlock releases a slot in the package-level semaphore.
+func Unlock() {
+	<-c
+}
+
+// init sizes the default semaphore to the number of logical CPUs.
+func init() {
+	c = make(chan struct{}, runtime.NumCPU())
+}
diff --git a/pkg/targets/kaiwudb/thread/locker_test.go b/pkg/targets/kaiwudb/thread/locker_test.go
new file mode 100644
index 000000000..f49be5c33
--- /dev/null
+++ b/pkg/targets/kaiwudb/thread/locker_test.go
@@ -0,0 +1,41 @@
+package thread
+
+import (
+	"testing"
+)
+
+// @author: xftan
+// @date: 2021/12/14 15:16
+// @description: test NewLocker
+func TestNewLocker(t *testing.T) {
+	type args struct {
+		count int
+	}
+	tests := []struct {
+		name string
+		args args
+	}{
+		{
+			name: "test",
+			args: args{
+				count: 1,
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			locker := NewLocker(tt.args.count)
+			locker.Lock()
+			locker.Unlock()
+		})
+	}
+}
+
+// @author: xftan
+// @date: 2021/12/14 15:16
+// @description: test DefaultLocker
+func TestDefaultLocker(t *testing.T) {
+	Lock()
+	t.Log("success")
+	defer Unlock()
+}
From 2d656da6ce77db9453ceed16a498896c3f1ca2cf Mon Sep 17 00:00:00 2001
From: rongtianyang
Date: Fri, 15 Nov 2024 15:04:26 +0800
Subject: [PATCH 2/2] add kaiwudb doc

---
 docs/kaiwudb.md | 83 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 83 insertions(+)
 create mode 100644 docs/kaiwudb.md

diff --git a/docs/kaiwudb.md b/docs/kaiwudb.md
new file mode 100644
index 000000000..8468dbb2a
--- /dev/null
+++ 
b/docs/kaiwudb.md
@@ -0,0 +1,83 @@
+# TSBS Supplemental Guide: KaiwuDB
+
+**This should be read *after* the main README.**
+
+KaiwuDB is a distributed multi-model database tailored for AIoT scenarios. It supports the concurrent creation of time-series and relational databases within the same instance and the integrated processing of multi-model data. KaiwuDB can process time-series data with high efficiency. For more information about KaiwuDB, visit the [KaiwuDB website](https://www.kaiwudb.com/).
+
+This supplemental guide explains how the data generated for TSBS is stored, the additional flags available when using the data importer (`tsbs_load_kaiwudb`), and the additional flags available for the query runner (`tsbs_run_queries_kaiwudb`).
+
+## Data format
+
+Data generated by `tsbs_generate_data` for KaiwuDB is serialized in a "pseudo-CSV" format. Each reading consists of a row, where the first item represents the operation type: either `3` (insert data with tags) or `1` (insert data).
+
+- 3 (insert data with tags): The format is: `3,table_name,primary_tag,(insert_values,insert_other_tags)`
+- 1 (insert data): The format is: `1,table_name,primary_tag,insert_values_length,(timestamp,insert_values,primary_tag)`
+
+The `primary_tag` is the primary key or unique identifier for each device in the dataset. For example, in the `devops` test case, if `measurementName` is `cpu`, the primary tag could be `hostname` and one possible tag value would be `host_0`.
+
+The `timestamp` is expressed in milliseconds.
+
+An example for the `cpu-only` use case:
+
+```text
+3,cpu,host_0,(1658707200000,58,2,24,61,22,63,6,44,80,38,'host_0','eu-central-1','eu-central-1a','6','Ubuntu15.10','x86','SF','19','1','test')
+1,cpu,host_0,10,(1658707210000,57,2,25,61,23,63,6,46,78,36,'host_0')
+```
+
+---
+
+## `tsbs_load_kaiwudb` Additional Flags
+
+### KaiwuDB related
+
+#### `-host` (type: `string`, default: `localhost`)
+
+Hostname of the KaiwuDB instance.
+
+#### `-port` (type: `string`, default: `36257`)
+
+Port to connect to on the database host.
+
+#### `-user` (type: `string`, default: `root`)
+
+Username for connecting to KaiwuDB.
+
+#### `-pass` (type: `string`, default: ``)
+
+Password for connecting to KaiwuDB.
+
+#### `-insert-type` (type: `string`, default: `prepare`)
+
+Defines the insert type:
+
+- `insert`: simple insert
+- `prepare`: insert using prepare statements.
+
+---
+
+## `tsbs_run_queries_kaiwudb` Additional Flags
+
+### KaiwuDB related
+
+#### `-host` (type: `string`, default: `localhost`)
+
+Hostname of the KaiwuDB instance.
+
+#### `-port` (type: `string`, default: `36257`)
+
+Port to connect to on the database host.
+
+#### `-user` (type: `string`, default: `root`)
+
+Username for connecting to KaiwuDB.
+
+#### `-pass` (type: `string`, default: ``)
+
+Password for connecting to KaiwuDB.
+
+#### `-query-mode` (type: `int`, default: `5`)
+
+Sets the query execution mode:
+
+- `5`: simple query
+- `1`: query using prepare statements. This mode may accelerate query performance, particularly for certain query types, such as `single-groupby-1-1-1`.