-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdlock_by_zk.go
145 lines (124 loc) · 3.17 KB
/
dlock_by_zk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package distlock
import (
"os"
"strconv"
"strings"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/samuel/go-zookeeper/zk"
)
// DLockByZookeeper 通过zookeeper实现的分布式锁
type DLockByZookeeper struct {
conn *zk.Conn
lockpath string
}
// NewDLockByZookeeper 获取DLockByZookeeper实例.
func NewDLockByZookeeper(conn *zk.Conn) *DLockByZookeeper {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
return &DLockByZookeeper{
conn: conn,
}
}
// TryLock 尝试获取分布式锁, 超时后就放弃 (不可重入锁).
/*
==> acquire lock
n = create("/distlock/fast-lock/request-", "", ephemeral|sequence)
RETRY:
children = getChildren("/distlock/fast-lock", watch=False)
if n is lowest znode in children:
return
else:
exist("/distlock/fast-lock/request-" % (n - 1), watch=True)
watch_event:
goto RETRY
*/
func (dlz *DLockByZookeeper) TryLock(timeoutInSecs int) bool {
if dlz.lockpath != "" {
log.Error().Err(ErrDeadlock).Msg("failed to acquire lock")
return false
}
path, err := safeCreate(dlz.conn, _LockerLockPathFastLockUsedPrefix, []byte(""), zk.FlagEphemeral|zk.FlagSequence)
if err != nil {
log.Error().Err(err).Msg("failed to acquire lock")
return false
}
seq := dlz.getSequenceNum(path, _LockerLockPathFastLockUsedPrefix)
ticker := time.NewTicker(time.Duration(timeoutInSecs) * time.Second)
LOOP:
for {
select {
case <-ticker.C:
{
log.Warn().Msg("failed to acquire lock, since timeout to retry")
return false
}
default:
{
TRY_AGAIN:
children, _, err := safeGetChildren(dlz.conn, _LockerLockPathFastLockUsed, false)
if err != nil {
log.Error().Err(err).Msg("failed to acquire lock")
return false
}
minSeq := seq
prevSeq := -1
prevSeqPath := ""
for _, child := range children {
s := dlz.getSequenceNum(child, _LockerLockPathFastLockUsedShortestPrefix)
if s < minSeq {
minSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
prevSeqPath = child
}
}
if seq == minSeq {
break LOOP
}
_, _, watcher, err := dlz.conn.ExistsW(_LockerLockPathFastLockUsed + "/" + prevSeqPath)
if err != nil {
log.Error().Err(err).Msg("failed to acquire lock")
return false
}
smallTicker := time.NewTicker(time.Duration(timeoutInSecs/3) * time.Second)
for {
select {
case ev, ok := <-watcher:
{
if !ok {
return false
}
if ev.Type == zk.EventNodeDeleted {
goto TRY_AGAIN
}
}
case <-smallTicker.C:
{
goto TRY_AGAIN
}
}
}
}
}
}
dlz.lockpath = path
return true
}
// Unlock 释放分布式锁.
/*
==> release lock (voluntarily or session timeout)
delete("/distlock/fast-lock/request-" % n)
*/
func (dlz *DLockByZookeeper) Unlock() {
if err := safeDelete(dlz.conn, dlz.lockpath, -1); err != nil {
log.Error().Err(err).Msg("failed to release lock")
}
dlz.lockpath = ""
}
func (dlz *DLockByZookeeper) getSequenceNum(path, prefix string) int {
numStr := strings.TrimPrefix(path, prefix)
num, _ := strconv.Atoi(numStr)
return num
}