Skip to content

[WIP] Add option to merge nats core and jetstream subscriptions#971

Draft
AdamPayzant wants to merge 5 commits intomainfrom
single_muxer
Draft

[WIP] Add option to merge nats core and jetstream subscriptions#971
AdamPayzant wants to merge 5 commits intomainfrom
single_muxer

Conversation

@AdamPayzant
Copy link
Collaborator

Currently 2 muxers are created for a jetstream and nats core. This can cause scale issues when many clients are connected to a cluster.

This diff aims to provide a proof of concept. @kozlovic Could you please review this or let me know if you can think of a better solution for this?

Signed-off-by: Edward Payzant <adam@synadia.com>
Signed-off-by: Edward Payzant <adam@synadia.com>
Signed-off-by: Edward Payzant <adam@synadia.com>
Signed-off-by: Edward Payzant <adam@synadia.com>
@AdamPayzant AdamPayzant requested a review from kozlovic February 20, 2026 21:15
@codecov
Copy link

codecov bot commented Feb 20, 2026

Codecov Report

❌ Patch coverage is 56.25000% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.26%. Comparing base (2ce5ad8) to head (7655acd).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
src/sub.c 46.15% 11 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #971      +/-   ##
==========================================
- Coverage   70.27%   70.26%   -0.01%     
==========================================
  Files          48       48              
  Lines       17409    17447      +38     
  Branches     3568     3575       +7     
==========================================
+ Hits        12235    12260      +25     
- Misses       1737     1743       +6     
- Partials     3437     3444       +7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@kozlovic
Copy link
Member

@AdamPayzant I will look at it later, but my first question is: is the scale issue related to the server or the client? Meaning, is that change supposed to help the client library or the server by reducing the number of subscriptions on the server?

@AdamPayzant
Copy link
Collaborator Author

@AdamPayzant I will look at it later, but my first question is: is the scale issue related to the server or the client? Meaning, is that change supposed to help the client library or the server by reducing the number of subscriptions on the server?

Reduce the number of subscriptions on the server. When scaled to 100s of thousands clients it was causing resource limitations

Copy link
Member

@kozlovic kozlovic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lots of changes needed.

test("Validate Core RequestMsg: ");
s = natsConnection_RequestMsg(&msg, nc, req, 100);
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.msgReceived)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not the js_PublishMsgAsync above already trigger _recvTestString, which in this case would have already set msgReceived. So you may need to check after the js_PublishMsgAsync and before doing a plain request. Also, this test callback is going to send the message arg.string to the reply subject, so for the JS publish, it will send that to the JS reply subject, which seems wrong.

I am not entirely sure what you are trying to test here.

src/sub.c Outdated
respInfo *resp = NULL;
bool dmsg = true;

js_handleSharedReply(nc, sub, msg, closure);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that we want the same "muxer" (a single subscription for both core NATS req/reply and JS, but that still should allow us to have the subject in a way that we could have a token/byte in the subject that is an indication if this is a JS reply or not. We would check that token from the msg subject here and invoke either the existing _handleAsyncReply() (would have to be "exported" of course) or _handleAsyncReply(). There would not be a need for callback code duplication since they will do exactly what they do today. Also, it would avoid doing double processing like you do here (treat as a JS reply and a NATS reply).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have it set to now compare the length of the subject and verify it's '0', but is there a better way to go about this?

Signed-off-by: Edward Payzant <adam@synadia.com>
@AdamPayzant AdamPayzant changed the title [WIP] Add option to merge [WIP] Add option to merge nats core and jetstream subscriptions Feb 25, 2026
Copy link
Member

@kozlovic kozlovic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore the comments here. I don't think the approach is sound. There would be limitations in that it would work only for 1 JS context, the shared callback as of now would not know if it is a core or JS reply, and JS callback can invoke user callbacks, which would possibly block the processing of core replies.

I think we should have a meeting to discuss further.

printf("Received message on subject: %s subjlen = %d; subscription subj = %s, nc->reqIdOffset = %d\n",
subj, len, sub->subject, nc->reqIdOffset);

if (len == nc->reqIdOffset + 1 && subj[len - 1] == '0')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work only for the first NATS core request ;-) (the core reply subject normally is <inbox prefix>.<nuid>.<seq number>.

natsConnection *nc = js->nc;

// If either are already set, we shouldn't create a new one
if ((nc->respMux != NULL) || (js->rsub != NULL) || (js->nc != nc)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reference to js-> and nc-> fields should be protected (there are exceptions for the ones that are immutable. Note that the check for js->nc is not needed now since you do nc = js->nc.

}

natsMutex_Lock(nc->mu);
natsMutex_Lock(js->mu);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why you would do this way, but I don't recall if there is already such case of holding the two locks, this would need to be verified to see if there could be a lock ordering issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants