Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add PublishToLocal for health check #686

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion integration-tests/amqp_amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func TestAmqpAmqp(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
4 changes: 3 additions & 1 deletion integration-tests/amqp_memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func TestAmqpMemcache(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
4 changes: 3 additions & 1 deletion integration-tests/amqp_mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func TestAmqpMongodb(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
4 changes: 3 additions & 1 deletion integration-tests/amqp_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func TestAmqpRedis(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
5 changes: 4 additions & 1 deletion integration-tests/gcppubsub_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ func TestGCPPubSubRedis(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

// not supported
// testPubslishToLocal(server, t)
}
4 changes: 3 additions & 1 deletion integration-tests/redis_memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func TestRedisMemcache(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
4 changes: 3 additions & 1 deletion integration-tests/redis_mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func TestRedisMongodb(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
4 changes: 3 additions & 1 deletion integration-tests/redis_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ func TestRedisRedis_Redigo(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}

func TestRedisRedisNormalTaskPollPeriodLessThan1SecondShouldNotFailNextTask(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion integration-tests/redis_socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ func TestRedisSocket(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
4 changes: 3 additions & 1 deletion integration-tests/sqs_amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func TestSQSAmqp(t *testing.T) {
})

worker := server.(*machinery.Server).NewWorker("test_worker", 0)
defer worker.Quit()
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
2 changes: 2 additions & 0 deletions integration-tests/sqs_mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ func TestSQSMongodb(t *testing.T) {
go worker.Launch()
testAll(server, t)
worker.Quit()

testPubslishToLocal(server, t)
}
98 changes: 98 additions & 0 deletions integration-tests/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package integration_test
import (
"context"
"errors"
"fmt"
"log"
"reflect"
"sort"
"strings"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/RichardKnop/machinery/v1"
Expand All @@ -29,6 +32,7 @@ type Server interface {
GetBroker() brokersiface.Broker
GetConfig() *config.Config
RegisterTasks(namedTaskFuncs map[string]interface{}) error
RegisterTask(name string, taskFunc interface{}) error
SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)
SendTask(signature *tasks.Signature) (*result.AsyncResult, error)
SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)
Expand All @@ -37,6 +41,7 @@ type Server interface {
SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
NewWorker(consumerTag string, concurrency int) *machinery.Worker
}

func testAll(server Server, t *testing.T) {
Expand Down Expand Up @@ -340,6 +345,99 @@ func testDelay(server Server, t *testing.T) {
}
}

// testPubslishToLocal verifies broker.PublishToLocal by running a dedicated
// worker on its own queue and pushing health-check tasks directly to it.
// It checks three phases: a healthy idle worker passes, a worker stuck on a
// long task fails the check, and a recovered worker passes again.
// NOTE(review): the name keeps the historical "Pubslish" spelling because
// every integration test in this package calls it by this name.
func testPubslishToLocal(oldServer Server, t *testing.T) {
	tag := "health_check_tag"

	// Reuse the existing configuration but point it at a dedicated queue so
	// health-check traffic does not interfere with the other tests.
	config := oldServer.GetConfig()
	config.DefaultQueue = tag
	if config.AMQP != nil {
		config.AMQP.BindingKey = tag
	}
	server, err := machinery.NewServer(config)
	if err != nil {
		t.Fatal(err)
	}

	healthCheckTaskName := "health-check"
	healthCheckCompleteChan := make(chan string, 1)
	// Register the health-check task: it reports back the UUID it was invoked
	// with so the checker can match a completion to its own request.
	err = server.RegisterTask(healthCheckTaskName, func(healthCheckUUID string) error {
		select {
		case healthCheckCompleteChan <- healthCheckUUID: // success and send uuid
			return nil
		case <-time.After(5 * time.Second):
			return fmt.Errorf("send health check result error: %v", healthCheckUUID)
		}
	})
	assert.Nil(t, err)

	// Start a worker with concurrency 1 so that a single stuck task blocks
	// the queue — exactly the failure mode the health check must detect.
	worker := server.NewWorker(tag, 1)
	worker.Queue = tag
	go worker.Launch()
	time.Sleep(1 * time.Second) // give the worker time to start consuming

	// checkHealth publishes a task straight to the local worker identified by
	// consumerTag and waits up to taskExecutionTimeout for it to complete.
	checkHealth := func(consumerTag string, taskExecutionTimeout time.Duration) error {
		// Drain any stale completion left over from a previous check.
		select {
		case <-healthCheckCompleteChan:
		default:
		}

		broker := server.GetBroker()
		healthCheckUUID, err := uuid.NewUUID()
		if err != nil {
			return err
		}
		if err := broker.PublishToLocal(consumerTag, &tasks.Signature{
			UUID: healthCheckUUID.String(),
			Name: healthCheckTaskName,
			Args: []tasks.Arg{
				{Type: "string", Value: healthCheckUUID.String()},
			},
		}, 5*time.Second); err != nil {
			return err
		}

		// Wait for the task to report back with the matching UUID.
		select {
		case successUUID := <-healthCheckCompleteChan:
			if successUUID == healthCheckUUID.String() {
				return nil
			}
		case <-time.After(taskExecutionTimeout):
		}
		return fmt.Errorf("health check execution fail: %v", healthCheckUUID.String())
	}

	// Phase 1: a healthy, idle worker must pass the check.
	err = checkHealth(tag, 5*time.Second)
	assert.Nil(t, err)

	// Phase 2: simulate a stuck worker by occupying its single slot.
	if err := server.RegisterTask("sleep-ten-seconds", func() error {
		time.Sleep(10 * time.Second)
		return nil
	}); err != nil {
		t.Fatal(err)
	}
	if _, err = server.SendTask(&tasks.Signature{
		Name: "sleep-ten-seconds",
	}); err != nil {
		t.Fatal(err)
	}
	time.Sleep(3 * time.Second) // ensure sleep-ten-seconds is running

	// While the worker is busy the health check must time out.
	err = checkHealth(tag, 5*time.Second)
	// Guard against a nil error before dereferencing it: the original called
	// err.Error() unconditionally, which panics (instead of failing the test)
	// if the check unexpectedly succeeds.
	if assert.Error(t, err) {
		assert.True(t, strings.HasPrefix(err.Error(), "health check execution fail: "))
	}

	time.Sleep(6 * time.Second) // let the queue drain and the pending check finish
	// Phase 3: once the worker is free again the check must succeed.
	err = checkHealth(tag, 5*time.Second)
	assert.Nil(t, err)

	// Stop the worker so it does not leak into subsequent tests.
	worker.Quit()
}

func registerTestTasks(server Server) {

tasks := map[string]interface{}{
Expand Down
83 changes: 57 additions & 26 deletions v1/brokers/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ type Broker struct {
common.AMQPConnector
processingWG sync.WaitGroup // use wait group to make sure task processing completes on interrupt signal

connections map[string]*AMQPConnection
connectionsMutex sync.RWMutex
connections map[string]*AMQPConnection
connectionsMutex sync.RWMutex
localDeliveries map[string]chan amqp.Delivery
localDeliveriesMutex sync.RWMutex
}

// New creates new Broker instance
func New(cnf *config.Config) iface.Broker {
return &Broker{Broker: common.NewBroker(cnf), AMQPConnector: common.AMQPConnector{}, connections: make(map[string]*AMQPConnection)}
return &Broker{Broker: common.NewBroker(cnf), AMQPConnector: common.AMQPConnector{}, connections: make(map[string]*AMQPConnection), localDeliveries: make(map[string]chan amqp.Delivery)}
}

// StartConsuming enters a loop and waits for incoming messages
Expand Down Expand Up @@ -92,10 +94,14 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess
if err != nil {
return b.GetRetry(), fmt.Errorf("Queue consume error: %s", err)
}
localDeliveries := make(chan amqp.Delivery, 1)
b.localDeliveriesMutex.Lock()
b.localDeliveries[consumerTag] = localDeliveries
b.localDeliveriesMutex.Unlock()

log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C")

if err := b.consume(deliveries, concurrency, taskProcessor, amqpCloseChan); err != nil {
if err := b.consume(deliveries, concurrency, taskProcessor, amqpCloseChan, localDeliveries); err != nil {
return b.GetRetry(), err
}

Expand Down Expand Up @@ -251,7 +257,7 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error

// consume takes delivered messages from the channel and manages a worker pool
// to process tasks concurrently
func (b *Broker) consume(deliveries <-chan amqp.Delivery, concurrency int, taskProcessor iface.TaskProcessor, amqpCloseChan <-chan *amqp.Error) error {
func (b *Broker) consume(deliveries <-chan amqp.Delivery, concurrency int, taskProcessor iface.TaskProcessor, amqpCloseChan <-chan *amqp.Error, healthCheckDeliveries chan amqp.Delivery) error {
pool := make(chan struct{}, concurrency)

// initialize worker pool with maxWorkers workers
Expand All @@ -266,36 +272,42 @@ func (b *Broker) consume(deliveries <-chan amqp.Delivery, concurrency int, taskP
// a worker, that is, it avoids a possible deadlock
errorsChan := make(chan error, 1)

consumeDelivery := func(d amqp.Delivery) {
if concurrency > 0 {
// get worker from pool (blocks until one is available)
<-pool
}

b.processingWG.Add(1)

// Consume the task inside a gotourine so multiple tasks
// can be processed concurrently
go func() {
if err := b.consumeOne(d, taskProcessor, true); err != nil {
errorsChan <- err
}

b.processingWG.Done()

if concurrency > 0 {
// give worker back to pool
pool <- struct{}{}
}
}()
}

for {
select {
case amqpErr := <-amqpCloseChan:
return amqpErr
case err := <-errorsChan:
return err
case d := <-deliveries:
if concurrency > 0 {
// get worker from pool (blocks until one is available)
<-pool
}

b.processingWG.Add(1)

// Consume the task inside a gotourine so multiple tasks
// can be processed concurrently
go func() {
if err := b.consumeOne(d, taskProcessor, true); err != nil {
errorsChan <- err
}

b.processingWG.Done()

if concurrency > 0 {
// give worker back to pool
pool <- struct{}{}
}
}()
consumeDelivery(d)
case <-b.GetStopChan():
return nil
case d := <-healthCheckDeliveries:
consumeDelivery(d)
}
}
}
Expand Down Expand Up @@ -490,3 +502,22 @@ func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {

return dumper.Signatures, nil
}

// PublishToLocal delivers a task signature directly to the in-process
// consumer registered under consumerTag, bypassing the AMQP server. It is
// intended for health checks: the delivery lands on the same channel the
// consumer loop drains, so completion proves the worker is alive.
//
// It blocks for at most blockTimeout when the consumer's local buffer is
// full, and returns an error when the tag is unknown, marshalling fails,
// or the timeout elapses.
//
// NOTE(review): the injected amqp.Delivery has no Acknowledger set; this
// assumes the consume path does not Ack/Nack local deliveries — confirm
// against consumeOne before relying on this in production.
func (b *Broker) PublishToLocal(consumerTag string, sig *tasks.Signature, blockTimeout time.Duration) error {
	b.localDeliveriesMutex.RLock()
	deliveries, ok := b.localDeliveries[consumerTag]
	b.localDeliveriesMutex.RUnlock()
	if !ok {
		return fmt.Errorf("no such consumerTag: %v", consumerTag)
	}

	msg, err := json.Marshal(sig)
	if err != nil {
		return fmt.Errorf("JSON marshal error: %s", err)
	}

	select {
	case deliveries <- amqp.Delivery{Body: msg}:
		return nil
	case <-time.After(blockTimeout):
		// Error strings must not end with punctuation (go vet / staticcheck ST1005).
		return fmt.Errorf("health check: %v queue is full", consumerTag)
	}
}
Loading