feat(influx_tools): Add export to parquet files
srebhan committed Sep 18, 2024
1 parent 8eaa24d commit 46aef0b
Showing 8 changed files with 1,390 additions and 103 deletions.
6 changes: 6 additions & 0 deletions cmd/influx_tools/main.go
@@ -15,6 +15,7 @@ import (
     geninit "github.com/influxdata/influxdb/cmd/influx_tools/generate/init"
     "github.com/influxdata/influxdb/cmd/influx_tools/help"
     "github.com/influxdata/influxdb/cmd/influx_tools/importer"
+    "github.com/influxdata/influxdb/cmd/influx_tools/parquet"
     "github.com/influxdata/influxdb/cmd/influx_tools/server"
     "github.com/influxdata/influxdb/cmd/influxd/run"
     "github.com/influxdata/influxdb/services/meta"
@@ -67,6 +68,11 @@ func (m *Main) Run(args ...string) error {
         if err := c.Run(args); err != nil {
             return fmt.Errorf("export failed: %s", err)
         }
+    case "export-parquet":
+        c := parquet.NewCommand(&ossServer{logger: zap.NewNop()})
+        if err := c.Run(args); err != nil {
+            return fmt.Errorf("export failed: %s", err)
+        }
     case "import":
         c := importer.NewCommand(&ossServer{logger: zap.NewNop()})
         if err := c.Run(args); err != nil {
236 changes: 236 additions & 0 deletions cmd/influx_tools/parquet/batcher.go
@@ -0,0 +1,236 @@
package parquet

import (
    "context"
    "fmt"
    "os"
    "sort"

    "github.com/influxdata/influxdb/models"
    "github.com/influxdata/influxdb/tsdb"
    "github.com/influxdata/influxql"
)

// row is one output record: a timestamp together with the tag values of its
// series and all field values recorded at that instant.
type row struct {
    timestamp int64
    tags      map[string]string
    fields    map[string]interface{}
}

// batcher reads the data of a single measurement from a shard and hands it
// out as time-ordered batches of rows.
type batcher struct {
    measurement []byte
    shard       *tsdb.Shard

    converter   map[string]func(interface{}) (interface{}, error)
    nameMapping map[string]string

    series []seriesEntry
    start  int64
}

func newBatcher(
    ctx context.Context,
    shard *tsdb.Shard,
    measurement string,
    series []seriesEntry,
    converter map[string]func(interface{}) (interface{}, error),
    nameMapping map[string]string,
) (*batcher, error) {
    seriesCursor, err := shard.CreateSeriesCursor(
        ctx,
        tsdb.SeriesCursorRequest{},
        influxql.MustParseExpr("_name = '"+measurement+"'"),
    )
    if err != nil {
        return nil, fmt.Errorf("getting series cursor failed: %w", err)
    }
    defer seriesCursor.Close()

    return &batcher{
        measurement: []byte(measurement),
        shard:       shard,
        series:      series,
        converter:   converter,
        nameMapping: nameMapping,
        start:       models.MinNanoTime,
    }, nil
}

// reset restarts the batching from the earliest representable timestamp.
func (b *batcher) reset() {
    b.start = models.MinNanoTime
}

func (b *batcher) next(ctx context.Context) ([]row, error) {
    // Iterate over the series and fields and accumulate the data row-wise
    iter, err := b.shard.CreateCursorIterator(ctx)
    if err != nil {
        return nil, fmt.Errorf("getting cursor iterator for %q failed: %w", string(b.measurement), err)
    }

    data := make(map[string]map[int64]row)
    end := models.MaxNanoTime
    for _, s := range b.series {
        data[s.key] = make(map[int64]row)
        tags := make(map[string]string, len(s.tags))
        for _, t := range s.tags {
            tags[string(t.Key)] = string(t.Value)
        }
        for field := range s.fields {
            cursor, err := iter.Next(ctx,
                &tsdb.CursorRequest{
                    Name:      b.measurement,
                    Tags:      s.tags,
                    Field:     field,
                    Ascending: true,
                    StartTime: b.start,
                    EndTime:   models.MaxNanoTime,
                },
            )
            if err != nil {
                return nil, fmt.Errorf("getting cursor for %s-%s failed: %w", s.key, field, err)
            }
            if cursor == nil {
                continue
            }

            // Prepare the field-name and value-converter mappings
            fname := field
            if n, found := b.nameMapping[field]; found {
                fname = n
            }
            converter := identity
            if c, found := b.converter[field]; found {
                converter = c
            }

            // fieldEnd tracks the last timestamp this field's cursor
            // delivered; it later caps the batch so that no row is returned
            // before all of its fields have been read. Each cursor type
            // delivers a differently typed value array, hence the repeated
            // accumulation logic per case.
            fieldEnd := models.MaxNanoTime
            switch c := cursor.(type) {
            case tsdb.IntegerArrayCursor:
                values := c.Next()
                for i, t := range values.Timestamps {
                    v, err := converter(values.Values[i])
                    if err != nil {
                        fmt.Fprintf(os.Stderr, "converting %v of field %q failed: %v\n", values.Values[i], field, err)
                        continue
                    }

                    if _, found := data[s.key][t]; !found {
                        data[s.key][t] = row{
                            timestamp: t,
                            tags:      tags,
                            fields:    make(map[string]interface{}),
                        }
                    }

                    data[s.key][t].fields[fname] = v
                    fieldEnd = t
                }
            case tsdb.FloatArrayCursor:
                values := c.Next()
                for i, t := range values.Timestamps {
                    v, err := converter(values.Values[i])
                    if err != nil {
                        fmt.Fprintf(os.Stderr, "converting %v of field %q failed: %v\n", values.Values[i], field, err)
                        continue
                    }

                    if _, found := data[s.key][t]; !found {
                        data[s.key][t] = row{
                            timestamp: t,
                            tags:      tags,
                            fields:    make(map[string]interface{}),
                        }
                    }

                    data[s.key][t].fields[fname] = v
                    fieldEnd = t
                }
            case tsdb.UnsignedArrayCursor:
                values := c.Next()
                for i, t := range values.Timestamps {
                    v, err := converter(values.Values[i])
                    if err != nil {
                        fmt.Fprintf(os.Stderr, "converting %v of field %q failed: %v\n", values.Values[i], field, err)
                        continue
                    }

                    if _, found := data[s.key][t]; !found {
                        data[s.key][t] = row{
                            timestamp: t,
                            tags:      tags,
                            fields:    make(map[string]interface{}),
                        }
                    }

                    data[s.key][t].fields[fname] = v
                    fieldEnd = t
                }
            case tsdb.BooleanArrayCursor:
                values := c.Next()
                for i, t := range values.Timestamps {
                    v, err := converter(values.Values[i])
                    if err != nil {
                        fmt.Fprintf(os.Stderr, "converting %v of field %q failed: %v\n", values.Values[i], field, err)
                        continue
                    }

                    if _, found := data[s.key][t]; !found {
                        data[s.key][t] = row{
                            timestamp: t,
                            tags:      tags,
                            fields:    make(map[string]interface{}),
                        }
                    }

                    data[s.key][t].fields[fname] = v
                    fieldEnd = t
                }
            case tsdb.StringArrayCursor:
                values := c.Next()
                for i, t := range values.Timestamps {
                    v, err := converter(values.Values[i])
                    if err != nil {
                        fmt.Fprintf(os.Stderr, "converting %v of field %q failed: %v\n", values.Values[i], field, err)
                        continue
                    }

                    if _, found := data[s.key][t]; !found {
                        data[s.key][t] = row{
                            timestamp: t,
                            tags:      tags,
                            fields:    make(map[string]interface{}),
                        }
                    }

                    data[s.key][t].fields[fname] = v
                    fieldEnd = t
                }
            default:
                cursor.Close()
                panic(fmt.Errorf("unexpected type %T", cursor))
            }
            cursor.Close()

            // The batch may only extend to the position of the slowest cursor
            end = min(end, fieldEnd)
        }
    }
    if len(data) == 0 {
        return nil, nil
    }

    // Extract the rows ordered by timestamp
    var rows []row
    for _, tmap := range data {
        for _, r := range tmap {
            rows = append(rows, r)
        }
    }
    sort.Slice(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp })

    // Only return rows up to the end-timestamp; later rows might still be
    // incomplete because not all of their fields have been read yet and
    // would otherwise reappear, partially duplicated, in a subsequent batch
    n := sort.Search(len(rows), func(i int) bool { return rows[i].timestamp > end })

    // Continue the next batch directly after the last returned timestamp
    b.start = end + 1

    return rows[:n], nil
}
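
The interplay of end = min(end, fieldEnd) and b.start = end + 1 above is the core batching invariant: a row may only be returned once the slowest field cursor has read past its timestamp, otherwise a later batch could emit a second, partially filled row for the same instant. A minimal standalone sketch of that watermark rule (the function and variable names here are illustrative, not part of this commit):

package main

import (
    "fmt"
    "sort"
)

// emitBatch mimics batcher.next on pre-collected data: fieldEnds holds the
// last timestamp each field cursor delivered, rowTimes the timestamps of all
// accumulated rows. Rows are emitted only up to the minimum cursor position
// (the watermark); the next batch resumes one nanosecond after it.
func emitBatch(fieldEnds, rowTimes []int64) (emitted []int64, nextStart int64) {
    watermark := fieldEnds[0]
    for _, e := range fieldEnds[1:] {
        if e < watermark {
            watermark = e
        }
    }

    sort.Slice(rowTimes, func(i, j int) bool { return rowTimes[i] < rowTimes[j] })
    n := sort.Search(len(rowTimes), func(i int) bool { return rowTimes[i] > watermark })

    return rowTimes[:n], watermark + 1
}

func main() {
    // Field "a" was read through t=30 but field "b" only through t=10, so
    // the rows at t=20 and t=30 might still gain "b" values and must wait.
    emitted, next := emitBatch([]int64{30, 10}, []int64{10, 20, 30})
    fmt.Println(emitted, next) // [10] 11
}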
101 changes: 101 additions & 0 deletions cmd/influx_tools/parquet/command.go
@@ -0,0 +1,101 @@
package parquet

import (
    "context"
    "errors"
    "flag"
    "fmt"
    "io"
    "os"

    "go.uber.org/zap"

    "github.com/influxdata/influxdb/cmd/influx_tools/server"
)

// Command represents the program execution for "export-parquet".
type Command struct {
    // Standard input/output, overridden for testing.
    Stderr io.Writer
    Logger *zap.Logger

    server server.Interface
}

// NewCommand returns a new instance of the export Command.
func NewCommand(server server.Interface) *Command {
    return &Command{
        Stderr: os.Stderr,
        server: server,
    }
}

// Run executes the export command using the specified args.
func (cmd *Command) Run(args []string) (err error) {
    var (
        configPath      string
        database        string
        rp              string
        measurements    string
        typeResolutions string
        nameResolutions string
        output          string
        dryRun          bool
    )

    cwd, err := os.Getwd()
    if err != nil {
        return fmt.Errorf("getting current working directory failed: %w", err)
    }

    flags := flag.NewFlagSet("export", flag.ContinueOnError)
    flags.StringVar(&configPath, "config", "", "Config file of the InfluxDB v1 instance")
    flags.StringVar(&database, "database", "", "Database to export")
    flags.StringVar(&rp, "rp", "", "Retention policy in the database to export")
    flags.StringVar(&measurements, "measurements", "*", "Comma-separated list of measurements to export")
    flags.StringVar(&typeResolutions, "resolve-types", "", "Comma-separated list of field type resolutions in the form <measurement>.<field>=<type>")
    flags.StringVar(&nameResolutions, "resolve-names", "", "Comma-separated list of field renamings in the form <measurement>.<field>=<new name>")
    flags.StringVar(&output, "output", cwd, "Output directory for exported parquet files")
    flags.BoolVar(&dryRun, "dry-run", false, "Print plan and exit")

    if err := flags.Parse(args); err != nil {
        return err
    }

    if database == "" {
        return errors.New("database is required")
    }

    if err := cmd.server.Open(configPath); err != nil {
        return err
    }
    defer cmd.server.Close()

    cfg := &config{
        Database:        database,
        RP:              rp,
        Measurements:    measurements,
        TypeResolutions: typeResolutions,
        NameResolutions: nameResolutions,
        Output:          output,
        Stderr:          cmd.Stderr,
    }
    exp, err := newExporter(cmd.server, cfg)
    if err != nil {
        return err
    }

    ctx := context.Background()
    if err := exp.open(ctx); err != nil {
        return err
    }
    defer exp.close()

    // Always print the export plan, then stop here on a dry run
    exp.printPlan()

    if dryRun {
        return nil
    }

    return exp.export(ctx)
}
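
Given the flags registered above, a typical invocation would look roughly like the following (the config path, database, retention policy, measurement names, and output directory are placeholders; the set of accepted <type> values is defined by the exporter and is not visible in this diff):

$ influx_tools export-parquet \
    -config /etc/influxdb/influxdb.conf \
    -database mydb \
    -rp autogen \
    -measurements cpu,mem \
    -resolve-types "cpu.usage=float" \
    -output /var/tmp/parquet-export \
    -dry-run

With -dry-run set, Run prints the export plan via exp.printPlan() and exits before any parquet files are written.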