diff --git a/cmd/kwild/server/build.go b/cmd/kwild/server/build.go index 6cdb30310..d3a4e3204 100644 --- a/cmd/kwild/server/build.go +++ b/cmd/kwild/server/build.go @@ -4,24 +4,29 @@ import ( "context" "crypto/tls" "crypto/x509" + "encoding/binary" "errors" "fmt" "net" "os" "path/filepath" + "slices" "strings" "syscall" "time" "github.com/kwilteam/kwil-db/cmd/kwild/config" "github.com/kwilteam/kwil-db/common" + "github.com/kwilteam/kwil-db/common/sql" "github.com/kwilteam/kwil-db/internal/abci" "github.com/kwilteam/kwil-db/internal/abci/cometbft" + "github.com/kwilteam/kwil-db/internal/abci/meta" "github.com/kwilteam/kwil-db/internal/abci/snapshots" "github.com/kwilteam/kwil-db/internal/accounts" "github.com/kwilteam/kwil-db/internal/engine/execution" "github.com/kwilteam/kwil-db/internal/events" "github.com/kwilteam/kwil-db/internal/events/broadcast" + "github.com/kwilteam/kwil-db/internal/kv" "github.com/kwilteam/kwil-db/internal/kv/badger" "github.com/kwilteam/kwil-db/internal/listeners" admSvc "github.com/kwilteam/kwil-db/internal/services/grpc/admin/v0" @@ -55,31 +60,128 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" ) +// initStores prepares the datastores with an atomic DB transaction. These +// actions are performed outside of ABCI's DB sessions. +func initStores(d *coreDependencies, db *pg.DB) error { + initTx, err := db.BeginTx(d.ctx) + if err != nil { + return fmt.Errorf("could not start app initialization DB transaction: %w", err) + } + defer initTx.Rollback(d.ctx) + + // chain meta data for abci and txApp + initChainMetadata(d, initTx) + // upgrade v0.7.0 that had ABCI meta in badgerDB and nowhere else... + if err = migrateOldChainState(d, initTx); err != nil { + return err + } + + // account store + initAccountRepository(d, initTx) + + // vote store + initVoteStore(d, initTx) + + if err = initTx.Commit(d.ctx); err != nil { + return fmt.Errorf("failed to commit the app initialization DB transaction: %w", err) + } + + return nil +} + +// migrateOldChainState detects if the old badger-based chain metadata store +// needs to be migrated into the new postgres metadata store and does the update. +func migrateOldChainState(d *coreDependencies, initTx sql.Tx) error { + height, appHash, err := meta.GetChainState(d.ctx, initTx) + if err != nil { + return fmt.Errorf("failed to get chain state from metadata store: %w", err) + } + if height != -1 { // have already updated the meta chain tables + d.log.Infof("Application's chain metadata store loaded at height %d / app hash %x", + height, appHash) + return nil + } // else we are either at genesis or we need to migrate from badger + + // detect old badger kv DB for ABCI's metadata + badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, config.ABCIInfoSubDirName) + height, appHash, err = getOldChainState(d, badgerPath) + if err != nil { + return fmt.Errorf("unable to read old metadata store: %w", err) + } + if height < 1 { // badger hadn't been used, and we're just at genesis + if height == 0 { // files existed, but still at genesis + if err = os.RemoveAll(badgerPath); err != nil { + d.log.Errorf("failed to remove old badger db file (%s): %v", badgerPath, err) + } + } + return nil + } + + d.log.Infof("Migrating from badger DB chain metadata to postgresql: height %d, apphash %x", + height, appHash) + err = meta.SetChainState(d.ctx, initTx, height, appHash) + if err != nil { + return fmt.Errorf("failed to migrate height and app hash: %w", err) + } + + if err = os.RemoveAll(badgerPath); err != nil { + d.log.Errorf("failed to remove old badger db file (%s): %v", badgerPath, err) + } + + return nil +} + +// getOldChainState attempts to retrieve the height and app hash from any legacy +// badger metadata store. If the folder does not exists, it is not an error, but +// height -1 is returned. If the DB exists but there are no entries, it returns +// 0 and no error. +func getOldChainState(d *coreDependencies, badgerPath string) (int64, []byte, error) { + if _, err := os.Stat(badgerPath); errors.Is(err, os.ErrNotExist) { + return -1, nil, nil // would hit the kv.ErrKeyNotFound case, but we don't want artifacts + } + badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{ + Logger: *d.log.Named("old-abci-kv-store"), + }) + if err != nil { + return 0, nil, fmt.Errorf("failed to open badger: %w", err) + } + + defer badgerKv.Close() + + appHash, err := badgerKv.Get([]byte("a")) + if err == kv.ErrKeyNotFound { + return 0, nil, nil + } + if err != nil { + return 0, nil, err + } + height, err := badgerKv.Get([]byte("b")) + if err == kv.ErrKeyNotFound { + return 0, nil, nil + } + if err != nil { + return 0, nil, err + } + return int64(binary.BigEndian.Uint64(height)), slices.Clone(appHash), nil +} + func buildServer(d *coreDependencies, closers *closeFuncs) *Server { // main postgres db db := buildDB(d, closers) - // Must of setup and initialization performs database writes outside of - // ABCI's DB sessions, so put the DB in auto-commit mode to allow writes - // without first explicitly creatsing a transaction. - db.AutoCommit(true) // alt: db.BeginTx ... defer tx.Rollback, etc. + if err := initStores(d, db); err != nil { + failBuild(err, "initStores failed") + } // engine e := buildEngine(d, db) - // account store - initAccountRepository(d, db) - + // these are dummies, but they might need init in the future. snapshotModule := buildSnapshotter() - bootstrapperModule := buildBootstrapper() - // vote store - // v := buildVoteStore(d, db, accs, e) - initVoteStore(d, db) - // event store - ev := buildEventStore(d, closers) + ev := buildEventStore(d, closers) // makes own DB connection // this is a hack // we need the cometbft client to broadcast txs. @@ -89,14 +191,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server { // but the tx router needs the cometbft client txApp := buildTxApp(d, db, e, ev) - abciApp := buildAbci(d, closers, - txApp, snapshotModule, bootstrapperModule) - - // buildCometNode immediately starts talking to the abciApp and replaying - // blocks (and using atomic db tx commits), i.e. calling - // FinalizeBlock+Commit, so we have to be done with ad-hoc DB writes here. - db.AutoCommit(false) + abciApp := buildAbci(d, txApp, snapshotModule, bootstrapperModule) + // NOTE: buildCometNode immediately starts talking to the abciApp and + // replaying blocks (and using atomic db tx commits), i.e. calling + // FinalizeBlock+Commit. This is not just a constructor, sadly. cometBftNode := buildCometNode(d, closers, abciApp) cometBftClient := buildCometBftClient(cometBftNode) @@ -207,23 +306,16 @@ func (c *closeFuncs) closeAll() error { } func buildTxApp(d *coreDependencies, db *pg.DB, engine *execution.GlobalContext, ev *events.EventStore) *txapp.TxApp { - return txapp.NewTxApp(db, engine, buildSigner(d), ev, d.genesisCfg.ChainID, + txApp, err := txapp.NewTxApp(db, engine, buildSigner(d), ev, d.genesisCfg.ChainID, !d.genesisCfg.ConsensusParams.WithoutGasCosts, d.cfg.AppCfg.Extensions, *d.log.Named("tx-router")) -} - -func buildAbci(d *coreDependencies, closer *closeFuncs, txApp abci.TxApp, - snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp { - badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, config.ABCIInfoSubDirName) - badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{ - GuaranteeFSync: true, - Logger: *d.log.Named("abci-kv-store"), - }) - d.log.Info(fmt.Sprintf("created ABCI kv DB in %v", badgerPath)) if err != nil { - failBuild(err, "failed to open badger") + failBuild(err, "failed to build new TxApp") } - closer.addCloser(badgerKv.Close) + return txApp +} +func buildAbci(d *coreDependencies, txApp abci.TxApp, snapshotter *snapshots.SnapshotStore, + bootstrapper *snapshots.Bootstrapper) *abci.AbciApp { var sh abci.SnapshotModule if snapshotter != nil { sh = snapshotter @@ -236,11 +328,7 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, txApp abci.TxApp, GenesisAllocs: d.genesisCfg.Alloc, GasEnabled: !d.genesisCfg.ConsensusParams.WithoutGasCosts, } - return abci.NewAbciApp(cfg, - badgerKv, - sh, - bootstrapper, - txApp, + return abci.NewAbciApp(cfg, sh, bootstrapper, txApp, &txapp.ConsensusParams{ VotingPeriod: d.genesisCfg.ConsensusParams.Votes.VoteExpiry, JoinVoteExpiration: d.genesisCfg.ConsensusParams.Validator.JoinExpiry, @@ -253,22 +341,11 @@ func buildEventBroadcaster(d *coreDependencies, ev broadcast.EventStore, b broad return broadcast.NewEventBroadcaster(ev, b, txapp, txapp, buildSigner(d), d.genesisCfg.ChainID) } -func initVoteStore(d *coreDependencies, db *pg.DB) { - tx, err := db.BeginTx(d.ctx) - if err != nil { - failBuild(err, "failed to start transaction") - } - defer tx.Rollback(d.ctx) - - err = voting.InitializeVoteStore(d.ctx, tx) +func initVoteStore(d *coreDependencies, tx sql.Tx) { + err := voting.InitializeVoteStore(d.ctx, tx) if err != nil { failBuild(err, "failed to initialize vote store") } - - err = tx.Commit(d.ctx) - if err != nil { - failBuild(err, "failed to commit") - } } func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStore { @@ -278,8 +355,8 @@ func buildEventStore(d *coreDependencies, closers *closeFuncs) *events.EventStor if err != nil { failBuild(err, "failed to build event store") } - closers.addCloser(db.Close) + e, err := events.NewEventStore(d.ctx, db) if err != nil { failBuild(err, "failed to build event store") @@ -356,22 +433,18 @@ func buildEngine(d *coreDependencies, db *pg.DB) *execution.GlobalContext { return eng } -func initAccountRepository(d *coreDependencies, db *pg.DB) { - tx, err := db.BeginTx(d.ctx) +func initChainMetadata(d *coreDependencies, tx sql.Tx) { + err := meta.InitializeMetaStore(d.ctx, tx) if err != nil { - failBuild(err, "failed to start transaction") + failBuild(err, "failed to initialize chain metadata store") } - defer tx.Rollback(d.ctx) +} - err = accounts.InitializeAccountStore(d.ctx, tx) +func initAccountRepository(d *coreDependencies, tx sql.Tx) { + err := accounts.InitializeAccountStore(d.ctx, tx) if err != nil { failBuild(err, "failed to initialize account store") } - - err = tx.Commit(d.ctx) - if err != nil { - failBuild(err, "failed to commit") - } } func buildSnapshotter() *snapshots.SnapshotStore { diff --git a/common/sql/sql.go b/common/sql/sql.go index 51dffc917..c8a47f692 100644 --- a/common/sql/sql.go +++ b/common/sql/sql.go @@ -40,22 +40,18 @@ type TxMaker interface { BeginTx(ctx context.Context) (Tx, error) } -// TxCloser terminates a transaction by committing or rolling it back. -type TxCloser interface { - // Rollback rolls back the transaction. - Rollback(ctx context.Context) error - // Commit commits the transaction. - Commit(ctx context.Context) error -} - // Tx represents a database transaction. It can be nested within other // transactions, and create new nested transactions. An implementation of Tx may // also be an AccessModer, but it is not required. type Tx interface { Executor - TxCloser TxMaker // recursive interface // note: does not embed DB for clear semantics (DB makes a Tx, not the reverse) + + // Rollback rolls back the transaction. + Rollback(ctx context.Context) error + // Commit commits the transaction. + Commit(ctx context.Context) error } // DB is a top level database interface, which may directly execute queries or diff --git a/internal/abci/abci.go b/internal/abci/abci.go index 1967f2840..9a86f44d6 100644 --- a/internal/abci/abci.go +++ b/internal/abci/abci.go @@ -3,7 +3,6 @@ package abci import ( "bytes" "context" - "encoding/binary" "encoding/hex" "errors" "fmt" @@ -11,12 +10,10 @@ import ( "sort" "strings" - "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/log" "github.com/kwilteam/kwil-db/core/types" "github.com/kwilteam/kwil-db/core/types/transactions" "github.com/kwilteam/kwil-db/internal/ident" - "github.com/kwilteam/kwil-db/internal/kv" "github.com/kwilteam/kwil-db/internal/txapp" abciTypes "github.com/cometbft/cometbft/abci/types" @@ -42,19 +39,10 @@ type AbciConfig struct { GasEnabled bool } -type AtomicCommitter interface { - Begin(ctx context.Context, idempotencyKey []byte) error - Precommit(ctx context.Context) ([]byte, error) - Commit(ctx context.Context) error -} - -func NewAbciApp(cfg *AbciConfig, kv KVStore, snapshotter SnapshotModule, - bootstrapper DBBootstrapModule, txRouter TxApp, consensusParams *txapp.ConsensusParams, log log.Logger) *AbciApp { +func NewAbciApp(cfg *AbciConfig, snapshotter SnapshotModule, bootstrapper DBBootstrapModule, + txRouter TxApp, consensusParams *txapp.ConsensusParams, log log.Logger) *AbciApp { app := &AbciApp{ - cfg: *cfg, - metadataStore: &metadataStore{ - kv: kv, - }, + cfg: *cfg, bootstrapper: bootstrapper, snapshotter: snapshotter, txApp: txRouter, @@ -97,9 +85,6 @@ type AbciApp struct { // bootstrapper is the bootstrapper module that handles bootstrapping the database bootstrapper DBBootstrapModule - // metadataStore to track the app hash and block height - metadataStore *metadataStore - log log.Logger // Expected AppState after bootstrapping the node with a given snapshot, @@ -110,6 +95,10 @@ type AbciApp struct { consensusParams *txapp.ConsensusParams + // height carries the height from FinalizeBlock to Commit for the snapshotter. + // Ultimately this may not be required, or TxApp could provide the info. + height uint64 + broadcastFn EventBroadcaster // validatorAddressToPubKey is a map of validator addresses to their public keys @@ -218,7 +207,9 @@ func (a *AbciApp) CheckTx(ctx context.Context, incoming *abciTypes.RequestCheckT return &abciTypes.ResponseCheckTx{Code: code.Uint32()}, nil } -// FinalizeBlock is on the consensus connection +// FinalizeBlock is on the consensus connection. Note that according to CometBFT +// docs, "ResponseFinalizeBlock.app_hash is included as the Header.AppHash in +// the next block." func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinalizeBlock) (*abciTypes.ResponseFinalizeBlock, error) { fmt.Printf("\n\n") logger := a.log.With(zap.String("stage", "ABCI FinalizeBlock"), zap.Int64("height", req.Height)) @@ -310,10 +301,13 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal } } - newAppHash, finalValidators, err := a.txApp.Finalize(ctx, req.Height) + // Get the new validator set and apphash from txApp. + appHash, finalValidators, err := a.txApp.Finalize(ctx, req.Height) if err != nil { return nil, fmt.Errorf("failed to finalize transaction app: %w", err) } + res.AppHash = appHash + a.height = uint64(req.Height) // remember for Commit valUpdates := validatorUpdates(initialValidators, finalValidators) @@ -332,12 +326,6 @@ func (a *AbciApp) FinalizeBlock(ctx context.Context, req *abciTypes.RequestFinal res.ValidatorUpdates[i] = abciTypes.Ed25519ValidatorUpdate(up.PubKey, up.Power) } - appHash, err := a.createNewAppHash(ctx, newAppHash) - if err != nil { - return nil, fmt.Errorf("failed to create new app hash: %w", err) - } - res.AppHash = appHash - return res, nil } @@ -389,20 +377,10 @@ func (a *AbciApp) Commit(ctx context.Context, _ *abciTypes.RequestCommit) (*abci return nil, fmt.Errorf("failed to commit transaction app: %w", err) } - err = a.metadataStore.IncrementBlockHeight(ctx) - if err != nil { - return nil, fmt.Errorf("failed to increment block height: %w", err) - } - - height, err := a.metadataStore.GetBlockHeight(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get block height: %w", err) - } - // snapshotting - if a.snapshotter != nil && a.snapshotter.IsSnapshotDue(uint64(height)) { + if a.snapshotter != nil && a.snapshotter.IsSnapshotDue(a.height) { // TODO: Lock all DBs - err = a.snapshotter.CreateSnapshot(uint64(height)) + err = a.snapshotter.CreateSnapshot(a.height) if err != nil { a.log.Error("snapshot creation failed", zap.Error(err)) } @@ -412,12 +390,26 @@ func (a *AbciApp) Commit(ctx context.Context, _ *abciTypes.RequestCommit) (*abci return &abciTypes.ResponseCommit{}, nil // RetainHeight stays 0 to not prune any blocks } -// Info is part of the Info/Query connection. +// Info is part of the Info/Query connection. CometBFT will call this during +// it's handshake, and if height 0 is returned it will then use InitChain. This +// method should also be usable at any time (and read only) as it is used for +// cometbft's /abci_info RPC endpoint. +// +// CometBFT docs say: +// - LastBlockHeight is the "Latest height for which the app persisted its state" +// - LastBlockAppHash is the "Latest AppHash returned by FinalizeBlock" +// - "ResponseFinalizeBlock.app_hash is included as the Header.AppHash in the +// next block." Notably, the *next* block's header. This is verifiable with +// the /block RPC endpoint. +// +// Thus, LastBlockAppHash is not NOT AppHash in the header of the block at the +// returned height. Our meta data store has to track the above values, where the +// stored app hash corresponds to the block at height+1. This is simple, but the +// discrepancy is worth noting. func (a *AbciApp) Info(ctx context.Context, _ *abciTypes.RequestInfo) (*abciTypes.ResponseInfo, error) { - // TODO: check kwild_voting.height!!!!!!!!!!!!!! - height, err := a.metadataStore.GetBlockHeight(ctx) + height, appHash, err := a.txApp.ChainInfo(ctx) if err != nil { - return nil, fmt.Errorf("failed to get block height: %w", err) + return nil, fmt.Errorf("chainInfo: %w", err) } validators, err := a.txApp.GetValidators(ctx) @@ -434,11 +426,6 @@ func (a *AbciApp) Info(ctx context.Context, _ *abciTypes.RequestInfo) (*abciType a.validatorAddressToPubKey[addr] = val.PubKey } - appHash, err := a.metadataStore.GetAppHash(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get app hash: %w", err) - } - a.log.Info("ABCI application is ready", zap.Int64("height", height)) return &abciTypes.ResponseInfo{ @@ -487,7 +474,8 @@ func (a *AbciApp) InitChain(ctx context.Context, req *abciTypes.RequestInitChain a.validatorAddressToPubKey[addr] = pk } - if err := a.txApp.GenesisInit(ctx, vldtrs, genesisAllocs, req.InitialHeight); err != nil { + if err := a.txApp.GenesisInit(ctx, vldtrs, genesisAllocs, req.InitialHeight, + a.cfg.GenesisAppHash); err != nil { return nil, fmt.Errorf("txApp.GenesisInit failed: %w", err) } @@ -496,16 +484,11 @@ func (a *AbciApp) InitChain(ctx context.Context, req *abciTypes.RequestInitChain valUpdates[i] = abciTypes.Ed25519ValidatorUpdate(validator.PubKey, validator.Power) } - err := a.metadataStore.SetAppHash(ctx, a.cfg.GenesisAppHash) - if err != nil { - return nil, fmt.Errorf("failed to set app hash: %v", err) - } - logger.Info("initialized chain", zap.String("app hash", fmt.Sprintf("%x", a.cfg.GenesisAppHash))) return &abciTypes.ResponseInitChain{ Validators: valUpdates, - AppHash: a.cfg.GenesisAppHash, + AppHash: a.cfg.GenesisAppHash, // doesn't matter what we gave the node in GenesisDoc, this is it }, nil } @@ -517,16 +500,8 @@ func (a *AbciApp) ApplySnapshotChunk(ctx context.Context, req *abciTypes.Request } if a.bootstrapper.IsDBRestored() { - err = a.metadataStore.SetAppHash(ctx, a.bootupState.appHash) - if err != nil { - return &abciTypes.ResponseApplySnapshotChunk{Result: abciTypes.ResponseApplySnapshotChunk_ABORT, RefetchChunks: nil}, nil - } - - err = a.metadataStore.SetBlockHeight(ctx, a.bootupState.height) - if err != nil { - return &abciTypes.ResponseApplySnapshotChunk{Result: abciTypes.ResponseApplySnapshotChunk_ABORT, RefetchChunks: nil}, nil - } - + // NOTE: when snapshot is implemented, the bootstrapper should be able + // to meta.SetChainState. a.log.Info("Bootstrapped database successfully") } return &abciTypes.ResponseApplySnapshotChunk{Result: abciTypes.ResponseApplySnapshotChunk_ACCEPT, RefetchChunks: nil}, nil @@ -906,73 +881,6 @@ func (a *AbciApp) Query(ctx context.Context, req *abciTypes.RequestQuery) (*abci return &abciTypes.ResponseQuery{}, nil } -// createNewAppHash updates the app hash by combining the previous app hash with -// the provided bytes. It persists the app hash to the metadata store. -func (a *AbciApp) createNewAppHash(ctx context.Context, addition []byte) ([]byte, error) { - oldHash, err := a.metadataStore.GetAppHash(ctx) - if err != nil { - return nil, err - } - - newHash := crypto.Sha256(append(oldHash, addition...)) - - err = a.metadataStore.SetAppHash(ctx, newHash) - return newHash, err -} - -// TODO: here should probably be other apphash computations such as the genesis -// config digest. The cmd/kwild/config package should probably not -// contain consensus-critical computations. - -var ( - appHashKey = []byte("a") - blockHeightKey = []byte("b") -) - -type metadataStore struct { - kv KVStore -} - -func (m *metadataStore) GetAppHash(ctx context.Context) ([]byte, error) { - res, err := m.kv.Get(appHashKey) - if err == kv.ErrKeyNotFound { - return nil, nil - } - return res, err -} - -func (m *metadataStore) SetAppHash(ctx context.Context, appHash []byte) error { - return m.kv.Set(appHashKey, appHash) -} - -func (m *metadataStore) GetBlockHeight(ctx context.Context) (int64, error) { - height, err := m.kv.Get(blockHeightKey) - if err == kv.ErrKeyNotFound { - return 0, nil - } - if err != nil { - return 0, err - } - - return int64(binary.BigEndian.Uint64(height)), nil -} - -func (m *metadataStore) SetBlockHeight(ctx context.Context, height int64) error { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(height)) - - return m.kv.Set(blockHeightKey, buf) -} - -func (m *metadataStore) IncrementBlockHeight(ctx context.Context) error { - height, err := m.GetBlockHeight(ctx) - if err != nil { - return err - } - - return m.SetBlockHeight(ctx, height+1) -} - type EventBroadcaster func(ctx context.Context, proposer []byte) error func (a *AbciApp) SetEventBroadcaster(fn EventBroadcaster) { diff --git a/internal/abci/abci_test.go b/internal/abci/abci_test.go index c1cf62f2a..da2c53dd2 100644 --- a/internal/abci/abci_test.go +++ b/internal/abci/abci_test.go @@ -411,10 +411,15 @@ func (m *mockTxApp) Finalize(ctx context.Context, blockHeight int64) (apphash [] return nil, nil, nil } -func (m *mockTxApp) GenesisInit(ctx context.Context, validators []*types.Validator, accounts []*types.Account, initialHeight int64) error { +func (m *mockTxApp) GenesisInit(ctx context.Context, validators []*types.Validator, accounts []*types.Account, + initialHeight int64, appHash []byte) error { return nil } +func (m *mockTxApp) ChainInfo(ctx context.Context) (height int64, appHash []byte, err error) { + return 1, nil, nil +} + func (m *mockTxApp) GetValidators(ctx context.Context) ([]*types.Validator, error) { return nil, nil } diff --git a/internal/abci/interfaces.go b/internal/abci/interfaces.go index 94b2f37ca..9c5615499 100644 --- a/internal/abci/interfaces.go +++ b/internal/abci/interfaces.go @@ -15,12 +15,6 @@ import ( "github.com/kwilteam/kwil-db/internal/txapp" ) -// KVStore is an interface for a basic key-value store -type KVStore interface { - Get(key []byte) ([]byte, error) - Set(key []byte, value []byte) error -} - // SnapshotModule is an interface for a struct that implements snapshotting type SnapshotModule interface { // Checks if databases are to be snapshotted at a particular height @@ -52,12 +46,12 @@ type DBBootstrapModule interface { // It has methods for beginning and ending blocks, applying transactions, // and managing a mempool type TxApp interface { - // accounts -> string([]accound_identifier) : *big.Int(balance) - GenesisInit(ctx context.Context, validators []*types.Validator, accounts []*types.Account, initialHeight int64) error + GenesisInit(ctx context.Context, validators []*types.Validator, accounts []*types.Account, initialHeight int64, appHash []byte) error + ChainInfo(ctx context.Context) (int64, []byte, error) ApplyMempool(ctx context.Context, tx *transactions.Transaction) error // Begin signals that a new block has begun. Begin(ctx context.Context) error - Finalize(ctx context.Context, blockHeight int64) (apphash []byte, validatorUpgrades []*types.Validator, err error) + Finalize(ctx context.Context, blockHeight int64) (appHash []byte, validatorUpgrades []*types.Validator, err error) Commit(ctx context.Context) error Execute(ctx txapp.TxContext, tx *transactions.Transaction) *txapp.TxResponse ProposerTxs(ctx context.Context, txNonce uint64, maxTxSize int64, proposerAddr []byte) ([][]byte, error) diff --git a/internal/abci/meta/meta.go b/internal/abci/meta/meta.go new file mode 100644 index 000000000..0d835885d --- /dev/null +++ b/internal/abci/meta/meta.go @@ -0,0 +1,94 @@ +// Package meta defines a chain metadata store for the ABCI application. Prior +// to using the methods, the tables should be initialized and updated to the +// latest schema version with InitializeMetaStore. +package meta + +import ( + "context" + "fmt" + "slices" + + "github.com/kwilteam/kwil-db/common/sql" + "github.com/kwilteam/kwil-db/internal/sql/versioning" +) + +const ( + chainSchemaName = `kwild_chain` + + chainStoreVersion = 0 + + initChainTable = `CREATE TABLE IF NOT EXISTS ` + chainSchemaName + `.chain ( + height INT8 NOT NULL, + app_hash BYTEA + );` // no primary key, only one row + + insertChainState = `INSERT INTO ` + chainSchemaName + `.chain ` + + `VALUES ($1, $2);` + + setChainState = `UPDATE ` + chainSchemaName + `.chain ` + + `SET height = $1, app_hash = $2;` + + getChainState = `SELECT height, app_hash FROM ` + chainSchemaName + `.chain;` +) + +func initTables(ctx context.Context, tx sql.DB) error { + _, err := tx.Execute(ctx, initChainTable) + return err +} + +// InitializeMetaStore initializes the chain metadata store schema. +func InitializeMetaStore(ctx context.Context, db sql.DB) error { + upgradeFns := map[int64]versioning.UpgradeFunc{ + 0: initTables, + } + + return versioning.Upgrade(ctx, db, chainSchemaName, upgradeFns, chainStoreVersion) +} + +// GetChainState returns height and app hash from the chain state store. +// If there is no recorded data, height will be -1 and app hash nil. +func GetChainState(ctx context.Context, db sql.Executor) (int64, []byte, error) { + res, err := db.Execute(ctx, getChainState) + if err != nil { + return 0, nil, err + } + + switch n := len(res.Rows); n { + case 0: + return -1, nil, nil // fresh DB + case 1: + default: + return 0, nil, fmt.Errorf("expected at most one row, got %d", n) + } + + row := res.Rows[0] + if len(row) != 2 { + return 0, nil, fmt.Errorf("expected two columns, got %d", len(row)) + } + + height, ok := sql.Int64(row[0]) + if !ok { + return 0, nil, fmt.Errorf("invalid type for height (%T)", res.Rows[0][0]) + } + + appHash, ok := row[1].([]byte) + if !ok { + return 0, nil, fmt.Errorf("expected bytes for apphash, got %T", row[1]) + } + + return height, slices.Clone(appHash), nil +} + +// SetChainState will update the current height and app hash. +func SetChainState(ctx context.Context, db sql.Executor, height int64, appHash []byte) error { + // attempt UPDATE + res, err := db.Execute(ctx, setChainState, height, appHash) + if err != nil { + return err + } + // If no rows updated, meaning empty table, do INSERT + if res.Status.RowsAffected == 0 { + _, err = db.Execute(ctx, insertChainState, height, appHash) + } + return err +} diff --git a/internal/kv/_atomic/committer.go b/internal/kv/_atomic/committer.go deleted file mode 100644 index 5781e5636..000000000 --- a/internal/kv/_atomic/committer.go +++ /dev/null @@ -1,231 +0,0 @@ -package atomic - -import ( - "context" - "crypto/sha256" - "encoding/binary" - "fmt" - - "github.com/kwilteam/kwil-db/core/types" -) - -// This package is using big endian byte ordering to encode integer values such -// as data lengths. This convention should not be changed. The slice -// serialization convention defined by Kwil's SerializeSlice and -// DeserializeSlice must also be used. If either of these aspects are changed, -// the value returned by ID will be different. - -func uint16ToBytes(i uint16) []byte { - bts := make([]byte, 2) - binary.BigEndian.PutUint16(bts, i) - return bts -} - -var bytesToUint16 = binary.BigEndian.Uint16 - -type kvOperation uint8 - -const ( - kvOperationSet kvOperation = iota - kvOperationDelete -) - -// keyValue is a basic struct containing keys and values -// it can be quickly serialized and deserialized, which is -// used for writing to and receiving data from the AtomicCommitter -type keyValue struct { - Operation kvOperation - // Key cannot be longer than 65535 bytes - Key []byte - Value []byte // value can be any length -} - -// MarshalBinary appends the length of the key, the key, and the value -func (k *keyValue) MarshalBinary() ([]byte, error) { - var buf []byte - - // write operation - buf = append(buf, byte(k.Operation)) - - buf = append(buf, uint16ToBytes(uint16(len(k.Key)))...) - buf = append(buf, k.Key...) - - // write value - buf = append(buf, k.Value...) - - return buf, nil -} - -// UnmarshalBinary reads the length of the key, the key, and the value -func (k *keyValue) UnmarshalBinary(data []byte) error { - if len(data) < 3 { - return fmt.Errorf("data too short") - } - // read operation - k.Operation = kvOperation(data[0]) - - // read key length and key - keyLen := bytesToUint16(data[1:3]) - if len(data) < int(keyLen)+3 { - return fmt.Errorf("data too short") - } - k.Key = data[3 : keyLen+3] - - // read value - k.Value = data[keyLen+3:] - return nil -} - -/* - The below implements the sessions.Committable interface -*/ - -func (k *AtomicKV) BeginCommit(ctx context.Context) error { - k.mu.Lock() - defer k.mu.Unlock() - - if k.inSession { - return ErrSessionActive - } - k.inSession = true - return nil -} - -func (k *AtomicKV) EndCommit(ctx context.Context, appender func([]byte) error) (err error) { - k.mu.Lock() - defer k.mu.Unlock() - - if !k.inSession { - return ErrSessionNotActive - } - k.inSession = false - - // flush uncommitted data - bts, err := k.flushUncommittedData() - if err != nil { - return err - } - - // append the data to the appender - if err := appender(bts); err != nil { - return err - } - - // return the commit id - return nil -} - -func (k *AtomicKV) BeginApply(ctx context.Context) error { - k.mu.Lock() - defer k.mu.Unlock() - - if k.currentTx != nil { - return ErrTxnActive - } - - k.currentTx = k.db.BeginTransaction() - - return nil -} - -func (k *AtomicKV) Apply(ctx context.Context, changes []byte) error { - k.mu.Lock() - defer k.mu.Unlock() - - if k.currentTx == nil { - return ErrTxnNotActive - } - - // deserialize the changes - values, err := types.DeserializeSlice[*keyValue](changes, func() *keyValue { - return &keyValue{} - }) - if err != nil { - return err - } - - // apply the changes - for _, v := range values { - var err error - switch v.Operation { - case kvOperationSet: - err = k.currentTx.Set(v.Key, v.Value) - case kvOperationDelete: - err = k.currentTx.Delete(v.Key) - default: - err = fmt.Errorf("unknown operation") - } - if err != nil { - return err - } - } - - return nil -} - -func (k *AtomicKV) EndApply(ctx context.Context) error { - k.mu.Lock() - defer k.mu.Unlock() - - if k.currentTx == nil { - return ErrTxnNotActive - } - - err := k.currentTx.Commit() - if err != nil { - return err - } - - k.currentTx = nil - - return nil -} - -func (k *AtomicKV) Cancel(ctx context.Context) error { - k.mu.Lock() - defer k.mu.Unlock() - - if k.currentTx != nil { - k.currentTx.Discard() - } - - k.currentTx = nil - k.inSession = false - k.uncommittedData = make([]*keyValue, 0) - return nil -} - -func (k *AtomicKV) ID(ctx context.Context) ([]byte, error) { - k.mu.Lock() - defer k.mu.Unlock() - - if !k.inSession { - return nil, ErrSessionNotActive - } - - hash := sha256.New() - for _, v := range k.uncommittedData { - bts, err := v.MarshalBinary() - if err != nil { - return nil, err - } - - _, err = hash.Write(bts) - if err != nil { - return nil, err - } - } - - return hash.Sum(nil), nil -} - -// flushUncommittedData returns the uncommitted data and clears the uncommitted data -func (k *AtomicKV) flushUncommittedData() ([]byte, error) { - bts, err := types.SerializeSlice(k.uncommittedData) - if err != nil { - return nil, err - } - - k.uncommittedData = make([]*keyValue, 0) - return bts, nil -} diff --git a/internal/kv/_atomic/errors.go b/internal/kv/_atomic/errors.go deleted file mode 100644 index 1e9c3ad3f..000000000 --- a/internal/kv/_atomic/errors.go +++ /dev/null @@ -1,10 +0,0 @@ -package atomic - -import "errors" - -var ( - ErrSessionActive = errors.New("session already active") - ErrSessionNotActive = errors.New("session not active") - ErrTxnActive = errors.New("transaction already active") - ErrTxnNotActive = errors.New("transaction not active") -) diff --git a/internal/kv/_atomic/kv.go b/internal/kv/_atomic/kv.go deleted file mode 100644 index 27ffa67d0..000000000 --- a/internal/kv/_atomic/kv.go +++ /dev/null @@ -1,104 +0,0 @@ -package atomic - -import ( - "sync" - - "github.com/kwilteam/kwil-db/internal/kv" - "github.com/kwilteam/kwil-db/internal/sessions" -) - -// NewAtomicKV creates a new atomic key value store. -// It is compatible with Kwil's 2pc protocol. -func NewAtomicKV(store kv.KVStore) (*AtomicKV, error) { - // return an interface for newPrefix - return &AtomicKV{ - baseKV: &baseKV{ - db: store, - inSession: false, - uncommittedData: []*keyValue{}, - }, - prefix: []byte{}, - }, nil -} - -// AtomicKV is a threadsafe, linearizable key/value store. -type AtomicKV struct { - *baseKV - // prefix is the prefix for this store. - prefix []byte -} - -type baseKV struct { - mu sync.RWMutex - - // db is the underlying badger database. - // you should not write to this directly. - db kv.KVStore - - // inSession is true if we are currently in a session. - // if in a session, data is not written to the database, - // but instead stored to be written when the session is committed. - inSession bool - - //uncommittedData is a map of keys to values that have not yet been committed. - uncommittedData []*keyValue - - // currentTxn is the current transaction. - currentTx kv.Transaction -} - -var _ sessions.Committable = &AtomicKV{} -var _ kv.KVWriter = &AtomicKV{} - -// Write writes a key/value pair to the database. -func (k *AtomicKV) Set(key []byte, value []byte) error { - k.mu.Lock() - defer k.mu.Unlock() - - if k.inSession { - k.uncommittedData = append(k.uncommittedData, &keyValue{ - Operation: kvOperationSet, - Key: append(k.prefix, key...), - Value: value, - }) - return nil - } - - return k.db.Set(append(k.prefix, key...), value) -} - -// Read reads a key from the database. -func (k *AtomicKV) Get(key []byte) ([]byte, error) { - k.mu.RLock() - defer k.mu.RUnlock() - - return k.db.Get(append(k.prefix, key...)) -} - -// Delete deletes a key from the database. -func (k *AtomicKV) Delete(key []byte) error { - k.mu.Lock() - defer k.mu.Unlock() - - if k.inSession { - k.uncommittedData = append(k.uncommittedData, &keyValue{ - Operation: kvOperationDelete, - Key: append(k.prefix, key...), - }) - return nil - } - - return k.db.Delete(append(k.prefix, key...)) -} - -// NewPrefix creates a new writer with a prefix for the current store. -func (k *AtomicKV) NewPrefix(prefix []byte) *AtomicKV { - return &AtomicKV{ - baseKV: &baseKV{ - db: k.db, - inSession: k.inSession, - uncommittedData: k.uncommittedData, - }, - prefix: append(k.prefix, prefix...), - } -} diff --git a/internal/kv/_atomic/kv_test.go b/internal/kv/_atomic/kv_test.go deleted file mode 100644 index 6050606fd..000000000 --- a/internal/kv/_atomic/kv_test.go +++ /dev/null @@ -1,197 +0,0 @@ -package atomic_test - -import ( - "context" - "testing" - - "github.com/kwilteam/kwil-db/internal/kv" - "github.com/kwilteam/kwil-db/internal/kv/atomic" - kvTesting "github.com/kwilteam/kwil-db/internal/kv/atomic/testing" - "github.com/stretchr/testify/assert" -) - -// tests basic KV functionality; anything that is not the sessions.Committable interface -func Test_BasicKV(t *testing.T) { - dbType := kvTesting.TestKVFlagInMemory - type testCase struct { - name string - testFunc func(t *testing.T, db *atomic.AtomicKV) - } - - testCases := []testCase{ - { - name: "successful set, get, delete", - testFunc: func(t *testing.T, db *atomic.AtomicKV) { - err := db.Set(ser("key"), ser("value")) - assert.NoError(t, err) - - value, err := db.Get(ser("key")) - assert.NoError(t, err) - - assert.Equal(t, ser("value"), value) - - err = db.Delete(ser("key")) - assert.NoError(t, err) - - _, err = db.Get(ser("key")) - assert.ErrorIs(t, err, kv.ErrKeyNotFound) - }, - }, - { - name: "nested prefixes can manually append to the key", - testFunc: func(t *testing.T, db *atomic.AtomicKV) { - prefDb := db.NewPrefix(ser("prefix")) - doublePrefDb := prefDb.NewPrefix(ser("prefix2")) - - err := doublePrefDb.Set(ser("key"), ser("value")) - assert.NoError(t, err) - - value, err := doublePrefDb.Get(ser("key")) - assert.NoError(t, err) - value2, err := prefDb.Get(ser("prefix2", "key")) - assert.NoError(t, err) - value3, err := db.Get(ser("prefix", "prefix2", "key")) - assert.NoError(t, err) - - assert.Equal(t, value, value2) - assert.Equal(t, value2, value3) - - err = doublePrefDb.Delete(ser("key")) - assert.NoError(t, err) - - _, err = doublePrefDb.Get(ser("key")) - assert.ErrorIs(t, err, kv.ErrKeyNotFound) - _, err = prefDb.Get(ser("prefix2", "key")) - assert.ErrorIs(t, err, kv.ErrKeyNotFound) - _, err = db.Get(ser("prefix", "prefix2", "key")) - assert.ErrorIs(t, err, kv.ErrKeyNotFound) - }, - }, - { - name: "successful atomic commit", - testFunc: func(t *testing.T, db *atomic.AtomicKV) { - ctx := context.Background() - - // begin with some data - err := db.Set([]byte("original"), []byte("value")) - assert.NoError(t, err) - - err = db.BeginCommit(ctx) - assert.NoError(t, err) - - err = db.Set([]byte("key"), []byte("value")) - assert.NoError(t, err) - - err = db.Delete([]byte("original")) - assert.NoError(t, err) - - appender := newCommitAppender() - - testIds(t, db) - - err = db.EndCommit(ctx, appender.Append) - assert.NoError(t, err) - - err = db.BeginApply(ctx) - assert.NoError(t, err) - - // there should only be one, but this may change - for _, commit := range appender.commits { - err = db.Apply(ctx, commit) - assert.NoError(t, err) - } - - err = db.EndApply(ctx) - assert.NoError(t, err) - - value, err := db.Get([]byte("key")) - assert.NoError(t, err) - - assert.Equal(t, []byte("value"), value) - - _, err = db.Get([]byte("original")) - assert.ErrorIs(t, err, kv.ErrKeyNotFound) - }, - }, - { - name: "canceled atomic commit", - testFunc: func(t *testing.T, db *atomic.AtomicKV) { - ctx := context.Background() - err := db.BeginCommit(ctx) - assert.NoError(t, err) - - err = db.Set([]byte("key"), []byte("value")) - assert.NoError(t, err) - - appender := newCommitAppender() - - err = db.EndCommit(ctx, appender.Append) - assert.NoError(t, err) - - err = db.BeginApply(ctx) - assert.NoError(t, err) - - // there should only be one, but this may change - for _, commit := range appender.commits { - err = db.Apply(ctx, commit) - assert.NoError(t, err) - } - - db.Cancel(ctx) - - _, err = db.Get([]byte("key")) - assert.ErrorIs(t, err, kv.ErrKeyNotFound) - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - db, td, err := kvTesting.OpenTestKv(context.Background(), "kv_test", dbType) - if err != nil { - t.Fatal(err) - } - defer td() - - tc.testFunc(t, db) - }) - } -} - -func testIds(t *testing.T, db *atomic.AtomicKV) { - ctx := context.Background() - lastId, err := db.ID(ctx) - assert.NoError(t, err) - - for i := 0; i < 100; i++ { - id, err := db.ID(ctx) - assert.NoError(t, err) - - assert.Equal(t, lastId, id) - lastId = id - } -} - -func newCommitAppender() *commitAppender { - return &commitAppender{ - commits: make([][]byte, 0), - } -} - -type commitAppender struct { - commits [][]byte -} - -func (c *commitAppender) Append(v []byte) error { - c.commits = append(c.commits, v) - return nil -} - -// ser is a helper function to convert a list of strings to a byte slice -func ser(str ...string) []byte { - var result []byte - for _, s := range str { - result = append(result, []byte(s)...) - } - return result -} diff --git a/internal/kv/_atomic/testing/testing.go b/internal/kv/_atomic/testing/testing.go deleted file mode 100644 index 7082cf3d6..000000000 --- a/internal/kv/_atomic/testing/testing.go +++ /dev/null @@ -1,43 +0,0 @@ -package testing - -import ( - "context" - "fmt" - - "github.com/kwilteam/kwil-db/internal/kv/atomic" - badgerTesting "github.com/kwilteam/kwil-db/internal/kv/badger/testing" - kvTesting "github.com/kwilteam/kwil-db/internal/kv/testing" -) - -type TestKVFlag uint8 - -const ( - TestKVFlagInMemory TestKVFlag = iota - TestKVFlagBadger -) - -// OpenTestKv opens a new test kv store -// It returns a teardown function. If a teardown -// function is not necessary, it does nothing -func OpenTestKv(ctx context.Context, name string, flag TestKVFlag) (*atomic.AtomicKV, func() error, error) { - - switch flag { - case TestKVFlagInMemory: - fn := func() error { - return nil - } - - db, err := atomic.NewAtomicKV(kvTesting.NewMemoryKV()) - return db, fn, err - case TestKVFlagBadger: - badgerDB, td, err := badgerTesting.NewTestBadgerDB(ctx, name, nil) - if err != nil { - return nil, nil, err - } - - db, err := atomic.NewAtomicKV(badgerDB) - return db, td, err - default: - return nil, nil, fmt.Errorf("unknown flag: %d", flag) - } -} diff --git a/internal/kv/badger/db.go b/internal/kv/badger/db.go index fdc6696ca..00aa37c65 100644 --- a/internal/kv/badger/db.go +++ b/internal/kv/badger/db.go @@ -6,10 +6,11 @@ import ( "sync" "time" - badger "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3" + "go.uber.org/zap" + "github.com/kwilteam/kwil-db/core/log" "github.com/kwilteam/kwil-db/internal/kv" - "go.uber.org/zap" ) // NewBadgerDB creates a new BadgerDB. diff --git a/internal/sql/pg/db.go b/internal/sql/pg/db.go index ea59e4f5e..6b51a58f5 100644 --- a/internal/sql/pg/db.go +++ b/internal/sql/pg/db.go @@ -45,6 +45,14 @@ type DB struct { autoCommit bool // skip the explicit transaction (begin/commit automatically) tx pgx.Tx // interface txid string // uid of the prepared transaction + + // NOTE: this was initially designed for a single ongoing write transaction, + // held in the tx field, and the (*DB).Execute method using it *implicitly*. + // We have moved toward using the Execute method of the transaction returned + // by BeginTx/BeginOuterTx/BeginReadTx, and we can potentially allow + // multiple uncommitted write transactions to support a 2 phase commit of + // different stores using the same pg.DB instance. This will take refactoring + // of the DB and concrete transaction type methods. } // DBConfig is the configuration for the Kwil DB backend, which includes the @@ -324,7 +332,7 @@ func (db *DB) precommit(ctx context.Context) ([]byte, error) { } } -// commit is called from the Commit method of the sql.Tx (or sql.TxCloser) +// commit is called from the Commit method of the sql.Tx // returned from BeginTx (or Begin). See tx.go. func (db *DB) commit(ctx context.Context) error { db.mtx.Lock() @@ -366,7 +374,7 @@ func (db *DB) commit(ctx context.Context) error { return nil } -// rollback is called from the Rollback method of the sql.Tx (or sql.TxCloser) +// rollback is called from the Rollback method of the sql.Tx // returned from BeginTx (or Begin). See tx.go. func (db *DB) rollback(ctx context.Context) error { db.mtx.Lock() @@ -427,6 +435,8 @@ func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.Resul db.mtx.Lock() defer db.mtx.Unlock() + // NOTE: if we remove the db.tx field, we'd have this top level method + // always function in "autoCommit" mode, with an ephemeral transaction. if db.tx != nil { if db.autoCommit { return nil, errors.New("tx already created, cannot use auto commit") diff --git a/internal/sql/versioning/versioning.go b/internal/sql/versioning/versioning.go index 25419406a..1be90199a 100644 --- a/internal/sql/versioning/versioning.go +++ b/internal/sql/versioning/versioning.go @@ -12,8 +12,17 @@ var ( ErrTargetVersionTooLow = fmt.Errorf("target version is lower than current version") ) -// EnsureVersionTableExists ensures that the version table exists in the database. -// If the table does not exist, it will be created, and the first version will be set to version specified. +const ( + // preVersion is the value inserted into a fresh version table with + // sqlEnsureVersionExists. This is used to ensure that the "upgrade" to + // version 0, which is usually just schema table initialization, defined + // per-store is executed. + preVersion = -1 +) + +// ensureVersionTableExists ensures that the version table exists in the +// database. If the table does not exist, it will be created, and the first +// version will be set to -1. func ensureVersionTableExists(ctx context.Context, db sql.TxMaker, schema string) error { tx, err := db.BeginTx(ctx) if err != nil { @@ -32,7 +41,7 @@ func ensureVersionTableExists(ctx context.Context, db sql.TxMaker, schema string } // Ensure that the version exists. If it does not, insert it with the target version. - if _, err = tx.Execute(ctx, fmt.Sprintf(sqlEnsureVersionExists, schema), -1); err != nil { + if _, err = tx.Execute(ctx, fmt.Sprintf(sqlEnsureVersionExists, schema), preVersion); err != nil { return err } diff --git a/internal/txapp/interfaces.go b/internal/txapp/interfaces.go index 1fda31390..0ffc9bd6d 100644 --- a/internal/txapp/interfaces.go +++ b/internal/txapp/interfaces.go @@ -5,6 +5,7 @@ import ( "github.com/kwilteam/kwil-db/common/sql" "github.com/kwilteam/kwil-db/core/types" + "github.com/kwilteam/kwil-db/internal/abci/meta" "github.com/kwilteam/kwil-db/internal/accounts" "github.com/kwilteam/kwil-db/internal/events" "github.com/kwilteam/kwil-db/internal/voting" @@ -55,8 +56,10 @@ var ( approveResolution = voting.ApproveResolution getVoterPower = voting.GetValidatorPower hasVoted = voting.HasVoted - getDBHeight = voting.GetHeight - updateDBHeight = voting.SetHeight + + // chain metadata + getChainState = meta.GetChainState + setChainState = meta.SetChainState // account functions getAccount = accounts.GetAccount diff --git a/internal/txapp/txapp.go b/internal/txapp/txapp.go index 843349d2d..5c1786213 100644 --- a/internal/txapp/txapp.go +++ b/internal/txapp/txapp.go @@ -4,15 +4,17 @@ package txapp import ( "bytes" "context" + "encoding/hex" "errors" "fmt" "math/big" - "sort" + "slices" "go.uber.org/zap" "github.com/kwilteam/kwil-db/common" "github.com/kwilteam/kwil-db/common/sql" + "github.com/kwilteam/kwil-db/core/crypto" "github.com/kwilteam/kwil-db/core/crypto/auth" "github.com/kwilteam/kwil-db/core/log" "github.com/kwilteam/kwil-db/core/types" @@ -24,14 +26,17 @@ import ( ) // NewTxApp creates a new router. -func NewTxApp(db DB, engine common.Engine, - signer *auth.Ed25519Signer, events Rebroadcaster, chainID string, GasEnabled bool, extensionConfigs map[string]map[string]string, log log.Logger) *TxApp { +func NewTxApp(db DB, engine common.Engine, signer *auth.Ed25519Signer, + events Rebroadcaster, chainID string, GasEnabled bool, extensionConfigs map[string]map[string]string, log log.Logger) (*TxApp, error) { voteBodyTxSize, err := computeEmptyVoteBodyTxSize(chainID) if err != nil { - log.Error("failed to compute empty vote body tx size", zap.Error(err)) + return nil, fmt.Errorf("failed to compute empty vote body tx size: %w", err) } - return &TxApp{ + resTypes := resolutions.ListResolutions() + slices.Sort(resTypes) + + t := &TxApp{ Database: db, Engine: engine, events: events, @@ -45,7 +50,13 @@ func NewTxApp(db DB, engine common.Engine, GasEnabled: GasEnabled, extensionConfigs: extensionConfigs, emptyVoteBodyTxSize: voteBodyTxSize, + resTypes: resTypes, + } + t.height, t.appHash, err = t.ChainInfo(context.TODO()) + if err != nil { + return nil, fmt.Errorf("failed to get chain info: %w", err) } + return t, nil } // TxApp maintains the state for Kwil's ABCI application. @@ -55,9 +66,9 @@ func NewTxApp(db DB, engine common.Engine, type TxApp struct { Database DB // postgres database Engine common.Engine // tracks deployed schemas - // Accounts AccountsStore // accounts - // Validators ValidatorStore // validators - // VoteStore VoteStore // tracks resolutions, their votes, manages expiration + // The various internal stores (accounts, votes, etc.) are accessed through + // the Database via the functions defined in relevant packages. + GasEnabled bool events Rebroadcaster @@ -68,6 +79,11 @@ type TxApp struct { mempool *mempool + // appHash is the last block's apphash, set for genesis in GenesisInit + // updated in FinalizeBlock by combining with new engine hash. + appHash []byte + height int64 + // transaction that exists between Begin and Commit currentTx sql.OuterTx @@ -78,25 +94,30 @@ type TxApp struct { // For more information, see: https://github.com/cometbft/cometbft/issues/203 // genesisTx is the transaction that is used to apply the genesis state changes // along with the updates by the transactions in the first block. - genesisTx sql.OuterTx + genesisTx sql.OuterTx + extensionConfigs map[string]map[string]string // precomputed variables emptyVoteBodyTxSize int64 + resTypes []string } // GenesisInit initializes the TxApp. It must be called outside of a session, // and before any session is started. // It can assign the initial validator set and initial account balances. // It is only called once for a new chain. -func (r *TxApp) GenesisInit(ctx context.Context, validators []*types.Validator, genesisAccounts []*types.Account, initialHeight int64) error { +func (r *TxApp) GenesisInit(ctx context.Context, validators []*types.Validator, genesisAccounts []*types.Account, + initialHeight int64, genesisAppHash []byte) error { tx, err := r.Database.BeginOuterTx(ctx) if err != nil { return err } r.genesisTx = tx - height, err := getDBHeight(ctx, tx) + // With the genesisTx not being committed until the first FinalizeBlock, we + // expect no existing chain state in the application (postgres). + height, appHash, err := getChainState(ctx, tx) if err != nil { err2 := tx.Rollback(ctx) if err2 != nil { @@ -105,12 +126,16 @@ func (r *TxApp) GenesisInit(ctx context.Context, validators []*types.Validator, return fmt.Errorf("error getting database height: %s", err.Error()) } - if height == -1 { - r.log.Info("GenesisInit: starting with empty database") - } else if initialHeight-1 != height { - return fmt.Errorf("genesisInit: expected database to be at height %d, got %d", initialHeight-1, height) + // First app hash and height are stored in FinalizeBlock for first block. + if height != -1 { + return fmt.Errorf("expected database to be uninitialized, but had height %d", height) + } + if len(appHash) != 0 { + return fmt.Errorf("expected NULL app hash, got %x", appHash) } + r.appHash = genesisAppHash // combined with first block's apphash and stored in FinalizeBlock + // Add Genesis Validators for _, validator := range validators { err := setVoterPower(ctx, tx, validator.PubKey, validator.Power) @@ -137,6 +162,33 @@ func (r *TxApp) GenesisInit(ctx context.Context, validators []*types.Validator, return nil } +// ChainInfo is called be the ABCI applications' Info method, which is called +// once on startup except when the node is at genesis, in which case GenesisInit +// is called by the application's ChainInit method. +func (r *TxApp) ChainInfo(ctx context.Context) (int64, []byte, error) { + tx, err := r.Database.BeginReadTx(ctx) + if err != nil { + return 0, nil, err + } + defer tx.Rollback(ctx) + + // MAYBE: return r.height, r.appHash from the exported method and only query + // the DB from an unexported method that c'tor uses. Needs mutex. Hitting DB + // always may also be good to ensure the exported method gets committed. + + // return getChainState(ctx, tx) + height, appHash, err := getChainState(ctx, tx) + if err != nil { + return 0, nil, err + } + // r.log.Debug("ChainInfo", log.Int("height", height), log.String("appHash", hex.EncodeToString(appHash)), + // log.Int("height_x", r.height), log.String("appHash_x", hex.EncodeToString(r.appHash))) + if height == -1 { + height = 0 // for ChainInfo caller, non-negative is expected for genesis + } + return height, appHash, nil +} + // UpdateValidator updates a validator's power. // It can only be called in between Begin and Finalize. // The value passed as power will simply replace the current power. @@ -210,7 +262,7 @@ func (r *TxApp) Execute(ctx TxContext, tx *transactions.Transaction) *TxResponse } // Begin signals that a new block has begun. -// It contains information on any validators whos power should be updated. +// It contains information on any validators whose power should be updated. func (r *TxApp) Begin(ctx context.Context) error { if r.currentTx != nil { return errors.New("txapp misuse: cannot begin a new block while a transaction is in progress") @@ -231,10 +283,9 @@ func (r *TxApp) Begin(ctx context.Context) error { return nil } -// Finalize signals that a block has been finalized. -// No more changes can be applied to the database, and TxApp should return -// information on the apphash and validator updates. -func (r *TxApp) Finalize(ctx context.Context, blockHeight int64) (apphash []byte, validatorUpgrades []*types.Validator, err error) { +// Finalize signals that a block has been finalized. No more changes can be +// applied to the database. It returns the apphash and the validator set. +func (r *TxApp) Finalize(ctx context.Context, blockHeight int64) (appHash []byte, finalValidators []*types.Validator, err error) { if r.currentTx == nil { return nil, nil, errors.New("txapp misuse: cannot finalize a block without a transaction in progress") } @@ -251,18 +302,29 @@ func (r *TxApp) Finalize(ctx context.Context, blockHeight int64) (apphash []byte } }() + r.log.Debug("Finalize(start)", log.Int("height", r.height), log.String("appHash", hex.EncodeToString(r.appHash))) + + if blockHeight != r.height+1 { + return nil, nil, fmt.Errorf("Finalize was expecting height %d, got %d", r.height+1, blockHeight) + } + err = r.processVotes(ctx, blockHeight) if err != nil { return nil, nil, err } - finalValidators, err := getAllVoters(ctx, r.currentTx) + finalValidators, err = getAllVoters(ctx, r.currentTx) if err != nil { return nil, nil, err } - // Update Height - err = updateDBHeight(ctx, r.currentTx, blockHeight) + // While still in the DB transaction, update to this next height but dummy + // app hash. If we crash before Commit can store the next app hash that we + // get after Precommit, the startup handshake's call to Info will detect the + // mismatch. That requires manual recovery (drop state and reapply), but it + // at least detects this recorded height rather than not recognizing that we + // have committed the data for this block at all. + err = setChainState(ctx, r.currentTx, blockHeight, []byte{0x42}) if err != nil { return nil, nil, err } @@ -272,7 +334,15 @@ func (r *TxApp) Finalize(ctx context.Context, blockHeight int64) (apphash []byte return nil, nil, err } - return engineHash, finalValidators, nil + r.appHash = crypto.Sha256(append(r.appHash, engineHash...)) + r.height = blockHeight + + r.log.Debug("Finalize(end)", log.Int("height", r.height), log.String("appHash", hex.EncodeToString(r.appHash))) + + // I'd really like to setChainState here with appHash, but we can't use + // currentTx for anything now except Commit. + + return r.appHash, finalValidators, nil } // processVotes confirms resolutions that have been approved by the network, @@ -280,9 +350,6 @@ func (r *TxApp) Finalize(ctx context.Context, blockHeight int64) (apphash []byte func (r *TxApp) processVotes(ctx context.Context, blockheight int64) error { credits := make(creditMap) - resolutionTypes := resolutions.ListResolutions() - sort.Strings(resolutionTypes) // for deterministic order - var finalizedIds []types.UUID // markedProcessedIds is a separate list for marking processed, since we do not want to process validator resolutions // validator vote IDs are not unique, so we cannot mark them as processed, in case a validator leaves and joins again @@ -295,7 +362,7 @@ func (r *TxApp) processVotes(ctx context.Context, blockheight int64) error { Resolution *resolutions.Resolution ResolveFunc func(ctx context.Context, app *common.App, resolution *resolutions.Resolution) error } - for _, resolutionType := range resolutionTypes { + for _, resolutionType := range r.resTypes { cfg, err := resolutions.GetResolution(resolutionType) if err != nil { return err @@ -461,13 +528,15 @@ func (c creditMap) applyResolution(res *resolutions.Resolution) { c[string(res.Proposer)] = big.NewInt(0).Add(currentBalance, bodyCost) } -// Commit signals that a block has been committed. +// Commit signals that a block's state changes should be committed. func (r *TxApp) Commit(ctx context.Context) error { if r.currentTx == nil { return errors.New("txapp misuse: cannot commit a block without a transaction in progress") } defer r.mempool.reset() + // r.log.Debug("Commit(start)", log.Int("height", r.height), log.String("appHash", hex.EncodeToString(r.appHash))) + err := r.currentTx.Commit(ctx) if err != nil { return err @@ -475,7 +544,28 @@ func (r *TxApp) Commit(ctx context.Context) error { r.currentTx = nil r.genesisTx = nil - return nil + + // Now we can store the app hash computed in FinalizeBlock after Precommit. + // Note that if we crash here, Info on startup will immediately detect an + // unexpected app hash since we've saved this height in the Commit above. + // While this takes manual recovery, it does not go undetected as if we had + // not updated to the new height in that Commit. We could improve this with + // some refactoring to pg.DB to allow multiple simultaneous uncommitted + // prepared transactions to make this an actual two-phase commit, but it is + // just a single row so the difference is minor. + ctx = context.Background() // don't let them cancel us now, we need consistency with consensus tx commit + tx, err := r.Database.BeginOuterTx(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + err = setChainState(ctx, tx, r.height, r.appHash) // unchanged height, known appHash + if err != nil { + return err + } + + return tx.Commit(ctx) // no Precommit for this one } // ApplyMempool applies the transactions in the mempool. diff --git a/internal/voting/sql.go b/internal/voting/sql.go index b9a479648..232e78271 100644 --- a/internal/voting/sql.go +++ b/internal/voting/sql.go @@ -7,7 +7,7 @@ package voting const ( votingSchemaName = `kwild_voting` - voteStoreVersion = 0 + voteStoreVersion = 1 // tableResolutions is the sql table used to store resolutions that can be voted on. // the vote_body_proposer is the BYTEA of the public key of the submitter, NOT the UUID @@ -51,15 +51,7 @@ const ( id BYTEA PRIMARY KEY );` - tableHeight = `CREATE TABLE IF NOT EXISTS ` + votingSchemaName + `.height ( - name TEXT PRIMARY KEY, -- name is 'height' - height INT NOT NULL - );` - - getHeight = `SELECT height FROM ` + votingSchemaName + `.height WHERE name = 'height';` - - updateHeight = `INSERT INTO ` + votingSchemaName + `.height (name, height) VALUES ('height', $1) - ON CONFLICT (name) DO UPDATE SET height = $1;` + dropHeightTable = `DROP TABLE IF EXISTS ` + votingSchemaName + `.height` // ensureResolutionIDExists is the sql statement used to ensure a resolution ID is present in the resolutions table ensureResolutionIDExists = `INSERT INTO ` + votingSchemaName + `.resolutions (id, expiration) VALUES ($1, $2) diff --git a/internal/voting/voting.go b/internal/voting/voting.go index 7a963e75b..c922f2db2 100644 --- a/internal/voting/voting.go +++ b/internal/voting/voting.go @@ -23,6 +23,7 @@ const ( func InitializeVoteStore(ctx context.Context, db sql.DB) error { upgradeFns := map[int64]versioning.UpgradeFunc{ 0: initTables, + 1: dropHeight, } err := versioning.Upgrade(ctx, db, votingSchemaName, upgradeFns, voteStoreVersion) @@ -33,10 +34,15 @@ func InitializeVoteStore(ctx context.Context, db sql.DB) error { return nil } +func dropHeight(ctx context.Context, db sql.DB) error { + _, err := db.Execute(ctx, dropHeightTable) + return err +} + func initTables(ctx context.Context, db sql.DB) error { initStmts := []string{ //createVotingSchema, tableVoters, tableResolutionTypes, tableResolutions, - resolutionsTypeIndex, tableProcessed, tableVotes, tableHeight} // order important + resolutionsTypeIndex, tableProcessed, tableVotes} // order important for _, stmt := range initStmts { _, err := db.Execute(ctx, stmt) @@ -334,12 +340,18 @@ func GetResolutionsByType(ctx context.Context, db sql.Executor, resType string) // DeleteResolutions deletes a slice of resolution IDs from the database. // It will mark the resolutions as processed in the processed table. func DeleteResolutions(ctx context.Context, db sql.Executor, ids ...types.UUID) error { + if len(ids) == 0 { + return nil + } _, err := db.Execute(ctx, deleteResolutions, types.UUIDArray(ids)) return err } // MarkProcessed marks a set of resolutions as processed. func MarkProcessed(ctx context.Context, db sql.Executor, ids ...types.UUID) error { + if len(ids) == 0 { + return nil + } _, err := db.Execute(ctx, markManyProcessed, types.UUIDArray(ids)) return err } @@ -589,27 +601,3 @@ func intDivUpFraction(val, numerator, divisor int64) int64 { tempNumerator.Add(tempNumerator, new(big.Int).Sub(divBig, big.NewInt(1))) return new(big.Int).Div(tempNumerator, divBig).Int64() } - -func GetHeight(ctx context.Context, db sql.Executor) (int64, error) { - res, err := db.Execute(ctx, getHeight) - if err != nil { - return 0, err - } - - // Fresh database - if len(res.Rows) != 1 { - return -1, nil - } - - height, ok := sql.Int64(res.Rows[0][0]) - if !ok { - return 0, fmt.Errorf("invalid type for height (%T)", res.Rows[0][0]) - } - - return height, nil -} - -func SetHeight(ctx context.Context, db sql.Executor, height int64) error { - _, err := db.Execute(ctx, updateHeight, height) - return err -}