From fd6c7e66340e3f2a00450936741d0bfc2ee62e23 Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Thu, 21 Nov 2019 13:29:02 -0800 Subject: [PATCH] Revert "Always unsubscribe from order events in SetupOrderStream" --- CHANGELOG.md | 7 ------- cmd/mesh/rpc_handler.go | 10 ++++------ rpc/service.go | 1 + 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a0a0ce74..8a3e5220c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,6 @@ This changelog is a work in progress and may contain notes for versions which have not actually been released. Check the [Releases](https://github.com/0xProject/0x-mesh/releases) page to see full release notes and more information about the latest released versions. -## v6.1.1-beta - -### Bug fixes 🐞 - -- Fixed a bug where the internal order event feed could become blocked, rendering Mesh unable to receive any new orders or update existing ones ([#552](https://github.com/0xProject/0x-mesh/pull/552)). - - ## v6.1.0-beta ### Features ✅ diff --git a/cmd/mesh/rpc_handler.go b/cmd/mesh/rpc_handler.go index b7462584e..50af30ef5 100644 --- a/cmd/mesh/rpc_handler.go +++ b/cmd/mesh/rpc_handler.go @@ -22,10 +22,6 @@ import ( log "github.com/sirupsen/logrus" ) -// orderEventsBufferSize is the buffer size for the orderEvents channel. If -// the buffer is full, any additional events won't be processed. -const orderEventsBufferSize = 8000 - type rpcHandler struct { app *core.App } @@ -213,9 +209,8 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription, rpcSub := notifier.CreateSubscription() go func() { - orderEventsChan := make(chan []*zeroex.OrderEvent, orderEventsBufferSize) + orderEventsChan := make(chan []*zeroex.OrderEvent) orderWatcherSub := app.SubscribeToOrderEvents(orderEventsChan) - defer orderWatcherSub.Unsubscribe() for { select { @@ -242,6 +237,7 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription, // error. if _, ok := err.(*net.OpError); ok { logEntry.Trace(message) + orderWatcherSub.Unsubscribe() return } if strings.Contains(err.Error(), "write: broken pipe") { @@ -253,11 +249,13 @@ func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription, case err := <-rpcSub.Err(): if err != nil { log.WithField("err", err).Error("rpcSub returned an error") + orderWatcherSub.Unsubscribe() } else { log.Debug("rpcSub was closed without error") } return case <-notifier.Closed(): + orderWatcherSub.Unsubscribe() return } } diff --git a/rpc/service.go b/rpc/service.go index c4864bb11..a6aae3547 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -114,6 +114,7 @@ func SetupHeartbeat(ctx context.Context) (*ethrpc.Subscription, error) { // Wait MinCleanupInterval before emitting the next heartbeat. time.Sleep(minHeartbeatInterval - time.Since(start)) + } }()