forked from metal-stack/go-ipam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsql.go
137 lines (125 loc) · 3.88 KB
/
sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package ipam
import (
"fmt"
"github.com/jmoiron/sqlx"
)
type sql struct {
db *sqlx.DB
}
func (s *sql) prefixExists(prefix Prefix) (*Prefix, bool) {
p, err := s.ReadPrefix(prefix.Cidr)
if err != nil {
return nil, false
}
return &p, true
}
func (s *sql) CreatePrefix(prefix Prefix) (Prefix, error) {
existingPrefix, exists := s.prefixExists(prefix)
if exists {
return *existingPrefix, nil
}
prefix.version = int64(0)
pj, err := prefix.toJSON()
if err != nil {
return Prefix{}, err
}
tx, err := s.db.Beginx()
if err != nil {
return Prefix{}, fmt.Errorf("unable to start transaction:%w", err)
}
_, err = tx.Exec("INSERT INTO prefixes (cidr, prefix) VALUES ($1, $2)", prefix.Cidr, pj)
if err != nil {
return Prefix{}, fmt.Errorf("unable to insert prefix:%w", err)
}
return prefix, tx.Commit()
}
func (s *sql) ReadPrefix(prefix string) (Prefix, error) {
var result []byte
err := s.db.Get(&result, "SELECT prefix FROM prefixes WHERE cidr=$1", prefix)
if err != nil {
return Prefix{}, fmt.Errorf("unable to read prefix:%w", err)
}
return fromJSON(result)
}
func (s *sql) DeleteAllPrefixes() error {
_, err := s.db.Exec("DELETE FROM prefixes")
return err
}
// ReadAllPrefixes returns all known prefixes.
func (s *sql) ReadAllPrefixes() (Prefixes, error) {
var prefixes [][]byte
err := s.db.Select(&prefixes, "SELECT prefix FROM prefixes")
if err != nil {
return nil, fmt.Errorf("unable to read prefixes:%w", err)
}
result := Prefixes{}
for _, v := range prefixes {
pfx, err := fromJSON(v)
if err != nil {
return nil, err
}
result = append(result, pfx)
}
return result, nil
}
// ReadAllPrefixCidrs is cheaper that ReadAllPrefixes because it only returns the Cidrs.
func (s *sql) ReadAllPrefixCidrs() ([]string, error) {
cidrs := []string{}
err := s.db.Select(&cidrs, "SELECT cidr FROM prefixes")
if err != nil {
return nil, fmt.Errorf("unable to read prefixes:%w", err)
}
return cidrs, nil
}
// UpdatePrefix tries to update the prefix.
// Returns OptimisticLockError if it does not succeed due to a concurrent update.
func (s *sql) UpdatePrefix(prefix Prefix) (Prefix, error) {
oldVersion := prefix.version
prefix.version = oldVersion + 1
pn, err := prefix.toJSON()
if err != nil {
return Prefix{}, err
}
tx, err := s.db.Beginx()
if err != nil {
return Prefix{}, fmt.Errorf("unable to start transaction:%w", err)
}
result, err := tx.Exec("SELECT prefix FROM prefixes WHERE cidr=$1 AND prefix->>'Version'=$2 FOR UPDATE", prefix.Cidr, oldVersion)
if err != nil {
return Prefix{}, fmt.Errorf("%w: unable to select for update prefix:%s", ErrOptimisticLockError, prefix.Cidr)
}
rows, err := result.RowsAffected()
if err != nil {
return Prefix{}, err
}
if rows == 0 {
// Rollback, but ignore error, if rollback is omitted, the row lock created by SELECT FOR UPDATE will not get released.
_ = tx.Rollback()
return Prefix{}, fmt.Errorf("%w: select for update did not effect any row", ErrOptimisticLockError)
}
result, err = tx.Exec("UPDATE prefixes SET prefix=$1 WHERE cidr=$2 AND prefix->>'Version'=$3", pn, prefix.Cidr, oldVersion)
if err != nil {
return Prefix{}, fmt.Errorf("%w: unable to update prefix:%s", ErrOptimisticLockError, prefix.Cidr)
}
rows, err = result.RowsAffected()
if err != nil {
return Prefix{}, err
}
if rows == 0 {
// Rollback, but ignore error, if rollback is omitted, the row lock created by SELECT FOR UPDATE will not get released.
_ = tx.Rollback()
return Prefix{}, fmt.Errorf("%w: updatePrefix did not effect any row", ErrOptimisticLockError)
}
return prefix, tx.Commit()
}
func (s *sql) DeletePrefix(prefix Prefix) (Prefix, error) {
tx, err := s.db.Beginx()
if err != nil {
return Prefix{}, fmt.Errorf("unable to start transaction:%w", err)
}
_, err = tx.Exec("DELETE from prefixes WHERE cidr=$1", prefix.Cidr)
if err != nil {
return Prefix{}, fmt.Errorf("unable delete prefix:%w", err)
}
return prefix, tx.Commit()
}