Skip to content

Commit 581132d

Browse files
authored
Merge pull request #11 from SumoLogic/hpal_SUMO-175893_delay_fix
Delay Fix in Lambda Extension
2 parents c628825 + a500448 commit 581132d

File tree

9 files changed

+163
-32
lines changed

9 files changed

+163
-32
lines changed

README.md

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ The Sumo Logic lambda extension is available as an AWS public Layer. The latest
1313

1414
For x86_64 use:
1515

16-
arn:aws:lambda:<AWS_REGION>:956882708938:layer:sumologic-extension-amd64:2
16+
arn:aws:lambda:<AWS_REGION>:956882708938:layer:sumologic-extension-amd64:3
1717

1818
For arm64 use:
1919

20-
arn:aws:lambda:<AWS_REGION>:956882708938:layer:sumologic-extension-arm64:2
20+
arn:aws:lambda:<AWS_REGION>:956882708938:layer:sumologic-extension-arm64:3
2121

2222

2323
- AWS_REGION - Replace with your AWS Lambda Region.
@@ -60,3 +60,31 @@ For Full Change Log, please visit [Releases](https://github.com/SumoLogic/sumolo
6060
[github-release-badge]: https://img.shields.io/github/release/sumologic/sumologic-lambda-extensions/all.svg?label=release
6161

6262
[github-release]: https://github.com/sumologic/sumologic-lambda-extensions/releases/latest
63+
64+
65+
## Compiling
66+
67+
`env GOOS=darwin go build -o "sumologic-extension" "lambda-extensions/sumologic-extension.go"`
68+
69+
70+
## Building
71+
This script assumes you already have the AWS CLI configured.
72+
73+
- Go to scripts folder
74+
- Export your AWS profile: `export AWS_PROFILE=<sumo content profile>`
75+
- Change the `layer_name` variable in zip.sh to avoid replacing the production layer.
76+
- Run the command below:
77+
`sh zip.sh`
78+
79+
## Testing
80+
81+
1> Unit Testing locally
82+
83+
- Go to root folder and run "go test ./..."
84+
85+
- Go to lambda-extensions folder and run "go test ./..."
86+
87+
2> Testing with Lambda function
88+
89+
Add the layer arn generated from build command output to your lambda function by following instructions in [docs](https://help.sumologic.com/03Send-Data/Collect-from-Other-Data-Sources/Collect_AWS_Lambda_Logs_using_an_Extension).
90+

lambda-extensions/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type LambdaExtensionConfig struct {
3737
SourceCategoryOverride string
3838
}
3939

40-
var validLogTypes = []string{"platform", "function", "extension"}
40+
var validLogTypes = []string{"platform", "function"}
4141

4242
// GetConfig to get config instance
4343
func GetConfig() (*LambdaExtensionConfig, error) {

lambda-extensions/lambdaapi/logsapiclient.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ const (
1212
logsURL = "2020-08-15/logs"
1313
// Subscription Body Constants. Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol.
1414
timeoutMs = 1000
15-
maxBytes = 262144
16-
maxItems = 1000
15+
maxBytes = 1048576
16+
maxItems = 10000
1717
receiverPort = 4243
1818
)
1919

@@ -22,9 +22,10 @@ func (client *Client) SubscribeToLogsAPI(ctx context.Context, logEvents []string
2222
URL := client.baseURL + logsURL
2323

2424
reqBody, err := json.Marshal(map[string]interface{}{
25-
"destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)},
26-
"types": logEvents,
27-
"buffering": map[string]interface{}{"timeoutMs": timeoutMs, "maxBytes": maxBytes, "maxItems": maxItems},
25+
"destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)},
26+
"types": logEvents,
27+
"buffering": map[string]interface{}{"timeoutMs": timeoutMs, "maxBytes": maxBytes, "maxItems": maxItems},
28+
"schemaVersion": "2021-03-18",
2829
})
2930
if err != nil {
3031
return nil, err

lambda-extensions/sumoclient/sumoclient.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ var isColdStart = true
2323
// LogSender interface which needs to be implemented to send logs
2424
type LogSender interface {
2525
SendLogs(context.Context, []byte) error
26+
SendAllLogs(context.Context, [][]byte) error
2627
FlushAll([][]byte) error
2728
}
2829

@@ -279,8 +280,61 @@ func (s *sumoLogicClient) SendLogs(ctx context.Context, rawmsg []byte) error {
279280
return nil
280281
}
281282

283+
func (s *sumoLogicClient) SendAllLogs(ctx context.Context, allMessages [][]byte) error {
284+
if (len(allMessages) == 0) {
285+
s.logger.Debugf("SendAllLogs: No messages to send")
286+
return nil
287+
}
288+
289+
s.logger.Debugf("SendAllLogs: Attempting to send %d payloads from dataqueue to SumoLogic", len(allMessages))
290+
291+
var errorCount int = 0
292+
var totalitems int = 0
293+
var payload responseBody
294+
for _, rawmsg := range allMessages {
295+
// converting to arr of maps
296+
msgArr, err := s.transformBytesToArrayOfMap(rawmsg)
297+
if err != nil {
298+
s.logger.Error("SendAllLogs: Error in transforming bytes to array of struct", err.Error())
299+
errorCount++
300+
continue
301+
}
302+
303+
if len(msgArr) > 0 {
304+
// enhancing logs
305+
s.enhanceLogs(msgArr)
306+
totalitems += len(msgArr)
307+
// converting back to string
308+
for _, item := range msgArr {
309+
payload = append(payload, item)
310+
}
311+
}
312+
}
313+
s.logger.Debugf("SendAllLogs: Enhanced TotalLogItems - %d \n", totalitems)
314+
// converting back to chunks of string
315+
chunks, err := s.createChunks(payload)
316+
if err != nil {
317+
return fmt.Errorf("SendAllLogs: CreateChunks failed - %v", err)
318+
}
319+
for _, strobj := range chunks {
320+
err := s.postToSumo(ctx, &strobj)
321+
if err != nil {
322+
errorCount++
323+
}
324+
}
325+
if errorCount > 0 {
326+
err = fmt.Errorf("SendAllLogs: Errors during postToSumo - %d", errorCount)
327+
return err
328+
} else {
329+
s.logger.Debugf("SendAllLogs: Sent TotalChunks - %d \n", totalitems)
330+
}
331+
332+
return nil
333+
}
334+
282335
func (s *sumoLogicClient) postToSumo(ctx context.Context, logStringToSend *string) error {
283-
s.logger.Debug("Attempting to send to Sumo Endpoint")
336+
337+
s.logger.Debug("postToSumo: Attempting to send to Sumo Endpoint")
284338

285339
// compressing here because Sumo recommends payload size of 1MB before compression
286340
bytedata := utils.Compress(logStringToSend)
@@ -295,39 +349,39 @@ func (s *sumoLogicClient) postToSumo(ctx context.Context, logStringToSend *strin
295349
defer response.Body.Close()
296350
}
297351
if (err != nil) || (response.StatusCode != 200 && response.StatusCode != 302 && response.StatusCode < 500) {
298-
s.logger.Errorf("Not able to post statuscode: %v %v\n", err, response)
352+
s.logger.Errorf("postToSumo: Not able to post statuscode - %v %v\n", err, response)
299353
err := utils.Retry(func(attempt int) (bool, error) {
300-
s.logger.Debugf("Waiting for %v ms for retry attempt: %v\n", s.config.RetrySleepTime, attempt)
354+
s.logger.Debugf("postToSumo: Waiting for %v ms for retry attempt - %v\n", s.config.RetrySleepTime, attempt)
301355
time.Sleep(s.config.RetrySleepTime)
302356
buf := createBuffer()
303357
retryResponse, errRetry := s.makeRequest(ctx, buf)
304358
if (errRetry != nil) || (retryResponse.StatusCode != 200 && retryResponse.StatusCode != 302 && retryResponse.StatusCode < 500) {
305359
if errRetry == nil {
306360
errRetry = fmt.Errorf("statuscode %v", retryResponse.StatusCode)
307361
}
308-
s.logger.Error("Not able to post: ", errRetry)
362+
s.logger.Error("postToSumo: Not able to post - ", errRetry)
309363
return attempt < s.config.MaxRetryAttempts, errRetry
310364
} else if retryResponse.StatusCode == 200 {
311-
s.logger.Debugf("Post of logs successful after retry %v attempts\n", attempt)
365+
s.logger.Debugf("postToSumo: Post of logs successful after retry %v attempts\n", attempt)
312366
return true, nil
313367
}
314368
return attempt < s.config.MaxRetryAttempts, errRetry
315369
}, s.config.NumRetry)
316370
if err != nil {
317-
s.logger.Error("Finished retrying Error: ", err)
371+
s.logger.Error("postToSumo: Finished retrying Error - ", err)
318372
if s.config.EnableFailover {
319373
buf = createBuffer()
320374
err := s.failoverHandler(buf)
321375
if err != nil {
322-
s.logger.Errorf("Dropping messages as post to S3 failed: %v\n", err)
376+
s.logger.Errorf("postToSumo: Dropping messages as post to S3 failed - %v\n", err)
323377
return err
324378
}
325379
} else {
326-
s.logger.Info("Dropping messages as no failover enabled.")
380+
s.logger.Info("postToSumo: Dropping messages as no failover enabled.")
327381
}
328382
}
329383
} else if response.StatusCode == 200 {
330-
s.logger.Debugf("Post of logs successful")
384+
s.logger.Debugf("postToSumo: Post of logs successful")
331385
}
332386

333387
return nil

lambda-extensions/sumoclient/sumoclient_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ func TestSumoClient(t *testing.T) {
119119
var reportLogs = []byte(`[{"record":{"metrics":{"billedDurationMs":120000,"durationMs":122066.85,"maxMemoryUsedMB":74,"memorySizeMB":128},"requestId":"fcea12d9-e0b4-43b2-a9a2-04d04519539f"},"time":"2020-11-02T20:33:16.536Z","type":"platform.report"}]`)
120120
assertEqual(t, client.SendLogs(ctx, reportLogs), nil, "SendLogs should not generate error")
121121

122+
t.Log("\ntesting SendAllLogs\n======================")
123+
assertEqual(t, client.SendAllLogs(ctx, multiplelargedata), nil, "SendAllLogs should not generate error")
122124
//Todo remove this function from sumologic-extension
123125
// t.Log("\ntesting sumo if no s3 failover\n=================")
124126
// config.EnableFailover = false

lambda-extensions/sumologic-extension.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"os/signal"
77
"path/filepath"
8+
"runtime"
89
"syscall"
910

1011
"github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/utils"
@@ -22,12 +23,18 @@ var (
2223
extensionClient = lambdaapi.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API"), extensionName)
2324
logger = logrus.New().WithField("Name", extensionName)
2425
)
26+
2527
var producer workers.TaskProducer
2628
var consumer workers.TaskConsumer
2729
var config *cfg.LambdaExtensionConfig
2830
var dataQueue chan []byte
2931

3032
func init() {
33+
Formatter := new(logrus.TextFormatter)
34+
Formatter.TimestampFormat = "2006-01-02T15:04:05.999999999Z07:00"
35+
Formatter.FullTimestamp = true
36+
logger.Logger.SetFormatter(Formatter)
37+
3138
logger.Logger.SetOutput(os.Stdout)
3239

3340
// Creating config and performing validation
@@ -89,14 +96,27 @@ func processEvents(ctx context.Context) {
8996
logger.Error("Error during Registration: ", err.Error())
9097
return
9198
}
99+
92100
// The For loop will continue till we recieve a shutdown event.
93101
for {
94102
select {
95103
case <-ctx.Done():
96104
consumer.FlushDataQueue(ctx)
97105
return
98106
default:
99-
go consumer.DrainQueue(ctx)
107+
logger.Infof("Calling DrainQueue from processEvents")
108+
for {
109+
runtime_done := consumer.DrainQueue(ctx)
110+
111+
if runtime_done == 1 {
112+
logger.Infof("Exiting DrainQueueLoop: Runtime is Done")
113+
break
114+
} else {
115+
logger.Debugf("switching to other go routine")
116+
runtime.Gosched()
117+
}
118+
}
119+
100120
// This statement will freeze lambda
101121
nextResponse, err := nextEvent(ctx)
102122
if err != nil {
@@ -106,7 +126,7 @@ func processEvents(ctx context.Context) {
106126
// Next invoke will start from here
107127
logger.Infof("Received Next Event as %s", nextResponse.EventType)
108128
if nextResponse.EventType == lambdaapi.Shutdown {
109-
consumer.FlushDataQueue(ctx)
129+
consumer.DrainQueue(ctx)
110130
return
111131
}
112132
}

lambda-extensions/workers/consumer.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package workers
22

33
import (
44
"context"
5+
"fmt"
6+
"strings"
57
"sync"
68

79
cfg "github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/config"
@@ -10,6 +12,13 @@ import (
1012
"github.com/sirupsen/logrus"
1113
)
1214

15+
type SubEventType string
16+
17+
const (
18+
// RuntimeDone event is sent when the lambda function has finished its execution
19+
RuntimeDone SubEventType = "platform.runtimeDone"
20+
)
21+
1322
// TaskConsumer exposing methods every consumer should implement
1423
type TaskConsumer interface {
1524
FlushDataQueue(context.Context)
@@ -82,24 +91,38 @@ func (sc *sumoConsumer) consumeTask(ctx context.Context, wg *sync.WaitGroup, raw
8291
}
8392

8493
func (sc *sumoConsumer) DrainQueue(ctx context.Context) int {
85-
wg := new(sync.WaitGroup)
8694
//sc.logger.Debug("Consuming data from dataQueue")
87-
counter := 0
95+
96+
var rawMsgArr [][]byte
97+
var logsStr string = ""
98+
var runtime_done int = 0
8899
Loop:
89-
for i := 0; i < sc.config.MaxConcurrentRequests && len(sc.dataQueue) != 0; i++ {
100+
for {
90101
//Receives block when the buffer is empty.
91102
select {
92103
case rawmsg := <-sc.dataQueue:
93-
counter++
94-
wg.Add(1)
95-
go sc.consumeTask(ctx, wg, rawmsg)
104+
rawMsgArr = append(rawMsgArr, rawmsg)
105+
logsStr = fmt.Sprintf("%s", rawmsg)
106+
sc.logger.Debugf("DrainQueue: logsStr: %s", logsStr)
107+
if strings.Contains(logsStr, string(RuntimeDone)) {
108+
runtime_done = 1
109+
}
110+
96111
default:
97-
sc.logger.Debugf("DataQueue completely drained")
112+
err := sc.sumoclient.SendAllLogs(ctx, rawMsgArr)
113+
if err != nil {
114+
sc.logger.Errorln("Unable to flush DataQueue", err.Error())
115+
// putting back all the msg to the queue in case of failure
116+
for _, msg := range rawMsgArr {
117+
sc.dataQueue <- msg
118+
}
119+
// TODO: raise alert if flush fails
120+
} else {
121+
sc.logger.Debugf("DrainQueue: DataQueue completely drained")
122+
}
98123
break Loop
99124
}
100-
101125
}
102-
//sc.logger.Debugf("Waiting for %d consumer to finish their tasks", counter)
103-
wg.Wait()
104-
return counter
126+
sc.logger.Debugf("DrainQueue: Runtime done or not? %d", runtime_done)
127+
return runtime_done
105128
}

lambda-extensions/workers/producer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,17 @@ func (httpServer *httpServer) logsHandler(writer http.ResponseWriter, request *h
4848
}
4949
switch request.Method {
5050
case "POST":
51+
defer request.Body.Close()
5152
reqBody, err := ioutil.ReadAll(request.Body)
5253
if err != nil {
5354
// TODO: raise alert if read fails
5455
httpServer.logger.Error("Read from Logs API failed: ", err.Error())
5556
}
56-
httpServer.logger.Debug("Producing data into dataQueue")
57+
58+
httpServer.logger.Debugf("Producing data into dataQueue - %d \n", len(reqBody))
5759
payload := []byte(reqBody)
5860
// Sends to a buffered channel block only when the buffer is full
5961
httpServer.dataQueue <- payload
62+
writer.WriteHeader(http.StatusOK)
6063
}
6164
}

scripts/zip.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ for arch in "${ARCHITECTURES[@]}"; do
8989
echo "Layer Arn: arn:aws:lambda:${region}:<accountId>:layer:${layer_name}:${layer_version} deployed to Region ${region}"
9090

9191
echo "Setting public permissions for layer version: ${layer_version}"
92-
aws lambda add-layer-version-permission --layer-name ${layer_name} --statement-id ${layer_name}-prod --version-number $layer_version --principal '*' --action lambda:GetLayerVersion --region ${region}
92+
aws lambda add-layer-version-permission --layer-name ${layer_name} --statement-id ${layer_name}-prod --version-number $layer_version --principal '*' --action lambda:GetLayerVersion --region ${region} --profile ${AWS_PROFILE}
9393
# aws lambda add-layer-version-permission --layer-name ${layer_name} --statement-id ${layer_name}-dev --version-number ${layer_version} --principal '956882708938' --action lambda:GetLayerVersion --region ${region}
9494
done
9595

0 commit comments

Comments
 (0)