Skip to content

Commit

Permalink
abci,txapp,kwild: new chain meta store (#610)
Browse files Browse the repository at this point in the history
* abci,txapp,kwild: new chain meta store

* pg: remove the TxCloser relic
  • Loading branch information
jchappelow committed Mar 22, 2024
1 parent 6493dc5 commit 2cba60f
Show file tree
Hide file tree
Showing 18 changed files with 449 additions and 871 deletions.
197 changes: 135 additions & 62 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,29 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"slices"
"strings"
"syscall"
"time"

"github.com/kwilteam/kwil-db/cmd/kwild/config"
"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/common/sql"
"github.com/kwilteam/kwil-db/internal/abci"
"github.com/kwilteam/kwil-db/internal/abci/cometbft"
"github.com/kwilteam/kwil-db/internal/abci/meta"
"github.com/kwilteam/kwil-db/internal/abci/snapshots"
"github.com/kwilteam/kwil-db/internal/accounts"
"github.com/kwilteam/kwil-db/internal/engine/execution"
"github.com/kwilteam/kwil-db/internal/events"
"github.com/kwilteam/kwil-db/internal/events/broadcast"
"github.com/kwilteam/kwil-db/internal/kv"
"github.com/kwilteam/kwil-db/internal/kv/badger"
"github.com/kwilteam/kwil-db/internal/listeners"
admSvc "github.com/kwilteam/kwil-db/internal/services/grpc/admin/v0"
Expand Down Expand Up @@ -55,31 +60,128 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
)

// initStores prepares the datastores with an atomic DB transaction. These
// actions are performed outside of ABCI's DB sessions.
func initStores(d *coreDependencies, db *pg.DB) error {
initTx, err := db.BeginTx(d.ctx)
if err != nil {
return fmt.Errorf("could not start app initialization DB transaction: %w", err)
}
defer initTx.Rollback(d.ctx)

// chain meta data for abci and txApp
initChainMetadata(d, initTx)
// upgrade v0.7.0 that had ABCI meta in badgerDB and nowhere else...
if err = migrateOldChainState(d, initTx); err != nil {
return err
}

// account store
initAccountRepository(d, initTx)

// vote store
initVoteStore(d, initTx)

if err = initTx.Commit(d.ctx); err != nil {
return fmt.Errorf("failed to commit the app initialization DB transaction: %w", err)
}

return nil
}

// migrateOldChainState detects if the old badger-based chain metadata store
// needs to be migrated into the new postgres metadata store and does the update.
func migrateOldChainState(d *coreDependencies, initTx sql.Tx) error {
height, appHash, err := meta.GetChainState(d.ctx, initTx)
if err != nil {
return fmt.Errorf("failed to get chain state from metadata store: %w", err)
}
if height != -1 { // have already updated the meta chain tables
d.log.Infof("Application's chain metadata store loaded at height %d / app hash %x",
height, appHash)
return nil
} // else we are either at genesis or we need to migrate from badger

// detect old badger kv DB for ABCI's metadata
badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, config.ABCIInfoSubDirName)
height, appHash, err = getOldChainState(d, badgerPath)
if err != nil {
return fmt.Errorf("unable to read old metadata store: %w", err)
}
if height < 1 { // badger hadn't been used, and we're just at genesis
if height == 0 { // files existed, but still at genesis
if err = os.RemoveAll(badgerPath); err != nil {
d.log.Errorf("failed to remove old badger db file (%s): %v", badgerPath, err)
}
}
return nil
}

d.log.Infof("Migrating from badger DB chain metadata to postgresql: height %d, apphash %x",
height, appHash)
err = meta.SetChainState(d.ctx, initTx, height, appHash)
if err != nil {
return fmt.Errorf("failed to migrate height and app hash: %w", err)
}

if err = os.RemoveAll(badgerPath); err != nil {
d.log.Errorf("failed to remove old badger db file (%s): %v", badgerPath, err)
}

return nil
}

// getOldChainState attempts to retrieve the height and app hash from any legacy
// badger metadata store. If the folder does not exists, it is not an error, but
// height -1 is returned. If the DB exists but there are no entries, it returns
// 0 and no error.
func getOldChainState(d *coreDependencies, badgerPath string) (int64, []byte, error) {
if _, err := os.Stat(badgerPath); errors.Is(err, os.ErrNotExist) {
return -1, nil, nil // would hit the kv.ErrKeyNotFound case, but we don't want artifacts
}
badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{
Logger: *d.log.Named("old-abci-kv-store"),
})
if err != nil {
return 0, nil, fmt.Errorf("failed to open badger: %w", err)
}

defer badgerKv.Close()

appHash, err := badgerKv.Get([]byte("a"))
if err == kv.ErrKeyNotFound {
return 0, nil, nil
}
if err != nil {
return 0, nil, err
}
height, err := badgerKv.Get([]byte("b"))
if err == kv.ErrKeyNotFound {
return 0, nil, nil
}
if err != nil {
return 0, nil, err
}
return int64(binary.BigEndian.Uint64(height)), slices.Clone(appHash), nil
}

func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// main postgres db
db := buildDB(d, closers)

// Must of setup and initialization performs database writes outside of
// ABCI's DB sessions, so put the DB in auto-commit mode to allow writes
// without first explicitly creatsing a transaction.
db.AutoCommit(true) // alt: db.BeginTx ... defer tx.Rollback, etc.
if err := initStores(d, db); err != nil {
failBuild(err, "initStores failed")
}

// engine
e := buildEngine(d, db)

// account store
initAccountRepository(d, db)

// these are dummies, but they might need init in the future.
snapshotModule := buildSnapshotter()

bootstrapperModule := buildBootstrapper()

// vote store
// v := buildVoteStore(d, db, accs, e)
initVoteStore(d, db)

// event store
ev := buildEventStore(d, closers)
ev := buildEventStore(d, closers) // makes own DB connection

// this is a hack
// we need the cometbft client to broadcast txs.
Expand All @@ -89,14 +191,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// but the tx router needs the cometbft client
txApp := buildTxApp(d, db, e, ev)

abciApp := buildAbci(d, closers,
txApp, snapshotModule, bootstrapperModule)

// buildCometNode immediately starts talking to the abciApp and replaying
// blocks (and using atomic db tx commits), i.e. calling
// FinalizeBlock+Commit, so we have to be done with ad-hoc DB writes here.
db.AutoCommit(false)
abciApp := buildAbci(d, txApp, snapshotModule, bootstrapperModule)

// NOTE: buildCometNode immediately starts talking to the abciApp and
// replaying blocks (and using atomic db tx commits), i.e. calling
// FinalizeBlock+Commit. This is not just a constructor, sadly.
cometBftNode := buildCometNode(d, closers, abciApp)

cometBftClient := buildCometBftClient(cometBftNode)
Expand Down Expand Up @@ -207,23 +306,16 @@ func (c *closeFuncs) closeAll() error {
}

func buildTxApp(d *coreDependencies, db *pg.DB, engine *execution.GlobalContext, ev *events.EventStore) *txapp.TxApp {
return txapp.NewTxApp(db, engine, buildSigner(d), ev, d.genesisCfg.ChainID,
txApp, err := txapp.NewTxApp(db, engine, buildSigner(d), ev, d.genesisCfg.ChainID,
!d.genesisCfg.ConsensusParams.WithoutGasCosts, d.cfg.AppCfg.Extensions, *d.log.Named("tx-router"))
}

func buildAbci(d *coreDependencies, closer *closeFuncs, txApp abci.TxApp,
snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, config.ABCIInfoSubDirName)
badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{
GuaranteeFSync: true,
Logger: *d.log.Named("abci-kv-store"),
})
d.log.Info(fmt.Sprintf("created ABCI kv DB in %v", badgerPath))
if err != nil {
failBuild(err, "failed to open badger")
failBuild(err, "failed to build new TxApp")
}
closer.addCloser(badgerKv.Close)
return txApp
}

func buildAbci(d *coreDependencies, txApp abci.TxApp, snapshotter *snapshots.SnapshotStore,
bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
var sh abci.SnapshotModule
if snapshotter != nil {
sh = snapshotter
Expand All @@ -236,11 +328,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, txApp abci.TxApp,
GenesisAllocs: d.genesisCfg.Alloc,
GasEnabled: !d.genesisCfg.ConsensusParams.WithoutGasCosts,
}
return abci.NewAbciApp(cfg,
badgerKv,
sh,
bootstrapper,
txApp,
return abci.NewAbciApp(cfg, sh, bootstrapper, txApp,
&txapp.ConsensusParams{
VotingPeriod: d.genesisCfg.ConsensusParams.Votes.VoteExpiry,
JoinVoteExpiration: d.genesisCfg.ConsensusParams.Validator.JoinExpiry,
Expand All @@ -253,22 +341,11 @@ func buildEventBroadcaster(d *coreDependencies, ev broadcast.EventStore, b broad
return broadcast.NewEventBroadcaster(ev, b, txapp, txapp, buildSigner(d), d.genesisCfg.ChainID)
}

func initVoteStore(d *coreDependencies, db *pg.DB) {
tx, err := db.BeginTx(d.ctx)
if err != nil {
failBuild(err, "failed to start transaction")
}
defer tx.Rollback(d.ctx)

err = voting.InitializeVoteStore(d.ctx, tx)
func initVoteStore(d *coreDependencies, tx sql.Tx) {
err := voting.InitializeVoteStore(d.ctx, tx)
if err != nil {
failBuild(err, "failed to initialize vote store")
}

err = tx.Commit(d.ctx)
if err != nil {
failBuild(err, "failed to commit")
}
}

func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStore {
Expand All @@ -278,8 +355,8 @@ func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStor
if err != nil {
failBuild(err, "failed to build event store")
}

closers.addCloser(db.Close)

e, err := events.NewEventStore(d.ctx, db)
if err != nil {
failBuild(err, "failed to build event store")
Expand Down Expand Up @@ -356,22 +433,18 @@ func buildEngine(d *coreDependencies, db *pg.DB) *execution.GlobalContext {
return eng
}

func initAccountRepository(d *coreDependencies, db *pg.DB) {
tx, err := db.BeginTx(d.ctx)
func initChainMetadata(d *coreDependencies, tx sql.Tx) {
err := meta.InitializeMetaStore(d.ctx, tx)
if err != nil {
failBuild(err, "failed to start transaction")
failBuild(err, "failed to initialize chain metadata store")
}
defer tx.Rollback(d.ctx)
}

err = accounts.InitializeAccountStore(d.ctx, tx)
func initAccountRepository(d *coreDependencies, tx sql.Tx) {
err := accounts.InitializeAccountStore(d.ctx, tx)
if err != nil {
failBuild(err, "failed to initialize account store")
}

err = tx.Commit(d.ctx)
if err != nil {
failBuild(err, "failed to commit")
}
}

func buildSnapshotter() *snapshots.SnapshotStore {
Expand Down
14 changes: 5 additions & 9 deletions common/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,18 @@ type TxMaker interface {
BeginTx(ctx context.Context) (Tx, error)
}

// TxCloser terminates a transaction by committing or rolling it back.
type TxCloser interface {
// Rollback rolls back the transaction.
Rollback(ctx context.Context) error
// Commit commits the transaction.
Commit(ctx context.Context) error
}

// Tx represents a database transaction. It can be nested within other
// transactions, and create new nested transactions. An implementation of Tx may
// also be an AccessModer, but it is not required.
type Tx interface {
Executor
TxCloser
TxMaker // recursive interface
// note: does not embed DB for clear semantics (DB makes a Tx, not the reverse)

// Rollback rolls back the transaction.
Rollback(ctx context.Context) error
// Commit commits the transaction.
Commit(ctx context.Context) error
}

// DB is a top level database interface, which may directly execute queries or
Expand Down
Loading

0 comments on commit 2cba60f

Please sign in to comment.