Skip to content

Commit 526133d

Browse files
committed
Add workaround for subscribe() and psubscribe()
Signed-off-by: Cy Rossignol <[email protected]>
1 parent 7a0eafe commit 526133d

File tree

4 files changed

+297
-5
lines changed

4 files changed

+297
-5
lines changed

src/Connections/PredisConnection.php

+103
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
namespace Monospice\LaravelRedisSentinel\Connections;
44

5+
use Closure;
56
use Illuminate\Redis\Connections\PredisConnection as LaravelPredisConnection;
67
use Monospice\SpicyIdentifiers\DynamicMethod;
78
use Predis\ClientInterface as Client;
89
use Predis\CommunicationException;
10+
use Predis\PubSub\Consumer as PubSub;
11+
use RuntimeException;
912

1013
/**
1114
* Executes Redis commands using the Predis client.
@@ -121,6 +124,60 @@ public function setUpdateSentinels($enable)
121124
return $this;
122125
}
123126

127+
/**
128+
* Subscribe to a set of given channels for messages.
129+
*
130+
* @param array|string $channels The names of the channels to subscribe to.
131+
* @param Closure $callback Executed for each message. Receives the
132+
* message string in the first argument and the message channel as the
133+
* second argument. Return FALSE to unsubscribe.
134+
* @param string $method The subscription command ("subscribe" or
135+
* "psubscribe").
136+
*
137+
* @return void
138+
*/
139+
public function createSubscription(
140+
$channels,
141+
Closure $callback,
142+
$method = 'subscribe'
143+
) {
144+
$this->retryOnFailure(function () use ($method, $channels, $callback) {
145+
$loop = $this->pubSubLoop([ $method => (array) $channels ]);
146+
147+
if ($method === 'psubscribe') {
148+
$messageKind = 'pmessage';
149+
} else {
150+
$messageKind = 'message';
151+
}
152+
153+
$this->consumeMessages($loop, $messageKind, $callback);
154+
155+
unset($loop);
156+
});
157+
}
158+
159+
/**
160+
* Create a new PUB/SUB subscriber and pass messages to the callback if
161+
* provided.
162+
*
163+
* WARNING: Consumers created using this method are not monitored for
164+
* connection failures. For Sentinel support, use one of the methods
165+
* provided by the Laravel API instead (subscribe() and psubscribe()).
166+
*
167+
* @param array|null $options Configures the channel(s) to subscribe to.
168+
* @param callable $callback Optional callback executed for each message
169+
* published to the configured channel(s).
170+
*
171+
* @return \Predis\PubSub\Consumer|null A PUB/SUB context used to create
172+
* a subscription loop if no callback provided.
173+
*/
174+
public function pubSubLoop($options = null, $callback = null)
175+
{
176+
// Messages published to the master propagate to each of the slaves. We
177+
// pick a random slave to distribute load away from the master:
178+
return $this->getRandomSlave()->pubSubLoop($options, $callback);
179+
}
180+
124181
/**
125182
* Execute commands in a transaction.
126183
*
@@ -180,6 +237,27 @@ protected function retryOnFailure(callable $callback)
180237
throw $exception;
181238
}
182239

240+
/**
241+
* Execute the provided callback for each message read by the PUB/SUB
242+
* consumer.
243+
*
244+
* @param PubSub $loop Reads the messages published to a channel.
245+
* @param string $kind The subscribed message type ([p]message).
246+
* @param Closure $callback Executed for each message.
247+
*
248+
* @return void
249+
*/
250+
protected function consumeMessages(PubSub $loop, $kind, Closure $callback)
251+
{
252+
foreach ($loop as $message) {
253+
if ($message->kind === $kind) {
254+
if ($callback($message->payload, $message->channel) === false) {
255+
return;
256+
}
257+
}
258+
}
259+
}
260+
183261
/**
184262
* Get a Predis client instance for the master.
185263
*
@@ -189,4 +267,29 @@ protected function getMaster()
189267
{
190268
return $this->client->getClientFor('master');
191269
}
270+
271+
/**
272+
* Get a Predis client instance for a random slave.
273+
*
274+
* @param bool $fallbackToMaster If TRUE, return a client for the master
275+
* if the connection does not include any slaves.
276+
*
277+
* @return Client The client instance for the selected slave.
278+
*/
279+
protected function getRandomSlave($fallbackToMaster = true)
280+
{
281+
$slaves = $this->client->getConnection()->getSlaves();
282+
283+
if (count($slaves) > 0) {
284+
$slave = $slaves[rand(1, count($slaves)) - 1];
285+
286+
return $this->client->getClientFor($slave->getParameters()->alias);
287+
}
288+
289+
if ($fallbackToMaster) {
290+
return $this->getMaster();
291+
}
292+
293+
throw new RuntimeException('No slave present on connection.');
294+
}
192295
}

tests/Integration/Connections/PredisConnectionTest.php

+149-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Predis\Client;
1010
use Predis\Connection\ConnectionException;
1111
use Predis\Connection\NodeConnectionInterface;
12+
use Predis\PubSub\Consumer;
1213
use Predis\Transaction\MultiExec;
1314

1415
class PredisConnectionTest extends IntegrationTestCase
@@ -20,6 +21,16 @@ class PredisConnectionTest extends IntegrationTestCase
2021
*/
2122
protected $subject;
2223

24+
/**
25+
* Messages to publish and expect for subscribe tests.
26+
*
27+
* @var array
28+
*/
29+
protected $expectedMessages = [
30+
'test-channel-1' => [ 'test message 1', 'test message 2', ],
31+
'test-channel-2' => [ 'test message 1', 'test message 2', ],
32+
];
33+
2334
/**
2435
* Run this setup before each test
2536
*
@@ -44,6 +55,143 @@ public function tearDown()
4455
Mockery::close();
4556
}
4657

58+
public function testAllowsSubscriptionsOnAggregateConnection()
59+
{
60+
// The Predis client itself does not currently support subscriptions on
61+
// Sentinel connections and throws an exception that this class fixes.
62+
$this->assertInstanceOf(Consumer::class, $this->subject->pubSubLoop());
63+
}
64+
65+
public function testSubscribesToPubSubChannels()
66+
{
67+
// Don't block the test with retries if it failed to read the expected
68+
// number of messages from the server:
69+
$this->subject->setRetryLimit(0);
70+
71+
foreach ([ 'createSubscription', 'subscribe' ] as $method) {
72+
$received = [ ];
73+
74+
$test = function ($channels, $count) use (&$received, $method) {
75+
$this->subject->$method(
76+
$channels,
77+
function ($message, $channel) use (&$received, &$count) {
78+
$received[$channel][] = $message;
79+
80+
if (--$count === 0) {
81+
return false;
82+
}
83+
}
84+
);
85+
};
86+
87+
$this->testClient->publishForTest($test, $this->expectedMessages);
88+
89+
$this->assertEquals($this->expectedMessages, $received);
90+
}
91+
}
92+
93+
public function testSubscribesToPubSubChannelsByPattern()
94+
{
95+
// Don't block the test with retries if it failed to read the expected
96+
// number of messages from the server:
97+
$this->subject->setRetryLimit(0);
98+
99+
foreach ([ 'createSubscription', 'psubscribe' ] as $method) {
100+
$received = [ ];
101+
102+
$test = function ($channels, $count) use (&$received, $method) {
103+
$this->subject->$method(
104+
'test-channel-*',
105+
function ($message, $channel) use (&$received, &$count) {
106+
$received[$channel][] = $message;
107+
108+
if (--$count === 0) {
109+
return false;
110+
}
111+
},
112+
'psubscribe'
113+
);
114+
};
115+
116+
$this->testClient->publishForTest($test, $this->expectedMessages);
117+
118+
$this->assertEquals($this->expectedMessages, $received);
119+
}
120+
}
121+
122+
public function testSubscribesToPubSubChannelsUsingPredisApi()
123+
{
124+
// Don't block the test with retries if it failed to read the expected
125+
// number of messages from the server:
126+
$this->subject->setRetryLimit(0);
127+
128+
$received = [ ];
129+
130+
$test = function ($channels, $count) use (&$received) {
131+
$this->subject->pubSubLoop(
132+
[ 'subscribe' => $channels ],
133+
function ($loop, $message) use (&$received, &$count) {
134+
if ($message->kind === 'message') {
135+
$received[$message->channel][] = $message->payload;
136+
137+
if (--$count === 0) {
138+
return false;
139+
}
140+
}
141+
}
142+
);
143+
};
144+
145+
$this->testClient->publishForTest($test, $this->expectedMessages);
146+
147+
$this->assertEquals($this->expectedMessages, $received);
148+
}
149+
150+
public function testRetriesSubscriptionWhenConnectionFails()
151+
{
152+
$this->switchToMinimumTimeout();
153+
154+
$expectedRetries = 2;
155+
$this->subject = new PredisConnection($this->makeClientSpy());
156+
$this->subject->setRetryLimit($expectedRetries);
157+
$this->subject->setRetryWait(0); // retry immediately
158+
159+
// With a read-write timeout, Predis throws a ConnectionException if
160+
// nothing publishes to the channel for the duration specified by the
161+
// timeout value. We'll use this with a low timeout to simulate a real
162+
// connection failure so we don't need to block a server manually.
163+
try {
164+
$this->subject->subscribe([ 'channel' ], function () {
165+
return false;
166+
});
167+
} catch (ConnectionException $exception) {
168+
// With PHPUnit, we need to wrap the throwing block to perform
169+
// assertions afterward.
170+
}
171+
172+
$this->subject->client()->getConnection() // +1 for initial attempt:
173+
->shouldHaveReceived('querySentinel')->times($expectedRetries + 1);
174+
}
175+
176+
public function testSubscribesToSlaveByDefault()
177+
{
178+
$loop = $this->subject->pubSubLoop();
179+
$role = $loop->getClient()->executeRaw([ 'ROLE' ]);
180+
181+
$this->assertEquals('slave', $role[0]);
182+
}
183+
184+
public function testSubscribeFallsBackToMaster()
185+
{
186+
$this->subject->client()->getConnection()
187+
->shouldReceive('getSlaves')->andReturn([ ]);
188+
189+
$loop = $this->subject->pubSubLoop();
190+
$role = $loop->getClient()->executeRaw([ 'ROLE' ]);
191+
192+
$this->assertEquals('master', $role[0]);
193+
}
194+
47195
public function testAllowsTransactionsOnAggregateConnection()
48196
{
49197
// The Predis client itself does not currently support transactions on
@@ -121,7 +269,7 @@ public function testRetriesTransactionWhenConnectionFails()
121269

122270
public function testCanReconnectWhenConnectionFails()
123271
{
124-
$retries = (2 / $this->switchToMinimumTimeout()) + 1;
272+
$retries = ceil(2 / $this->switchToMinimumTimeout()) + 1;
125273
$attempts = 0;
126274

127275
$this->subject = new PredisConnection($this->makeClientSpy());

tests/Support/IntegrationTestCase.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ public function assertRedisKeyEquals($key, $expected)
109109
* Assert that the number of items in the Redis list at the specified key
110110
* equals the provided count.
111111
*
112-
* @param string $key The key of the list in Redis to compare
113-
* @param int expected The number of items that the list should contain.
112+
* @param string $key The key of the list in Redis to compare
113+
* @param int $expected The number of items that the list should contain.
114114
*
115115
* @return void
116116
*/
@@ -126,8 +126,8 @@ public function assertRedisListCount($key, $expected)
126126
* Assert that the number of items in the Redis sorted set at the specified
127127
* key equals the provided count.
128128
*
129-
* @param string $key The key of the sorted set in Redis to compare
130-
* @param int expected The number of items that the set should contain.
129+
* @param string $key The key of the sorted set in Redis to compare
130+
* @param int $expected The number of items that the set should contain.
131131
*
132132
* @return void
133133
*/

tests/Support/TestClient.php

+41
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,46 @@ public function __construct(array $sentinels, array $options)
5858
$connection->setRetryLimit(3);
5959
}
6060

61+
/**
62+
* Publish the supplied messages to the specified channels after executing
63+
* the provided callback.
64+
*
65+
* @param Closure $callback Executes subscribe() or psubscribe() commands.
66+
* Receives an array of channel names in first argument and the number of
67+
* messages to wait for.
68+
* @param array $messages Two-dimensional array keyed by channel names.
69+
* Each contains an array of message strings to publish on the channel.
70+
*
71+
* @return void
72+
*/
73+
public function publishForTest(Closure $callback, array $messages)
74+
{
75+
$callback = function () use ($messages, $callback) {
76+
$channels = array_keys($messages);
77+
$count = count(call_user_func_array('array_merge', $messages));
78+
79+
$callback($channels, $count);
80+
};
81+
82+
$stringMessages = var_export($messages, true);
83+
84+
$process = $this->makeBackgroundCommandProcessForMaster("
85+
usleep(100000);
86+
87+
foreach ($stringMessages as \$channel => \$messages) {
88+
foreach (\$messages as \$message) {
89+
\$client->publish(\$channel, \$message);
90+
}
91+
}
92+
");
93+
94+
$process->mustRun(function ($type, $buffer) use ($callback) {
95+
if ($buffer === self::BACKGROUND_PROCESS_READY_BEACON) {
96+
$callback();
97+
}
98+
});
99+
}
100+
61101
/**
62102
* Signal the current Redis master to sleep for the specified number of
63103
* seconds.
@@ -86,6 +126,7 @@ public function blockMasterFor($seconds, Closure $callback = null)
86126

87127
$process->mustRun(function ($type, $buffer) use ($callback) {
88128
if ($buffer === self::BACKGROUND_PROCESS_READY_BEACON) {
129+
usleep(1000);
89130
$callback();
90131
}
91132
});

0 commit comments

Comments
 (0)