Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abci,txapp,kwild: new chain meta store #610

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 135 additions & 62 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,29 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"slices"
"strings"
"syscall"
"time"

"github.com/kwilteam/kwil-db/cmd/kwild/config"
"github.com/kwilteam/kwil-db/common"
"github.com/kwilteam/kwil-db/common/sql"
"github.com/kwilteam/kwil-db/internal/abci"
"github.com/kwilteam/kwil-db/internal/abci/cometbft"
"github.com/kwilteam/kwil-db/internal/abci/meta"
"github.com/kwilteam/kwil-db/internal/abci/snapshots"
"github.com/kwilteam/kwil-db/internal/accounts"
"github.com/kwilteam/kwil-db/internal/engine/execution"
"github.com/kwilteam/kwil-db/internal/events"
"github.com/kwilteam/kwil-db/internal/events/broadcast"
"github.com/kwilteam/kwil-db/internal/kv"
"github.com/kwilteam/kwil-db/internal/kv/badger"
"github.com/kwilteam/kwil-db/internal/listeners"
admSvc "github.com/kwilteam/kwil-db/internal/services/grpc/admin/v0"
Expand Down Expand Up @@ -55,31 +60,128 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
)

// initStores prepares the datastores with an atomic DB transaction. These
// actions are performed outside of ABCI's DB sessions.
func initStores(d *coreDependencies, db *pg.DB) error {
initTx, err := db.BeginTx(d.ctx)
if err != nil {
return fmt.Errorf("could not start app initialization DB transaction: %w", err)
}
defer initTx.Rollback(d.ctx)

// chain meta data for abci and txApp
initChainMetadata(d, initTx)
// upgrade v0.7.0 that had ABCI meta in badgerDB and nowhere else...
if err = migrateOldChainState(d, initTx); err != nil {
return err
}

// account store
initAccountRepository(d, initTx)

// vote store
initVoteStore(d, initTx)

if err = initTx.Commit(d.ctx); err != nil {
return fmt.Errorf("failed to commit the app initialization DB transaction: %w", err)
}

return nil
}

// migrateOldChainState detects if the old badger-based chain metadata store
// needs to be migrated into the new postgres metadata store and does the update.
func migrateOldChainState(d *coreDependencies, initTx sql.Tx) error {
height, appHash, err := meta.GetChainState(d.ctx, initTx)
if err != nil {
return fmt.Errorf("failed to get chain state from metadata store: %w", err)
}
if height != -1 { // have already updated the meta chain tables
d.log.Infof("Application's chain metadata store loaded at height %d / app hash %x",
height, appHash)
return nil
} // else we are either at genesis or we need to migrate from badger

// detect old badger kv DB for ABCI's metadata
badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, config.ABCIInfoSubDirName)
height, appHash, err = getOldChainState(d, badgerPath)
if err != nil {
return fmt.Errorf("unable to read old metadata store: %w", err)
}
if height < 1 { // badger hadn't been used, and we're just at genesis
if height == 0 { // files existed, but still at genesis
if err = os.RemoveAll(badgerPath); err != nil {
d.log.Errorf("failed to remove old badger db file (%s): %v", badgerPath, err)
}
}
return nil
}

d.log.Infof("Migrating from badger DB chain metadata to postgresql: height %d, apphash %x",
height, appHash)
err = meta.SetChainState(d.ctx, initTx, height, appHash)
if err != nil {
return fmt.Errorf("failed to migrate height and app hash: %w", err)
}

if err = os.RemoveAll(badgerPath); err != nil {
d.log.Errorf("failed to remove old badger db file (%s): %v", badgerPath, err)
}

return nil
}

// getOldChainState attempts to retrieve the height and app hash from any legacy
// badger metadata store. If the folder does not exists, it is not an error, but
// height -1 is returned. If the DB exists but there are no entries, it returns
// 0 and no error.
func getOldChainState(d *coreDependencies, badgerPath string) (int64, []byte, error) {
if _, err := os.Stat(badgerPath); errors.Is(err, os.ErrNotExist) {
return -1, nil, nil // would hit the kv.ErrKeyNotFound case, but we don't want artifacts
}
badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{
Logger: *d.log.Named("old-abci-kv-store"),
})
if err != nil {
return 0, nil, fmt.Errorf("failed to open badger: %w", err)
}

defer badgerKv.Close()

appHash, err := badgerKv.Get([]byte("a"))
if err == kv.ErrKeyNotFound {
return 0, nil, nil
}
if err != nil {
return 0, nil, err
}
height, err := badgerKv.Get([]byte("b"))
if err == kv.ErrKeyNotFound {
return 0, nil, nil
}
if err != nil {
return 0, nil, err
}
return int64(binary.BigEndian.Uint64(height)), slices.Clone(appHash), nil
}

func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// main postgres db
db := buildDB(d, closers)

// Must of setup and initialization performs database writes outside of
// ABCI's DB sessions, so put the DB in auto-commit mode to allow writes
// without first explicitly creatsing a transaction.
db.AutoCommit(true) // alt: db.BeginTx ... defer tx.Rollback, etc.
if err := initStores(d, db); err != nil {
failBuild(err, "initStores failed")
}

// engine
e := buildEngine(d, db)

// account store
initAccountRepository(d, db)

// these are dummies, but they might need init in the future.
snapshotModule := buildSnapshotter()

bootstrapperModule := buildBootstrapper()

// vote store
// v := buildVoteStore(d, db, accs, e)
initVoteStore(d, db)

// event store
ev := buildEventStore(d, closers)
ev := buildEventStore(d, closers) // makes own DB connection

// this is a hack
// we need the cometbft client to broadcast txs.
Expand All @@ -89,14 +191,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// but the tx router needs the cometbft client
txApp := buildTxApp(d, db, e, ev)

abciApp := buildAbci(d, closers,
txApp, snapshotModule, bootstrapperModule)

// buildCometNode immediately starts talking to the abciApp and replaying
// blocks (and using atomic db tx commits), i.e. calling
// FinalizeBlock+Commit, so we have to be done with ad-hoc DB writes here.
db.AutoCommit(false)
abciApp := buildAbci(d, txApp, snapshotModule, bootstrapperModule)

// NOTE: buildCometNode immediately starts talking to the abciApp and
// replaying blocks (and using atomic db tx commits), i.e. calling
// FinalizeBlock+Commit. This is not just a constructor, sadly.
cometBftNode := buildCometNode(d, closers, abciApp)

cometBftClient := buildCometBftClient(cometBftNode)
Expand Down Expand Up @@ -207,23 +306,16 @@ func (c *closeFuncs) closeAll() error {
}

func buildTxApp(d *coreDependencies, db *pg.DB, engine *execution.GlobalContext, ev *events.EventStore) *txapp.TxApp {
return txapp.NewTxApp(db, engine, buildSigner(d), ev, d.genesisCfg.ChainID,
txApp, err := txapp.NewTxApp(db, engine, buildSigner(d), ev, d.genesisCfg.ChainID,
!d.genesisCfg.ConsensusParams.WithoutGasCosts, d.cfg.AppCfg.Extensions, *d.log.Named("tx-router"))
}

func buildAbci(d *coreDependencies, closer *closeFuncs, txApp abci.TxApp,
snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, config.ABCIInfoSubDirName)
badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{
GuaranteeFSync: true,
Logger: *d.log.Named("abci-kv-store"),
})
d.log.Info(fmt.Sprintf("created ABCI kv DB in %v", badgerPath))
if err != nil {
failBuild(err, "failed to open badger")
failBuild(err, "failed to build new TxApp")
}
closer.addCloser(badgerKv.Close)
return txApp
}

func buildAbci(d *coreDependencies, txApp abci.TxApp, snapshotter *snapshots.SnapshotStore,
bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
var sh abci.SnapshotModule
if snapshotter != nil {
sh = snapshotter
Expand All @@ -236,11 +328,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, txApp abci.TxApp,
GenesisAllocs: d.genesisCfg.Alloc,
GasEnabled: !d.genesisCfg.ConsensusParams.WithoutGasCosts,
}
return abci.NewAbciApp(cfg,
badgerKv,
sh,
bootstrapper,
txApp,
return abci.NewAbciApp(cfg, sh, bootstrapper, txApp,
&txapp.ConsensusParams{
VotingPeriod: d.genesisCfg.ConsensusParams.Votes.VoteExpiry,
JoinVoteExpiration: d.genesisCfg.ConsensusParams.Validator.JoinExpiry,
Expand All @@ -253,22 +341,11 @@ func buildEventBroadcaster(d *coreDependencies, ev broadcast.EventStore, b broad
return broadcast.NewEventBroadcaster(ev, b, txapp, txapp, buildSigner(d), d.genesisCfg.ChainID)
}

func initVoteStore(d *coreDependencies, db *pg.DB) {
tx, err := db.BeginTx(d.ctx)
if err != nil {
failBuild(err, "failed to start transaction")
}
defer tx.Rollback(d.ctx)

err = voting.InitializeVoteStore(d.ctx, tx)
func initVoteStore(d *coreDependencies, tx sql.Tx) {
err := voting.InitializeVoteStore(d.ctx, tx)
if err != nil {
failBuild(err, "failed to initialize vote store")
}

err = tx.Commit(d.ctx)
if err != nil {
failBuild(err, "failed to commit")
}
}

func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStore {
Expand All @@ -278,8 +355,8 @@ func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStor
if err != nil {
failBuild(err, "failed to build event store")
}

closers.addCloser(db.Close)

e, err := events.NewEventStore(d.ctx, db)
if err != nil {
failBuild(err, "failed to build event store")
Expand Down Expand Up @@ -356,22 +433,18 @@ func buildEngine(d *coreDependencies, db *pg.DB) *execution.GlobalContext {
return eng
}

func initAccountRepository(d *coreDependencies, db *pg.DB) {
tx, err := db.BeginTx(d.ctx)
func initChainMetadata(d *coreDependencies, tx sql.Tx) {
err := meta.InitializeMetaStore(d.ctx, tx)
if err != nil {
failBuild(err, "failed to start transaction")
failBuild(err, "failed to initialize chain metadata store")
}
defer tx.Rollback(d.ctx)
}

err = accounts.InitializeAccountStore(d.ctx, tx)
func initAccountRepository(d *coreDependencies, tx sql.Tx) {
err := accounts.InitializeAccountStore(d.ctx, tx)
if err != nil {
failBuild(err, "failed to initialize account store")
}

err = tx.Commit(d.ctx)
if err != nil {
failBuild(err, "failed to commit")
}
}

func buildSnapshotter() *snapshots.SnapshotStore {
Expand Down
14 changes: 5 additions & 9 deletions common/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,18 @@ type TxMaker interface {
BeginTx(ctx context.Context) (Tx, error)
}

// TxCloser terminates a transaction by committing or rolling it back.
type TxCloser interface {
// Rollback rolls back the transaction.
Rollback(ctx context.Context) error
// Commit commits the transaction.
Commit(ctx context.Context) error
}

// Tx represents a database transaction. It can be nested within other
// transactions, and create new nested transactions. An implementation of Tx may
// also be an AccessModer, but it is not required.
type Tx interface {
Executor
TxCloser
TxMaker // recursive interface
// note: does not embed DB for clear semantics (DB makes a Tx, not the reverse)

// Rollback rolls back the transaction.
Rollback(ctx context.Context) error
// Commit commits the transaction.
Commit(ctx context.Context) error
}

// DB is a top level database interface, which may directly execute queries or
Expand Down
Loading
Loading