44 "context"
55 "sync"
66
7- "github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/utils"
8-
97 cfg "github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/config"
108 sumocli "github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/sumoclient"
119
@@ -14,8 +12,8 @@ import (
1412
1513// TaskConsumer exposing methods every consmumer should implement
1614type TaskConsumer interface {
17- FlushDataQueue ()
18- DrainQueue (context.Context , int64 ) int
15+ FlushDataQueue (context. Context )
16+ DrainQueue (context.Context ) int
1917}
2018
2119// sumoConsumer to drain log from dataQueue
@@ -37,23 +35,35 @@ func NewTaskConsumer(consumerQueue chan []byte, config *cfg.LambdaExtensionConfi
3735}
3836
3937// FlushDataQueue drains the dataqueue commpletely
40- func (sc * sumoConsumer ) FlushDataQueue () {
41- var rawMsgArr [][]byte
42- Loop:
43- for {
44- //Receives block when the buffer is empty.
45- select {
46- case rawmsg := <- sc .dataQueue :
47- rawMsgArr = append (rawMsgArr , rawmsg )
48- default :
49- err := sc .sumoclient .FlushAll (rawMsgArr )
50- if err != nil {
51- sc .logger .Debugln ("Unable to flush DataQueue" , err .Error ())
52- // TODO: raise alert if flush fails
38+ func (sc * sumoConsumer ) FlushDataQueue (ctx context.Context ) {
39+ if sc .config .EnableFailover {
40+ var rawMsgArr [][]byte
41+ Loop:
42+ for {
43+ //Receives block when the buffer is empty.
44+ select {
45+ case rawmsg := <- sc .dataQueue :
46+ rawMsgArr = append (rawMsgArr , rawmsg )
47+ default :
48+ err := sc .sumoclient .FlushAll (rawMsgArr )
49+ if err != nil {
50+ sc .logger .Errorln ("Unable to flush DataQueue" , err .Error ())
51+ // putting back all the msg to the queue in case of failure
52+ for _ , msg := range rawMsgArr {
53+ sc .dataQueue <- msg
54+ }
55+ // TODO: raise alert if flush fails
56+ }
57+ close (sc .dataQueue )
58+ sc .logger .Debugf ("DataQueue completely drained" )
59+ break Loop
5360 }
54- close (sc .dataQueue )
55- sc .logger .Debugf ("DataQueue completely drained" )
56- break Loop
61+ }
62+ } else {
63+ // calling drainqueue (during shutdown) if failover is not enabled
64+ maxCallsNeededForCompleteDraining := (len (sc .dataQueue ) / sc .config .MaxConcurrentRequests ) + 1
65+ for i := 0 ; i < maxCallsNeededForCompleteDraining ; i ++ {
66+ sc .DrainQueue (ctx )
5767 }
5868 }
5969
@@ -64,17 +74,19 @@ func (sc *sumoConsumer) consumeTask(ctx context.Context, wg *sync.WaitGroup, raw
6474 err := sc .sumoclient .SendLogs (ctx , rawmsg )
6575 if err != nil {
6676 sc .logger .Error ("Error during Send Logs to Sumo Logic." , err .Error ())
77+ // putting back the msg to the queue in case of failure
78+ sc .dataQueue <- rawmsg
6779 // TODO: raise alert if send logs fails
6880 }
6981 return
7082}
7183
72- func (sc * sumoConsumer ) DrainQueue (ctx context.Context , deadtimems int64 ) int {
84+ func (sc * sumoConsumer ) DrainQueue (ctx context.Context ) int {
7385 wg := new (sync.WaitGroup )
7486 //sc.logger.Debug("Consuming data from dataQueue")
7587 counter := 0
7688Loop:
77- for i := 0 ; i < sc .config .MaxConcurrentRequests && len (sc .dataQueue ) != 0 && utils . IsTimeRemaining ( deadtimems ) ; i ++ {
89+ for i := 0 ; i < sc .config .MaxConcurrentRequests && len (sc .dataQueue ) != 0 ; i ++ {
7890 //Receives block when the buffer is empty.
7991 select {
8092 case rawmsg := <- sc .dataQueue :
0 commit comments