Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 114 additions & 5 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@

memset(ar, 0, sizeof(jsApiResponse));

// Server can return zero length response
if (resp->dataLen == 0)
return NATS_OK;

s = nats_JSONParse(&json, natsMsg_GetData(resp), natsMsg_GetDataLength(resp));
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);
Expand Down Expand Up @@ -547,7 +551,7 @@
IFOK_JSR(s, natsConnection_RequestMsg(&resp, js->nc, msg, ttl));
if (s == NATS_OK)
s = js_unmarshalResponse(&ar, &json, resp);
if (s == NATS_OK)
if (s == NATS_OK && json != NULL)
{
if (js_apiResponseIsErr(&ar))
{
Expand All @@ -569,6 +573,8 @@
IFOK(s, nats_JSONGetULong(json, "seq", &(pa->Sequence)));
IFOK(s, nats_JSONGetBool(json, "duplicate", &(pa->Duplicate)));
IFOK(s, nats_JSONGetStr(json, "domain", &(pa->Domain)));
IFOK(s, nats_JSONGetStr(json, "batch", &(pa->Batch)));
IFOK(s, nats_JSONGetULong(json, "count", &(pa->Count)));

if (s == NATS_OK)
*new_puback = pa;
Expand All @@ -579,6 +585,9 @@
js_freeApiRespContent(&ar);
nats_JSONDestroy(json);
}
else if ((s == NATS_OK) && (new_puback != NULL))
*new_puback = NULL;

Check warning on line 589 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L589

Added line #L589 was not covered by tests

natsMsg_Destroy(resp);
return NATS_UPDATE_ERR_STACK(s);
}
Expand All @@ -591,6 +600,7 @@

NATS_FREE(pa->Stream);
NATS_FREE(pa->Domain);
NATS_FREE(pa->Batch);
NATS_FREE(pa);
}

Expand Down Expand Up @@ -637,10 +647,15 @@
else if (pa != NULL)
{
memset(pa, 0, sizeof(jsPubAck));
s = nats_JSONGetStr(json, "stream", &(pa->Stream));
IFOK(s, nats_JSONGetULong(json, "seq", &(pa->Sequence)));
IFOK(s, nats_JSONGetBool(json, "duplicate", &(pa->Duplicate)));
IFOK(s, nats_JSONGetStr(json, "domain", &(pa->Domain)));
if (json != NULL)
{
s = nats_JSONGetStr(json, "stream", &(pa->Stream));
IFOK(s, nats_JSONGetULong(json, "seq", &(pa->Sequence)));
IFOK(s, nats_JSONGetBool(json, "duplicate", &(pa->Duplicate)));
IFOK(s, nats_JSONGetStr(json, "domain", &(pa->Domain)));
IFOK(s, nats_JSONGetStr(json, "batch", &(pa->Batch)));
IFOK(s, nats_JSONGetULong(json, "count", &(pa->Count)));
}
}

js_freeApiRespContent(&ar);
Expand Down Expand Up @@ -3673,3 +3688,97 @@
js->onReleaseCbArg = arg;
js_unlock(js);
}

natsStatus
js_BatchPublishAdd(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg,
jsPubOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
char temp[64] = {'\0'};

if ((ctx == NULL) || (msg == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsMutex_Lock(ctx->mu);

s = natsMsgHeader_Set(msg, jsNatsBatchIdHdr, ctx->id);
if (s == NATS_OK)
{
if (snprintf(temp, sizeof(temp), "%" PRIu64, ctx->count+1) < 1)
s = nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 3708 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3708

Added line #L3708 was not covered by tests
else
s = natsMsgHeader_Set(msg, jsNatsBatchSequenceHdr, temp);
}
if (s == NATS_OK)
{
ctx->count++;
}

IFOK(s, js_PublishMsg(new_puback, ctx->js, msg, opts, errCode));

natsMutex_Unlock(ctx->mu);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_BatchPublishCommit(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg,
jsPubOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, not checking explicitly on parameters are ok because you are invoking public APIs that do already check for them (or should).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and added them so that we don't manipulate the header unnecessarily

if ((ctx == NULL) || (msg == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

s = natsMsgHeader_Set(msg, jsNatsBatchCommit, "1");
IFOK(s, js_BatchPublishAdd(new_puback, ctx, msg, opts, errCode));

return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_BatchPublishStart(jsAtomicBatchCtx **ctx, jsPubAck **new_puback, jsCtx *js,
natsMsg *msg, jsPubOptions *opts, jsErrCode *errCode)
{
jsAtomicBatchCtx *batchCtx = NULL;
natsStatus s = NATS_OK;

if ((ctx == NULL) || (js == NULL) || (msg == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

batchCtx = NATS_CALLOC(1, sizeof(jsAtomicBatchCtx));
if (batchCtx == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 3750 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3750

Added line #L3750 was not covered by tests
batchCtx->id = NATS_MALLOC(NUID_BUFFER_LEN + 1);
if (batchCtx->id == NULL)
{
NATS_FREE(batchCtx);
return nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 3755 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3754-L3755

Added lines #L3754 - L3755 were not covered by tests
}

js_retain(js);
batchCtx->js = js;
s = natsMutex_Create(&batchCtx->mu);
IFOK(s, natsNUID_Next(batchCtx->id, NUID_BUFFER_LEN + 1));
if (s != NATS_OK)
{
jsAtomicBatchCtx_Destroy(batchCtx);
return NATS_UPDATE_ERR_STACK(s);

Check warning on line 3765 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3764-L3765

Added lines #L3764 - L3765 were not covered by tests
}

*ctx = batchCtx;
s = js_BatchPublishAdd(new_puback, *ctx, msg, opts, errCode);

return NATS_UPDATE_ERR_STACK(s);

Check warning on line 3771 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L3771

Added line #L3771 was not covered by tests
}

void
jsAtomicBatchCtx_Destroy(jsAtomicBatchCtx *ctx)
{
if (ctx == NULL)
return;

js_release(ctx->js);
natsMutex_Destroy(ctx->mu);
NATS_FREE(ctx->id);
NATS_FREE(ctx);
}
3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ extern const int64_t jsDefaultRequestWait;
#define jsExpectedLastMsgIdHdr "Nats-Expected-Last-Msg-Id"
#define jsConsumerStalledHdr "Nats-Consumer-Stalled"
#define jsConsumerPinIDHdr "Nats-Pin-Id"
#define jsNatsBatchIdHdr "Nats-Batch-Id"
#define jsNatsBatchSequenceHdr "Nats-Batch-Sequence"
#define jsNatsBatchCommit "Nats-Batch-Commit"

#define jsErrStreamNameRequired "stream name is required"
#define jsErrConsumerNameRequired "consumer name is required"
Expand Down
3 changes: 3 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
IFOK(s, nats_JSONGetLong(jcfg, "subject_delete_marker_ttl", &(cfg->SubjectDeleteMarkerTTL)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_msg_ttl", &(cfg->AllowMsgTTL)));
IFOK(s, _unmarshalPersistModeType(jcfg, &(cfg->PersistMode)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_atomic", &(cfg->AllowAtomic)));

if (s == NATS_OK)
*new_cfg = cfg;
Expand Down Expand Up @@ -912,6 +913,8 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
IFOK(s, natsBuf_Append(buf, ",\"mirror_direct\":true", -1));
if ((s == NATS_OK) && cfg->DiscardNewPerSubject)
IFOK(s, natsBuf_Append(buf, ",\"discard_new_per_subject\":true", -1));
if ((s == NATS_OK) && cfg->AllowAtomic)
IFOK(s, natsBuf_Append(buf, ",\"allow_atomic\":true", -1));

IFOK(s, nats_marshalMetadata(buf, true, "metadata", &(cfg->Metadata)));
IFOK(s, _marshalStorageCompression(cfg->Compression, buf));
Expand Down
73 changes: 73 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ typedef struct natsMetadata
*/
typedef struct __jsCtx jsCtx;

/**
* The JetStream atomic batch context. Use for JetStream atomic batch publish.
*/
typedef struct __jsAtomicBatchCtx jsAtomicBatchCtx;

/**
* JetStream publish options.
*
Expand Down Expand Up @@ -644,6 +649,8 @@ typedef struct jsStreamConfig {
/// v2.12.0 or later.
jsPersistModeType PersistMode;

/// @brief Enables sending atomic batch publishing into the stream
bool AllowAtomic;
} jsStreamConfig;

/**
Expand Down Expand Up @@ -1227,6 +1234,8 @@ typedef struct jsPubAck
uint64_t Sequence;
char *Domain;
bool Duplicate;
char *Batch;
uint64_t Count;

} jsPubAck;

Expand Down Expand Up @@ -7543,6 +7552,70 @@ js_PublishAsyncComplete(jsCtx *js, jsPubOptions *opts);
NATS_EXTERN natsStatus
js_PublishAsyncGetPendingList(natsMsgList *pending, jsCtx *js);

/** \brief Starts an atomic batch publish.
*
* This call initializes an atomic batch publish and sends the first message.
*
* \note The returned context must be destroyed with #jsAtomicBatchCtx_Destroy
* after the publish is committed or aborted.
* \note The returned #jsPubAck object needs to be destroyed with #jsPubAck_Destroy
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: two \note are rendered as a single with doxygen unfortunately.

* when no longer needed.
*
* @param ctx Where to store the atomic batch context for subsequent publishes.
* @param new_puback Where to store the pub ack for the first message, or `NULL` if not needed.
* @param js the pointer to the #jsCtx object.
* @param msg the message to publish as part of this batch.
* @param opts the publish options, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, possibly `NULL`.
*/
NATS_EXTERN natsStatus
js_BatchPublishStart(jsAtomicBatchCtx **ctx, jsPubAck **new_puback, jsCtx *js,
natsMsg *msg, jsPubOptions *opts, jsErrCode *errCode);

/** \brief Adds a message to the batch publish.
*
* This call adds a message to the batch publish initialized by #js_BatchPublishStart.
*
* \note The returned #jsPubAck object needs to be destroyed with #jsPubAck_Destroy.
*
* @param new_puback Where to store the pub ack for this message, or `NULL` if not needed.
* @param ctx the atomic batch context returned by #js_BatchPublishStart.
* @param msg the message to publish as part of this batch.
* @param opts the publish options, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, possibly `NULL`.
*/
NATS_EXTERN natsStatus
js_BatchPublishAdd(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg,
jsPubOptions *opts, jsErrCode *errCode);

/** \brief Commits the batch publish.
*
* This call commits the batch publish initialized by #js_BatchPublishStart
* and added to by #js_BatchPublishAdd.
*
* \note The returned #jsPubAck object needs to be destroyed with #jsPubAck_Destroy.
* \note After this call, the context should be destroyed with #jsAtomicBatchCtx_Destroy.
*
* @param new_puback Where to store the pub ack for the commit, or `NULL` if not needed.
* @param ctx the atomic batch context returned by #js_BatchPublishStart.
* @param msg the message to publish as part of this batch.
* @param opts the publish options, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, possibly `NULL`.
*/
NATS_EXTERN natsStatus
js_BatchPublishCommit(jsPubAck **new_puback, jsAtomicBatchCtx *ctx, natsMsg *msg,
jsPubOptions *opts, jsErrCode *errCode);


/** \brief Destroys the atomic batch context object.
*
* Releases memory allocated for this atomic batch context object.
*
* @param ctx the pointer to the #jsAtomicBatchCtx object.
*/
NATS_EXTERN void
jsAtomicBatchCtx_Destroy(jsAtomicBatchCtx *ctx);

/** @} */ // end of jsPubGroup

/** \defgroup jsSubGroup Subscribing
Expand Down
9 changes: 9 additions & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,15 @@ struct __jsCtx
void *onReleaseCbArg;
};

struct __jsAtomicBatchCtx
{
natsMutex *mu;
char *id;
jsCtx *js;
uint64_t count;

};

typedef struct __jsFetch
{
struct jsOptionsPullSubscribeAsync opts;
Expand Down
7 changes: 7 additions & 0 deletions src/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ typedef enum {
JSMessageTTLInvalidErr = 10165, ///< Invalid per-message TTL
JSMessageTTLDisabledErr = 10166, ///< Per-message TTL is disabled
JSStreamTooManyRequestsErr = 10167, ///< Too many requests
JSBatchPublishNotEnabledErr = 10174, ///< Batch publish not enabled on stream
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR, but we may have to add missing ones since I am not sure this was updated with server additions.

JSBatchPublishIncompleteErr = 10176, ///< Batch publish is incomplete and was abandoned
JSBatchPublishInvalidIDErr = 10179, ///< Batch publish ID is invalid (exceeds 64 characters)
JSBatchPublishSequenceMissingErr = 10175, ///< Batch publish sequence is missing
JSBatchPublishSequenceExceedsLimitErr = 10199, ///< Batch publish sequence exceeds server limit (default 1000)
JSBatchPublishUnsupportedHeaderErr = 10177, ///< Batch publish unsupported header used (Nats-Expected-Last-Msg-Id)
JSBatchPublishDuplicateMessageIDErr = 10201, ///< Batch publish contains duplicate message id (Nats-Msg-Id)

} jsErrCode;

Expand Down
1 change: 1 addition & 0 deletions test/list_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ _test(IsClosed)
_test(IsReconnectingAndStatus)
_test(IsValidSubscriber)
_test(JetStream_GH823)
_test(JetStreamAtomicBatchPublish)
_test(JetStreamBackOffRedeliveries)
_test(JetStreamClusterInfo)
_test(JetStreamContext)
Expand Down
Loading
Loading