Skip to content

Commit 42e2ae9

Browse files
committed
refactor(engine): drain tasks before eager WINDOW for accept-side child
Replace the conditional task-queue-defer approach (commit bcf7b03 + Phase 1 cc83e5e/484fb4eb) with a simpler in-line task drain. The accepted-side WINDOW is once again emitted eagerly inside ZillaStreamFactory.Stream.onBegin (pre-Phase-1 behaviour). For accept-side child channels, ZillaEngine.drainTasks() is called between beginInputFuture.setSuccess() and the eager doWindow so any tasks queued during fireChannelBound's ACCEPTED-event chain (e.g., an AdviseInputTask from a `read advise` placed before `connected`) run synchronously and write their frames to streamsBuffer ahead of the WINDOW. Wire order for trick scripts: ACCEPTED → script statements (queue AdviseInputTask) → drainTasks (CHALLENGE written) → eager WINDOW written. Same ordering as Phase 1's intended CHALLENGE→WINDOW; no flag, no submitTask gymnastics. Wire order for non-trick scripts (e.g., binding-http AdvisoryIT): same as pre-Phase-1 — no tasks queued before drain runs (drain is a no-op), eager WINDOW written immediately. Removes: - ZillaChannel.windowNeedsTask field + setWindowNeedsTask/windowNeedsTask methods - ZillaEngine.adviseInput's flag-set call - ZillaPartition windowFuture listener's doWindow + submitTask logic (reverts listener to pre-Phase-1: just fireChannelConnected) Adds: - ZillaEngine.drainTasks() — package-private, delegates to executeTasks Tests: engine driver ITs (DuplexIT 72/72, HalfDuplexIT 64/64, SimplexIT 35/35, EngineIT 14/14); binding-http 374/374 (both client.AdvisoryIT.shouldSendRequestAndFlush and server.AdvisoryIT.shouldSendRequestAndFlush restored); binding-mcp 138/138; binding-mcp.spec ApplicationIT/NetworkIT all green. https://claude.ai/code/session_01Sm5BSDDaRNueem3xUeM2Li
1 parent bcf7b03 commit 42e2ae9

4 files changed

Lines changed: 13 additions & 47 deletions

File tree

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/k3po/ext/behavior/ZillaChannel.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ public abstract class ZillaChannel extends AbstractChannel<ZillaChannelConfig>
7676

7777
private int capabilities;
7878
private boolean flushable;
79-
private boolean windowNeedsTask;
8079

8180
private DefaultBudgetDebitor debitor;
8281
private long debitorIndex = -1L;
@@ -671,16 +670,6 @@ public void setFlushable()
671670
flushable = true;
672671
}
673672

674-
public void setWindowNeedsTask()
675-
{
676-
windowNeedsTask = true;
677-
}
678-
679-
public boolean windowNeedsTask()
680-
{
681-
return windowNeedsTask;
682-
}
683-
684673
public boolean isFlushable()
685674
{
686675
return flushable;

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/k3po/ext/behavior/ZillaEngine.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,6 @@ public void adviseInput(
128128
ChannelFuture handlerFuture,
129129
Object value)
130130
{
131-
// an inbound advise placed before `connected` (e.g., `read advise zilla:challenge`)
132-
// signals the script wants its frames written ahead of the accepted-side WINDOW
133-
channel.setWindowNeedsTask();
134131
submitTask(new AdviseInputTask(channel, handlerFuture, value));
135132
}
136133

@@ -299,6 +296,11 @@ void submitTask(
299296
submitTask(task, false);
300297
}
301298

299+
void drainTasks()
300+
{
301+
executeTasks();
302+
}
303+
302304
private void submitTask(
303305
Runnable task,
304306
boolean immediateIfAligned)

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/k3po/ext/behavior/ZillaPartition.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -310,33 +310,7 @@ private void handleBeginInitial(
310310
{
311311
if (future.isSuccess())
312312
{
313-
final ZillaChannelConfig childConfig = childChannel.getConfig();
314-
final boolean updateNeedsWindow =
315-
childConfig.getUpdate() == ZillaUpdateMode.HANDSHAKE ||
316-
childConfig.getUpdate() == ZillaUpdateMode.STREAM;
317-
318-
if (updateNeedsWindow && childChannel.windowNeedsTask())
319-
{
320-
// submit via task queue so the WINDOW write is ordered
321-
// after any AdviseInputTask queued by a `read advise`
322-
// statement that the script placed before `connected`,
323-
// and before any tasks queued by statements that follow
324-
// `connected` (since those run inside fireChannelConnected
325-
// below)
326-
childChannel.engine.submitTask(() -> sender.doWindow(childChannel));
327-
fireChannelConnected(childChannel, childChannel.getRemoteAddress());
328-
}
329-
else
330-
{
331-
// no `read advise` ahead of `connected`: preserve the
332-
// pre-Phase-1 ordering so the script's post-`connected`
333-
// listeners are registered before WINDOW is written
334-
fireChannelConnected(childChannel, childChannel.getRemoteAddress());
335-
if (updateNeedsWindow)
336-
{
337-
sender.doWindow(childChannel);
338-
}
339-
}
313+
fireChannelConnected(childChannel, childChannel.getRemoteAddress());
340314
}
341315
});
342316
}

runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/k3po/ext/behavior/ZillaStreamFactory.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,15 @@ private void onBegin(
201201
if (config.getUpdate() == ZillaUpdateMode.HANDSHAKE ||
202202
config.getUpdate() == ZillaUpdateMode.STREAM)
203203
{
204-
if (channel.getParent() == null)
204+
if (channel.getParent() != null)
205205
{
206-
sender.doWindow(channel);
206+
// accept-side child: drain any tasks queued during the
207+
// beginInputFuture listener chain (e.g., AdviseInputTask
208+
// from a `read advise` placed before `connected`) so their
209+
// frames land on the streamsBuffer ahead of this WINDOW
210+
channel.engine.drainTasks();
207211
}
208-
// accept-side child channels defer WINDOW to handleBeginInitial's
209-
// windowFuture listener so any advised statements that the script
210-
// places before `connected` (e.g. `read advise zilla:challenge`)
211-
// get a chance to write their frames first.
212+
sender.doWindow(channel);
212213
}
213214

214215
beginFuture.setSuccess();

0 commit comments

Comments
 (0)