From aaad42b350ea2ffdc18b1ffc0f4bee1e0194b1b6 Mon Sep 17 00:00:00 2001 From: crazyhzm Date: Mon, 8 Apr 2024 11:05:17 +0800 Subject: [PATCH] Support requeue delay msg --- internal/version/binary.go | 2 +- nsqd/channel.go | 4 +-- nsqdserver/http.go | 52 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/internal/version/binary.go b/internal/version/binary.go index 89721b42..6bc9e7ea 100644 --- a/internal/version/binary.go +++ b/internal/version/binary.go @@ -5,7 +5,7 @@ import ( "runtime" ) -const Binary = "0.3.7-HA.1.12.9" +const Binary = "0.3.7-HA.1.13.0" var ( Commit = "unset" diff --git a/nsqd/channel.go b/nsqd/channel.go index 2c75fc2c..82bd1db9 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -2663,7 +2663,7 @@ func (c *Channel) processInFlightQueue(tnow int64) (bool, bool) { if !c.IsConsumeDisabled() && !c.IsOrdered() && needPeekDelay && clientNum > 0 { - newAdded, cnt, err := c.peekAndReqDelayedMessages(tnow) + newAdded, cnt, err := c.PeekAndReqDelayedMessages(tnow) if err == nil { if newAdded > 0 && c.chLog.Level() >= levellogger.LOG_DEBUG { c.chLog.LogDebugf("channel delayed waiting peeked %v added %v new : %v", @@ -2709,7 +2709,7 @@ func (c *Channel) processInFlightQueue(tnow int64) (bool, bool) { return dirty, checkFast } -func (c *Channel) peekAndReqDelayedMessages(tnow int64) (int, int, error) { +func (c *Channel) PeekAndReqDelayedMessages(tnow int64) (int, int, error) { if c.IsEphemeral() { return 0, 0, nil } diff --git a/nsqdserver/http.go b/nsqdserver/http.go index f2907594..efc00e0c 100644 --- a/nsqdserver/http.go +++ b/nsqdserver/http.go @@ -86,6 +86,7 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer router.Handle("POST", "/channel/fixconfirmed", http_api.Decorate(s.doFixChannelConfirmed, log, http_api.V1)) router.Handle("POST", "/channel/finishmemdelayed", http_api.Decorate(s.doFinishMemDelayed, log, http_api.V1)) router.Handle("POST", "/channel/emptydelayed", http_api.Decorate(s.doEmptyChannelDelayed, log, http_api.V1)) + router.Handle("POST", "/channel/requeuedelayed", http_api.Decorate(s.doRequeueChannelDelayed, log, http_api.V1)) router.Handle("POST", "/channel/setoffset", http_api.Decorate(s.doSetChannelOffset, log, http_api.V1)) router.Handle("POST", "/channel/setorder", http_api.Decorate(s.doSetChannelOrder, log, http_api.V1)) router.Handle("POST", "/channel/setclientlimit", http_api.Decorate(s.doSetChannelClientLimit, log, http_api.V1)) @@ -224,6 +225,30 @@ func (s *httpServer) getExistingTopicChannelFromQuery(req *http.Request) (url.Va return reqParams, topic, channelName, err } +func (s *httpServer) getTimeOffsetFromQuery(req *http.Request) (int, error) { + reqParams, err := url.ParseQuery(req.URL.RawQuery) + + if err != nil { + nsqd.NsqLogger().LogErrorf("failed to parse request params - %s", err) + return 1, http_api.Err{400, "INVALID_REQUEST"} + } + + offsetStr := reqParams.Get("offset") + + if offsetStr == "" { + nsqd.NsqLogger().LogErrorf("The value of offset does not exist. Set the default value to 1") + return 1, nil + } + offset, err := strconv.Atoi(offsetStr) + + if err != nil { + nsqd.NsqLogger().LogErrorf("offset invalid - %s", err) + return 1, http_api.Err{400, "INVALID_REQUEST"} + } + + return offset, err +} + //TODO: will be refactored for further extension func getTag(reqParams url.Values) string { return reqParams.Get("tag") @@ -718,6 +743,33 @@ func (s *httpServer) doEmptyChannelDelayed(w http.ResponseWriter, req *http.Requ return nil, nil } +func (s *httpServer) doRequeueChannelDelayed(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { + _, topic, channelName, err := s.getExistingTopicChannelFromQuery(req) + if err != nil { + return nil, err + } + + offset, err := s.getTimeOffsetFromQuery(req) + + channel, err := topic.GetExistingChannel(channelName) + if err != nil { + return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"} + } + + if s.ctx.checkConsumeForMasterWrite(topic.GetTopicName(), topic.GetTopicPart()) { + _, _, err := channel.PeekAndReqDelayedMessages(time.Now().Add(time.Hour * time.Duration(offset)).UnixNano()) + if err != nil { + nsqd.NsqLogger().Logf("failed to requeue the channel %v delayed data: %v, by client:%v", + channelName, err, req.RemoteAddr) + } + } else { + nsqd.NsqLogger().LogDebugf("should request to master: %v, from %v", + topic.GetFullName(), req.RemoteAddr) + return nil, http_api.Err{400, FailedOnNotLeader} + } + return nil, nil +} + func (s *httpServer) doFixChannelConfirmed(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { _, topic, channelName, err := s.getExistingTopicChannelFromQuery(req) if err != nil {