Skip to content

Commit fb354c5

Browse files
make ResolveNow non block (#5)
* make resolve now non block * Update resolver.go Co-authored-by: Geon Kim <[email protected]> * rename lookupCloudmap --------- Co-authored-by: Geon Kim <[email protected]>
1 parent 01012f2 commit fb354c5

File tree

3 files changed

+123
-37
lines changed

3 files changed

+123
-37
lines changed

builder.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package cloudmap
22

33
import (
4-
"sync"
4+
"context"
55
"time"
66

77
"google.golang.org/grpc/grpclog"
@@ -37,11 +37,11 @@ type builder struct {
3737
// so you don't need to call this function to register the default builder.
3838
//
3939
// Default Options:
40+
//
4041
// Session: session.NewSession()
4142
// HealthStatusFilter: HealthStatusFilterHealthy
4243
// MaxResults: 100
4344
// RefreshInterval: 30s
44-
//
4545
func Register(opts ...Opt) {
4646
b := &builder{
4747
healthStatusFilter: HealthStatusFilterHealthy,
@@ -72,23 +72,26 @@ func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grp
7272
}
7373
}
7474

75+
ctx, cancel := context.WithCancel(context.Background())
7576
r := &resolver{
76-
mu: &sync.RWMutex{},
77-
7877
logger: grpclog.Component(b.Scheme()),
7978

8079
cc: cc,
8180

82-
ticker: time.NewTicker(b.refreshInterval),
83-
8481
sd: servicediscovery.New(sess),
8582
namespace: cmT.namespace,
8683
service: cmT.service,
8784
healthStatusFilter: b.healthStatusFilter,
8885
maxResults: b.maxResults,
86+
87+
ctx: ctx,
88+
cancel: cancel,
89+
ticker: time.NewTicker(b.refreshInterval),
90+
resolveCmd: make(chan struct{}, 1),
8991
}
9092

91-
go r.watch()
93+
r.wg.Add(1)
94+
go r.watcher()
9295

9396
return r, nil
9497
}

resolver.go

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cloudmap
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67
"time"
@@ -14,34 +15,35 @@ import (
1415
"github.com/aws/aws-sdk-go/service/servicediscovery"
1516
)
1617

17-
type resolver struct {
18-
mu *sync.RWMutex
19-
isClosed bool
18+
type serviceDiscovery interface {
19+
DiscoverInstances(input *servicediscovery.DiscoverInstancesInput) (*servicediscovery.DiscoverInstancesOutput, error)
20+
}
2021

22+
type resolver struct {
2123
logger grpclog.LoggerV2
24+
cc grpcresolver.ClientConn
2225

23-
cc grpcresolver.ClientConn
24-
25-
ticker *time.Ticker
26-
27-
sd *servicediscovery.ServiceDiscovery
26+
sd serviceDiscovery
2827
namespace string
2928
service string
3029
healthStatusFilter string
3130
maxResults int64
31+
32+
ctx context.Context
33+
cancel context.CancelFunc
34+
ticker *time.Ticker
35+
resolveCmd chan struct{}
36+
wg sync.WaitGroup
3237
}
3338

3439
func (c *resolver) ResolveNow(grpcresolver.ResolveNowOptions) {
35-
locked := c.mu.TryLock()
36-
if !locked { // already resolving
37-
return
38-
}
39-
defer c.mu.Unlock()
40-
41-
if c.isClosed {
42-
return
40+
select {
41+
case c.resolveCmd <- struct{}{}:
42+
default:
4343
}
44+
}
4445

46+
func (c *resolver) lookupCloudmap() (*grpcresolver.State, error) {
4547
output, err := c.sd.DiscoverInstances(&servicediscovery.DiscoverInstancesInput{
4648
NamespaceName: aws.String(c.namespace),
4749
ServiceName: aws.String(c.service),
@@ -65,34 +67,45 @@ func (c *resolver) ResolveNow(grpcresolver.ResolveNowOptions) {
6567
} else {
6668
c.logger.Errorln(err.Error())
6769
}
68-
c.cc.ReportError(err)
69-
return
70+
return nil, err
7071
}
7172

7273
addrs := make([]grpcresolver.Address, len(output.Instances))
7374
for i, instance := range output.Instances {
7475
addrs[i] = httpInstanceSummaryToAddr(instance)
7576
}
7677

77-
c.cc.UpdateState(grpcresolver.State{Addresses: addrs})
78+
return &grpcresolver.State{Addresses: addrs}, nil
7879
}
7980

8081
func (c *resolver) Close() {
81-
c.mu.Lock()
82-
defer c.mu.Unlock()
83-
84-
if c.isClosed {
85-
return
86-
}
87-
88-
c.isClosed = true
82+
c.cancel()
8983
c.ticker.Stop()
84+
c.wg.Wait()
9085
}
9186

92-
func (c *resolver) watch() {
87+
func (c *resolver) watcher() {
88+
defer c.wg.Done()
89+
9390
for {
94-
c.ResolveNow(grpcresolver.ResolveNowOptions{})
95-
<-c.ticker.C
91+
state, err := c.lookupCloudmap()
92+
if err != nil {
93+
c.cc.ReportError(err)
94+
} else {
95+
err = c.cc.UpdateState(*state)
96+
}
97+
98+
if err != nil {
99+
c.logger.Errorln(err)
100+
// wait for next iteration
101+
}
102+
103+
select {
104+
case <-c.ctx.Done():
105+
return
106+
case <-c.ticker.C:
107+
case <-c.resolveCmd:
108+
}
96109
}
97110
}
98111

resolver_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package cloudmap
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/aws/aws-sdk-go/service/servicediscovery"
7+
"google.golang.org/grpc/grpclog"
8+
grpcresolver "google.golang.org/grpc/resolver"
9+
"google.golang.org/grpc/serviceconfig"
10+
"testing"
11+
"time"
12+
)
13+
14+
type mockCC struct{}
15+
16+
func (m mockCC) UpdateState(state grpcresolver.State) error { return nil }
17+
18+
func (m mockCC) ReportError(err error) {}
19+
20+
func (m mockCC) NewAddress(addresses []grpcresolver.Address) {}
21+
22+
func (m mockCC) NewServiceConfig(serviceConfig string) {}
23+
24+
func (m mockCC) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
25+
return nil
26+
}
27+
28+
type mockDiscovery struct{}
29+
30+
func (m mockDiscovery) DiscoverInstances(input *servicediscovery.DiscoverInstancesInput) (*servicediscovery.DiscoverInstancesOutput, error) {
31+
time.Sleep(1 * time.Second)
32+
fmt.Println("DiscoverInstances called")
33+
return &servicediscovery.DiscoverInstancesOutput{
34+
Instances: make([]*servicediscovery.HttpInstanceSummary, 0),
35+
}, nil
36+
}
37+
38+
func Test_resolver(t *testing.T) {
39+
ctx, cancel := context.WithCancel(context.Background())
40+
r := &resolver{
41+
logger: grpclog.Component("test"),
42+
43+
cc: mockCC{},
44+
sd: mockDiscovery{},
45+
46+
ctx: ctx,
47+
cancel: cancel,
48+
ticker: time.NewTicker(10 * time.Second),
49+
resolveCmd: make(chan struct{}, 1),
50+
}
51+
52+
r.wg.Add(1)
53+
go r.watcher()
54+
55+
timeout := time.After(100 * time.Millisecond)
56+
done := make(chan bool)
57+
go func() {
58+
for i := 0; i < 10; i++ {
59+
r.ResolveNow(grpcresolver.ResolveNowOptions{})
60+
}
61+
done <- true
62+
}()
63+
select {
64+
case <-timeout:
65+
t.Error("timeout")
66+
case <-done:
67+
t.Log("done")
68+
}
69+
r.Close()
70+
}

0 commit comments

Comments
 (0)