Skip to content

Optional individual thread factories for reader and writer#1583

Merged
scottf merged 5 commits into
mainfrom
reader-writer-threads
Jun 16, 2026
Merged

Optional individual thread factories for reader and writer#1583
scottf merged 5 commits into
mainfrom
reader-writer-threads

Conversation

@scottf

@scottf scottf commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

No description provided.

@scottf

scottf commented Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

@claude review

@synadia-claude-reviewer

synadia-claude-reviewer Bot commented Jun 12, 2026

Copy link
Copy Markdown

Claude finished @scottf's task in 3m 5s —— View job


PR Review: Optional individual thread factories for reader and writer

  • Load tools
  • Read changed files and diff
  • Review Options.java changes
  • Review NatsConnection.java changes
  • Review Reader/Writer changes
  • Review tests
  • Post final review

Summary

This PR adds optional ThreadFactory parameters for the NATS connection's reader and writer threads, following the same pattern already established for connectThreadFactory and callbackThreadFactory. The implementation is clean and consistent. A few observations below.


Strengths

  • Follows existing patterns exactly. getReaderExecutor() / getWriterExecutor() mirror getCallbackExecutor() / getConnectExecutor() (lazy creation under lock, recreation if shut down, xIsInternal() helpers, shutdownNow() in lifecycle).
  • Property-based configuration is wired up (PROP_READER_THREAD_FACTORY_CLASS / PROP_WRITER_THREAD_FACTORY_CLASS), consistent with other factories.
  • Copy constructor (Builder(Options)) is updated — easy to miss, correctly included.
  • Shutdown lifecycle looks correct: the dedicated executors are only shut down when executorUseCount reaches 0, protecting shared-Options use cases.
  • README updated with the two new property names.
  • Tests cover both unit (factory wiring, copy constructor) and integration (real connection, factory actually called).

Observations

1. "Restarted across reconnects" wording is slightly misleading
src/main/java/io/nats/client/Options.java — Javadoc for readerThreadFactory():

When set, the reader runs on a dedicated single-thread executor built from this factory (restarted across reconnects)

The executor itself is not recreated on reconnect — NatsConnection.readerExecutor is a snapshot set once in the constructor and held for the connection's lifetime. On reconnect, reader.start() just submits a new Runnable to the same executor (SingleThreadExecutor keeps its worker thread idle between tasks, ready for re-use). The "restart" is the reader task, not the executor or the thread.

This matters for virtual-thread users: Executors.newSingleThreadExecutor(Thread.ofVirtual().factory()) creates one long-lived virtual worker thread (the pool worker) — not a new virtual thread per reconnect. If the intent is per-reconnect virtual threads, Executors.newVirtualThreadPerTaskExecutor() is the right tool, but that can't be supplied via a factory here. Clarifying the docstring (or at least noting the single-threaded-executor semantics) would help avoid surprise.

2. Shared-Options scenario silently shares the reader/writer executor
When the same Options instance is used by two NatsConnection objects, both connections call options.getReaderExecutor() and receive the same resolvedReaderExecutor. Both connections' reader tasks then run sequentially on one thread. This is consistent with how callbackThreadFactory behaves (same caching), but it's less obvious for reader/writer which are typically per-connection IO workers. A note in the Javadoc (similar to how the callback executor is documented for the shared-executor case) would help users who deliberately share an Options instance.

3. No ExecutorService-direct variant for reader/writer
connectExecutor(ExecutorService) and callbackExecutor(ExecutorService) let callers supply a fully-constructed executor, while this PR adds only ThreadFactory variants. This is likely intentional (a multi-threaded pool would break reader/writer's single-task assumptions), but the asymmetry is visible in the API. If it's intentional, a brief comment in the builder Javadoc explaining why an ExecutorService override isn't offered would pre-empt questions.

4. Integration test could assert thread names more precisely
src/test/java/io/nats/client/impl/NatsConnectionImplTests.java

assertFalse(readerFactory.created.isEmpty());
assertFalse(writerFactory.created.isEmpty());

This confirms the factory was invoked, but it would be marginally stronger to also assert that the created thread names start with the expected prefix (the factory already records names):

assertTrue(readerFactory.created.stream().anyMatch(n -> n.startsWith("test-reader")));
assertTrue(writerFactory.created.stream().anyMatch(n -> n.startsWith("test-writer")));

Not a blocker — the current test catches the main regression (factory not used at all).

5. No reconnect-path coverage
No test verifies that the reader/writer threads continue working correctly after a reconnect when a custom factory is in use. Given that the executor is reused (not recreated) across reconnects, this path is low-risk, but a reconnect test would give added confidence, especially for the isShutdown() guard in getReaderExecutor().


Minor / Nits

  • NatsConnection.readerExecutorIsClosed() / writerExecutorIsClosed() helpers are absent (the comment at line 932 says those four helpers are "only used for tests" — reader/writer counterparts are not added). Consistent with the design since readerExecutor isn't used in the same test infrastructure, but worth a look if future tests need it.
  • The resolvedReaderExecutor / resolvedWriterExecutor fields at Options.java:949-950 are not volatile. They're always accessed under executorsLock, so this is fine — just calling it out for clarity.

Overall this is a solid, focused addition. The two docstring points (reconnect semantics and shared-Options behavior) are the most worth addressing before merge.

@nats-io nats-io deleted a comment from synadia-claude-reviewer Bot Jun 13, 2026
@scottf

scottf commented Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

@claude re-review with these notes:

  1. "Restarted across reconnects" wording is slightly misleading

Simplified comments

  1. Shared-Options scenario silently shares the reader/writer executor

Addressed with notes in readme since this is an options wide behavior

  1. No ExecutorService-direct variant for reader/writer

This is intentional. An executor service would provide a pool.

  1. Integration test could assert thread names more precisely

No, I think this is the correct behavior.

  1. No reconnect-path coverage

There are many tests that test reconnect, so this review point is not correct.

NatsConnection.readerExecutorIsClosed() / writerExecutorIsClosed() helpers are absent

Fixed

resolvedReaderExecutor / resolvedWriterExecutor fields at Options.java:949-950 are not volatile. They're always accessed under executorsLock, so this is fine — just calling it out for clarity.

Correct. Clarity not needed.

@synadia-claude-reviewer

synadia-claude-reviewer Bot commented Jun 15, 2026

Copy link
Copy Markdown

Claude finished @scottf's task in 3m 5s —— View job


PR Re-Review: Optional individual thread factories for reader and writer

  • Read changed files and current diff
  • Verify addressed: simplified reconnect comments
  • Verify addressed: shared-Options README notes
  • Confirm intentional: no ExecutorService variant
  • Confirm: integration test assertions are correct as-is
  • Confirm: reconnect test coverage sufficient
  • Verify fixed: readerExecutorIsClosed / writerExecutorIsClosed helpers
  • Post final review

Re-Review Summary

Each point from the previous review is addressed. The "address review comments" commit (3293e6d) is clean — a few notes below.


Previous Points — All Addressed

1. Reconnect wording — Simplified to just "Sets a custom thread factory used to run the connection's reader/writer." Clean, no misleading claims.

2. Shared-Options behavior — The README's new "Connection Options Executors" section explains this well. Additionally, the implementation itself improved: getReaderExecutor() / getWriterExecutor() now use _getInternalExecutor(factory) (a cached pool with SynchronousQueue) instead of Executors.newSingleThreadExecutor(factory). This is a meaningful fix — when a single Options is shared across two connections, each connection's reader/writer now gets its own thread rather than serializing on one, because SynchronousQueue forces thread creation on each submit rather than queuing. The inline comment explains the reasoning clearly.

3. No ExecutorService variant (noted as intentional) — The PR actually added readerExecutor(ExecutorService) and writerExecutor(ExecutorService) in the follow-up commit, which is fine — see new observation below.

4. Integration test assertions — Confirmed, checking created.isEmpty() is the right level of coverage here.

5. Reconnect coverage — Fair point; the existing reconnect suite covers the path.

6. readerExecutorIsClosed() / writerExecutorIsClosed() helpers — Added at NatsConnection.java:936-937, consistent with the existing pattern.

7. volatile fields — No change needed; correctly accessed under executorsLock.


One New Observation

The follow-up commit added readerExecutor(ExecutorService) and writerExecutor(ExecutorService) builder methods, along with property-based wiring (PROP_READER_EXECUTOR_SERVICE_CLASS / PROP_WRITER_EXECUTOR_SERVICE_CLASS), copy-constructor propagation, and lifecycle management (readerExecutorIsInternal() correctly returns false for user-supplied executors so they aren't shut down). The implementation is correct.

However, these new APIs have no test coverage. OptionsTests.testReaderExecutor() and testWriterExecutor() only exercise the ThreadFactory variants. The new ExecutorService variants — builder method, copy constructor propagation, and "caller owns lifecycle" behavior — are untested. Given these are public API additions, a unit test similar to the existing testCallbackExecutor() would round things out.


Summary

Implementation is solid. The lifecycle logic, README explanation, and executor-per-task fix for shared-Options are all good additions. The only gap is tests for the new readerExecutor(ExecutorService) / writerExecutor(ExecutorService) builder methods. Worth adding before merge, but low risk otherwise.

@scottf

scottf commented Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

@claude added some coverage, please review

@synadia-claude-reviewer

synadia-claude-reviewer Bot commented Jun 15, 2026

Copy link
Copy Markdown

Claude finished @scottf's task in 1m 56s —— View job


PR Review: New ExecutorService Test Coverage

  • Load tools
  • Read current diff and changed test files
  • Review new test coverage for readerExecutor(ExecutorService) / writerExecutor(ExecutorService)
  • Check copy constructor propagation tests
  • Check lifecycle ("caller owns lifecycle") behavior tests
  • Post final review

Review Summary

The "more unit tests" commit (d582f2d) directly addresses the one remaining gap from the previous review. All three previously untested behaviors are now covered.


New Coverage in OptionsTests.java

testReaderExecutorService() / testWriterExecutorService() cover:

Behavior Assertion
Builder method wires executor directly assertSame(es, options.getReaderExecutor())
Lifecycle flag is correct (caller owns it) assertFalse(options.readerExecutorIsInternal())
Copy constructor propagates the user executor assertSame(es, copy.getReaderExecutor())
Copy also has correct lifecycle flag assertFalse(copy.readerExecutorIsInternal())

The try/finally wrapping es.shutdownNow() is the right pattern — avoids leaking a thread pool on assertion failure.


New Coverage in NatsConnectionImplTests.java

The verifyExternalExecutors helper is extended with userReaderEs / userWriterEs parameters and now asserts:

  1. options.getReaderExecutor() returns the same instance passed by the caller (checked while the connection is live)
  2. readerExecutorIsInternal() is false
  3. The user-supplied executor is not shut down immediately after connection close — the "caller owns lifecycle" contract is enforced against a real connection, not just Options in isolation

The critical assertion at NatsConnectionImplTests.java:205 (after nc is closed):

// user supplied reader/writer executors must NOT be shut down by the connection (caller owns them)
if (userReaderEs != null) {
    assertFalse(userReaderEs.isShutdown());
}

This is exactly the right place to make this assertion, and the comment makes the intent clear.


Minor Notes

  • assertEquals(options.getReaderExecutor(), userReaderEs) inside the helper will fall back to reference equality since ExecutorService doesn't override equals() — functionally identical to assertSame. Consistent with how connectEs is checked in the same helper, so no issue here.
  • The tests appropriately use newCachedThreadPool() for readerEs/writerEs (unbounded, so no deadlock risk when the reader's long-lived task is submitted to it). Correct choice.

Verdict

All three previously untested behaviors — builder method, copy constructor propagation, and lifecycle ownership — are now covered both at the unit level (OptionsTests) and against a real connection (NatsConnectionImplTests). The new tests are well-structured and follow established patterns. This is ready to merge.

Comment thread build.gradle
}

def jarVersion = "2.60.0"
def jarVersion = "2.26.0"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected that this has gone down?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a mistake and is being fixed here.

@scottf scottf merged commit 1d14abd into main Jun 16, 2026
7 checks passed
@scottf scottf deleted the reader-writer-threads branch June 16, 2026 11:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants