@@ -70,20 +70,28 @@ function write_transport_layer(stream, response)
70
70
end
71
71
72
72
function read_transport_layer (stream)
73
- header_dict = Dict {String,String} ()
74
- line = chomp (readline (stream))
75
- # Check whether the socket was closed
76
- if line == " "
77
- return nothing
78
- end
79
- while length (line) > 0
80
- h_parts = split (line, " :" )
81
- header_dict[chomp (h_parts[1 ])] = chomp (h_parts[2 ])
73
+ try
74
+ header_dict = Dict {String,String} ()
82
75
line = chomp (readline (stream))
76
+ # Check whether the socket was closed
77
+ if line == " "
78
+ return nothing
79
+ end
80
+ while length (line) > 0
81
+ h_parts = split (line, " :" )
82
+ header_dict[chomp (h_parts[1 ])] = chomp (h_parts[2 ])
83
+ line = chomp (readline (stream))
84
+ end
85
+ message_length = parse (Int, header_dict[" Content-Length" ])
86
+ message_str = String (read (stream, message_length))
87
+ return message_str
88
+ catch err
89
+ if err isa Base. IOError
90
+ return nothing
91
+ end
92
+
93
+ rethrow (err)
83
94
end
84
- message_length = parse (Int, header_dict[" Content-Length" ])
85
- message_str = String (read (stream, message_length))
86
- return message_str
87
95
end
88
96
89
97
Base. isopen (x:: JSONRPCEndpoint ) = x. status != :closed && isopen (x. pipe_in) && isopen (x. pipe_out)
@@ -114,41 +122,41 @@ function Base.run(x::JSONRPCEndpoint)
114
122
end
115
123
116
124
x. read_task = @async try
117
- try
118
- while true
119
- message = read_transport_layer (x. pipe_in)
125
+ while true
126
+ message = read_transport_layer (x. pipe_in)
120
127
121
- if message === nothing || x. status == :closed
122
- break
123
- end
128
+ if message === nothing || x. status == :closed
129
+ break
130
+ end
124
131
125
- message_dict = JSON. parse (message)
126
-
127
- if haskey (message_dict, " method" )
128
- try
129
- put! (x. in_msg_queue, message_dict)
130
- catch err
131
- if err isa InvalidStateException
132
- break
133
- else
134
- rethrow (err)
135
- end
136
- end
137
- else
138
- # This must be a response
139
- id_of_request = message_dict[" id" ]
132
+ message_dict = JSON. parse (message)
140
133
141
- channel_for_response = x. outstanding_requests[id_of_request]
142
- put! (channel_for_response, message_dict)
134
+ if haskey (message_dict, " method" )
135
+ try
136
+ put! (x. in_msg_queue, message_dict)
137
+ catch err
138
+ if err isa InvalidStateException
139
+ break
140
+ else
141
+ rethrow (err)
142
+ end
143
143
end
144
+ else
145
+ # This must be a response
146
+ id_of_request = message_dict[" id" ]
147
+
148
+ channel_for_response = x. outstanding_requests[id_of_request]
149
+ put! (channel_for_response, message_dict)
144
150
end
145
- finally
146
- close (x. in_msg_queue)
147
151
end
152
+
153
+ close (x. in_msg_queue)
148
154
149
155
for i in values (x. outstanding_requests)
150
156
close (i)
151
157
end
158
+
159
+ x. status = :closed
152
160
catch err
153
161
bt = catch_backtrace ()
154
162
if x. err_handler != = nothing
@@ -243,6 +251,8 @@ function send_error_response(endpoint, original_request, code, message, data)
243
251
end
244
252
245
253
function Base. close (endpoint:: JSONRPCEndpoint )
254
+ endpoint. status == :closed && return
255
+
246
256
flush (endpoint)
247
257
248
258
endpoint. status = :closed
0 commit comments