Skip to content

Commit

Permalink
Add AsyncTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
geekgonecrazy committed Nov 7, 2024
1 parent cffee49 commit eabd9ab
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 17 deletions.
12 changes: 6 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ func (c *Client) Session(opt ...*options.SessionOptions) (*Session, error) {
// - version of mongoDB server >= v4.0
// - Topology of mongoDB server is not Single
// At the same time, please pay attention to the following
// - make sure all operations in callback use the sessCtx as context parameter
// - if operations in callback takes more than(include equal) 120s, the operations will not take effect,
// - if operation in callback return qmgo.ErrTransactionRetry,
// the whole transaction will retry, so this transaction must be idempotent
// - if operations in callback return qmgo.ErrTransactionNotSupported,
// - If the ctx parameter already has a Session attached to it, it will be replaced by this session.
// - make sure all operations in callback use the sessCtx as context parameter
// - if operations in callback takes more than(include equal) 120s, the operations will not take effect,
// - if operation in callback return qmgo.ErrTransactionRetry,
// the whole transaction will retry, so this transaction must be idempotent
// - if operations in callback return qmgo.ErrTransactionNotSupported,
// - If the ctx parameter already has a Session attached to it, it will be replaced by this session.
func (c *Client) DoTransaction(ctx context.Context, callback func(sessCtx context.Context) (interface{}, error), opts ...*options.TransactionOptions) (interface{}, error) {
if !c.transactionAllowed() {

Check warning on line 322 in client.go

View check run for this annotation

qiniu-x / golangci-lint

client.go#L322

Function `transactionAllowed->ServerVersion` should pass the context parameter (contextcheck)
return nil, ErrTransactionNotSupported
Expand Down
43 changes: 32 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ type Session struct {
}

// StartTransaction starts transaction
//precondition:
//- version of mongoDB server >= v4.0
//- Topology of mongoDB server is not Single
//At the same time, please pay attention to the following
//- make sure all operations in callback use the sessCtx as context parameter
//- Dont forget to call EndSession if session is not used anymore
//- if operations in callback takes more than(include equal) 120s, the operations will not take effect,
//- if operation in callback return qmgo.ErrTransactionRetry,
// the whole transaction will retry, so this transaction must be idempotent
//- if operations in callback return qmgo.ErrTransactionNotSupported,
//- If the ctx parameter already has a Session attached to it, it will be replaced by this session.
// precondition:
// - version of mongoDB server >= v4.0
// - Topology of mongoDB server is not Single
// At the same time, please pay attention to the following
// - make sure all operations in callback use the sessCtx as context parameter
// - Dont forget to call EndSession if session is not used anymore
// - if operations in callback takes more than(include equal) 120s, the operations will not take effect,
// - if operation in callback return qmgo.ErrTransactionRetry,
// the whole transaction will retry, so this transaction must be idempotent
// - if operations in callback return qmgo.ErrTransactionNotSupported,
// - If the ctx parameter already has a Session attached to it, it will be replaced by this session.
func (s *Session) StartTransaction(ctx context.Context, cb func(sessCtx context.Context) (interface{}, error), opts ...*opts.TransactionOptions) (interface{}, error) {
transactionOpts := options.Transaction()
if len(opts) > 0 && opts[0].TransactionOptions != nil {
Expand All @@ -51,6 +51,27 @@ func (s *Session) StartTransaction(ctx context.Context, cb func(sessCtx context.
return result, nil
}

func (s *Session) StartAsyncTransaction(ctx context.Context, opts ...*opts.TransactionOptions) (context.Context, error) {
transactionOpts := options.Transaction()
if len(opts) > 0 && opts[0].TransactionOptions != nil {
transactionOpts = opts[0].TransactionOptions
}

sCtx := mongo.NewSessionContext(ctx, s.session)

err := s.session.StartTransaction(transactionOpts)

return sCtx, err
}

func (s *Session) CommitAsyncTransaction(ctx context.Context) error {
return s.session.CommitTransaction(ctx)
}

func (s *Session) AbortAsyncTransaction(ctx context.Context) error {
return s.session.AbortTransaction(ctx)
}

// EndSession will abort any existing transactions and close the session.
func (s *Session) EndSession(ctx context.Context) {
s.session.EndSession(ctx)
Expand Down
76 changes: 76 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,82 @@ func initTransactionClient(coll string) *QmgoClient {
return qClient

}

func TestClient_AsyncTransaction(t *testing.T) {
ast := require.New(t)
ctx := context.Background()
cli := initTransactionClient("test")
defer cli.DropDatabase(ctx)

Check warning on line 58 in session_test.go

View check run for this annotation

qiniu-x / golangci-lint

session_test.go#L58

Error return value of `cli.DropDatabase` is not checked (errcheck)

session, err := cli.Session()
if err != nil {
ast.NoError(err)
}

defer session.EndSession(ctx)

sCtx, err := session.StartAsyncTransaction(ctx)
if err != nil {
ast.NoError(err)
}

if _, err := cli.InsertOne(sCtx, bson.D{{"abc", int32(1)}}); err != nil {
ast.NoError(err)
}
if _, err := cli.InsertOne(sCtx, bson.D{{"xyz", int32(999)}}); err != nil {
ast.NoError(err)
}

if err := session.CommitAsyncTransaction(sCtx); err != nil {
ast.NoError(err)
}

r := bson.M{}
cli.Find(ctx, bson.M{"abc": 1}).One(&r)

Check warning on line 84 in session_test.go

View check run for this annotation

qiniu-x / golangci-lint

session_test.go#L84

Error return value of `(github.com/qiniu/qmgo.QueryI).One` is not checked (errcheck)
ast.Equal(r["abc"], int32(1))

Check warning on line 85 in session_test.go

View check run for this annotation

qiniu-x / golangci-lint

session_test.go#L85

expected-actual: need to reverse actual and expected values (testifylint)

cli.Find(ctx, bson.M{"xyz": 999}).One(&r)

Check warning on line 87 in session_test.go

View check run for this annotation

qiniu-x / golangci-lint

session_test.go#L87

Error return value of `(github.com/qiniu/qmgo.QueryI).One` is not checked (errcheck)
ast.Equal(r["xyz"], int32(999))
}

func TestClient_AsyncAbortTransaction(t *testing.T) {
ast := require.New(t)
ctx := context.Background()
cli := initTransactionClient("test")
defer cli.DropDatabase(ctx)
session, err := cli.Session()
if err != nil {
ast.NoError(err)
}

sCtx, err := session.StartAsyncTransaction(ctx)
if err != nil {
ast.NoError(err)
}

defer session.EndSession(ctx)
defer session.AbortAsyncTransaction(sCtx)

Check warning on line 107 in session_test.go

View check run for this annotation

qiniu-x / golangci-lint

session_test.go#L107

Error return value of `session.AbortAsyncTransaction` is not checked (errcheck)

if _, err := cli.InsertOne(sCtx, bson.D{{"abc", int32(1)}}); err != nil {
ast.NoError(err)
}

if _, err := cli.InsertOne(sCtx, bson.D{{"xyz", int32(999)}}); err != nil {
ast.NoError(err)
}

if err := session.AbortAsyncTransaction(sCtx); err != nil {
ast.NoError(err)
}

r := bson.M{}
cli.Find(ctx, bson.M{"abc": 1}).One(&r)
ast.Empty(r)

cli.Find(ctx, bson.M{"xyz": 999}).One(&r)
ast.Empty(r)
}

func TestClient_DoTransaction(t *testing.T) {
ast := require.New(t)
ctx := context.Background()
Expand Down

0 comments on commit eabd9ab

Please sign in to comment.