Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection setting for blocking mode of socket #135

Merged
merged 2 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ $connectionSettings = (new \PhpMqtt\Client\ConnectionSettings)
// The password used for authentication when connecting to the broker.
->setPassword(null)

// Whether to use a blocking socket or not. By default, the socket is non-blocking,
// which is required when using subscriptions and/or {@see MqttClient::loop()}.
// In rare cases, it might be required to use a blocking socket though. One such example
// is when sending large messages (e.g. binaries) and the broker has a limited receive buffer.
//
// Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken
// or when the broker does not consume the sent data fast enough. Use with caution.
->useBlockingSocket(false)

// The connect timeout defines the maximum amount of seconds the client will try to establish
// a socket connection with the broker. The value cannot be less than 1 second.
->setConnectTimeout(60)
Expand Down
30 changes: 30 additions & 0 deletions src/ConnectionSettings.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ConnectionSettings
{
private ?string $username = null;
private ?string $password = null;
private bool $useBlockingSocket = false;
private int $connectTimeout = 60;
private int $socketTimeout = 5;
private int $resendTimeout = 10;
Expand Down Expand Up @@ -80,6 +81,35 @@ public function getPassword(): ?string
return $this->password;
}

/**
* Whether to use a blocking socket or not. By default, the socket is non-blocking,
* which is required when using subscriptions and/or {@see MqttClient::loop()}.
* In rare cases, it might be required to use a blocking socket though. One such example
* is when sending large messages (e.g. binaries) and the broker has a limited receive buffer.
*
* Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken
* or when the broker does not consume the sent data fast enough. Use with caution.
*
* @param bool $useBlockingSocket
* @return ConnectionSettings
*/
public function useBlockingSocket(bool $useBlockingSocket): ConnectionSettings
{
$copy = clone $this;

$copy->useBlockingModeForSocket = $useBlockingSocket;

return $copy;
}

/**
* @return bool
*/
public function shouldUseBlockingSocket(): bool
{
return $this->useBlockingSocket;
}

/**
* The connect timeout is the maximum amount of seconds the client will try to establish
* a socket connection with the broker. The value cannot be less than 1 second.
Expand Down
2 changes: 1 addition & 1 deletion src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected function establishSocketConnection(): void
}

stream_set_timeout($socket, $this->settings->getSocketTimeout());
stream_set_blocking($socket, false);
stream_set_blocking($socket, $this->settings->shouldUseBlockingSocket());

$this->logger->debug('Socket opened and ready to use.');

Expand Down
40 changes: 30 additions & 10 deletions tests/Feature/PublishSubscribeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace Tests\Feature;

use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\MqttClient;
use Tests\TestCase;

Expand All @@ -20,29 +21,40 @@ class PublishSubscribeTest extends TestCase
public function publishSubscribeData(): array
{
return [
['test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
['test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
['test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
['test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
['test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
['test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
['test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[false, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[false, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[false, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[false, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[false, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
[true, 'test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
[true, 'test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
[true, 'test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
[true, 'test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
[true, 'test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
];
}

/**
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_0_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We connect and subscribe to a topic using the first client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect(null, true);
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
Expand Down Expand Up @@ -76,15 +88,19 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_1_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We connect and subscribe to a topic using the first client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect(null, true);
$subscriber->connect($connectionSettings, true);

$subscriber->subscribe(
$subscriptionTopicFilter,
Expand Down Expand Up @@ -118,15 +134,19 @@ function (string $topic, string $message, bool $retained, array $wildcards) use
* @dataProvider publishSubscribeData
*/
public function test_publishing_and_subscribing_using_quality_of_service_2_works_as_intended(
bool $useBlockingSocket,
string $subscriptionTopicFilter,
string $publishTopic,
string $publishMessage,
array $matchedTopicWildcards
): void
{
// We connect and subscribe to a topic using the first client.
$connectionSettings = (new ConnectionSettings())
->useBlockingSocket($useBlockingSocket);

$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
$subscriber->connect(null, true);
$subscriber->connect($connectionSettings, true);

$subscription = function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage, $matchedTopicWildcards) {
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
Expand Down