Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,7 @@ natsConn_close(natsConnection *nc);
void
natsConn_destroy(natsConnection *nc, bool fromPublicDestroy);

void
natsConnection_respHandler(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure);

#endif /* CONN_H_ */
12 changes: 6 additions & 6 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,8 @@ _parsePubAck(natsMsg *msg, jsPubAck *pa, jsPubAckErr *pae, char *errTxt, size_t
return s;
}

static void
_handleAsyncReply(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
void
js_handleAsyncReply(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
const char *subject = natsMsg_GetSubject(msg);
char *id = NULL;
Expand Down Expand Up @@ -753,8 +753,8 @@ _handleAsyncReply(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void
natsMsg_Destroy(msg);
}

static void
_subComplete(void *closure)
void
js_subComplete(void *closure)
{
js_release((jsCtx*) closure);
}
Expand Down Expand Up @@ -796,12 +796,12 @@ _newAsyncReply(char *reply, jsCtx *js)
if (nats_asprintf(&subj, "%s*", js->rpre) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);
else
s = natsConn_subscribeNoPool(&(js->rsub), js->nc, subj, _handleAsyncReply, (void*) js);
s = natsConn_subscribeNoPool(&(js->rsub), js->nc, subj, js_handleAsyncReply, (void*) js);
if (s == NATS_OK)
{
_retain(js);
natsSubscription_SetPendingLimits(js->rsub, -1, -1);
natsSubscription_SetOnCompleteCB(js->rsub, _subComplete, (void*) js);
natsSubscription_SetOnCompleteCB(js->rsub, js_subComplete, (void*) js);
}
NATS_FREE(subj);
}
Expand Down
6 changes: 6 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,9 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch);

void
js_setOnReleasedCb(jsCtx *js, js_onReleaseCb cb, void *arg);

void
js_subComplete(void *closure);

void
js_handleAsyncReply(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure);
10 changes: 10 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6351,6 +6351,16 @@ natsSubscription_SetOnCompleteCB(natsSubscription *sub, natsOnCompleteCB cb, voi
NATS_EXTERN void
natsSubscription_Destroy(natsSubscription *sub);

/** \breif Create a shared subscription for a nats Connection and jetstream stream
*
* Creates a subscription shared between a jetstream stream and it's nats connection
* to be used for jetstream publishes and req/reply.
*
* @param js the pointer to the #jsCtx object.
*/
NATS_EXTERN natsStatus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would need documentation here... which would help me understand what this function is supposed to do ;-)

natsSubscription_CreateSharedSubscription(jsCtx *js);

/** @} */ // end of subGroup

#if defined(NATS_HAS_STREAMING)
Expand Down
2 changes: 2 additions & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,8 @@ struct __natsSubscription

// For JetStream
jsSub *jsi;
// Counts the usage of this subscription when shared between nc and js
int shareCount;
};

typedef struct __natsPong
Expand Down
6 changes: 3 additions & 3 deletions src/pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ _oldRequestMsg(natsMsg **replyMsg, natsConnection *nc,
return NATS_UPDATE_ERR_STACK(s);
}

static void
_respHandler(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
void
natsConnection_respHandler(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
char *rt = NULL;
const char *subj = NULL;
Expand Down Expand Up @@ -472,7 +472,7 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,

// Setup only once (but could be more if natsConn_initResp() returns != OK)
if (nc->respMux == NULL)
s = natsConn_initResp(nc, _respHandler);
s = natsConn_initResp(nc, natsConnection_respHandler);
if (s == NATS_OK)
s = natsConn_addRespInfo(&resp, nc, respInbox);

Expand Down
121 changes: 121 additions & 0 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,7 @@
void natsSubscription_Destroy(natsSubscription *sub)
{
bool doUnsub = false;
int shareCount = 0;

if (sub == NULL)
return;
Expand All @@ -1370,10 +1371,130 @@
if (sub->jsi != NULL)
sub->jsi->dc = false;

if (sub->shareCount > 0)
sub->shareCount--;

shareCount = sub->shareCount;
natsSub_Unlock(sub);

// Don't close or free the subscription if it's still in use
if (shareCount != 0)
return;

if (doUnsub)
(void)natsSubscription_Unsubscribe(sub);

natsSub_release(sub);
}

static void
_sharedRespHandler(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
int len = 0;
const char *subj = subj = natsMsg_GetSubject(msg);

len = strlen(subj);
printf("Received message on subject: %s subjlen = %d; subscription subj = %s, nc->reqIdOffset = %d\n",
subj, len, sub->subject, nc->reqIdOffset);

if (len == nc->reqIdOffset + 1 && subj[len - 1] == '0')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work only for the first NATS core request ;-) (the core reply subject normally is <inbox prefix>.<nuid>.<seq number>.

{
printf("In nc resp handler\n");
natsConnection_respHandler(nc, sub, msg, closure);
}
else
{
printf("In js async reply handler\n");
js_handleAsyncReply(nc, sub, msg, closure);
}
}

natsStatus
natsSubscription_CreateSharedSubscription(jsCtx *js)
{
natsSubscription *sub = NULL;
natsStatus s = NATS_OK;
natsConnection *nc = js->nc;

// If either are already set, we shouldn't create a new one
if ((nc->respMux != NULL) || (js->rsub != NULL) || (js->nc != nc)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reference to js-> and nc-> fields should be protected (there are exceptions for the ones that are immutable. Note that the check for js->nc is not needed now since you do nc = js->nc.

return NATS_INVALID_ARG;
}

natsMutex_Lock(nc->mu);
natsMutex_Lock(js->mu);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why you would do this way, but I don't recall if there is already such case of holding the two locks, this would need to be verified to see if there could be a lock ordering issue.


nc->respPool = NATS_CALLOC(RESP_INFO_POOL_MAX_SIZE, sizeof(respInfo*));
if (nc->respPool == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);
if (s == NATS_OK)

Check warning on line 1430 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1429-L1430

Added lines #L1429 - L1430 were not covered by tests
s = natsStrHash_Create(&nc->respMap, 4);

IFOK(s, natsCondition_Create(&(js->cond)));
IFOK(s, natsStrHash_Create(&(js->pm), 64));
if (s == NATS_OK)
{
js->rpre = NATS_MALLOC(js->rpreLen+1);
if (js->rpre == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 1439 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1439

Added line #L1439 was not covered by tests
else
{
char nuid[NUID_BUFFER_LEN+1];

s = natsNUID_Next(nuid, sizeof(nuid));
if (s == NATS_OK)
{
memcpy(js->rpre, js->nc->inboxPfx, js->nc->inboxPfxLen);
memcpy(js->rpre+js->nc->inboxPfxLen, nuid+((int)strlen(nuid)-8), 8);
js->rpre[js->rpreLen-1] = '.';
js->rpre[js->rpreLen] = '\0';
}
}
}
if (s == NATS_OK)
{
char *subj = NULL;

if (nats_asprintf(&subj, "%s*", js->rpre) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 1459 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1459

Added line #L1459 was not covered by tests
else
{
s = natsConn_subscribeNoPool(&sub, js->nc, subj, _sharedRespHandler, (void*) js);
if (s == NATS_OK)
{
nc->respSub = subj;
nc->reqIdOffset = js->rpreLen;

sub->shareCount = 2;
nc->respMux = sub;
js->rsub = sub;
}
else {
NATS_FREE(subj);

Check warning on line 1473 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1473

Added line #L1473 was not covered by tests
}
}

if (s == NATS_OK)
{
_retain(js);
natsSubscription_SetPendingLimits(js->rsub, -1, -1);
natsSubscription_SetOnCompleteCB(js->rsub, js_subComplete, (void*) js);
}
}
if (s != NATS_OK)
{
// Undo the things we created so we retry again next time.
// It is either that or we have to always check individual
// objects to know if we have to create them.
NATS_FREE(js->rpre);
js->rpre = NULL;
natsStrHash_Destroy(js->pm);
js->pm = NULL;
natsCondition_Destroy(js->cond);
js->cond = NULL;

Check warning on line 1494 in src/sub.c

View check run for this annotation

Codecov / codecov/patch

src/sub.c#L1489-L1494

Added lines #L1489 - L1494 were not covered by tests
}

natsMutex_Unlock(nc->mu);
natsMutex_Unlock(js->mu);
return NATS_UPDATE_ERR_STACK(s);
}
1 change: 1 addition & 0 deletions test/list_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ _test(JetStreamPublish)
_test(JetStreamPublishAckHandler)
_test(JetStreamPublishAsync)
_test(JetStreamPublishTTL)
_test(JetStreamSharedSub)
_test(JetStreamStreamsSealAndRollup)
_test(JetStreamSubscribe)
_test(JetStreamSubscribeConfigCheck)
Expand Down
56 changes: 56 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -42397,3 +42397,59 @@ int main(int argc, char **argv)
printf("ALL PASSED\n");
return 0;
}

void test_JetStreamSharedSub(void)
{
natsStatus s;
natsMsg *msg = NULL;
natsMsg *req = NULL;
struct threadArg arg;
natsSubscription *sub = NULL;

JS_SETUP(2, 12, 0);

s = _createDefaultThreadArgsForCbTests(&arg);
if ( s != NATS_OK)
FAIL("Unable to setup test!");

arg.string = "I will help you";
arg.status = NATS_OK;
arg.control= 4;

test("Creating shared subscription: ");
s = natsSubscription_CreateSharedSubscription(js);
testCond(s == NATS_OK);

test("Subscribe: ");
IFOK(s, natsConnection_Subscribe(&sub, nc, "foo", _recvTestString, (void*) &arg));
testCond(s == NATS_OK);

test("Validate Jetstream Publish: ");
s = natsMsg_Create(&req, "foo", NULL, "help", 4);
IFOK(s, js_PublishMsgAsync(js, &req, NULL));
testCond(s == NATS_OK);

test("Wait for publish ack: ");
s = js_PublishAsyncComplete(js, NULL);
testCond(s == NATS_OK);
natsMsg_Destroy(req);

test("Create req message: ");
s = natsMsg_Create(&req, "foo", NULL, "help", 4);
testCond(s == NATS_OK);

test("Validate Core RequestMsg: ");
s = natsConnection_RequestMsg(&msg, nc, req, 100);
natsMutex_Lock(arg.m);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not the js_PublishMsgAsync above already trigger _recvTestString, which in this case would have already set msgReceived. So you may need to check after the js_PublishMsgAsync and before doing a plain request. Also, this test callback is going to send the message arg.string to the reply subject, so for the JS publish, it will send that to the JS reply subject, which seems wrong.

I am not entirely sure what you are trying to test here.

while ((s != NATS_TIMEOUT) && !arg.msgReceived)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);
natsMsg_Destroy(req);
natsMsg_Destroy(msg);

natsSubscription_Destroy(sub);
_destroyDefaultThreadArgs(&arg);

JS_TEARDOWN;
}