Skip to content
This repository has been archived by the owner on Sep 5, 2024. It is now read-only.

Commit

Permalink
use only loki.StopNow to avoid deadlocks (#52)
Browse files Browse the repository at this point in the history
* use only loki.StopNow to avoid deadlocks

* check the URL manually, since we can't stop Promtail in some cases
  • Loading branch information
skudasov authored Dec 7, 2023
1 parent 64c16b6 commit 44e3f13
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
24 changes: 16 additions & 8 deletions loki_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wasp
import (
"encoding/json"
"fmt"
"net/http"
"os"
"time"

Expand Down Expand Up @@ -38,14 +39,17 @@ func (m *LokiLogWrapper) SetClient(c *LokiClient) {
}

func (m *LokiLogWrapper) Log(kvars ...interface{}) error {
if len(m.errors) > m.MaxErrors {
return nil
}
if len(kvars) < 13 {
log.Error().
Interface("Line", kvars).
Msg("Malformed promtail log message, skipping")
return nil
}
if _, ok := kvars[13].(error); ok {
if kvars[13].(error) != nil {
if kvars[13] != nil {
if _, ok := kvars[13].(error); ok {
m.errors = append(m.errors, kvars[13].(error))
log.Error().
Interface("Status", kvars[9]).
Expand Down Expand Up @@ -86,9 +90,9 @@ func (m *LokiClient) HandleStruct(ls model.LabelSet, t time.Time, st interface{}
return m.Handle(ls, t, string(d))
}

// Stop stops the client goroutine
func (m *LokiClient) Stop() {
m.Client.Stop()
// StopNow stops the client goroutine
func (m *LokiClient) StopNow() {
m.Client.StopNow()
}

// LokiConfig is simplified subset of a Promtail client configuration
Expand Down Expand Up @@ -131,8 +135,8 @@ func NewEnvLokiConfig() *LokiConfig {
URL: os.Getenv("LOKI_URL"),
Token: os.Getenv("LOKI_TOKEN"),
BasicAuth: os.Getenv("LOKI_BASIC_AUTH"),
MaxErrors: 10,
BatchWait: 5 * time.Second,
MaxErrors: 5,
BatchWait: 3 * time.Second,
BatchSize: 500 * 1024,
Timeout: 20 * time.Second,
DropRateLimitedBatches: false,
Expand All @@ -145,8 +149,12 @@ func NewEnvLokiConfig() *LokiConfig {

// NewLokiClient creates a new Promtail client
func NewLokiClient(extCfg *LokiConfig) (*LokiClient, error) {
_, err := http.Get(extCfg.URL)
if err != nil {
return nil, err
}
serverURL := dskit.URLValue{}
err := serverURL.Set(extCfg.URL)
err = serverURL.Set(extCfg.URL)
if err != nil {
return nil, err
}
Expand Down
17 changes: 12 additions & 5 deletions loki_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ func lokiLogTupleMsg() []interface{} {
return logMsg
}

func TestSmokeLokiErrors(t *testing.T) {
func TestSmokeLokiFailIfURLIsNotResolving(t *testing.T) {
cfg := NewEnvLokiConfig()
cfg.URL = "http://nothing:3000"
_, err := NewLokiClient(cfg)
require.Error(t, err)
}

func TestLokiErrors(t *testing.T) {
type testcase struct {
name string
maxErrors int
Expand All @@ -42,11 +49,11 @@ func TestSmokeLokiErrors(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
lc, err := NewLokiClient(&LokiConfig{
MaxErrors: tc.maxErrors,
})
defer lc.StopNow()
cfg := NewEnvLokiConfig()
cfg.MaxErrors = tc.maxErrors
lc, err := NewLokiClient(cfg)
require.NoError(t, err)
defer lc.StopNow()
_ = lc.logWrapper.Log(lokiLogTupleMsg()...)
_ = lc.logWrapper.Log(lokiLogTupleMsg()...)
q := struct {
Expand Down
3 changes: 2 additions & 1 deletion perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"runtime"

"fmt"
"github.com/pyroscope-io/client/pyroscope"
"net/http/httptest"

"github.com/pyroscope-io/client/pyroscope"
)

/* This tests can also be used as a performance validation of a tool itself or as a dashboard data filler */
Expand Down
2 changes: 1 addition & 1 deletion wasp.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func (g *Generator) Stats() *Stats {
func (g *Generator) stopLokiStream() {
if g.cfg.LokiConfig != nil && g.cfg.LokiConfig.URL != "" {
g.Log.Info().Msg("Stopping Loki")
g.loki.Stop()
g.loki.StopNow()
g.Log.Info().Msg("Loki exited")
}
}
Expand Down

0 comments on commit 44e3f13

Please sign in to comment.