Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

📦 new(proposal:transport) a way to abstract away the rabbitmq #230

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion gbus/abstractions.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ import (
"time"

"github.com/sirupsen/logrus"

"github.com/streadway/amqp"
)

@@ -284,3 +283,25 @@ type Logged interface {
SetLogger(entry logrus.FieldLogger)
Log() logrus.FieldLogger
}

type Transport interface {
Messaging
Logged
Health

Start() error
Stop() error

RPCChannel() <-chan *BusMessage
MessageChannel() <-chan *BusMessage

ErrorChan() <-chan error
BackPressureChannel() <-chan bool

ListenOnEvent(exchange, topic string) error

Ack(message *BusMessage) error
Reject(message *BusMessage, requeue bool) error
}

type NewTransport func(svcName, connString, DLX string, prefetchCount, maxRetryCount uint, purgeOnStartup, withConfirms bool, logger logrus.FieldLogger) Transport
104 changes: 12 additions & 92 deletions gbus/messages.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package gbus

import (
"errors"
"fmt"
"strings"

"github.com/opentracing/opentracing-go/log"
"github.com/rs/xid"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

//BusMessage the structure that gets sent to the underlying transport
type BusMessage struct {
ID string
IdempotencyKey string
CorrelationID string
SagaID string
SagaCorrelationID string
Semantics Semantics /*cmd or evt*/
Payload Message
PayloadFQN string
RPCID string
ID string
IdempotencyKey string
CorrelationID string
SagaID string
SagaCorrelationID string
Semantics Semantics /*cmd or evt*/
Payload Message
PayloadFQN string
RPCID string
ResurrectedFromDeath bool
RawPayload []byte
ContentType string
}

//NewBusMessage factory method for creating a BusMessage that wraps the given payload
@@ -34,65 +33,6 @@ func NewBusMessage(payload Message) *BusMessage {
return bm
}

//NewFromDelivery creates a BusMessage from an amqp delivery
func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) {
bm := &BusMessage{}
bm.SetFromAMQPHeaders(delivery)

bm.ID = delivery.MessageId
bm.CorrelationID = delivery.CorrelationId
if delivery.Exchange != "" {
bm.Semantics = EVT
} else {
bm.Semantics = CMD
}
if bm.PayloadFQN == "" || bm.Semantics == "" {
errMsg := fmt.Sprintf("missing critical headers. message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics)
return nil, errors.New(errMsg)
}
return bm, nil
}

//GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery
func GetMessageName(delivery amqp.Delivery) string {
return castToString(delivery.Headers["x-msg-name"])
}

func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
headers = amqp.Table{
"x-msg-name": bm.Payload.SchemaName(),
"x-idempotency-key": bm.IdempotencyKey,
}

/*
only set the following headers if they contain a value
https://github.com/wework/grabbit/issues/221
*/
setNonEmpty(headers, "x-msg-saga-id", bm.SagaID)
setNonEmpty(headers, "x-msg-saga-correlation-id", bm.SagaCorrelationID)
setNonEmpty(headers, "x-grabbit-msg-rpc-id", bm.RPCID)

return
}

func setNonEmpty(headers amqp.Table, headerName, headerValue string) {

if headerValue != "" {
headers[headerName] = headerValue
}
}

//SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
headers := delivery.Headers
bm.IdempotencyKey = castToString(headers["x-idempotency-key"])
bm.SagaID = castToString(headers["x-msg-saga-id"])
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
bm.PayloadFQN = GetMessageName(delivery)

}

//SetPayload sets the payload and makes sure that Name is saved
func (bm *BusMessage) SetPayload(payload Message) {
bm.PayloadFQN = payload.SchemaName()
@@ -121,21 +61,6 @@ func (bm *BusMessage) GetTraceLog() (fields []log.Field) {
log.String("RPCID", bm.RPCID),
}
}

func GetDeliveryLogEntries(delivery amqp.Delivery) logrus.Fields {

return logrus.Fields{
"message_name": castToString(delivery.Headers["x-msg-name"]),
"message_id": delivery.MessageId,
"routing_key": delivery.RoutingKey,
"exchange": delivery.Exchange,
"idempotency_key": castToString(delivery.Headers["x-idempotency-key"]),
"correlation_id": castToString(delivery.CorrelationId),
"rpc_id": castToString(delivery.Headers["x-grabbit-msg-rpc-id"]),
}

}

func castToString(i interface{}) string {
v, ok := i.(string)
if !ok {
@@ -155,8 +80,3 @@ type SagaTimeoutMessage struct {
func (SagaTimeoutMessage) SchemaName() string {
return "grabbit.timeout"
}

func isResurrectedMessage(delivery amqp.Delivery) bool {
isResurrected, ok := delivery.Headers[ResurrectedHeaderName].(bool)
return ok && isResurrected
}
Loading