Commit fc386ee

fix fork reconnections, which were broken since the retryable error change

1 parent e9ab1eb · commit fc386ee

File tree

  lib/dalli/protocol/connection_manager.rb
  test/integration/test_concurrency.rb

2 files changed · +254 −1 lines changed

lib/dalli/protocol/connection_manager.rb

+1 −1

@@ -243,7 +243,7 @@ def close_on_fork
         # Close socket on a fork, setting us up for reconnect
         # on next request.
         close
-        raise Dalli::NetworkError, message
+        raise Dalli::RetryableNetworkError, message
       end

       def fork_detected?
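The one-line change above is the whole fix: close_on_fork still closes the socket inherited across the fork, but it now raises Dalli::RetryableNetworkError instead of Dalli::NetworkError, the premise being that retryable errors are caught by the client's request path and replayed on a fresh connection rather than bubbling up to the caller. Below is a minimal, self-contained sketch of that assumed contract; the names Connection, ensure_connected! and with_retry are illustrative, not Dalli's actual internals.

    # sketch_fork_reconnect.rb -- a minimal sketch, not Dalli's actual internals.
    # It illustrates the contract the change above relies on: an error raised when
    # a fork is detected is "retryable", so the request wrapper reconnects and
    # replays the operation instead of surfacing the error to the caller.

    class RetryableNetworkError < StandardError; end    # stand-in for Dalli::RetryableNetworkError

    class Connection
      def initialize
        @pid  = Process.pid
        @sock = :connected                              # placeholder for a real socket
      end

      def fork_detected?
        @pid != Process.pid                             # true in a child after Process.fork
      end

      def close
        @sock = nil
      end

      def ensure_connected!
        if fork_detected?
          @pid = Process.pid
          close                                         # drop the socket inherited from the parent
          raise RetryableNetworkError, 'fork detected'  # the caller is expected to retry
        end
        @sock ||= :connected                            # lazily (re)connect
      end

      def request
        ensure_connected!
        :ok
      end
    end

    # Retry wrapper, analogous to what the client is assumed to do around each op.
    def with_retry(conn, retries: 1)
      conn.request
    rescue RetryableNetworkError
      (retries -= 1) >= 0 ? retry : raise
    end

    conn = Connection.new
    with_retry(conn)                                    # :ok in the parent
    Process.wait(Process.fork { with_retry(conn) })     # :ok in the child after one internal retry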

test/integration/test_concurrency.rb

+253
@@ -48,4 +48,257 @@
       cache.flush
     end
   end
+
+  it 'supports multithreaded access for single server optimized' do
+    memcached_persistent do |_cache, port|
+      # NOTE: we have a bug with set_multi and a namespace; namespace: 'some:namespace' fails
+      cache = Dalli::Client.new("localhost:#{port}", raw: true)
+      cache.close
+      cache.flush
+      workers = []
+
+      cache.set('f', 'zzz')
+
+      assert op_cas_succeeds((cache.cas('f') do |value|
+        value << 'z'
+      end))
+      assert_equal 'zzzz', cache.get('f')
+
+      multi_keys = { 'ab' => 'vala', 'bb' => 'valb', 'cb' => 'valc' }
+
+      # Have a bunch of threads perform a bunch of operations at the same time.
+      # Verify the result of each operation to ensure the request and response
+      # are not intermingled between threads.
+      10.times do
+        workers << Thread.new do
+          100.times do
+            cache.set('a', 9)
+            cache.set('b', 11)
+            cache.set('f', 'zzz')
+            cache.set_multi(multi_keys, 10)
+            res = cache.cas('f') do |value|
+              value << 'z'
+            end
+
+            refute_nil res
+            refute cache.add('a', 11)
+            assert_equal({ 'a' => '9', 'b' => '11' }, cache.get_multi(%w[a b]))
+
+            assert_equal '11', cache.get('b')
+            assert_equal 'vala', cache.get('ab')
+
+            assert_equal %w[a b], cache.get_multi('a', 'b', 'c').keys.sort
+            assert_equal multi_keys, cache.get_multi(multi_keys.keys)
+            cache.set_multi(multi_keys, 10)
+          end
+        end
+      end
+
+      workers.each(&:join)
+      cache.flush
+    end
+  end
+
+  # Have a bunch of processes perform a bunch of operations at the same time.
+  # Verify the result of each operation to ensure the request and response
+  # are not intermingled between forks.
+  it 'supports multi process client usage for multiple servers' do
+    memcached_persistent do |_cache, port|
+      memcached_persistent do |_cache, port2|
+        cache = Dalli::Client.new(["localhost:#{port}", "localhost:#{port2}"], socket_timeout: 0.1,
+                                  socket_max_failures: 0,
+                                  socket_failure_delay: 0.0,
+                                  down_retry_delay: 0.0)
+        cache.close
+        cache.flush
+        workers = []
+
+        cache.set('f', 'zzz')
+
+        assert op_cas_succeeds((cache.cas('f') do |value|
+          value << 'z'
+        end))
+        assert_equal 'zzzz', cache.get('f')
+
+        10.times do
+          workers << Process.fork do
+            # first request after forking will try to reconnect to the server; we need to ensure we hit both rings
+            cache.set('ring1', 'work')
+            cache.set('ring2', 'work')
+            sleep(0.2)
+            10.times do
+              cache.set('a', 9)
+              cache.set('b', 11)
+              cache.set('f', 'zzz')
+              res = cache.cas('f') do |value|
+                value << 'z'
+              end
+
+              refute_nil res
+              refute cache.add('a', 11)
+              assert_equal({ 'a' => 9, 'b' => 11 }, cache.get_multi(%w[a b]))
+
+              assert_equal 11, cache.get('b')
+
+              assert_equal %w[a b], cache.get_multi('a', 'b', 'c').keys.sort
+            end
+          end
+        end
+
+        Process.wait
+        sleep(1) # if we don't sleep between the two protocol tests, the second fails on connection issues
+      end
+    end
+  end
+
+  it 'supports multi process client usage for single server' do
+    memcached_persistent do |_cache, port|
+      cache = Dalli::Client.new("localhost:#{port}", socket_timeout: 0.1, protocol: p,
+                                socket_max_failures: 0,
+                                socket_failure_delay: 0.0,
+                                down_retry_delay: 0.0)
+      cache.close
+      cache.flush
+      workers = []
+
+      cache.set('f', 'zzz')
+
+      assert op_cas_succeeds((cache.cas('f') do |value|
+        value << 'z'
+      end))
+      assert_equal 'zzzz', cache.get('f')
+
+      10.times do
+        workers << Process.fork do
+          # first request after forking will try to reconnect to the server; we need to ensure we hit both rings
+          cache.set('ring1', 'work')
+          sleep(0.2)
+          10.times do
+            cache.set('a', 9)
+            cache.set('b', 11)
+            cache.set('f', 'zzz')
+            res = cache.cas('f') do |value|
+              value << 'z'
+            end
+
+            refute_nil res
+            refute cache.add('a', 11)
+            assert_equal({ 'a' => 9, 'b' => 11 }, cache.get_multi(%w[a b]))
+
+            assert_equal 11, cache.get('b')
+
+            assert_equal %w[a b], cache.get_multi('a', 'b', 'c').keys.sort
+          end
+        end
+      end
+
+      Process.wait
+      sleep(1) # if we don't sleep between the two protocol tests, the second fails on connection issues
+    end
+  end
+
+  it 'supports multi process client usage for single server raw optimized' do
+    memcached_persistent do |_cache, port|
+      cache = Dalli::Client.new("localhost:#{port}", raw: true, socket_timeout: 0.1, protocol: p,
+                                socket_max_failures: 0,
+                                socket_failure_delay: 0.0,
+                                down_retry_delay: 0.0)
+      cache.close
+      cache.flush
+      workers = []
+
+      cache.set('f', 'zzz')
+
+      assert op_cas_succeeds((cache.cas('f') do |value|
+        value << 'z'
+      end))
+      assert_equal 'zzzz', cache.get('f')
+
+      10.times do
+        workers << Process.fork do
+          # first request after forking will try to reconnect to the server; we need to ensure we hit both rings
+          cache.set('ring1', 'work')
+          sleep(0.2)
+          10.times do
+            cache.set('a', 9)
+            cache.set('b', 11)
+            cache.set('f', 'zzz')
+            res = cache.cas('f') do |value|
+              value << 'z'
+            end
+
+            refute_nil res
+            refute cache.add('a', 11)
+            assert_equal({ 'a' => '9', 'b' => '11' }, cache.get_multi(%w[a b]))
+
+            assert_equal '11', cache.get('b')
+
+            assert_equal %w[a b], cache.get_multi('a', 'b', 'c').keys.sort
+          end
+        end
+      end
+
+      Process.wait
+      sleep(1) # if we don't sleep between the two protocol tests, the second fails on connection issues
+    end
+  end
+
+  # it 'supports multi process access for single server optimized' do
+  #   memcached_persistent do |_cache, port|
+  #     cache = Dalli::Client.new("localhost:#{port}", raw: true)
+  #     cache.close
+  #     cache.flush
+  #     workers = []
+
+  #     cache.set('f', 'zzz')
+
+  #     assert op_cas_succeeds((cache.cas('f') do |value|
+  #       value << 'z'
+  #     end))
+  #     assert_equal 'zzzz', cache.get('f')
+
+  #     multi_keys = { 'ab' => 'vala', 'bb' => 'valb', 'cb' => 'valc' }
+
+  #     # Have a bunch of threads perform a bunch of operations at the same time.
+  #     # Verify the result of each operation to ensure the request and response
+  #     # are not intermingled between threads.
+  #     10.times do
+  #       workers << Process.fork do
+  #         # assert_raises Dalli::RingError do
+  #         # assert_raises Dalli::NetworkError do
+  #         puts 'gonna fail'
+  #         cache.set('gonna', 'fail')
+  #         # end
+  #         # 1.times do
+  #         #   cache.set('a', 9)
+  #         #   cache.set('b', 11)
+  #         #   cache.set('f', 'zzz')
+  #         #   cache.set_multi(multi_keys, 10)
+  #         #   res = cache.cas('f') do |value|
+  #         #     value << 'z'
+  #         #   end
+
+  #         #   refute_nil res
+  #         #   refute cache.add('a', 11)
+  #         #   assert_equal({ 'a' => '9', 'b' => '11' }, cache.get_multi(%w[a b]))
+
+  #         #   assert_equal '11', cache.get('b')
+  #         #   assert_equal 'vala', cache.get('ab')
+
+  #         #   assert_equal %w[a b], cache.get_multi('a', 'b', 'c').keys.sort
+  #         #   assert_equal multi_keys, cache.get_multi(multi_keys.keys)
+  #         #   cache.set_multi(multi_keys, 10)
+  #         # rescue Dalli::NetworkError
+  #         #   puts 'NetworkError got to test'
+  #         # rescue Dalli::RingError
+  #         #   puts 'RingError got to test'
+  #         # end
+  #       end
+  #     end
+
+  #     # workers.each(&:join)
+  #     Process.wait
+  #     # cache.flush
+  #   end
+  # end
 end
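For orientation, this is the end-to-end behavior the forked-process tests above exercise, reduced to a plain usage sketch. It assumes a memcached instance on 127.0.0.1:11211, which is an assumption for illustration, not something the commit specifies.

    require 'dalli'

    cache = Dalli::Client.new('127.0.0.1:11211')   # assumed local memcached
    cache.set('parent', 'before fork')

    pid = Process.fork do
      # First request after the fork: the inherited socket is closed and, with
      # this commit, the operation is retried on a fresh connection instead of
      # raising Dalli::NetworkError to the caller.
      cache.set('child', 'after fork')
      puts cache.get('parent')   # => "before fork"
    end

    Process.wait(pid)
    puts cache.get('child')      # => "after fork"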
