Skip to content

Commit

Permalink
Only insert hostnames if not present for internal broadcast addrs (#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
zpear authored Nov 22, 2023
1 parent 07430a0 commit 80068a8
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ public void doVerb(MessageIn<CrossVpcIpMappingAck> message, int id)
targetName,
targetInternal,
targetExternal);
CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(targetName, targetInternal, targetExternal);
CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(targetName, targetInternal);
}
}
117 changes: 42 additions & 75 deletions src/java/com/palantir/cassandra/cvim/CrossVpcIpMappingHandshaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.InetAddress;
import java.time.Duration;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -57,128 +58,101 @@ public class CrossVpcIpMappingHandshaker
public final static Duration scheduledInterval = Duration.ofSeconds(1);
public final static Duration minHandshakeInterval = Duration.ofMillis(500);
private static volatile long lastTriggeredHandshakeMillis = 0;
private final ConcurrentHashMap<InetAddressIp, InetAddressIp> privatePublicIpMappings;
private final ConcurrentHashMap<InetAddressIp, InetAddressHostname> ipHostnameMappings;

// A map of broadcast address -> hostname, for use when the broadcast address of a given node may be in a different
// VPC than this node.
private final ConcurrentHashMap<InetAddressIp, InetAddressHostname> privateIpToHostname;

private CrossVpcIpMappingHandshaker()
{
this.privatePublicIpMappings = new ConcurrentHashMap<>();
this.ipHostnameMappings = new ConcurrentHashMap<>();
this.privateIpToHostname = new ConcurrentHashMap<>();
}

public void updateCrossVpcMappings(InetAddressHostname host, InetAddressIp internalIp, InetAddressIp externalIp)
public void updateCrossVpcMappings(InetAddressHostname host, InetAddressIp internalIp)
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
return;
}
InetAddressIp oldExternalIp = this.privatePublicIpMappings.get(internalIp);
if (!externalIp.equals(oldExternalIp))
{
this.privatePublicIpMappings.put(internalIp, externalIp);
logger.trace("Updated private/public IP mapping for {} from {}->{} to {}",
host, internalIp, oldExternalIp, externalIp);
}

InetAddressHostname old = this.ipHostnameMappings.get(internalIp);
if (!host.equals(old))
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
this.ipHostnameMappings.put(internalIp, host);
logger.trace("Updated private IP/hostname mapping from {}->{} to {}", internalIp, old, host);
return;
}
old = this.ipHostnameMappings.get(externalIp);
InetAddressHostname old = this.privateIpToHostname.get(internalIp);
if (!host.equals(old))
{
this.ipHostnameMappings.put(externalIp, host);
logger.trace("Updated public IP/hostname mapping from {}->{} to {}", externalIp, old, host);
this.privateIpToHostname.put(internalIp, host);
logger.warn("Updated private IP to hostname mapping from {}->{} to {}->{}", internalIp, old, internalIp, host);
}
}

/**
* Depending on which cross-vpc flags are enabled/disabled, will check the cross-vpc mappings and either swap the
* given endpoint with an endpoint derived from a public IP, or DNS using a hostname. If no mappings are found will
* return the original endpoint.
* Depending on which cross-vpc flags are enabled/disabled, will check the cross-vpc mappings and update the
* endpoint with one derived from DNS using a hostname. If no mappings are found will return the original endpoint.
*/
public InetAddress maybeSwapAddress(InetAddress endpoint)
public InetAddress maybeUpdateAddress(InetAddress endpoint)
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
return endpoint;
}
InetAddressIp proposedAddress = new InetAddressIp(endpoint.getHostAddress());
if (DatabaseDescriptor.isCrossVpcHostnameSwappingEnabled() && ipHostnameMappings.containsKey(proposedAddress)) {
return maybeSwapHostname(endpoint);
}
if (DatabaseDescriptor.isCrossVpcIpSwappingEnabled() && privatePublicIpMappings.containsKey(proposedAddress)) {
return maybeSwapIp(endpoint);
if (DatabaseDescriptor.isCrossVpcHostnameSwappingEnabled() && privateIpToHostname.containsKey(proposedAddress))
{
return maybeInsertHostname(endpoint);
}
return endpoint;
}

private InetAddress maybeSwapHostname(InetAddress endpoint)
private InetAddress maybeInsertHostname(InetAddress endpoint)
{
InetAddressHostname hostname = ipHostnameMappings.get(new InetAddressIp(endpoint.getHostAddress()));
InetAddressHostname hostname = privateIpToHostname.get(new InetAddressIp(endpoint.getHostAddress()));
logger.trace("Performing DNS lookup for host {}", hostname);
InetAddress resolved;
Set<InetAddress> resolved;
try
{
resolved = InetAddress.getByName(hostname.toString());
resolved = Arrays.stream(InetAddress.getAllByName(hostname.toString())).collect(Collectors.toSet());
}
catch (UnknownHostException e)
{
logger.error("Cross VPC mapping contains unresolvable hostname for endpoint {} (unresolved: {})",
endpoint, hostname);
return endpoint;
}
if (!resolved.equals(endpoint))
if (!resolved.contains(endpoint))
{
logger.trace("DNS-resolved address different than provided endpoint. Swapping. provided: {} resolved: {}",
logger.debug("DNS-resolved address different than provided endpoint. This should mean that the endpoint " +
"includes a VPC-internal IP. Swapping. provided: {} resolved: {}",
endpoint, resolved);
return resolved;
}
return endpoint;
}

private InetAddress maybeSwapIp(InetAddress endpoint)
{
InetAddressIp proposedIp = new InetAddressIp(endpoint.getHostAddress());
InetAddressIp mappedIp = privatePublicIpMappings.get(proposedIp);
if (!mappedIp.equals(proposedIp))
return resolved.stream().findFirst().get();
} else
{
try
{
InetAddress result = InetAddress.getByName(mappedIp.toString());
logger.trace("Swapped address {} for {} via private-public IP mapping", endpoint, result);
return result;
}
catch (UnknownHostException e)
{
logger.error("Failed to resolve host for externally-mapped IP {}->{}. " +
"Ensure the address mapping does not contain hostnames", endpoint, mappedIp);
}
logger.trace("Endpoint matches resolved addresses. Not taking any action. provided: {} resolved: {}",
endpoint, resolved);
}
return endpoint;
}

/**
* Checks cross-vpc mapping to return an associated hostname with the given endpoint if present. Use this method
* if you don't want to invoke reverse-DNS like {@link #maybeSwapHostname(InetAddress)} within
* {@link #maybeSwapAddress(InetAddress)} does. Additionally note that this method does not _swap_ hostnames, only
* if you don't want to invoke reverse-DNS like {@link #maybeInsertHostname(InetAddress)} within
* {@link #maybeUpdateAddress(InetAddress)} does. Additionally note that this method does not _swap_ hostnames, only
* provides the hostname associated with a given endpoint if it is present in the cross-vpc mapping.
*/
public Optional<InetAddressHostname> getAssociatedHostname(InetAddress endpoint)
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
return Optional.empty();
}
InetAddressIp ip = new InetAddressIp(endpoint.getHostAddress());
return Optional.ofNullable(ipHostnameMappings.get(ip));
return Optional.ofNullable(privateIpToHostname.get(ip));
}

public void triggerHandshakeWithAllPeers()
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
return;
}
try {
try
{
if (System.currentTimeMillis() - lastTriggeredHandshakeMillis < minHandshakeInterval.toMillis())
{
logger.trace("Ignoring handshake request as last handshake is too recent");
Expand Down Expand Up @@ -288,22 +262,15 @@ void setLastTriggeredHandshakeMillis(long millis)
lastTriggeredHandshakeMillis = millis;
}

@VisibleForTesting
Map<InetAddressIp, InetAddressIp> getCrossVpcIpMapping()
{
return this.privatePublicIpMappings;
}

@VisibleForTesting
Map<InetAddressIp, InetAddressHostname> getCrossVpcIpHostnameMapping()
{
return this.ipHostnameMappings;
return this.privateIpToHostname;
}

@VisibleForTesting
void clearMappings()
{
this.ipHostnameMappings.clear();
this.privatePublicIpMappings.clear();
this.privateIpToHostname.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void doVerb(MessageIn<CrossVpcIpMappingSyn> message, int id) throws Unkno
targetExternalIp);


CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(sourceName, sourceInternalIp, sourceExternalIp);
CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(sourceName, sourceInternalIp);

CrossVpcIpMappingAck ack = new CrossVpcIpMappingAck(targetName, targetInternalIp, targetExternalIp);
MessageOut<CrossVpcIpMappingAck> ackMessage = new MessageOut<>(MessagingService.Verb.CROSS_VPC_IP_MAPPING_ACK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ public static void setCrossVpcInternodeCommunication(boolean setting)
conf.cross_vpc_internode_communication_enabled = setting;
}

@Deprecated
public static Boolean isCrossVpcIpSwappingEnabled()
{
return conf.cross_vpc_ip_swapping_enabled;
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/security/SSLFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAdd
public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws Exception
{
SSLContext ctx = createSSLContext(options, true);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(address);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(address);
SSLSocket socket = maybeConnectWithTimeout(() -> (SSLSocket) ctx.getSocketFactory().createSocket(mappedAddress, port, localAddress, localPort));
prepareSocket(socket, options, mappedAddress);
return socket;
Expand All @@ -105,7 +105,7 @@ public static SSLSocket getSocket(EncryptionOptions options, InetAddress address
public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port) throws Exception
{
SSLContext ctx = createSSLContext(options, true);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(address);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(address);
SSLSocket socket = maybeConnectWithTimeout(() -> (SSLSocket) ctx.getSocketFactory().createSocket(mappedAddress, port));
prepareSocket(socket, options, mappedAddress);
return socket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ public void doVerb_invokedByMessagingService() throws UnknownHostException
@Test
public void doVerb_invokesCrossVpcIpMappingHandshaker() throws UnknownHostException
{
InetAddress remote = InetAddress.getByName("127.0.0.2");
InetAddressHostname targetName = new InetAddressHostname("target");
InetAddressIp targetExternalIp = new InetAddressIp("2.2.2.2");
InetAddressIp targetInternalIp = new InetAddressIp("127.0.0.1");
// localhost/10.100.0.1 -> localhost/127.0.0.1
InetAddress remote = InetAddress.getByName("10.100.0.1");
InetAddressHostname targetName = new InetAddressHostname("localhost");
InetAddressIp targetExternalIp = new InetAddressIp("2.2.2.2"); // some other proxy IP
InetAddressIp targetInternalIp = new InetAddressIp("10.100.0.1");
InetAddress input = InetAddress.getByName(targetInternalIp.toString());
CrossVpcIpMappingAck ack = new CrossVpcIpMappingAck(targetName, targetInternalIp, targetExternalIp);
MessageIn<CrossVpcIpMappingAck> messageIn = MessageIn.create(remote,
Expand All @@ -79,12 +80,12 @@ public void doVerb_invokesCrossVpcIpMappingHandshaker() throws UnknownHostExcept
MessagingService.Verb.CROSS_VPC_IP_MAPPING_ACK,
MessagingService.current_version);
DatabaseDescriptor.setCrossVpcInternodeCommunication(true);
DatabaseDescriptor.setCrossVpcHostnameSwapping(false);
DatabaseDescriptor.setCrossVpcHostnameSwapping(true);
DatabaseDescriptor.setCrossVpcIpSwapping(true);
InetAddress result = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(input);
InetAddress result = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(input);
assertThat(result.getHostAddress()).isNotEqualTo(targetExternalIp.toString());
handler.doVerb(messageIn, 0);
result = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(input);
assertThat(result.getHostAddress()).isEqualTo(targetExternalIp.toString());
result = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(input);
assertThat(result.getHostAddress()).isEqualTo("127.0.0.1");
}
}
Loading

0 comments on commit 80068a8

Please sign in to comment.