From bee6c277363b72efd99fbe4402f7ace9a1bbc685 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 18:53:53 +0300 Subject: [PATCH 1/9] Delete commented out test case .NET Core does not provide the API elements we need to reproduce this test. It was fairly time-dependent as well. --- .../src/unit/TestConnectionChurnHandleLeak.cs | 97 ------------------- 1 file changed, 97 deletions(-) delete mode 100755 projects/client/Unit/src/unit/TestConnectionChurnHandleLeak.cs diff --git a/projects/client/Unit/src/unit/TestConnectionChurnHandleLeak.cs b/projects/client/Unit/src/unit/TestConnectionChurnHandleLeak.cs deleted file mode 100755 index 99a268ea6b..0000000000 --- a/projects/client/Unit/src/unit/TestConnectionChurnHandleLeak.cs +++ /dev/null @@ -1,97 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at http://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using NUnit.Framework; -using RabbitMQ.Client.Exceptions; -using System; -using System.Diagnostics; -using System.Threading; - -namespace RabbitMQ.Client.Unit -{ - [TestFixture] - public class TestConnectionChurnHandleLeak : IntegrationFixture - { - //TODO: work out if these tests can be replicated - /* - [Test, Category("GCTest"), Category("MonoBug")] - public void TestHandleLeakWithDisabledHeartbeats() - { - var cf = new ConnectionFactory() - { - RequestedHeartbeat = 0 - }; - PerformLeakTest(cf); - } - - [Test, Category("GCTest"), Category("MonoBug")] - public void TestHandleLeakWithEnabledHeartbeats() - { - var cf = new ConnectionFactory() - { - RequestedHeartbeat = 16 - }; - PerformLeakTest(cf); - } - - - protected void PerformLeakTest(ConnectionFactory cf) - { - var me = Process.GetCurrentProcess(); - var n = me.HandleCount; - Console.WriteLine("{0} handles before the test...", me.HandleCount); - for (var i = 0; i < 1000; i++) - { - var conn = cf.CreateConnection(); - conn.Close(); - } - GC.Collect(); - GC.WaitForPendingFinalizers(); - GC.Collect(); - Thread.Sleep(TimeSpan.FromSeconds(1)); - me = Process.GetCurrentProcess(); - Console.WriteLine("{0} handles after the test...", me.HandleCount); - // allow for a 20% margin of error, as GC behaviour and native handle - // release is difficult to predict - Assert.That(me.HandleCount, Is.LessThanOrEqualTo(n + 200)); - } - */ - } -} From 6f6f652ff8c61a1729a868773613473a46e02241 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 22:07:00 +0300 Subject: [PATCH 2/9] Extract connection methods, generalise for different address families --- .../src/client/impl/SocketFrameHandler.cs | 64 +++++++++++++------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index c77d35d21a..c18c817c81 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -84,29 +84,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, int connectionTimeout, int readTimeout, int writeTimeout) { Endpoint = endpoint; - m_socket = null; + if (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork) { - try - { - m_socket = socketFactory(AddressFamily.InterNetworkV6); - Connect(m_socket, endpoint, connectionTimeout); - } - catch (ConnectFailureException) // could not connect using IPv6 - { - m_socket = null; - } - // Mono might raise a SocketException when using IPv4 addresses on - // an OS that supports IPv6 - catch (SocketException) - { - m_socket = null; - } + m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout); } + if (m_socket == null && endpoint.AddressFamily != AddressFamily.InterNetworkV6) { - m_socket = socketFactory(AddressFamily.InterNetwork); - Connect(m_socket, endpoint, connectionTimeout); + m_socket = ConnectUsingIPv4(endpoint, socketFactory, connectionTimeout); } Stream netstream = m_socket.GetStream(); @@ -273,14 +259,52 @@ public void Flush() } } - private void Connect(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout) + private ITcpClient ConnectUsingIPv6(AmqpTcpEndpoint endpoint, + Func socketFactory, + int timeout) + { + return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetworkV6); + } + + private ITcpClient ConnectUsingIPv4(AmqpTcpEndpoint endpoint, + Func socketFactory, + int timeout) + { + return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetwork); + } + + private ITcpClient ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, + Func socketFactory, + int timeout, AddressFamily family) + { + ITcpClient socket; + try + { + socket = socketFactory(family); + ConnectOrFail(socket, endpoint, timeout); + return socket; + } + catch (ConnectFailureException) // could not connect using IPv6 + { + return null; + } + // Mono might raise a SocketException when using IPv4 addresses on + // an OS that supports IPv6 + catch (SocketException) + { + return null; + } + } + + private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout) { try { socket.ConnectAsync(endpoint.HostName, endpoint.Port) .TimeoutAfter(timeout) .ConfigureAwait(false) - .GetAwaiter()//this ensures exceptions aren't wrapped in an AggregateException + // this ensures exceptions aren't wrapped in an AggregateException + .GetAwaiter() .GetResult(); } catch (ArgumentException e) From ff05395816bf4c4330a12a8df6e99e17c50358f5 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 22:43:16 +0300 Subject: [PATCH 3/9] Handle ConnectUsingIPv6 failures, retry with IPv4 --- .../src/client/impl/SocketFrameHandler.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index c18c817c81..34e2949a2d 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -87,7 +87,14 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, if (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork) { - m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout); + try { + m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout); + #pragma warning disable 0168 + } catch (ConnectFailureException cfe) + #pragma warning restore 0168 + { + m_socket = null; + } } if (m_socket == null && endpoint.AddressFamily != AddressFamily.InterNetworkV6) From ba7e1d45eb879157bb5e6d1ffffe230cf8213856 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 22:48:01 +0300 Subject: [PATCH 4/9] Let ConnectFailureException fly free We don't want to suppress them entirely. Methods that try IPv6 then IPv4 (or vice versa) can handle them as needed. --- .../src/client/impl/SocketFrameHandler.cs | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index 34e2949a2d..ebda358262 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -284,23 +284,9 @@ private ITcpClient ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func socketFactory, int timeout, AddressFamily family) { - ITcpClient socket; - try - { - socket = socketFactory(family); - ConnectOrFail(socket, endpoint, timeout); - return socket; - } - catch (ConnectFailureException) // could not connect using IPv6 - { - return null; - } - // Mono might raise a SocketException when using IPv4 addresses on - // an OS that supports IPv6 - catch (SocketException) - { - return null; - } + var socket = socketFactory(family); + ConnectOrFail(socket, endpoint, timeout); + return socket; } private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout) From 3df1c2a0d5402fd16ed624d5762978bbe10d3ef0 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 22:50:29 +0300 Subject: [PATCH 5/9] Simplify --- .../RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index ebda358262..1775557908 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -89,9 +89,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, { try { m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout); - #pragma warning disable 0168 - } catch (ConnectFailureException cfe) - #pragma warning restore 0168 + } catch (ConnectFailureException) { m_socket = null; } From f67be34d6c35ed5431bcc75421f5d98307bdcc44 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 23:04:45 +0300 Subject: [PATCH 6/9] Extract a method, remove dead code --- .../src/client/impl/SocketFrameHandler.cs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index 1775557908..3ba6c22b61 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -85,7 +85,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, { Endpoint = endpoint; - if (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork) + if (ShouldTryIPV6(endpoint)) { try { m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout); @@ -155,12 +155,10 @@ public int ReadTimeout m_socket.ReceiveTimeout = value; } } -#pragma warning disable 0168 - catch (SocketException _) + catch (SocketException) { // means that the socket is already closed } -#pragma warning restore 0168 } } @@ -181,13 +179,6 @@ public void Close() { try { - try - { - - } catch (ArgumentException) - { - // ignore, we are closing anyway - }; m_socket.Close(); } catch (Exception) @@ -264,6 +255,11 @@ public void Flush() } } + private bool ShouldTryIPV6(AmqpTcpEndpoint endpoint) + { + return (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork) + } + private ITcpClient ConnectUsingIPv6(AmqpTcpEndpoint endpoint, Func socketFactory, int timeout) From 1555ba1af9a35927af774405aa1de271e522f68e Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 23:24:14 +0300 Subject: [PATCH 7/9] Dispose socket on ConnectFailureException Fixes #337. --- .../client/RabbitMQ.Client/RabbitMQ.Client.csproj | 1 + .../RabbitMQ.Client/src/client/api/ITcpClient.cs | 2 +- .../src/client/impl/SocketFrameHandler.cs | 13 +++++++++---- .../src/client/impl/TcpClientAdapter.cs | 10 +++++++++- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj index 00e271f100..1f15ba91d4 100755 --- a/projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -61,6 +61,7 @@ + diff --git a/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs b/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs index 88eae4a8da..b97264be0f 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs @@ -10,7 +10,7 @@ namespace RabbitMQ.Client /// Wrapper interface for standard TCP-client. Provides socket for socket frame handler class. /// /// Contains all methods that are currenty in use in rabbitmq client. - public interface ITcpClient + public interface ITcpClient : IDisposable { bool Connected { get; } diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index 3ba6c22b61..8dd00680ae 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -257,7 +257,7 @@ public void Flush() private bool ShouldTryIPV6(AmqpTcpEndpoint endpoint) { - return (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork) + return (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork); } private ITcpClient ConnectUsingIPv6(AmqpTcpEndpoint endpoint, @@ -278,9 +278,14 @@ private ITcpClient ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func socketFactory, int timeout, AddressFamily family) { - var socket = socketFactory(family); - ConnectOrFail(socket, endpoint, timeout); - return socket; + ITcpClient socket = socketFactory(family); + try { + ConnectOrFail(socket, endpoint, timeout); + return socket; + } catch (ConnectFailureException e) { + socket.Dispose(); + throw e; + } } private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs b/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs index c8e55cf8b6..2b27bdc863 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs @@ -41,8 +41,16 @@ public virtual async Task ConnectAsync(string host, int port) public virtual void Close() { - if(sock != null) + // TODO: Socket.Close() would be a better option here. MK. + this.Dispose(); + } + + public virtual void Dispose() + { + if (sock != null) + { sock.Dispose(); + } sock = null; } From c97a1507a82fd1b714762926c9082653649c49a1 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Jul 2017 23:36:16 +0300 Subject: [PATCH 8/9] IPV6 => IPv6 --- .../RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs index 8dd00680ae..d64cf6fe94 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs @@ -85,7 +85,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, { Endpoint = endpoint; - if (ShouldTryIPV6(endpoint)) + if (ShouldTryIPv6(endpoint)) { try { m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout); @@ -255,7 +255,7 @@ public void Flush() } } - private bool ShouldTryIPV6(AmqpTcpEndpoint endpoint) + private bool ShouldTryIPv6(AmqpTcpEndpoint endpoint) { return (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork); } From d79f40667b556dba5dc39d2e6fecd483cc1b369d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 13 Jul 2017 14:31:32 +0300 Subject: [PATCH 9/9] Remove a comment that's not really correct Socket.Close is virtually identical to Socket.Dispose. --- .../client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs b/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs index 2b27bdc863..e8233d04ad 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/TcpClientAdapter.cs @@ -41,7 +41,6 @@ public virtual async Task ConnectAsync(string host, int port) public virtual void Close() { - // TODO: Socket.Close() would be a better option here. MK. this.Dispose(); }