Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition (also a regression of the PR 19139) #19221

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,19 +411,6 @@
close(e.stopc)
})

// close client requests with request timeout
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
for _, sctx := range e.sctxs {
for ss := range sctx.serversC {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
stopServers(ctx, ss)
cancel()
}
}

for _, sctx := range e.sctxs {
sctx.cancel()
}
Expand All @@ -443,11 +430,6 @@
e.tracingExporterShutdown()
}

// close rafthttp transports
if e.Server != nil {
e.Server.Stop()
}

// close all idle connections in peer handler (wait up to 1-second)
for i := range e.Peers {
if e.Peers[i] != nil && e.Peers[i].close != nil {
Expand All @@ -456,6 +438,25 @@
cancel()
}
}

// close client requests with request timeout
timeout := 2 * time.Second
if e.Server != nil {
timeout = e.Server.Cfg.ReqTimeout()
}
for _, sctx := range e.sctxs {
for ss := range sctx.serversC {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
stopServers(ctx, ss)
cancel()
}
}

// close rafthttp transports
if e.Server != nil {
e.Server.Stop()
}

if e.errc != nil {
e.wg.Wait()
close(e.errc)
Expand Down Expand Up @@ -607,7 +608,9 @@

// start peer servers in a goroutine
for _, pl := range e.Peers {
e.wg.Add(1)
go func(l *peerListener) {
defer e.wg.Done()
u := l.Addr().String()
e.cfg.logger.Info(
"serving peer traffic",
Expand Down Expand Up @@ -774,7 +777,9 @@

// start client servers in each goroutine
for _, sctx := range e.sctxs {
e.wg.Add(1)
go func(s *serveCtx) {
defer e.wg.Done()
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...))
}(sctx)
}
Expand Down Expand Up @@ -859,7 +864,9 @@
return err
}
e.metricsListeners = append(e.metricsListeners, ml)
e.wg.Add(1)

Check warning on line 867 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L867

Added line #L867 was not covered by tests
go func(u url.URL, ln net.Listener) {
defer e.wg.Done()

Check warning on line 869 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L869

Added line #L869 was not covered by tests
e.cfg.logger.Info(
"serving metrics",
zap.String("address", u.String()),
Expand All @@ -872,9 +879,6 @@
}

func (e *Etcd) errHandler(err error) {
e.wg.Add(1)
defer e.wg.Done()

if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
}
Expand Down
24 changes: 19 additions & 5 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
serviceRegister func(*grpc.Server)
serversC chan *servers
closeOnce sync.Once

wg sync.WaitGroup
}

type servers struct {
Expand Down Expand Up @@ -182,13 +184,17 @@
server = m.Serve

httpl := m.Match(cmux.HTTP1())
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsLis net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)

if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
sctx.wg.Add(1)
go func(gs *grpc.Server, l net.Listener) {
defer sctx.wg.Done()
errHandler(gs.Serve(l))
}(gs, grpcl)
}
Expand Down Expand Up @@ -237,7 +243,7 @@
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := configureHTTPServer(srv, s.Cfg); err != nil {
if err = configureHTTPServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
Expand All @@ -248,11 +254,13 @@
} else {
server = m.Serve

tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
tlsl, tlsErr := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if tlsErr != nil {
return tlsErr

Check warning on line 259 in server/embed/serve.go

View check run for this annotation

Codecov / codecov/patch

server/embed/serve.go#L259

Added line #L259 was not covered by tests
}
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsl net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
}
Expand All @@ -265,7 +273,11 @@
)
}

return server()
err = server()
sctx.close()
// ensure all goroutines, which are created by this method, to complete before this method returns.
sctx.wg.Wait()
return err
}

func configureHTTPServer(srv *http.Server, cfg config.ServerConfig) error {
Expand Down Expand Up @@ -334,7 +346,9 @@
return nil, err
}
}
sctx.wg.Add(1)
go func() {
defer sctx.wg.Done()
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
sctx.lg.Warn(
Expand Down
Loading