diff --git a/src/js.c b/src/js.c index 81406726..5a02ef64 100644 --- a/src/js.c +++ b/src/js.c @@ -212,6 +212,10 @@ js_unmarshalResponse(jsApiResponse *ar, nats_JSON **new_json, natsMsg *resp) memset(ar, 0, sizeof(jsApiResponse)); + // Server can return zero length response + if (resp->dataLen == 0) + return NATS_OK; + s = nats_JSONParse(&json, natsMsg_GetData(resp), natsMsg_GetDataLength(resp)); if (s != NATS_OK) return NATS_UPDATE_ERR_STACK(s); @@ -547,7 +551,7 @@ js_PublishMsg(jsPubAck **new_puback,jsCtx *js, natsMsg *msg, IFOK_JSR(s, natsConnection_RequestMsg(&resp, js->nc, msg, ttl)); if (s == NATS_OK) s = js_unmarshalResponse(&ar, &json, resp); - if (s == NATS_OK) + if (s == NATS_OK && json != NULL) { if (js_apiResponseIsErr(&ar)) { @@ -569,6 +573,8 @@ js_PublishMsg(jsPubAck **new_puback,jsCtx *js, natsMsg *msg, IFOK(s, nats_JSONGetULong(json, "seq", &(pa->Sequence))); IFOK(s, nats_JSONGetBool(json, "duplicate", &(pa->Duplicate))); IFOK(s, nats_JSONGetStr(json, "domain", &(pa->Domain))); + IFOK(s, nats_JSONGetStr(json, "batch", &(pa->Batch))); + IFOK(s, nats_JSONGetULong(json, "count", &(pa->Count))); if (s == NATS_OK) *new_puback = pa; @@ -579,6 +585,9 @@ js_PublishMsg(jsPubAck **new_puback,jsCtx *js, natsMsg *msg, js_freeApiRespContent(&ar); nats_JSONDestroy(json); } + else if ((s == NATS_OK) && (new_puback != NULL)) + *new_puback = NULL; + natsMsg_Destroy(resp); return NATS_UPDATE_ERR_STACK(s); } @@ -591,6 +600,7 @@ jsPubAck_Destroy(jsPubAck *pa) NATS_FREE(pa->Stream); NATS_FREE(pa->Domain); + NATS_FREE(pa->Batch); NATS_FREE(pa); } @@ -637,10 +647,15 @@ _parsePubAck(natsMsg *msg, jsPubAck *pa, jsPubAckErr *pae, char *errTxt, size_t else if (pa != NULL) { memset(pa, 0, sizeof(jsPubAck)); - s = nats_JSONGetStr(json, "stream", &(pa->Stream)); - IFOK(s, nats_JSONGetULong(json, "seq", &(pa->Sequence))); - IFOK(s, nats_JSONGetBool(json, "duplicate", &(pa->Duplicate))); - IFOK(s, nats_JSONGetStr(json, "domain", &(pa->Domain))); + if (json != NULL) + { + s = nats_JSONGetStr(json, "stream", &(pa->Stream)); + IFOK(s, nats_JSONGetULong(json, "seq", &(pa->Sequence))); + IFOK(s, nats_JSONGetBool(json, "duplicate", &(pa->Duplicate))); + IFOK(s, nats_JSONGetStr(json, "domain", &(pa->Domain))); + IFOK(s, nats_JSONGetStr(json, "batch", &(pa->Batch))); + IFOK(s, nats_JSONGetULong(json, "count", &(pa->Count))); + } } js_freeApiRespContent(&ar); @@ -3673,3 +3688,97 @@ js_setOnReleasedCb(jsCtx *js, js_onReleaseCb cb, void *arg) js->onReleaseCbArg = arg; js_unlock(js); } + +natsStatus +js_BatchPublishAdd(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg, + jsPubOptions *opts, jsErrCode *errCode) +{ + natsStatus s = NATS_OK; + char temp[64] = {'\0'}; + + if ((ctx == NULL) || (msg == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsMutex_Lock(ctx->mu); + + s = natsMsgHeader_Set(msg, jsNatsBatchIdHdr, ctx->id); + if (s == NATS_OK) + { + if (snprintf(temp, sizeof(temp), "%" PRIu64, ctx->count+1) < 1) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + s = natsMsgHeader_Set(msg, jsNatsBatchSequenceHdr, temp); + } + if (s == NATS_OK) + { + ctx->count++; + } + + IFOK(s, js_PublishMsg(new_puback, ctx->js, msg, opts, errCode)); + + natsMutex_Unlock(ctx->mu); + return NATS_UPDATE_ERR_STACK(s); +} + +natsStatus +js_BatchPublishCommit(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg, + jsPubOptions *opts, jsErrCode *errCode) +{ + natsStatus s = NATS_OK; + + if ((ctx == NULL) || (msg == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + s = natsMsgHeader_Set(msg, jsNatsBatchCommit, "1"); + IFOK(s, js_BatchPublishAdd(new_puback, ctx, msg, opts, errCode)); + + return NATS_UPDATE_ERR_STACK(s); +} + +natsStatus +js_BatchPublishStart(jsAtomicBatchCtx **ctx, jsPubAck **new_puback, jsCtx *js, + natsMsg *msg, jsPubOptions *opts, jsErrCode *errCode) +{ + jsAtomicBatchCtx *batchCtx = NULL; + natsStatus s = NATS_OK; + + if ((ctx == NULL) || (js == NULL) || (msg == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + batchCtx = NATS_CALLOC(1, sizeof(jsAtomicBatchCtx)); + if (batchCtx == NULL) + return nats_setDefaultError(NATS_NO_MEMORY); + batchCtx->id = NATS_MALLOC(NUID_BUFFER_LEN + 1); + if (batchCtx->id == NULL) + { + NATS_FREE(batchCtx); + return nats_setDefaultError(NATS_NO_MEMORY); + } + + js_retain(js); + batchCtx->js = js; + s = natsMutex_Create(&batchCtx->mu); + IFOK(s, natsNUID_Next(batchCtx->id, NUID_BUFFER_LEN + 1)); + if (s != NATS_OK) + { + jsAtomicBatchCtx_Destroy(batchCtx); + return NATS_UPDATE_ERR_STACK(s); + } + + *ctx = batchCtx; + s = js_BatchPublishAdd(new_puback, *ctx, msg, opts, errCode); + + return NATS_UPDATE_ERR_STACK(s); +} + +void +jsAtomicBatchCtx_Destroy(jsAtomicBatchCtx *ctx) +{ + if (ctx == NULL) + return; + + js_release(ctx->js); + natsMutex_Destroy(ctx->mu); + NATS_FREE(ctx->id); + NATS_FREE(ctx); +} diff --git a/src/js.h b/src/js.h index 208fd3b5..3a18a201 100644 --- a/src/js.h +++ b/src/js.h @@ -42,6 +42,9 @@ extern const int64_t jsDefaultRequestWait; #define jsExpectedLastMsgIdHdr "Nats-Expected-Last-Msg-Id" #define jsConsumerStalledHdr "Nats-Consumer-Stalled" #define jsConsumerPinIDHdr "Nats-Pin-Id" +#define jsNatsBatchIdHdr "Nats-Batch-Id" +#define jsNatsBatchSequenceHdr "Nats-Batch-Sequence" +#define jsNatsBatchCommit "Nats-Batch-Commit" #define jsErrStreamNameRequired "stream name is required" #define jsErrConsumerNameRequired "consumer name is required" diff --git a/src/jsm.c b/src/jsm.c index 4e1cc157..9d96b942 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -791,6 +791,7 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig IFOK(s, nats_JSONGetLong(jcfg, "subject_delete_marker_ttl", &(cfg->SubjectDeleteMarkerTTL))); IFOK(s, nats_JSONGetBool(jcfg, "allow_msg_ttl", &(cfg->AllowMsgTTL))); IFOK(s, _unmarshalPersistModeType(jcfg, &(cfg->PersistMode))); + IFOK(s, nats_JSONGetBool(jcfg, "allow_atomic", &(cfg->AllowAtomic))); if (s == NATS_OK) *new_cfg = cfg; @@ -912,6 +913,8 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg) IFOK(s, natsBuf_Append(buf, ",\"mirror_direct\":true", -1)); if ((s == NATS_OK) && cfg->DiscardNewPerSubject) IFOK(s, natsBuf_Append(buf, ",\"discard_new_per_subject\":true", -1)); + if ((s == NATS_OK) && cfg->AllowAtomic) + IFOK(s, natsBuf_Append(buf, ",\"allow_atomic\":true", -1)); IFOK(s, nats_marshalMetadata(buf, true, "metadata", &(cfg->Metadata))); IFOK(s, _marshalStorageCompression(cfg->Compression, buf)); diff --git a/src/nats.h b/src/nats.h index 7774cf63..25acc396 100644 --- a/src/nats.h +++ b/src/nats.h @@ -280,6 +280,11 @@ typedef struct natsMetadata */ typedef struct __jsCtx jsCtx; +/** + * The JetStream atomic batch context. Use for JetStream atomic batch publish. + */ +typedef struct __jsAtomicBatchCtx jsAtomicBatchCtx; + /** * JetStream publish options. * @@ -644,6 +649,8 @@ typedef struct jsStreamConfig { /// v2.12.0 or later. jsPersistModeType PersistMode; + /// @brief Enables sending atomic batch publishing into the stream + bool AllowAtomic; } jsStreamConfig; /** @@ -1227,6 +1234,8 @@ typedef struct jsPubAck uint64_t Sequence; char *Domain; bool Duplicate; + char *Batch; + uint64_t Count; } jsPubAck; @@ -7543,6 +7552,70 @@ js_PublishAsyncComplete(jsCtx *js, jsPubOptions *opts); NATS_EXTERN natsStatus js_PublishAsyncGetPendingList(natsMsgList *pending, jsCtx *js); +/** \brief Starts an atomic batch publish. + * + * This call initializes an atomic batch publish and sends the first message. + * + * \note The returned context must be destroyed with #jsAtomicBatchCtx_Destroy + * after the publish is committed or aborted. + * \note The returned #jsPubAck object needs to be destroyed with #jsPubAck_Destroy + * when no longer needed. + * + * @param ctx Where to store the atomic batch context for subsequent publishes. + * @param new_puback Where to store the pub ack for the first message, or `NULL` if not needed. + * @param js the pointer to the #jsCtx object. + * @param msg the message to publish as part of this batch. + * @param opts the publish options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, possibly `NULL`. + */ +NATS_EXTERN natsStatus +js_BatchPublishStart(jsAtomicBatchCtx **ctx, jsPubAck **new_puback, jsCtx *js, + natsMsg *msg, jsPubOptions *opts, jsErrCode *errCode); + +/** \brief Adds a message to the batch publish. + * + * This call adds a message to the batch publish initialized by #js_BatchPublishStart. + * + * \note The returned #jsPubAck object needs to be destroyed with #jsPubAck_Destroy. + * + * @param new_puback Where to store the pub ack for this message, or `NULL` if not needed. + * @param ctx the atomic batch context returned by #js_BatchPublishStart. + * @param msg the message to publish as part of this batch. + * @param opts the publish options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, possibly `NULL`. + */ +NATS_EXTERN natsStatus +js_BatchPublishAdd(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg, + jsPubOptions *opts, jsErrCode *errCode); + +/** \brief Commits the batch publish. + * + * This call commits the batch publish initialized by #js_BatchPublishStart + * and added to by #js_BatchPublishAdd. + * + * \note The returned #jsPubAck object needs to be destroyed with #jsPubAck_Destroy. + * \note After this call, the context should be destroyed with #jsAtomicBatchCtx_Destroy. + * + * @param new_puback Where to store the pub ack for the commit, or `NULL` if not needed. + * @param ctx the atomic batch context returned by #js_BatchPublishStart. + * @param msg the message to publish as part of this batch. + * @param opts the publish options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, possibly `NULL`. + */ +NATS_EXTERN natsStatus +js_BatchPublishCommit(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg, + jsPubOptions *opts, jsErrCode *errCode); + + +/** \brief Destroys the atomic batch context object. + * + * Releases memory allocated for this atomic batch context object. + * + * @param ctx the pointer to the #jsAtomicBatchCtx object. + */ +NATS_EXTERN void +jsAtomicBatchCtx_Destroy(jsAtomicBatchCtx *ctx); + /** @} */ // end of jsPubGroup /** \defgroup jsSubGroup Subscribing diff --git a/src/natsp.h b/src/natsp.h index 7cfc8438..90e557a0 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -403,6 +403,15 @@ struct __jsCtx void *onReleaseCbArg; }; +struct __jsAtomicBatchCtx +{ + natsMutex *mu; + char *id; + jsCtx *js; + uint64_t count; + +}; + typedef struct __jsFetch { struct jsOptionsPullSubscribeAsync opts; diff --git a/src/status.h b/src/status.h index 3233f3c1..550e2024 100644 --- a/src/status.h +++ b/src/status.h @@ -306,6 +306,13 @@ typedef enum { JSMessageTTLInvalidErr = 10165, ///< Invalid per-message TTL JSMessageTTLDisabledErr = 10166, ///< Per-message TTL is disabled JSStreamTooManyRequestsErr = 10167, ///< Too many requests + JSBatchPublishNotEnabledErr = 10174, ///< Batch publish not enabled on stream + JSBatchPublishIncompleteErr = 10176, ///< Batch publish is incomplete and was abandoned + JSBatchPublishInvalidIDErr = 10179, ///< Batch publish ID is invalid (exceeds 64 characters) + JSBatchPublishSequenceMissingErr = 10175, ///< Batch publish sequence is missing + JSBatchPublishSequenceExceedsLimitErr = 10199, ///< Batch publish sequence exceeds server limit (default 1000) + JSBatchPublishUnsupportedHeaderErr = 10177, ///< Batch publish unsupported header used (Nats-Expected-Last-Msg-Id) + JSBatchPublishDuplicateMessageIDErr = 10201, ///< Batch publish contains duplicate message id (Nats-Msg-Id) } jsErrCode; diff --git a/test/list_test.txt b/test/list_test.txt index d7897819..ef7994a8 100644 --- a/test/list_test.txt +++ b/test/list_test.txt @@ -82,6 +82,7 @@ _test(IsClosed) _test(IsReconnectingAndStatus) _test(IsValidSubscriber) _test(JetStream_GH823) +_test(JetStreamAtomicBatchPublish) _test(JetStreamBackOffRedeliveries) _test(JetStreamClusterInfo) _test(JetStreamContext) diff --git a/test/test.c b/test/test.c index 98d772ec..da3b3a30 100644 --- a/test/test.c +++ b/test/test.c @@ -24536,6 +24536,7 @@ void test_JetStreamMarshalStreamConfig(void) .InactiveThreshold = 1000, .MaxAckPending = 99, }; + sc.AllowAtomic = true; test("Marshal stream config: "); s = js_marshalStreamConfig(&buf, &sc); @@ -24619,6 +24620,7 @@ void test_JetStreamMarshalStreamConfig(void) && (rsc->ConsumerLimits.InactiveThreshold == 1000) && (rsc->ConsumerLimits.MaxAckPending == 99) && (rsc->PersistMode == js_PersistAsync) + && (rsc->AllowAtomic == true) ); js_destroyStreamConfig(rsc); rsc = NULL; @@ -40345,6 +40347,182 @@ void test_JetStreamPrioritizedPullConsumer(void) _destroyDefaultThreadArgs(&arg); } +void test_JetStreamAtomicBatchPublish(void) +{ + natsStatus s = NATS_OK; + natsMsg *msg = NULL, *msg2 = NULL; + jsAtomicBatchCtx *batch_ctx = NULL, *ctx2 = NULL; + jsStreamConfig sc; + jsPubAck *new_puback = NULL, *puback2 = NULL; + + const uint64_t msg_count = 98; + + JS_SETUP(2, 12, 0); + + test("Initialize Stream: "); + s = jsStreamConfig_Init(&sc); + if (s == NATS_OK) + { + sc.AllowAtomic = true; + sc.Name = "foo"; + } + IFOK(s, js_AddStream(NULL, js, &sc, NULL, NULL)); + testCond(s == NATS_OK); + + test ("Create message: "); + s = natsMsg_Create(&msg, "foo", NULL, "hello", 5); + testCond(s == NATS_OK); + + test("Create batch (bad args): "); + s = js_BatchPublishStart(NULL, NULL, js, msg, NULL, NULL); + if (s == NATS_INVALID_ARG) + s = js_BatchPublishStart(&batch_ctx, NULL, NULL, msg, NULL, NULL); + if (s == NATS_INVALID_ARG) + s = js_BatchPublishStart(&batch_ctx, NULL, js, NULL, NULL, NULL); + testCond((s == NATS_INVALID_ARG) && (batch_ctx == NULL)); + nats_clearLastError(); + + test("Create batch context: "); + s = js_BatchPublishStart(&batch_ctx, NULL, js, msg, NULL, NULL); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Publish message (bad args): "); + s = natsMsg_Create(&msg, "foo", NULL, "hello", 5); + if (s == NATS_OK) { + s = js_BatchPublishAdd(NULL, NULL, msg, NULL, NULL); + if (s == NATS_INVALID_ARG) + s = js_BatchPublishAdd(NULL, batch_ctx, NULL, NULL, NULL); + } + testCond((s == NATS_INVALID_ARG) && (batch_ctx != NULL)); + nats_clearLastError(); + natsMsg_Destroy(msg); + msg = NULL; + s = NATS_OK; + + test("Publish 98 messages: "); + for (uint64_t i = 0; (i < msg_count) && (s == NATS_OK); i++) + { + char data[12]; + snprintf(data, sizeof(data), "%" PRIu64, i); + s = natsMsg_Create(&msg, "foo", NULL, data, strlen(data)); + IFOK(s, js_BatchPublishAdd(NULL, batch_ctx, msg, NULL, NULL)); + natsMsg_Destroy(msg); + msg = NULL; + } + testCond(s == NATS_OK); + + test("Commit batch (bad args): "); + s = js_BatchPublishCommit(NULL, NULL, NULL, NULL, NULL); + if (s == NATS_INVALID_ARG) + s = js_BatchPublishCommit(NULL, batch_ctx, NULL, NULL, NULL); + if (s == NATS_INVALID_ARG) + { + s = natsMsg_Create(&msg, "foo", NULL, "hello", 5); + IFOK(s, js_BatchPublishCommit(NULL, NULL, msg, NULL, NULL)); + } + testCond(s == NATS_INVALID_ARG); + nats_clearLastError(); + natsMsg_Destroy(msg); + msg = NULL; + + test("Commit batch: "); + s = natsMsg_Create(&msg, "foo", NULL, "end", 3); + IFOK(s, js_BatchPublishCommit(&new_puback, batch_ctx, msg, NULL, NULL)); + testCond((s == NATS_OK) && (new_puback != NULL) && (new_puback->Count == msg_count + 2)); + + jsPubAck_Destroy(new_puback); + new_puback = NULL; + jsAtomicBatchCtx_Destroy(batch_ctx); + batch_ctx = NULL; + natsMsg_Destroy(msg); + msg = NULL; + + // Out of sequence error test + test("Create batch context: "); + s = natsMsg_Create(&msg, "foo", NULL, "hello", 5); + IFOK(s, js_BatchPublishStart(&batch_ctx, NULL, js, msg, NULL, NULL)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Publish 98 messages: "); + for (uint64_t i = 0; (i < msg_count) && (s == NATS_OK); i++) + { + char data[12]; + snprintf(data, sizeof(data), "%" PRIu64, i); + s = natsMsg_Create(&msg, "foo", NULL, data, strlen(data)); + IFOK(s, js_BatchPublishAdd(NULL, batch_ctx, msg, NULL, NULL)); + natsMsg_Destroy(msg); + msg = NULL; + } + testCond(s == NATS_OK); + + test("Increment count, commit batch and expect error: "); + batch_ctx->count++; + s = natsMsg_Create(&msg, "foo", NULL, "end", 3); + IFOK(s, js_BatchPublishCommit(&new_puback, batch_ctx, msg, NULL, NULL)); + testCond((s == NATS_ERR)); + nats_clearLastError(); + + jsPubAck_Destroy(new_puback); + new_puback = NULL; + jsAtomicBatchCtx_Destroy(batch_ctx); + batch_ctx = NULL; + natsMsg_Destroy(msg); + msg = NULL; + + // Two batch publishes + test("Create 2 batch contexts: "); + s = natsMsg_Create(&msg, "foo", NULL, "hello", 5); + IFOK(s, natsMsg_Create(&msg2, "foo", NULL, "hello", 5)); + IFOK(s, js_BatchPublishStart(&batch_ctx, NULL, js, msg, NULL, NULL)); + IFOK(s, js_BatchPublishStart(&ctx2, NULL, js, msg, NULL, NULL)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + natsMsg_Destroy(msg2); + msg2 = NULL; + + test("Publish 98 messages to each: "); + for (uint64_t i = 0; (i < msg_count) && (s == NATS_OK); i++) + { + char data[12]; + snprintf(data, sizeof(data), "%" PRIu64, i); + s = natsMsg_Create(&msg, "foo", NULL, data, strlen(data)); + IFOK(s, natsMsg_Create(&msg2, "foo", NULL, data, strlen(data))); + IFOK(s, js_BatchPublishAdd(NULL, batch_ctx, msg, NULL, NULL)); + IFOK(s, js_BatchPublishAdd(NULL, ctx2, msg, NULL, NULL)); + natsMsg_Destroy(msg); + msg = NULL; + natsMsg_Destroy(msg2); + msg2 = NULL; + } + testCond(s == NATS_OK); + + test("Commit both batches batch: "); + s = natsMsg_Create(&msg, "foo", NULL, "end", 3); + IFOK(s, natsMsg_Create(&msg2, "foo", NULL, "end", 3)); + IFOK(s, js_BatchPublishCommit(&new_puback, batch_ctx, msg, NULL, NULL)); + IFOK(s, js_BatchPublishCommit(&puback2, ctx2, msg, NULL, NULL)); + testCond((s == NATS_OK) && + ((new_puback != NULL) && (new_puback->Count == msg_count + 2)) && + ((puback2 != NULL) && (puback2->Count == msg_count + 2))); + + jsPubAck_Destroy(new_puback); + new_puback = NULL; + jsPubAck_Destroy(puback2); + jsAtomicBatchCtx_Destroy(batch_ctx); + batch_ctx = NULL; + jsAtomicBatchCtx_Destroy(ctx2); + batch_ctx = NULL; + natsMsg_Destroy(msg); + natsMsg_Destroy(msg2); + + JS_TEARDOWN; +} + #if defined(NATS_HAS_STREAMING) static int