Skip to content

Commit

Permalink
Pipelining followups (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzaffi authored Aug 31, 2023
1 parent 1ee2e86 commit 39c6941
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 26 deletions.
10 changes: 5 additions & 5 deletions conduit/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type pluginOutput interface {
pluginInput | empty
}

// Retries is a wrapper for retrying a function call f() with a cancellation context,
// retries is a wrapper for retrying a function call f() with a cancellation context,
// a delay and a max retry count. It attempts to call the wrapped function at least once
// and only after the first attempt will pay attention to a context cancellation.
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish
Expand All @@ -45,7 +45,7 @@ type pluginOutput interface {
// - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries
// - when p.cfg.retryCount == 0, the error will be the last error encountered
// - the returned duration dur is the total time spent in the function, including retries
func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
start := time.Now()

for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
Expand Down Expand Up @@ -74,9 +74,9 @@ func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe
return
}

// RetriesNoOutput applies the same logic as Retries, but for functions that return no output.
func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := Retries(func(x X) (empty, error) {
// retriesNoOutput applies the same logic as retries, but for functions that return no output.
func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := retries(func(x X) (empty, error) {
return empty{}, f(x)
}, a, p, msg)
return d, err
Expand Down
16 changes: 8 additions & 8 deletions conduit/pipeline/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func TestRetries(t *testing.T) {
for _, tc := range cases {
tc := tc

// run cases for Retries()
t.Run("Retries() "+tc.name, func(t *testing.T) {
// run cases for retries()
t.Run("retries() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -127,7 +127,7 @@ func TestRetries(t *testing.T) {
yChan := make(chan uint64)
errChan := make(chan error)
go func() {
y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
yChan <- y
errChan <- err
}()
Expand All @@ -144,7 +144,7 @@ func TestRetries(t *testing.T) {
return
}

y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

Expand All @@ -163,8 +163,8 @@ func TestRetries(t *testing.T) {
}
})

// run cases for RetriesNoOutput()
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) {
// run cases for retriesNoOutput()
t.Run("retriesNoOutput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -183,7 +183,7 @@ func TestRetries(t *testing.T) {

errChan := make(chan error)
go func() {
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
Expand All @@ -197,7 +197,7 @@ func TestRetries(t *testing.T) {
return
}

_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
} else { // retryCount > 0 so doesn't retry forever
Expand Down
15 changes: 6 additions & 9 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <-
totalSelectWait += waitTime
p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd)

blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
blkData, importTime, lastError := retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError))
return
Expand Down Expand Up @@ -533,7 +533,7 @@ func (p *pipelineImpl) processorHandler(idx int, proc processors.Processor, blkI

var procTime time.Duration
var lastError error
blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name)
blk, procTime, lastError = retries(proc.Process, blk, p, proc.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError))
return
Expand Down Expand Up @@ -598,7 +598,7 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug
}

var exportTime time.Duration
exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName)
exportTime, lastError = retriesNoOutput(exporter.Receive, blk, p, eName)
if lastError != nil {
lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError)
return
Expand Down Expand Up @@ -640,16 +640,15 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug
// WARNING: removing/re-log-levelling the following will BREAK:
// - the E2E test (Search for "Pipeline round" in subslurp.py)
// - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo)
p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime))
p.logger.Infof(logstatsE2Elog(lastRound, len(blk.Payset), exportTime))
}
}
}()
}

func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
func logstatsE2Elog(lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
return fmt.Sprintf(
"UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s",
nextRound,
"FINISHED Pipeline round r=%d (%d txn) exported in %s",
lastRound,
topLevelTxnCount,
exportTime,
Expand Down Expand Up @@ -696,8 +695,6 @@ func (p *pipelineImpl) Start() {
}
}
}(p.pipelineMetadata.NextRound)

<-p.ctx.Done()
}

func (p *pipelineImpl) Wait() {
Expand Down
5 changes: 2 additions & 3 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,13 +990,12 @@ func TestMetrics(t *testing.T) {
}

func TestLogStatsE2Elog(t *testing.T) {
nextRound := uint64(1337)
round := uint64(42)
numTxns := 13
duration := 12345600 * time.Microsecond

expectedLog := "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(nextRound, round, numTxns, duration)
expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(round, numTxns, duration)
require.Equal(t, expectedLog, log)

logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`)
Expand Down
1 change: 1 addition & 0 deletions conduit/plugins/importers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import (
// Call package wide init function
_ "github.com/algorand/conduit/conduit/plugins/importers/algod"
_ "github.com/algorand/conduit/conduit/plugins/importers/filereader"
_ "github.com/algorand/conduit/conduit/plugins/importers/noop"
)
81 changes: 81 additions & 0 deletions conduit/plugins/importers/noop/noop_importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package noop

import (
"context"
_ "embed" // used to embed config
"fmt"
"time"

"github.com/sirupsen/logrus"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
"github.com/algorand/conduit/conduit/plugins/importers"
)

// PluginName is the name the noop importer is registered under, and the name
// to use when selecting it in a configuration.
var PluginName = "noop"

// sleepForGetBlock is how long GetBlock sleeps before returning each block.
const sleepForGetBlock = 100 * time.Millisecond

// noopImporter is an importer that fabricates blocks instead of fetching them.
// Its GetBlock never returns an error and accepts any round, so out of order blocks
// are processed too. That may or may not be desirable for different use cases--it can
// hide errors in actual importers expecting in order block processing.
// Init can still fail when the plugin config cannot be unmarshalled.
// The `noopImporter` maintains `round` state according to the round of the last block
// it returned, and sleeps 100 milliseconds in every GetBlock call to slow down the pipeline.
type noopImporter struct {
// round is the configured starting round after Init, then the round passed to the latest GetBlock call.
round uint64
// cfg is the plugin configuration unmarshalled by Init.
cfg ImporterConfig
}

//go:embed sample.yaml
var sampleConfig string

// metadata describes the noop importer plugin; it is the value returned by Metadata.
// SampleConfig carries the contents of the embedded sample.yaml.
var metadata = plugins.Metadata{
Name: PluginName,
Description: "noop importer",
Deprecated: false,
SampleConfig: sampleConfig,
}

// Metadata returns the package-level plugin metadata (name, description,
// deprecation flag and sample config) for the noop importer.
func (imp *noopImporter) Metadata() plugins.Metadata {
return metadata
}

// Init unmarshals the plugin configuration into the importer and seeds the
// importer's round from the configured `round` value.
// The context, init provider and logger arguments are ignored.
// It returns an error wrapping the underlying cause when the configuration
// cannot be unmarshalled, and nil otherwise.
func (imp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error {
	if err := cfg.UnmarshalConfig(&imp.cfg); err != nil {
		// %w (rather than %v) keeps the cause reachable through errors.Is / errors.As;
		// the rendered message is unchanged.
		return fmt.Errorf("init failure in unmarshalConfig: %w", err)
	}
	imp.round = imp.cfg.Round
	return nil
}

// Close does nothing and always returns nil.
func (imp *noopImporter) Close() error {
return nil
}

// GetGenesis hands back a pointer to a freshly allocated, zero-valued genesis.
// It never returns an error.
func (imp *noopImporter) GetGenesis() (*sdk.Genesis, error) {
	genesis := new(sdk.Genesis)
	return genesis, nil
}

// GetBlock pauses for sleepForGetBlock, records rnd as the importer's current
// round, and returns block data whose only populated field is the header round.
// It never returns an error, whatever round is requested.
func (imp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) {
	time.Sleep(sleepForGetBlock)
	imp.round = rnd

	var blk data.BlockData
	blk.BlockHeader.Round = sdk.Round(rnd)
	return blk, nil
}

// Round returns the importer's current round: the configured starting round
// after Init, or the round passed to the most recent GetBlock call.
func (imp *noopImporter) Round() uint64 {
return imp.round
}

// init registers the noop importer under PluginName, with a constructor that
// yields a new zero-valued noopImporter on every call.
func init() {
	construct := func() importers.Importer {
		return new(noopImporter)
	}
	importers.Register(PluginName, importers.ImporterConstructorFunc(construct))
}
7 changes: 7 additions & 0 deletions conduit/plugins/importers/noop/noop_importer_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package noop

// ImporterConfig holds the settings specific to the noop importer.
type ImporterConfig struct {
// Round optionally specifies the round to start on (yaml key `round`).
// When omitted it keeps the uint64 zero value.
Round uint64 `yaml:"round"`
}
3 changes: 3 additions & 0 deletions conduit/plugins/importers/noop/sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name: noop
# noop needs no config; optionally set `round` to choose the round to start on
config:
2 changes: 1 addition & 1 deletion e2e_tests/src/e2e_conduit/subslurp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
logger = logging.getLogger(__name__)

# Matches conduit log output:
# "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
# "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
# Raw bytes literal: `\d` must reach the regex engine verbatim. In a non-raw
# literal it is an invalid escape sequence, which Python warns about.
FINISH_ROUND: re.Pattern = re.compile(rb"FINISHED Pipeline round r=(\d+)")


Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func runConduitCmdWithConfig(args *data.Args) error {

// Start server
if pCfg.API.Address != "" {
logger.Infof("starting API server on %s", pCfg.API.Address)
shutdown, err := api.StartServer(logger, pline, pCfg.API.Address)
if err != nil {
// Suppress log, it is about to be printed to stderr.
Expand All @@ -114,6 +115,8 @@ func runConduitCmdWithConfig(args *data.Args) error {
return fmt.Errorf("failed to start API server: %w", err)
}
defer shutdown(context.Background())
} else {
logger.Info("API server is disabled")
}

pline.Wait()
Expand Down
43 changes: 43 additions & 0 deletions pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package cli

import (
_ "embed"
"fmt"
"net/http"
"os"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit/data"
_ "github.com/algorand/conduit/conduit/plugins/exporters/noop"
_ "github.com/algorand/conduit/conduit/plugins/importers/noop"
)

// Fills in a temp data dir and creates files
Expand Down Expand Up @@ -135,3 +140,41 @@ func TestLogFile(t *testing.T) {
require.Contains(t, dataStr, "\nWriting logs to file:")
})
}

// TestHealthEndpoint starts conduit with the noop importer and exporter and
// probes http://localhost:7777/health:
//   - API_ON:  an API address is configured, so the endpoint must answer 200 OK.
//   - API_OFF: the API address is empty, so the request must fail with
//     "connection refused" and yield no response.
func TestHealthEndpoint(t *testing.T) {
	healthPort := 7777
	healthNet := fmt.Sprintf("http://localhost:%d/health", healthPort)

	// A bounded client keeps an unresponsive server from hanging the test run.
	client := &http.Client{Timeout: 5 * time.Second}

	test := func(t *testing.T, address string) {
		cfg := data.Config{
			ConduitArgs: &data.Args{ConduitDataDir: t.TempDir()},
			API:         data.API{Address: address},
			Importer:    data.NameConfigPair{Name: "noop", Config: map[string]interface{}{}},
			Processors:  nil,
			Exporter:    data.NameConfigPair{Name: "noop", Config: map[string]interface{}{}},
		}
		args := setupDataDir(t, cfg)

		go func() {
			// The result is deliberately discarded: this test only probes the HTTP
			// endpoint, and reporting through t from a goroutine that may outlive
			// the test would be unsafe.
			_ = runConduitCmdWithConfig(args)
		}()
		// Give the pipeline (and the API server, when enabled) time to come up.
		time.Sleep(1 * time.Second)

		resp, err := client.Get(healthNet)
		if address != "" {
			require.NoError(t, err)
			// Close the body so the connection is released instead of leaked.
			defer resp.Body.Close()
			require.Equal(t, http.StatusOK, resp.StatusCode)
		} else {
			require.ErrorContains(t, err, "connection refused")
			require.Nil(t, resp)
		}
	}

	t.Run("API_OFF", func(t *testing.T) {
		test(t, "")
	})

	t.Run("API_ON", func(t *testing.T) {
		test(t, fmt.Sprintf(":%d", healthPort))
	})
}

0 comments on commit 39c6941

Please sign in to comment.