Skip to content

Commit 8c8b397

Browse files
committed
feat(command-dump): add REDIRECT control frame support
Capture the engine's REDIRECT frame (type id 0x40000005, introduced in #1755) in the dump command's pcap output and surface it in the zilla.lua Wireshark dissector. Like RESET, REDIRECT terminates the stream, so it is mapped to a TCP PSH segment. Like BEGIN, the frame carries an int64 affinity and a round-tripped extension, so each binding's extension dissector treats REDIRECT the same as BEGIN. Adds k3po scripts for the client-sent redirect scenario; the IT method is wired but @ignore'd until its tshark-generated expected fixture is regenerated on a docker-enabled host. https://claude.ai/code/session_01K4wd5z8Ejgq4ejTsmL88oh
1 parent b162be1 commit 8c8b397

5 files changed

Lines changed: 117 additions & 8 deletions

File tree

  • incubator/command-dump/src

incubator/command-dump/src/main/java/io/aklivity/zilla/runtime/command/dump/internal/airline/ZillaDumpCommand.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.ExtensionFW;
8080
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.FlushFW;
8181
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.FrameFW;
82+
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.RedirectFW;
8283
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.ResetFW;
8384
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.SignalFW;
8485
import io.aklivity.zilla.runtime.command.dump.internal.types.stream.WindowFW;
@@ -216,6 +217,7 @@ public final class ZillaDumpCommand extends ZillaCommand
216217
private final FlushFW flushRO = new FlushFW();
217218
private final SignalFW signalRO = new SignalFW();
218219
private final ChallengeFW challengeRO = new ChallengeFW();
220+
private final RedirectFW redirectRO = new RedirectFW();
219221
private final PcapGlobalHeaderFW.Builder pcapGlobalHeaderRW = new PcapGlobalHeaderFW.Builder();
220222
private final PcapPacketHeaderFW.Builder pcapPacketHeaderRW = new PcapPacketHeaderFW.Builder();
221223
private final BeginFW.Builder beginRW = new BeginFW.Builder();
@@ -225,6 +227,7 @@ public final class ZillaDumpCommand extends ZillaCommand
225227
private final ResetFW.Builder resetRW = new ResetFW.Builder();
226228
private final FlushFW.Builder flushRW = new FlushFW.Builder();
227229
private final ChallengeFW.Builder challengeRW = new ChallengeFW.Builder();
230+
private final RedirectFW.Builder redirectRW = new RedirectFW.Builder();
228231
private final IPv6HeaderFW.Builder ipv6HeaderRW = new IPv6HeaderFW.Builder();
229232
private final IPv6JumboHeaderFW.Builder ipv6JumboHeaderRW = new IPv6JumboHeaderFW.Builder();
230233
private final TcpHeaderFW.Builder tcpHeaderRW = new TcpHeaderFW.Builder();
@@ -595,6 +598,9 @@ private boolean handleFrame(
595598
case ChallengeFW.TYPE_ID:
596599
onChallenge(challengeRO.wrap(buffer, index, index + length));
597600
break;
601+
case RedirectFW.TYPE_ID:
602+
onRedirect(redirectRO.wrap(buffer, index, index + length));
603+
break;
598604
default:
599605
break;
600606
}
@@ -740,6 +746,21 @@ private void onChallenge(
740746
}
741747
}
742748

749+
private void onRedirect(
750+
RedirectFW redirect)
751+
{
752+
if (allowedBinding.test(redirect.routedId()))
753+
{
754+
int offset = redirect.offset() - HEADER_LENGTH;
755+
final RedirectFW newRedirect = redirectRW.wrap(patchBuffer, 0, redirect.sizeof()).set(redirect).build();
756+
final ExtensionFW extension = newRedirect.extension().get(extensionRO::tryWrap);
757+
patchExtension(patchBuffer, extension, RedirectFW.FIELD_OFFSET_EXTENSION);
758+
759+
writeFrame(RedirectFW.TYPE_ID, worker, offset, newRedirect.originId(), newRedirect.routedId(),
760+
newRedirect.streamId(), newRedirect.timestamp(), newRedirect, PSH);
761+
}
762+
}
763+
743764
private void patchExtension(
744765
MutableDirectBuffer buffer,
745766
ExtensionFW extension,

incubator/command-dump/src/main/resources/io/aklivity/zilla/runtime/command/dump/internal/airline/zilla.lua

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ RESET_ID = 0x40000001
4141
WINDOW_ID = 0x40000002
4242
SIGNAL_ID = 0x40000003
4343
CHALLENGE_ID = 0x40000004
44+
REDIRECT_ID = 0x40000005
4445

4546
AMQP_ID = 0x112dc182
4647
FILESYSTEM_ID = 0xe4e6aa9e
@@ -867,6 +868,8 @@ function zilla_protocol.dissector(buffer, pinfo, tree)
867868
handle_signal_frame(buffer, next_offset, subtree, pinfo, info)
868869
elseif frame_type_id == ABORT_ID or frame_type_id == RESET_ID or frame_type_id == CHALLENGE_ID then
869870
handle_extension(buffer, subtree, pinfo, info, next_offset, frame_type_id)
871+
elseif frame_type_id == REDIRECT_ID then
872+
handle_redirect_frame(buffer, next_offset, subtree, pinfo, info)
870873
end
871874
end
872875

@@ -876,6 +879,12 @@ function handle_begin_frame(buffer, offset, subtree, pinfo, info)
876879
handle_extension(buffer, subtree, pinfo, info, offset + 8, BEGIN_ID)
877880
end
878881

882+
function handle_redirect_frame(buffer, offset, subtree, pinfo, info)
883+
local slice_affinity = buffer(offset, 8)
884+
subtree:add_le(fields.affinity, slice_affinity)
885+
handle_extension(buffer, subtree, pinfo, info, offset + 8, REDIRECT_ID)
886+
end
887+
879888
function handle_data_frame(buffer, offset, tree, subtree, sequence, acknowledge, maximum, pinfo, info, protocol_type)
880889
local slice_flags = buffer(offset, 1)
881890
local flags_label = string.format("Flags: 0x%02x", slice_flags:le_uint())
@@ -974,6 +983,7 @@ function resolve_frame_type(frame_type_id)
974983
elseif frame_type_id == WINDOW_ID then frame_type = "WINDOW"
975984
elseif frame_type_id == SIGNAL_ID then frame_type = "SIGNAL"
976985
elseif frame_type_id == CHALLENGE_ID then frame_type = "CHALLENGE"
986+
elseif frame_type_id == REDIRECT_ID then frame_type = "REDIRECT"
977987
end
978988
return frame_type
979989
end
@@ -1280,7 +1290,8 @@ function add_proxy_string_as_subtree(buffer, tree, label_format, slice_type_id,
12801290
end
12811291

12821292
function handle_http_extension(buffer, offset, ext_subtree, frame_type_id)
1283-
if frame_type_id == BEGIN_ID or frame_type_id == RESET_ID or frame_type_id == CHALLENGE_ID then
1293+
if frame_type_id == BEGIN_ID or frame_type_id == RESET_ID or frame_type_id == CHALLENGE_ID or
1294+
frame_type_id == REDIRECT_ID then
12841295
dissect_and_add_http_headers(buffer, offset, ext_subtree, "Headers", "Header")
12851296
elseif frame_type_id == END_ID then
12861297
dissect_and_add_http_headers(buffer, offset, ext_subtree, "Trailers", "Trailer")
@@ -1311,7 +1322,7 @@ function dissect_and_add_http_headers(buffer, offset, tree, plural_name, singula
13111322
end
13121323

13131324
function handle_grpc_extension(buffer, offset, ext_subtree, frame_type_id)
1314-
if frame_type_id == BEGIN_ID then
1325+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
13151326
handle_grpc_begin_extension(buffer, offset, ext_subtree)
13161327
elseif frame_type_id == DATA_ID then
13171328
handle_grpc_data_extension(buffer, offset, ext_subtree)
@@ -1464,7 +1475,7 @@ function decode_varuint32(buffer, offset)
14641475
end
14651476

14661477
function handle_sse_extension(buffer, offset, ext_subtree, frame_type_id)
1467-
if frame_type_id == BEGIN_ID then
1478+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
14681479
handle_sse_begin_extension(buffer, offset, ext_subtree)
14691480
elseif frame_type_id == DATA_ID then
14701481
handle_sse_data_extension(buffer, offset, ext_subtree)
@@ -1521,7 +1532,7 @@ function handle_sse_end_extension(buffer, offset, ext_subtree)
15211532
end
15221533

15231534
function handle_ws_extension(buffer, offset, ext_subtree, frame_type_id)
1524-
if frame_type_id == BEGIN_ID then
1535+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
15251536
handle_ws_begin_extension(buffer, offset, ext_subtree)
15261537
elseif frame_type_id == DATA_ID then
15271538
handle_ws_data_extension(buffer, offset, ext_subtree)
@@ -1629,12 +1640,13 @@ function handle_filesystem_extension(buffer, offset, ext_subtree)
16291640
end
16301641

16311642
function handle_mqtt_extension(buffer, offset, ext_subtree, frame_type_id)
1632-
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID then
1643+
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID or
1644+
frame_type_id == REDIRECT_ID then
16331645
local kind_length = 1
16341646
local slice_kind = buffer(offset, kind_length)
16351647
local kind = mqtt_ext_kinds[slice_kind:le_int()]
16361648
ext_subtree:add_le(fields.mqtt_ext_kind, slice_kind)
1637-
if frame_type_id == BEGIN_ID then
1649+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
16381650
if kind == "PUBLISH" then
16391651
handle_mqtt_begin_publish_extension(buffer, offset + kind_length, ext_subtree)
16401652
elseif kind == "SUBSCRIBE" then
@@ -2046,12 +2058,13 @@ function handle_mqtt_reset_extension(buffer, offset, ext_subtree)
20462058
end
20472059

20482060
function handle_kafka_extension(buffer, offset, ext_subtree, frame_type_id)
2049-
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID then
2061+
if frame_type_id == BEGIN_ID or frame_type_id == DATA_ID or frame_type_id == FLUSH_ID or
2062+
frame_type_id == REDIRECT_ID then
20502063
local api_length = 1
20512064
local slice_api = buffer(offset, api_length)
20522065
local api = kafka_ext_apis[slice_api:le_uint()]
20532066
ext_subtree:add_le(fields.kafka_ext_api, slice_api)
2054-
if frame_type_id == BEGIN_ID then
2067+
if frame_type_id == BEGIN_ID or frame_type_id == REDIRECT_ID then
20552068
if api == "CONSUMER" then
20562069
handle_kafka_begin_consumer_extension(buffer, offset + api_length, ext_subtree)
20572070
elseif api == "GROUP" then
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
connect "zilla://streams/app0"
17+
option zilla:ephemeral "app0"
18+
option zilla:timestamps "false"
19+
option zilla:window 8192
20+
option zilla:transmission "duplex"
21+
22+
write zilla:begin.ext ${ws:beginEx()
23+
.typeId(zilla:id("ws"))
24+
.protocol(null)
25+
.scheme("http")
26+
.authority("localhost:8080")
27+
.path("/echo")
28+
.build()}
29+
30+
connected
31+
32+
write advised zilla:redirect 0x00000000000000b1L
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
accept "zilla://streams/app0"
17+
option zilla:timestamps "false"
18+
option zilla:window 8192
19+
option zilla:transmission "duplex"
20+
accepted
21+
22+
read zilla:begin.ext ${ws:beginEx()
23+
.typeId(zilla:id("ws"))
24+
.protocol(null)
25+
.scheme("http")
26+
.authority("localhost:8080")
27+
.path("/echo")
28+
.build()}
29+
30+
connected
31+
32+
read advise zilla:redirect 0x00000000000000b1L

incubator/command-dump/src/test/java/io/aklivity/zilla/runtime/command/dump/internal/WsAdvisoryApplicationIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static java.util.concurrent.TimeUnit.SECONDS;
1818
import static org.junit.rules.RuleChain.outerRule;
1919

20+
import org.junit.Ignore;
2021
import org.junit.Rule;
2122
import org.junit.Test;
2223
import org.junit.rules.DisableOnDebug;
@@ -49,4 +50,14 @@ public void shouldReceiveClientSentChallenge() throws Exception
4950
{
5051
k3po.finish();
5152
}
53+
54+
@Ignore("Pending regeneration of WsAdvisoryApplicationIT_shouldReceiveClientSentRedirect.txt via tshark")
55+
@Test
56+
@Specification({
57+
"${app}/client.sent.redirect/client",
58+
"${app}/client.sent.redirect/server" })
59+
public void shouldReceiveClientSentRedirect() throws Exception
60+
{
61+
k3po.finish();
62+
}
5263
}

0 commit comments

Comments
 (0)