Skip to content

Commit

Permalink
feat: add transaction isolation level support to pgdriver (#1034)
Browse files Browse the repository at this point in the history
  • Loading branch information
iamgoroot authored Oct 21, 2024
1 parent f8e0d5e commit 3ef44ce
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 5 deletions.
17 changes: 12 additions & 5 deletions driver/pgdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -213,15 +212,23 @@ var _ driver.ConnBeginTx = (*Conn)(nil)

func (cn *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
// No need to check if the conn is closed. ExecContext below handles that.
isolation := sql.IsolationLevel(opts.Isolation)

if sql.IsolationLevel(opts.Isolation) != sql.LevelDefault {
return nil, errors.New("pgdriver: custom IsolationLevel is not supported")
var command string
switch isolation {
case sql.LevelDefault:
command = "BEGIN"
case sql.LevelReadUncommitted, sql.LevelReadCommitted, sql.LevelRepeatableRead, sql.LevelSerializable:
command = fmt.Sprintf("BEGIN; SET TRANSACTION ISOLATION LEVEL %s", isolation.String())
default:
return nil, fmt.Errorf("pgdriver: unsupported transaction isolation: %s", isolation.String())
}

if opts.ReadOnly {
return nil, errors.New("pgdriver: ReadOnly transactions are not supported")
command = fmt.Sprintf("%s READ ONLY", command)
}

if _, err := cn.ExecContext(ctx, "BEGIN", nil); err != nil {
if _, err := cn.ExecContext(ctx, command, nil); err != nil {
return nil, err
}
return tx{cn: cn}, nil
Expand Down
69 changes: 69 additions & 0 deletions driver/pgdriver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package pgdriver_test
import (
"context"
"database/sql"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -288,6 +290,73 @@ func TestPartialScan(t *testing.T) {
}
}

func TestTransactionIsolationLevels(t *testing.T) {
db := sqlDB()
t.Cleanup(func() {
require.NoError(t, db.Close())
})
type testCase struct {
*sql.TxOptions
supported bool
expectedIsoLvl string
}
testCases := []testCase{
// supported
{TxOptions: &sql.TxOptions{Isolation: sql.LevelDefault, ReadOnly: true}, supported: true, expectedIsoLvl: "READ COMMITTED"},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelDefault, ReadOnly: false}, supported: true, expectedIsoLvl: "READ COMMITTED"},

{TxOptions: &sql.TxOptions{Isolation: sql.LevelReadUncommitted, ReadOnly: true}, supported: true, expectedIsoLvl: sql.LevelReadUncommitted.String()},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelReadUncommitted, ReadOnly: false}, supported: true, expectedIsoLvl: sql.LevelReadUncommitted.String()},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: true}, supported: true, expectedIsoLvl: sql.LevelReadCommitted.String()},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: false}, supported: true, expectedIsoLvl: sql.LevelReadCommitted.String()},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true}, supported: true, expectedIsoLvl: sql.LevelRepeatableRead.String()},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: false}, supported: true, expectedIsoLvl: sql.LevelRepeatableRead.String()},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true}, supported: true, expectedIsoLvl: sql.LevelSerializable.String()},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false}, supported: true, expectedIsoLvl: sql.LevelSerializable.String()},
// unsupported
{TxOptions: &sql.TxOptions{Isolation: sql.LevelLinearizable, ReadOnly: true}, supported: false},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelLinearizable, ReadOnly: false}, supported: false},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelSnapshot, ReadOnly: true}, supported: false},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelSnapshot, ReadOnly: false}, supported: false},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelWriteCommitted, ReadOnly: true}, supported: false},
{TxOptions: &sql.TxOptions{Isolation: sql.LevelWriteCommitted, ReadOnly: false}, supported: false},
}
testIsolationFunc := func(t *testing.T, testCase testCase) {
tx, err := db.BeginTx(context.Background(), testCase.TxOptions)
if !testCase.supported {
require.Error(t, err)
return
}
require.NoError(t, err)
t.Cleanup(func() {
err := tx.Rollback()
require.NoError(t, err)
})

var currentLvl string
err = tx.QueryRow("SHOW TRANSACTION ISOLATION LEVEL;").Scan(&currentLvl)
require.NoError(t, err)
expectedIsoLvl := strings.ToUpper(testCase.expectedIsoLvl)
currentIsoLvl := strings.ToUpper(currentLvl)
require.Equal(t, expectedIsoLvl, currentIsoLvl)

var readOnlyResult string
err = tx.QueryRow("SHOW TRANSACTION_READ_ONLY").Scan(&readOnlyResult)
require.NoError(t, err)
isReadOnly := strings.ToUpper(readOnlyResult) == "ON"

require.Equal(t, testCase.ReadOnly, isReadOnly)
}

for i := 0; i < len(testCases); i++ {
testCase := testCases[i]
name := fmt.Sprintf("test isolation level %s read only %t", testCase.Isolation.String(), testCase.ReadOnly)
t.Run(name, func(t *testing.T) {
testIsolationFunc(t, testCase)
})
}
}

func sqlDB() *sql.DB {
db, err := sql.Open("pg", dsn())
if err != nil {
Expand Down

0 comments on commit 3ef44ce

Please sign in to comment.