Skip to content

Commit

Permalink
HDDS-10874. Create non-caching XceiverClientFactory implementation (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
adoroszlai authored Aug 8, 2024
1 parent a4be83f commit 9b6c142
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;

import java.io.IOException;

/**
* Factory for XceiverClientSpi implementations. Client instances are not cached.
*/
public class XceiverClientCreator implements XceiverClientFactory {
private final ConfigurationSource conf;
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
private final boolean securityEnabled;

public XceiverClientCreator(ConfigurationSource conf) {
this(conf, null);
}

public XceiverClientCreator(ConfigurationSource conf, ClientTrustManager trustManager) {
this.conf = conf;
this.securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
topologyAwareRead = conf.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
this.trustManager = trustManager;
if (securityEnabled) {
Preconditions.checkNotNull(trustManager);
}
}

public boolean isSecurityEnabled() {
return securityEnabled;
}

protected XceiverClientSpi newClient(Pipeline pipeline) throws IOException {
XceiverClientSpi client;
switch (pipeline.getType()) {
case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, trustManager);
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf, trustManager);
break;
case EC:
client = new ECXceiverClientGrpc(pipeline, conf, trustManager);
break;
case CHAINED:
default:
throw new IOException("not implemented " + pipeline.getType());
}
try {
client.connect();
} catch (Exception e) {
throw new IOException(e);
}
return client;
}

@Override
public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException {
return acquireClient(pipeline, false);
}

@Override
public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient) {
releaseClient(xceiverClient, invalidateClient, false);
}

@Override
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline) throws IOException {
return acquireClient(pipeline);
}

@Override
public void releaseClientForReadData(XceiverClientSpi xceiverClient, boolean invalidateClient) {
releaseClient(xceiverClient, invalidateClient, topologyAwareRead);
}

@Override
public XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware) throws IOException {
return newClient(pipeline);
}

@Override
public void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient, boolean topologyAware) {
IOUtils.closeQuietly(xceiverClient);
}

@Override
public void close() throws Exception {
// clients are not tracked, closing each client is the responsibility of users of this class
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,53 @@
*/
public interface XceiverClientFactory extends AutoCloseable {

/**
* Acquires a XceiverClientSpi connected to a container capable of
* storing the specified key. It does not consider the topology
* of the datanodes in the pipeline (e.g. closest datanode to the
* client)
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException;

void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient);
/**
* Releases a XceiverClientSpi after use.
*
* @param client client to release
* @param invalidateClient if true, invalidates the client in cache
*/
void releaseClient(XceiverClientSpi client, boolean invalidateClient);

/**
* Acquires a XceiverClientSpi connected to a container for read.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException;

void releaseClientForReadData(XceiverClientSpi xceiverClient,
/**
* Releases a read XceiverClientSpi after use.
*
* @param client client to release
* @param invalidateClient if true, invalidates the client in cache
*/
void releaseClientForReadData(XceiverClientSpi client,
boolean invalidateClient);

/**
* Acquires a XceiverClientSpi connected to a container capable of
* storing the specified key.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
XceiverClientSpi acquireClient(Pipeline pipeline, boolean topologyAware)
throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.hdds.scm;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.Config;
Expand All @@ -30,8 +29,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -61,18 +58,14 @@
* without reestablishing connection. But the connection will be closed if
* not being used for a period of time.
*/
public class XceiverClientManager implements XceiverClientFactory {
public class XceiverClientManager extends XceiverClientCreator {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);
//TODO : change this to SCM configuration class
private final ConfigurationSource conf;
private final Cache<String, XceiverClientSpi> clientCache;
private final CacheMetrics cacheMetrics;
private ClientTrustManager trustManager;

private static XceiverClientMetrics metrics;
private boolean isSecurityEnabled;
private final boolean topologyAwareRead;

/**
* Creates a new XceiverClientManager for non secured ozone cluster.
* For security enabled ozone cluster, client should use the other constructor
Expand All @@ -87,15 +80,10 @@ public XceiverClientManager(ConfigurationSource conf) throws IOException {
public XceiverClientManager(ConfigurationSource conf,
ScmClientConfig clientConf,
ClientTrustManager trustManager) throws IOException {
super(conf, trustManager);
Preconditions.checkNotNull(clientConf);
Preconditions.checkNotNull(conf);
long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
this.conf = conf;
this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
if (isSecurityEnabled) {
Preconditions.checkNotNull(trustManager);
this.trustManager = trustManager;
}

this.clientCache = CacheBuilder.newBuilder()
.recordStats()
Expand All @@ -114,9 +102,6 @@ public void onRemoval(
}
}
}).build();
topologyAwareRead = conf.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);

cacheMetrics = CacheMetrics.create(clientCache, this);
}
Expand All @@ -127,50 +112,10 @@ public Cache<String, XceiverClientSpi> getClientCache() {
}

/**
* Acquires a XceiverClientSpi connected to a container capable of
* storing the specified key. It does not consider the topology
* of the datanodes in the pipeline (e.g. closest datanode to the
* client)
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException {
return acquireClient(pipeline, false);
}

/**
* Acquires a XceiverClientSpi connected to a container for read.
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException {
return acquireClient(pipeline, topologyAwareRead);
}

/**
* Acquires a XceiverClientSpi connected to a container capable of
* storing the specified key.
* {@inheritDoc}
*
* If there is already a cached XceiverClientSpi, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClientSpi connected to a container
* @throws IOException if a XceiverClientSpi cannot be acquired
*/
@Override
public XceiverClientSpi acquireClient(Pipeline pipeline,
Expand All @@ -187,29 +132,6 @@ public XceiverClientSpi acquireClient(Pipeline pipeline,
}
}

/**
* Releases a XceiverClientSpi after use.
*
* @param client client to release
* @param invalidateClient if true, invalidates the client in cache
*/
@Override
public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
releaseClient(client, invalidateClient, false);
}

/**
* Releases a read XceiverClientSpi after use.
*
* @param client client to release
* @param invalidateClient if true, invalidates the client in cache
*/
@Override
public void releaseClientForReadData(XceiverClientSpi client,
boolean invalidateClient) {
releaseClient(client, invalidateClient, topologyAwareRead);
}

@Override
public void releaseClient(XceiverClientSpi client, boolean invalidateClient,
boolean topologyAware) {
Expand All @@ -227,39 +149,16 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient,
}
}

private XceiverClientSpi getClient(Pipeline pipeline, boolean topologyAware)
protected XceiverClientSpi getClient(Pipeline pipeline, boolean topologyAware)
throws IOException {
HddsProtos.ReplicationType type = pipeline.getType();
try {
// create different client different pipeline node based on
// network topology
String key = getPipelineCacheKey(pipeline, topologyAware);
return clientCache.get(key, new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
XceiverClientSpi client = null;
switch (type) {
case RATIS:
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
trustManager);
break;
case STAND_ALONE:
client = new XceiverClientGrpc(pipeline, conf, trustManager);
break;
case EC:
client = new ECXceiverClientGrpc(pipeline, conf, trustManager);
break;
case CHAINED:
default:
throw new IOException("not implemented " + pipeline.getType());
}
client.connect();
return client;
}
});
return clientCache.get(key, () -> newClient(pipeline));
} catch (Exception e) {
throw new IOException(
"Exception getting XceiverClient: " + e.toString(), e);
"Exception getting XceiverClient: " + e, e);
}
}

Expand Down Expand Up @@ -293,7 +192,7 @@ private String getPipelineCacheKey(Pipeline pipeline,
}
}

if (isSecurityEnabled) {
if (isSecurityEnabled()) {
// Append user short name to key to prevent a different user
// from using same instance of xceiverClient.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientCreator;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
Expand Down Expand Up @@ -79,11 +80,11 @@ private static void startCluster(OzoneConfiguration conf) throws Exception {
storageContainerLocationClient.allocateContainer(
SCMTestUtils.getReplicationType(conf),
HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), null);
try (XceiverClientFactory factory = new XceiverClientCreator(conf);
XceiverClientSpi client = factory.acquireClient(container.getPipeline())) {
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), null);
}
}

static void shutdownCluster() {
Expand Down
Loading

0 comments on commit 9b6c142

Please sign in to comment.