Skip to content

Commit

Permalink
update file config
Browse files Browse the repository at this point in the history
  • Loading branch information
smx-Morgan committed Oct 23, 2024
1 parent 97b4275 commit a16af31
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 0 deletions.
83 changes: 83 additions & 0 deletions config/file/client/circuit_breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"strings"

utils "github.com/cloudwego-contrib/cwgo-pkg/config/common"
"github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor"

kitexclient "github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/rpcinfo"
)

// WithCircuitBreaker returns a server.Option that sets the circuit breaker for the client
func WithCircuitBreaker(service string, watcher monitor.ConfigMonitor) []kitexclient.Option {
cbSuite, keyCircuitBreaker := initCircuitBreaker(service, watcher)
return []kitexclient.Option{
kitexclient.WithCircuitBreaker(cbSuite),
kitexclient.WithCloseCallbacks(func() error {
watcher.DeregisterCallback(keyCircuitBreaker)
return cbSuite.Close()
}),
}
}

// initCircuitBreaker init the circuitbreaker suite
func initCircuitBreaker(service string, watcher monitor.ConfigMonitor) (*circuitbreak.CBSuite, int64) {
cb := circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo)
lcb := utils.ThreadSafeSet{}

onChangeCallback := func() {
set := utils.Set{}
config := getFileConfig(watcher)
if config == nil {
return // config is nil, do nothing, log will be printed in getFileConfig
}

for method, config := range config.Circuitbreaker {
set[method] = true
key := genServiceCBKey(service, method)
cb.UpdateServiceCBConfig(key, *config)
}

for _, method := range lcb.DiffAndEmplace(set) {
key := genServiceCBKey(service, method)
cb.UpdateServiceCBConfig(key, circuitbreak.GetDefaultCBConfig())
}
}

keyCircuitBreaker := watcher.RegisterCallback(onChangeCallback)
return cb, keyCircuitBreaker
}

func genServiceCBKeyWithRPCInfo(ri rpcinfo.RPCInfo) string {
if ri == nil {
return ""
}
return genServiceCBKey(ri.To().ServiceName(), ri.To().Method())
}

func genServiceCBKey(toService, method string) string {
sum := len(toService) + len(method) + 2
var buf strings.Builder
buf.Grow(sum)
buf.WriteString(toService)
buf.WriteByte('/')
buf.WriteString(method)
return buf.String()
}
35 changes: 35 additions & 0 deletions config/file/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor"
"github.com/cloudwego-contrib/cwgo-pkg/config/file/parser"
"github.com/cloudwego/kitex/pkg/klog"
)

// getFileConfig returns the config from the watcher.
// if the config type is not *parser.ClientFileConfig, it will log an error and return nil.
func getFileConfig(watcher monitor.ConfigMonitor) *parser.ClientFileConfig {
config, ok := watcher.Config().(*parser.ClientFileConfig)
if !ok {
// This should never happen.
// But if it does, we should log it and do nothing.
// Otherwise, the program will panic.
klog.Errorf("[local] Invalid config type: %T", watcher.Config())
return nil
}
return config
}
78 changes: 78 additions & 0 deletions config/file/client/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
utils "github.com/cloudwego-contrib/cwgo-pkg/config/common"
"github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor"
kitexclient "github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/retry"
)

// WithRetryPolicy returns a server.Option that sets the retry policies for the client
func WithRetryPolicy(watcher monitor.ConfigMonitor) []kitexclient.Option {
rc, keyRetry := initRetryContainer(watcher)
return []kitexclient.Option{
kitexclient.WithRetryContainer(rc),
kitexclient.WithCloseCallbacks(func() error {
watcher.DeregisterCallback(keyRetry)
return rc.Close()
}),
}
}

// initRetryOptions init the retry container
func initRetryContainer(watcher monitor.ConfigMonitor) (*retry.Container, int64) {
retryContainer := retry.NewRetryContainerWithPercentageLimit()

ts := utils.ThreadSafeSet{}

onChangeCallback := func() {
// the key is method name, wildcard "*" can match anything.
config := getFileConfig(watcher)
if config == nil {
return // config is nil, do nothing, log will be printed in getFileConfig
}
rcs := config.Retry
set := utils.Set{}

for method, policy := range rcs {
set[method] = true

if policy.BackupPolicy != nil && policy.FailurePolicy != nil {
klog.Warnf("[local] %s client policy for method %s BackupPolicy and FailurePolicy must not be set at same time",
watcher.Key(), method)
continue
}

if policy.BackupPolicy == nil && policy.FailurePolicy == nil {
klog.Warnf("[local] %s client policy for method %s BackupPolicy and FailurePolicy must not be empty at same time",
watcher.Key(), method)
continue
}

retryContainer.NotifyPolicyChange(method, *policy)
}

for _, method := range ts.DiffAndEmplace(set) {
retryContainer.DeletePolicy(method)
}
}

keyRetry := watcher.RegisterCallback(onChangeCallback)

return retryContainer, keyRetry
}
51 changes: 51 additions & 0 deletions config/file/client/rpc_timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor"
kitexclient "github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpctimeout"
)

// WithRPCTimeout returns a server.Option that sets the timeout provider for the client.
func WithRPCTimeout(watcher monitor.ConfigMonitor) []kitexclient.Option {
opt, keyRPCTimeout := initRPCTimeout(watcher)
return []kitexclient.Option{
kitexclient.WithTimeoutProvider(opt),
kitexclient.WithCloseCallbacks(func() error {
watcher.DeregisterCallback(keyRPCTimeout)
return nil
}),
}
}

// initRPCTimeout init the rpc timeout provider
func initRPCTimeout(watcher monitor.ConfigMonitor) (rpcinfo.TimeoutProvider, int64) {
rpcTimeoutContainer := rpctimeout.NewContainer()

onChangeCallback := func() {
// the key is method name, wildcard "*" can match anything.
config := getFileConfig(watcher)
if config == nil {
return // config is nil, do nothing, log will be printed in getFileConfig
}
rpcTimeoutContainer.NotifyPolicyChange(config.Timeout)
}

keyRPCTimeout := watcher.RegisterCallback(onChangeCallback)
return rpcTimeoutContainer, keyRPCTimeout
}
61 changes: 61 additions & 0 deletions config/file/client/suite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"github.com/cloudwego-contrib/cwgo-pkg/config/file/filewatcher"
"github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor"
"github.com/cloudwego-contrib/cwgo-pkg/config/file/parser"
"github.com/cloudwego-contrib/cwgo-pkg/config/file/utils"
kitexclient "github.com/cloudwego/kitex/client"
)

type FileConfigClientSuite struct {
watcher monitor.ConfigMonitor
service string
}

// NewSuite service is the destination service.
func NewSuite(service, key string, watcher filewatcher.FileWatcher, opts ...utils.Option) *FileConfigClientSuite {
cm, err := monitor.NewConfigMonitor(key, watcher, opts...)
if err != nil {
panic(err)
}

return &FileConfigClientSuite{
watcher: cm,
service: service,
}
}

// Options return a list client.Option
func (s *FileConfigClientSuite) Options() []kitexclient.Option {
s.watcher.SetManager(&parser.ClientFileManager{})

opts := make([]kitexclient.Option, 0, 7)
opts = append(opts, WithRetryPolicy(s.watcher)...)
opts = append(opts, WithCircuitBreaker(s.service, s.watcher)...)
opts = append(opts, WithRPCTimeout(s.watcher)...)
opts = append(opts, kitexclient.WithCloseCallbacks(func() error {
s.watcher.Stop()
return nil
}))

if err := s.watcher.Start(); err != nil {
panic(err)
}

return opts
}

0 comments on commit a16af31

Please sign in to comment.