Skip to content

Commit 76f3b69

Browse files
committed
feat: config center
1 parent 12f642b commit 76f3b69

File tree

15 files changed

+656
-90
lines changed

15 files changed

+656
-90
lines changed

internal/api/admin/config_manager.go

+202
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package admin
2+
3+
import (
4+
"encoding/json"
5+
"reflect"
6+
"strconv"
7+
"time"
8+
9+
"github.com/gin-gonic/gin"
10+
"github.com/openimsdk/chat/pkg/common/apistruct"
11+
"github.com/openimsdk/chat/pkg/common/config"
12+
"github.com/openimsdk/chat/pkg/common/kdisc"
13+
"github.com/openimsdk/chat/pkg/common/kdisc/etcd"
14+
"github.com/openimsdk/chat/version"
15+
"github.com/openimsdk/tools/apiresp"
16+
"github.com/openimsdk/tools/errs"
17+
"github.com/openimsdk/tools/log"
18+
"github.com/openimsdk/tools/utils/runtimeenv"
19+
clientv3 "go.etcd.io/etcd/client/v3"
20+
)
21+
22+
type ConfigManager struct {
23+
config *config.AllConfig
24+
client *clientv3.Client
25+
configPath string
26+
runtimeEnv string
27+
}
28+
29+
func NewConfigManager(cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
30+
return &ConfigManager{
31+
config: cfg,
32+
client: client,
33+
configPath: configPath,
34+
runtimeEnv: runtimeEnv,
35+
}
36+
}
37+
38+
func (cm *ConfigManager) GetConfig(c *gin.Context) {
39+
var req apistruct.GetConfigReq
40+
if err := c.BindJSON(&req); err != nil {
41+
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
42+
return
43+
}
44+
conf := cm.config.Name2Config(req.ConfigName)
45+
if conf == nil {
46+
apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap())
47+
return
48+
}
49+
b, err := json.Marshal(conf)
50+
if err != nil {
51+
apiresp.GinError(c, err)
52+
return
53+
}
54+
apiresp.GinSuccess(c, string(b))
55+
}
56+
57+
func (cm *ConfigManager) GetConfigList(c *gin.Context) {
58+
var resp apistruct.GetConfigListResp
59+
resp.ConfigNames = cm.config.GetConfigNames()
60+
resp.Environment = runtimeenv.PrintRuntimeEnvironment()
61+
resp.Version = version.Version
62+
63+
apiresp.GinSuccess(c, resp)
64+
}
65+
66+
func (cm *ConfigManager) SetConfig(c *gin.Context) {
67+
if cm.config.Discovery.Enable != kdisc.ETCDCONST {
68+
apiresp.GinError(c, errs.New("only etcd support set config").Wrap())
69+
return
70+
}
71+
var req apistruct.SetConfigReq
72+
if err := c.BindJSON(&req); err != nil {
73+
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
74+
return
75+
}
76+
var err error
77+
switch req.ConfigName {
78+
case config.DiscoveryConfigFileName:
79+
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
80+
case config.LogConfigFileName:
81+
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
82+
case config.MongodbConfigFileName:
83+
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
84+
case config.ChatAPIAdminCfgFileName:
85+
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
86+
case config.ChatAPIChatCfgFileName:
87+
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
88+
case config.ChatRPCAdminCfgFileName:
89+
err = compareAndSave[config.Admin](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
90+
case config.ChatRPCChatCfgFileName:
91+
err = compareAndSave[config.Chat](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
92+
case config.ShareFileName:
93+
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
94+
case config.RedisConfigFileName:
95+
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
96+
default:
97+
apiresp.GinError(c, errs.ErrArgs.Wrap())
98+
return
99+
}
100+
if err != nil {
101+
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
102+
return
103+
}
104+
apiresp.GinSuccess(c, nil)
105+
}
106+
107+
func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error {
108+
conf := new(T)
109+
err := json.Unmarshal([]byte(req.Data), &conf)
110+
if err != nil {
111+
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
112+
}
113+
eq := reflect.DeepEqual(old, conf)
114+
if eq {
115+
return nil
116+
}
117+
data, err := json.Marshal(conf)
118+
if err != nil {
119+
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
120+
}
121+
_, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
122+
if err != nil {
123+
return errs.WrapMsg(err, "save to etcd failed")
124+
}
125+
return nil
126+
}
127+
128+
func (cm *ConfigManager) ResetConfig(c *gin.Context) {
129+
go cm.resetConfig(c)
130+
apiresp.GinSuccess(c, nil)
131+
}
132+
133+
func (cm *ConfigManager) resetConfig(c *gin.Context) {
134+
txn := cm.client.Txn(c)
135+
type initConf struct {
136+
old any
137+
new any
138+
isChanged bool
139+
}
140+
configMap := map[string]*initConf{
141+
config.DiscoveryConfigFileName: {old: &cm.config.Discovery, new: new(config.Discovery)},
142+
config.LogConfigFileName: {old: &cm.config.Log, new: new(config.Log)},
143+
config.MongodbConfigFileName: {old: &cm.config.Mongo, new: new(config.Mongo)},
144+
config.ChatAPIAdminCfgFileName: {old: &cm.config.AdminAPI, new: new(config.API)},
145+
config.ChatAPIChatCfgFileName: {old: &cm.config.ChatAPI, new: new(config.API)},
146+
config.ChatRPCAdminCfgFileName: {old: &cm.config.Admin, new: new(config.Admin)},
147+
config.ChatRPCChatCfgFileName: {old: &cm.config.Chat, new: new(config.Chat)},
148+
config.RedisConfigFileName: {old: &cm.config.Redis, new: new(config.Redis)},
149+
config.ShareFileName: {old: &cm.config.Share, new: new(config.Share)},
150+
}
151+
152+
changedKeys := make([]string, 0, len(configMap))
153+
for k, v := range configMap {
154+
err := config.Load(
155+
cm.configPath,
156+
k,
157+
config.EnvPrefixMap[k],
158+
cm.runtimeEnv,
159+
v.new,
160+
)
161+
if err != nil {
162+
log.ZError(c, "load config failed", err)
163+
continue
164+
}
165+
v.isChanged = reflect.DeepEqual(v.old, v.new)
166+
if !v.isChanged {
167+
changedKeys = append(changedKeys, k)
168+
}
169+
}
170+
171+
ops := make([]clientv3.Op, 0)
172+
for _, k := range changedKeys {
173+
data, err := json.Marshal(configMap[k].new)
174+
if err != nil {
175+
log.ZError(c, "marshal config failed", err)
176+
continue
177+
}
178+
ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data)))
179+
}
180+
if len(ops) > 0 {
181+
txn.Then(ops...)
182+
_, err := txn.Commit()
183+
if err != nil {
184+
log.ZError(c, "commit etcd txn failed", err)
185+
return
186+
}
187+
}
188+
}
189+
190+
func (cm *ConfigManager) Restart(c *gin.Context) {
191+
go cm.restart(c)
192+
apiresp.GinSuccess(c, nil)
193+
}
194+
195+
func (cm *ConfigManager) restart(c *gin.Context) {
196+
time.Sleep(time.Millisecond * 200) // wait for Restart http call return
197+
t := time.Now().Unix()
198+
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
199+
if err != nil {
200+
log.ZError(c, "restart etcd put key failed", err)
201+
}
202+
}

internal/api/admin/start.go

+74-10
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,40 @@ package admin
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"net/http"
8+
"os"
9+
"os/signal"
10+
"syscall"
11+
"time"
612

713
"github.com/gin-gonic/gin"
814
chatmw "github.com/openimsdk/chat/internal/api/mw"
915
"github.com/openimsdk/chat/internal/api/util"
1016
"github.com/openimsdk/chat/pkg/common/config"
1117
"github.com/openimsdk/chat/pkg/common/imapi"
1218
"github.com/openimsdk/chat/pkg/common/kdisc"
19+
disetcd "github.com/openimsdk/chat/pkg/common/kdisc/etcd"
1320
adminclient "github.com/openimsdk/chat/pkg/protocol/admin"
1421
chatclient "github.com/openimsdk/chat/pkg/protocol/chat"
22+
"github.com/openimsdk/tools/discovery"
23+
"github.com/openimsdk/tools/discovery/etcd"
1524
"github.com/openimsdk/tools/errs"
1625
"github.com/openimsdk/tools/mw"
26+
"github.com/openimsdk/tools/system/program"
1727
"github.com/openimsdk/tools/utils/datautil"
28+
"github.com/openimsdk/tools/utils/runtimeenv"
29+
clientv3 "go.etcd.io/etcd/client/v3"
1830
"google.golang.org/grpc"
1931
"google.golang.org/grpc/credentials/insecure"
20-
21-
"github.com/openimsdk/tools/utils/runtimeenv"
2232
)
2333

2434
type Config struct {
25-
ApiConfig config.API
26-
27-
Discovery config.Discovery
28-
Share config.Share
35+
*config.AllConfig
2936

3037
RuntimeEnv string
38+
ConfigPath string
3139
}
3240

3341
func Start(ctx context.Context, index int, config *Config) error {
@@ -36,7 +44,7 @@ func Start(ctx context.Context, index int, config *Config) error {
3644
if len(config.Share.ChatAdmin) == 0 {
3745
return errs.New("share chat admin not configured")
3846
}
39-
apiPort, err := datautil.GetElemByIndex(config.ApiConfig.Api.Ports, index)
47+
apiPort, err := datautil.GetElemByIndex(config.AdminAPI.Api.Ports, index)
4048
if err != nil {
4149
return err
4250
}
@@ -66,11 +74,51 @@ func Start(ctx context.Context, index int, config *Config) error {
6674
gin.SetMode(gin.ReleaseMode)
6775
engine := gin.New()
6876
engine.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
69-
SetAdminRoute(engine, adminApi, mwApi)
70-
return engine.Run(fmt.Sprintf(":%d", apiPort))
77+
SetAdminRoute(engine, adminApi, mwApi, config, client)
78+
79+
if config.Discovery.Enable == kdisc.ETCDCONST {
80+
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
81+
cm.Watch(ctx)
82+
}
83+
var (
84+
netDone = make(chan struct{}, 1)
85+
netErr error
86+
)
87+
server := http.Server{Addr: fmt.Sprintf(":%d", apiPort), Handler: engine}
88+
go func() {
89+
err = server.ListenAndServe()
90+
if err != nil && !errors.Is(err, http.ErrServerClosed) {
91+
netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr))
92+
netDone <- struct{}{}
93+
}
94+
}()
95+
shutdown := func() error {
96+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
97+
defer cancel()
98+
err := server.Shutdown(ctx)
99+
if err != nil {
100+
return errs.WrapMsg(err, "shutdown err")
101+
}
102+
return nil
103+
}
104+
disetcd.RegisterShutDown(shutdown)
105+
106+
sigs := make(chan os.Signal, 1)
107+
signal.Notify(sigs, syscall.SIGTERM)
108+
select {
109+
case <-sigs:
110+
program.SIGTERMExit()
111+
if err := shutdown(); err != nil {
112+
return err
113+
}
114+
case <-netDone:
115+
close(netDone)
116+
return netErr
117+
}
118+
return nil
71119
}
72120

73-
func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) {
121+
func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW, cfg *Config, client discovery.SvcDiscoveryRegistry) {
74122

75123
adminRouterGroup := router.Group("/account")
76124
adminRouterGroup.POST("/login", admin.AdminLogin) // Login
@@ -149,4 +197,20 @@ func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) {
149197
applicationGroup.POST("/delete_version", mw.CheckAdmin, admin.DeleteApplicationVersion)
150198
applicationGroup.POST("/latest_version", admin.LatestApplicationVersion)
151199
applicationGroup.POST("/page_versions", admin.PageApplicationVersion)
200+
201+
var etcdClient *clientv3.Client
202+
if cfg.Discovery.Enable == kdisc.ETCDCONST {
203+
etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
204+
}
205+
cm := NewConfigManager(cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv)
206+
{
207+
configGroup := router.Group("/config", mw.CheckAdmin)
208+
configGroup.POST("/get_config_list", cm.GetConfigList)
209+
configGroup.POST("/get_config", cm.GetConfig)
210+
configGroup.POST("/set_config", cm.SetConfig)
211+
configGroup.POST("/reset_config", cm.ResetConfig)
212+
}
213+
{
214+
router.POST("/restart", mw.CheckAdmin, cm.Restart)
215+
}
152216
}

0 commit comments

Comments
 (0)