Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only insert hostnames if not present for internal broadcast addrs #455

Merged
merged 4 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ public void doVerb(MessageIn<CrossVpcIpMappingAck> message, int id)
targetName,
targetInternal,
targetExternal);
CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(targetName, targetInternal, targetExternal);
CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(targetName, targetInternal);
}
}
117 changes: 42 additions & 75 deletions src/java/com/palantir/cassandra/cvim/CrossVpcIpMappingHandshaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.InetAddress;
import java.time.Duration;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -57,128 +58,101 @@ public class CrossVpcIpMappingHandshaker
public final static Duration scheduledInterval = Duration.ofSeconds(1);
public final static Duration minHandshakeInterval = Duration.ofMillis(500);
private static volatile long lastTriggeredHandshakeMillis = 0;
private final ConcurrentHashMap<InetAddressIp, InetAddressIp> privatePublicIpMappings;
private final ConcurrentHashMap<InetAddressIp, InetAddressHostname> ipHostnameMappings;

// A map of broadcast address -> hostname, for use when the broadcast address of a given node may be in a different
// VPC than this node.
private final ConcurrentHashMap<InetAddressIp, InetAddressHostname> privateIpToHostname;

private CrossVpcIpMappingHandshaker()
{
this.privatePublicIpMappings = new ConcurrentHashMap<>();
this.ipHostnameMappings = new ConcurrentHashMap<>();
this.privateIpToHostname = new ConcurrentHashMap<>();
}

public void updateCrossVpcMappings(InetAddressHostname host, InetAddressIp internalIp, InetAddressIp externalIp)
public void updateCrossVpcMappings(InetAddressHostname host, InetAddressIp internalIp)
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
return;
}
InetAddressIp oldExternalIp = this.privatePublicIpMappings.get(internalIp);
if (!externalIp.equals(oldExternalIp))
{
this.privatePublicIpMappings.put(internalIp, externalIp);
logger.trace("Updated private/public IP mapping for {} from {}->{} to {}",
host, internalIp, oldExternalIp, externalIp);
}

InetAddressHostname old = this.ipHostnameMappings.get(internalIp);
if (!host.equals(old))
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
this.ipHostnameMappings.put(internalIp, host);
logger.trace("Updated private IP/hostname mapping from {}->{} to {}", internalIp, old, host);
return;
}
old = this.ipHostnameMappings.get(externalIp);
InetAddressHostname old = this.privateIpToHostname.get(internalIp);
if (!host.equals(old))
{
this.ipHostnameMappings.put(externalIp, host);
logger.trace("Updated public IP/hostname mapping from {}->{} to {}", externalIp, old, host);
this.privateIpToHostname.put(internalIp, host);
logger.warn("Updated private IP to hostname mapping from {}->{} to {}->{}", internalIp, old, internalIp, host);
}
}

/**
* Depending on which cross-vpc flags are enabled/disabled, will check the cross-vpc mappings and either swap the
* given endpoint with an endpoint derived from a public IP, or DNS using a hostname. If no mappings are found will
* return the original endpoint.
* Depending on which cross-vpc flags are enabled/disabled, will check the cross-vpc mappings and update the
* endpoint with one derived from DNS using a hostname. If no mappings are found will return the original endpoint.
*/
public InetAddress maybeSwapAddress(InetAddress endpoint)
public InetAddress maybeUpdateAddress(InetAddress endpoint)
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
return endpoint;
}
InetAddressIp proposedAddress = new InetAddressIp(endpoint.getHostAddress());
if (DatabaseDescriptor.isCrossVpcHostnameSwappingEnabled() && ipHostnameMappings.containsKey(proposedAddress)) {
return maybeSwapHostname(endpoint);
}
if (DatabaseDescriptor.isCrossVpcIpSwappingEnabled() && privatePublicIpMappings.containsKey(proposedAddress)) {
return maybeSwapIp(endpoint);
if (DatabaseDescriptor.isCrossVpcHostnameSwappingEnabled() && privateIpToHostname.containsKey(proposedAddress))
{
return maybeInsertHostname(endpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: { should be on the next line

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing on a couple other places in this class

}
return endpoint;
}

private InetAddress maybeSwapHostname(InetAddress endpoint)
private InetAddress maybeInsertHostname(InetAddress endpoint)
{
InetAddressHostname hostname = ipHostnameMappings.get(new InetAddressIp(endpoint.getHostAddress()));
InetAddressHostname hostname = privateIpToHostname.get(new InetAddressIp(endpoint.getHostAddress()));
logger.trace("Performing DNS lookup for host {}", hostname);
InetAddress resolved;
Set<InetAddress> resolved;
try
{
resolved = InetAddress.getByName(hostname.toString());
resolved = Arrays.stream(InetAddress.getAllByName(hostname.toString())).collect(Collectors.toSet());
}
catch (UnknownHostException e)
{
logger.error("Cross VPC mapping contains unresolvable hostname for endpoint {} (unresolved: {})",
endpoint, hostname);
return endpoint;
}
if (!resolved.equals(endpoint))
if (!resolved.contains(endpoint))
{
logger.trace("DNS-resolved address different than provided endpoint. Swapping. provided: {} resolved: {}",
logger.debug("DNS-resolved address different than provided endpoint. This should mean that the endpoint " +
"includes a VPC-internal IP. Swapping. provided: {} resolved: {}",
endpoint, resolved);
return resolved;
}
return endpoint;
}

private InetAddress maybeSwapIp(InetAddress endpoint)
{
InetAddressIp proposedIp = new InetAddressIp(endpoint.getHostAddress());
InetAddressIp mappedIp = privatePublicIpMappings.get(proposedIp);
if (!mappedIp.equals(proposedIp))
return resolved.stream().findFirst().get();
} else
{
try
{
InetAddress result = InetAddress.getByName(mappedIp.toString());
logger.trace("Swapped address {} for {} via private-public IP mapping", endpoint, result);
return result;
}
catch (UnknownHostException e)
{
logger.error("Failed to resolve host for externally-mapped IP {}->{}. " +
"Ensure the address mapping does not contain hostnames", endpoint, mappedIp);
}
logger.trace("Endpoint matches resolved addresses. Not taking any action. provided: {} resolved: {}",
endpoint, resolved);
}
return endpoint;
}

/**
* Checks cross-vpc mapping to return an associated hostname with the given endpoint if present. Use this method
* if you don't want to invoke reverse-DNS like {@link #maybeSwapHostname(InetAddress)} within
* {@link #maybeSwapAddress(InetAddress)} does. Additionally note that this method does not _swap_ hostnames, only
* if you don't want to invoke reverse-DNS like {@link #maybeInsertHostname(InetAddress)} within
* {@link #maybeUpdateAddress(InetAddress)} does. Additionally note that this method does not _swap_ hostnames, only
* provides the hostname associated with a given endpoint if it is present in the cross-vpc mapping.
*/
public Optional<InetAddressHostname> getAssociatedHostname(InetAddress endpoint)
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
return Optional.empty();
}
InetAddressIp ip = new InetAddressIp(endpoint.getHostAddress());
return Optional.ofNullable(ipHostnameMappings.get(ip));
return Optional.ofNullable(privateIpToHostname.get(ip));
}

public void triggerHandshakeWithAllPeers()
{
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled()) {
if (!DatabaseDescriptor.isCrossVpcInternodeCommunicationEnabled())
{
return;
}
try {
try
{
if (System.currentTimeMillis() - lastTriggeredHandshakeMillis < minHandshakeInterval.toMillis())
{
logger.trace("Ignoring handshake request as last handshake is too recent");
Expand Down Expand Up @@ -288,22 +262,15 @@ void setLastTriggeredHandshakeMillis(long millis)
lastTriggeredHandshakeMillis = millis;
}

@VisibleForTesting
Map<InetAddressIp, InetAddressIp> getCrossVpcIpMapping()
{
return this.privatePublicIpMappings;
}

@VisibleForTesting
Map<InetAddressIp, InetAddressHostname> getCrossVpcIpHostnameMapping()
{
return this.ipHostnameMappings;
return this.privateIpToHostname;
}

@VisibleForTesting
void clearMappings()
{
this.ipHostnameMappings.clear();
this.privatePublicIpMappings.clear();
this.privateIpToHostname.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void doVerb(MessageIn<CrossVpcIpMappingSyn> message, int id) throws Unkno
targetExternalIp);


CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(sourceName, sourceInternalIp, sourceExternalIp);
CrossVpcIpMappingHandshaker.instance.updateCrossVpcMappings(sourceName, sourceInternalIp);

CrossVpcIpMappingAck ack = new CrossVpcIpMappingAck(targetName, targetInternalIp, targetExternalIp);
MessageOut<CrossVpcIpMappingAck> ackMessage = new MessageOut<>(MessagingService.Verb.CROSS_VPC_IP_MAPPING_ACK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ public static void setCrossVpcInternodeCommunication(boolean setting)
conf.cross_vpc_internode_communication_enabled = setting;
}

@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's file a github issue to remove this entirely at some point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done #456

public static Boolean isCrossVpcIpSwappingEnabled()
{
return conf.cross_vpc_ip_swapping_enabled;
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/security/SSLFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAdd
public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws Exception
{
SSLContext ctx = createSSLContext(options, true);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(address);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(address);
SSLSocket socket = maybeConnectWithTimeout(() -> (SSLSocket) ctx.getSocketFactory().createSocket(mappedAddress, port, localAddress, localPort));
prepareSocket(socket, options, mappedAddress);
return socket;
Expand All @@ -105,7 +105,7 @@ public static SSLSocket getSocket(EncryptionOptions options, InetAddress address
public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port) throws Exception
{
SSLContext ctx = createSSLContext(options, true);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(address);
InetAddress mappedAddress = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(address);
SSLSocket socket = maybeConnectWithTimeout(() -> (SSLSocket) ctx.getSocketFactory().createSocket(mappedAddress, port));
prepareSocket(socket, options, mappedAddress);
return socket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ public void doVerb_invokedByMessagingService() throws UnknownHostException
@Test
public void doVerb_invokesCrossVpcIpMappingHandshaker() throws UnknownHostException
{
InetAddress remote = InetAddress.getByName("127.0.0.2");
InetAddressHostname targetName = new InetAddressHostname("target");
InetAddressIp targetExternalIp = new InetAddressIp("2.2.2.2");
InetAddressIp targetInternalIp = new InetAddressIp("127.0.0.1");
// localhost/10.100.0.1 -> localhost/127.0.0.1
InetAddress remote = InetAddress.getByName("10.100.0.1");
InetAddressHostname targetName = new InetAddressHostname("localhost");
InetAddressIp targetExternalIp = new InetAddressIp("2.2.2.2"); // some other proxy IP
InetAddressIp targetInternalIp = new InetAddressIp("10.100.0.1");
InetAddress input = InetAddress.getByName(targetInternalIp.toString());
CrossVpcIpMappingAck ack = new CrossVpcIpMappingAck(targetName, targetInternalIp, targetExternalIp);
MessageIn<CrossVpcIpMappingAck> messageIn = MessageIn.create(remote,
Expand All @@ -79,12 +80,12 @@ public void doVerb_invokesCrossVpcIpMappingHandshaker() throws UnknownHostExcept
MessagingService.Verb.CROSS_VPC_IP_MAPPING_ACK,
MessagingService.current_version);
DatabaseDescriptor.setCrossVpcInternodeCommunication(true);
DatabaseDescriptor.setCrossVpcHostnameSwapping(false);
DatabaseDescriptor.setCrossVpcHostnameSwapping(true);
DatabaseDescriptor.setCrossVpcIpSwapping(true);
InetAddress result = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(input);
InetAddress result = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(input);
assertThat(result.getHostAddress()).isNotEqualTo(targetExternalIp.toString());
handler.doVerb(messageIn, 0);
result = CrossVpcIpMappingHandshaker.instance.maybeSwapAddress(input);
assertThat(result.getHostAddress()).isEqualTo(targetExternalIp.toString());
result = CrossVpcIpMappingHandshaker.instance.maybeUpdateAddress(input);
assertThat(result.getHostAddress()).isEqualTo("127.0.0.1");
}
}
Loading