Skip to content

Commit

Permalink
Merge pull request polarismesh#126 from chuntaojun/feat_issue_125
Browse files Browse the repository at this point in the history
[ISSUE polarismesh#125] Server access layer indicators are reported to Prometheus
  • Loading branch information
andrewshan authored Oct 27, 2021
2 parents bf4d803 + 42ce59a commit 0ed386d
Show file tree
Hide file tree
Showing 19 changed files with 839 additions and 49 deletions.
4 changes: 2 additions & 2 deletions apiserver/grpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (g *GRPCServer) streamInterceptor(srv interface{}, ss grpc.ServerStream,
zap.String("method", stream.Method),
)

g.statis.AddAPICall(stream.Method, stream.Code, 0)
g.statis.AddAPICall(stream.Method, "gRPC", stream.Code, 0)
}

return
Expand Down Expand Up @@ -420,7 +420,7 @@ func (g *GRPCServer) postprocess(stream *VirtualStream, m interface{}) {
)
}

_ = g.statis.AddAPICall(stream.Method, int(response.GetCode().GetValue()), diff.Nanoseconds())
_ = g.statis.AddAPICall(stream.Method, "gRPC", int(response.GetCode().GetValue()), diff.Nanoseconds())
}

// 限流
Expand Down
21 changes: 20 additions & 1 deletion apiserver/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/polarismesh/polaris-server/common/utils"
"github.com/polarismesh/polaris-server/naming"
"github.com/polarismesh/polaris-server/plugin"
"github.com/polarismesh/polaris-server/plugin/statis/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -157,6 +159,7 @@ func (h *HTTPServer) Run(errCh chan error) {
}

ln = &tcpKeepAliveListener{ln.(*net.TCPListener)}

// 开启最大连接数限制
if h.connLimitConfig != nil && h.connLimitConfig.OpenConnLimit {
log.Infof("http server use max connection limit per ip: %d, http max limit: %d",
Expand Down Expand Up @@ -284,17 +287,33 @@ func (h *HTTPServer) createRestfulContainer() (*restful.Container, error) {
if h.enablePprof {
h.enablePprofAccess(wsContainer)
}

statis := plugin.GetStatis()
if _, ok := statis.(*prometheus.PrometheusStatis); ok {
h.enablePrometheusAccess(wsContainer)
}

return wsContainer, nil
}

// 开启pprof接口
func (h *HTTPServer) enablePprofAccess(wsContainer *restful.Container) {
log.Infof("open http access for pprof")
wsContainer.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
wsContainer.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
wsContainer.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
wsContainer.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
}

// 开启 Prometheus 接口
func (h *HTTPServer) enablePrometheusAccess(wsContainer *restful.Container) {
log.Infof("open http access for prometheus")
wsContainer.Handle("/metrics", promhttp.HandlerFor(
plugin.GetStatis().(*prometheus.PrometheusStatis).GetRegistry(),
promhttp.HandlerOpts{},
))
}

/**
* @brief 在接收和回复时统一处理请求
*/
Expand Down Expand Up @@ -384,7 +403,7 @@ func (h *HTTPServer) postProcess(req *restful.Request, rsp *restful.Response) {
)
}

_ = h.statis.AddAPICall(method, int(code), diff.Nanoseconds())
_ = h.statis.AddAPICall(method, "HTTP", int(code), diff.Nanoseconds())
}

/**
Expand Down
2 changes: 1 addition & 1 deletion apiserver/l5pbserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,6 @@ func (l *L5pbserver) PostProcess(req *cl5Request) {
zap.Duration("handling-time", diff),
)
}
_ = l.statis.AddAPICall(cmdStr, int(req.code), diff.Nanoseconds())
_ = l.statis.AddAPICall(cmdStr, "HTTP", int(req.code), diff.Nanoseconds())
// 告警
}
1 change: 1 addition & 0 deletions bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func Start(configFilePath string) {

// 设置插件配置
plugin.SetPluginConfig(&cfg.Plugin)
plugin.SetLocalHost(LocalHost)

// 初始化存储层
store.SetStoreConfig(&cfg.Store)
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ require (
github.com/gomodule/redigo v1.8.5
github.com/google/uuid v1.2.0
github.com/hashicorp/golang-lru v0.5.3
github.com/json-iterator/go v1.1.9 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5 // indirect
Expand All @@ -31,7 +29,7 @@ require (
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/grpc v1.36.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v2 v2.3.0

)

Expand Down
91 changes: 84 additions & 7 deletions go.sum

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions naming/test/circuitbreaker_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package test

import (
"fmt"
api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/utils"
"github.com/polarismesh/polaris-server/naming"
"sync"
"testing"
"time"

api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/utils"
"github.com/polarismesh/polaris-server/naming"
)

/**
Expand Down Expand Up @@ -575,6 +576,7 @@ func TestUpdateCircuitBreaker(t *testing.T) {

t.Run("并发更新熔断规则时,可以正常更新", func(t *testing.T) {
var wg sync.WaitGroup
errs := make(chan error)
for i := 1; i <= 500; i++ {
wg.Add(1)
go func(index int) {
Expand All @@ -594,12 +596,21 @@ func TestUpdateCircuitBreaker(t *testing.T) {
}
resp := server.GetCircuitBreaker(filters)
if !respSuccess(resp) {
t.Fatal("error")
errs <- fmt.Errorf("error : %v", resp)
}
checkCircuitBreaker(t, cbResp, cbResp, resp.GetConfigWithServices()[0].GetCircuitBreaker())
}(i)
}
wg.Wait()
go func() {
wg.Wait()
close(errs)
}()

for err := range errs {
if err != nil {
t.Fatal(err)
}
}
})
}

Expand Down
71 changes: 58 additions & 13 deletions naming/test/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ package test
import (
"context"
"fmt"
api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/utils"
"github.com/polarismesh/polaris-server/naming"
. "github.com/smartystreets/goconvey/convey"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/utils"
"github.com/polarismesh/polaris-server/naming"
. "github.com/smartystreets/goconvey/convey"
)

// 测试新建实例
Expand Down Expand Up @@ -172,6 +173,7 @@ func TestCreateInstance2(t *testing.T) {
total := 1024
var wg sync.WaitGroup
start := time.Now()
errs := make(chan error)
for i := 0; i < total; i++ {
wg.Add(1)
go func(index int) {
Expand All @@ -181,15 +183,25 @@ func TestCreateInstance2(t *testing.T) {
req, resp = createCommonInstance(t, serviceResps[index%10], index)
for c := 0; c < 10; c++ {
if updateResp := server.UpdateInstance(defaultCtx, req); !respSuccess(updateResp) {
t.Fatalf("error: %+v", updateResp)
errs <- fmt.Errorf("error: %+v", updateResp)
return
}
}
removeCommonInstance(t, serviceResps[index%10], resp.GetId().GetValue())
cleanInstance(resp.GetId().GetValue())
}(i)
}

wg.Wait()
go func() {
wg.Wait()
close(errs)
}()

for err := range errs {
if err != nil {
t.Fatal(err)
}
}
t.Logf("consume: %v", time.Now().Sub(start))
})
}
Expand All @@ -203,19 +215,30 @@ func TestUpdateInstanceManyTimes(t *testing.T) {
defer cleanInstance(instanceResp.GetId().GetValue())

var wg sync.WaitGroup
errs := make(chan error)
for i := 0; i < 64; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
for c := 0; c < 16; c++ {
instanceReq.Weight.Value = uint32(rand.Int() % 32767)
if updateResp := server.UpdateInstance(defaultCtx, instanceReq); !respSuccess(updateResp) {
t.Fatalf("error: %+v", updateResp)
errs <- fmt.Errorf("error: %+v", updateResp)
return
}
}
}(i)
}
wg.Wait()
go func() {
wg.Wait()
close(errs)
}()

for err := range errs {
if err != nil {
t.Fatal(err)
}
}
}

// 测试获取实例
Expand Down Expand Up @@ -576,7 +599,7 @@ func TestListInstances1(t *testing.T) {
query = map[string]string{
"service": serviceResp.GetName().GetValue(),
"namespace": serviceResp.GetNamespace().GetValue(),
"values": "internal-personal-xxx",
"values": "internal-personal-xxx",
}
resp = server.GetInstances(query)
if resp.GetCode().GetValue() != api.InvalidQueryInsParameter {
Expand Down Expand Up @@ -831,19 +854,30 @@ func TestUpdateIsolate(t *testing.T) {
defer cleanInstance(instanceResp.GetId().GetValue())

var wg sync.WaitGroup
errs := make(chan error)
for i := 0; i < 64; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
for c := 0; c < 16; c++ {
instanceReq.Isolate = utils.NewBoolValue(true)
if resp := server.UpdateInstanceIsolate(defaultCtx, instanceReq); !respSuccess(resp) {
t.Fatalf("error: %+v", resp)
errs <- fmt.Errorf("error: %+v", resp)
return
}
}
}(i)
}
wg.Wait()
go func() {
wg.Wait()
close(errs)
}()

for err := range errs {
if err != nil {
t.Fatal(err)
}
}
t.Log("pass")
})

Expand Down Expand Up @@ -1168,6 +1202,7 @@ func TestBatchDeleteInstances(t *testing.T) {
t.Run("测试batch删除实例,单个接口", func(t *testing.T) {
_, resps := createInstances(t)
var wg sync.WaitGroup
errs := make(chan error)
for _, resp := range resps.GetResponses() {
wg.Add(1)
go func(instance *api.Instance) {
Expand All @@ -1177,11 +1212,21 @@ func TestBatchDeleteInstances(t *testing.T) {
}()
req := &api.Instance{Id: instance.Id, ServiceToken: service.Token}
if out := server.DeleteInstance(defaultCtx, req); !respSuccess(out) {
t.Fatalf("error: %+v", out)
errs <- fmt.Errorf("error: %+v", out)
return
}
}(resp.GetInstance())
}
wg.Wait()
go func() {
wg.Wait()
close(errs)
}()

for err := range errs {
if err != nil {
t.Fatal(err)
}
}
})
t.Run("测试batch删除实例,批量接口", func(t *testing.T) {
instances, instancesResp := createInstances(t)
Expand Down
23 changes: 18 additions & 5 deletions naming/test/ratelimit_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package test
import (
"context"
"encoding/json"
"github.com/golang/protobuf/ptypes/duration"
api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/utils"
"fmt"
"sync"
"testing"
"time"

"github.com/golang/protobuf/ptypes/duration"
api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/utils"
)

/**
Expand Down Expand Up @@ -294,6 +296,7 @@ func TestUpdateRateLimit(t *testing.T) {

t.Run("并发更新限流规则时,可以正常更新", func(t *testing.T) {
var wg sync.WaitGroup
errs := make(chan error)
for i := 1; i <= 500; i++ {
wg.Add(1)
go func(index int) {
Expand All @@ -311,12 +314,22 @@ func TestUpdateRateLimit(t *testing.T) {
}
resp := server.GetRateLimits(filters)
if !respSuccess(resp) {
t.Fatalf("error")
errs <- fmt.Errorf("error : %v", resp)
}
checkRateLimit(t, rateLimitResp, resp.GetRateLimits()[0])
}(i)
}
wg.Wait()
go func() {
wg.Wait()
close(errs)
}()

for err := range errs {
if err != nil {
t.Fatal(err)
}
}

t.Log("pass")
})
}
Expand Down
Loading

0 comments on commit 0ed386d

Please sign in to comment.