diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 4c102ce8..b34ac7ea 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -22,6 +22,10 @@ ], 'options' => [ + 'queue' => [ + 'durable' => true, + 'auto_delete' => false, + ], ], /* diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php index 7783b9cd..abd07a0d 100644 --- a/src/Queue/Connection/ConfigFactory.php +++ b/src/Queue/Connection/ConfigFactory.php @@ -37,7 +37,9 @@ public static function make(array $config = []): AMQPConnectionConfig self::getHostFromConfig($connectionConfig, $config); self::getHeartbeatFromConfig($connectionConfig, $config); + self::getKeepAliveFromConfig($connectionConfig, $config); self::getNetworkProtocolFromConfig($connectionConfig, $config); + self::getTimeoutFromConfig($connectionConfig, $config); }); } @@ -93,10 +95,38 @@ protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectio } } + protected static function getKeepAliveFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $keepAlive = Arr::get($config, self::CONFIG_OPTIONS.'.keep_alive'); + + if (is_bool($keepAlive)) { + $connectionConfig->setKeepalive($keepAlive); + } + } + protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void { if ($networkProtocol = Arr::get($config, 'network_protocol')) { $connectionConfig->setNetworkProtocol($networkProtocol); } } + + protected static function getTimeoutFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + if ($connectionTimeout = Arr::get($config, 'connection_timeout')) { + $connectionConfig->setConnectionTimeout($connectionTimeout); + } + + if ($readTimeout = Arr::get($config, 'read_timeout')) { + $connectionConfig->setReadTimeout($readTimeout); + } + + if ($writeTimeout = Arr::get($config, 'write_timeout')) { + $connectionConfig->setWriteTimeout($writeTimeout); + } + + if ($channelRPCTimeout = Arr::get($config, 'channel_rpc_timeout')) { + $connectionConfig->setChannelRPCTimeout($channelRPCTimeout); + } + } } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 957620ef..4d238169 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -169,7 +169,11 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = $destination = $this->getQueue($queue).'.delay.'.$ttl; - $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); + $options = $this->config->getOptions(); + $durable = Arr::get($options, 'durable') ?: true; + $autoDelete = Arr::get($options, 'auto_delete') ?: false; + + $this->declareQueue($destination, $durable, $autoDelete, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -717,7 +721,11 @@ protected function declareDestination(string $destination, string $exchange = nu } // Create a queue for amq.direct publishing. - $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + $options = $this->config->getOptions(); + $durable = Arr::get($options, 'durable') ?: true; + $autoDelete = Arr::get($options, 'auto_delete') ?: false; + + $this->declareQueue($destination, $durable, $autoDelete, $this->getQueueArguments($destination)); } /**