Skip to content

Commit b4ed17d

Browse files
authored
Merge pull request #8 from senid231/use-own-socket-wrapper
use own socket wrapper
2 parents 2d8b060 + 93539e9 commit b4ed17d

9 files changed

+209
-25
lines changed

jrpc.gemspec

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ Gem::Specification.new do |spec|
1818
spec.require_paths = ['lib']
1919

2020
spec.add_dependency 'netstring', '~> 0'
21-
spec.add_dependency 'net_tcp_client', '~> 2.0'
2221
spec.add_dependency 'oj', '~> 2.0'
2322

2423
spec.add_development_dependency 'bundler', '~> 1.10'

lib/jrpc.rb

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
require 'jrpc/version'
22
require 'jrpc/utils'
33
require 'jrpc/base_client'
4+
require 'jrpc/transport/socket_base'
5+
require 'jrpc/transport/socket_tcp'
46
require 'jrpc/tcp_client'
57
require 'jrpc/error/error'
68
require 'jrpc/error/connection_error'

lib/jrpc/base_client.rb

+7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ class BaseClient
99
ID_CHARACTERS = (('a'..'z').to_a + ('0'..'9').to_a + ('A'..'Z').to_a).freeze
1010
REQUEST_TYPES = [:request, :notification].freeze
1111

12+
def self.connect(uri, options)
13+
client = new(uri, options)
14+
yield(client)
15+
ensure
16+
client.close if client
17+
end
18+
1219
def initialize(uri, options)
1320
@uri = uri
1421
@options = options

lib/jrpc/tcp_client.rb

+31-16
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
require 'net/tcp_client'
21
require 'netstring'
32
require 'logger'
43
require 'benchmark'
@@ -18,22 +17,34 @@ def initialize(uri, options = {})
1817
timeout = @options.fetch(:timeout, 5)
1918
connect_timeout = @options.fetch(:connect_timeout, timeout)
2019
read_timeout = @options.fetch(:read_timeout, timeout)
21-
write_timeout = @options.fetch(:write_timeout, nil) # default 60
22-
connect_retry_count = @options.fetch(:connect_retry_count, nil) # default 10
20+
write_timeout = @options.fetch(:write_timeout, 60) # default 60
21+
connect_retry_count = @options.fetch(:connect_retry_count, 10) # default 10
22+
@close_after_sent = @options.fetch(:close_after_sent, false)
2323

24-
@transport = Net::TCPClient.new server: @uri,
25-
connect_retry_count: connect_retry_count,
26-
connect_timeout: connect_timeout,
27-
read_timeout: read_timeout,
28-
write_timeout: write_timeout,
29-
buffered: false # recommended for RPC
30-
rescue ::SocketError
31-
raise ConnectionError, "Can't connect to #{@uri}"
24+
@transport = JRPC::Transport::SocketTcp.new server: @uri,
25+
connect_retry_count: connect_retry_count,
26+
connect_timeout: connect_timeout,
27+
read_timeout: read_timeout,
28+
write_timeout: write_timeout
29+
begin
30+
@transport.connect
31+
rescue JRPC::SocketTcp::Error
32+
raise ConnectionError, "Can't connect to #{@uri}"
33+
end
3234
end
3335

3436
private
3537

36-
def send_command(request, options={})
38+
def ensure_connected
39+
if @transport.closed?
40+
logger.debug { 'Connecting transport...' }
41+
@transport.connect
42+
logger.debug { 'Connected.' }
43+
end
44+
end
45+
46+
def send_command(request, options = {})
47+
ensure_connected
3748
read_timeout = options.fetch(:read_timeout)
3849
write_timeout = options.fetch(:write_timeout)
3950
response = nil
@@ -49,15 +60,20 @@ def send_command(request, options={})
4960
"(#{'%.2f' % (t * 1000)}ms) Response message: #{Utils.truncate(response, MAX_LOGGED_MESSAGE_LENGTH)}"
5061
end
5162
response
63+
ensure
64+
@transport.close if @close_after_sent
5265
end
5366

54-
def send_notification(request, options={})
67+
def send_notification(request, options = {})
68+
ensure_connected
5569
write_timeout = options.fetch(:write_timeout)
5670
logger.debug { "Request address: #{uri}" }
5771
logger.debug { "Request message: #{Utils.truncate(request, MAX_LOGGED_MESSAGE_LENGTH)}" }
5872
logger.debug { "Request write_timeout: #{write_timeout}" }
5973
send_request(request, write_timeout)
6074
logger.debug { 'No response required' }
75+
ensure
76+
@transport.close if @close_after_sent
6177
end
6278

6379
def create_message(method, params)
@@ -74,8 +90,7 @@ def send_request(request, timeout)
7490
def receive_response(timeout)
7591
timeout ||= @transport.read_timeout
7692
length = get_msg_length(timeout)
77-
response = ''
78-
@transport.read(length+1, response, timeout)
93+
response = @transport.read(length + 1, timeout)
7994
raise ClientError.new('invalid response. missed comma as terminator') if response[-1] != ','
8095
response.chomp(',')
8196
rescue ::SocketError
@@ -85,7 +100,7 @@ def receive_response(timeout)
85100
def get_msg_length(timeout)
86101
length = ''
87102
while true do
88-
character = @transport.read(1, nil, timeout)
103+
character = @transport.read(1, timeout)
89104
break if character == ':'
90105
length += character
91106
end

lib/jrpc/transport/socket_base.rb

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
module JRPC
2+
module Transport
3+
class SocketBase
4+
5+
class Error < StandardError
6+
end
7+
8+
class TimeoutError < Error
9+
def initialize
10+
super(self.class.to_s.split('::').last)
11+
end
12+
end
13+
14+
class ReadTimeoutError < TimeoutError
15+
end
16+
17+
class WriteTimeoutError < TimeoutError
18+
end
19+
20+
class ConnectionTimeoutError < TimeoutError
21+
end
22+
23+
class ConnectionFailedError < Error
24+
end
25+
26+
attr_reader :options, :read_timeout, :write_timeout
27+
28+
def self.connect(options)
29+
connection = new(options)
30+
yield(connection)
31+
ensure
32+
connection.close if connection
33+
end
34+
35+
def initialize(options)
36+
@server = options.fetch(:server)
37+
@read_timeout = options.fetch(:read_timeout, nil)
38+
@write_timeout = options.fetch(:write_timeout, nil)
39+
@connect_timeout = options.fetch(:connect_timeout, nil)
40+
@connect_retry_count = options.fetch(:connect_retry_count, 0)
41+
@options = options
42+
end
43+
44+
def connect
45+
retries = @connect_retry_count
46+
47+
while retries >= 0
48+
begin
49+
connect_socket
50+
break
51+
rescue Error => e
52+
retries -= 1
53+
raise e if retries < 0
54+
end
55+
end
56+
end
57+
58+
def read(_length, _timeout = @read_timeout)
59+
raise NotImplementedError
60+
end
61+
62+
def write(_data, _timeout = @write_timeout)
63+
raise NotImplementedError
64+
end
65+
66+
def close
67+
raise NotImplementedError
68+
end
69+
70+
def closed?
71+
raise NotImplementedError
72+
end
73+
74+
private
75+
76+
def connect_socket
77+
raise NotImplementedError
78+
end
79+
80+
end
81+
end
82+
end

lib/jrpc/transport/socket_tcp.rb

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
module JRPC
2+
module Transport
3+
class SocketTcp < SocketBase
4+
5+
attr_reader :socket
6+
7+
def initialize(options)
8+
super
9+
@socket = build_socket
10+
end
11+
12+
def read(length, timeout = @read_timeout)
13+
received = ''
14+
length_to_read = length
15+
while length_to_read > 0
16+
io_read, = IO.select([@socket], [], [], timeout)
17+
raise ReadTimeoutError unless io_read
18+
chunk = io_read[0].read_nonblock(length_to_read)
19+
received += chunk
20+
length_to_read -= chunk.bytesize
21+
end
22+
received
23+
end
24+
25+
def write(data, timeout = @write_timeout)
26+
length_written = 0
27+
data_to_write = data
28+
while data_to_write.bytesize > 0
29+
_, io_write, = IO.select([], [@socket], [], timeout)
30+
raise WriteTimeoutError unless io_write
31+
chunk_length = io_write[0].write_nonblock(data_to_write)
32+
length_written += chunk_length
33+
data_to_write = data.byteslice(length_written, data.length)
34+
end
35+
length_written
36+
end
37+
38+
def close
39+
@socket.close
40+
end
41+
42+
def closed?
43+
@socket.closed?
44+
end
45+
46+
private
47+
48+
def set_timeout_to(socket, type, value)
49+
secs = Integer(value)
50+
u_secs = Integer((value - secs) * 1_000_000)
51+
opt_val = [secs, u_secs].pack('l_2')
52+
socket.setsockopt Socket::SOL_SOCKET, type, opt_val
53+
end
54+
55+
def build_socket
56+
host = @server.split(':').first
57+
addr = Socket.getaddrinfo(host, nil)
58+
sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
59+
set_timeout_to(sock, Socket::SO_RCVTIMEO, @connect_timeout) if @connect_timeout
60+
sock
61+
end
62+
63+
def connect_socket
64+
host, port = @server.split(':')
65+
addr = Socket.getaddrinfo(host, nil)
66+
full_addr = Socket.pack_sockaddr_in(port, addr[0][3])
67+
@socket.connect(full_addr)
68+
rescue Errno::EISCONN => _
69+
# already connected
70+
rescue Errno::ETIMEDOUT => _
71+
raise ConnectionTimeoutError
72+
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ETIMEDOUT => e
73+
raise ConnectionFailedError, "#{e.class} #{e.message}"
74+
end
75+
76+
end
77+
end
78+
end

lib/jrpc/version.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module JRPC
2-
VERSION = '1.0.1'
2+
VERSION = '1.1.0'
33
end

spec/fake_transport.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ def write(_, write_timeout = nil)
1212
@write_timeout = write_timeout if write_timeout
1313
end
1414

15-
def read(_, buffer, read_timeout = nil)
15+
def read(_, read_timeout = nil)
1616
@read_timeout = read_timeout if read_timeout
17-
buffer << @response
17+
@response
1818
end
1919

2020
def response=(str)

spec/tcp_client_spec.rb

+6-5
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@
5050

5151
json_result = expected_result.to_json
5252
socket_stub.response = json_result
53-
expect(socket_stub).to receive(:read).with(1, nil, 30).exactly(json_result.size.to_s.size).times.
53+
expect(socket_stub).to receive(:read).with(1, 30).exactly(json_result.size.to_s.size).times.
5454
and_return(
5555
*(json_result.size.to_s.split('') + [':'])
5656
)
57-
expect(socket_stub).to receive(:read).with(json_result.size + 1, '', 30).and_return(json_result + ',').and_call_original
57+
expect(socket_stub).to receive(:read).with(json_result.size + 1, 30).and_return(json_result + ',').and_call_original
5858

5959
expect(subject).to eq JSON.parse(json_result)['result']
6060
end
@@ -113,7 +113,7 @@
113113
let(:invoke_request_params) { [1, 2] }
114114

115115
before do
116-
allow(Net::TCPClient).to receive(:new).with(any_args).once.and_return(socket_stub)
116+
allow(JRPC::Transport::SocketTcp).to receive(:new).with(any_args).once.and_return(socket_stub)
117117
end
118118

119119
it 'calls perform_request("sum", params: [1, 2])' do
@@ -149,7 +149,7 @@
149149
let(:invoke_notification_params) { [1, 2] }
150150

151151
before do
152-
allow(Net::TCPClient).to receive(:new).with(any_args).once.and_return(socket_stub)
152+
allow(JRPC::Transport::SocketTcp).to receive(:new).with(any_args).once.and_return(socket_stub)
153153
end
154154

155155
it 'calls perform_request("sum", params: [1, 2], type: :notification)' do
@@ -187,7 +187,8 @@
187187

188188
before do
189189
allow_any_instance_of(JRPC::TcpClient).to receive(:generate_id).with(no_args).and_return(stubbed_generated_id)
190-
allow(Net::TCPClient).to receive(:new).with(any_args).once.and_return(socket_stub)
190+
allow(JRPC::Transport::SocketTcp).to receive(:new).with(any_args).once.and_return(socket_stub)
191+
allow(socket_stub).to receive(:closed?).and_return(false)
191192
end
192193

193194
context 'with array params' do

0 commit comments

Comments
 (0)