From 141fbafa3066e9ef334e2a8ffd559fb7d7421768 Mon Sep 17 00:00:00 2001 From: iguazio-deploy Date: Tue, 10 Mar 2020 15:30:15 +0000 Subject: [PATCH] Updated TSDB to v0.9.17 --- go.mod | 7 +- go.sum | 6 +- vendor/github.com/v3io/frames/go.mod | 2 +- .../v3io/v3io-go/pkg/dataplane/container.go | 5 +- .../v3io-go/pkg/dataplane/http/container.go | 4 +- .../v3io-go/pkg/dataplane/http/context.go | 79 ++- .../v3io/v3io-go/pkg/dataplane/types.go | 12 + .../v3io/v3io-tsdb/pkg/appender/ingest.go | 6 +- .../v3io/v3io-tsdb/pkg/chunkenc/xor.go | 19 +- .../v3io/v3io-tsdb/pkg/config/config.go | 18 +- .../v3io/v3io-tsdb/pkg/partmgr/partmgr.go | 62 +- .../v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go | 540 +++++++++++++++++- .../v3io/v3io-tsdb/pkg/utils/misc.go | 14 + vendor/modules.txt | 6 +- 14 files changed, 708 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index 345ced561a9..5568f8bef69 100644 --- a/go.mod +++ b/go.mod @@ -84,8 +84,7 @@ require ( github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff // indirect github.com/stretchr/testify v1.4.0 - github.com/v3io/v3io-go v0.1.5-0.20200224125003-964a745e51aa // indirect - github.com/v3io/v3io-tsdb v0.9.16 + github.com/v3io/v3io-tsdb v0.9.17 go.opencensus.io v0.19.2 // indirect golang.org/x/net v0.0.0-20190311183353-d8887717615a golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421 @@ -113,6 +112,6 @@ replace labix.org/v2/mgo => github.com/go-mgo/mgo v0.0.0-20180705113738-7446a034 replace launchpad.net/gocheck => github.com/go-check/check v0.0.0-20180628173108-788fd78401277ebd861206a03c884797c6ec5541 -replace github.com/v3io/frames => github.com/v3io/frames v0.6.11-v0.9.16 +replace github.com/v3io/frames => github.com/v3io/frames v0.6.11-v0.9.17 -replace github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.9.16 +replace github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.9.17 diff --git a/go.sum b/go.sum index c9b32f9cc7b..fa483152e2b 
100644 --- a/go.sum +++ b/go.sum @@ -324,8 +324,8 @@ github.com/rlmcpherson/s3gof3r v0.5.0/go.mod h1:s7vv7SMDPInkitQMuZzH615G7yWHdrU2 github.com/rs/xid v1.1.0/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk= github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A= -github.com/russross/blackfriday v1.5.2+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/russross/blackfriday v1.5.2+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 h1:4AQBn5RJY4WH8t8TLEMZUsWeXHAUcoao42TCAfpEJJE= github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= @@ -368,6 +368,8 @@ github.com/v3io/frames v0.0.0-20190328123118-1dad1ff610509e7b087d9cd390ed1b452ca github.com/v3io/frames v0.0.0-20190328123118-1dad1ff610509e7b087d9cd390ed1b452caecf15/go.mod h1:6aKW4Wl4A+gQhXH0JRCVOLgwvcrLyk+fqEpemuie094= github.com/v3io/frames v0.6.11-v0.9.16 h1:RrKC+8J7h+WzeYj+R7cFVzGw0peP3DQDS+U83WhWjUI= github.com/v3io/frames v0.6.11-v0.9.16/go.mod h1:SEU2rfEU4AxL6GQc/mXB+tFQoA3yOAN93wCoUMFTJlQ= +github.com/v3io/frames v0.6.11-v0.9.17 h1:PRMYAzvz01uwelnWtzpZC4hvL5jIF43T41eCxIt6jzE= +github.com/v3io/frames v0.6.11-v0.9.17/go.mod h1:cm4d/Eb2HZnLTptisOjBT1bwGa9zXLCBzIqx/GqyDBs= github.com/v3io/v3io-go v0.0.0-20180415000000-1486c75b0e590a14580f7d9b6cef7a944a231ca7 h1:J+ps6exCjowNidrtawSQglJQpKrJ6v8UjBVTNrRTpMs= github.com/v3io/v3io-go v0.0.0-20180415000000-1486c75b0e590a14580f7d9b6cef7a944a231ca7/go.mod h1:MHc+d/Jg/y8lV4B9sgwTvuS3tEE9wS+kqtU0+D0Sr78= github.com/v3io/v3io-go 
v0.0.0-20190826150152-1f2c9a9a61cb715410a35662f5ddab2b306f95e7 h1:Qx3yIJPtDTJVv/gck/009TrBF/JMwe5RM3N9aTZ4Mlo= @@ -428,6 +430,8 @@ github.com/v3io/v3io-tsdb v0.9.15 h1:iFZvF/pvg8C+bZe9xMcdUi0glKcr1EovUts5K/o16SU github.com/v3io/v3io-tsdb v0.9.15/go.mod h1:hdeE2K5HLZsp2NL8VRK9vmMp7cZ2F8x3VoQB2Gr34MI= github.com/v3io/v3io-tsdb v0.9.16 h1:PF16ZyP76rsLbgcKK+nP/AgbpDIGhYaJ8cmgzBJ0vjA= github.com/v3io/v3io-tsdb v0.9.16/go.mod h1:u0exEO2VYFbuOXbmAilimkoWztkvBuTbNMFysLm99ho= +github.com/v3io/v3io-tsdb v0.9.17 h1:c9AFs0Sy+qHFNzB+r4YOeXnM0ySMH4qZjPO5xobCnHw= +github.com/v3io/v3io-tsdb v0.9.17/go.mod h1:niG39x0WSqZNbRoy+WUZ1jb0t3ClV/tCU3Bsi1yUamc= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.0.0 h1:BwIoZQbBsTo3v2F5lz5Oy3TlTq4wLKTLV260EVTEWco= diff --git a/vendor/github.com/v3io/frames/go.mod b/vendor/github.com/v3io/frames/go.mod index c2f11ef0783..9237f34908b 100644 --- a/vendor/github.com/v3io/frames/go.mod +++ b/vendor/github.com/v3io/frames/go.mod @@ -13,7 +13,7 @@ require ( github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.4.0 github.com/v3io/v3io-go v0.0.5-0.20191205125653-9003ae83f0b6 - github.com/v3io/v3io-tsdb v0.9.16 + github.com/v3io/v3io-tsdb v0.9.17 github.com/valyala/fasthttp v1.2.0 github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go index c2dbff31561..3ab3307c3f4 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go @@ -18,7 +18,6 @@ package v3io // A container interface allows perform actions against a container type Container interface { - // // Container // @@ -77,7 +76,7 @@ type Container interface { 
PutItem(*PutItemInput, interface{}, chan *Response) (*Request, error) // PutItemSync - PutItemSync(*PutItemInput) error + PutItemSync(*PutItemInput) (*Response, error) // PutItems PutItems(*PutItemsInput, interface{}, chan *Response) (*Request, error) @@ -89,7 +88,7 @@ type Container interface { UpdateItem(*UpdateItemInput, interface{}, chan *Response) (*Request, error) // UpdateItemSync - UpdateItemSync(*UpdateItemInput) error + UpdateItemSync(*UpdateItemInput) (*Response, error) // // Stream diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go index cd5d26dcabf..4b4523db3c2 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go @@ -67,7 +67,7 @@ func (c *container) PutItem(putItemInput *v3io.PutItemInput, } // PutItemSync -func (c *container) PutItemSync(putItemInput *v3io.PutItemInput) error { +func (c *container) PutItemSync(putItemInput *v3io.PutItemInput) (*v3io.Response, error) { c.populateInputFields(&putItemInput.DataPlaneInput) return c.session.context.PutItemSync(putItemInput) } @@ -95,7 +95,7 @@ func (c *container) UpdateItem(updateItemInput *v3io.UpdateItemInput, } // UpdateItemSync -func (c *container) UpdateItemSync(updateItemInput *v3io.UpdateItemInput) error { +func (c *container) UpdateItemSync(updateItemInput *v3io.UpdateItemInput) (*v3io.Response, error) { c.populateInputFields(&updateItemInput.DataPlaneInput) return c.session.context.UpdateItemSync(updateItemInput) } diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go index c5f00a5eb7e..3157f93369b 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go @@ -321,7 +321,7 @@ func (c *context) PutItem(putItemInput *v3io.PutItemInput, } // PutItemSync -func 
(c *context) PutItemSync(putItemInput *v3io.PutItemInput) error { +func (c *context) PutItemSync(putItemInput *v3io.PutItemInput) (*v3io.Response, error) { var body map[string]interface{} if putItemInput.UpdateMode != "" { body = map[string]interface{}{ @@ -330,15 +330,24 @@ func (c *context) PutItemSync(putItemInput *v3io.PutItemInput) error { } // prepare the query path - _, err := c.putItem(&putItemInput.DataPlaneInput, + response, err := c.putItem(&putItemInput.DataPlaneInput, putItemInput.Path, putItemFunctionName, putItemInput.Attributes, putItemInput.Condition, putItemHeaders, body) + if err != nil { + return nil, err + } - return err + mtimeSecs, mtimeNSecs, err := parseMtimeHeader(response) + if err != nil { + return nil, err + } + response.Output = &v3io.PutItemOutput{MtimeSecs: mtimeSecs, MtimeNSecs: mtimeNSecs} + + return response, err } // PutItems @@ -399,8 +408,9 @@ func (c *context) UpdateItem(updateItemInput *v3io.UpdateItemInput, } // UpdateItemSync -func (c *context) UpdateItemSync(updateItemInput *v3io.UpdateItemInput) error { +func (c *context) UpdateItemSync(updateItemInput *v3io.UpdateItemInput) (*v3io.Response, error) { var err error + var response *v3io.Response if updateItemInput.Attributes != nil { @@ -413,26 +423,45 @@ func (c *context) UpdateItemSync(updateItemInput *v3io.UpdateItemInput) error { body["UpdateMode"] = updateItemInput.UpdateMode } - _, err = c.putItem(&updateItemInput.DataPlaneInput, + response, err = c.putItem(&updateItemInput.DataPlaneInput, updateItemInput.Path, putItemFunctionName, updateItemInput.Attributes, updateItemInput.Condition, putItemHeaders, body) + if err != nil { + return nil, err + } + + mtimeSecs, mtimeNSecs, err := parseMtimeHeader(response) + if err != nil { + return nil, err + } + response.Output = &v3io.UpdateItemOutput{MtimeSecs: mtimeSecs, MtimeNSecs: mtimeNSecs} } else if updateItemInput.Expression != nil { - _, err = c.updateItemWithExpression(&updateItemInput.DataPlaneInput, + response, err = 
c.updateItemWithExpression(&updateItemInput.DataPlaneInput, updateItemInput.Path, updateItemFunctionName, *updateItemInput.Expression, updateItemInput.Condition, updateItemHeaders, updateItemInput.UpdateMode) + if err != nil { + return nil, err + } + + mtimeSecs, mtimeNSecs, err := parseMtimeHeader(response) + if err != nil { + return nil, err + } + response.Output = &v3io.UpdateItemOutput{MtimeSecs: mtimeSecs, MtimeNSecs: mtimeNSecs} + } - return err + return response, err } // GetObject @@ -1142,11 +1171,11 @@ func (c *context) workerEntry(workerIndex int) { case *v3io.GetItemsInput: response, err = c.GetItemsSync(typedInput) case *v3io.PutItemInput: - err = c.PutItemSync(typedInput) + response, err = c.PutItemSync(typedInput) case *v3io.PutItemsInput: response, err = c.PutItemsSync(typedInput) case *v3io.UpdateItemInput: - err = c.UpdateItemSync(typedInput) + response, err = c.UpdateItemSync(typedInput) case *v3io.CreateStreamInput: err = c.CreateStreamSync(typedInput) case *v3io.DescribeStreamInput: @@ -1403,3 +1432,35 @@ func (c *context) getItemsParseCAPNPResponse(response *v3io.Response, withWildca } return &getItemsOutput, nil } + +// parsing the mtime from a header of the form `__mtime_secs==1581605100 and __mtime_nsecs==498349956` +func parseMtimeHeader(response *v3io.Response) (int, int, error) { + var mtimeSecs, mtimeNSecs int + var err error + + mtimeHeader := string(response.HeaderPeek("X-v3io-transaction-verifier")) + for _, expression := range strings.Split(mtimeHeader, "and") { + mtimeParts := strings.Split(expression, "==") + mtimeType := strings.TrimSpace(mtimeParts[0]) + if mtimeType == "__mtime_secs" { + mtimeSecs, err = trimAndParseInt(mtimeParts[1]) + if err != nil { + return 0, 0, err + } + } else if mtimeType == "__mtime_nsecs" { + mtimeNSecs, err = trimAndParseInt(mtimeParts[1]) + if err != nil { + return 0, 0, err + } + } else { + return 0, 0, fmt.Errorf("failed to parse 'X-v3io-transaction-verifier', unexpected symbol '%v' ", mtimeType) 
+ } + } + + return mtimeSecs, mtimeNSecs, nil +} + +func trimAndParseInt(str string) (int, error) { + trimmed := strings.TrimSpace(str) + return strconv.Atoi(trimmed) +} \ No newline at end of file diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go index aa4833ead24..58258469204 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go @@ -202,6 +202,12 @@ type PutItemInput struct { UpdateMode string } +type PutItemOutput struct { + DataPlaneInput + MtimeSecs int + MtimeNSecs int +} + type PutItemsInput struct { DataPlaneInput Path string @@ -224,6 +230,12 @@ type UpdateItemInput struct { UpdateMode string } +type UpdateItemOutput struct { + DataPlaneInput + MtimeSecs int + MtimeNSecs int +} + type GetItemInput struct { DataPlaneInput Path string diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go b/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go index 6a24b45002d..817510a76cb 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go @@ -62,7 +62,7 @@ func (mc *MetricsCache) metricFeed(index int) { mc.logger.Debug(`Complete update cycle - "in-flight requests"=%d; "metric queue length"=%d\n`, inFlight, length) // If data was sent and the queue is empty, mark as completion - if length == 0 && gotData { + if length == 0 && gotData && len(mc.asyncAppendChan) == 0 { gotCompletion = true if completeChan != nil { completeChan <- 0 @@ -80,7 +80,7 @@ func (mc *MetricsCache) metricFeed(index int) { completeChan = app.resp length := mc.metricQueue.Length() - if gotCompletion && length == 0 { + if gotCompletion && length == 0 && len(mc.asyncAppendChan) == 0 { completeChan <- 0 gotCompletion = false gotData = false @@ -209,7 +209,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) { } // Notify the metric feeder when all in-flight tasks 
are done - if mc.updatesInFlight == 0 { + if mc.updatesInFlight == 0 && len(mc.asyncAppendChan) == 0 { mc.logger.Debug("Return to feed. Metric queue length: %d", mc.metricQueue.Length()) mc.updatesComplete <- 0 } diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/chunkenc/xor.go b/vendor/github.com/v3io/v3io-tsdb/pkg/chunkenc/xor.go index 06e8df5cff9..bfe9a5e3852 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/chunkenc/xor.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/chunkenc/xor.go @@ -160,6 +160,8 @@ type xorAppender struct { leading uint8 trailing uint8 + + isPreviousNewSeries bool } func (a *xorAppender) Encoding() Encoding { @@ -192,20 +194,22 @@ func (a *xorAppender) Append(t int64, vvar interface{}) { return } - if num == 0 { + // We write time deltas as 32 bits (for compression) if the delta is too large we'll start a new series + tDelta = uint64(t - a.t) + shouldStartNewSeries := num == 0 || bits.Len64(tDelta) >= 32 + + if shouldStartNewSeries { // add a signature 11111 to indicate start of cseries in case we put few in the same chunk (append to existing) a.b.writeBits(0x1f, 5) a.b.writeBits(uint64(t), 51) a.b.writeBits(math.Float64bits(v), 64) - - } else if num == 1 { - tDelta = uint64(t - a.t) - + a.isPreviousNewSeries = true + tDelta = 0 // saving time delta for the first element is redundant + } else if a.isPreviousNewSeries { a.b.writeBits(tDelta, 32) a.writeVDelta(v) - + a.isPreviousNewSeries = false } else { - tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta) // Gorilla has a max resolution of seconds, Prometheus milliseconds. 
@@ -228,6 +232,7 @@ func (a *xorAppender) Append(t int64, vvar interface{}) { } a.writeVDelta(v) + } a.t = t diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go b/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go index ca1ea85e9f0..494f4e5857f 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/config/config.go @@ -62,14 +62,16 @@ const ( DefaultUseServerAggregateCoefficient = 3 // KV attribute names - MaxTimeAttrName = "_maxtime" - LabelSetAttrName = "_lset" - EncodingAttrName = "_enc" - OutOfOrderAttrName = "_ooo" - MetricNameAttrName = "_name" - ObjectNameAttrName = "__name" - ChunkAttrPrefix = "_v" - AggregateAttrPrefix = "_v_" + MaxTimeAttrName = "_maxtime" + LabelSetAttrName = "_lset" + EncodingAttrName = "_enc" + OutOfOrderAttrName = "_ooo" + MetricNameAttrName = "_name" + ObjectNameAttrName = "__name" + ChunkAttrPrefix = "_v" + AggregateAttrPrefix = "_v_" + MtimeSecsAttributeName = "__mtime_secs" + MtimeNSecsAttributeName = "__mtime_nsecs" PrometheusMetricNameAttribute = "__name__" diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go b/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go index 73103918d6d..aaf716cc072 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/partmgr/partmgr.go @@ -195,7 +195,7 @@ func (p *PartitionManager) updateSchema() error { } input := &v3io.PutItemInput{Path: schemaFilePath, Attributes: attributes} - err := p.container.PutItemSync(input) + _, err := p.container.PutItemSync(input) if err != nil { outerError = errors.Wrap(err, "failed to update partitions table.") @@ -235,7 +235,7 @@ func (p *PartitionManager) DeletePartitionsFromSchema(partitionsToDelete []*DBPa deletePartitionExpression.WriteString(");") } expression := deletePartitionExpression.String() - err := p.container.UpdateItemSync(&v3io.UpdateItemInput{Path: p.GetSchemaFilePath(), Expression: &expression}) + _, 
err := p.container.UpdateItemSync(&v3io.UpdateItemInput{Path: p.GetSchemaFilePath(), Expression: &expression}) if err != nil { return err } @@ -589,6 +589,33 @@ func (p *DBPartition) Time2Bucket(t int64) int { return int((t - p.startTime) / p.rollupTime) } +// Return the start time of an aggregation bucket by id +func (p *DBPartition) GetAggregationBucketStartTime(id int) int64 { + return p.startTime + int64(id)*p.rollupTime +} + +// Return the end time of an aggregation bucket by id +func (p *DBPartition) GetAggregationBucketEndTime(id int) int64 { + return p.startTime + int64(id+1)*p.rollupTime - 1 +} + +func (p *DBPartition) Times2BucketRange(start, end int64) []int { + var buckets []int + + if start > p.GetEndTime() || end < p.startTime { + return buckets + } + + startingAggrBucket := p.Time2Bucket(start) + endAggrBucket := p.Time2Bucket(end) + + for bucketID := startingAggrBucket; bucketID <= endAggrBucket; bucketID++ { + buckets = append(buckets, bucketID) + } + + return buckets +} + // Return the nearest chunk start time for the specified time func (p *DBPartition) GetChunkMint(t int64) int64 { if t > p.GetEndTime() { @@ -618,6 +645,37 @@ func (p *DBPartition) TimeToChunkID(tmilli int64) (int, error) { return -1, errors.Errorf("Time %d isn't within the range of this partition.", tmilli) } +// Check if a chunk (by attribute name) is in the given time range. 
+func (p *DBPartition) IsChunkInRangeByAttr(attr string, mint, maxt int64) bool { + + // Discard '_v' prefix + chunkIDStr := attr[2:] + chunkID, err := strconv.ParseInt(chunkIDStr, 10, 64) + if err != nil { + return false + } + + chunkStartTime := p.startTime + (chunkID-1)*p.chunkInterval + chunkEndTime := chunkStartTime + p.chunkInterval - 1 + + return mint <= chunkStartTime && maxt >= chunkEndTime +} + +// Get a chunk's start time by it's attribute name +func (p *DBPartition) GetChunkStartTimeByAttr(attr string) (int64, error) { + + // Discard '_v' prefix + chunkIDStr := attr[2:] + chunkID, err := strconv.ParseInt(chunkIDStr, 10, 64) + if err != nil { + return 0, err + } + + chunkStartTime := p.startTime + (chunkID-1)*p.chunkInterval + + return chunkStartTime, nil +} + // Check whether the specified time is within the range of this partition func (p *DBPartition) InRange(t int64) bool { if p.manager.cyclic { diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go index 140019e3588..db137970959 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/tsdb/v3iotsdb.go @@ -22,18 +22,24 @@ package tsdb import ( "context" + "encoding/base64" "encoding/json" "fmt" "math" pathUtil "path" "path/filepath" + "strconv" + "strings" + "sync" "time" "github.com/nuclio/logger" "github.com/pkg/errors" "github.com/v3io/v3io-go/pkg/dataplane" "github.com/v3io/v3io-go/pkg/dataplane/http" + "github.com/v3io/v3io-tsdb/pkg/aggregate" "github.com/v3io/v3io-tsdb/pkg/appender" + "github.com/v3io/v3io-tsdb/pkg/chunkenc" "github.com/v3io/v3io-tsdb/pkg/config" "github.com/v3io/v3io-tsdb/pkg/partmgr" "github.com/v3io/v3io-tsdb/pkg/pquerier" @@ -42,7 +48,14 @@ import ( "github.com/v3io/v3io-tsdb/pkg/utils" ) -const defaultHTTPTimeout = 30 * time.Second +const ( + defaultHTTPTimeout = 30 * time.Second + + errorCodeString = "ErrorCode" + falseConditionOuterErrorCode = 
"184549378" // todo: change codes + falseConditionInnerErrorCode = "385876025" + maxExpressionsInUpdateItem = 1500 // max is 2000, we're taking a buffer since it doesn't work with 2000 +) type V3ioAdapter struct { startTimeMargin int64 @@ -54,15 +67,26 @@ type V3ioAdapter struct { partitionMngr *partmgr.PartitionManager } -func CreateTSDB(cfg *config.V3ioConfig, schema *config.Schema) error { +type DeleteParams struct { + Metrics []string + Filter string + From, To int64 + DeleteAll bool + + IgnoreErrors bool +} + +func CreateTSDB(cfg *config.V3ioConfig, schema *config.Schema, container v3io.Container) error { lgr, _ := utils.NewLogger(cfg.LogLevel) httpTimeout := parseHTTPTimeout(cfg, lgr) - container, err := utils.CreateContainer(lgr, cfg, httpTimeout) - if err != nil { - return errors.Wrap(err, "Failed to create a data container.") + var err error + if container == nil { + container, err = utils.CreateContainer(lgr, cfg, httpTimeout) + if err != nil { + return errors.Wrap(err, "Failed to create a data container.") + } } - data, err := json.Marshal(schema) if err != nil { return errors.Wrap(err, "Failed to marshal the TSDB schema file.") @@ -240,59 +264,55 @@ func (a *V3ioAdapter) QuerierV2() (*pquerier.V3ioQuerier, error) { return pquerier.NewV3ioQuerier(a.container, a.logger, a.cfg, a.partitionMngr), nil } -func (a *V3ioAdapter) DeleteDB(deleteAll bool, ignoreErrors bool, fromTime int64, toTime int64) error { - if deleteAll { +// Delete by time range can optionally specify metrics and filter by labels +func (a *V3ioAdapter) DeleteDB(deleteParams DeleteParams) error { + if deleteParams.DeleteAll { // Ignore time boundaries - fromTime = 0 - toTime = math.MaxInt64 - } - - partitions := a.partitionMngr.PartsForRange(fromTime, toTime, false) - for _, part := range partitions { - a.logger.Info("Deleting partition '%s'.", part.GetTablePath()) - err := utils.DeleteTable(a.logger, a.container, part.GetTablePath(), "", a.cfg.QryWorkers) - if err != nil && !ignoreErrors { 
- return errors.Wrapf(err, "Failed to delete partition '%s'.", part.GetTablePath()) - } - // Delete the Directory object - err = a.container.DeleteObjectSync(&v3io.DeleteObjectInput{Path: part.GetTablePath()}) - if err != nil && !ignoreErrors { - return errors.Wrapf(err, "Failed to delete partition object '%s'.", part.GetTablePath()) + deleteParams.From = 0 + deleteParams.To = math.MaxInt64 + } else { + if deleteParams.To == 0 { + deleteParams.To = time.Now().Unix() * 1000 } } - err := a.partitionMngr.DeletePartitionsFromSchema(partitions) + + // Delete Data + err := a.DeletePartitionsData(&deleteParams) if err != nil { return err } + // If no data is left, delete Names folder if len(a.partitionMngr.GetPartitionsPaths()) == 0 { path := filepath.Join(a.cfg.TablePath, config.NamesDirectory) + "/" // Need a trailing slash a.logger.Info("Delete metric names at path '%s'.", path) err := utils.DeleteTable(a.logger, a.container, path, "", a.cfg.QryWorkers) - if err != nil && !ignoreErrors { + if err != nil && !deleteParams.IgnoreErrors { return errors.Wrap(err, "Failed to delete the metric-names table.") } // Delete the Directory object err = a.container.DeleteObjectSync(&v3io.DeleteObjectInput{Path: path}) - if err != nil && !ignoreErrors { + if err != nil && !deleteParams.IgnoreErrors { if !utils.IsNotExistsError(err) { return errors.Wrapf(err, "Failed to delete table object '%s'.", path) } } } - if deleteAll { + + // If need to 'deleteAll', delete schema + TSDB table folder + if deleteParams.DeleteAll { // Delete Schema file schemaPath := pathUtil.Join(a.cfg.TablePath, config.SchemaConfigFileName) a.logger.Info("Delete the TSDB configuration at '%s'.", schemaPath) err := a.container.DeleteObjectSync(&v3io.DeleteObjectInput{Path: schemaPath}) - if err != nil && !ignoreErrors { + if err != nil && !deleteParams.IgnoreErrors { return errors.New("The configuration at '" + schemaPath + "' cannot be deleted or doesn't exist.") } // Delete the Directory object path := 
a.cfg.TablePath + "/" err = a.container.DeleteObjectSync(&v3io.DeleteObjectInput{Path: path}) - if err != nil && !ignoreErrors { + if err != nil && !deleteParams.IgnoreErrors { if !utils.IsNotExistsError(err) { return errors.Wrapf(err, "Failed to delete table object '%s'.", path) } @@ -302,6 +322,455 @@ func (a *V3ioAdapter) DeleteDB(deleteAll bool, ignoreErrors bool, fromTime int64 return nil } +func (a *V3ioAdapter) DeletePartitionsData(deleteParams *DeleteParams) error { + partitions := a.partitionMngr.PartsForRange(deleteParams.From, deleteParams.To, true) + var entirelyDeletedPartitions []*partmgr.DBPartition + + deleteWholePartition := deleteParams.DeleteAll || (deleteParams.Filter == "" && len(deleteParams.Metrics) == 0) + + fileToDeleteChan := make(chan v3io.Item, 1024) + getItemsTerminationChan := make(chan error, len(partitions)) + deleteTerminationChan := make(chan error, a.cfg.Workers) + numOfGetItemsRoutines := len(partitions) + if len(deleteParams.Metrics) > 0 { + numOfGetItemsRoutines = numOfGetItemsRoutines * len(deleteParams.Metrics) + } + goRoutinesNum := numOfGetItemsRoutines + a.cfg.Workers + onErrorTerminationChannel := make(chan struct{}, goRoutinesNum) + systemAttributesToFetch := []string{config.ObjectNameAttrName, config.MtimeSecsAttributeName, config.MtimeNSecsAttributeName, config.EncodingAttrName, config.MaxTimeAttrName} + var getItemsWorkers, getItemsTerminated, deletesTerminated int + + var getItemsWG sync.WaitGroup + getItemsErrorChan := make(chan error, numOfGetItemsRoutines) + + aggregates := a.GetSchema().PartitionSchemaInfo.Aggregates + hasServerSideAggregations := len(aggregates) != 1 || aggregates[0] != "" + + var aggrMask aggregate.AggrType + var err error + if hasServerSideAggregations { + aggrMask, _, err = aggregate.AggregatesFromStringListWithCount(aggregates) + if err != nil { + return err + } + } + + for i := 0; i < a.cfg.Workers; i++ { + go deleteObjectWorker(a.container, deleteParams, a.logger, + fileToDeleteChan, 
deleteTerminationChan, onErrorTerminationChannel, + aggrMask) + } + + for _, part := range partitions { + partitionEntirelyInRange := deleteParams.From <= part.GetStartTime() && deleteParams.To >= part.GetEndTime() + deleteEntirePartitionFolder := partitionEntirelyInRange && deleteWholePartition + + // Delete all files in partition folder and then delete the folder itself + if deleteEntirePartitionFolder { + a.logger.Info("Deleting entire partition '%s'.", part.GetTablePath()) + + getItemsWG.Add(1) + go deleteEntirePartition(a.logger, a.container, part.GetTablePath(), a.cfg.QryWorkers, + &getItemsWG, getItemsErrorChan) + + entirelyDeletedPartitions = append(entirelyDeletedPartitions, part) + // First get all items based on filter+metric+time range then delete what is necessary + } else { + a.logger.Info("Deleting partial partition '%s'.", part.GetTablePath()) + + start, end := deleteParams.From, deleteParams.To + + // Round the start and end times to the nearest aggregation buckets - to later on recalculate server side aggregations + if hasServerSideAggregations { + start = part.GetAggregationBucketStartTime(part.Time2Bucket(deleteParams.From)) + end = part.GetAggregationBucketEndTime(part.Time2Bucket(deleteParams.To)) + } + + var chunkAttributesToFetch []string + + // If we don't want to delete the entire object, fetch also the desired chunks to delete. + if !partitionEntirelyInRange { + chunkAttributesToFetch, _ = part.Range2Attrs("v", start, end) + } + + allAttributes := append(chunkAttributesToFetch, systemAttributesToFetch...) 
+ if len(deleteParams.Metrics) == 0 { + getItemsWorkers++ + input := &v3io.GetItemsInput{Path: part.GetTablePath(), + AttributeNames: allAttributes, + Filter: deleteParams.Filter} + go getItemsWorker(a.logger, a.container, input, part, fileToDeleteChan, getItemsTerminationChan, onErrorTerminationChannel) + } else { + for _, metric := range deleteParams.Metrics { + for _, shardingKey := range part.GetShardingKeys(metric) { + getItemsWorkers++ + input := &v3io.GetItemsInput{Path: part.GetTablePath(), + AttributeNames: allAttributes, + Filter: deleteParams.Filter, + ShardingKey: shardingKey} + go getItemsWorker(a.logger, a.container, input, part, fileToDeleteChan, getItemsTerminationChan, onErrorTerminationChannel) + } + } + } + } + } + a.logger.Debug("issued %v getItems", getItemsWorkers) + + // Waiting fot deleting of full partitions + getItemsWG.Wait() + select { + case err = <-getItemsErrorChan: + // Signal all other goroutines to quite + for i := 0; i < goRoutinesNum; i++ { + onErrorTerminationChannel <- struct{}{} + } + return err + default: + } + + if getItemsWorkers != 0 { + for deletesTerminated < a.cfg.Workers { + select { + case err := <-getItemsTerminationChan: + a.logger.Debug("finished getItems worker, total finished: %v, error: %v", getItemsTerminated+1, err) + if err != nil { + // If requested to ignore non-existing tables do not return error. 
+ if !(deleteParams.IgnoreErrors && utils.IsNotExistsOrConflictError(err)) { + for i := 0; i < goRoutinesNum; i++ { + onErrorTerminationChannel <- struct{}{} + } + return errors.Wrapf(err, "GetItems failed during recursive delete.") + } + } + getItemsTerminated++ + + if getItemsTerminated == getItemsWorkers { + close(fileToDeleteChan) + } + case err := <-deleteTerminationChan: + a.logger.Debug("finished delete worker, total finished: %v, err: %v", deletesTerminated+1, err) + if err != nil { + for i := 0; i < goRoutinesNum; i++ { + onErrorTerminationChannel <- struct{}{} + } + return errors.Wrapf(err, "Delete failed during recursive delete.") + } + deletesTerminated++ + } + } + } else { + close(fileToDeleteChan) + } + + a.logger.Debug("finished deleting data, removing partitions from schema") + err = a.partitionMngr.DeletePartitionsFromSchema(entirelyDeletedPartitions) + if err != nil { + return err + } + + return nil +} + +func deleteEntirePartition(logger logger.Logger, container v3io.Container, partitionPath string, workers int, + wg *sync.WaitGroup, errChannel chan<- error) { + defer wg.Done() + + err := utils.DeleteTable(logger, container, partitionPath, "", workers) + if err != nil { + errChannel <- errors.Wrapf(err, "Failed to delete partition '%s'.", partitionPath) + } + // Delete the Directory object + err = container.DeleteObjectSync(&v3io.DeleteObjectInput{Path: partitionPath}) + if err != nil { + errChannel <- errors.Wrapf(err, "Failed to delete partition folder '%s'.", partitionPath) + } +} + +func getItemsWorker(logger logger.Logger, container v3io.Container, input *v3io.GetItemsInput, partition *partmgr.DBPartition, + filesToDeleteChan chan<- v3io.Item, terminationChan chan<- error, onErrorTerminationChannel <-chan struct{}) { + for { + select { + case _ = <-onErrorTerminationChannel: + terminationChan <- nil + return + default: + } + + logger.Debug("going to getItems for partition '%v', input: %v", partition.GetTablePath(), *input) + resp, err := 
container.GetItemsSync(input)
		if err != nil {
			terminationChan <- err
			return
		}
		// NOTE(review): resp.Release() is called before resp.Output is read below —
		// confirm against v3io-go semantics that Output remains valid after Release.
		resp.Release()
		output := resp.Output.(*v3io.GetItemsOutput)

		for _, item := range output.Items {
			// Tag each item with its partition so the delete workers know which
			// partition (and therefore which table path / time range) it belongs to.
			item["partition"] = partition

			// In case we got error on delete while iterating getItems response
			select {
			case _ = <-onErrorTerminationChannel:
				terminationChan <- nil
				return
			default:
			}

			filesToDeleteChan <- item
		}
		// No more pages — signal normal completion.
		if output.Last {
			terminationChan <- nil
			return
		}
		// Continue paging through the GetItems cursor.
		input.Marker = output.NextMarker
	}
}

// deleteObjectWorker consumes metric items from filesToDeleteChannel and deletes
// their data in the range [deleteParams.From, deleteParams.To]. An item whose
// partition is fully covered by the range is deleted outright; otherwise the
// worker builds an UpdateItem expression that deletes whole chunk attributes in
// range, rewrites partially-covered chunks, refreshes server-side aggregates
// (per aggrMask), and maintains the item's max-time attribute. The first error
// (or nil on clean completion) is sent on terminationChan; a signal on
// onErrorTerminationChannel aborts the worker early.
func deleteObjectWorker(container v3io.Container, deleteParams *DeleteParams, logger logger.Logger,
	filesToDeleteChannel <-chan v3io.Item, terminationChan chan<- error, onErrorTerminationChannel <-chan struct{},
	aggrMask aggregate.AggrType) {
	for {
		select {
		case _ = <-onErrorTerminationChannel:
			// Another worker failed — stop without reporting.
			return
		case itemToDelete, ok := <-filesToDeleteChannel:
			if !ok {
				// Channel closed by the producer: all items were handled.
				terminationChan <- nil
				return
			}

			// "partition" was attached by the getItems producer loop above.
			currentPartition := itemToDelete.GetField("partition").(*partmgr.DBPartition)
			fileName, err := itemToDelete.GetFieldString(config.ObjectNameAttrName)
			if err != nil {
				terminationChan <- err
				return
			}
			fullFileName := pathUtil.Join(currentPartition.GetTablePath(), fileName)

			// Delete whole object
			if deleteParams.From <= currentPartition.GetStartTime() &&
				deleteParams.To >= currentPartition.GetEndTime() {

				logger.Debug("delete entire item '%v' ", fullFileName)
				input := &v3io.DeleteObjectInput{Path: fullFileName}
				err = container.DeleteObjectSync(input)
				// 404/409 are tolerated: the object may already be gone or raced.
				if err != nil && !utils.IsNotExistsOrConflictError(err) {
					terminationChan <- err
					return
				}
				// Delete partial object - specific chunks or sub-parts of chunks
			} else {
				// mtime secs/nsecs are used as an optimistic-concurrency condition
				// in sendUpdateItem, so concurrent ingestion is detected.
				mtimeSecs, err := itemToDelete.GetFieldInt(config.MtimeSecsAttributeName)
				if err != nil {
					terminationChan <- err
					return
				}
				mtimeNSecs, err := itemToDelete.GetFieldInt(config.MtimeNSecsAttributeName)
				if err != nil {
					terminationChan <- err
					return
				}

				deleteUpdateExpression := strings.Builder{}
				dataEncoding, err := getEncoding(itemToDelete)
				if err != nil {
					terminationChan <- err
					return
				}

				// Pre-create an aggregates list per affected bucket when server-side
				// aggregates are configured; they are recomputed from surviving samples.
				var aggregationsByBucket map[int]*aggregate.AggregatesList
				if aggrMask != 0 {
					aggregationsByBucket = make(map[int]*aggregate.AggregatesList)
					aggrBuckets := currentPartition.Times2BucketRange(deleteParams.From, deleteParams.To)
					for _, bucketID := range aggrBuckets {
						aggregationsByBucket[bucketID] = aggregate.NewAggregatesList(aggrMask)
					}
				}

				var newMaxTime int64 = math.MaxInt64
				var numberOfExpressionsInUpdate int
				// Chunk attributes are stored with a "_v" prefix; walk every
				// chunk attribute and decide full delete vs. partial rewrite.
				for attributeName, value := range itemToDelete {
					if strings.HasPrefix(attributeName, "_v") {
						// Check whether the whole chunk attribute needed to be deleted or just part of it.
						if currentPartition.IsChunkInRangeByAttr(attributeName, deleteParams.From, deleteParams.To) {
							deleteUpdateExpression.WriteString("delete(")
							deleteUpdateExpression.WriteString(attributeName)
							deleteUpdateExpression.WriteString(");")
						} else {
							currentChunksMaxTime, err := generatePartialChunkDeleteExpression(logger, &deleteUpdateExpression, attributeName,
								value.([]byte), dataEncoding, deleteParams, currentPartition, aggregationsByBucket)
							if err != nil {
								terminationChan <- err
								return
							}

							// We want to save the earliest max time possible
							if currentChunksMaxTime < newMaxTime {
								newMaxTime = currentChunksMaxTime
							}
						}
						numberOfExpressionsInUpdate++
					}
				}

				// NOTE(review): this type assertion panics if the max-time attribute
				// is absent or not an int — confirm the attribute is always present.
				dbMaxTime := int64(itemToDelete.GetField(config.MaxTimeAttrName).(int))

				// Update the partition's max time if needed.
				if deleteParams.From < dbMaxTime && deleteParams.To >= dbMaxTime {
					if deleteParams.From < newMaxTime {
						newMaxTime = deleteParams.From
					}

					deleteUpdateExpression.WriteString(fmt.Sprintf("%v=%v;", config.MaxTimeAttrName, newMaxTime))
				}

				if deleteUpdateExpression.Len() > 0 {
					// If there are server aggregates, update the needed buckets
					if aggrMask != 0 {
						for bucket, aggregations := range aggregationsByBucket {
							numberOfExpressionsInUpdate = numberOfExpressionsInUpdate + len(*aggregations)

							// Due to engine limitation, If we reached maximum number of expressions in an UpdateItem
							// we need to break the update into chunks
							// TODO: refactor in 2.8:
							// in 2.8 there is a better way of doing it by uniting multiple update expressions into
							// one expression by range in a form similar to `_v_sum[15...100]=0`
							if numberOfExpressionsInUpdate < maxExpressionsInUpdateItem {
								deleteUpdateExpression.WriteString(aggregations.SetExpr("v", bucket))
							} else {
								exprStr := deleteUpdateExpression.String()
								logger.Debug("delete item '%v' with expression '%v'", fullFileName, exprStr)
								// Flush the accumulated expression; the returned mtimes
								// re-arm the optimistic condition for the next batch.
								mtimeSecs, mtimeNSecs, err = sendUpdateItem(fullFileName, exprStr, mtimeSecs, mtimeNSecs, container)
								if err != nil {
									terminationChan <- err
									return
								}

								// Reset stuff for next update iteration
								// NOTE(review): the bucket that triggered this flush never
								// gets its SetExpr written (neither before nor after the
								// reset) — verify this bucket's aggregate update isn't lost.
								numberOfExpressionsInUpdate = 0
								deleteUpdateExpression.Reset()
							}
						}
					}

					// If any expressions are left, save them
					if deleteUpdateExpression.Len() > 0 {
						exprStr := deleteUpdateExpression.String()
						logger.Debug("delete item '%v' with expression '%v'", fullFileName, exprStr)
						_, _, err = sendUpdateItem(fullFileName, exprStr, mtimeSecs, mtimeNSecs, container)
						if err != nil {
							terminationChan <- err
							return
						}
					}
				}
			}
		}
	}
}

// sendUpdateItem issues a conditional UpdateItem for path with the given
// expression. The condition requires the item's mtime secs/nsecs to still match
// the values observed when the item was listed, so a concurrent ingest makes
// the update fail rather than silently clobber new data. Returns the item's
// new mtime secs/nsecs for use in a follow-up conditional update.
func sendUpdateItem(path, expr string, mtimeSecs, mtimeNSecs int, container v3io.Container) (int, int, error) {
	condition := fmt.Sprintf("%v == %v and %v == %v",
		config.MtimeSecsAttributeName, mtimeSecs,
		config.MtimeNSecsAttributeName, mtimeNSecs)

	input := &v3io.UpdateItemInput{Path: path,
		Expression: &expr,
		Condition:  condition}

	response, err := container.UpdateItemSync(input)
	if err != nil && !utils.IsNotExistsOrConflictError(err) {
		returnError := err
		// A false-condition failure means the item changed under us; surface a
		// actionable message telling the operator to stop ingestion and retry.
		if isFalseConditionError(err) {
			returnError = errors.Wrapf(err, "Item '%v' was updated while deleting occurred. Please disable any ingestion and retry.", path)
		}
		return 0, 0, returnError
	}

	// NOTE(review): when err was a tolerated 404/409 above, response may be nil
	// here and this dereference would panic — confirm UpdateItemSync's contract.
	output := response.Output.(*v3io.UpdateItemOutput)
	return output.MtimeSecs, output.MtimeNSecs, nil
}

// getEncoding extracts the chunk encoding of an item from its encoding
// attribute, falling back to XOR when the attribute is missing.
func getEncoding(itemToDelete v3io.Item) (chunkenc.Encoding, error) {
	var encoding chunkenc.Encoding
	encodingStr, ok := itemToDelete.GetField(config.EncodingAttrName).(string)
	// If we don't have the encoding attribute, use XOR as default. (for backwards compatibility)
	if !ok {
		encoding = chunkenc.EncXOR
	} else {
		intEncoding, err := strconv.Atoi(encodingStr)
		if err != nil {
			return 0, fmt.Errorf("error parsing encoding type of chunk, got: %v, error: %v", encodingStr, err)
		}
		encoding = chunkenc.Encoding(intEncoding)
	}

	return encoding, nil
}

// generatePartialChunkDeleteExpression rewrites a single chunk attribute so it
// no longer contains samples inside [deleteParams.From, deleteParams.To]. The
// chunk is decoded, surviving samples are re-appended to a fresh chunk (and fed
// into aggregationsByBucket when server-side aggregates are in use), and the
// resulting blob-assignment (or a delete() when nothing survives) is appended
// to expr. Returns the new max timestamp of the rewritten chunk, or the chunk's
// start time when the whole chunk is deleted.
func generatePartialChunkDeleteExpression(logger logger.Logger, expr *strings.Builder,
	attributeName string, value []byte, encoding chunkenc.Encoding, deleteParams *DeleteParams,
	partition *partmgr.DBPartition, aggregationsByBucket map[int]*aggregate.AggregatesList) (int64, error) {
	chunk, err := chunkenc.FromData(logger, encoding, value, 0)
	if err != nil {
		return 0, err
	}

	newChunk := chunkenc.NewChunk(logger, encoding == chunkenc.EncVariant)
	appender, err := newChunk.Appender()
	if err != nil {
		return 0, err
	}

	var currentMaxTime int64
	var remainingItemsCount int
	iter := chunk.Iterator()
	for iter.Next() {
		var t int64
		var v interface{}
		// Variant-encoded chunks carry string values; XOR chunks carry floats.
		if encoding == chunkenc.EncXOR {
			t, v = iter.At()
		} else {
			t, v = iter.AtString()
		}

		// Append back only events that are not in the delete range
		if t < deleteParams.From || t > deleteParams.To {
			remainingItemsCount++
			appender.Append(t, v)

			// Calculate server-side aggregations
			if aggregationsByBucket != nil {
				currentAgg, ok := aggregationsByBucket[partition.Time2Bucket(t)]
				// A chunk may contain more data then needed for the aggregations, if this is the case do not aggregate
				if ok {
					currentAgg.Aggregate(t, v)
				}
			}

			// Update current chunk's new max time
			if t > currentMaxTime {
				currentMaxTime = t
			}
		}
	}

	if remainingItemsCount == 0 {
		// Everything in this chunk is in the delete range: drop the attribute
		// and report the chunk's start time as its (now vacuous) max time.
		expr.WriteString("delete(")
		expr.WriteString(attributeName)
		expr.WriteString(");")
		currentMaxTime, _ = partition.GetChunkStartTimeByAttr(attributeName)
	} else {
		// Re-encode the surviving samples and assign them back as a blob.
		bytes := appender.Chunk().Bytes()
		val := base64.StdEncoding.EncodeToString(bytes)

		expr.WriteString(fmt.Sprintf("%s=blob('%s'); ", attributeName, val))
	}

	return currentMaxTime, nil

}

// Return the number of items in a TSDB table
func (a *V3ioAdapter) CountMetrics(part string) (int, error) {
	count := 0
@@ -360,3 +829,16 @@
type Appender interface {
	Rollback() error
	Close()
}

// Check if the current error was caused specifically because the condition was evaluated to false.
func isFalseConditionError(err error) bool {
	errString := err.Error()

	// The false-condition failure is identified purely by the error text:
	// exactly two error-code occurrences, matching the outer and inner codes.
	if strings.Count(errString, errorCodeString) == 2 &&
		strings.Contains(errString, falseConditionOuterErrorCode) &&
		strings.Contains(errString, falseConditionInnerErrorCode) {
		return true
	}

	return false
}
diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/utils/misc.go b/vendor/github.com/v3io/v3io-tsdb/pkg/utils/misc.go
index e7f32bbb1a9..5bff372f4f5 100644
--- a/vendor/github.com/v3io/v3io-tsdb/pkg/utils/misc.go
+++ b/vendor/github.com/v3io/v3io-tsdb/pkg/utils/misc.go
@@ -55,3 +55,17 @@ func IsFalseConditionError(err error) bool {

	return false
}

// IsNotExistsOrConflictError reports whether err is a v3io error carrying an
// HTTP 404 (not found) or 409 (conflict) status — both are treated as benign
// by delete/update flows.
func IsNotExistsOrConflictError(err error) bool {
	errorWithStatusCode, ok := err.(v3ioerrors.ErrorWithStatusCode)
	if !ok {
		// error of different type
		return false
	}
	statusCode := errorWithStatusCode.StatusCode()
	// Ignore 404s and 409s
	if statusCode == http.StatusNotFound || statusCode == http.StatusConflict {
		return true
	}
	return false
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 1fb5bfb408b..42c5fe9cd93 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -298,15 +298,15 @@ github.com/shurcooL/vfsgen
 github.com/stretchr/testify/require
 github.com/stretchr/testify/suite
 github.com/stretchr/testify/assert
-# github.com/v3io/frames v0.6.8-v0.9.11 => github.com/v3io/frames v0.6.11-v0.9.16
+# github.com/v3io/frames v0.6.8-v0.9.11 => github.com/v3io/frames v0.6.11-v0.9.17
 github.com/v3io/frames
 github.com/v3io/frames/pb
-# github.com/v3io/v3io-go v0.1.5-0.20200224125003-964a745e51aa
+# github.com/v3io/v3io-go v0.1.5-0.20200301152134-6880d30985de
 github.com/v3io/v3io-go/pkg/dataplane
 github.com/v3io/v3io-go/pkg/errors
 github.com/v3io/v3io-go/pkg/dataplane/http
 github.com/v3io/v3io-go/pkg/dataplane/schemas/node/common
-# github.com/v3io/v3io-tsdb v0.9.16 => github.com/v3io/v3io-tsdb v0.9.16
+# github.com/v3io/v3io-tsdb v0.9.17 => github.com/v3io/v3io-tsdb v0.9.17
github.com/v3io/v3io-tsdb/pkg/aggregate github.com/v3io/v3io-tsdb/pkg/appender github.com/v3io/v3io-tsdb/pkg/config