diff --git a/cmd/ynabber/main.go b/cmd/ynabber/main.go index 2919b13..1c0f133 100644 --- a/cmd/ynabber/main.go +++ b/cmd/ynabber/main.go @@ -1,11 +1,9 @@ package main import ( - "fmt" "log" "log/slog" "os" - "time" "github.com/carlmjohnson/versioninfo" "github.com/kelseyhightower/envconfig" @@ -38,11 +36,11 @@ func main() { setupLogging(cfg.Debug) slog.Info("starting...", "version", versioninfo.Short()) - ynabber := ynabber.Ynabber{} + y := ynabber.NewYnabber(&cfg) for _, reader := range cfg.Readers { switch reader { case "nordigen": - ynabber.Readers = append(ynabber.Readers, nordigen.NewReader(&cfg)) + y.Readers = append(y.Readers, nordigen.NewReader(&cfg)) default: log.Fatalf("Unknown reader: %s", reader) } @@ -50,49 +48,14 @@ func main() { for _, writer := range cfg.Writers { switch writer { case "ynab": - ynabber.Writers = append(ynabber.Writers, ynab.NewWriter(&cfg)) + y.Writers = append(y.Writers, ynab.NewWriter(&cfg)) case "json": - ynabber.Writers = append(ynabber.Writers, json.Writer{}) + y.Writers = append(y.Writers, json.Writer{}) default: log.Fatalf("Unknown writer: %s", writer) } } - for { - start := time.Now() - err = run(ynabber) - if err != nil { - panic(err) - } else { - slog.Info("run succeeded", "in", time.Since(start)) - if cfg.Interval > 0 { - slog.Info("waiting for next run", "in", cfg.Interval) - time.Sleep(cfg.Interval) - } else { - os.Exit(0) - } - } - } -} - -func run(y ynabber.Ynabber) error { - var transactions []ynabber.Transaction - - // Read transactions from all readers - for _, reader := range y.Readers { - t, err := reader.Bulk() - if err != nil { - return fmt.Errorf("reading: %w", err) - } - transactions = append(transactions, t...) - } - - // Write transactions to all writers - for _, writer := range y.Writers { - err := writer.Bulk(transactions) - if err != nil { - return fmt.Errorf("writing: %w", err) - } - } - return nil + // Run Ynabber + y.Run() } diff --git a/ynabber.go b/ynabber.go index 8ee55de..63bbe20 100644 --- a/ynabber.go +++ b/ynabber.go @@ -1,6 +1,7 @@ package ynabber import ( + "log/slog" "strconv" "time" ) @@ -8,6 +9,17 @@ import ( type Ynabber struct { Readers []Reader Writers []Writer + + config *Config + logger slog.Logger +} + +// NewYnabber creates a new Ynabber instance +func NewYnabber(config *Config) *Ynabber { + return &Ynabber{ + config: config, + logger: *slog.Default(), + } } type Reader interface { @@ -53,3 +65,62 @@ func (m Milliunits) String() string { func MilliunitsFromAmount(amount float64) Milliunits { return Milliunits(amount * 1000) } + +// Run starts Ynabber by reading transactions from all readers into a channel to +// fan out to all writers +func (y *Ynabber) Run() { + batches := make(chan []Transaction) + + // Create a channel for each writer and fan out transactions to each one + channels := make([]chan []Transaction, len(y.Writers)) + for c := range channels { + channels[c] = make(chan []Transaction) + } + go func() { + for batch := range batches { + for _, c := range channels { + c <- batch + } + } + }() + + for c, writer := range y.Writers { + go func(writer Writer, batches <-chan []Transaction) { + for batch := range batches { + err := writer.Bulk(batch) + if err != nil { + y.logger.Error("writing", "error", err, "writer", writer) + } + } + }(writer, channels[c]) + } + + for _, r := range y.Readers { + go func(reader Reader) { + for { + start := time.Now() + batch, err := reader.Bulk() + if err != nil { + y.logger.Error("reading", "error", err, "reader", reader) + continue + } + batches <- batch + y.logger.Info("run succeeded", "in", time.Since(start)) + + // TODO(Martin): The interval should be controlled by the + // reader. We are only pausing the entire reader goroutine + // because thats how the config option is implemented now. + // Eventually we should move this option into the reader + // allowing for multiple readers with different intervals. + if y.config.Interval > 0 { + y.logger.Info("waiting for next run", "in", y.config.Interval) + time.Sleep(y.config.Interval) + } else { + break + } + } + }(r) + } + + select {} +}