From f2423af67b3239d9e054d7e5642a0d6edfcabb04 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 19 Sep 2024 15:38:34 +0200 Subject: [PATCH] feat: Move out cursor handling --- cmd/influx_tools/parquet/batcher.go | 122 ++++----------- cmd/influx_tools/parquet/cursors.go | 213 +++++++++++++++++++++++++++ cmd/influx_tools/parquet/exporter.go | 2 +- 3 files changed, 239 insertions(+), 98 deletions(-) create mode 100644 cmd/influx_tools/parquet/cursors.go diff --git a/cmd/influx_tools/parquet/batcher.go b/cmd/influx_tools/parquet/batcher.go index e3bfdfcfec2..e91f300fb7a 100644 --- a/cmd/influx_tools/parquet/batcher.go +++ b/cmd/influx_tools/parquet/batcher.go @@ -5,10 +5,11 @@ import ( "fmt" "sort" + "go.uber.org/zap" + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" - "go.uber.org/zap" ) type row struct { @@ -103,112 +104,39 @@ func (b *batcher) next(ctx context.Context) ([]row, error) { converter = c } fieldEnd := models.MaxNanoTime - switch c := cursor.(type) { - case tsdb.IntegerArrayCursor: - values := c.Next() - for i, t := range values.Timestamps { - v, err := converter(values.Values[i]) - if err != nil { - b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err) - continue - } - - if _, found := data[s.key][t]; !found { - data[s.key][t] = row{ - timestamp: t, - tags: tags, - fields: make(map[string]interface{}), - } - } - data[s.key][t].fields[fname] = v - fieldEnd = t - } - case tsdb.FloatArrayCursor: - values := c.Next() - for i, t := range values.Timestamps { - v, err := converter(values.Values[i]) - if err != nil { - b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err) - continue - } - - if _, found := data[s.key][t]; !found { - data[s.key][t] = row{ - timestamp: t, - tags: tags, - fields: make(map[string]interface{}), - } - } + c, err := newValueCursor(cursor) + if err != nil { + return nil, fmt.Errorf("creating value cursor failed: %w", err) + } - data[s.key][t].fields[fname] = v - fieldEnd = t + for { + // Check if we do still have data + timestamp, ok := c.peek() + if !ok { + break } - case tsdb.UnsignedArrayCursor: - values := c.Next() - for i, t := range values.Timestamps { - v, err := converter(values.Values[i]) - if err != nil { - b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err) - continue - } - - if _, found := data[s.key][t]; !found { - data[s.key][t] = row{ - timestamp: t, - tags: tags, - fields: make(map[string]interface{}), - } - } - data[s.key][t].fields[fname] = v - fieldEnd = t + timestamp, value := c.next() + v, err := converter(value) + if err != nil { + b.logger.Errorf("converting %v of field %q failed: %v", value, field, err) + continue } - case tsdb.BooleanArrayCursor: - values := c.Next() - for i, t := range values.Timestamps { - v, err := converter(values.Values[i]) - if err != nil { - b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err) - continue - } - if _, found := data[s.key][t]; !found { - data[s.key][t] = row{ - timestamp: t, - tags: tags, - fields: make(map[string]interface{}), - } + if _, found := data[s.key][timestamp]; !found { + data[s.key][timestamp] = row{ + timestamp: timestamp, + tags: tags, + fields: make(map[string]interface{}), } - - data[s.key][t].fields[fname] = v - fieldEnd = t } - case tsdb.StringArrayCursor: - values := c.Next() - for i, t := range values.Timestamps { - v, err := converter(values.Values[i]) - if err != nil { - b.logger.Errorf("converting %v of field %q failed: %v", values.Values[i], field, err) - continue - } - - if _, found := data[s.key][t]; !found { - data[s.key][t] = row{ - timestamp: t, - tags: tags, - fields: make(map[string]interface{}), - } - } - data[s.key][t].fields[fname] = v - fieldEnd = t - } - default: - cursor.Close() - return nil, fmt.Errorf("unexpected type %T", cursor) + data[s.key][timestamp].fields[fname] = v + fieldEnd = timestamp } - cursor.Close() + + c.close() end = min(end, fieldEnd) } } diff --git a/cmd/influx_tools/parquet/cursors.go b/cmd/influx_tools/parquet/cursors.go new file mode 100644 index 00000000000..cb871d12695 --- /dev/null +++ b/cmd/influx_tools/parquet/cursors.go @@ -0,0 +1,213 @@ +package parquet + +import ( + "fmt" + + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/cursors" +) + +type valueCursor interface { + next() (int64, interface{}) + peek() (int64, bool) + close() +} + +func newValueCursor(cursor cursors.Cursor) (valueCursor, error) { + switch c := cursor.(type) { + case tsdb.FloatArrayCursor: + return &floatValueCursor{cur: c}, nil + case tsdb.UnsignedArrayCursor: + return &uintValueCursor{cur: c}, nil + case tsdb.IntegerArrayCursor: + return &intValueCursor{cur: c}, nil + case tsdb.BooleanArrayCursor: + return &boolValueCursor{cur: c}, nil + case tsdb.StringArrayCursor: + return &stringValueCursor{cur: c}, nil + } + return nil, fmt.Errorf("unexpected type %T", cursor) + +} + +type floatValueCursor struct { + cur tsdb.FloatArrayCursor + arr *cursors.FloatArray + idx int +} + +func (c *floatValueCursor) next() (int64, interface{}) { + // Initialize the array on first call + if c.arr == nil { + c.idx = 0 + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, nil + } + + defer func() { c.idx++ }() + return c.arr.Timestamps[c.idx], c.arr.Values[c.idx] +} + +func (c *floatValueCursor) peek() (int64, bool) { + // Initialize the array on first call + if c.arr == nil { + c.idx = 0 + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, false + } + return c.arr.Timestamps[c.idx], true +} + +func (c *floatValueCursor) close() { + c.cur.Close() +} + +type uintValueCursor struct { + cur tsdb.UnsignedArrayCursor + arr *cursors.UnsignedArray + idx int +} + +func (c *uintValueCursor) next() (int64, interface{}) { + // Initialize the array on first call + if c.arr == nil { + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, nil + } + defer func() { c.idx++ }() + return c.arr.Timestamps[c.idx], c.arr.Values[c.idx] +} + +func (c *uintValueCursor) peek() (int64, bool) { + // Initialize the array on first call + if c.arr == nil { + c.idx = 0 + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, false + } + return c.arr.Timestamps[c.idx], true +} + +func (c *uintValueCursor) close() { + c.cur.Close() +} + +type intValueCursor struct { + cur tsdb.IntegerArrayCursor + arr *cursors.IntegerArray + idx int +} + +func (c *intValueCursor) next() (int64, interface{}) { + // Initialize the array on first call + if c.arr == nil { + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, nil + } + defer func() { c.idx++ }() + return c.arr.Timestamps[c.idx], c.arr.Values[c.idx] +} + +func (c *intValueCursor) peek() (int64, bool) { + // Initialize the array on first call + if c.arr == nil { + c.idx = 0 + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, false + } + return c.arr.Timestamps[c.idx], true +} + +func (c *intValueCursor) close() { + c.cur.Close() +} + +type boolValueCursor struct { + cur tsdb.BooleanArrayCursor + arr *cursors.BooleanArray + idx int +} + +func (c *boolValueCursor) next() (int64, interface{}) { + // Initialize the array on first call + if c.arr == nil { + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, nil + } + defer func() { c.idx++ }() + return c.arr.Timestamps[c.idx], c.arr.Values[c.idx] +} + +func (c *boolValueCursor) peek() (int64, bool) { + // Initialize the array on first call + if c.arr == nil { + c.idx = 0 + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, false + } + return c.arr.Timestamps[c.idx], true +} + +func (c *boolValueCursor) close() { + c.cur.Close() +} + +type stringValueCursor struct { + cur tsdb.StringArrayCursor + arr *cursors.StringArray + idx int +} + +func (c *stringValueCursor) next() (int64, interface{}) { + // Initialize the array on first call + if c.arr == nil { + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, nil + } + defer func() { c.idx++ }() + return c.arr.Timestamps[c.idx], c.arr.Values[c.idx] +} + +func (c *stringValueCursor) peek() (int64, bool) { + // Initialize the array on first call + if c.arr == nil { + c.idx = 0 + c.arr = c.cur.Next() + } + // Indicate no elements early + if c.arr.Len() == 0 || c.idx >= c.arr.Len() { + return 0, false + } + return c.arr.Timestamps[c.idx], true +} + +func (c *stringValueCursor) close() { + c.cur.Close() +} diff --git a/cmd/influx_tools/parquet/exporter.go b/cmd/influx_tools/parquet/exporter.go index cf33b2159bc..91f1cf06c27 100644 --- a/cmd/influx_tools/parquet/exporter.go +++ b/cmd/influx_tools/parquet/exporter.go @@ -457,7 +457,7 @@ func (e *exporter) exportMeasurement(ctx context.Context, shard *tsdb.Shard, mea record := e.convertData(rows, builder, creator.tags, creator.fieldKeys) // Write data - if err := writer.WriteBuffered(record); err != nil { + if err := writer.Write(record); err != nil { return fmt.Errorf("writing parquet file %q failed: %w", filename, err) } e.logger.Infof(" exported %d rows in %v", len(rows), time.Since(last))