Skip to content

Commit dfa9741

Browse files
authored
feat:xdsv3 support envoy odcds (polarismesh#1304)
1 parent 0806c91 commit dfa9741

File tree

213 files changed

+5095
-2884
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

213 files changed

+5095
-2884
lines changed

.github/workflows/benchmark.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ jobs:
100100
101101
sleep 120s
102102
ls -alR
103-
cat ./log/stdout 2>&1
104103
105104
cd ..
106105
ls -lstrh

.github/workflows/codecov.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ jobs:
7272
mysql -e "ALTER USER '${{ env.MYSQL_DB_USER }}'@'localhost' IDENTIFIED WITH mysql_native_password BY 'root';" -u${{ env.MYSQL_DB_USER }} -p${{ env.MYSQL_DB_PWD }}
7373
7474
# Execute vert check
75-
- name: Vert check
76-
run: bash vert.sh -install && bash vert.sh
75+
# - name: Vert check
76+
# run: bash vert.sh -install && bash vert.sh
7777

7878
- name: Standalone Test
7979
env:

.github/workflows/integration-testing-mysql.yml

-2
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,6 @@ jobs:
145145
146146
sleep 120s
147147
ls -alR
148-
cat ./log/stdout 2>&1
149-
150148
cd ..
151149
ls -lstrh
152150
# 先测试普通的集成测试

.github/workflows/integration-testing.yml

-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ jobs:
100100
101101
sleep 120s
102102
ls -alR
103-
cat ./log/stdout 2>&1
104103
105104
cd ..
106105
ls -lstrh

.golangci.yml

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ run:
3838
- pkg/model/pb
3939
- .*~
4040
- test
41+
- "apiserver/nacosserver/v2/pb"
4142

4243
# Which files to skip: they will be analyzed, but issues from them won't be reported.
4344
# Default value is empty list,
@@ -51,6 +52,7 @@ run:
5152
- ".*_test\\.go$"
5253
- ".*\\.yaml$"
5354
- ".*\\.yml$"
55+
- "apiserver/xdsserverv3/cache/linear.go"
5456

5557

5658
# Main linters configurations.

.licenserc.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ header: # `header` section is configurations for source codes license header.
5858
- "deploy"
5959
- "release"
6060
- "test/data/xds"
61+
- "apiserver/nacosserver/v2/pb"
6162

6263
# single file
6364
- "LICENSE"
@@ -74,6 +75,7 @@ header: # `header` section is configurations for source codes license header.
7475
- "**/*.md"
7576
- "**/go.mod"
7677
- "**/go.sum"
78+
- "apiserver/xdsserverv3/cache/linear.go"
7779
comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.
7880

7981
# license-location-threshold specifies the index threshold where the license header can be located,

apiserver/apiserver.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package apiserver
2020
import (
2121
"context"
2222
"fmt"
23-
"net/http"
23+
24+
"github.com/polarismesh/polaris/common/model"
2425
)
2526

2627
const (
@@ -61,12 +62,7 @@ type Apiserver interface {
6162

6263
type EnrichApiserver interface {
6364
Apiserver
64-
DebugHandlers() []DebugHandler
65-
}
66-
67-
type DebugHandler struct {
68-
Path string
69-
Handler http.HandlerFunc
65+
DebugHandlers() []model.DebugHandler
7066
}
7167

7268
var (

apiserver/eurekaserver/access_test.go

+31-30
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ func createEurekaServerForTest(
5353
if err != nil {
5454
return nil, err
5555
}
56+
// 注册实例信息修改 chain 数据
57+
eurekaSrv.registerInstanceChain()
5658
return eurekaSrv, nil
5759
}
5860

@@ -88,7 +90,7 @@ func batchBuildInstances(appId string, host string, port int, lease *LeaseInfo,
8890
func batchCreateInstance(t *testing.T, eurekaSvr *EurekaServer, namespace string, instances []*InstanceInfo) {
8991
for _, instance := range instances {
9092
code := eurekaSvr.registerInstances(context.Background(), namespace, instance.AppName, instance, false)
91-
assert.Equal(t, api.ExecuteSuccess, code)
93+
assert.Equal(t, api.ExecuteSuccess, code, fmt.Sprintf("%+v", code))
9294
}
9395
}
9496

@@ -223,36 +225,34 @@ func Test_EurekaWrite(t *testing.T) {
223225

224226
mockIns := genMockEurekaInstance()
225227

226-
t.Run("RegisterInstance", func(t *testing.T) {
227-
// pretty output must be created and written explicitly
228-
output, err := xml.MarshalIndent(mockIns, " ", " ")
229-
assert.NoError(t, err)
230-
231-
var body bytes.Buffer
232-
_, err = body.Write([]byte(xml.Header))
233-
assert.NoError(t, err)
234-
_, err = body.Write(output)
235-
assert.NoError(t, err)
236-
237-
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s", mockIns.AppName), &body)
238-
mockReq.Header.Add(restful.HEADER_Accept, restful.MIME_XML)
239-
mockReq.Header.Add(restful.HEADER_ContentType, restful.MIME_XML)
240-
mockRsp := newMockResponseWriter()
241-
242-
restfulReq := restful.NewRequest(mockReq)
243-
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
244-
ParamAppId: mockIns.AppName,
245-
})
246-
// 这里是异步注册
247-
eurekaSrv.RegisterApplication(restfulReq, restful.NewResponse(mockRsp))
248-
assert.Equal(t, http.StatusNoContent, mockRsp.statusCode)
249-
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess))
250-
251-
time.Sleep(5 * time.Second)
252-
saveIns, err := eurekaSrv.originDiscoverSvr.Cache().GetStore().GetInstance(mockIns.InstanceId)
253-
assert.NoError(t, err)
254-
assert.NotNil(t, saveIns)
228+
// pretty output must be created and written explicitly
229+
output, err := xml.MarshalIndent(mockIns, " ", " ")
230+
assert.NoError(t, err)
231+
232+
var body bytes.Buffer
233+
_, err = body.Write([]byte(xml.Header))
234+
assert.NoError(t, err)
235+
_, err = body.Write(output)
236+
assert.NoError(t, err)
237+
238+
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s", mockIns.AppName), &body)
239+
mockReq.Header.Add(restful.HEADER_Accept, restful.MIME_XML)
240+
mockReq.Header.Add(restful.HEADER_ContentType, restful.MIME_XML)
241+
mockRsp := newMockResponseWriter()
242+
243+
restfulReq := restful.NewRequest(mockReq)
244+
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
245+
ParamAppId: mockIns.AppName,
255246
})
247+
// 这里是异步注册
248+
eurekaSrv.RegisterApplication(restfulReq, restful.NewResponse(mockRsp))
249+
assert.Equal(t, http.StatusNoContent, mockRsp.statusCode)
250+
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess))
251+
252+
time.Sleep(5 * time.Second)
253+
saveIns, err := eurekaSrv.originDiscoverSvr.Cache().GetStore().GetInstance(mockIns.InstanceId)
254+
assert.NoError(t, err)
255+
assert.NotNil(t, saveIns)
256256

257257
t.Run("UpdateStatus", func(t *testing.T) {
258258
t.Run("StatusUnknown", func(t *testing.T) {
@@ -274,6 +274,7 @@ func Test_EurekaWrite(t *testing.T) {
274274
//
275275
saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId)
276276
assert.NoError(t, err)
277+
assert.NotNil(t, saveIns)
277278
assert.False(t, saveIns.Isolate())
278279
})
279280

apiserver/eurekaserver/applications.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -393,9 +393,8 @@ func buildDataCenterInfo() *DataCenterInfo {
393393
Clazz: DefaultDciClazz,
394394
Name: customDciName,
395395
}
396-
} else {
397-
return DefaultDataCenterInfo
398396
}
397+
return DefaultDataCenterInfo
399398
}
400399

401400
func buildLocationInfo(instanceInfo *InstanceInfo, instance *apiservice.Instance) {

apiserver/eurekaserver/write_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/polarismesh/polaris/common/eventhub"
3333
"github.com/polarismesh/polaris/common/model"
3434
"github.com/polarismesh/polaris/common/utils"
35+
"github.com/polarismesh/polaris/service"
3536
"github.com/polarismesh/polaris/store"
3637
"github.com/polarismesh/polaris/store/mock"
3738
testsuit "github.com/polarismesh/polaris/test/suit"
@@ -120,10 +121,14 @@ func TestEurekaServer_renew(t *testing.T) {
120121
},
121122
}, nil)
122123

124+
mockStore.EXPECT().GetMoreClients(gomock.Any(), gomock.Any()).Return(map[string]*model.Client{}, nil).AnyTimes()
125+
mockStore.EXPECT().GetMoreGrayResouces(gomock.Any(), gomock.Any()).Return([]*model.GrayResource{}, nil).AnyTimes()
123126
mockStore.EXPECT().GetInstancesCountTx(gomock.Any()).AnyTimes().Return(uint32(1), nil)
124127
mockStore.EXPECT().GetUnixSecond(gomock.Any()).AnyTimes().Return(time.Now().Unix(), nil)
125128
mockStore.EXPECT().GetServicesCount().Return(uint32(1), nil).AnyTimes()
126129
mockStore.EXPECT().StartLeaderElection(gomock.Any()).AnyTimes()
130+
mockStore.EXPECT().GetMoreServiceContracts(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
131+
mockStore.EXPECT().GetMoreNamespaces(gomock.Any()).Return(nil, nil).AnyTimes()
127132
mockStore.EXPECT().Destroy().Return(nil)
128133
mockStore.EXPECT().Initialize(gomock.Any()).Return(nil).AnyTimes()
129134
mockStore.EXPECT().Name().Return("eureka_store_test").AnyTimes()
@@ -135,7 +140,10 @@ func TestEurekaServer_renew(t *testing.T) {
135140
return mockStore
136141
})
137142
eurekaSuit.Initialize(func(conf *testsuit.TestConfig) {
143+
conf.DisableAuth = true
138144
conf.Cache = cache.Config{}
145+
conf.DisableConfig = true
146+
conf.ServiceCacheEntries = service.GetRegisterCaches()
139147
store.TestInjectConfig(store.Config{
140148
Name: "eureka_store_test",
141149
})

apiserver/grpcserver/config/client_access.go

+121-7
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,48 @@ package config
2020
import (
2121
"context"
2222
"fmt"
23+
"io"
24+
"strconv"
2325

2426
apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage"
27+
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
28+
"go.uber.org/zap"
29+
"google.golang.org/grpc"
30+
"google.golang.org/protobuf/types/known/wrapperspb"
2531

32+
api "github.com/polarismesh/polaris/common/api/v1"
33+
commonlog "github.com/polarismesh/polaris/common/log"
2634
"github.com/polarismesh/polaris/common/metrics"
2735
commontime "github.com/polarismesh/polaris/common/time"
2836
"github.com/polarismesh/polaris/common/utils"
2937
"github.com/polarismesh/polaris/plugin"
3038
)
3139

40+
var (
41+
accesslog = commonlog.GetScopeOrDefaultByName(commonlog.APIServerLoggerName)
42+
)
43+
3244
// GetConfigFile 拉取配置
3345
func (g *ConfigGRPCServer) GetConfigFile(ctx context.Context,
3446
req *apiconfig.ClientConfigFileInfo) (*apiconfig.ConfigClientResponse, error) {
3547
ctx = utils.ConvertGRPCContext(ctx)
3648

3749
startTime := commontime.CurrentMillisecond()
50+
var ret *apiconfig.ConfigClientResponse
3851
defer func() {
3952
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
53+
Action: metrics.ActionGetConfigFile,
4054
ClientIP: utils.ParseClientAddress(ctx),
4155
Namespace: req.GetNamespace().GetValue(),
42-
Resource: fmt.Sprintf("CONFIG_FILE:%s|%s|%d", req.GetGroup().GetValue(),
43-
req.GetFileName().GetValue(), req.GetVersion().GetValue()),
56+
Resource: metrics.ResourceOfConfigFile(req.GetGroup().GetValue(), req.GetFileName().GetValue()),
4457
Timestamp: startTime,
4558
CostTime: commontime.CurrentMillisecond() - startTime,
59+
Revision: strconv.FormatUint(ret.GetConfigFile().GetVersion().GetValue(), 10),
60+
Success: ret.GetCode().GetValue() > uint32(apimodel.Code_DataNoChange),
4661
})
4762
}()
48-
response := g.configServer.GetConfigFileForClient(ctx, req)
49-
return response, nil
63+
ret = g.configServer.GetConfigFileWithCache(ctx, req)
64+
return ret, nil
5065
}
5166

5267
// CreateConfigFile 创建或更新配置
@@ -90,17 +105,116 @@ func (g *ConfigGRPCServer) GetConfigFileMetadataList(ctx context.Context,
90105
req *apiconfig.ConfigFileGroupRequest) (*apiconfig.ConfigClientListResponse, error) {
91106

92107
startTime := commontime.CurrentMillisecond()
108+
var ret *apiconfig.ConfigClientListResponse
93109
defer func() {
94110
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
111+
Action: metrics.ActionListConfigFiles,
95112
ClientIP: utils.ParseClientAddress(ctx),
96113
Namespace: req.GetConfigFileGroup().GetNamespace().GetValue(),
97-
Resource: fmt.Sprintf("CONFIG_FILE_LIST:%s|%s", req.GetConfigFileGroup().GetName().GetValue(),
98-
req.GetRevision().GetValue()),
114+
Resource: metrics.ResourceOfConfigFileList(req.GetConfigFileGroup().GetName().GetValue()),
99115
Timestamp: startTime,
100116
CostTime: commontime.CurrentMillisecond() - startTime,
117+
Revision: ret.GetRevision().GetValue(),
118+
Success: ret.GetCode().GetValue() > uint32(apimodel.Code_DataNoChange),
101119
})
102120
}()
103121

104122
ctx = utils.ConvertGRPCContext(ctx)
105-
return g.configServer.GetConfigFileNamesWithCache(ctx, req), nil
123+
ret = g.configServer.GetConfigFileNamesWithCache(ctx, req)
124+
return ret, nil
125+
}
126+
127+
func (g *ConfigGRPCServer) Discover(svr apiconfig.PolarisConfigGRPC_DiscoverServer) error {
128+
ctx := utils.ConvertGRPCContext(svr.Context())
129+
clientIP, _ := ctx.Value(utils.StringContext("client-ip")).(string)
130+
clientAddress, _ := ctx.Value(utils.StringContext("client-address")).(string)
131+
requestID, _ := ctx.Value(utils.StringContext("request-id")).(string)
132+
userAgent, _ := ctx.Value(utils.StringContext("user-agent")).(string)
133+
method, _ := grpc.MethodFromServerStream(svr)
134+
135+
for {
136+
in, err := svr.Recv()
137+
if err != nil {
138+
if io.EOF == err {
139+
return nil
140+
}
141+
return err
142+
}
143+
144+
msg := fmt.Sprintf("receive grpc discover request: %s", in.String())
145+
accesslog.Info(msg,
146+
zap.String("type", apiconfig.ConfigDiscoverRequest_ConfigDiscoverRequestType_name[int32(in.Type)]),
147+
zap.String("client-address", clientAddress),
148+
zap.String("user-agent", userAgent),
149+
utils.ZapRequestID(requestID),
150+
)
151+
152+
// 是否允许访问
153+
if ok := g.allowAccess(method); !ok {
154+
resp := api.NewConfigDiscoverResponse(apimodel.Code_ClientAPINotOpen)
155+
if sendErr := svr.Send(resp); sendErr != nil {
156+
return sendErr
157+
}
158+
continue
159+
}
160+
161+
// stream模式,需要对每个包进行检测
162+
if code := g.enterRateLimit(clientIP, method); code != uint32(apimodel.Code_ExecuteSuccess) {
163+
resp := api.NewConfigDiscoverResponse(apimodel.Code(code))
164+
if err = svr.Send(resp); err != nil {
165+
return err
166+
}
167+
continue
168+
}
169+
170+
var out *apiconfig.ConfigDiscoverResponse
171+
var action string
172+
startTime := commontime.CurrentMillisecond()
173+
defer func() {
174+
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
175+
Action: action,
176+
ClientIP: utils.ParseClientAddress(ctx),
177+
Namespace: in.GetConfigFile().GetNamespace().GetValue(),
178+
Resource: metrics.ResourceOfConfigFile(in.GetConfigFile().GetGroup().GetValue(), in.GetConfigFile().GetFileName().GetValue()),
179+
Timestamp: startTime,
180+
CostTime: commontime.CurrentMillisecond() - startTime,
181+
Revision: out.GetRevision(),
182+
Success: out.GetCode() > uint32(apimodel.Code_DataNoChange),
183+
})
184+
}()
185+
186+
switch in.Type {
187+
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE:
188+
action = metrics.ActionGetConfigFile
189+
ret := g.configServer.GetConfigFileWithCache(ctx, &apiconfig.ClientConfigFileInfo{})
190+
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
191+
out.ConfigFile = ret.GetConfigFile()
192+
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE
193+
out.Revision = strconv.Itoa(int(out.GetConfigFile().GetVersion().GetValue()))
194+
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_Names:
195+
action = metrics.ActionListConfigFiles
196+
ret := g.configServer.GetConfigFileNamesWithCache(ctx, &apiconfig.ConfigFileGroupRequest{
197+
Revision: wrapperspb.String(in.GetRevision()),
198+
ConfigFileGroup: &apiconfig.ConfigFileGroup{
199+
Namespace: in.GetConfigFile().GetNamespace(),
200+
Name: in.GetConfigFile().GetGroup(),
201+
},
202+
})
203+
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
204+
out.ConfigFileNames = ret.GetConfigFileInfos()
205+
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE_Names
206+
out.Revision = ret.GetRevision().GetValue()
207+
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_GROUPS:
208+
action = metrics.ActionListConfigGroups
209+
req := in.GetConfigFile()
210+
req.Md5 = wrapperspb.String(in.GetRevision())
211+
out = g.configServer.GetConfigGroupsWithCache(ctx, req)
212+
default:
213+
out = api.NewConfigDiscoverResponse(apimodel.Code_InvalidDiscoverResource)
214+
}
215+
216+
if err := svr.Send(out); err != nil {
217+
return err
218+
}
219+
}
106220
}

0 commit comments

Comments
 (0)