Skip to content

Commit

Permalink
Merge pull request #338 from rabbitmq/rabbitmq-dotnet-client-337
Browse files Browse the repository at this point in the history
Dispose of sockets that failed to connect
  • Loading branch information
kjnilsson authored Jul 13, 2017
2 parents 6a1bf83 + d79f406 commit e71e5d4
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 127 deletions.
1 change: 1 addition & 0 deletions projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<PackageReference Include="System.Net.Sockets" Version="4.3.0" />
<PackageReference Include="System.Reflection.Extensions" Version="4.3.0" />
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.3.0" />
<PackageReference Include="System.Runtime" Version="4.3.0" />
<PackageReference Include="System.Runtime.Extensions" Version="4.3.0" />
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.0" />
<PackageReference Include="System.Threading" Version="4.3.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace RabbitMQ.Client
/// Wrapper interface for standard TCP-client. Provides socket for socket frame handler class.
/// </summary>
/// <remarks>Contains all methods that are currenty in use in rabbitmq client.</remarks>
public interface ITcpClient
public interface ITcpClient : IDisposable
{
bool Connected { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,20 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
int connectionTimeout, int readTimeout, int writeTimeout)
{
Endpoint = endpoint;
m_socket = null;
if (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork)

if (ShouldTryIPv6(endpoint))
{
try
{
m_socket = socketFactory(AddressFamily.InterNetworkV6);
Connect(m_socket, endpoint, connectionTimeout);
}
catch (ConnectFailureException) // could not connect using IPv6
{
m_socket = null;
}
// Mono might raise a SocketException when using IPv4 addresses on
// an OS that supports IPv6
catch (SocketException)
try {
m_socket = ConnectUsingIPv6(endpoint, socketFactory, connectionTimeout);
} catch (ConnectFailureException)
{
m_socket = null;
}
}

if (m_socket == null && endpoint.AddressFamily != AddressFamily.InterNetworkV6)
{
m_socket = socketFactory(AddressFamily.InterNetwork);
Connect(m_socket, endpoint, connectionTimeout);
m_socket = ConnectUsingIPv4(endpoint, socketFactory, connectionTimeout);
}

Stream netstream = m_socket.GetStream();
Expand Down Expand Up @@ -164,12 +155,10 @@ public int ReadTimeout
m_socket.ReceiveTimeout = value;
}
}
#pragma warning disable 0168
catch (SocketException _)
catch (SocketException)
{
// means that the socket is already closed
}
#pragma warning restore 0168
}
}

Expand All @@ -190,13 +179,6 @@ public void Close()
{
try
{
try
{

} catch (ArgumentException)
{
// ignore, we are closing anyway
};
m_socket.Close();
}
catch (Exception)
Expand Down Expand Up @@ -273,14 +255,48 @@ public void Flush()
}
}

private void Connect(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
private bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
{
return (Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork);
}

private ITcpClient ConnectUsingIPv6(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
int timeout)
{
return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetworkV6);
}

private ITcpClient ConnectUsingIPv4(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
int timeout)
{
return ConnectUsingAddressFamily(endpoint, socketFactory, timeout, AddressFamily.InterNetwork);
}

private ITcpClient ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
int timeout, AddressFamily family)
{
ITcpClient socket = socketFactory(family);
try {
ConnectOrFail(socket, endpoint, timeout);
return socket;
} catch (ConnectFailureException e) {
socket.Dispose();
throw e;
}
}

private void ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
{
try
{
socket.ConnectAsync(endpoint.HostName, endpoint.Port)
.TimeoutAfter(timeout)
.ConfigureAwait(false)
.GetAwaiter()//this ensures exceptions aren't wrapped in an AggregateException
// this ensures exceptions aren't wrapped in an AggregateException
.GetAwaiter()
.GetResult();
}
catch (ArgumentException e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,15 @@ public virtual async Task ConnectAsync(string host, int port)

public virtual void Close()
{
if(sock != null)
this.Dispose();
}

public virtual void Dispose()
{
if (sock != null)
{
sock.Dispose();
}
sock = null;
}

Expand Down
97 changes: 0 additions & 97 deletions projects/client/Unit/src/unit/TestConnectionChurnHandleLeak.cs

This file was deleted.

0 comments on commit e71e5d4

Please sign in to comment.