Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor dns #1247

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
243 changes: 243 additions & 0 deletions pkg/controller/ads/dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ads

import (
"net"
"net/netip"
"slices"
"time"

clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"google.golang.org/protobuf/types/known/wrapperspb"

core_v2 "kmesh.net/kmesh/api/v2/core"
"kmesh.net/kmesh/pkg/dns"
)

// adsDnsResolver is DNS resolver of Kernel Native
type dnsController struct {
Clusters chan []*clusterv3.Cluster
cache *AdsCache
dnsResolver *dns.DNSResolver
}

// pending resolve domain info of Kennel-Native Mode,
// domain name is used for dns resolution
// cluster is used for create the apicluster
type pendingResolveDomain struct {
DomainName string
Clusters []*clusterv3.Cluster
RefreshRate time.Duration
}

func NewDnsResolver(adsCache *AdsCache) (*dnsController, error) {
resolver, err := dns.NewDNSResolver()
if err != nil {
return nil, err
}

Check warning on line 54 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L53-L54

Added lines #L53 - L54 were not covered by tests
return &dnsController{
Clusters: make(chan []*clusterv3.Cluster),
// dnsRefreshQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "dnsRefreshQueue"}),
cache: adsCache,
dnsResolver: resolver,
}, nil
}

func (r *dnsController) StartKernelNativeDnsController(stopCh <-chan struct{}) {
go r.startDnsController()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks confusing with StartDnsResolver.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already change function name

// start dns resolver
go r.dnsResolver.StartDnsResolver(stopCh)
go func() {
<-stopCh
close(r.Clusters)
}()
}

func (r *dnsController) startDnsController() {
rateLimiter := make(chan struct{}, dns.MaxConcurrency)
for clusters := range r.Clusters {
rateLimiter <- struct{}{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how does the ratelimiter effect, because in the dns pkg, it always run in order

go func(clusters []*clusterv3.Cluster) {
defer func() {
<-rateLimiter
}()
r.resolveDomains(clusters)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not safe running async in separate routines,

think about this case:

t0: a resolveDomains thread

t1: another resolveDomains

If t1 is run before t0, even some step.The state machine would be broken

}(clusters)
}
}

func (r *dnsController) resolveDomains(cds []*clusterv3.Cluster) {
domains := getPendingResolveDomain(cds)
hostNames := make(map[string]struct{})

for k := range domains {
hostNames[k] = struct{}{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you need convert after calling getPendingResolveDomain, why not directly return what you want

}

// delete any scheduled re-resolve for domains we no longer care about
r.dnsResolver.RemoveUnwatchDomain(hostNames)
// Directly update the clusters that can find the dns resolution result in the cache
alreadyResolveDomains := r.dnsResolver.GetAddressesFromCache(hostNames)
for k, v := range alreadyResolveDomains {
pendingDomain := domains[k]
r.adsDnsResolve(pendingDomain, v.Addresses)
r.cache.ClusterCache.Flush()
delete(domains, k)
}

Check warning on line 103 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L99-L103

Added lines #L99 - L103 were not covered by tests

for k, v := range domains {
r.dnsResolver.ResolveDomains(k)
domainInfo := &dns.DomainInfo{
Domain: v.DomainName,
RefreshRate: v.RefreshRate,
}
r.dnsResolver.RefreshQueue.AddAfter(domainInfo, 0)
}
go r.refreshAdsWorker(domains)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will this thread exit?

}

func (r *dnsController) refreshAdsWorker(domains map[string]*pendingResolveDomain) {
for !(len(domains) == 0) {
domain := <-r.dnsResolver.AdsDnsChan
v, ok := domains[domain]
// will this happen?
if !ok {
continue

Check warning on line 122 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L122

Added line #L122 was not covered by tests
}
addresses, _ := r.dnsResolver.GetOneDomainFromCache(domain)
r.adsDnsResolve(v, addresses)
r.cache.ClusterCache.Flush()
delete(domains, domain)

Check warning on line 127 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L124-L127

Added lines #L124 - L127 were not covered by tests
}
}

func (r *dnsController) adsDnsResolve(pendingDomain *pendingResolveDomain, addrs []string) {
for _, cluster := range pendingDomain.Clusters {
ready := overwriteDnsCluster(cluster, pendingDomain.DomainName, addrs)
if ready {
if !r.cache.UpdateApiClusterIfExists(core_v2.ApiStatus_UPDATE, cluster) {
log.Debugf("cluster: %s is deleted", cluster.Name)
return
}

Check warning on line 138 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L131-L138

Added lines #L131 - L138 were not covered by tests
}
}
}

func overwriteDnsCluster(cluster *clusterv3.Cluster, domain string, addrs []string) bool {
buildLbEndpoints := func(port uint32) []*endpointv3.LbEndpoint {
lbEndpoints := make([]*endpointv3.LbEndpoint, 0, len(addrs))
for _, addr := range addrs {
ip := net.ParseIP(addr)
if ip == nil {
continue

Check warning on line 149 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L149

Added line #L149 was not covered by tests
}
if ip.To4() == nil {
continue

Check warning on line 152 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L152

Added line #L152 was not covered by tests
}
lbEndpoint := &endpointv3.LbEndpoint{
HealthStatus: v3.HealthStatus_HEALTHY,
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
Endpoint: &endpointv3.Endpoint{
Address: &v3.Address{
Address: &v3.Address_SocketAddress{
SocketAddress: &v3.SocketAddress{
Address: addr,
PortSpecifier: &v3.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
},
},
// TODO: support LoadBalancingWeight
LoadBalancingWeight: &wrapperspb.UInt32Value{
Value: 1,
},
}
lbEndpoints = append(lbEndpoints, lbEndpoint)
}
return lbEndpoints
}

ready := true
for _, e := range cluster.LoadAssignment.Endpoints {
pos := -1
var lbEndpoints []*endpointv3.LbEndpoint
for i, le := range e.LbEndpoints {
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
if !ok {
continue

Check warning on line 187 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L187

Added line #L187 was not covered by tests
}
_, err := netip.ParseAddr(socketAddr.SocketAddress.Address)
if err != nil {
if socketAddr.SocketAddress.Address == domain {
pos = i
lbEndpoints = buildLbEndpoints(socketAddr.SocketAddress.GetPortValue())
} else {
// There is other domains not resolved for this cluster
ready = false
}

Check warning on line 197 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L195-L197

Added lines #L195 - L197 were not covered by tests
}
}
if pos >= 0 {
e.LbEndpoints = slices.Replace(e.LbEndpoints, pos, pos+1, lbEndpoints...)
}
}

return ready
}

func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]*pendingResolveDomain {
domains := make(map[string]*pendingResolveDomain)

for _, cluster := range cds {
if cluster.LoadAssignment == nil {
continue

Check warning on line 213 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L213

Added line #L213 was not covered by tests
}

for _, e := range cluster.LoadAssignment.Endpoints {
for _, le := range e.LbEndpoints {
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
if !ok {
continue

Check warning on line 220 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L220

Added line #L220 was not covered by tests
}
address := socketAddr.SocketAddress.Address
if _, err := netip.ParseAddr(address); err == nil {
// This is an ip address
continue
}

if v, ok := domains[address]; ok {
v.Clusters = append(v.Clusters, cluster)

Check warning on line 229 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L229

Added line #L229 was not covered by tests
} else {
domainWithRefreshRate := &pendingResolveDomain{
DomainName: address,
Clusters: []*clusterv3.Cluster{cluster},
RefreshRate: cluster.GetDnsRefreshRate().AsDuration(),
}
domains[address] = domainWithRefreshRate
}
}
}
}

return domains
}
Loading