Skip to content

Commit b9fca75

Browse files
address comments
Signed-off-by: LiZhenCheng9527 <[email protected]>
1 parent 72c9a32 commit b9fca75

File tree

6 files changed

+308
-230
lines changed

6 files changed

+308
-230
lines changed

pkg/controller/ads/dns.go

+68-64
Original file line numberDiff line numberDiff line change
@@ -20,115 +20,119 @@ import (
2020
"net"
2121
"net/netip"
2222
"slices"
23+
"time"
2324

2425
clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
2526
v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2627
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
2728
"google.golang.org/protobuf/types/known/wrapperspb"
28-
"k8s.io/client-go/util/workqueue"
2929

3030
core_v2 "kmesh.net/kmesh/api/v2/core"
3131
"kmesh.net/kmesh/pkg/dns"
3232
)
3333

3434
// adsDnsResolver is DNS resolver of Kernel Native
35-
type AdsDnsResolver struct {
36-
Clusters chan []*clusterv3.Cluster
37-
adsCache *AdsCache
38-
dnsResolver *dns.DNSResolver
39-
dnsRefreshQueue workqueue.TypedDelayingInterface[any]
35+
type dnsController struct {
36+
Clusters chan []*clusterv3.Cluster
37+
cache *AdsCache
38+
dnsResolver *dns.DNSResolver
4039
}
4140

42-
func NewAdsDnsResolver(adsCache *AdsCache) (*AdsDnsResolver, error) {
41+
// pending resolve domain info of Kennel-Native Mode,
42+
// domain name is used for dns resolution
43+
// cluster is used for create the apicluster
44+
type pendingResolveDomain struct {
45+
DomainName string
46+
Clusters []*clusterv3.Cluster
47+
RefreshRate time.Duration
48+
}
49+
50+
func NewDnsResolver(adsCache *AdsCache) (*dnsController, error) {
4351
resolver, err := dns.NewDNSResolver()
4452
if err != nil {
4553
return nil, err
4654
}
47-
return &AdsDnsResolver{
48-
Clusters: make(chan []*clusterv3.Cluster),
49-
dnsRefreshQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "dnsRefreshQueue"}),
50-
adsCache: adsCache,
51-
dnsResolver: resolver,
55+
return &dnsController{
56+
Clusters: make(chan []*clusterv3.Cluster),
57+
// dnsRefreshQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "dnsRefreshQueue"}),
58+
cache: adsCache,
59+
dnsResolver: resolver,
5260
}, nil
5361
}
5462

55-
func (adsResolver *AdsDnsResolver) StartAdsDnsResolver(stopCh <-chan struct{}) {
56-
go adsResolver.startAdsResolver()
57-
go adsResolver.refreshAdsWorker()
63+
func (r *dnsController) StartKernelNativeDnsController(stopCh <-chan struct{}) {
64+
go r.startDnsController()
65+
// start dns resolver
66+
go r.dnsResolver.StartDnsResolver(stopCh)
5867
go func() {
5968
<-stopCh
60-
adsResolver.dnsRefreshQueue.ShutDown()
61-
close(adsResolver.Clusters)
69+
close(r.Clusters)
6270
}()
6371
}
6472

65-
func (adsResolver *AdsDnsResolver) startAdsResolver() {
73+
func (r *dnsController) startDnsController() {
6674
rateLimiter := make(chan struct{}, dns.MaxConcurrency)
67-
for clusters := range adsResolver.Clusters {
75+
for clusters := range r.Clusters {
6876
rateLimiter <- struct{}{}
6977
go func(clusters []*clusterv3.Cluster) {
7078
defer func() {
7179
<-rateLimiter
7280
}()
73-
adsResolver.resolveDomains(clusters)
81+
r.resolveDomains(clusters)
7482
}(clusters)
7583
}
7684
}
7785

78-
func (adsResolver *AdsDnsResolver) refreshAdsDns() bool {
79-
element, quit := adsResolver.dnsRefreshQueue.Get()
80-
if quit {
81-
return false
82-
}
83-
defer adsResolver.dnsRefreshQueue.Done(element)
84-
e := element.(*dns.PendingResolveDomain)
85-
86-
adsResolver.dnsResolver.RLock()
87-
_, exist := adsResolver.dnsResolver.Cache[e.DomainName]
88-
adsResolver.dnsResolver.RUnlock()
89-
// if the domain is no longer watched, no need to refresh it
90-
if !exist {
91-
return true
92-
}
93-
addresses, ttl, err := adsResolver.dnsResolver.Resolve(e.DomainName)
94-
if err != nil {
95-
log.Errorf("failed to dns resolve: %v", err)
96-
return false
97-
}
98-
if ttl > e.RefreshRate {
99-
ttl = e.RefreshRate
100-
}
101-
if ttl == 0 {
102-
ttl = dns.DeRefreshInterval
103-
}
104-
adsResolver.dnsRefreshQueue.AddAfter(e, ttl)
86+
func (r *dnsController) resolveDomains(cds []*clusterv3.Cluster) {
87+
domains := getPendingResolveDomain(cds)
88+
hostNames := make(map[string]struct{})
10589

106-
adsResolver.adsDnsResolve(e, addresses)
107-
adsResolver.adsCache.ClusterCache.Flush()
108-
return true
109-
}
90+
for k := range domains {
91+
hostNames[k] = struct{}{}
92+
}
11093

111-
func (adsResolver *AdsDnsResolver) resolveDomains(cds []*clusterv3.Cluster) {
112-
domains := getPendingResolveDomain(cds)
94+
// delete any scheduled re-resolve for domains we no longer care about
95+
r.dnsResolver.RemoveUnwatchDomain(hostNames)
96+
// Directly update the clusters that can find the dns resolution result in the cache
97+
alreadyResolveDomains := r.dnsResolver.GetAddressesFromCache(hostNames)
98+
for k, v := range alreadyResolveDomains {
99+
pendingDomain := domains[k]
100+
r.adsDnsResolve(pendingDomain, v.Addresses)
101+
r.cache.ClusterCache.Flush()
102+
delete(domains, k)
103+
}
113104

114-
// Stow domain updates, need to remove unwatched domains first
115-
adsResolver.dnsResolver.RemoveUnwatchedDomain(domains)
116105
for k, v := range domains {
117-
adsResolver.dnsResolver.ResolveDomains(k)
118-
adsResolver.dnsRefreshQueue.AddAfter(v, 0)
106+
r.dnsResolver.ResolveDomains(k)
107+
domainInfo := &dns.DomainInfo{
108+
Domain: v.DomainName,
109+
RefreshRate: v.RefreshRate,
110+
}
111+
r.dnsResolver.RefreshQueue.AddAfter(domainInfo, 0)
119112
}
113+
go r.refreshAdsWorker(domains)
120114
}
121115

122-
func (adsResolver *AdsDnsResolver) refreshAdsWorker() {
123-
for adsResolver.refreshAdsDns() {
116+
func (r *dnsController) refreshAdsWorker(domains map[string]*pendingResolveDomain) {
117+
for !(len(domains) == 0) {
118+
domain := <-r.dnsResolver.AdsDnsChan
119+
v, ok := domains[domain]
120+
// will this happen?
121+
if !ok {
122+
continue
123+
}
124+
addresses, _ := r.dnsResolver.GetOneDomainFromCache(domain)
125+
r.adsDnsResolve(v, addresses)
126+
r.cache.ClusterCache.Flush()
127+
delete(domains, domain)
124128
}
125129
}
126130

127-
func (adsResolver *AdsDnsResolver) adsDnsResolve(pendingDomain *dns.PendingResolveDomain, addrs []string) {
131+
func (r *dnsController) adsDnsResolve(pendingDomain *pendingResolveDomain, addrs []string) {
128132
for _, cluster := range pendingDomain.Clusters {
129133
ready := overwriteDnsCluster(cluster, pendingDomain.DomainName, addrs)
130134
if ready {
131-
if !adsResolver.adsCache.UpdateApiClusterIfExists(core_v2.ApiStatus_UPDATE, cluster) {
135+
if !r.cache.UpdateApiClusterIfExists(core_v2.ApiStatus_UPDATE, cluster) {
132136
log.Debugf("cluster: %s is deleted", cluster.Name)
133137
return
134138
}
@@ -201,8 +205,8 @@ func overwriteDnsCluster(cluster *clusterv3.Cluster, domain string, addrs []stri
201205
return ready
202206
}
203207

204-
func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]*dns.PendingResolveDomain {
205-
domains := make(map[string]*dns.PendingResolveDomain)
208+
func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]*pendingResolveDomain {
209+
domains := make(map[string]*pendingResolveDomain)
206210

207211
for _, cluster := range cds {
208212
if cluster.LoadAssignment == nil {
@@ -224,7 +228,7 @@ func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]*dns.PendingRe
224228
if v, ok := domains[address]; ok {
225229
v.Clusters = append(v.Clusters, cluster)
226230
} else {
227-
domainWithRefreshRate := &dns.PendingResolveDomain{
231+
domainWithRefreshRate := &pendingResolveDomain{
228232
DomainName: address,
229233
Clusters: []*clusterv3.Cluster{cluster},
230234
RefreshRate: cluster.GetDnsRefreshRate().AsDuration(),

pkg/controller/ads/dns_test.go

+5-137
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"istio.io/istio/pkg/test/util/retry"
3333

3434
core_v2 "kmesh.net/kmesh/api/v2/core"
35-
"kmesh.net/kmesh/pkg/dns"
3635
)
3736

3837
func TestOverwriteDNSCluster(t *testing.T) {
@@ -197,19 +196,14 @@ func TestHandleCdsResponseWithDns(t *testing.T) {
197196
clusters: []*clusterv3.Cluster{cluster1, cluster2},
198197
expected: []string{"foo.bar", "foo.baz"},
199198
},
200-
{
201-
name: "remove all DNS type clusters",
202-
clusters: []*clusterv3.Cluster{},
203-
expected: []string{},
204-
},
205199
}
206200

207201
p := NewController(nil).Processor
208202
stopCh := make(chan struct{})
209203
defer close(stopCh)
210-
dnsResolver, err := NewAdsDnsResolver(p.Cache)
204+
dnsResolver, err := NewDnsResolver(p.Cache)
211205
assert.NoError(t, err)
212-
dnsResolver.StartAdsDnsResolver(stopCh)
206+
dnsResolver.StartKernelNativeDnsController(stopCh)
213207
p.DnsResolverChan = dnsResolver.Clusters
214208
for _, tc := range testcases {
215209
t.Run(tc.name, func(t *testing.T) {
@@ -222,132 +216,6 @@ func TestHandleCdsResponseWithDns(t *testing.T) {
222216
}
223217
}
224218

225-
func TestDNS(t *testing.T) {
226-
fakeDNSServer := dns.NewFakeDNSServer()
227-
228-
testDNSResolver, err := NewAdsDnsResolver(NewAdsCache(nil))
229-
if err != nil {
230-
t.Fatal(err)
231-
}
232-
stopCh := make(chan struct{})
233-
defer close(stopCh)
234-
// testDNSResolver.StartAdsDnsResolver(stopCh)
235-
dnsServer := fakeDNSServer.Server.PacketConn.LocalAddr().String()
236-
testDNSResolver.dnsResolver.ResolvConfServers = []string{dnsServer}
237-
238-
testCases := []struct {
239-
name string
240-
domain string
241-
refreshRate time.Duration
242-
ttl time.Duration
243-
expected []string
244-
expectedAfterTTL []string
245-
registerDomain func(domain string)
246-
}{
247-
{
248-
name: "success",
249-
domain: "www.google.com.",
250-
refreshRate: 10 * time.Second,
251-
expected: []string{"10.0.0.1", "fd00::1"},
252-
registerDomain: func(domain string) {
253-
fakeDNSServer.SetHosts(domain, 1)
254-
},
255-
},
256-
{
257-
name: "check dns refresh after ttl, ttl < refreshRate",
258-
domain: "www.bing.com.",
259-
refreshRate: 10 * time.Second,
260-
ttl: 3 * time.Second,
261-
expected: []string{"10.0.0.2", "fd00::2"},
262-
expectedAfterTTL: []string{"10.0.0.3", "fd00::3"},
263-
registerDomain: func(domain string) {
264-
fakeDNSServer.SetHosts(domain, 2)
265-
fakeDNSServer.SetTTL(uint32(3))
266-
time.AfterFunc(time.Second, func() {
267-
fakeDNSServer.SetHosts(domain, 3)
268-
})
269-
},
270-
},
271-
{
272-
name: "check dns refresh after ttl without update bpfmap",
273-
domain: "www.test.com.",
274-
refreshRate: 10 * time.Second,
275-
ttl: 3 * time.Second,
276-
expected: []string{"10.0.0.2", "fd00::2"},
277-
expectedAfterTTL: []string{"10.0.0.2", "fd00::2"},
278-
registerDomain: func(domain string) {
279-
fakeDNSServer.SetHosts(domain, 2)
280-
fakeDNSServer.SetTTL(uint32(3))
281-
},
282-
},
283-
{
284-
name: "check dns refresh after refreshRate, ttl > refreshRate",
285-
domain: "www.baidu.com.",
286-
refreshRate: 3 * time.Second,
287-
ttl: 10 * time.Second,
288-
expected: []string{"10.0.0.2", "fd00::2"},
289-
expectedAfterTTL: []string{"10.0.0.3", "fd00::3"},
290-
registerDomain: func(domain string) {
291-
fakeDNSServer.SetHosts(domain, 2)
292-
fakeDNSServer.SetTTL(uint32(10))
293-
time.AfterFunc(time.Second, func() {
294-
fakeDNSServer.SetHosts(domain, 3)
295-
})
296-
},
297-
},
298-
{
299-
name: "failed to resolve",
300-
domain: "www.kmesh.test.",
301-
refreshRate: 10 * time.Second,
302-
expected: []string{},
303-
},
304-
}
305-
var wg sync.WaitGroup
306-
for _, testcase := range testCases {
307-
wg.Add(1)
308-
if testcase.registerDomain != nil {
309-
testcase.registerDomain(testcase.domain)
310-
}
311-
312-
input := &dns.PendingResolveDomain{
313-
DomainName: testcase.domain,
314-
RefreshRate: testcase.refreshRate,
315-
}
316-
testDNSResolver.dnsResolver.Lock()
317-
testDNSResolver.dnsResolver.Cache[testcase.domain] = &dns.DomainCacheEntry{}
318-
testDNSResolver.dnsResolver.Unlock()
319-
go testDNSResolver.refreshAdsWorker()
320-
321-
_, ttl, err := testDNSResolver.dnsResolver.Resolve(input.DomainName)
322-
assert.NoError(t, err)
323-
if ttl > input.RefreshRate {
324-
ttl = input.RefreshRate
325-
}
326-
if ttl == 0 {
327-
ttl = dns.DeRefreshInterval
328-
}
329-
testDNSResolver.dnsRefreshQueue.AddAfter(input, ttl)
330-
time.Sleep(2 * time.Second)
331-
332-
res := testDNSResolver.dnsResolver.GetDNSAddresses(testcase.domain)
333-
if len(res) != 0 || len(testcase.expected) != 0 {
334-
if !reflect.DeepEqual(res, testcase.expected) {
335-
t.Errorf("dns resolve for %s do not match. \n got %v\nwant %v", testcase.domain, res, testcase.expected)
336-
}
337-
338-
if testcase.expectedAfterTTL != nil {
339-
time.Sleep(ttl + 1)
340-
res = testDNSResolver.dnsResolver.GetDNSAddresses(testcase.domain)
341-
if !reflect.DeepEqual(res, testcase.expectedAfterTTL) {
342-
t.Errorf("dns refresh after ttl failed, for %s do not match. \n got %v\nwant %v", testcase.domain, res, testcase.expectedAfterTTL)
343-
}
344-
}
345-
}
346-
wg.Done()
347-
}
348-
wg.Wait()
349-
}
350-
351219
func TestGetPendingResolveDomain(t *testing.T) {
352220
utCluster := clusterv3.Cluster{
353221
Name: "testCluster",
@@ -411,7 +279,7 @@ func TestGetPendingResolveDomain(t *testing.T) {
411279
tests := []struct {
412280
name string
413281
args args
414-
want map[string]*dns.PendingResolveDomain
282+
want map[string]*pendingResolveDomain
415283
}{
416284
{
417285
name: "empty domains test",
@@ -420,7 +288,7 @@ func TestGetPendingResolveDomain(t *testing.T) {
420288
&utCluster,
421289
},
422290
},
423-
want: map[string]*dns.PendingResolveDomain{},
291+
want: map[string]*pendingResolveDomain{},
424292
},
425293
{
426294
name: "cluster domain is not IP",
@@ -429,7 +297,7 @@ func TestGetPendingResolveDomain(t *testing.T) {
429297
&utClusterWithHost,
430298
},
431299
},
432-
want: map[string]*dns.PendingResolveDomain{
300+
want: map[string]*pendingResolveDomain{
433301
"www.google.com": {
434302
DomainName: "www.google.com",
435303
Clusters: []*clusterv3.Cluster{&utClusterWithHost},

0 commit comments

Comments
 (0)