diff --git a/internal/api/admin/config_manager.go b/internal/api/admin/config_manager.go new file mode 100644 index 000000000..414965d08 --- /dev/null +++ b/internal/api/admin/config_manager.go @@ -0,0 +1,202 @@ +package admin + +import ( + "encoding/json" + "reflect" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/openimsdk/chat/pkg/common/apistruct" + "github.com/openimsdk/chat/pkg/common/config" + "github.com/openimsdk/chat/pkg/common/kdisc" + "github.com/openimsdk/chat/pkg/common/kdisc/etcd" + "github.com/openimsdk/chat/version" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/runtimeenv" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type ConfigManager struct { + config *config.AllConfig + client *clientv3.Client + configPath string + runtimeEnv string +} + +func NewConfigManager(cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { + return &ConfigManager{ + config: cfg, + client: client, + configPath: configPath, + runtimeEnv: runtimeEnv, + } +} + +func (cm *ConfigManager) GetConfig(c *gin.Context) { + var req apistruct.GetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + conf := cm.config.Name2Config(req.ConfigName) + if conf == nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) + return + } + b, err := json.Marshal(conf) + if err != nil { + apiresp.GinError(c, err) + return + } + apiresp.GinSuccess(c, string(b)) +} + +func (cm *ConfigManager) GetConfigList(c *gin.Context) { + var resp apistruct.GetConfigListResp + resp.ConfigNames = cm.config.GetConfigNames() + resp.Environment = runtimeenv.PrintRuntimeEnvironment() + resp.Version = version.Version + + apiresp.GinSuccess(c, resp) +} + +func (cm *ConfigManager) SetConfig(c *gin.Context) { + if cm.config.Discovery.Enable != kdisc.ETCDCONST { + apiresp.GinError(c, errs.New("only etcd support set config").Wrap()) + return + } + var req apistruct.SetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var err error + switch req.ConfigName { + case config.DiscoveryConfigFileName: + err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.LogConfigFileName: + err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.MongodbConfigFileName: + err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.ChatAPIAdminCfgFileName: + err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.ChatAPIChatCfgFileName: + err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.ChatRPCAdminCfgFileName: + err = compareAndSave[config.Admin](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.ChatRPCChatCfgFileName: + err = compareAndSave[config.Chat](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.ShareFileName: + err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case config.RedisConfigFileName: + err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + default: + apiresp.GinError(c, errs.ErrArgs.Wrap()) + return + } + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + apiresp.GinSuccess(c, nil) +} + +func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error { + conf := new(T) + err := json.Unmarshal([]byte(req.Data), &conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + eq := reflect.DeepEqual(old, conf) + if eq { + return nil + } + data, err := json.Marshal(conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + _, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) + if err != nil { + return errs.WrapMsg(err, "save to etcd failed") + } + return nil +} + +func (cm *ConfigManager) ResetConfig(c *gin.Context) { + go cm.resetConfig(c) + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) resetConfig(c *gin.Context) { + txn := cm.client.Txn(c) + type initConf struct { + old any + new any + isChanged bool + } + configMap := map[string]*initConf{ + config.DiscoveryConfigFileName: {old: &cm.config.Discovery, new: new(config.Discovery)}, + config.LogConfigFileName: {old: &cm.config.Log, new: new(config.Log)}, + config.MongodbConfigFileName: {old: &cm.config.Mongo, new: new(config.Mongo)}, + config.ChatAPIAdminCfgFileName: {old: &cm.config.AdminAPI, new: new(config.API)}, + config.ChatAPIChatCfgFileName: {old: &cm.config.ChatAPI, new: new(config.API)}, + config.ChatRPCAdminCfgFileName: {old: &cm.config.Admin, new: new(config.Admin)}, + config.ChatRPCChatCfgFileName: {old: &cm.config.Chat, new: new(config.Chat)}, + config.RedisConfigFileName: {old: &cm.config.Redis, new: new(config.Redis)}, + config.ShareFileName: {old: &cm.config.Share, new: new(config.Share)}, + } + + changedKeys := make([]string, 0, len(configMap)) + for k, v := range configMap { + err := config.Load( + cm.configPath, + k, + config.EnvPrefixMap[k], + cm.runtimeEnv, + v.new, + ) + if err != nil { + log.ZError(c, "load config failed", err) + continue + } + v.isChanged = reflect.DeepEqual(v.old, v.new) + if !v.isChanged { + changedKeys = append(changedKeys, k) + } + } + + ops := make([]clientv3.Op, 0) + for _, k := range changedKeys { + data, err := json.Marshal(configMap[k].new) + if err != nil { + log.ZError(c, "marshal config failed", err) + continue + } + ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data))) + } + if len(ops) > 0 { + txn.Then(ops...) + _, err := txn.Commit() + if err != nil { + log.ZError(c, "commit etcd txn failed", err) + return + } + } +} + +func (cm *ConfigManager) Restart(c *gin.Context) { + go cm.restart(c) + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) restart(c *gin.Context) { + time.Sleep(time.Millisecond * 200) // wait for Restart http call return + t := time.Now().Unix() + _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) + if err != nil { + log.ZError(c, "restart etcd put key failed", err) + } +} diff --git a/internal/api/admin/start.go b/internal/api/admin/start.go index 32d2623b9..4740e3aa4 100644 --- a/internal/api/admin/start.go +++ b/internal/api/admin/start.go @@ -2,7 +2,13 @@ package admin import ( "context" + "errors" "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" "github.com/gin-gonic/gin" chatmw "github.com/openimsdk/chat/internal/api/mw" @@ -10,24 +16,26 @@ import ( "github.com/openimsdk/chat/pkg/common/config" "github.com/openimsdk/chat/pkg/common/imapi" "github.com/openimsdk/chat/pkg/common/kdisc" + disetcd "github.com/openimsdk/chat/pkg/common/kdisc/etcd" adminclient "github.com/openimsdk/chat/pkg/protocol/admin" chatclient "github.com/openimsdk/chat/pkg/protocol/chat" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/mw" + "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/runtimeenv" + clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - - "github.com/openimsdk/tools/utils/runtimeenv" ) type Config struct { - ApiConfig config.API - - Discovery config.Discovery - Share config.Share + *config.AllConfig RuntimeEnv string + ConfigPath string } func Start(ctx context.Context, index int, config *Config) error { @@ -36,7 +44,7 @@ func Start(ctx context.Context, index int, config *Config) error { if len(config.Share.ChatAdmin) == 0 { return errs.New("share chat admin not configured") } - apiPort, err := datautil.GetElemByIndex(config.ApiConfig.Api.Ports, index) + apiPort, err := datautil.GetElemByIndex(config.AdminAPI.Api.Ports, index) if err != nil { return err } @@ -66,11 +74,51 @@ func Start(ctx context.Context, index int, config *Config) error { gin.SetMode(gin.ReleaseMode) engine := gin.New() engine.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) - SetAdminRoute(engine, adminApi, mwApi) - return engine.Run(fmt.Sprintf(":%d", apiPort)) + SetAdminRoute(engine, adminApi, mwApi, config, client) + + if config.Discovery.Enable == kdisc.ETCDCONST { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) + cm.Watch(ctx) + } + var ( + netDone = make(chan struct{}, 1) + netErr error + ) + server := http.Server{Addr: fmt.Sprintf(":%d", apiPort), Handler: engine} + go func() { + err = server.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) + netDone <- struct{}{} + } + }() + shutdown := func() error { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + err := server.Shutdown(ctx) + if err != nil { + return errs.WrapMsg(err, "shutdown err") + } + return nil + } + disetcd.RegisterShutDown(shutdown) + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + select { + case <-sigs: + program.SIGTERMExit() + if err := shutdown(); err != nil { + return err + } + case <-netDone: + close(netDone) + return netErr + } + return nil } -func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) { +func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW, cfg *Config, client discovery.SvcDiscoveryRegistry) { adminRouterGroup := router.Group("/account") adminRouterGroup.POST("/login", admin.AdminLogin) // Login @@ -149,4 +197,20 @@ func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) { applicationGroup.POST("/delete_version", mw.CheckAdmin, admin.DeleteApplicationVersion) applicationGroup.POST("/latest_version", admin.LatestApplicationVersion) applicationGroup.POST("/page_versions", admin.PageApplicationVersion) + + var etcdClient *clientv3.Client + if cfg.Discovery.Enable == kdisc.ETCDCONST { + etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + cm := NewConfigManager(cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv) + { + configGroup := router.Group("/config", mw.CheckAdmin) + configGroup.POST("/get_config_list", cm.GetConfigList) + configGroup.POST("/get_config", cm.GetConfig) + configGroup.POST("/set_config", cm.SetConfig) + configGroup.POST("/reset_config", cm.ResetConfig) + } + { + router.POST("/restart", mw.CheckAdmin, cm.Restart) + } } diff --git a/internal/api/chat/start.go b/internal/api/chat/start.go index 57dc73ce1..da83468ee 100644 --- a/internal/api/chat/start.go +++ b/internal/api/chat/start.go @@ -2,7 +2,13 @@ package chat import ( "context" + "errors" "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" "github.com/gin-gonic/gin" chatmw "github.com/openimsdk/chat/internal/api/mw" @@ -10,10 +16,13 @@ import ( "github.com/openimsdk/chat/pkg/common/config" "github.com/openimsdk/chat/pkg/common/imapi" "github.com/openimsdk/chat/pkg/common/kdisc" + disetcd "github.com/openimsdk/chat/pkg/common/kdisc/etcd" adminclient "github.com/openimsdk/chat/pkg/protocol/admin" chatclient "github.com/openimsdk/chat/pkg/protocol/chat" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/mw" + "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" @@ -28,36 +37,36 @@ type Config struct { RuntimeEnv string } -func Start(ctx context.Context, index int, config *Config) error { - config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() +func Start(ctx context.Context, index int, cfg *Config) error { + cfg.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() - if len(config.Share.ChatAdmin) == 0 { + if len(cfg.Share.ChatAdmin) == 0 { return errs.New("share chat admin not configured") } - apiPort, err := datautil.GetElemByIndex(config.ApiConfig.Api.Ports, index) + apiPort, err := datautil.GetElemByIndex(cfg.ApiConfig.Api.Ports, index) if err != nil { return err } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv) + client, err := kdisc.NewDiscoveryRegister(&cfg.Discovery, cfg.RuntimeEnv) if err != nil { return err } - chatConn, err := client.GetConn(ctx, config.Discovery.RpcService.Chat, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient()) + chatConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Chat, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient()) if err != nil { return err } - adminConn, err := client.GetConn(ctx, config.Discovery.RpcService.Admin, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient()) + adminConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Admin, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient()) if err != nil { return err } chatClient := chatclient.NewChatClient(chatConn) adminClient := adminclient.NewAdminClient(adminConn) - im := imapi.New(config.Share.OpenIM.ApiURL, config.Share.OpenIM.Secret, config.Share.OpenIM.AdminUserID) + im := imapi.New(cfg.Share.OpenIM.ApiURL, cfg.Share.OpenIM.Secret, cfg.Share.OpenIM.AdminUserID) base := util.Api{ - ImUserID: config.Share.OpenIM.AdminUserID, - ProxyHeader: config.Share.ProxyHeader, - ChatAdminUserID: config.Share.ChatAdmin[0], + ImUserID: cfg.Share.OpenIM.AdminUserID, + ProxyHeader: cfg.Share.ProxyHeader, + ChatAdminUserID: cfg.Share.ChatAdmin[0], } adminApi := New(chatClient, adminClient, im, &base) mwApi := chatmw.New(adminClient) @@ -65,7 +74,53 @@ func Start(ctx context.Context, index int, config *Config) error { engine := gin.New() engine.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) SetChatRoute(engine, adminApi, mwApi) - return engine.Run(fmt.Sprintf(":%d", apiPort)) + + var ( + netDone = make(chan struct{}, 1) + netErr error + ) + server := http.Server{Addr: fmt.Sprintf(":%d", apiPort), Handler: engine} + go func() { + err = server.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) + netDone <- struct{}{} + } + }() + if cfg.Discovery.Enable == kdisc.ETCDCONST { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), + []string{ + config.ChatAPIChatCfgFileName, + config.DiscoveryConfigFileName, + config.ShareFileName, + }, + ) + cm.Watch(ctx) + } + shutdown := func() error { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + err := server.Shutdown(ctx) + if err != nil { + return errs.WrapMsg(err, "shutdown err") + } + return nil + } + disetcd.RegisterShutDown(shutdown) + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + select { + case <-sigs: + program.SIGTERMExit() + if err := shutdown(); err != nil { + return err + } + case <-netDone: + close(netDone) + return netErr + } + return nil } func SetChatRoute(router gin.IRouter, chat *Api, mw *chatmw.MW) { diff --git a/pkg/common/apistruct/config_manager.go b/pkg/common/apistruct/config_manager.go new file mode 100644 index 000000000..84b8fb36b --- /dev/null +++ b/pkg/common/apistruct/config_manager.go @@ -0,0 +1,16 @@ +package apistruct + +type GetConfigReq struct { + ConfigName string `json:"configName"` +} + +type GetConfigListResp struct { + Environment string `json:"environment"` + Version string `json:"version"` + ConfigNames []string `json:"configNames"` +} + +type SetConfigReq struct { + ConfigName string `json:"configName"` + Data string `json:"data"` +} diff --git a/pkg/common/cmd/admin_api.go b/pkg/common/cmd/admin_api.go index 47b2469f9..e3c31613e 100644 --- a/pkg/common/cmd/admin_api.go +++ b/pkg/common/cmd/admin_api.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "github.com/openimsdk/chat/internal/api/admin" "github.com/openimsdk/chat/pkg/common/config" "github.com/openimsdk/tools/system/program" @@ -16,15 +17,24 @@ type AdminApiCmd struct { } func NewAdminApiCmd() *AdminApiCmd { - var ret AdminApiCmd + ret := AdminApiCmd{apiConfig: admin.Config{ + AllConfig: &config.AllConfig{}, + }} ret.configMap = map[string]any{ - ShareFileName: &ret.apiConfig.Share, - ChatAPIAdminCfgFileName: &ret.apiConfig.ApiConfig, - DiscoveryConfigFileName: &ret.apiConfig.Discovery, + config.DiscoveryConfigFileName: &ret.apiConfig.Discovery, + config.LogConfigFileName: &ret.apiConfig.Log, + config.MongodbConfigFileName: &ret.apiConfig.Mongo, + config.ChatAPIAdminCfgFileName: &ret.apiConfig.AdminAPI, + config.ChatAPIChatCfgFileName: &ret.apiConfig.ChatAPI, + config.ChatRPCAdminCfgFileName: &ret.apiConfig.Admin, + config.ChatRPCChatCfgFileName: &ret.apiConfig.Chat, + config.RedisConfigFileName: &ret.apiConfig.Redis, + config.ShareFileName: &ret.apiConfig.Share, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { + ret.apiConfig.ConfigPath = ret.configPath return ret.runE() } return &ret diff --git a/pkg/common/cmd/admin_rpc.go b/pkg/common/cmd/admin_rpc.go index 3cea0c2f7..e46c929a3 100644 --- a/pkg/common/cmd/admin_rpc.go +++ b/pkg/common/cmd/admin_rpc.go @@ -34,11 +34,11 @@ type AdminRpcCmd struct { func NewAdminRpcCmd() *AdminRpcCmd { var ret AdminRpcCmd ret.configMap = map[string]any{ - ChatRPCAdminCfgFileName: &ret.adminConfig.RpcConfig, - RedisConfigFileName: &ret.adminConfig.RedisConfig, - DiscoveryConfigFileName: &ret.adminConfig.Discovery, - MongodbConfigFileName: &ret.adminConfig.MongodbConfig, - ShareFileName: &ret.adminConfig.Share, + config.ChatRPCAdminCfgFileName: &ret.adminConfig.RpcConfig, + config.RedisConfigFileName: &ret.adminConfig.RedisConfig, + config.DiscoveryConfigFileName: &ret.adminConfig.Discovery, + config.MongodbConfigFileName: &ret.adminConfig.MongodbConfig, + config.ShareFileName: &ret.adminConfig.Share, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -55,5 +55,13 @@ func (a *AdminRpcCmd) Exec() error { func (a *AdminRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.adminConfig.Discovery, a.adminConfig.RpcConfig.RPC.ListenIP, a.adminConfig.RpcConfig.RPC.RegisterIP, a.adminConfig.RpcConfig.RPC.Ports, - a.Index(), a.adminConfig.Discovery.RpcService.Admin, &a.adminConfig.Share, &a.adminConfig, admin.Start) + a.Index(), a.adminConfig.Discovery.RpcService.Admin, &a.adminConfig.Share, &a.adminConfig, + []string{ + config.ChatRPCAdminCfgFileName, + config.RedisConfigFileName, + config.DiscoveryConfigFileName, + config.MongodbConfigFileName, + config.ShareFileName, + }, + admin.Start) } diff --git a/pkg/common/cmd/chat_api.go b/pkg/common/cmd/chat_api.go index 5e85f11c2..9c8da44e3 100644 --- a/pkg/common/cmd/chat_api.go +++ b/pkg/common/cmd/chat_api.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "github.com/openimsdk/chat/internal/api/chat" "github.com/openimsdk/chat/pkg/common/config" "github.com/openimsdk/tools/system/program" @@ -18,9 +19,9 @@ type ChatApiCmd struct { func NewChatApiCmd() *ChatApiCmd { var ret ChatApiCmd ret.configMap = map[string]any{ - ShareFileName: &ret.apiConfig.Share, - ChatAPIChatCfgFileName: &ret.apiConfig.ApiConfig, - DiscoveryConfigFileName: &ret.apiConfig.Discovery, + config.ShareFileName: &ret.apiConfig.Share, + config.ChatAPIChatCfgFileName: &ret.apiConfig.ApiConfig, + config.DiscoveryConfigFileName: &ret.apiConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/chat_rpc.go b/pkg/common/cmd/chat_rpc.go index 9bc5e72f6..7f4bd859b 100644 --- a/pkg/common/cmd/chat_rpc.go +++ b/pkg/common/cmd/chat_rpc.go @@ -34,11 +34,11 @@ type ChatRpcCmd struct { func NewChatRpcCmd() *ChatRpcCmd { var ret ChatRpcCmd ret.configMap = map[string]any{ - ChatRPCChatCfgFileName: &ret.chatConfig.RpcConfig, - RedisConfigFileName: &ret.chatConfig.RedisConfig, - DiscoveryConfigFileName: &ret.chatConfig.Discovery, - MongodbConfigFileName: &ret.chatConfig.MongodbConfig, - ShareFileName: &ret.chatConfig.Share, + config.ChatRPCChatCfgFileName: &ret.chatConfig.RpcConfig, + config.RedisConfigFileName: &ret.chatConfig.RedisConfig, + config.DiscoveryConfigFileName: &ret.chatConfig.Discovery, + config.MongodbConfigFileName: &ret.chatConfig.MongodbConfig, + config.ShareFileName: &ret.chatConfig.Share, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -55,5 +55,13 @@ func (a *ChatRpcCmd) Exec() error { func (a *ChatRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.chatConfig.Discovery, a.chatConfig.RpcConfig.RPC.ListenIP, a.chatConfig.RpcConfig.RPC.RegisterIP, a.chatConfig.RpcConfig.RPC.Ports, - a.Index(), a.chatConfig.Discovery.RpcService.Chat, &a.chatConfig.Share, &a.chatConfig, chat.Start) + a.Index(), a.chatConfig.Discovery.RpcService.Chat, &a.chatConfig.Share, &a.chatConfig, + []string{ + config.ChatRPCChatCfgFileName, + config.RedisConfigFileName, + config.DiscoveryConfigFileName, + config.MongodbConfigFileName, + config.ShareFileName, + }, + chat.Start) } diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 4cea0a16d..6521e551b 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -15,10 +15,16 @@ package cmd import ( + "context" + "encoding/json" "fmt" "github.com/openimsdk/chat/pkg/common/config" + "github.com/openimsdk/chat/pkg/common/kdisc" + disetcd "github.com/openimsdk/chat/pkg/common/kdisc/etcd" "github.com/openimsdk/chat/version" + "github.com/openimsdk/tools/discovery/etcd" + clientv3 "go.etcd.io/etcd/client/v3" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -34,6 +40,8 @@ type RootCmd struct { prometheusPort int log config.Log index int + configPath string + etcdClient *clientv3.Client } func (r *RootCmd) Index() int { @@ -71,19 +79,43 @@ func NewRootCmd(processName string, opts ...func(*CmdOpts)) *RootCmd { SilenceUsage: true, SilenceErrors: false, } - cmd.Flags().StringP(FlagConf, "c", "", "path of config directory") - cmd.Flags().IntP(FlagTransferIndex, "i", 0, "process startup sequence number") + cmd.Flags().StringP(config.FlagConf, "c", "", "path of config directory") + cmd.Flags().IntP(config.FlagTransferIndex, "i", 0, "process startup sequence number") rootCmd.Command = cmd return rootCmd } +func (r *RootCmd) initEtcd() error { + configDirectory, _, err := r.getFlag(&r.Command) + if err != nil { + return err + } + disConfig := config.Discovery{} + env := runtimeenv.PrintRuntimeEnvironment() + err = config.Load(configDirectory, config.DiscoveryConfigFileName, config.EnvPrefixMap[config.DiscoveryConfigFileName], + env, &disConfig) + if err != nil { + return err + } + if disConfig.Enable == kdisc.ETCDCONST { + discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env) + r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + return nil +} + func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error { + if err := r.initEtcd(); err != nil { + return err + } cmdOpts := r.applyOptions(opts...) if err := r.initializeConfiguration(cmd, cmdOpts); err != nil { return err } - + if err := r.updateConfigFromEtcd(cmdOpts); err != nil { + return err + } if err := r.initializeLogger(cmdOpts); err != nil { return errs.WrapMsg(err, "failed to initialize logger") } @@ -96,6 +128,7 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err if err != nil { return err } + r.configPath = configDirectory runtimeEnv := runtimeenv.PrintRuntimeEnvironment() @@ -103,14 +136,57 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err //opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share} for configFileName, configStruct := range opts.configMap { err := config.Load(configDirectory, configFileName, - ConfigEnvPrefixMap[configFileName], runtimeEnv, configStruct) + config.EnvPrefixMap[configFileName], runtimeEnv, configStruct) + if err != nil { + return err + } + } + // Load common log configuration file + return config.Load(configDirectory, config.LogConfigFileName, + config.EnvPrefixMap[config.LogConfigFileName], runtimeEnv, &r.log) +} + +func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error { + if r.etcdClient == nil { + return nil + } + + update := func(configFileName string, configStruct any) error { + ctx := context.TODO() + key := disetcd.BuildKey(configFileName) + etcdRes, err := r.etcdClient.Get(ctx, key) if err != nil { + log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get err: %v", errs.Wrap(err)) + return nil + } + if etcdRes.Count == 0 { + data, err := json.Marshal(configStruct) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + _, err = r.etcdClient.Put(ctx, disetcd.BuildKey(configFileName), string(data)) + if err != nil { + log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Put err: %v", errs.Wrap(err)) + } + return nil + } + err = json.Unmarshal(etcdRes.Kvs[0].Value, configStruct) + if err != nil { + return errs.WrapMsg(err, "failed to unmarshal config from etcd") + } + return nil + } + for configFileName, configStruct := range opts.configMap { + if err := update(configFileName, configStruct); err != nil { return err } } + if err := update(config.LogConfigFileName, &r.log); err != nil { + return err + } // Load common log configuration file - return config.Load(configDirectory, LogConfigFileName, - ConfigEnvPrefixMap[LogConfigFileName], runtimeEnv, &r.log) + return nil + } func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { @@ -150,11 +226,11 @@ func defaultCmdOpts() *CmdOpts { } func (r *RootCmd) getFlag(cmd *cobra.Command) (string, int, error) { - configDirectory, err := cmd.Flags().GetString(FlagConf) + configDirectory, err := cmd.Flags().GetString(config.FlagConf) if err != nil { return "", 0, errs.Wrap(err) } - index, err := cmd.Flags().GetInt(FlagTransferIndex) + index, err := cmd.Flags().GetInt(config.FlagTransferIndex) if err != nil { return "", 0, errs.Wrap(err) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 4acb7ca33..53d2db26c 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -168,3 +168,54 @@ type Log struct { IsSimplify bool `mapstructure:"isSimplify"` WithStack bool `mapstructure:"withStack"` } + +type AllConfig struct { + AdminAPI API + ChatAPI API + Admin Admin + Chat Chat + Discovery Discovery + Log Log + Mongo Mongo + Redis Redis + Share Share +} + +func (a *AllConfig) Name2Config(name string) any { + switch name { + case ChatAPIAdminCfgFileName: + return a.AdminAPI + case ChatAPIChatCfgFileName: + return a.ChatAPI + case ChatRPCAdminCfgFileName: + return a.Admin + case ChatRPCChatCfgFileName: + return a.Chat + case DiscoveryConfigFileName: + return a.Discovery + case LogConfigFileName: + return a.Log + case MongodbConfigFileName: + return a.Mongo + case RedisConfigFileName: + return a.Redis + case ShareFileName: + return a.Share + default: + return nil + } +} + +func (a *AllConfig) GetConfigNames() []string { + return []string{ + ShareFileName, + RedisConfigFileName, + DiscoveryConfigFileName, + MongodbConfigFileName, + LogConfigFileName, + ChatAPIAdminCfgFileName, + ChatAPIChatCfgFileName, + ChatRPCAdminCfgFileName, + ChatRPCChatCfgFileName, + } +} diff --git a/pkg/common/cmd/constant.go b/pkg/common/config/env.go similarity index 56% rename from pkg/common/cmd/constant.go rename to pkg/common/config/env.go index b07147d69..aa3170cce 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/config/env.go @@ -1,18 +1,4 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd +package config import ( "strings" @@ -30,10 +16,10 @@ var ( ChatRPCChatCfgFileName = "chat-rpc-chat.yml" ) -var ConfigEnvPrefixMap map[string]string +var EnvPrefixMap map[string]string func init() { - ConfigEnvPrefixMap = make(map[string]string) + EnvPrefixMap = make(map[string]string) fileNames := []string{ ShareFileName, RedisConfigFileName, @@ -50,7 +36,7 @@ func init() { envKey := strings.TrimSuffix(strings.TrimSuffix(fileName, ".yml"), ".yaml") envKey = "CHATENV_" + envKey envKey = strings.ToUpper(strings.ReplaceAll(envKey, "-", "_")) - ConfigEnvPrefixMap[fileName] = envKey + EnvPrefixMap[fileName] = envKey } } diff --git a/pkg/common/kdisc/etcd/config_manager.go b/pkg/common/kdisc/etcd/config_manager.go new file mode 100644 index 000000000..4e4e065b8 --- /dev/null +++ b/pkg/common/kdisc/etcd/config_manager.go @@ -0,0 +1,111 @@ +package etcd + +import ( + "context" + "os" + "os/exec" + "runtime" + "sync" + "syscall" + + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + ConfigKeyPrefix = "/chat/config/" + RestartKey = "restart" +) + +var ( + ShutDowns []func() error +) + +func RegisterShutDown(shutDown ...func() error) { + ShutDowns = append(ShutDowns, shutDown...) +} + +type ConfigManager struct { + client *clientv3.Client + watchConfigNames []string + lock sync.Mutex +} + +func BuildKey(s string) string { + return ConfigKeyPrefix + s +} + +func NewConfigManager(client *clientv3.Client, configNames []string) *ConfigManager { + return &ConfigManager{ + client: client, + watchConfigNames: datautil.Batch(func(s string) string { return BuildKey(s) }, append(configNames, RestartKey))} +} + +func (c *ConfigManager) Watch(ctx context.Context) { + chans := make([]clientv3.WatchChan, 0, len(c.watchConfigNames)) + for _, name := range c.watchConfigNames { + chans = append(chans, c.client.Watch(ctx, name, clientv3.WithPrefix())) + } + + doWatch := func(watchChan clientv3.WatchChan) { + for watchResp := range watchChan { + if watchResp.Err() != nil { + log.ZError(ctx, "watch err", errs.Wrap(watchResp.Err())) + continue + } + for _, event := range watchResp.Events { + if event.IsModify() { + if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) { + c.lock.Lock() + err := restartServer(ctx) + if err != nil { + log.ZError(ctx, "restart server err", err) + } + c.lock.Unlock() + } + } + } + } + } + for _, ch := range chans { + go doWatch(ch) + } +} + +func restartServer(ctx context.Context) error { + exePath, err := os.Executable() + if err != nil { + return errs.New("get executable path fail").Wrap() + } + + args := os.Args + env := os.Environ() + + cmd := exec.Command(exePath, args[1:]...) + cmd.Env = env + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + + if runtime.GOOS != "windows" { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + log.ZInfo(ctx, "shutdown server") + for _, f := range ShutDowns { + if err = f(); err != nil { + log.ZError(ctx, "shutdown fail", err) + } + } + + log.ZInfo(ctx, "restart server") + err = cmd.Start() + if err != nil { + return errs.New("restart server fail").Wrap() + } + log.ZInfo(ctx, "cmd start over") + + os.Exit(0) + return nil +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index d81eb88bb..e8b98a423 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -1,17 +1,3 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package startrpc import ( @@ -27,6 +13,8 @@ import ( "github.com/openimsdk/chat/pkg/common/config" "github.com/openimsdk/chat/pkg/common/kdisc" + disetcd "github.com/openimsdk/chat/pkg/common/kdisc/etcd" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/runtimeenv" @@ -42,8 +30,9 @@ import ( // Start rpc server. func Start[T any](ctx context.Context, discovery *config.Discovery, listenIP, - registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, - config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, + watchConfigNames []string, + rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { runtimeEnv := runtimeenv.PrintRuntimeEnvironment() @@ -101,6 +90,10 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, listenIP, netDone <- struct{}{} } }() + if discovery.Enable == kdisc.ETCDCONST { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), watchConfigNames) + cm.Watch(ctx) + } sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) diff --git a/tools/attribute-to-credential/main.go b/tools/attribute-to-credential/main.go index d55797948..08dc176c8 100644 --- a/tools/attribute-to-credential/main.go +++ b/tools/attribute-to-credential/main.go @@ -7,7 +7,6 @@ import ( "path/filepath" "github.com/openimsdk/chat/internal/rpc/chat" - "github.com/openimsdk/chat/pkg/common/cmd" "github.com/openimsdk/chat/pkg/common/config" "github.com/openimsdk/chat/pkg/common/constant" table "github.com/openimsdk/chat/pkg/common/db/table/chat" @@ -37,7 +36,7 @@ func initConfig(configDir string) (*config.Mongo, error) { runtimeEnv := runtimeenv.PrintRuntimeEnvironment() - err := config.Load(configDir, cmd.MongodbConfigFileName, cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], runtimeEnv, mongoConfig) + err := config.Load(configDir, config.MongodbConfigFileName, config.EnvPrefixMap[config.MongodbConfigFileName], runtimeEnv, mongoConfig) if err != nil { return nil, err } diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 36f488ea3..00206abbe 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -21,7 +21,6 @@ import ( "path/filepath" "time" - "github.com/openimsdk/chat/pkg/common/cmd" "github.com/openimsdk/chat/pkg/common/config" "github.com/openimsdk/chat/pkg/common/imapi" "github.com/openimsdk/tools/db/mongoutil" @@ -67,21 +66,21 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Discove runtimeEnv := runtimeenv.PrintRuntimeEnvironment() - err := config.Load(configDir, cmd.MongodbConfigFileName, cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], runtimeEnv, mongoConfig) + err := config.Load(configDir, config.MongodbConfigFileName, config.EnvPrefixMap[config.MongodbConfigFileName], runtimeEnv, mongoConfig) if err != nil { return nil, nil, nil, nil, err } - err = config.Load(configDir, cmd.RedisConfigFileName, cmd.ConfigEnvPrefixMap[cmd.RedisConfigFileName], runtimeEnv, redisConfig) + err = config.Load(configDir, config.RedisConfigFileName, config.EnvPrefixMap[config.RedisConfigFileName], runtimeEnv, redisConfig) if err != nil { return nil, nil, nil, nil, err } - err = config.Load(configDir, cmd.DiscoveryConfigFileName, cmd.ConfigEnvPrefixMap[cmd.DiscoveryConfigFileName], runtimeEnv, discoveryConfig) + err = config.Load(configDir, config.DiscoveryConfigFileName, config.EnvPrefixMap[config.DiscoveryConfigFileName], runtimeEnv, discoveryConfig) if err != nil { return nil, nil, nil, nil, err } - err = config.Load(configDir, cmd.ShareFileName, cmd.ConfigEnvPrefixMap[cmd.ShareFileName], runtimeEnv, shareConfig) + err = config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], runtimeEnv, shareConfig) if err != nil { return nil, nil, nil, nil, err }