Skip to content

Commit

Permalink
Merge pull request #72 from sak0/fix_issue58
Browse files Browse the repository at this point in the history
fix issue 58
  • Loading branch information
sak0 authored Feb 27, 2025
2 parents 50581b8 + 07c48ed commit 1538316
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 13 deletions.
91 changes: 78 additions & 13 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"google.golang.org/grpc/status"
)

const defaultInstanceProtocol = "grpc"

var (
reportInfoAnalyzer ReportInfoAnalyzer = func(info balancer.DoneInfo) (model.RetStatus, uint32) {
recErr := info.Err
Expand Down Expand Up @@ -117,9 +119,10 @@ type polarisNamingBalancer struct {
subConns map[string]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State

v2Picker balancer.Picker
consumerAPI polaris.ConsumerAPI
routerAPI polaris.RouterAPI
v2Picker balancer.Picker
consumerAPI polaris.ConsumerAPI
routerAPI polaris.RouterAPI
circuitBreakerAPI polaris.CircuitBreakerAPI

lbCfg *LBConfig

Expand Down Expand Up @@ -207,6 +210,7 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS
if nil == p.consumerAPI {
p.consumerAPI = polaris.NewConsumerAPIByContext(p.options.SDKContext)
p.routerAPI = polaris.NewRouterAPIByContext(p.options.SDKContext)
p.circuitBreakerAPI = polaris.NewCircuitBreakerAPIByContext(p.options.SDKContext)
}
// Successful resolution; clear resolver error and ensure we return nil.
p.resolverErr = nil
Expand Down Expand Up @@ -436,11 +440,18 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
subSc, ok := pnp.readySCs[addr]
if ok {
reporter := &resultReporter{
method: info.FullMethodName,
instance: targetInstance,
consumerAPI: pnp.balancer.consumerAPI,
startTime: time.Now(),
sourceService: sourceService,
method: info.FullMethodName,
instance: targetInstance,
consumerAPI: pnp.balancer.consumerAPI,
circuitBreakerAPI: pnp.balancer.circuitBreakerAPI,
startTime: time.Now(),
sourceService: sourceService,
namespace: pnp.options.Namespace,
service: pnp.balancer.host,
circuitBreaker: false,
}
if pnp.options.CircuitBreaker {
reporter.circuitBreaker = true
}

return balancer.PickResult{
Expand Down Expand Up @@ -544,11 +555,15 @@ func collectRouteLabels(routings []*traffic_manage.Route) []string {
}

type resultReporter struct {
method string
instance model.Instance
consumerAPI polaris.ConsumerAPI
startTime time.Time
sourceService *model.ServiceInfo
method string
namespace string
service string
instance model.Instance
consumerAPI polaris.ConsumerAPI
circuitBreakerAPI polaris.CircuitBreakerAPI
startTime time.Time
sourceService *model.ServiceInfo
circuitBreaker bool
}

func (r *resultReporter) report(info balancer.DoneInfo) {
Expand All @@ -567,4 +582,54 @@ func (r *resultReporter) report(info balancer.DoneInfo) {
if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil {
GetLogger().Error("[Polaris][Balancer] report grpc call info fail : %+v", err)
}
if r.circuitBreaker {
if err := r.reportCircuitBreak(r.instance, retStatus, strconv.Itoa(int(code)), r.startTime); err != nil {
GetLogger().Error("[Polaris][Balancer] report grpc circuit breaker info fail : %+v", err)
}
}
}

func (r *resultReporter) reportCircuitBreak(instance model.Instance, status model.RetStatus,

Check warning on line 592 in balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.15.x)

import-shadowing: The name 'status' shadows an import name (revive)

Check warning on line 592 in balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.16.x)

import-shadowing: The name 'status' shadows an import name (revive)

Check warning on line 592 in balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.18.x)

import-shadowing: The name 'status' shadows an import name (revive)

Check warning on line 592 in balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

import-shadowing: The name 'status' shadows an import name (revive)
retCode string, start time.Time) error {

caller := &model.ServiceKey{}
if r.sourceService != nil {
caller.Service = r.sourceService.Service
caller.Namespace = r.sourceService.Namespace
}

protocol := instance.GetProtocol()
if protocol == "" {
protocol = defaultInstanceProtocol
}
insRes, err := model.NewInstanceResource(&model.ServiceKey{
Namespace: r.namespace,
Service: r.service,
}, caller, protocol, instance.GetHost(), instance.GetPort())
if err != nil {
return fmt.Errorf("report circuitBreaker for service %v get instance resource failed: %v",
insRes, err)
}

GetLogger().Debug("report circuitBreaker status [%v] code [%s] for instance %s/%s:%d "+
"caller [%s] "+
"delay [%v] "+
"circuitBreaker status [%v]\n",
status, retCode,
instance.GetService(),
instance.GetHost(), instance.GetPort(),
r.service, time.Since(start),
instance.GetCircuitBreakerStatus())

if err := r.circuitBreakerAPI.Report(&model.ResourceStat{
Delay: time.Since(start),
RetStatus: status,
RetCode: retCode,
Resource: insRes,
}); err != nil {
return fmt.Errorf("report circuitBreaker for service %v failed: %v",
insRes, err)
}

return nil
}
1 change: 1 addition & 0 deletions examples/circuitbreak/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func main() {
conn, err := polaris.DialContext(ctx, "polaris://CircuitBreakerEchoServerGRPC/",
polaris.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
polaris.WithEnableCircuitBreaker(),
polaris.WithClientNamespace("default"),
)
if err != nil {
log.Fatal(err)
Expand Down

0 comments on commit 1538316

Please sign in to comment.