Skip to content

Commit

Permalink
now ip chooser & subnet chooser would always returns results
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jan 30, 2024
1 parent 71ee70c commit 64e2749
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 268 deletions.
3 changes: 1 addition & 2 deletions storage/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"strings"
"time"

"github.com/alex-ant/gomath/rational"
"github.com/qiniu/go-sdk/v7/internal/clientv2"
"github.com/qiniu/go-sdk/v7/storagev2/apis"
"github.com/qiniu/go-sdk/v7/storagev2/apis/batch_ops"
Expand Down Expand Up @@ -1066,7 +1065,7 @@ func (m *BucketManager) chooser() chooser.Chooser {
if m.options.Chooser != nil {
return m.options.Chooser
}
return chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
return chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
}

// 构建op的方法,导出的方法支持在Batch操作中使用
Expand Down
3 changes: 1 addition & 2 deletions storage/region_uc_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"golang.org/x/sync/singleflight"

"github.com/alex-ant/gomath/rational"
"github.com/qiniu/go-sdk/v7/internal/clientv2"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
Expand Down Expand Up @@ -262,7 +261,7 @@ func getRegionByV2(ak, bucket string, options UCApiOptions) (*Region, error) {
}
}
if options.Chooser == nil {
options.Chooser = chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
options.Chooser = chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
}

var ret UcQueryRet
Expand Down
3 changes: 1 addition & 2 deletions storage/region_uc_v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"golang.org/x/sync/singleflight"

"github.com/alex-ant/gomath/rational"
"github.com/qiniu/go-sdk/v7/internal/clientv2"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
Expand Down Expand Up @@ -160,7 +159,7 @@ func getRegionByV4(ak, bucket string, options UCApiOptions) (*RegionGroup, error
}
}
if options.Chooser == nil {
options.Chooser = chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
options.Chooser = chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
}

var ret ucQueryV4Ret
Expand Down
13 changes: 8 additions & 5 deletions storagev2/chooser/chooser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package chooser
import (
"context"
"net"
"time"
)

type (
Expand All @@ -22,10 +21,6 @@ type (
FeedbackGood(context.Context, *FeedbackOptions)
FeedbackBad(context.Context, *FeedbackOptions)
}

blacklistItem struct {
expiredAt time.Time
}
)

type directChooser struct {
Expand All @@ -46,3 +41,11 @@ func (chooser *directChooser) FeedbackGood(_ context.Context, _ *FeedbackOptions
func (chooser *directChooser) FeedbackBad(_ context.Context, _ *FeedbackOptions) {
// do nothing
}

func (options *ChooseOptions) makeSet(makeKey func(string, net.IP) string) map[string]struct{} {
m := make(map[string]struct{}, len(options.IPs))
for _, ip := range options.IPs {
m[makeKey(options.Domain, ip)] = struct{}{}
}
return m
}
241 changes: 171 additions & 70 deletions storagev2/chooser/ip_chooser.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
package chooser

import (
"bytes"
"container/heap"
"context"
"fmt"
"net"
"sync"
"time"
)

type ipChooser struct {
blacklist map[string]blacklistItem
blacklistMutex sync.Mutex
freezeDuration time.Duration
shrinkInterval time.Duration
shrunkAt time.Time
}
type (
blackItem struct {
index int
domain string
ip net.IP
expiredAt time.Time
}

type IPChooserOptions struct {
FreezeDuration time.Duration
ShrinkInterval time.Duration
}
blackheap struct {
m map[string]*blackItem
items []*blackItem
}

ipChooser struct {
blackheap blackheap
blackheapMutex sync.Mutex
freezeDuration time.Duration
}

IPChooserOptions struct {
FreezeDuration time.Duration
}
)

// NewIPChooser 创建 IP 选择器
func NewIPChooser(options *IPChooserOptions) Chooser {
Expand All @@ -29,94 +42,182 @@ func NewIPChooser(options *IPChooserOptions) Chooser {
if options.FreezeDuration == 0 {
options.FreezeDuration = 10 * time.Minute
}
if options.ShrinkInterval == 0 {
options.ShrinkInterval = 10 * time.Minute
}
return &ipChooser{
blacklist: make(map[string]blacklistItem),
blackheap: blackheap{
m: make(map[string]*blackItem, 1024),
items: make([]*blackItem, 0, 1024),
},
freezeDuration: options.FreezeDuration,
shrinkInterval: options.ShrinkInterval,
shrunkAt: time.Now(),
}
}

func (chooser *ipChooser) Choose(_ context.Context, options *ChooseOptions) []net.IP {
return chooser.isInBlacklist(options.Domain, options.IPs)
// TODO: 考虑没有 IP 的情况
chooser.blackheapMutex.Lock()
defer chooser.blackheapMutex.Unlock()

chosenIPs := make([]net.IP, 0, chooser.blackheap.Len())
for _, ip := range options.IPs {
if item := chooser.blackheap.FindByDomainAndIp(options.Domain, ip); item == nil {
chosenIPs = append(chosenIPs, ip)
}
}
if len(chosenIPs) > 0 {
return chosenIPs
}

var chosenExpiredAt time.Time
toFind := options.makeSet(makeMapKey)
backups := make([]*blackItem, 0, chooser.blackheap.Len())
for chooser.blackheap.Len() > 0 {
firstChosen := heap.Pop(&chooser.blackheap).(*blackItem)
backups = append(backups, firstChosen)
key := makeMapKey(firstChosen.domain, firstChosen.ip)
if _, ok := toFind[key]; ok {
chosenExpiredAt = firstChosen.expiredAt
chosenIPs = append(chosenIPs, firstChosen.ip)
delete(toFind, key)
break
}
}
if chosenExpiredAt.IsZero() {
panic("chosenExpiredAt should not be empty")
}
for chooser.blackheap.Len() > 0 {
item := heap.Pop(&chooser.blackheap).(*blackItem)
backups = append(backups, item)
if chosenExpiredAt.Equal(item.expiredAt) {
key := makeMapKey(item.domain, item.ip)
if _, ok := toFind[key]; ok {
chosenIPs = append(chosenIPs, item.ip)
delete(toFind, key)
}
} else {
break
}
}
for _, backup := range backups {
if backup.expiredAt.After(time.Now()) {
heap.Push(&chooser.blackheap, backup)
}
}
return chosenIPs
}

func (chooser *ipChooser) FeedbackGood(_ context.Context, options *FeedbackOptions) {
chooser.deleteFromBlacklist(options.Domain, options.IPs)
// TODO: 考虑没有 IP 的情况
chooser.blackheapMutex.Lock()
defer chooser.blackheapMutex.Unlock()

for _, ip := range options.IPs {
if item := chooser.blackheap.FindByDomainAndIp(options.Domain, ip); item != nil {
heap.Remove(&chooser.blackheap, item.index)
}
}
}

func (chooser *ipChooser) FeedbackBad(_ context.Context, options *FeedbackOptions) {
chooser.putIntoBlacklist(options.Domain, options.IPs)
// TODO: 考虑没有 IP 的情况
chooser.blackheapMutex.Lock()
defer chooser.blackheapMutex.Unlock()

newExpiredAt := time.Now().Add(chooser.freezeDuration)
for _, ip := range options.IPs {
if item := chooser.blackheap.FindByDomainAndIp(options.Domain, ip); item != nil {
chooser.blackheap.items[item.index].expiredAt = newExpiredAt
heap.Fix(&chooser.blackheap, item.index)
} else {
heap.Push(&chooser.blackheap, &blackItem{
domain: options.Domain,
ip: ip,
expiredAt: newExpiredAt,
})
}
}
}

func (chooser *ipChooser) isInBlacklist(domain string, ips []net.IP) []net.IP {
chooser.blacklistMutex.Lock()
defer chooser.blacklistMutex.Unlock()
func (h *blackheap) Len() int {
return len(h.items)
}

filtered := make([]net.IP, 0, len(ips))
func (h *blackheap) Less(i, j int) bool {
return h.items[i].expiredAt.Before(h.items[j].expiredAt)
}

for _, ip := range ips {
blocklistKey := chooser.makeBlocklistKey(domain, ip)
if blacklistItem, ok := chooser.blacklist[blocklistKey]; ok {
if time.Now().After(blacklistItem.expiredAt) {
delete(chooser.blacklist, blocklistKey)
filtered = append(filtered, ip)
}
} else {
filtered = append(filtered, ip)
}
func (h *blackheap) Swap(i, j int) {
if i == j {
return
}
h.items[i].domain, h.items[j].domain = h.items[j].domain, h.items[i].domain
h.items[i].ip, h.items[j].ip = h.items[j].ip, h.items[i].ip
h.items[i].expiredAt, h.items[j].expiredAt = h.items[j].expiredAt, h.items[i].expiredAt
h.m[makeMapKey(h.items[i].domain, h.items[i].ip)] = h.items[i]
h.m[makeMapKey(h.items[j].domain, h.items[j].ip)] = h.items[j]
}

go chooser.shrink()

return filtered
func (h *blackheap) Push(x interface{}) {
item := x.(*blackItem)
item.index = len(h.items)
h.items = append(h.items, item)
h.m[makeMapKey(item.domain, item.ip)] = item
}

func (chooser *ipChooser) putIntoBlacklist(domain string, ips []net.IP) {
chooser.blacklistMutex.Lock()
defer chooser.blacklistMutex.Unlock()
func (h *blackheap) Pop() interface{} {
n := len(h.items)
last := h.items[n-1]
h.items = h.items[0 : n-1]
delete(h.m, makeMapKey(last.domain, last.ip))
return last
}

for _, ip := range ips {
blocklistKey := chooser.makeBlocklistKey(domain, ip)
chooser.blacklist[blocklistKey] = blacklistItem{expiredAt: time.Now().Add(chooser.freezeDuration)}
func (h *blackheap) FindByDomainAndIp(domain string, ip net.IP) *blackItem {
key := makeMapKey(domain, ip)
if item, ok := h.m[key]; ok {
if item.expiredAt.Before(time.Now()) {
heap.Remove(h, item.index)
return nil
}
return item
}

go chooser.shrink()
return nil
}

func (chooser *ipChooser) deleteFromBlacklist(domain string, ips []net.IP) {
chooser.blacklistMutex.Lock()
defer chooser.blacklistMutex.Unlock()
func (h *blackheap) String() string {
var buf bytes.Buffer

for _, ip := range ips {
blocklistKey := chooser.makeBlocklistKey(domain, ip)
delete(chooser.blacklist, blocklistKey)
buf.WriteString("&blackheap{ items: [")
for _, item := range h.items {
buf.WriteString(item.String())
buf.WriteString(", ")
}
buf.WriteString("], m: {")
for k, v := range h.m {
buf.WriteString(k)
buf.WriteString(": ")
buf.WriteString(v.String())
buf.WriteString(", ")
}
buf.WriteString("}")

go chooser.shrink()
return buf.String()
}

func (chooser *ipChooser) makeBlocklistKey(domain string, ip net.IP) string {
return fmt.Sprintf("%s_%s", ip, domain)
}
func (item *blackItem) String() string {
var buf bytes.Buffer

func (chooser *ipChooser) shrink() {
chooser.blacklistMutex.Lock()
defer chooser.blacklistMutex.Unlock()
buf.WriteString("&blackItem{ index: ")
buf.WriteString(fmt.Sprintf("%d", item.index))
buf.WriteString(", domain: ")
buf.WriteString(item.domain)
buf.WriteString(", ip: ")
buf.WriteString(item.ip.String())
buf.WriteString(", expiredAt: ")
buf.WriteString(item.expiredAt.String())
buf.WriteString("}")

if time.Now().After(chooser.shrunkAt.Add(chooser.shrinkInterval)) {
shrinkKeys := make([]string, 0, len(chooser.blacklist))
for key, blacklistItem := range chooser.blacklist {
if time.Now().After(blacklistItem.expiredAt) {
shrinkKeys = append(shrinkKeys, key)
}
}
for _, key := range shrinkKeys {
delete(chooser.blacklist, key)
}
chooser.shrunkAt = time.Now()
}
return buf.String()
}

func makeMapKey(domain string, ip net.IP) string {
return ip.String() + "|" + domain
}
Loading

0 comments on commit 64e2749

Please sign in to comment.