Skip to content

Commit 7a84eaa

Browse files
Quest HenkartQuest Henkart
Quest Henkart
authored and
Quest Henkart
committed
init
0 parents  commit 7a84eaa

15 files changed

+1986
-0
lines changed

LICENSE.txt

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) [2020] [Quest Henkart]
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# gosqs
2+
3+
GoSQS serves as the messaging interface between AWS-SQS and AWS-SNS services. If there is an feature you would like implemented, please make a Pull Request
4+
5+
Please refer to the examples to see how to interact with this library. Please make contributions, and post any issues.
6+
7+
Take a look at the Dead Letter Queue and Naming your Queue as they are important for this design pattern
8+
9+
## TODO
10+
- create better documentation
11+
- implement internal testing package to avoid dependency
12+
- create a controller that scales the worker pool up automatically depending on the message count and then kills the workers to avoid costs
13+
14+
## Scaling the Consumers
15+
Each SQS consumer is ready to be scaled up as well as scaled out.
16+
17+
* Scaling Out: You can scale out by creating additional instances with no extra configuration required. The library and SQS is designed for multiple workers reaching into the same pool
18+
19+
* Scaling Up: Each consumer has a configuration variable `config.WorkerPool`. The default is set to `30`, that means there are 30 goroutines checking for messages at any given time. You can increase the amount of active threads simply by adjusting that number. Make sure to monitor CPU usage to find the right count for your application. For a local or dev environment. Reduce this number to 1 to save battery
20+
21+
## Configuring SNS
22+
configuring SNS is easy, simply login to the AWS-console, navigate to SNS. Click on Topics on the sidebar and "Create New Topic". Fill in the name and display name.
23+
* make sure to set the topic delivery policy to exponential back off
24+
25+
## Configuring SQS
26+
1. Navigate to aws-sqs
27+
2. Choose a queue Name and click on Standard Queue
28+
3. Click configure, apply optional configurations
29+
4. Hit Create Queue
30+
5. in the main page, click on the newly created queue
31+
6. Click on Queue Actions and at the bottom hit "Subscribe Queue to SNS Topic"
32+
7. Select the SNS topic from the dropdown provided
33+
34+
## Naming your Queue
35+
The naming convention for queues supported by this library follow the following syntax
36+
37+
<env>-<name>
38+
39+
## SQS Configurations
40+
41+
### Default Visibility Timeout
42+
The default visibility timeout is responsible for managing how often a single message gets received by a consumer. A message remains in the queue until it is deleted, and any receipt of the message that does not end in deletion before the Visibility Timeout is hit is considered a "failure". As a result, the Default Visibility Timeout should exceed the maximum possible amount of time it would take for a handler to process and delete a message.
43+
44+
The currently set default is 30 seconds
45+
46+
*note* The visibility timeout is extended for an individual message, for a maximum of 3 x the visibility timeout
47+
48+
### Message Retention Period
49+
The # of days that the Queue will hold on to an unconsumed message before deleting it. Since we will always be consuming, this value is not important, the default is 4 days
50+
51+
### Receive Message Wait Time
52+
The amount of time that the request will hang before returning 0 messages. This field is important as it allows us to use long-polling instead of short-polling. The default is 0 and it *should be set to the max 20 seconds to save on processing and cost*. AWS recommends using long polling over short polling
53+
54+
### Custom Attributes
55+
You can add custom attributes to your SQS implementation. These are fields that exist outside of the payload body. A common practice is to include a correlationId or some sort of trackingId to track a message
56+
57+
58+
### DEAD LETTER QUEUE CONFIGURATION
59+
The following settings activate an automatic reroute to the DLQ upon repetetive failure of message processing.
60+
* Redrive Policy must be checked
61+
* The Dead Letter Queue name must be provided (A DLQ is just a normal SQS)
62+
* Maximum Receives reflects the amount of times a message is received, but not deleted before it is requeued into the DLQ
63+
* *Including a DLQ is an absolute must, do not run a system without it our you will be vulnerable to Poison-Pill attacks*
64+
65+
## Consumer Configuration
66+
67+
### Custom Middleware
68+
You can add custom middleware to your consumer. These will run using the adapter method before each handler is called. You can include a logger or modify the context etc
69+
70+
## Testing
71+
You can set up a local SNS/SQS emulator using https://github.com/p4tin/goaws. Contributions have been added to this emulator specifically to support this library
72+
Tests also require this to be running, I will eventually set up a ci environment that runs the emulator in a container and runs the tests

adapters.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package gosqs
2+
3+
import (
4+
"context"
5+
)
6+
7+
const (
8+
dispatcherKey = contextKey("dispatcher")
9+
)
10+
11+
type contextKey string
12+
13+
// Handler provides a standardized handler method, this is the required function composition for event handlers
14+
type Handler func(context.Context, Message) error
15+
16+
// Adapter implements adapters in the context
17+
type Adapter func(Handler) Handler
18+
19+
// WithRecovery is an adapter that logs a Panic error and recovers the service from a failed state
20+
func WithRecovery(recovery func()) Adapter {
21+
return func(fn Handler) Handler {
22+
return func(ctx context.Context, m Message) error {
23+
defer recovery()
24+
25+
return fn(ctx, m)
26+
}
27+
}
28+
}
29+
30+
// WithMiddleware add middleware to the consumer service
31+
func WithMiddleware(f func(ctx context.Context, m Message) error) Adapter {
32+
return func(fn Handler) Handler {
33+
return func(ctx context.Context, m Message) error {
34+
f(ctx, m)
35+
36+
return fn(ctx, m)
37+
}
38+
}
39+
}
40+
41+
// WithDispatcher sets an adapter to support sending async messages
42+
func WithDispatcher(ctx context.Context, pub Publisher) context.Context {
43+
return context.WithValue(ctx, dispatcherKey, pub)
44+
}
45+
46+
// Dispatcher retrieves the sqs dispatcher from the context for sending messeges
47+
func Dispatcher(ctx context.Context) (Publisher, error) {
48+
if p, ok := ctx.Value(dispatcherKey).(Publisher); ok {
49+
return p, nil
50+
}
51+
52+
return nil, ErrUndefinedPublisher
53+
}
54+
55+
// MustDispatcher retrieves the sqs dispatcher from the context for sending messeges or panics if
56+
// the Dispatcher does not exist in the context
57+
func MustDispatcher(ctx context.Context) Publisher {
58+
if p, ok := ctx.Value(dispatcherKey).(Publisher); ok {
59+
return p
60+
}
61+
62+
panic(ErrUndefinedPublisher.Error())
63+
}

config.go

+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package gosqs
2+
3+
import (
4+
"strconv"
5+
6+
"github.com/aws/aws-sdk-go/aws"
7+
"github.com/aws/aws-sdk-go/aws/client"
8+
"github.com/aws/aws-sdk-go/aws/credentials"
9+
"github.com/aws/aws-sdk-go/aws/request"
10+
"github.com/aws/aws-sdk-go/aws/session"
11+
)
12+
13+
// Config defines the gosqs configuration
14+
type Config struct {
15+
// private key to access aws
16+
Key string
17+
// secret to access aws
18+
Secret string
19+
// region for aws and used for determining the topic ARN
20+
Region string
21+
// provided automatically by aws, but must be set for emulators or local testing
22+
Hostname string
23+
// account ID of the aws account, used for determining the topic ARN
24+
AWSAccountID string
25+
// environment name, used for determinig the topic ARN
26+
Env string
27+
// prefix of the topic, this is set as a prefix to the environment
28+
TopicPrefix string
29+
// optional address of the topic, if this is not provided it will be created using other variables
30+
TopicARN string
31+
// optional address of queue, if this is not provided it will be retrieved during setup
32+
QueueURL string
33+
// used to extend the allowed processing time of a message
34+
VisibilityTimeout int
35+
// used to determine how many attempts exponential backoff should use before logging an error
36+
RetryCount int
37+
// defines the total amount of goroutines that can be run by the consumer
38+
WorkerPool int
39+
// defines the total number of processing extensions that occur. Each proccessing extension will double the
40+
// visibilitytimeout counter, ensuring the handler has more time to process the message. Default is 2 extensions (1m30s processing time)
41+
// set to 0 to turn off extension processing
42+
ExtensionLimit *int
43+
44+
// Add custom attributes to the message. This might be a correlationId or client meta information
45+
// custom attributes will be viewable on the sqs dashboard as meta data
46+
Attributes []customAttribute
47+
48+
// Add a custom logger, the default will be log.Println
49+
Logger Logger
50+
}
51+
52+
// customAttribute add custom attributes to SNS and SQS messages. This can include correlationIds, or any additional information you would like
53+
// separate from the payload body. These attributes can be easily seen from the SQS console.
54+
type customAttribute struct {
55+
Title string
56+
// Use gosqs.DataTypeNumber or gosqs.DataTypeString
57+
DataType string
58+
// Value represents the value
59+
Value string
60+
}
61+
62+
// NewCustomAttribute adds a custom attribute to SNS and SQS messages. This can include correlationIds, logIds, or any additional information you would like
63+
// separate from the payload body. These attributes can be easily seen from the SQS console.
64+
//
65+
// must use gosqs.DataTypeNumber of gosqs.DataTypeString for the datatype, the value must match the type provided
66+
func (c *Config) NewCustomAttribute(dataType dataType, title string, value interface{}) error {
67+
if dataType == DataTypeNumber {
68+
val, ok := value.(int)
69+
if !ok {
70+
return ErrMarshal
71+
}
72+
73+
c.Attributes = append(c.Attributes, customAttribute{title, dataType.String(), strconv.Itoa(val)})
74+
return nil
75+
}
76+
77+
val, ok := value.(string)
78+
if !ok {
79+
return ErrMarshal
80+
}
81+
c.Attributes = append(c.Attributes, customAttribute{title, dataType.String(), val})
82+
return nil
83+
}
84+
85+
type dataType string
86+
87+
func (dt dataType) String() string {
88+
return string(dt)
89+
}
90+
91+
// DataTypeNumber represents the Number datatype, use it when creating custom attributes
92+
const DataTypeNumber = dataType("Number")
93+
94+
// DataTypeString represents the String datatype, use it when creating custom attributes
95+
const DataTypeString = dataType("String")
96+
97+
type retryer struct {
98+
client.DefaultRetryer
99+
retryCount int
100+
}
101+
102+
// MaxRetries sets the total exponential back off attempts to 10 retries
103+
func (r retryer) MaxRetries() int {
104+
if r.retryCount > 0 {
105+
return r.retryCount
106+
}
107+
108+
return 10
109+
}
110+
111+
// newSession creates a new aws session
112+
func newSession(c Config) (*session.Session, error) {
113+
//sets credentials
114+
creds := credentials.NewStaticCredentials(c.Key, c.Secret, "")
115+
_, err := creds.Get()
116+
if err != nil {
117+
return nil, ErrInvalidCreds.Context(err)
118+
}
119+
120+
r := &retryer{retryCount: c.RetryCount}
121+
122+
cfg := request.WithRetryer(aws.NewConfig().WithRegion(c.Region).WithCredentials(creds), r)
123+
124+
//if an optional hostname config is provided, then replace the default one
125+
//
126+
// This will set the default AWS URL to a hostname of your choice. Perfect for testing, or mocking functionality
127+
if c.Hostname != "" {
128+
cfg.Endpoint = &c.Hostname
129+
}
130+
131+
return session.NewSession(cfg)
132+
}

0 commit comments

Comments
 (0)