Skip to content

Commit

Permalink
refactor(ynabber): channels for fan out to writers
Browse files Browse the repository at this point in the history
Use channels to batch transactions and fan out to writers. This is
prep work for allowing multiple "pipelines" of readers and writers and
potentially multiple instances of Ynabber running concurrently.

commit-id:d118ecdf
  • Loading branch information
martinohansen committed Jan 3, 2025
1 parent 2146e86 commit b74b91b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 43 deletions.
49 changes: 6 additions & 43 deletions cmd/ynabber/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package main

import (
"fmt"
"log"
"log/slog"
"os"
"time"

"github.com/carlmjohnson/versioninfo"
"github.com/kelseyhightower/envconfig"
Expand Down Expand Up @@ -38,61 +36,26 @@ func main() {
setupLogging(cfg.Debug)
slog.Info("starting...", "version", versioninfo.Short())

ynabber := ynabber.Ynabber{}
y := ynabber.NewYnabber(&cfg)
for _, reader := range cfg.Readers {
switch reader {
case "nordigen":
ynabber.Readers = append(ynabber.Readers, nordigen.NewReader(&cfg))
y.Readers = append(y.Readers, nordigen.NewReader(&cfg))
default:
log.Fatalf("Unknown reader: %s", reader)
}
}
for _, writer := range cfg.Writers {
switch writer {
case "ynab":
ynabber.Writers = append(ynabber.Writers, ynab.NewWriter(&cfg))
y.Writers = append(y.Writers, ynab.NewWriter(&cfg))
case "json":
ynabber.Writers = append(ynabber.Writers, json.Writer{})
y.Writers = append(y.Writers, json.Writer{})
default:
log.Fatalf("Unknown writer: %s", writer)
}
}

for {
start := time.Now()
err = run(ynabber)
if err != nil {
panic(err)
} else {
slog.Info("run succeeded", "in", time.Since(start))
if cfg.Interval > 0 {
slog.Info("waiting for next run", "in", cfg.Interval)
time.Sleep(cfg.Interval)
} else {
os.Exit(0)
}
}
}
}

func run(y ynabber.Ynabber) error {
var transactions []ynabber.Transaction

// Read transactions from all readers
for _, reader := range y.Readers {
t, err := reader.Bulk()
if err != nil {
return fmt.Errorf("reading: %w", err)
}
transactions = append(transactions, t...)
}

// Write transactions to all writers
for _, writer := range y.Writers {
err := writer.Bulk(transactions)
if err != nil {
return fmt.Errorf("writing: %w", err)
}
}
return nil
// Run Ynabber
y.Run()
}
71 changes: 71 additions & 0 deletions ynabber.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
package ynabber

import (
"log/slog"
"strconv"
"time"
)

type Ynabber struct {
Readers []Reader
Writers []Writer

config *Config
logger slog.Logger
}

// NewYnabber creates a new Ynabber instance
func NewYnabber(config *Config) *Ynabber {
return &Ynabber{
config: config,
logger: *slog.Default(),
}
}

type Reader interface {
Expand Down Expand Up @@ -53,3 +65,62 @@ func (m Milliunits) String() string {
func MilliunitsFromAmount(amount float64) Milliunits {
return Milliunits(amount * 1000)
}

// Run starts Ynabber by reading transactions from all readers into a channel to
// fan out to all writers
func (y *Ynabber) Run() {
batches := make(chan []Transaction)

// Create a channel for each writer and fan out transactions to each one
channels := make([]chan []Transaction, len(y.Writers))
for c := range channels {
channels[c] = make(chan []Transaction)
}
go func() {
for batch := range batches {
for _, c := range channels {
c <- batch
}
}
}()

for c, writer := range y.Writers {
go func(writer Writer, batches <-chan []Transaction) {
for batch := range batches {
err := writer.Bulk(batch)
if err != nil {
y.logger.Error("writing", "error", err, "writer", writer)
}
}
}(writer, channels[c])
}

for _, r := range y.Readers {
go func(reader Reader) {
for {
start := time.Now()
batch, err := reader.Bulk()
if err != nil {
y.logger.Error("reading", "error", err, "reader", reader)
continue
}
batches <- batch
y.logger.Info("run succeeded", "in", time.Since(start))

// TODO(Martin): The interval should be controlled by the
// reader. We are only pausing the entire reader goroutine
// because thats how the config option is implemented now.
// Eventually we should move this option into the reader
// allowing for multiple readers with different intervals.
if y.config.Interval > 0 {
y.logger.Info("waiting for next run", "in", y.config.Interval)
time.Sleep(y.config.Interval)
} else {
break
}
}
}(r)
}

select {}
}

0 comments on commit b74b91b

Please sign in to comment.