Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promoting changes to add support for read only db instance #429

Open
wants to merge 5 commits into
base: v0.25
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 155 additions & 13 deletions gorm/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,19 @@ type ctxKey int
var txnKey ctxKey

var (
ErrCtxTxnMissing = errors.New("Database transaction for request missing in context")
ErrCtxTxnNoDB = errors.New("Transaction in context, but DB is nil")
ErrCtxTxnMissing = errors.New("Database transaction for request missing in context")
ErrCtxTxnNoDB = errors.New("Transaction in context, but DB is nil")
ErrCtxTxnOptMismatch = errors.New("Transaction in context, but Txn opts are mismatched")
ErrCtxDBOptMismatch = errors.New("Transaction in context, but DB opts are mismatched")
)

// This is used to define various options to set/get the current database setup in the context
type dbType int

const (
dbNotSet dbType = iota
dbReadOnly
dbReadWrite
)

// NewContext returns a new Context that carries value txn.
Expand All @@ -45,10 +56,44 @@ func FromContext(ctx context.Context) (txn *Transaction, ok bool) {
type Transaction struct {
mu sync.Mutex
parent *gorm.DB
parentRO *gorm.DB
current *gorm.DB
currentOpts databaseOptions
afterCommitHook []func(context.Context)
}

type databaseOptions struct {
database dbType
txOpts *sql.TxOptions
}

type DatabaseOption func(*databaseOptions)

// WithRODB returns closure to set the readOnlyReplica flag
func WithRODB(readOnlyReplica bool) DatabaseOption {
akleinib marked this conversation as resolved.
Show resolved Hide resolved
return func(ops *databaseOptions) {
if readOnlyReplica == false {
ops.database = dbReadWrite
} else {
ops.database = dbReadOnly
}
}
}

// WithTxOptions returns a closure to set the TxOptions
func WithTxOptions(opts *sql.TxOptions) DatabaseOption {
return func(ops *databaseOptions) {
ops.txOpts = opts
}
}
func toDatabaseOptions(options ...DatabaseOption) *databaseOptions {
opts := &databaseOptions{}
for _, op := range options {
op(opts)
}
return opts
}

func NewTransaction(db *gorm.DB) Transaction {
return Transaction{parent: db}
}
Expand All @@ -57,23 +102,102 @@ func (t *Transaction) AddAfterCommitHook(hooks ...func(context.Context)) {
t.afterCommitHook = append(t.afterCommitHook, hooks...)
}

// BeginFromContext will extract transaction wrapper from context and start new transaction.
// getReadOnlyDBInstance returns the read only db txn if RO DB available otherwise it returns read/write db txn
func getReadOnlyDBTxn(ctx context.Context, opts *databaseOptions, txn *Transaction) (*gorm.DB, error) {
var db *gorm.DB
switch {
case txn.parentRO == nil:
return getReadWriteDBTxn(ctx, opts, txn)
case opts.txOpts != nil && txn.currentOpts.txOpts != nil:
if opts.txOpts.ReadOnly != txn.currentOpts.txOpts.ReadOnly || opts.txOpts.Isolation != txn.currentOpts.txOpts.Isolation {
return nil, ErrCtxTxnOptMismatch
}
default:
// We should error in two cases 1. We should error if read-only DB requested with read-write txn
// 2. If no txn options provided in previous call but provided in subsequent call
if opts.txOpts != nil {
if opts.txOpts.ReadOnly == false || txn.currentOpts.database != dbNotSet {
return nil, ErrCtxTxnOptMismatch
}
txnOpts := *opts.txOpts
txn.currentOpts.txOpts = &txnOpts
}
}
if txn.current != nil {
return txn.current, nil
}
db = txn.beginReadOnlyWithContextAndOptions(ctx, txn.currentOpts.txOpts)
if db.Error != nil {
return nil, db.Error
}
if txn.currentOpts.database == dbNotSet {
txn.currentOpts.database = dbReadOnly
}
return db, nil
}

// getReadWriteDBTxn returns the read/write db txn
func getReadWriteDBTxn(ctx context.Context, opts *databaseOptions, txn *Transaction) (*gorm.DB, error) {
var db *gorm.DB
switch {
case txn.parent == nil:
return nil, ErrCtxTxnNoDB
case opts.txOpts != nil && txn.currentOpts.txOpts != nil:
if opts.txOpts.ReadOnly != txn.currentOpts.txOpts.ReadOnly || opts.txOpts.Isolation != txn.currentOpts.txOpts.Isolation {
return nil, ErrCtxTxnOptMismatch
}
default:
if opts.txOpts != nil {
// We should return error If no txn options provided in previous call but provided in subsequent call
if txn.currentOpts.database != dbNotSet {
return nil, ErrCtxTxnOptMismatch
}
txnOpts := *opts.txOpts
txn.currentOpts.txOpts = &txnOpts
}
}
if txn.current != nil {
return txn.current, nil
}
db = txn.beginWithContextAndOptions(ctx, txn.currentOpts.txOpts)
if db.Error != nil {
return nil, db.Error
}
if txn.currentOpts.database == dbNotSet {
txn.currentOpts.database = dbReadWrite
}
return db, nil
}

// BeginFromContext will return read only db txn if readOnlyReplica flag is set otherwise it will extract transaction wrapper from context and start new transaction
// If readOnlyReplica flag is set and read only db is not available then it will check if a txn with read-write db is already in use then it will return a txn from ctx otherwise it will start a new txn with read/write db and return
// As result new instance of `*gorm.DB` will be returned.
// Error will be returned in case either transaction or db connection info is missing in context.
// Gorm specific error can be checked by `*gorm.DB.Error`.
func BeginFromContext(ctx context.Context) (*gorm.DB, error) {
func BeginFromContext(ctx context.Context, options ...DatabaseOption) (*gorm.DB, error) {
txn, ok := FromContext(ctx)
if !ok {
return nil, ErrCtxTxnMissing
}
if txn.parent == nil {
return nil, ErrCtxTxnNoDB
}
db := txn.beginWithContext(ctx)
if db.Error != nil {
return nil, db.Error
opts := toDatabaseOptions(options...)
akleinib marked this conversation as resolved.
Show resolved Hide resolved
switch opts.database {
case dbReadOnly:
if txn.currentOpts.database == dbReadWrite && txn.parentRO != nil {
return nil, ErrCtxDBOptMismatch
}
return getReadOnlyDBTxn(ctx, opts, txn)
case dbReadWrite:
if txn.currentOpts.database == dbReadOnly {
return nil, ErrCtxDBOptMismatch
}
return getReadWriteDBTxn(ctx, opts, txn)
default:
// This is the case to handle when no database options provided
if txn.currentOpts.database == dbReadOnly {
return getReadOnlyDBTxn(ctx, opts, txn)
}
return getReadWriteDBTxn(ctx, opts, txn)
}
return db, nil
}

// BeginWithOptionsFromContext will extract transaction wrapper from context and start new transaction,
Expand Down Expand Up @@ -119,6 +243,18 @@ func (t *Transaction) BeginWithOptions(opts *sql.TxOptions) *gorm.DB {
return t.beginWithContextAndOptions(context.Background(), opts)
}

// beginReadOnlyWithContextAndOptions will start a new transaction by calling `*gorm.DB.BeginTx() if no current transaction exist
func (t *Transaction) beginReadOnlyWithContextAndOptions(ctx context.Context, opts *sql.TxOptions) *gorm.DB {
t.mu.Lock()
defer t.mu.Unlock()

if t.current == nil {
t.current = t.parentRO.BeginTx(ctx, opts)
}

return t.current
}

func (t *Transaction) beginWithContextAndOptions(ctx context.Context, opts *sql.TxOptions) *gorm.DB {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down Expand Up @@ -173,15 +309,21 @@ func (t *Transaction) Commit(ctx context.Context) error {
// Client is responsible to call `txn.Begin()` to open transaction.
// If call of grpc.UnaryHandler returns with an error the transaction
// is aborted, otherwise committed.
func UnaryServerInterceptor(db *gorm.DB) grpc.UnaryServerInterceptor {
func UnaryServerInterceptor(db *gorm.DB, readOnlyDB ...*gorm.DB) grpc.UnaryServerInterceptor {
txn := &Transaction{parent: db}
if len(readOnlyDB) > 0 {
dbRO := readOnlyDB[0]
if dbRO != nil {
txn.parentRO = dbRO
}
}
return UnaryServerInterceptorTxn(txn)
}

func UnaryServerInterceptorTxn(txn *Transaction) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// Deep copy is necessary as a tansaction should be created per request.
txn := &Transaction{parent: txn.parent, afterCommitHook: txn.afterCommitHook}
txn := &Transaction{parent: txn.parent, parentRO: txn.parentRO, afterCommitHook: txn.afterCommitHook}
defer func() {
// simple panic handler
if perr := recover(); perr != nil {
Expand Down
Loading