Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
- [#1938](https://github.com/influxdata/kapacitor/issues/1938): pagerduty2 should use routingKey rather than serviceKey
- [#1982](https://github.com/influxdata/kapacitor/pull/1982): Fix KafkaTopic not working from TICKscript
- [#1989](https://github.com/influxdata/kapacitor/pull/1989): Improve Kafka alert throughput.
- [#2062](https://github.com/influxdata/kapacitor/pull/2062): Fix alerta node hang by adding http timeout and fix time.Duration convert when define topic handler with httppost.

## v1.5.0 [2018-05-17]

### Features

- [#1842](https://github.com/influxdata/kapacitor/pull/1842): Add alert inhibitors that allow an alert to suppress events from other matching alerts.
- [#1833](https://github.com/influxdata/kapacitor/pull/1833): Config format updated to allow for more than one slack configuration.
- [#1833](https://github.com/influxdata/kapacitor/pull/1833): Config format updated to allow for more than one slack configuration.
- [#1844](https://github.com/influxdata/kapacitor/pull/1844): Added a new kapacitor node changeDetect that emits a value
for each time a series field changes.
- [#1828](https://github.com/influxdata/kapacitor/pull/1828): Add recoverable field to JSON alert response to indicate whether the
Expand Down Expand Up @@ -1337,5 +1338,3 @@ Some bug fixes including one that cause Kapacitor to deadlock.
### Release Notes

Major public release.


3 changes: 3 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,9 @@ type AlertaHandler struct {
// Alerta timeout.
// Default: 24h
Timeout time.Duration `json:"timeout"`

// HTTP Post timeout
PostTimeout time.Duration `json:"post-timeout"`
}

// List of effected services.
Expand Down
2 changes: 2 additions & 0 deletions services/alerta/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Config struct {
Origin string `toml:"origin" override:"origin"`
// Optional timeout, can be overridden per alert.
Timeout toml.Duration `toml:"timeout" override:"timeout"`
// Optional http post-timeout, can be overridden per alert.
PostTimeout toml.Duration `toml:"post-timeout" override:"post-timeout"`
}

func NewConfig() Config {
Expand Down
28 changes: 23 additions & 5 deletions services/alerta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package alerta

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
Expand All @@ -24,6 +25,7 @@ const (
defaultEvent = "{{ .ID }}"
defaultGroup = "{{ .Group }}"
defaultTimeout = time.Duration(24 * time.Hour)
defaultPostTimeout = time.Duration(5 * time.Second)
defaultTokenPrefix = "Bearer"
)

Expand Down Expand Up @@ -64,6 +66,7 @@ type testOptions struct {
Origin string `json:"origin"`
Service []string `json:"service"`
Timeout string `json:"timeout"`
PostTimeout string `json:"post-timeout"`
}

func (s *Service) TestOptions() interface{} {
Expand All @@ -79,6 +82,7 @@ func (s *Service) TestOptions() interface{} {
Origin: c.Origin,
Service: []string{"testServiceA", "testServiceB"},
Timeout: "24h0m0s",
PostTimeout: "10s",
}
}

Expand All @@ -89,6 +93,7 @@ func (s *Service) Test(options interface{}) error {
}
c := s.config()
timeout, _ := time.ParseDuration(o.Timeout)
postTimeout, _ := time.ParseDuration(o.PostTimeout)
return s.Alert(
c.Token,
c.TokenPrefix,
Expand All @@ -102,6 +107,7 @@ func (s *Service) Test(options interface{}) error {
o.Origin,
o.Service,
timeout,
postTimeout,
map[string]string{},
models.Result{},
)
Expand Down Expand Up @@ -138,7 +144,7 @@ func (s *Service) Update(newConfig []interface{}) error {
return nil
}

func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, tags map[string]string, data models.Result) error {
func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, postTimeout time.Duration, tags map[string]string, data models.Result) error {
if resource == "" || event == "" {
return errors.New("Resource and Event are required to send an alert")
}
Expand All @@ -148,6 +154,12 @@ func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severi
return err
}

if postTimeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), postTimeout)
defer cancel()
req = req.WithContext(ctx)
}

client := s.clientValue.Load().(*http.Client)
resp, err := client.Do(req)
if err != nil {
Expand Down Expand Up @@ -286,6 +298,10 @@ type HandlerConfig struct {
// Alerta timeout.
// Default: 24h
Timeout time.Duration `mapstructure:"timeout"`

// HTTP Post timeout.
// Default 5s
PostTimeout time.Duration `mapstructure:"post-timeout"`
}

type handler struct {
Expand All @@ -303,10 +319,11 @@ type handler struct {

func (s *Service) DefaultHandlerConfig() HandlerConfig {
return HandlerConfig{
Resource: defaultResource,
Event: defaultEvent,
Group: defaultGroup,
Timeout: defaultTimeout,
Resource: defaultResource,
Event: defaultEvent,
Group: defaultGroup,
Timeout: defaultTimeout,
PostTimeout: defaultPostTimeout,
}
}

Expand Down Expand Up @@ -464,6 +481,7 @@ func (h *handler) Handle(event alert.Event) {
h.c.Origin,
service,
h.c.Timeout,
h.c.PostTimeout,
event.Data.Tags,
event.Data.Result,
); err != nil {
Expand Down
18 changes: 16 additions & 2 deletions services/httppost/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"context"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/pkg/errors"
Expand Down Expand Up @@ -237,7 +238,8 @@ type HandlerConfig struct {
Endpoint string `mapstructure:"endpoint"`
Headers map[string]string `mapstructure:"headers"`
CaptureResponse bool `mapstructure:"capture-response"`
Timeout time.Duration `mapstructure:"timeout"`
Timeout time.Duration `mapstructure:"-"`
TimeoutRaw string `mapstructure:"timeout" json:""`
}

type handler struct {
Expand All @@ -260,14 +262,26 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
if !ok {
e = NewEndpoint(c.URL, nil, BasicAuth{}, nil, nil)
}
return &handler{
h := handler{
s: s,
endpoint: e,
diag: s.diag.WithContext(ctx...),
headers: c.Headers,
captureResponse: c.CaptureResponse,
timeout: c.Timeout,
}
if c.Timeout != 0 {
h.timeout = c.Timeout
}
if c.TimeoutRaw != "" {
parsedTimeout, err := time.ParseDuration(c.TimeoutRaw)
if err != nil {
s.diag.Error("can't parse timeout value, ignored: ", err)
} else {
h.timeout = parsedTimeout
}
}
return &h
}

func (h *handler) NewHTTPRequest(body io.Reader) (req *http.Request, err error) {
Expand Down