Skip to content

Commit

Permalink
integration config
Browse files Browse the repository at this point in the history
  • Loading branch information
smx-Morgan committed Sep 12, 2024
1 parent a7655f8 commit 7ea5238
Show file tree
Hide file tree
Showing 63 changed files with 998 additions and 720 deletions.
33 changes: 17 additions & 16 deletions config/apollo/apollo/apollo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package apollo

import (
"bytes"
cwutils "github.com/cloudwego-contrib/cwgo-pkg/config/utils"
"runtime/debug"
"sync"
"text/template"
Expand All @@ -26,18 +27,18 @@ import (

// Client the wrapper of apollo client.
type Client interface {
SetParser(ConfigParser)
ClientConfigParam(cpc *ConfigParamConfig) (ConfigParam, error)
ServerConfigParam(cpc *ConfigParamConfig) (ConfigParam, error)
RegisterConfigCallback(ConfigParam, func(string, ConfigParser), int64)
SetParser(cwutils.ConfigParser)
ClientConfigParam(cpc *cwutils.ConfigParamConfig) (ConfigParam, error)
ServerConfigParam(cpc *cwutils.ConfigParamConfig) (ConfigParam, error)
RegisterConfigCallback(ConfigParam, func(string, cwutils.ConfigParser), int64)
DeregisterConfig(ConfigParam, int64) error
}

type ConfigParam struct {
Key string
nameSpace string
Cluster string
Type ConfigType
Type cwutils.ConfigType
}

type callbackHandler func(namespace, cluster, key, data string)
Expand All @@ -61,7 +62,7 @@ func getConfigParamKey(in *ConfigParam) configParamKey {
type client struct {
acli agollo.Agollo
// support customise parser
parser ConfigParser
parser cwutils.ConfigParser
stop chan bool
clusterTemplate *template.Template
serverKeyTemplate *template.Template
Expand All @@ -87,7 +88,7 @@ type Options struct {
ServerKeyFormat string
ClientKeyFormat string
ApolloOptions []agollo.Option
ConfigParser ConfigParser
ConfigParser cwutils.ConfigParser
}

type OptionFunc func(option *Options)
Expand All @@ -97,7 +98,7 @@ func NewClient(opts Options, optsfunc ...OptionFunc) (Client, error) {
opts.ConfigServerURL = ApolloDefaultConfigServerURL
}
if opts.ConfigParser == nil {
opts.ConfigParser = defaultConfigParse()
opts.ConfigParser = cwutils.DefaultConfigParse()
}
if opts.AppID == "" {
opts.AppID = ApolloDefaultAppId
Expand Down Expand Up @@ -154,11 +155,11 @@ func WithApolloOption(apolloOption ...agollo.Option) OptionFunc {
}
}

func (c *client) SetParser(parser ConfigParser) {
func (c *client) SetParser(parser cwutils.ConfigParser) {
c.parser = parser
}

func (c *client) render(cpc *ConfigParamConfig, t *template.Template) (string, error) {
func (c *client) render(cpc *cwutils.ConfigParamConfig, t *template.Template) (string, error) {
var tpl bytes.Buffer
err := t.Execute(&tpl, cpc)
if err != nil {
Expand All @@ -167,26 +168,26 @@ func (c *client) render(cpc *ConfigParamConfig, t *template.Template) (string, e
return tpl.String(), nil
}

func (c *client) ServerConfigParam(cpc *ConfigParamConfig) (ConfigParam, error) {
func (c *client) ServerConfigParam(cpc *cwutils.ConfigParamConfig) (ConfigParam, error) {
return c.configParam(cpc, c.serverKeyTemplate)
}

// ClientConfigParam render client config parameters
func (c *client) ClientConfigParam(cpc *ConfigParamConfig) (ConfigParam, error) {
func (c *client) ClientConfigParam(cpc *cwutils.ConfigParamConfig) (ConfigParam, error) {
return c.configParam(cpc, c.clientKeyTemplate)
}

// configParam render config parameters. All the parameters can be customized with CustomFunction.
// ConfigParam explain:
// 1. Type: key format, support JSON and YAML, JSON by default. Could extend it by implementing the ConfigParser interface.
// 2. Content: empty by default. Customize with CustomFunction.
// 3. NameSpace: select by user (retry / circuit_breaker / rpc_timeout / limit).
// 3. nameSpace: select by user (retry / circuit_breaker / rpc_timeout / limit).
// 4. ServerKey: {{.ServerServiceName}} by default.
// ClientKey: {{.ClientServiceName}}.{{.ServerServiceName}} by default.
// 5. Cluster: default by default
func (c *client) configParam(cpc *ConfigParamConfig, t *template.Template) (ConfigParam, error) {
func (c *client) configParam(cpc *cwutils.ConfigParamConfig, t *template.Template) (ConfigParam, error) {
param := ConfigParam{
Type: JSON,
Type: cwutils.JSON,
nameSpace: cpc.Category,
}
var err error
Expand Down Expand Up @@ -244,7 +245,7 @@ func (c *client) onChange(namespace, cluster, key, data string) {

// RegisterConfigCallback register the callback function to apollo client.
func (c *client) RegisterConfigCallback(param ConfigParam,
callback func(string, ConfigParser), uniqueID int64,
callback func(string, cwutils.ConfigParser), uniqueID int64,
) {
onChange := func(namespace, cluster, key, data string) {
klog.Debugf("[apollo] uniqueID %d config %s updated, namespace %s cluster %s key %s data %s",
Expand Down
5 changes: 3 additions & 2 deletions config/apollo/apollo/apollo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package apollo

import (
cwutils "github.com/cloudwego-contrib/cwgo-pkg/config/utils"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestRegisterAndDeregister(t *testing.T) {
Key: "k1",
nameSpace: "n1",
Cluster: "c1",
}, func(s string, cp ConfigParser) {
}, func(s string, cp cwutils.ConfigParser) {
gotlock.Lock()
defer gotlock.Unlock()
ids, ok := gots[configkey]
Expand All @@ -133,7 +134,7 @@ func TestRegisterAndDeregister(t *testing.T) {
Key: "k1",
nameSpace: "n1",
Cluster: "c1",
}, func(s string, cp ConfigParser) {
}, func(s string, cp cwutils.ConfigParser) {
gotlock.Lock()
defer gotlock.Unlock()
klog.Info("onchange callback2:", s)
Expand Down
59 changes: 7 additions & 52 deletions config/apollo/apollo/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,17 @@

package apollo

import (
"fmt"

"github.com/bytedance/sonic"
)

// CustomFunction use for customize the config parameters.
type (
CustomFunction func(*ConfigParam)
ConfigType string
ConfigContent string
)
type CustomFunction func(*ConfigParam)

const (
JSON ConfigType = "json"
YAML ConfigType = "yaml"
ApolloDefaultConfigServerURL = "127.0.0.1:8080"
ApolloDefaultAppId = "KitexApp"
ApolloDefaultCluster = "default"
ApolloNameSpace = "{{.Category}}"
ApolloDefaultClientKey = "{{.ClientServiceName}}.{{.ServerServiceName}}"
ApolloDefaultServerKey = "{{.ServerServiceName}}"
ApolloDefaultConfigServerURL = "127.0.0.1:8080"
ApolloDefaultAppId = "KitexApp"
ApolloDefaultCluster = "default"
ApolloNameSpace = "{{.Category}}"
ApolloDefaultClientKey = "{{.ClientServiceName}}.{{.ServerServiceName}}"
ApolloDefaultServerKey = "{{.ServerServiceName}}"
)

const (
emptyConfig string = "{}"
)

// ConfigParamConfig use for render the dataId or group info by go template, ref: https://pkg.go.dev/text/template
// The fixed key shows as below.
type ConfigParamConfig struct {
Category string
ClientServiceName string
ServerServiceName string
}

var _ ConfigParser = &parser{}

// ConfigParser the parser for Apollo config.
type ConfigParser interface {
Decode(kind ConfigType, data string, config interface{}) error
}

type parser struct{}

// Decode decodes the data to struct in specified format.
func (p *parser) Decode(kind ConfigType, data string, config interface{}) error {
switch kind {
case JSON:
return sonic.Unmarshal([]byte(data), config)
default:
return fmt.Errorf("unsupported config data type %s", kind)
}
}

// DefaultConfigParse default apollo config parser.
func defaultConfigParse() ConfigParser {
return &parser{}
}
10 changes: 5 additions & 5 deletions config/apollo/client/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
package client

import (
cwutils "github.com/cloudwego-contrib/cwgo-pkg/config/utils"
"strings"

"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/apollo"
"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/utils"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/klog"
Expand All @@ -30,7 +30,7 @@ import (
func WithCircuitBreaker(dest, src string, apolloClient apollo.Client,
opts utils.Options,
) []client.Option {
param, err := apolloClient.ClientConfigParam(&apollo.ConfigParamConfig{
param, err := apolloClient.ClientConfigParam(&cwutils.ConfigParamConfig{
Category: apollo.CircuitBreakerConfigName,
ServerServiceName: dest,
ClientServiceName: src,
Expand Down Expand Up @@ -83,10 +83,10 @@ func initCircuitBreaker(param apollo.ConfigParam, dest, src string,
apolloClient apollo.Client, uniqueID int64,
) *circuitbreak.CBSuite {
cb := circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo)
lcb := utils.ThreadSafeSet{}
lcb := cwutils.ThreadSafeSet{}

onChangeCallback := func(data string, parser apollo.ConfigParser) {
set := utils.Set{}
onChangeCallback := func(data string, parser cwutils.ConfigParser) {
set := cwutils.Set{}
configs := map[string]circuitbreak.CBConfig{}
err := parser.Decode(param.Type, data, &configs)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions config/apollo/client/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/apollo"
"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/utils"
cwutils "github.com/cloudwego-contrib/cwgo-pkg/config/utils"
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/retry"
Expand All @@ -25,7 +26,7 @@ import (
func WithRetryPolicy(dest, src string, apolloClient apollo.Client,
opts utils.Options,
) []client.Option {
param, err := apolloClient.ClientConfigParam(&apollo.ConfigParamConfig{
param, err := apolloClient.ClientConfigParam(&cwutils.ConfigParamConfig{
Category: apollo.RetryConfigName,
ServerServiceName: dest,
ClientServiceName: src,
Expand Down Expand Up @@ -59,9 +60,9 @@ func initRetryContainer(param apollo.ConfigParam, dest string,
) *retry.Container {
retryContainer := retry.NewRetryContainerWithPercentageLimit()

ts := utils.ThreadSafeSet{}
ts := cwutils.ThreadSafeSet{}

onChangeCallback := func(data string, parser apollo.ConfigParser) {
onChangeCallback := func(data string, parser cwutils.ConfigParser) {
// the key is method name, wildcard "*" can match anything.
rcs := map[string]*retry.Policy{}
err := parser.Decode(param.Type, data, &rcs)
Expand All @@ -70,7 +71,7 @@ func initRetryContainer(param apollo.ConfigParam, dest string,
return
}

set := utils.Set{}
set := cwutils.Set{}
for method, policy := range rcs {
set[method] = true
if policy.BackupPolicy != nil && policy.FailurePolicy != nil {
Expand Down
5 changes: 3 additions & 2 deletions config/apollo/client/rpc_timetout.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/apollo"
"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/utils"
cwutils "github.com/cloudwego-contrib/cwgo-pkg/config/utils"
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand All @@ -27,7 +28,7 @@ import (
func WithRPCTimeout(dest, src string, apolloClient apollo.Client,
opts utils.Options,
) []client.Option {
param, err := apolloClient.ClientConfigParam(&apollo.ConfigParamConfig{
param, err := apolloClient.ClientConfigParam(&cwutils.ConfigParamConfig{
Category: apollo.RpcTimeoutConfigName,
ServerServiceName: dest,
ClientServiceName: src,
Expand Down Expand Up @@ -55,7 +56,7 @@ func initRPCTimeoutContainer(param apollo.ConfigParam, dest string,
) rpcinfo.TimeoutProvider {
rpcTimeoutContainer := rpctimeout.NewContainer()

onChangeCallback := func(data string, parser apollo.ConfigParser) {
onChangeCallback := func(data string, parser cwutils.ConfigParser) {
configs := map[string]*rpctimeout.RPCTimeout{}
err := parser.Decode(param.Type, data, &configs)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions config/apollo/client/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@
package client

import (
"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/apollo"
cwutils "github.com/cloudwego-contrib/cwgo-pkg/config/apollo/apollo"
"github.com/cloudwego-contrib/cwgo-pkg/config/apollo/utils"
"github.com/cloudwego/kitex/client"
)

type ApolloClientSuite struct {
apolloClient apollo.Client
apolloClient cwutils.Client
service string
client string
opts utils.Options
}

type ClientSuiteOption func(*ApolloClientSuite)

func NewSuite(service, client string, cli apollo.Client,
func NewSuite(service, client string, cli cwutils.Client,
options ...utils.Option,
) *ApolloClientSuite {
client_suite := &ApolloClientSuite{
Expand Down
1 change: 1 addition & 0 deletions config/apollo/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/shima-park/agollo v1.2.14
go.uber.org/atomic v1.11.0
gopkg.in/go-playground/assert.v1 v1.2.1

)

require (
Expand Down
5 changes: 3 additions & 2 deletions config/apollo/server/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
cwutils "github.com/cloudwego-contrib/cwgo-pkg/config/utils"
"sync/atomic"

"github.com/cloudwego/kitex/pkg/klog"
Expand All @@ -30,7 +31,7 @@ import (
func WithLimiter(dest string, apolloClient apollo.Client,
opts utils.Options,
) server.Option {
param, err := apolloClient.ServerConfigParam(&apollo.ConfigParamConfig{
param, err := apolloClient.ServerConfigParam(&cwutils.ConfigParamConfig{
Category: apollo.LimiterConfigName,
ServerServiceName: dest,
})
Expand All @@ -55,7 +56,7 @@ func initLimitOptions(param apollo.ConfigParam, dest string, apolloClient apollo
u.UpdateLimit(opt)
updater.Store(u)
}
onChangeCallback := func(data string, parser apollo.ConfigParser) {
onChangeCallback := func(data string, parser cwutils.ConfigParser) {
lc := &limiter.LimiterConfig{}
err := parser.Decode(param.Type, data, lc)
if err != nil {
Expand Down
Loading

0 comments on commit 7ea5238

Please sign in to comment.