Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 89 additions & 3 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,94 @@
natsMsg_Destroy(msg);
}

static void
_subComplete(void *closure)
void
js_handleSharedReply(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
const char *subject = natsMsg_GetSubject(msg);
char *id = NULL;
jsCtx *js = (jsCtx*) closure;
natsMsg *pmsg = NULL;
char errTxt[256] = {'\0'};
jsPubAckErr pae;
jsPubAck pa;
struct jsOptionsPublishAsync *opa = NULL;

if ((subject == NULL) || (int) strlen(subject) <= js->rpreLen)
{
return;
}

id = (char*) (subject+js->rpreLen);

js_lock(js);

pmsg = natsStrHash_Remove(js->pm, id);
if (pmsg == NULL)
{
js_unlock(js);
return;
}

opa = &(js->opts.PublishAsync);
if (opa->AckHandler)
{
jsPubAckErr *ppae = NULL;
jsPubAck *ppa = NULL;

Check warning on line 788 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L787-L788

Added lines #L787 - L788 were not covered by tests

// If _parsePubAck returns an error, we will set the pointer ppae to
// our stack variable 'pae', otherwise, set the pointer ppa to the
// stack variable 'pa', which is the jsPubAck (positive ack).
if (_parsePubAck(msg, &pa, &pae, errTxt, sizeof(errTxt)) != NATS_OK)
ppae = &pae;
else
ppa = &pa;

Check warning on line 796 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L796

Added line #L796 was not covered by tests

// Invoke the handler with pointer to either jsPubAck or jsPubAckErr.
js_unlock(js);

Check warning on line 799 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L799

Added line #L799 was not covered by tests

(opa->AckHandler)(js, pmsg, ppa, ppae, opa->AckHandlerClosure);

Check warning on line 801 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L801

Added line #L801 was not covered by tests

js_lock(js);

Check warning on line 803 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L803

Added line #L803 was not covered by tests

_freePubAck(ppa);

Check warning on line 805 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L805

Added line #L805 was not covered by tests
// Set pmsg to NULL because user was responsible for destroying the message.
pmsg = NULL;

Check warning on line 807 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L807

Added line #L807 was not covered by tests
}
else if ((opa->ErrHandler != NULL) && (_parsePubAck(msg, NULL, &pae, errTxt, sizeof(errTxt)) != NATS_OK))

Check warning on line 809 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L809

Added line #L809 was not covered by tests
{
// We will invoke CB only if there is any kind of error.
// Associate the message with the pubAckErr object.
pae.Msg = pmsg;
js_unlock(js);

Check warning on line 814 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L813-L814

Added lines #L813 - L814 were not covered by tests

(opa->ErrHandler)(js, &pae, opa->ErrHandlerClosure);

Check warning on line 816 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L816

Added line #L816 was not covered by tests

js_lock(js);

Check warning on line 818 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L818

Added line #L818 was not covered by tests

// If the user resent the message, pae->Msg will have been cleared.
// In this case, do not destroy the message. Do not blindly destroy
// an address that could have been set, so destroy only if pmsg
// is same value than pae->Msg.
if (pae.Msg != pmsg)
pmsg = NULL;

Check warning on line 825 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L825

Added line #L825 was not covered by tests
}

// Now that the callback has returned, decrement the number of pending messages.
js->pmcount--;

// If there are callers waiting for async pub completion, or stalled async
// publish calls and we are now below max pending, broadcast to unblock them.
if (((js->pacw > 0) && (js->pmcount == 0))
|| ((js->stalled > 0) && (js->pmcount <= opa->MaxPending)))

Check warning on line 834 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L834

Added line #L834 was not covered by tests
{
natsCondition_Broadcast(js->cond);
}
js_unlock(js);
natsMsg_Destroy(pmsg);
}

void
js_subComplete(void *closure)
{
js_release((jsCtx*) closure);
}
Expand Down Expand Up @@ -801,7 +887,7 @@
{
_retain(js);
natsSubscription_SetPendingLimits(js->rsub, -1, -1);
natsSubscription_SetOnCompleteCB(js->rsub, _subComplete, (void*) js);
natsSubscription_SetOnCompleteCB(js->rsub, js_subComplete, (void*) js);
}
NATS_FREE(subj);
}
Expand Down
6 changes: 6 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,9 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch);

void
js_setOnReleasedCb(jsCtx *js, js_onReleaseCb cb, void *arg);

void
js_subComplete(void *closure);

void
js_handleSharedReply(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure);
18 changes: 18 additions & 0 deletions src/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,24 @@
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsMsg_Clone(natsMsg **dest, natsMsg *src)

Check warning on line 917 in src/msg.c

View check run for this annotation

Codecov / codecov/patch

src/msg.c#L917

Added line #L917 was not covered by tests
{
natsMsg *msg = NULL;

Check warning on line 919 in src/msg.c

View check run for this annotation

Codecov / codecov/patch

src/msg.c#L919

Added line #L919 was not covered by tests

if (dest == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

Check warning on line 922 in src/msg.c

View check run for this annotation

Codecov / codecov/patch

src/msg.c#L922

Added line #L922 was not covered by tests

msg = NATS_MALLOC(sizeof(natsMsg));

Check warning on line 924 in src/msg.c

View check run for this annotation

Codecov / codecov/patch

src/msg.c#L924

Added line #L924 was not covered by tests
if (msg == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 926 in src/msg.c

View check run for this annotation

Codecov / codecov/patch

src/msg.c#L926

Added line #L926 was not covered by tests

memcpy(msg, src, sizeof(natsMsg));
*dest = msg;

Check warning on line 929 in src/msg.c

View check run for this annotation

Codecov / codecov/patch

src/msg.c#L928-L929

Added lines #L928 - L929 were not covered by tests

return NATS_OK;

Check warning on line 931 in src/msg.c

View check run for this annotation

Codecov / codecov/patch

src/msg.c#L931

Added line #L931 was not covered by tests
}

bool
natsMsg_IsNoResponders(natsMsg *m)
{
Expand Down
3 changes: 3 additions & 0 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,7 @@ natsMsg_freeHeaders(natsMsg *msg);
void
natsMsg_free(void *object);

natsStatus
natsMsg_Clone(natsMsg **dest, natsMsg *src);

#endif /* MSG_H_ */
3 changes: 3 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6351,6 +6351,9 @@ natsSubscription_SetOnCompleteCB(natsSubscription *sub, natsOnCompleteCB cb, voi
NATS_EXTERN void
natsSubscription_Destroy(natsSubscription *sub);

NATS_EXTERN natsStatus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would need documentation here... which would help me understand what this function is supposed to do ;-)

natsSubscription_CreateSharedSubscription(natsConnection *nc, jsCtx *js);

/** @} */ // end of subGroup

#if defined(NATS_HAS_STREAMING)
Expand Down
2 changes: 2 additions & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,8 @@ struct __natsSubscription

// For JetStream
jsSub *jsi;
// Counts the usage of this subscription when shared between nc and js
int shareCount;
};

typedef struct __natsPong
Expand Down
148 changes: 148 additions & 0 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -1370,10 +1370,158 @@
if (sub->jsi != NULL)
sub->jsi->dc = false;

if (sub->shareCount > 0)
sub->shareCount--;

natsSub_Unlock(sub);

// Don't close or free the subscription if it's still in use
if (sub->shareCount != 0)
return;

if (doUnsub)
(void)natsSubscription_Unsubscribe(sub);

natsSub_release(sub);
}

static void
_sharedRespHandler(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
char *rt = NULL;
const char *subj = NULL;
respInfo *resp = NULL;
bool dmsg = true;

js_handleSharedReply(nc, sub, msg, closure);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that we want the same "muxer" (a single subscription for both core NATS req/reply and JS, but that still should allow us to have the subject in a way that we could have a token/byte in the subject that is an indication if this is a JS reply or not. We would check that token from the msg subject here and invoke either the existing _handleAsyncReply() (would have to be "exported" of course) or _handleAsyncReply(). There would not be a need for callback code duplication since they will do exactly what they do today. Also, it would avoid doing double processing like you do here (treat as a JS reply and a NATS reply).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have it set to now compare the length of the subject and verify it's '0', but is there a better way to go about this?


natsConn_Lock(nc);
if (natsConn_isClosed(nc))
{
natsConn_Unlock(nc);
natsMsg_Destroy(msg);
return;

Check warning on line 1403 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1401-L1403

Added lines #L1401 - L1403 were not covered by tests
}
subj = natsMsg_GetSubject(msg);
// We look for the reply token by first checking that the message subject
// prefix matches the subscription's subject (without the last '*').
// It is possible that it does not due to subject rewrite (JetStream).
if (((int) strlen(subj) > nc->reqIdOffset)
&& (memcmp((const void*) sub->subject, (const void*) subj, strlen(sub->subject) - 1) == 0))
{
rt = (char*) (natsMsg_GetSubject(msg) + nc->reqIdOffset);
resp = (respInfo*) natsStrHash_Remove(nc->respMap, rt);
}
else if (natsStrHash_Count(nc->respMap) == 1)

Check warning on line 1415 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1415

Added line #L1415 was not covered by tests
{
// Only if the subject is completely different, we assume that it
// could be the server that has rewritten the subject and so if there
// is a single entry, use that.
void *value = NULL;
natsStrHash_RemoveSingle(nc->respMap, NULL, &value);
resp = (respInfo*) value;

Check warning on line 1422 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1420-L1422

Added lines #L1420 - L1422 were not covered by tests
}
if (resp != NULL)
{
natsMutex_Lock(resp->mu);
// Check for the race where the requestor has already timed-out.
// If so, resp->removed will be true, in which case simply discard
// the message.
if (!resp->removed)
{

dmsg = false;
resp->msg = msg;
resp->removed = true;
natsCondition_Signal(resp->cond);
}
natsMutex_Unlock(resp->mu);
}
natsConn_Unlock(nc);

if (dmsg)
natsMsg_Destroy(msg);
}

natsStatus
natsSubscription_CreateSharedSubscription(natsConnection *nc, jsCtx *js)
{
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;

// If either are already set, we shouldn't create a new one
if ((nc->respMux != NULL) || (js->rsub != NULL) || (js->nc != nc)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reference to js-> and nc-> fields should be protected (there are exceptions for the ones that are immutable. Note that the check for js->nc is not needed now since you do nc = js->nc.

return NATS_INVALID_ARG;
}

nc->respPool = NATS_CALLOC(RESP_INFO_POOL_MAX_SIZE, sizeof(respInfo*));
if (nc->respPool == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);
if (s == NATS_OK)

Check warning on line 1460 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1459-L1460

Added lines #L1459 - L1460 were not covered by tests
s = natsStrHash_Create(&nc->respMap, 4);

s = natsCondition_Create(&(js->cond));
IFOK(s, natsStrHash_Create(&(js->pm), 64));
if (s == NATS_OK)
{
js->rpre = NATS_MALLOC(js->rpreLen+1);
if (js->rpre == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 1469 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1469

Added line #L1469 was not covered by tests
else
{
char nuid[NUID_BUFFER_LEN+1];

s = natsNUID_Next(nuid, sizeof(nuid));
if (s == NATS_OK)
{
memcpy(js->rpre, js->nc->inboxPfx, js->nc->inboxPfxLen);
memcpy(js->rpre+js->nc->inboxPfxLen, nuid+((int)strlen(nuid)-8), 8);
js->rpre[js->rpreLen-1] = '.';
js->rpre[js->rpreLen] = '\0';
}
}
}
if (s == NATS_OK)
{
char *subj = NULL;

if (nats_asprintf(&subj, "%s*", js->rpre) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 1489 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1489

Added line #L1489 was not covered by tests
else
{
s = natsConn_subscribeNoPool(&sub, js->nc, subj, _sharedRespHandler, (void*) js);
if (s == NATS_OK)
{
nc->respSub = subj;
nc->reqIdOffset = js->rpreLen;

sub->shareCount = 2;
nc->respMux = sub;
js->rsub = sub;
}
else {
NATS_FREE(subj);

Check warning on line 1503 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1503

Added line #L1503 was not covered by tests
}
}

if (s == NATS_OK)
{
_retain(js);
natsSubscription_SetPendingLimits(js->rsub, -1, -1);
natsSubscription_SetOnCompleteCB(js->rsub, js_subComplete, (void*) js);
}
}
if (s != NATS_OK)

Check warning on line 1514 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1514

Added line #L1514 was not covered by tests
{
// Undo the things we created so we retry again next time.
// It is either that or we have to always check individual
// objects to know if we have to create them.
NATS_FREE(js->rpre);
js->rpre = NULL;
natsStrHash_Destroy(js->pm);
js->pm = NULL;
natsCondition_Destroy(js->cond);
js->cond = NULL;

Check warning on line 1524 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1519-L1524

Added lines #L1519 - L1524 were not covered by tests
}
return NATS_UPDATE_ERR_STACK(s);
}
1 change: 1 addition & 0 deletions test/list_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ _test(JetStreamPublish)
_test(JetStreamPublishAckHandler)
_test(JetStreamPublishAsync)
_test(JetStreamPublishTTL)
_test(JetStreamSharedSub)
_test(JetStreamStreamsSealAndRollup)
_test(JetStreamSubscribe)
_test(JetStreamSubscribeConfigCheck)
Expand Down
54 changes: 54 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -42397,3 +42397,57 @@ int main(int argc, char **argv)
printf("ALL PASSED\n");
return 0;
}

void test_JetStreamSharedSub(void)
{
natsStatus s;
natsMsg *msg = NULL;
natsMsg *req = NULL;
struct threadArg arg;
natsSubscription *sub = NULL;

JS_SETUP(2, 12, 0);

s = _createDefaultThreadArgsForCbTests(&arg);
if ( s != NATS_OK)
FAIL("Unable to setup test!");

arg.string = "I will help you";
arg.status = NATS_OK;
arg.control= 4;

test("Creating shared subscription: ");
s = natsSubscription_CreateSharedSubscription(nc, js);
testCond(s == NATS_OK);

test("Subscribe: ");
IFOK(s, natsConnection_Subscribe(&sub, nc, "foo", _recvTestString, (void*) &arg));
testCond(s == NATS_OK);

test("Validate Jetstream Publish: ");
s = natsMsg_Create(&req, "foo", NULL, "help", 4);
IFOK(s, js_PublishMsgAsync(js, &req, NULL));
testCond(s == NATS_OK);

test("Wait for publish ack: ");
s = js_PublishAsyncComplete(js, NULL);
testCond(s == NATS_OK);

test("Create req message: ");
s = natsMsg_Create(&req, "foo", NULL, "help", 4);
testCond(s == NATS_OK);

test("Validate Core RequestMsg: ");
s = natsConnection_RequestMsg(&msg, nc, req, 100);
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.msgReceived)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not the js_PublishMsgAsync above already trigger _recvTestString, which in this case would have already set msgReceived. So you may need to check after the js_PublishMsgAsync and before doing a plain request. Also, this test callback is going to send the message arg.string to the reply subject, so for the JS publish, it will send that to the JS reply subject, which seems wrong.

I am not entirely sure what you are trying to test here.

s = natsCondition_TimedWait(arg.c, arg.m, 2000);
testCond(s == NATS_OK);
natsMsg_Destroy(req);
natsMsg_Destroy(msg);

natsSubscription_Destroy(sub);
_destroyDefaultThreadArgs(&arg);

JS_TEARDOWN;
}
Loading