Skip to content

Commit efbebdf

Browse files
authored
Merge pull request #489 from TheLortex/simultaneous-close-fix
Fixing memory leaks when a simultaneous close is happening
2 parents afa354f + aac0e02 commit efbebdf

11 files changed

+311
-167
lines changed

CHANGES.md

+9
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
### v7.1.2 (2022-07-27)
2+
3+
* TCP: fix memory leaks on connection close in three scenarios (#489 @TheLortex)
4+
- simultanous close: set up the timewait timer in the `Closing(1) - Recv_ack(2) -> Time_wait`
5+
state transition
6+
- client sends a RST instead of a FIN: enable sending a challenge ACK even when the reception
7+
thread is stopped
8+
- client doesn't ACK server's FIN: enable the retransmit timer in the `Closing(_)` state
9+
110
### v7.1.1 (2022-05-24)
211

312
* Ndpv6: demote more logs to debug level (#480 @reynir)

src/tcp/flow.ml

+7-5
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ module Log = (val Logs.src_log src : Logs.LOG)
2323
module Make(Ip: Tcpip.Ip.S)(Time:Mirage_time.S)(Clock:Mirage_clock.MCLOCK)(Random:Mirage_random.S) =
2424
struct
2525

26-
module RXS = Segment.Rx(Time)
27-
module TXS = Segment.Tx(Time)(Clock)
2826
module ACK = Ack.Immediate
27+
module RXS = Segment.Rx(Time)(ACK)
28+
module TXS = Segment.Tx(Time)(Clock)
2929
module UTX = User_buffer.Tx(Time)(Clock)
3030
module WIRE = Wire.Make(Ip)
3131
module STATE = State.Make(Time)
@@ -75,6 +75,8 @@ struct
7575
connects: (WIRE.t, ((connection, error) result Lwt.u * Sequence.t * Tcpip.Tcp.Keepalive.t option)) Hashtbl.t;
7676
}
7777

78+
let num_open_channels t = Hashtbl.length t.channels
79+
7880
let listen t ~port ?keepalive cb =
7981
if port < 0 || port > 65535 then
8082
raise (Invalid_argument (Printf.sprintf "invalid port number (%d)" port))
@@ -356,11 +358,11 @@ struct
356358
let txq, _tx_t =
357359
TXS.create ~xmit:(Tx.xmit_pcb t.ip id) ~wnd ~state ~rx_ack ~tx_ack ~tx_wnd_update
358360
in
359-
(* The user application transmit buffer *)
360-
let utx = UTX.create ~wnd ~txq ~max_size:16384l in
361-
let rxq = RXS.create ~rx_data ~wnd ~state ~tx_ack in
362361
(* Set up ACK module *)
363362
let ack = ACK.t ~send_ack ~last:(Sequence.succ rx_isn) in
363+
(* The user application transmit buffer *)
364+
let utx = UTX.create ~wnd ~txq ~max_size:16384l in
365+
let rxq = RXS.create ~rx_data ~ack ~wnd ~state ~tx_ack in
364366
(* Set up the keepalive state if requested *)
365367
let keepalive = match keepalive with
366368
| None -> None

src/tcp/flow.mli

+6
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,10 @@ module Make (IP:Tcpip.Ip.S)
2020
(R:Mirage_random.S) : sig
2121
include Tcpip.Tcp.S with type ipaddr = IP.ipaddr
2222
val connect : IP.t -> t Lwt.t
23+
24+
(**/**)
25+
(* the number of open connections *)
26+
val num_open_channels : t -> int
27+
(**/**)
28+
2329
end

src/tcp/segment.ml

+6-8
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ let rec reset_seq segs =
5353
It also looks for control messages and dispatches them to
5454
the Rtx queue to ack messages or close channels.
5555
*)
56-
module Rx(Time:Mirage_time.S) = struct
56+
module Rx(Time:Mirage_time.S)(ACK: Ack.M) = struct
5757
open Tcp_packet
5858
module StateTick = State.Make(Time)
5959

@@ -82,14 +82,15 @@ module Rx(Time:Mirage_time.S) = struct
8282
type t = {
8383
mutable segs: S.t;
8484
rx_data: (Cstruct.t list option * Sequence.t option) Lwt_mvar.t; (* User receive channel *)
85+
ack: ACK.t;
8586
tx_ack: (Sequence.t * int) Lwt_mvar.t; (* Acks of our transmitted segs *)
8687
wnd: Window.t;
8788
state: State.t;
8889
}
8990

90-
let create ~rx_data ~wnd ~state ~tx_ack =
91+
let create ~rx_data ~ack ~wnd ~state ~tx_ack =
9192
let segs = S.empty in
92-
{ segs; rx_data; tx_ack; wnd; state }
93+
{ segs; rx_data; ack; tx_ack; wnd; state }
9394

9495
let pp fmt t =
9596
let pp_v fmt seg =
@@ -133,10 +134,7 @@ module Rx(Time:Mirage_time.S) = struct
133134

134135
let send_challenge_ack q =
135136
(* TODO: rfc5961 ACK Throttling *)
136-
(* Is this the correct way trigger an ack? *)
137-
if Lwt_mvar.is_empty q.rx_data
138-
then Lwt_mvar.put q.rx_data (Some [], Some Sequence.zero)
139-
else Lwt.return_unit
137+
ACK.pushack q.ack Sequence.zero
140138

141139
(* Given an input segment, the window information, and a receive
142140
queue, update the window, extract any ready segments into the
@@ -285,7 +283,7 @@ module Tx (Time:Mirage_time.S) (Clock:Mirage_clock.MCLOCK) = struct
285283
let ontimer xmit st segs wnd seq =
286284
match State.state st with
287285
| State.Syn_rcvd _ | State.Established | State.Fin_wait_1 _
288-
| State.Close_wait | State.Last_ack _ ->
286+
| State.Close_wait | State.Closing _ | State.Last_ack _ ->
289287
begin match peek_opt_l segs with
290288
| None -> Lwt.return Tcptimer.Stoptimer
291289
| Some rexmit_seg ->

src/tcp/segment.mli

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
the Rtx queue to ack messages or close channels.
2525
*)
2626

27-
module Rx (T:Mirage_time.S) : sig
27+
module Rx (T:Mirage_time.S)(ACK:Ack.M) : sig
2828

2929
type segment = { header: Tcp_packet.t; payload: Cstruct.t }
3030
(** Individual received TCP segment *)
@@ -38,6 +38,7 @@ module Rx (T:Mirage_time.S) : sig
3838

3939
val create:
4040
rx_data:(Cstruct.t list option * Sequence.t option) Lwt_mvar.t ->
41+
ack:ACK.t ->
4142
wnd:Window.t ->
4243
state:State.t ->
4344
tx_ack:(Sequence.t * int) Lwt_mvar.t ->

src/tcp/state.ml

+9-4
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ module Make(Time:Mirage_time.S) = struct
119119
t.on_close ();
120120
Lwt.return_unit
121121

122+
let transition_to_timewait t =
123+
Lwt.async (fun () -> timewait t time_wait_time);
124+
Time_wait
125+
122126
let tick t (i:action) =
123127
let diffone x y = Sequence.succ y = x in
124128
let tstr s (i:action) =
@@ -148,10 +152,11 @@ module Make(Time:Mirage_time.S) = struct
148152
| Fin_wait_1 _, Recv_rst -> t.on_close (); Reset
149153
| Fin_wait_2 i, Recv_ack _ -> Fin_wait_2 (i + 1)
150154
| Fin_wait_2 _, Recv_rst -> t.on_close (); Reset
151-
| Fin_wait_2 _, Recv_fin ->
152-
Lwt.async (fun () -> timewait t time_wait_time);
153-
Time_wait
154-
| Closing a, Recv_ack b -> if diffone b a then Time_wait else Closing a
155+
| Fin_wait_2 _, Recv_fin -> transition_to_timewait t
156+
| Closing a, Recv_ack b ->
157+
if diffone b a then
158+
transition_to_timewait t
159+
else Closing a
155160
| Closing _, Timeout -> t.on_close (); Closed
156161
| Closing _, Recv_rst -> t.on_close (); Reset
157162
| Time_wait, Timeout -> t.on_close (); Closed

test/low_level.ml

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
open Lwt.Infix
2+
3+
(*
4+
* Connects two stacks to the same backend.
5+
* One is a complete v4 stack (the system under test, referred to as [sut]).
6+
* The other gives us low level access to inject crafted TCP packets,
7+
* and sends and receives crafted packets to check the [sut] behavior.
8+
*)
9+
module VNETIF_STACK = Vnetif_common.VNETIF_STACK(Vnetif_backends.Basic)
10+
11+
module Time = Vnetif_common.Time
12+
module V = Vnetif.Make(Vnetif_backends.Basic)
13+
module E = Ethernet.Make(V)
14+
module A = Arp.Make(E)(Time)
15+
module I = Static_ipv4.Make(Mirage_random_test)(Vnetif_common.Clock)(E)(A)
16+
module Wire = Tcp.Wire
17+
module WIRE = Wire.Make(I)
18+
module Tcp_wire = Tcp.Tcp_wire
19+
module Tcp_unmarshal = Tcp.Tcp_packet.Unmarshal
20+
module Sequence = Tcp.Sequence
21+
22+
let sut_cidr = Ipaddr.V4.Prefix.of_string_exn "10.0.0.101/24"
23+
let server_ip = Ipaddr.V4.of_string_exn "10.0.0.100"
24+
let server_cidr = Ipaddr.V4.Prefix.make 24 server_ip
25+
let gateway = Ipaddr.V4.of_string_exn "10.0.0.1"
26+
27+
let header_size = Ethernet.Packet.sizeof_ethernet
28+
29+
30+
31+
(* defaults when injecting packets *)
32+
let options = []
33+
let window = 5120
34+
35+
(* Helper functions *)
36+
let reply_id_from ~src ~dst data =
37+
let sport = Tcp_wire.get_tcp_src_port data in
38+
let dport = Tcp_wire.get_tcp_dst_port data in
39+
WIRE.v ~dst_port:sport ~dst:src ~src_port:dport ~src:dst
40+
41+
let ack_for data =
42+
match Tcp_unmarshal.of_cstruct data with
43+
| Error s -> Alcotest.fail ("attempting to ack data: " ^ s)
44+
| Ok (packet, data) ->
45+
let open Tcp.Tcp_packet in
46+
let data_len =
47+
Sequence.of_int ((Cstruct.length data) +
48+
(if packet.fin then 1 else 0) +
49+
(if packet.syn then 1 else 0)) in
50+
let sequence = packet.sequence in
51+
let ack_n = Sequence.(add sequence data_len) in
52+
ack_n
53+
54+
let ack data =
55+
Some(ack_for data)
56+
57+
let ack_in_future data off =
58+
Some Sequence.(add (ack_for data) (of_int off))
59+
60+
let ack_from_past data off =
61+
Some Sequence.(sub (ack_for data) (of_int off))
62+
63+
let fail_result_not_expected fail = function
64+
| Error _err ->
65+
fail "error not expected"
66+
| Ok `Eof ->
67+
fail "eof"
68+
| Ok (`Data data) ->
69+
Alcotest.fail (Format.asprintf "data not expected but received: %a"
70+
Cstruct.hexdump_pp data)
71+
72+
73+
74+
let create_sut_stack backend =
75+
VNETIF_STACK.create_stack ~cidr:sut_cidr ~gateway backend
76+
77+
let create_raw_stack backend =
78+
V.connect backend >>= fun netif ->
79+
E.connect netif >>= fun ethif ->
80+
A.connect ethif >>= fun arpv4 ->
81+
I.connect ~cidr:server_cidr ~gateway ethif arpv4 >>= fun ip ->
82+
Lwt.return (netif, ethif, arpv4, ip)
83+
84+
type 'state fsm_result =
85+
| Fsm_next of 'state
86+
| Fsm_done
87+
| Fsm_error of string
88+
89+
(* This could be moved to a common module and reused for other low level tcp tests *)
90+
91+
(* setups network and run a given sut and raw fsm *)
92+
let run backend fsm sut () =
93+
let initial_state, fsm_handler = fsm in
94+
create_sut_stack backend >>= fun stackv4 ->
95+
create_raw_stack backend >>= fun (netif, ethif, arp, rawip) ->
96+
let error_mbox = Lwt_mvar.create_empty () in
97+
let stream, pushf = Lwt_stream.create () in
98+
Lwt.pick [
99+
VNETIF_STACK.Stackv4.listen stackv4;
100+
101+
(* Consume TCP packets one by one, in sequence *)
102+
let rec fsm_thread state =
103+
Lwt_stream.next stream >>= fun (src, dst, data) ->
104+
fsm_handler rawip state ~src ~dst data >>= function
105+
| Fsm_next s ->
106+
fsm_thread s
107+
| Fsm_done ->
108+
Lwt.return_unit
109+
| Fsm_error err ->
110+
Lwt_mvar.put error_mbox err >>= fun () ->
111+
(* it will be terminated anyway when the error is picked up *)
112+
fsm_thread state in
113+
114+
Lwt.async (fun () ->
115+
(V.listen netif ~header_size
116+
(E.input
117+
~arpv4:(A.input arp)
118+
~ipv4:(I.input
119+
~tcp: (fun ~src ~dst data -> pushf (Some(src,dst,data)); Lwt.return_unit)
120+
~udp:(fun ~src:_ ~dst:_ _data -> Lwt.return_unit)
121+
~default:(fun ~proto ~src ~dst _data ->
122+
Logs.debug (fun f -> f "default handler invoked for packet from %a to %a, protocol %d -- dropping" Ipaddr.V4.pp src Ipaddr.V4.pp dst proto); Lwt.return_unit)
123+
rawip
124+
)
125+
~ipv6:(fun _buf ->
126+
Logs.debug (fun f -> f "IPv6 packet -- dropping");
127+
Lwt.return_unit)
128+
ethif) ) >|= fun _ -> ());
129+
130+
(* Either both fsm and the sut terminates, or a timeout occurs, or one of the sut/fsm informs an error *)
131+
Lwt.pick [
132+
(Time.sleep_ns (Duration.of_sec 5) >>= fun () ->
133+
Lwt.return_some "timed out");
134+
135+
(Lwt.join [
136+
(fsm_thread initial_state);
137+
138+
(* time to let the other end connects to the network and listen.
139+
* Otherwise initial syn might need to be repeated slowing down the test *)
140+
(Time.sleep_ns (Duration.of_ms 100) >>= fun () ->
141+
sut stackv4 (Lwt_mvar.put error_mbox) >>= fun _ ->
142+
Time.sleep_ns (Duration.of_ms 100));
143+
] >>= fun () -> Lwt.return_none);
144+
145+
(Lwt_mvar.take error_mbox >>= fun cause ->
146+
Lwt.return_some cause);
147+
] >|= function
148+
| None -> ()
149+
| Some err -> Alcotest.fail err
150+
]

test/test.ml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ let suite = [
3131
"iperf" , Test_iperf.suite ;
3232
"iperf_ipv6" , Test_iperf_ipv6.suite ;
3333
"keepalive" , Test_keepalive.suite ;
34+
"simultaneous_close", Test_simulatenous_close.suite
3435
]
3536

3637
let run test () =

0 commit comments

Comments
 (0)