Skip to content

Commit 6a189f3

Browse files
committed
Perform graceful shutdown according to number of connections
Prior to this change, the watchdog would wait for the write_timeout value twice, even if there were no connections in flight. This change adds a new metric for in-flight connections and removes the explicit wait. Testing with `killall of-watchdog -s SIGTERM` performs as expected: clients that have already connected complete their work before termination. This change is a prerequisite for a change in faas-netes: openfaas/faas-netes#853. Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
1 parent 989ac5f commit 6a189f3

File tree

13 files changed

+788
-89
lines changed

13 files changed

+788
-89
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,5 @@ template
2525
**/*.sha256
2626

2727
bin
28+
/handler
29+
/Dockerfile2

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ Environmental variables:
136136
| `static_path` | Yes | Absolute or relative path to the directory that will be served if `mode="static"` |
137137
| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) |
138138
| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) |
139+
| `healthcheck_interval` | Yes | Interval (in seconds) for the HTTP healthcheck performed by the container orchestrator, e.g. the kubelet. Used for graceful shutdowns. |
139140
| `exec_timeout` | Yes | Exec timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. |
140141
| `port` | Yes | Specify an alternative TCP port for testing. Default: `8080` |
141142
| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is `false`. |

config/config.go

+40-24
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package config
55

66
import (
7+
"fmt"
78
"log"
89
"strconv"
910
"strings"
@@ -12,10 +13,11 @@ import (
1213

1314
// WatchdogConfig configuration for a watchdog.
1415
type WatchdogConfig struct {
15-
TCPPort int
16-
HTTPReadTimeout time.Duration
17-
HTTPWriteTimeout time.Duration
18-
ExecTimeout time.Duration
16+
TCPPort int
17+
HTTPReadTimeout time.Duration
18+
HTTPWriteTimeout time.Duration
19+
ExecTimeout time.Duration
20+
HealthcheckInterval time.Duration
1921

2022
FunctionProcess string
2123
ContentType string
@@ -56,7 +58,7 @@ func (w WatchdogConfig) Process() (string, []string) {
5658
}
5759

5860
// New create config based upon environmental variables.
59-
func New(env []string) WatchdogConfig {
61+
func New(env []string) (WatchdogConfig, error) {
6062

6163
envMap := mapEnv(env)
6264

@@ -100,29 +102,43 @@ func New(env []string) WatchdogConfig {
100102
staticPath = val
101103
}
102104

103-
config := WatchdogConfig{
104-
TCPPort: getInt(envMap, "port", 8080),
105-
HTTPReadTimeout: getDuration(envMap, "read_timeout", time.Second*10),
106-
HTTPWriteTimeout: getDuration(envMap, "write_timeout", time.Second*10),
107-
FunctionProcess: functionProcess,
108-
StaticPath: staticPath,
109-
InjectCGIHeaders: true,
110-
ExecTimeout: getDuration(envMap, "exec_timeout", time.Second*10),
111-
OperationalMode: ModeStreaming,
112-
ContentType: contentType,
113-
SuppressLock: getBool(envMap, "suppress_lock"),
114-
UpstreamURL: upstreamURL,
115-
BufferHTTPBody: getBools(envMap, "buffer_http", "http_buffer_req_body"),
116-
MetricsPort: 8081,
117-
MaxInflight: getInt(envMap, "max_inflight", 0),
118-
PrefixLogs: prefixLogs,
105+
writeTimeout := getDuration(envMap, "write_timeout", time.Second*10)
106+
healthcheckInterval := writeTimeout
107+
if val, exists := envMap["healthcheck_interval"]; exists {
108+
healthcheckInterval = parseIntOrDurationValue(val, writeTimeout)
109+
}
110+
111+
c := WatchdogConfig{
112+
TCPPort: getInt(envMap, "port", 8080),
113+
HTTPReadTimeout: getDuration(envMap, "read_timeout", time.Second*10),
114+
HTTPWriteTimeout: writeTimeout,
115+
HealthcheckInterval: healthcheckInterval,
116+
FunctionProcess: functionProcess,
117+
StaticPath: staticPath,
118+
InjectCGIHeaders: true,
119+
ExecTimeout: getDuration(envMap, "exec_timeout", time.Second*10),
120+
OperationalMode: ModeStreaming,
121+
ContentType: contentType,
122+
SuppressLock: getBool(envMap, "suppress_lock"),
123+
UpstreamURL: upstreamURL,
124+
BufferHTTPBody: getBools(envMap, "buffer_http", "http_buffer_req_body"),
125+
MetricsPort: 8081,
126+
MaxInflight: getInt(envMap, "max_inflight", 0),
127+
PrefixLogs: prefixLogs,
119128
}
120-
121129
if val := envMap["mode"]; len(val) > 0 {
122-
config.OperationalMode = WatchdogModeConst(val)
130+
c.OperationalMode = WatchdogModeConst(val)
131+
}
132+
133+
if writeTimeout == 0 {
134+
return c, fmt.Errorf("HTTP write timeout must be over 0s")
135+
}
136+
137+
if len(c.FunctionProcess) == 0 && c.OperationalMode != ModeStatic {
138+
return c, fmt.Errorf(`provide a "function_process" or "fprocess" environmental variable for your function`)
123139
}
124140

125-
return config
141+
return c, nil
126142
}
127143

128144
func mapEnv(env []string) map[string]string {

config/config_test.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ import (
1010
)
1111

1212
func TestNew(t *testing.T) {
13-
defaults := New([]string{})
13+
defaults, _ := New([]string{})
1414
if defaults.TCPPort != 8080 {
1515
t.Errorf("Want TCPPort: 8080, got: %d", defaults.TCPPort)
1616
}
1717
}
1818

1919
func Test_OperationalMode_Default(t *testing.T) {
20-
defaults := New([]string{})
20+
defaults, _ := New([]string{})
2121
if defaults.OperationalMode != ModeStreaming {
2222
t.Errorf("Want %s. got: %s", WatchdogMode(ModeStreaming), WatchdogMode(defaults.OperationalMode))
2323
}
@@ -26,7 +26,7 @@ func Test_OperationalMode_Default(t *testing.T) {
2626
func Test_BufferHttpModeDefaultsToFalse(t *testing.T) {
2727
env := []string{}
2828

29-
actual := New(env)
29+
actual, _ := New(env)
3030
want := false
3131
if actual.BufferHTTPBody != want {
3232
t.Errorf("Want %v. got: %v", want, actual.BufferHTTPBody)
@@ -39,7 +39,7 @@ func Test_UpstreamURL(t *testing.T) {
3939
fmt.Sprintf("upstream_url=%s", urlVal),
4040
}
4141

42-
actual := New(env)
42+
actual, _ := New(env)
4343
want := urlVal
4444
if actual.UpstreamURL != want {
4545
t.Errorf("Want %v. got: %v", want, actual.UpstreamURL)
@@ -52,7 +52,7 @@ func Test_UpstreamURLVerbose(t *testing.T) {
5252
fmt.Sprintf("http_upstream_url=%s", urlVal),
5353
}
5454

55-
actual := New(env)
55+
actual, _ := New(env)
5656
want := urlVal
5757
if actual.UpstreamURL != want {
5858
t.Errorf("Want %v. got: %v", want, actual.UpstreamURL)
@@ -64,7 +64,7 @@ func Test_BufferHttpMode_CanBeSetToTrue(t *testing.T) {
6464
"http_buffer_req_body=true",
6565
}
6666

67-
actual := New(env)
67+
actual, _ := New(env)
6868
want := true
6969
if actual.BufferHTTPBody != want {
7070
t.Errorf("Want %v. got: %v", want, actual.BufferHTTPBody)
@@ -76,7 +76,7 @@ func Test_OperationalMode_AfterBurn(t *testing.T) {
7676
"mode=afterburn",
7777
}
7878

79-
actual := New(env)
79+
actual, _ := New(env)
8080

8181
if actual.OperationalMode != ModeAfterBurn {
8282
t.Errorf("Want %s. got: %s", WatchdogMode(ModeAfterBurn), WatchdogMode(actual.OperationalMode))
@@ -88,7 +88,7 @@ func Test_OperationalMode_Static(t *testing.T) {
8888
"mode=static",
8989
}
9090

91-
actual := New(env)
91+
actual, _ := New(env)
9292

9393
if actual.OperationalMode != ModeStatic {
9494
t.Errorf("Want %s. got: %s", WatchdogMode(ModeStatic), WatchdogMode(actual.OperationalMode))
@@ -98,7 +98,7 @@ func Test_OperationalMode_Static(t *testing.T) {
9898
func Test_ContentType_Default(t *testing.T) {
9999
env := []string{}
100100

101-
actual := New(env)
101+
actual, _ := New(env)
102102

103103
if actual.ContentType != "application/octet-stream" {
104104
t.Errorf("Default (ContentType) Want %s. got: %s", actual.ContentType, "octet-stream")
@@ -110,7 +110,7 @@ func Test_ContentType_Override(t *testing.T) {
110110
"content_type=application/json",
111111
}
112112

113-
actual := New(env)
113+
actual, _ := New(env)
114114

115115
if actual.ContentType != "application/json" {
116116
t.Errorf("(ContentType) Want %s. got: %s", actual.ContentType, "application/json")
@@ -122,7 +122,7 @@ func Test_FunctionProcessLegacyName(t *testing.T) {
122122
"fprocess=env",
123123
}
124124

125-
actual := New(env)
125+
actual, _ := New(env)
126126

127127
if actual.FunctionProcess != "env" {
128128
t.Errorf("Want %s. got: %s", "env", actual.FunctionProcess)
@@ -134,7 +134,7 @@ func Test_FunctionProcessAlternativeName(t *testing.T) {
134134
"function_process=env",
135135
}
136136

137-
actual := New(env)
137+
actual, _ := New(env)
138138

139139
if actual.FunctionProcess != "env" {
140140
t.Errorf("Want %s. got: %s", "env", actual.FunctionProcess)
@@ -177,7 +177,7 @@ func Test_FunctionProcess_Arguments(t *testing.T) {
177177

178178
for _, testCase := range cases {
179179
t.Run(testCase.scenario, func(t *testing.T) {
180-
actual := New([]string{testCase.env})
180+
actual, _ := New([]string{testCase.env})
181181

182182
process, args := actual.Process()
183183
if process != testCase.wantProcess {
@@ -205,7 +205,7 @@ func Test_PortOverride(t *testing.T) {
205205
"port=8081",
206206
}
207207

208-
actual := New(env)
208+
actual, _ := New(env)
209209

210210
if actual.TCPPort != 8081 {
211211
t.Errorf("Want %d. got: %d", 8081, actual.TCPPort)
@@ -251,7 +251,7 @@ func Test_Timeouts(t *testing.T) {
251251
}
252252

253253
for _, testCase := range cases {
254-
actual := New(testCase.env)
254+
actual, _ := New(testCase.env)
255255
if testCase.readTimeout != actual.HTTPReadTimeout {
256256
t.Errorf("(%s) HTTPReadTimeout want: %s, got: %s", testCase.name, actual.HTTPReadTimeout, testCase.readTimeout)
257257
}

executor/http_runner.go

+12-16
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ func (f *HTTPFunctionRunner) Start() error {
6868

6969
<-sig
7070
cmd.Process.Signal(syscall.SIGTERM)
71-
7271
}()
7372

7473
err := cmd.Start()
@@ -100,7 +99,11 @@ func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *ht
10099
body = r.Body
101100
}
102101

103-
request, _ := http.NewRequest(r.Method, upstreamURL, body)
102+
request, err := http.NewRequest(r.Method, upstreamURL, body)
103+
if err != nil {
104+
return err
105+
}
106+
104107
for h := range r.Header {
105108
request.Header.Set(h, r.Header.Get(h))
106109
}
@@ -116,14 +119,11 @@ func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *ht
116119
} else {
117120
reqCtx = r.Context()
118121
cancel = func() {
119-
120122
}
121123
}
122-
123124
defer cancel()
124125

125126
res, err := f.Client.Do(request.WithContext(reqCtx))
126-
127127
if err != nil {
128128
log.Printf("Upstream HTTP request error: %s\n", err.Error())
129129

@@ -136,19 +136,15 @@ func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *ht
136136
return nil
137137
}
138138

139-
select {
140-
case <-reqCtx.Done():
141-
{
142-
if reqCtx.Err() != nil {
143-
// Error due to timeout / deadline
144-
log.Printf("Upstream HTTP killed due to exec_timeout: %s\n", f.ExecTimeout)
145-
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))
139+
<-reqCtx.Done()
146140

147-
w.WriteHeader(http.StatusGatewayTimeout)
148-
return nil
149-
}
141+
if reqCtx.Err() != nil {
142+
// Error due to timeout / deadline
143+
log.Printf("Upstream HTTP killed due to exec_timeout: %s\n", f.ExecTimeout)
144+
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))
150145

151-
}
146+
w.WriteHeader(http.StatusGatewayTimeout)
147+
return nil
152148
}
153149

154150
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ go 1.13
55
require (
66
github.com/openfaas/faas-middleware v1.0.0
77
github.com/prometheus/client_golang v1.9.0
8+
github.com/prometheus/client_model v0.2.0 // indirect
89
)

0 commit comments

Comments
 (0)