Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRILL-7191 / DRILL-7026]: RM state blob persistence in Zookeeper and Integration of Distributed queue configuration with Planner #1762

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ private ExecConstants() {
new OptionDescription("Indicates how long a query can wait in queue before the query fails. Range: 0-9223372036854775807"));

// New Smart RM boot time configs
public static final String RM_WAIT_THREAD_INTERVAL = "exec.rm.wait_thread_interval";
public static final String RM_QUERY_TAGS_KEY = "exec.rm.queryTags";
public static final StringValidator RM_QUERY_TAGS_VALIDATOR = new StringValidator(RM_QUERY_TAGS_KEY,
new OptionDescription("Allows user to set coma separated list of tags for all the queries submitted over a session"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
*/
package org.apache.drill.exec.coord;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
* as well as understand other node's existence and capabilities.
Expand Down Expand Up @@ -60,6 +61,10 @@ public abstract class ClusterCoordinator implements AutoCloseable {
*/
public abstract Collection<DrillbitEndpoint> getAvailableEndpoints();

public Map<String, DrillbitEndpoint> getAvailableEndpointsUUID() {
throw new UnsupportedOperationException("Only supported by Zookeeper Cluster Coordinator outside YARN");
}

/**
* Get a collection of ONLINE drillbit endpoints by excluding the drillbits
* that are in QUIESCENT state (drillbits that are shutting down). Primarily used by the planner
Expand All @@ -70,6 +75,10 @@ public abstract class ClusterCoordinator implements AutoCloseable {

public abstract Collection<DrillbitEndpoint> getOnlineEndPoints();

public Map<DrillbitEndpoint, String> getOnlineEndpointsUUID() {
throw new UnsupportedOperationException("Only supported by Zookeeper Cluster Coordinator outside YARN");
}

public abstract RegistrationHandle update(RegistrationHandle handle, State state);

public interface RegistrationHandle {
Expand All @@ -79,6 +88,8 @@ public interface RegistrationHandle {
*/
public abstract DrillbitEndpoint getEndPoint();

public abstract String getId();

public abstract void setEndPoint(DrillbitEndpoint endpoint);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@
*/
package org.apache.drill.exec.coord.local;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
Expand All @@ -34,9 +25,17 @@
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;

import org.apache.drill.shaded.guava.com.google.common.collect.Maps;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class LocalClusterCoordinator extends ClusterCoordinator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);

Expand Down Expand Up @@ -103,7 +102,15 @@ public RegistrationHandle update(RegistrationHandle handle, State state) {

@Override
public Collection<DrillbitEndpoint> getAvailableEndpoints() {
return endpoints.values();
return getAvailableEndpointsUUID().values();
}

public Map<String, DrillbitEndpoint> getAvailableEndpointsUUID() {
Map<String, DrillbitEndpoint> availableEndpointsUUID = new HashMap<>();
for (Map.Entry<RegistrationHandle, DrillbitEndpoint> entry : endpoints.entrySet()) {
availableEndpointsUUID.put(entry.getKey().getId(), entry.getValue());
}
return availableEndpointsUUID;
}

/**
Expand All @@ -115,19 +122,27 @@ public Collection<DrillbitEndpoint> getAvailableEndpoints() {
*/
@Override
public Collection<DrillbitEndpoint> getOnlineEndPoints() {
Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>();
for (DrillbitEndpoint endpoint: endpoints.values()){
if(isDrillbitInState(endpoint, State.ONLINE)) {
runningEndPoints.add(endpoint);
return getOnlineEndpointsUUID().keySet();
}

public Map<DrillbitEndpoint, String> getOnlineEndpointsUUID() {
Map<DrillbitEndpoint, String> onlineEndpointsUUID = new HashMap<>();
for (Map.Entry<RegistrationHandle, DrillbitEndpoint> entry : endpoints.entrySet()) {
if(isDrillbitInState(entry.getValue(), State.ONLINE)) {
onlineEndpointsUUID.put(entry.getValue(), entry.getKey().getId());
}
}
return runningEndPoints;
return onlineEndpointsUUID;
}

private class Handle implements RegistrationHandle {
private final UUID id = UUID.randomUUID();
private DrillbitEndpoint drillbitEndpoint;

private Handle(DrillbitEndpoint data) {
drillbitEndpoint = data;
}

/**
* Get the drillbit endpoint associated with the registration handle
* @return drillbit endpoint
Expand All @@ -140,8 +155,9 @@ public void setEndPoint(DrillbitEndpoint endpoint) {
this.drillbitEndpoint = endpoint;
}

private Handle(DrillbitEndpoint data) {
drillbitEndpoint = data;
@Override
public String getId() {
return id.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,11 @@
*/
package org.apache.drill.exec.coord.zk;

import static org.apache.drill.shaded.guava.com.google.common.collect.Collections2.transform;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Throwables;
import org.apache.commons.collections.keyvalue.MultiKey;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
Expand All @@ -57,7 +42,21 @@
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Throwables;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Manages cluster coordination utilizing zookeeper. *
Expand All @@ -74,8 +73,8 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
private ServiceCache<DrillbitEndpoint> serviceCache;
private DrillbitEndpoint endpoint;

// endpointsMap maps Multikey( comprises of endoint address and port) to Drillbit endpoints
private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<MultiKey,DrillbitEndpoint>();
// endpointsMap maps String UUID to Drillbit endpoints
private ConcurrentHashMap<String, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<>();
private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");

public ZKClusterCoordinator(DrillConfig config, String connect) {
Expand Down Expand Up @@ -237,7 +236,12 @@ public RegistrationHandle update(RegistrationHandle handle, State state) {

@Override
public Collection<DrillbitEndpoint> getAvailableEndpoints() {
return this.endpoints;
return getAvailableEndpointsUUID().values();
}

@Override
public Map<String, DrillbitEndpoint> getAvailableEndpointsUUID() {
return this.endpointsMap;
}

/*
Expand All @@ -249,14 +253,19 @@ public Collection<DrillbitEndpoint> getAvailableEndpoints() {
*/
@Override
public Collection<DrillbitEndpoint> getOnlineEndPoints() {
Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>();
for (DrillbitEndpoint endpoint: endpoints){
if(isDrillbitInState(endpoint, State.ONLINE)) {
runningEndPoints.add(endpoint);
return getOnlineEndpointsUUID().keySet();
}

@Override
public Map<DrillbitEndpoint, String> getOnlineEndpointsUUID() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is duplicate code similar to that of the LocalClusterCoordinator. Can you move this function into a common place and use it in both the places?

Map<DrillbitEndpoint, String> onlineEndpointsUUID = new HashMap<>();
for (Map.Entry<String, DrillbitEndpoint> endpointEntry : endpointsMap.entrySet()) {
if (isDrillbitInState(endpointEntry.getValue(), State.ONLINE)) {
onlineEndpointsUUID.put(endpointEntry.getValue(), endpointEntry.getKey());
}
}
logger.debug("Online endpoints in ZK are" + runningEndPoints.toString());
return runningEndPoints;
logger.debug("Online endpoints in ZK are" + onlineEndpointsUUID.keySet().toString());
return onlineEndpointsUUID;
}

@Override
Expand All @@ -273,14 +282,11 @@ public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfi
private synchronized void updateEndpoints() {
try {
// All active bits in the Zookeeper
Collection<DrillbitEndpoint> newDrillbitSet =
transform(discovery.queryForInstances(serviceName),
new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
@Override
public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) {
return input.getPayload();
}
});
final Map<String, DrillbitEndpoint> activeEndpointsUUID = discovery.queryForInstances(serviceName).stream()
.collect(Collectors.toMap(ServiceInstance::getId, ServiceInstance::getPayload));

final Map<DrillbitEndpoint, String> UUIDtoEndpoints = activeEndpointsUUID.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));

// set of newly dead bits : original bits - new set of active bits.
Set<DrillbitEndpoint> unregisteredBits = new HashSet<>();
Expand All @@ -290,29 +296,32 @@ public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) {

// Updates the endpoints map if there is a change in state of the endpoint or with the addition
// of new drillbit endpoints. Registered endpoints is set to newly live drillbit endpoints.
for ( DrillbitEndpoint endpoint : newDrillbitSet) {
String endpointAddress = endpoint.getAddress();
int endpointPort = endpoint.getUserPort();
if (! endpointsMap.containsKey(new MultiKey(endpointAddress, endpointPort))) {
registeredBits.add(endpoint);
}
endpointsMap.put(new MultiKey(endpointAddress, endpointPort),endpoint);
for (Map.Entry<String, DrillbitEndpoint> endpointToUUID : activeEndpointsUUID.entrySet()) {
endpointsMap.put(endpointToUUID.getKey(), endpointToUUID.getValue());
}

// Remove all the endpoints that are newly dead
for ( MultiKey key: endpointsMap.keySet()) {
if(!newDrillbitSet.contains(endpointsMap.get(key))) {
unregisteredBits.add(endpointsMap.get(key));
endpointsMap.remove(key);
for ( String bitUUID: endpointsMap.keySet()) {
if (!activeEndpointsUUID.containsKey(bitUUID)) {
final DrillbitEndpoint unregisteredBit = endpointsMap.get(bitUUID);
unregisteredBits.add(unregisteredBit);

if (UUIDtoEndpoints.containsKey(unregisteredBit)) {
logger.info("Drillbit registered again with different UUID. [Details: Address: {}, UserPort: {}," +
" PreviousUUID: {}, CurrentUUID: {}", unregisteredBit.getAddress(), unregisteredBit.getUserPort(),
bitUUID, UUIDtoEndpoints.get(unregisteredBit));
}
endpointsMap.remove(bitUUID);
}
}
endpoints = endpointsMap.values();
if (logger.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Active drillbit set changed. Now includes ");
builder.append(newDrillbitSet.size());
builder.append(activeEndpointsUUID.size());
builder.append(" total bits. New active drillbits:\n");
builder.append("Address | User Port | Control Port | Data Port | Version | State\n");
for (DrillbitEndpoint bit: newDrillbitSet) {
for (DrillbitEndpoint bit: activeEndpointsUUID.values()) {
builder.append(bit.getAddress()).append(" | ");
builder.append(bit.getUserPort()).append(" | ");
builder.append(bit.getControlPort()).append(" | ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@ public class ZKRegistrationHandle implements RegistrationHandle {
public final String id;
public DrillbitEndpoint endpoint;

public ZKRegistrationHandle(String id, DrillbitEndpoint endpoint) {
super();
this.id = id;
this.endpoint = endpoint;
}

public DrillbitEndpoint getEndPoint() {
return endpoint;
}

public String getId() {
return id;
}

@Override
public void setEndPoint(DrillbitEndpoint endpoint) {
this.endpoint = endpoint;
}

public ZKRegistrationHandle(String id, DrillbitEndpoint endpoint) {
super();
this.id = id;
this.endpoint = endpoint;
}

}
Loading