diff --git a/config/file/client/circuit_breaker.go b/config/file/client/circuit_breaker.go new file mode 100644 index 0000000..b1d40e0 --- /dev/null +++ b/config/file/client/circuit_breaker.go @@ -0,0 +1,83 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "strings" + + utils "github.com/cloudwego-contrib/cwgo-pkg/config/common" + "github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor" + + kitexclient "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/circuitbreak" + "github.com/cloudwego/kitex/pkg/rpcinfo" +) + +// WithCircuitBreaker returns a server.Option that sets the circuit breaker for the client +func WithCircuitBreaker(service string, watcher monitor.ConfigMonitor) []kitexclient.Option { + cbSuite, keyCircuitBreaker := initCircuitBreaker(service, watcher) + return []kitexclient.Option{ + kitexclient.WithCircuitBreaker(cbSuite), + kitexclient.WithCloseCallbacks(func() error { + watcher.DeregisterCallback(keyCircuitBreaker) + return cbSuite.Close() + }), + } +} + +// initCircuitBreaker init the circuitbreaker suite +func initCircuitBreaker(service string, watcher monitor.ConfigMonitor) (*circuitbreak.CBSuite, int64) { + cb := circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo) + lcb := utils.ThreadSafeSet{} + + onChangeCallback := func() { + set := utils.Set{} + config := getFileConfig(watcher) + if config == nil { + return // config is nil, do nothing, log will be printed in getFileConfig + } + + for method, config := range config.Circuitbreaker { + set[method] = true + key := genServiceCBKey(service, method) + cb.UpdateServiceCBConfig(key, *config) + } + + for _, method := range lcb.DiffAndEmplace(set) { + key := genServiceCBKey(service, method) + cb.UpdateServiceCBConfig(key, circuitbreak.GetDefaultCBConfig()) + } + } + + keyCircuitBreaker := watcher.RegisterCallback(onChangeCallback) + return cb, keyCircuitBreaker +} + +func genServiceCBKeyWithRPCInfo(ri rpcinfo.RPCInfo) string { + if ri == nil { + return "" + } + return genServiceCBKey(ri.To().ServiceName(), ri.To().Method()) +} + +func genServiceCBKey(toService, method string) string { + sum := len(toService) + len(method) + 2 + var buf strings.Builder + buf.Grow(sum) + buf.WriteString(toService) + buf.WriteByte('/') + buf.WriteString(method) + return buf.String() +} diff --git a/config/file/client/client.go b/config/file/client/client.go new file mode 100644 index 0000000..f635b68 --- /dev/null +++ b/config/file/client/client.go @@ -0,0 +1,35 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor" + "github.com/cloudwego-contrib/cwgo-pkg/config/file/parser" + "github.com/cloudwego/kitex/pkg/klog" +) + +// getFileConfig returns the config from the watcher. +// if the config type is not *parser.ClientFileConfig, it will log an error and return nil. +func getFileConfig(watcher monitor.ConfigMonitor) *parser.ClientFileConfig { + config, ok := watcher.Config().(*parser.ClientFileConfig) + if !ok { + // This should never happen. + // But if it does, we should log it and do nothing. + // Otherwise, the program will panic. + klog.Errorf("[local] Invalid config type: %T", watcher.Config()) + return nil + } + return config +} diff --git a/config/file/client/retry.go b/config/file/client/retry.go new file mode 100644 index 0000000..0bf8a6e --- /dev/null +++ b/config/file/client/retry.go @@ -0,0 +1,78 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + utils "github.com/cloudwego-contrib/cwgo-pkg/config/common" + "github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor" + kitexclient "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/klog" + "github.com/cloudwego/kitex/pkg/retry" +) + +// WithRetryPolicy returns a server.Option that sets the retry policies for the client +func WithRetryPolicy(watcher monitor.ConfigMonitor) []kitexclient.Option { + rc, keyRetry := initRetryContainer(watcher) + return []kitexclient.Option{ + kitexclient.WithRetryContainer(rc), + kitexclient.WithCloseCallbacks(func() error { + watcher.DeregisterCallback(keyRetry) + return rc.Close() + }), + } +} + +// initRetryOptions init the retry container +func initRetryContainer(watcher monitor.ConfigMonitor) (*retry.Container, int64) { + retryContainer := retry.NewRetryContainerWithPercentageLimit() + + ts := utils.ThreadSafeSet{} + + onChangeCallback := func() { + // the key is method name, wildcard "*" can match anything. + config := getFileConfig(watcher) + if config == nil { + return // config is nil, do nothing, log will be printed in getFileConfig + } + rcs := config.Retry + set := utils.Set{} + + for method, policy := range rcs { + set[method] = true + + if policy.BackupPolicy != nil && policy.FailurePolicy != nil { + klog.Warnf("[local] %s client policy for method %s BackupPolicy and FailurePolicy must not be set at same time", + watcher.Key(), method) + continue + } + + if policy.BackupPolicy == nil && policy.FailurePolicy == nil { + klog.Warnf("[local] %s client policy for method %s BackupPolicy and FailurePolicy must not be empty at same time", + watcher.Key(), method) + continue + } + + retryContainer.NotifyPolicyChange(method, *policy) + } + + for _, method := range ts.DiffAndEmplace(set) { + retryContainer.DeletePolicy(method) + } + } + + keyRetry := watcher.RegisterCallback(onChangeCallback) + + return retryContainer, keyRetry +} diff --git a/config/file/client/rpc_timeout.go b/config/file/client/rpc_timeout.go new file mode 100644 index 0000000..c8fc222 --- /dev/null +++ b/config/file/client/rpc_timeout.go @@ -0,0 +1,51 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor" + kitexclient "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/rpcinfo" + "github.com/cloudwego/kitex/pkg/rpctimeout" +) + +// WithRPCTimeout returns a server.Option that sets the timeout provider for the client. +func WithRPCTimeout(watcher monitor.ConfigMonitor) []kitexclient.Option { + opt, keyRPCTimeout := initRPCTimeout(watcher) + return []kitexclient.Option{ + kitexclient.WithTimeoutProvider(opt), + kitexclient.WithCloseCallbacks(func() error { + watcher.DeregisterCallback(keyRPCTimeout) + return nil + }), + } +} + +// initRPCTimeout init the rpc timeout provider +func initRPCTimeout(watcher monitor.ConfigMonitor) (rpcinfo.TimeoutProvider, int64) { + rpcTimeoutContainer := rpctimeout.NewContainer() + + onChangeCallback := func() { + // the key is method name, wildcard "*" can match anything. + config := getFileConfig(watcher) + if config == nil { + return // config is nil, do nothing, log will be printed in getFileConfig + } + rpcTimeoutContainer.NotifyPolicyChange(config.Timeout) + } + + keyRPCTimeout := watcher.RegisterCallback(onChangeCallback) + return rpcTimeoutContainer, keyRPCTimeout +} diff --git a/config/file/client/suite.go b/config/file/client/suite.go new file mode 100644 index 0000000..0edafeb --- /dev/null +++ b/config/file/client/suite.go @@ -0,0 +1,61 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "github.com/cloudwego-contrib/cwgo-pkg/config/file/filewatcher" + "github.com/cloudwego-contrib/cwgo-pkg/config/file/monitor" + "github.com/cloudwego-contrib/cwgo-pkg/config/file/parser" + "github.com/cloudwego-contrib/cwgo-pkg/config/file/utils" + kitexclient "github.com/cloudwego/kitex/client" +) + +type FileConfigClientSuite struct { + watcher monitor.ConfigMonitor + service string +} + +// NewSuite service is the destination service. +func NewSuite(service, key string, watcher filewatcher.FileWatcher, opts ...utils.Option) *FileConfigClientSuite { + cm, err := monitor.NewConfigMonitor(key, watcher, opts...) + if err != nil { + panic(err) + } + + return &FileConfigClientSuite{ + watcher: cm, + service: service, + } +} + +// Options return a list client.Option +func (s *FileConfigClientSuite) Options() []kitexclient.Option { + s.watcher.SetManager(&parser.ClientFileManager{}) + + opts := make([]kitexclient.Option, 0, 7) + opts = append(opts, WithRetryPolicy(s.watcher)...) + opts = append(opts, WithCircuitBreaker(s.service, s.watcher)...) + opts = append(opts, WithRPCTimeout(s.watcher)...) + opts = append(opts, kitexclient.WithCloseCallbacks(func() error { + s.watcher.Stop() + return nil + })) + + if err := s.watcher.Start(); err != nil { + panic(err) + } + + return opts +}