Skip to content

Commit

Permalink
Merge pull request #481 from TarsCloud/feature/lbbniu/registry
Browse files Browse the repository at this point in the history
feat: framework supports automatic service registry discovery
  • Loading branch information
lbbniu authored Jun 18, 2023
2 parents 93a2db4 + 35bf0d2 commit 88e9920
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 80 deletions.
23 changes: 19 additions & 4 deletions tars/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type destroyableImp interface {

type application struct {
conf *conf.Conf
opt *options
svrCfg *serverConfig
cltCfg *clientConfig
communicator *Communicator
Expand Down Expand Up @@ -81,6 +82,7 @@ func init() {

func newApp() *application {
return &application{
opt: &options{},
tarsConfig: make(map[string]*transport.TarsServerConf),
goSvrs: make(map[string]*transport.TarsServer),
httpSvrs: make(map[string]*http.Server),
Expand All @@ -97,8 +99,8 @@ func GetConf() *conf.Conf {
return defaultApp.GetConf()
}

func Run() {
defaultApp.Run()
func Run(opts ...Option) {
defaultApp.Run(opts...)
}

// GetConf Get server conf.Conf config
Expand Down Expand Up @@ -332,12 +334,16 @@ func (a *application) initConfig() {
}

// Run the application
func (a *application) Run() {
func (a *application) Run(opts ...Option) {
defer rogger.FlushLogger()
a.isShutdowning = 0
a.init()
<-statInited

for _, opt := range opts {
opt(a.opt)
}

for _, env := range os.Environ() {
if strings.HasPrefix(env, grace.InheritFdPrefix) {
TLOG.Infof("env %s", env)
Expand Down Expand Up @@ -490,6 +496,12 @@ func (a *application) graceShutdown() {

TLOG.Infof("grace shutdown start %d in %v", pid, graceShutdownTimeout)
ctx, cancel := context.WithTimeout(context.Background(), graceShutdownTimeout)
// deregister service
wg.Add(1)
go func() {
defer wg.Done()
a.deregisterAdapters(ctx)
}()

for _, obj := range a.destroyableObjs {
wg.Add(1)
Expand Down Expand Up @@ -570,8 +582,11 @@ func (a *application) mainLoop() {
go ha.ReportVersion(svrCfg.Version)
go ha.KeepAlive("") //first start
go a.handleSignal()
loop := time.NewTicker(svrCfg.MainLoopTicker)
// registrar service
ctx := context.Background()
go a.registryAdapters(ctx)

loop := time.NewTicker(svrCfg.MainLoopTicker)
for {
select {
case <-a.shutdown:
Expand Down
22 changes: 18 additions & 4 deletions tars/communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ProxyPrx interface {
type Communicator struct {
Client *clientConfig
app *application
opt *options
properties sync.Map
}

Expand All @@ -29,8 +30,8 @@ func GetCommunicator() *Communicator {

// NewCommunicator returns a new communicator. A Communicator is used for communicating with
// the server side which should only init once and be global!!!
func NewCommunicator() *Communicator {
return defaultApp.NewCommunicator()
func NewCommunicator(opts ...Option) *Communicator {
return defaultApp.NewCommunicator(opts...)
}

// Communicator returns a default communicator
Expand All @@ -41,8 +42,21 @@ func (a *application) Communicator() *Communicator {
return a.communicator
}

func (a *application) NewCommunicator() *Communicator {
c := &Communicator{app: a, Client: a.ClientConfig()}
func (a *application) NewCommunicator(opts ...Option) *Communicator {
a.init()
return newCommunicator(a, a.ClientConfig(), opts...)
}

func newCommunicator(app *application, client *clientConfig, opts ...Option) *Communicator {
o := *app.opt
for _, opt := range opts {
opt(&o)
}
c := &Communicator{
Client: client,
app: app,
opt: &o,
}
c.init()
return c
}
Expand Down
120 changes: 48 additions & 72 deletions tars/endpointmanager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package tars

import (
"context"
"encoding/json"
"fmt"
"hash/crc32"
"math/rand"
"os"
Expand All @@ -16,6 +16,8 @@ import (

"github.com/TarsCloud/TarsGo/tars/protocol/res/endpointf"
"github.com/TarsCloud/TarsGo/tars/protocol/res/queryf"
"github.com/TarsCloud/TarsGo/tars/registry"
tarsregistry "github.com/TarsCloud/TarsGo/tars/registry/tars"
"github.com/TarsCloud/TarsGo/tars/selector/consistenthash"
"github.com/TarsCloud/TarsGo/tars/selector/modhash"
"github.com/TarsCloud/TarsGo/tars/selector/roundrobin"
Expand Down Expand Up @@ -74,7 +76,7 @@ func GetManager(comm *Communicator, objName string, opts ...EndpointManagerOptio
g.mlock.Unlock()

TLOG.Debug("Create endpoint manager for ", objName)
em := newTarsEndpointManager(objName, comm, opts...) // avoid dead lock
em := newEndpointManager(objName, comm, opts...) // avoid dead lock
g.mlock.Lock()
if v, ok := g.eps[key]; ok {
g.mlock.Unlock()
Expand All @@ -90,7 +92,7 @@ func GetManager(comm *Communicator, objName string, opts ...EndpointManagerOptio
for i, ep := range em.activeEpf {
newEps[i] = endpoint.Tars2endpoint(ep)
}
em.firstUpdateActiveEp(newEps)
em.updateActiveEp(newEps)
TLOG.Debugf("init endpoint %s %v %v", objName, em.activeEp, em.inactiveEpf)
}
}
Expand All @@ -105,7 +107,7 @@ func (g *globalManager) checkEpStatus() {
g.mlock.Lock()
eps := make([]*endpointManager, 0)
for _, v := range g.eps {
if v.locator != nil {
if v.registrar != nil {
eps = append(eps, v)
}
}
Expand All @@ -122,7 +124,7 @@ func (g *globalManager) updateEndpoints() {
g.mlock.Lock()
eps := make([]*endpointManager, 0)
for _, v := range g.eps {
if v.locator != nil {
if v.registrar != nil {
eps = append(eps, v)
}
}
Expand Down Expand Up @@ -163,7 +165,7 @@ type endpointManager struct {
setDivision string
directProxy bool
comm *Communicator
locator *queryf.QueryF
registrar registry.Registrar

epList *sync.Map
epLock *sync.Mutex
Expand Down Expand Up @@ -221,7 +223,7 @@ func WithSet(setDivision string) OptionFunc {
})
}

func newTarsEndpointManager(objName string, comm *Communicator, opts ...EndpointManagerOption) *endpointManager {
func newEndpointManager(objName string, comm *Communicator, opts ...EndpointManagerOption) *endpointManager {
if objName == "" {
return nil
}
Expand All @@ -246,16 +248,21 @@ func newTarsEndpointManager(objName string, comm *Communicator, opts ...Endpoint
for i, end := range ends {
eps[i] = endpoint.Parse(end)
}
e.firstUpdateActiveEp(eps)
e.updateActiveEp(eps)
} else {
// [proxy] TODO singleton
TLOG.Debug("proxy mode:", objName)
e.objName = objName
e.directProxy = false
obj, _ := e.comm.GetProperty("locator")
e.locator = new(queryf.QueryF)
TLOG.Debug("string to proxy locator ", obj)
e.comm.StringToProxy(obj, e.locator)
if e.comm.opt.registrar == nil {
obj, _ := e.comm.GetProperty("locator")
query := new(queryf.QueryF)
TLOG.Debug("string to proxy locator ", obj)
e.comm.StringToProxy(obj, query)
e.registrar = tarsregistry.New(query)
} else {
e.registrar = e.comm.opt.registrar
}
e.checkAdapter = make(chan *AdapterProxy, 1000)
}
return e
Expand Down Expand Up @@ -390,7 +397,7 @@ func (e *endpointManager) doFresh() error {
}
e.freshLock.Lock()
defer e.freshLock.Unlock()
return e.findAndSetObj(e.locator)
return e.refreshEndpoints()
}

func (e *endpointManager) preInvoke() {
Expand All @@ -402,31 +409,27 @@ func (e *endpointManager) postInvoke() {
atomic.AddInt32(&e.invokeNum, -1)
}

func (e *endpointManager) findAndSetObj(q *queryf.QueryF) error {
activeEp := make([]endpointf.EndpointF, 0)
inactiveEp := make([]endpointf.EndpointF, 0)
var enableSet, ok bool
var setDivision string
var ret int32
var err error
func (e *endpointManager) refreshEndpoints() error {
var (
activeEp, inactiveEp []endpointf.EndpointF
enableSet, ok bool
setDivision string
err error
)
if e.enableSet && e.setDivision != "" {
enableSet = e.enableSet
setDivision = e.setDivision
enableSet, setDivision = e.enableSet, e.setDivision
} else if enableSet, ok = e.comm.GetPropertyBool("enableset"); ok {
setDivision, _ = e.comm.GetProperty("setdivision")
}

if enableSet {
ret, err = q.FindObjectByIdInSameSet(e.objName, setDivision, &activeEp, &inactiveEp)
activeEp, inactiveEp, err = e.registrar.QueryServantBySet(context.Background(), e.objName, setDivision)
} else {
ret, err = q.FindObjectByIdInSameGroup(e.objName, &activeEp, &inactiveEp)
activeEp, inactiveEp, err = e.registrar.QueryServant(context.Background(), e.objName)
}
if err != nil {
return err
}
if ret != 0 {
return fmt.Errorf("findAndSetObj %s fail, ret: %d", e.objName, ret)
}

// sort activeEp slice
sort.Slice(activeEp, func(i, j int) bool {
Expand All @@ -438,10 +441,14 @@ func (e *endpointManager) findAndSetObj(q *queryf.QueryF) error {
}

if len(activeEp) == 0 {
TLOG.Errorf("findAndSetObj %s, empty of active endpoint", e.objName)
TLOG.Errorf("refreshEndpoints %s, empty of active endpoint", e.objName)
return nil
}
TLOG.Debugf("findAndSetObj|call FindObjectById ok, obj: %s, ret: %d, active: %v, inactive: %v", e.objName, ret, activeEp, inactiveEp)
e.epLock.Lock()
e.activeEpf = activeEp
e.inactiveEpf = inactiveEp
e.epLock.Unlock()
TLOG.Debugf("refreshEndpoints|call QueryServant or QueryServantBySet, obj: %s, set: %s, active: %v, inactive: %v", e.objName, setDivision, activeEp, inactiveEp)

newEps := make([]endpoint.Endpoint, len(activeEp))
for i, ep := range activeEp {
Expand Down Expand Up @@ -469,14 +476,22 @@ func (e *endpointManager) findAndSetObj(q *queryf.QueryF) error {
if !flagActive && !flagInactive {
value.(*AdapterProxy).Close()
e.epList.Delete(key)
TLOG.Debugf("findAndSetObj|delete useless endpoint from epList: %+v", key)
TLOG.Debugf("refreshEndpoints|delete useless endpoint from epList: %+v", key)
}
return true
})

e.updateActiveEp(newEps)
return nil
}

func (e *endpointManager) updateActiveEp(newEps []endpoint.Endpoint) {
if len(newEps) == 0 {
return
}
sameType, lastType := true, newEps[0].WeightType
// delete active endpoint which status is false
sortedEps := make([]endpoint.Endpoint, 0)
sortedEps := make([]endpoint.Endpoint, 0, len(newEps))
for _, ep := range newEps {
if v, ok := e.epList.Load(ep.Key); ok {
adp := v.(*AdapterProxy)
Expand All @@ -502,60 +517,21 @@ func (e *endpointManager) findAndSetObj(q *queryf.QueryF) error {
sort.Slice(sortedEps, func(i int, j int) bool {
return crc32.ChecksumIEEE([]byte(sortedEps[i].Key)) < crc32.ChecksumIEEE([]byte(sortedEps[j].Key))
})

roundRobinSelector := roundrobin.New(e.enableWeight())
roundRobinSelector.Refresh(sortedEps)
conHashSelector := consistenthash.New(e.enableWeight(), consistenthash.KetamaHash)
conHashSelector.Refresh(sortedEps)
modHashSelector := modhash.New(e.enableWeight())
roundRobinSelector.Refresh(sortedEps)
modHashSelector.Refresh(sortedEps)

e.epLock.Lock()
e.activeEpf = activeEp
e.inactiveEpf = inactiveEp
e.activeEp = sortedEps
e.activeEpRoundRobin = roundRobinSelector
e.activeEpConHash = conHashSelector
e.activeEpModHash = modHashSelector
e.epLock.Unlock()

TLOG.Debugf("findAndSetObj|activeEp: %+v", sortedEps)
return nil
}

func (e *endpointManager) firstUpdateActiveEp(eps []endpoint.Endpoint) {
if len(eps) == 0 {
return
}
sameType, lastType := true, eps[0].WeightType
sortedEps := make([]endpoint.Endpoint, 0, len(eps))
for _, ep := range eps {
sortedEps = append(sortedEps, ep)
// check weightType
if ep.WeightType != lastType {
sameType = false
}
}

e.weightType = endpoint.ELoop
if sameType {
e.weightType = endpoint.WeightType(lastType)
}

// make endpoint slice sorted
sort.Slice(sortedEps, func(i int, j int) bool {
return crc32.ChecksumIEEE([]byte(sortedEps[i].Key)) < crc32.ChecksumIEEE([]byte(sortedEps[j].Key))
})
roundRobinSelector := roundrobin.New(e.enableWeight())
roundRobinSelector.Refresh(sortedEps)
conHashSelector := consistenthash.New(e.enableWeight(), consistenthash.KetamaHash)
conHashSelector.Refresh(sortedEps)
modHashSelector := modhash.New(e.enableWeight())
modHashSelector.Refresh(sortedEps)
e.activeEp = sortedEps
e.activeEpRoundRobin = roundRobinSelector
e.activeEpConHash = conHashSelector
e.activeEpModHash = modHashSelector
TLOG.Debugf("updateActiveEp|activeEp: %+v", sortedEps)
}

func (e *endpointManager) enableWeight() bool {
Expand Down
16 changes: 16 additions & 0 deletions tars/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package tars

import "github.com/TarsCloud/TarsGo/tars/registry"

type Option func(o *options)

type options struct {
registrar registry.Registrar
}

// Registrar returns an Option to use the Registrar
func Registrar(r registry.Registrar) Option {
return func(o *options) {
o.registrar = r
}
}
Loading

0 comments on commit 88e9920

Please sign in to comment.