Skip to content

Commit a056d3b

Browse files
jfallowsclaude
andauthored
feat(command-dump): add REDIRECT control frame support (#1759)
* feat(command-dump): add REDIRECT control frame support Capture the engine's REDIRECT frame (type id 0x40000005, introduced in #1755) in the dump command's pcap output and surface it in the zilla.lua Wireshark dissector. Like RESET, REDIRECT terminates the stream, so it is mapped to a TCP PSH segment. Like BEGIN, the frame carries an int64 affinity and a round-tripped extension, so each binding's extension dissector treats REDIRECT the same as BEGIN. Adds k3po scripts for the client-sent redirect scenario; the IT method is wired but @ignore'd until its tshark-generated expected fixture is regenerated on a docker-enabled host. https://claude.ai/code/session_01K4wd5z8Ejgq4ejTsmL88oh * fix(engine): align REDIRECT teardown with RESET in k3po-zilla transport Per the engine ABI from #1755, REDIRECT terminates the stream like RESET (EngineWorker dispatcher.remove + Target streams[].remove for both RedirectFW.TYPE_ID and ResetFW.TYPE_ID). The k3po-zilla test transport was leaving the channel in an inconsistent state on both ends, so follow-up cleanup or script steps could emit END / ABORT frames against a stream the engine had already torn down. This produced inconsistent zilla dump pcap output for redirect scenarios. Mirror the existing RESET pattern exactly: - `ZillaSource.doAdviseInput` (ADVISORY_REDIRECT branch) — the caller of `streamFactory.doRedirect` now closes the local READ side after the frame is written, mirroring `doAbortInputAfterBegin` for RESET. The frame writer (`ZillaTarget.doRedirect`) stays state-free, mirroring `ZillaTarget.doReset` / `doAbortInput`. - `Throttle.onRedirect` — on the receiver side, mirror `Throttle.onReset` exactly: `setWriteAborted` + `setWriteClosed` + `fireOutputAborted` + the disconnect / unbind / close events. The receiver is the source of the stream (REDIRECT travels in the throttle direction, opposite the BEGIN), so its WRITE side is what the redirect terminates — same as RESET semantics. The pre-existing `binding-http` `RedirectIT` (`read advise zilla:redirect` + `write aborted` on the application side, added in #1758) was authored against the previous buggy transport behavior and has been failing on develop ever since. With this fix in place it will need to be revisited in a follow-up — that test is unaffected by this commit and remains in the same failure state as on develop's HEAD. Verified: engine surefire 320/320 + failsafe 204/204 (incl. SimplexIT 37/37 with the PR #1755 REDIRECT test). https://claude.ai/code/session_01K4wd5z8Ejgq4ejTsmL88oh * test(command-dump): enable shouldReceiveClientSentRedirect Pair the redirect rpt scripts with `read abort` (client) and `write aborted` (server) so post-REDIRECT teardown is deterministic once the k3po-zilla transport aligns REDIRECT with RESET (#13ad202a1). Add the tshark fixture and unignore the IT method. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent b162be1 commit a056d3b

8 files changed

Lines changed: 358 additions & 9 deletions

File tree

incubator/command-dump/src/main/java/io/aklivity/zilla/runtime/command/dump/internal/airline/ZillaDumpCommand.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.ExtensionFW;
8080
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.FlushFW;
8181
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.FrameFW;
82+
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.RedirectFW;
8283
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.ResetFW;
8384
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.SignalFW;
8485
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.WindowFW;
@@ -216,6 +217,7 @@ public final class ZillaDumpCommand extends ZillaCommand
216217
private final FlushFW flushRO = new FlushFW();
217218
private final SignalFW signalRO = new SignalFW();
218219
private final ChallengeFW challengeRO = new ChallengeFW();
220+
private final RedirectFW redirectRO = new RedirectFW();
219221
private final PcapGlobalHeaderFW.Builder pcapGlobalHeaderRW = new PcapGlobalHeaderFW.Builder();
220222
private final PcapPacketHeaderFW.Builder pcapPacketHeaderRW = new PcapPacketHeaderFW.Builder();
221223
private final BeginFW.Builder beginRW = new BeginFW.Builder();
@@ -225,6 +227,7 @@ public final class ZillaDumpCommand extends ZillaCommand
225227
private final ResetFW.Builder resetRW = new ResetFW.Builder();
226228
private final FlushFW.Builder flushRW = new FlushFW.Builder();
227229
private final ChallengeFW.Builder challengeRW = new ChallengeFW.Builder();
230+
private final RedirectFW.Builder redirectRW = new RedirectFW.Builder();
228231
private final IPv6HeaderFW.Builder ipv6HeaderRW = new IPv6HeaderFW.Builder();
229232
private final IPv6JumboHeaderFW.Builder ipv6JumboHeaderRW = new IPv6JumboHeaderFW.Builder();
230233
private final TcpHeaderFW.Builder tcpHeaderRW = new TcpHeaderFW.Builder();
@@ -595,6 +598,9 @@ private boolean handleFrame(
595598
case ChallengeFW.TYPE_ID:
596599
onChallenge(challengeRO.wrap(buffer, index, index + length));
597600
break;
601+
case RedirectFW.TYPE_ID:
602+
onRedirect(redirectRO.wrap(buffer, index, index + length));
603+
break;
598604
default:
599605
break;
600606
}
@@ -740,6 +746,21 @@ private void onChallenge(
740746
}
741747
}
742748

749+
private void onRedirect(
750+
RedirectFW redirect)
751+
{
752+
if (allowedBinding.test(redirect.routedId()))
753+
{
754+
int offset = redirect.offset() - HEADER_LENGTH;
755+
final RedirectFW newRedirect = redirectRW.wrap(patchBuffer, 0, redirect.sizeof()).set(redirect).build();
756+
final ExtensionFW extension = newRedirect.extension().get(extensionRO::tryWrap);
757+
patchExtension(patchBuffer, extension, RedirectFW.FIELD_OFFSET_EXTENSION);
758+
759+
writeFrame(RedirectFW.TYPE_ID, worker, offset, newRedirect.originId(), newRedirect.routedId(),
760+
newRedirect.streamId(), newRedirect.timestamp(), newRedirect, PSH);
761+
}
762+
}
763+
743764
private void patchExtension(
744765
MutableDirectBuffer buffer,
745766
ExtensionFW extension,

incubator/command-dump/src/main/resources/io/aklivity/zilla/runtime/command/dump/internal/airline/zilla.lua

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ RESET_ID = 0x40000001
4141
WINDOW_ID = 0x40000002
4242
SIGNAL_ID = 0x40000003
4343
CHALLENGE_ID = 0x40000004
44+
REDIRECT_ID = 0x40000005
4445

4546
AMQP_ID = 0x112dc182
4647
FILESYSTEM_ID = 0xe4e6aa9e
@@ -867,6 +868,8 @@ function zilla_protocol.dissector(buffer, pinfo, tree)
867868
handle_signal_frame(buffer, next_offset, subtree, pinfo, info)
868869
elseif frame_type_id == ABORT_ID or frame_type_id == RESET_ID or frame_type_id == CHALLENGE_ID then
869870
handle_extension(buffer, subtree, pinfo, info, next_offset, frame_type_id)
871+
elseif frame_type_id == REDIRECT_ID then
872+
handle_redirect_frame(buffer, next_offset, subtree, pinfo, info)
870873
end
871874
end
872875

@@ -876,6 +879,12 @@ function handle_begin_frame(buffer, offset, subtree, pinfo, info)
876879
handle_extension(buffer, subtree, pinfo, info, offset + 8, BEGIN_ID)
877880
end
878881

882+
function handle_redirect_frame(buffer, offset, subtree, pinfo, info)
883+
local slice_affinity = buffer(offset, 8)
884+
subtree:add_le(fields.affinity, slice_affinity)
885+
handle_extension(buffer, subtree, pinfo, info, offset + 8, REDIRECT_ID)
886+
end
887+
879888
function handle_data_frame(buffer, offset, tree, subtree, sequence, acknowledge, maximum, pinfo, info, protocol_type)
880889
local slice_flags = buffer(offset, 1)
881890
local flags_label = string.format("Flags: 0x%02x", slice_flags:le_uint())
@@ -974,6 +983,7 @@ function resolve_frame_type(frame_type_id)
974983
elseif frame_type_id == WINDOW_ID then frame_type = "WINDOW"
975984
elseif frame_type_id == SIGNAL_ID then frame_type = "SIGNAL"
976985
elseif frame_type_id == CHALLENGE_ID then frame_type = "CHALLENGE"
986+
elseif frame_type_id == REDIRECT_ID then frame_type = "REDIRECT"
977987
end
978988
return frame_type
979989
end
@@ -1280,7 +1290,8 @@ function add_proxy_string_as_subtree(buffer, tree, label_format, slice_type_id,
12801290
end
12811291

12821292
function handle_http_extension(buffer, offset, ext_subtree, frame_type_id)
1283-
if frame_type_id == BEGIN_ID or frame_type_id == RESET_ID or frame_type_id == CHALLENGE_ID then
1293+
if frame_type_id == BEGIN_ID or frame_type_id == RESET_ID or frame_type_id == CHALLENGE_ID or
1294+
frame_type_id == REDIRECT_ID then
12841295
dissect_and_add_http_headers(buffer, offset, ext_subtree, "Headers", "Header")
12851296
elseif frame_type_id == END_ID then
12861297
dissect_and_add_http_headers(buffer, offset, ext_subtree, "Trailers", "Trailer")
@@ -1311,7 +1322,7 @@ function dissect_and_add_http_headers(buffer, offset, tree, plural_name, singula
13111322
end
13121323

13131324
function handle_grpc_extension(buffer, offset, ext_subtree, frame_type_id)
1314-
if frame_type_id == BEGIN_ID then
1325+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
13151326
handle_grpc_begin_extension(buffer, offset, ext_subtree)
13161327
elseif frame_type_id == DATA_ID then
13171328
handle_grpc_data_extension(buffer, offset, ext_subtree)
@@ -1464,7 +1475,7 @@ function decode_varuint32(buffer, offset)
14641475
end
14651476

14661477
function handle_sse_extension(buffer, offset, ext_subtree, frame_type_id)
1467-
if frame_type_id == BEGIN_ID then
1478+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
14681479
handle_sse_begin_extension(buffer, offset, ext_subtree)
14691480
elseif frame_type_id == DATA_ID then
14701481
handle_sse_data_extension(buffer, offset, ext_subtree)
@@ -1521,7 +1532,7 @@ function handle_sse_end_extension(buffer, offset, ext_subtree)
15211532
end
15221533

15231534
function handle_ws_extension(buffer, offset, ext_subtree, frame_type_id)
1524-
if frame_type_id == BEGIN_ID then
1535+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
15251536
handle_ws_begin_extension(buffer, offset, ext_subtree)
15261537
elseif frame_type_id == DATA_ID then
15271538
handle_ws_data_extension(buffer, offset, ext_subtree)
@@ -1629,12 +1640,13 @@ function handle_filesystem_extension(buffer, offset, ext_subtree)
16291640
end
16301641

16311642
function handle_mqtt_extension(buffer, offset, ext_subtree, frame_type_id)
1632-
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID then
1643+
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID or
1644+
frame_type_id == REDIRECT_ID then
16331645
local kind_length = 1
16341646
local slice_kind = buffer(offset, kind_length)
16351647
local kind = mqtt_ext_kinds[slice_kind:le_int()]
16361648
ext_subtree:add_le(fields.mqtt_ext_kind, slice_kind)
1637-
if frame_type_id == BEGIN_ID then
1649+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
16381650
if kind == "PUBLISH" then
16391651
handle_mqtt_begin_publish_extension(buffer, offset + kind_length, ext_subtree)
16401652
elseif kind == "SUBSCRIBE" then
@@ -2046,12 +2058,13 @@ function handle_mqtt_reset_extension(buffer, offset, ext_subtree)
20462058
end
20472059

20482060
function handle_kafka_extension(buffer, offset, ext_subtree, frame_type_id)
2049-
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID then
2061+
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID or
2062+
frame_type_id == REDIRECT_ID then
20502063
local api_length = 1
20512064
local slice_api = buffer(offset, api_length)
20522065
local api = kafka_ext_apis[slice_api:le_uint()]
20532066
ext_subtree:add_le(fields.kafka_ext_api, slice_api)
2054-
if frame_type_id == BEGIN_ID then
2067+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
20552068
if api == "CONSUMER" then
20562069
handle_kafka_begin_consumer_extension(buffer, offset + api_length, ext_subtree)
20572070
elseif api == "GROUP" then
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
connect "zilla://streams/app0"
17+
option zilla:ephemeral "app0"
18+
option zilla:timestamps "false"
19+
option zilla:window 8192
20+
option zilla:transmission "duplex"
21+
22+
write zilla:begin.ext ${ws:beginEx()
23+
.typeId(zilla:id("ws"))
24+
.protocol(null)
25+
.scheme("http")
26+
.authority("localhost:8080")
27+
.path("/echo")
28+
.build()}
29+
30+
connected
31+
32+
write advised zilla:redirect 0x00000000000000b1L
33+
34+
read abort
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
accept "zilla://streams/app0"
17+
option zilla:timestamps "false"
18+
option zilla:window 8192
19+
option zilla:transmission "duplex"
20+
accepted
21+
22+
read zilla:begin.ext ${ws:beginEx()
23+
.typeId(zilla:id("ws"))
24+
.protocol(null)
25+
.scheme("http")
26+
.authority("localhost:8080")
27+
.path("/echo")
28+
.build()}
29+
30+
connected
31+
32+
read advise zilla:redirect 0x00000000000000b1L
33+
34+
write aborted

incubator/command-dump/src/test/java/io/aklivity/zilla/runtime/command/dump/internal/WsAdvisoryApplicationIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,13 @@ public void shouldReceiveClientSentChallenge() throws Exception
4949
{
5050
k3po.finish();
5151
}
52+
53+
@Test
54+
@Specification({
55+
"${app}/client.sent.redirect/client",
56+
"${app}/client.sent.redirect/server" })
57+
public void shouldReceiveClientSentRedirect() throws Exception
58+
{
59+
k3po.finish();
60+
}
5261
}

0 commit comments

Comments
 (0)