Skip to content

Commit

Permalink
HDDS-9015. Block CSR request in SCM for "hdds.x509.rootca.certificate.polling.interval" time period (apache#5064)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenSammi authored Aug 4, 2023
1 parent 47fcf45 commit bd8bb39
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public enum ResultCodes {
INVALID_PIPELINE_STATE,
DUPLICATED_PIPELINE_ID,
TIMEOUT,
CA_ROTATION_IN_PROGRESS
CA_ROTATION_IN_PROGRESS,
CA_ROTATION_IN_POST_PROGRESS,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
Expand Down Expand Up @@ -408,7 +409,9 @@ public synchronized List<X509Certificate> getTrustChain()
chain.add(lastInsertedCert);
List<X509Certificate> caCertList =
OzoneSecurityUtil.convertToX509(listCA());
while (!getAllRootCaCerts().contains(lastInsertedCert)) {
Set<X509Certificate> rootCaCertList = getAllRootCaCerts();
while (!rootCaCertList.isEmpty() &&
!rootCaCertList.contains(lastInsertedCert)) {
Optional<X509Certificate> issuerOpt =
getIssuerForCert(lastInsertedCert, caCertList);
if (issuerOpt.isPresent()) {
Expand Down Expand Up @@ -1403,8 +1406,9 @@ public void run() {
public synchronized void setCACertificate(X509Certificate cert)
throws Exception {
caCertId = cert.getSerialNumber().toString();
String pemCert = CertificateCodec.getPEMEncodedString(cert);
certificateMap.put(caCertId,
CertificateCodec.getCertPathFromPemEncodedString(
CertificateCodec.getPEMEncodedString(cert)));
CertificateCodec.getCertPathFromPemEncodedString(pemCert));
pemEncodedCACerts = Arrays.asList(pemCert);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ enum Status {
DUPLICATED_PIPELINE_ID = 41;
TIMEOUT = 42;
CA_ROTATION_IN_PROGRESS = 43;
CA_ROTATION_IN_POST_PROGRESS = 44;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.safemode.Precheck;

import org.apache.hadoop.hdds.scm.security.RootCARotationManager;
import org.apache.hadoop.hdds.scm.server.ContainerReportQueue;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReport;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.ha.ConfUtils;
import org.apache.hadoop.util.StringUtils;
Expand Down Expand Up @@ -225,4 +227,25 @@ public static String getContainerReportConfPrefix() {
+ SCMEvents.INCREMENTAL_CONTAINER_REPORT.getName());
}

/**
 * Rejects an operation while a root CA rotation is running or has just
 * finished.
 *
 * <p>Nothing is checked when {@code rotationManager} is {@code null}.
 *
 * @param rotationManager the rotation manager to query, may be {@code null}
 * @param isScmCertRenew  {@code true} to skip the "rotation in progress"
 *                        check; the post-rotation check still applies
 * @param config          configuration used to report how long the
 *                        post-rotation prohibition can last
 * @param operation       name of the operation, used in the error message
 * @throws SCMException with {@code CA_ROTATION_IN_PROGRESS} or
 *                      {@code CA_ROTATION_IN_POST_PROGRESS} when the
 *                      operation is currently not allowed
 */
public static void checkIfCertSignRequestAllowed(
    RootCARotationManager rotationManager, boolean isScmCertRenew,
    OzoneConfiguration config, String operation) throws SCMException {
  if (rotationManager == null) {
    return;
  }

  final boolean rotating = rotationManager.isRotationInProgress();
  if (rotating && !isScmCertRenew) {
    final String inProgressMsg = "Root CA and Sub CA rotation is in-progress."
        + " Please try the operation later again.";
    throw new SCMException(inProgressMsg,
        SCMException.ResultCodes.CA_ROTATION_IN_PROGRESS);
  }

  if (!rotationManager.isPostRotationInProgress()) {
    return;
  }

  // The security config is only needed to tell the caller how long the
  // prohibition may last.
  final SecurityConfig securityConfig = new SecurityConfig(config);
  final String postRotationMsg = "The operation " + operation
      + " is prohibited due to root CA and sub CA rotation"
      + " have just finished. "
      + "The prohibition state will last at most "
      + securityConfig.getRootCaCertificatePollingInterval()
      + ". Please try the operation later again.";
  throw new SCMException(postRotationMsg,
      SCMException.ResultCodes.CA_ROTATION_IN_POST_PROGRESS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,12 @@ protected final <T extends Message> T readConfiguration(
}

}

/**
* Deletes the persisted configuration of this service. The entry is looked
* up by the name returned from {@code getServiceName()} and the deletion is
* delegated to the state manager.
* @throws IOException on failure
*/
protected final void deleteConfiguration() throws IOException {
stateManager.deleteConfiguration(getServiceName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ void saveConfiguration(String serviceName, ByteString bytes)
*/
ByteString readConfiguration(String serviceName) throws IOException;

/**
* Deletes the persisted configuration mapped to the specified serviceName.
* NOTE(review): the method is annotated with {@code @Replicate}; presumably
* the deletion is replicated to the other SCMs -- confirm.
* @param serviceName name of the {@link StatefulService}, obtained through
* {@link SCMService#getServiceName()}
* @throws IOException on failure
*/
@Replicate
void deleteConfiguration(String serviceName) throws IOException;

/**
* Sets the updated reference to the table when reloading SCM state.
* @param statefulServiceConfig table from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ public ByteString readConfiguration(String serviceName) throws IOException {
return statefulServiceConfig.get(serviceName);
}

/**
* Deletes the persisted configuration mapped to the specified serviceName
* by removing that key from the {@code statefulServiceConfig} table.
* @param serviceName name of the {@link StatefulService}, obtained through
* {@link SCMService#getServiceName()}
* @throws IOException on failure
*/
@Override
public void deleteConfiguration(String serviceName) throws IOException {
statefulServiceConfig.delete(serviceName);
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CertInfoProto;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
import org.apache.hadoop.hdds.scm.ha.SCMServiceException;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.ha.StatefulService;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
import org.apache.hadoop.hdds.security.x509.certificate.authority.profile.DefaultCAProfile;
import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
Expand All @@ -56,6 +58,7 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
Expand All @@ -75,7 +78,7 @@
/**
* Root CA Rotation Service is a service in SCM to control the CA rotation.
*/
public class RootCARotationManager implements SCMService {
public class RootCARotationManager extends StatefulService {

public static final Logger LOG =
LoggerFactory.getLogger(RootCARotationManager.class);
Expand All @@ -89,11 +92,13 @@ public class RootCARotationManager implements SCMService {
private final Duration renewalGracePeriod;
private final Date timeOfDay;
private final Duration ackTimeout;
private final Duration rootCertPollInterval;
private final SCMCertificateClient scmCertClient;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final AtomicBoolean isProcessing = new AtomicBoolean(false);
private final AtomicReference<Long> processStartTime =
new AtomicReference<>();
private final AtomicBoolean isPostProcessing = new AtomicBoolean(false);
private final String threadName = this.getClass().getSimpleName();
private final String newCAComponent = SCM_ROOT_CA_COMPONENT_NAME +
HDDS_NEW_KEY_CERT_DIR_NAME_SUFFIX +
Expand All @@ -105,6 +110,7 @@ public class RootCARotationManager implements SCMService {
private ScheduledFuture waitAckTask;
private ScheduledFuture waitAckTimeoutTask;
private final RootCARotationMetrics metrics;
private ScheduledFuture clearPostProcessingTask;

/**
* Constructs RootCARotationManager with the specified arguments.
Expand All @@ -131,6 +137,7 @@ public class RootCARotationManager implements SCMService {
* (4) Rotation Committed
*/
public RootCARotationManager(StorageContainerManager scm) {
super(scm.getStatefulServiceStateManager());
this.scm = scm;
this.ozoneConf = scm.getConfiguration();
this.secConf = new SecurityConfig(ozoneConf);
Expand All @@ -141,6 +148,7 @@ public RootCARotationManager(StorageContainerManager scm) {
renewalGracePeriod = secConf.getRenewalGracePeriod();
timeOfDay = Date.from(LocalDateTime.parse(secConf.getCaRotationTimeOfDay())
.atZone(ZoneId.systemDefault()).toInstant());
rootCertPollInterval = secConf.getRootCaCertificatePollingInterval();

executorService = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat(threadName)
Expand Down Expand Up @@ -178,14 +186,25 @@ public void notifyStatusChanged() {
if (waitAckTimeoutTask != null) {
waitAckTask.cancel(true);
}
if (clearPostProcessingTask != null) {
clearPostProcessingTask.cancel(true);
}
isProcessing.set(false);
processStartTime.set(null);
isPostProcessing.set(false);
}
return;
}

if (isRunning.compareAndSet(false, true)) {
LOG.info("notifyStatusChanged: enable monitor task");
// enable post rotation task if needed.
try {
checkAndHandlePostProcessing();
} catch (IOException | CertificateException e) {
throw new RuntimeException(
"Error while checking post-processing state.", e);
}
}
return;
}
Expand Down Expand Up @@ -252,6 +271,10 @@ public boolean isRotationInProgress() {
return isProcessing.get();
}

/**
* Reports whether the manager is in the post-rotation state, i.e. whether
* the {@code isPostProcessing} flag is currently set.
* @return the current value of the {@code isPostProcessing} flag
*/
public boolean isPostRotationInProgress() {
return isPostProcessing.get();
}

/**
* Task to monitor certificate lifetime and start rotation if needed.
*/
Expand Down Expand Up @@ -662,14 +685,16 @@ public void run() {
processStartTime.set(null);

// save root certificate to certStore
X509Certificate rootCACert = null;
try {
if (scm.getCertificateStore().getCertificateByID(
rootCACertHolder.getSerialNumber(), VALID_CERTS) == null) {
LOG.info("Persist root certificate {} to cert store",
rootCACertId);
rootCACert =
CertificateCodec.getX509Certificate(rootCACertHolder);
scm.getCertificateStore().storeValidCertificate(
rootCACertHolder.getSerialNumber(),
CertificateCodec.getX509Certificate(rootCACertHolder),
rootCACertHolder.getSerialNumber(), rootCACert,
HddsProtos.NodeType.SCM);
}
} catch (CertificateException | IOException e) {
Expand All @@ -683,6 +708,17 @@ public void run() {
String msg = "Root certificate " + rootCACertId +
" rotation is finished successfully after " + timeTaken + " ns";
cleanupAndStop(msg);

// set the isPostProcessing to true, which will block the CSR
// signing in this period.
enterPostProcessing(rootCertPollInterval.toMillis());
// save the new root certificate to rocksdb through ratis
if (rootCACert != null) {
saveConfiguration(new CertInfo.Builder()
.setX509Certificate(rootCACert)
.setTimestamp(rootCACert.getNotBefore().getTime())
.build().getProtobuf());
}
} catch (Throwable e) {
LOG.error("Execution error", e);
handler.resetRotationPrepareAcks();
Expand All @@ -695,6 +731,21 @@ public void run() {
}
}

/**
 * Switches the manager into the post-processing state and schedules the
 * task that leaves it again.
 *
 * <p>When the scheduled task runs it clears the {@code isPostProcessing}
 * flag and then deletes the persisted stateful configuration. A failure to
 * delete is only logged.
 *
 * @param delay time in milliseconds to stay in the post-processing state
 */
private void enterPostProcessing(long delay) {
  isPostProcessing.set(true);
  LOG.info("isPostProcessing is true for {} ms", delay);

  final Runnable leavePostProcessing = () -> {
    isPostProcessing.set(false);
    LOG.info("isPostProcessing is false");
    try {
      deleteConfiguration();
    } catch (IOException e) {
      // The flag is already cleared at this point; only the persisted
      // entry is left behind.
      LOG.error("Failed to delete stateful configuration", e);
      return;
    }
    LOG.info("Stateful configuration is deleted");
  };

  clearPostProcessingTask = executorService.schedule(
      leavePostProcessing, delay, TimeUnit.MILLISECONDS);
}

/**
* Stops scheduled monitor task.
*/
Expand Down Expand Up @@ -728,4 +779,52 @@ public boolean shouldSkipRootCert(String newRootCertId) throws IOException {
}
return false;
}

/**
 * Re-enters the post-processing state from the persisted stateful
 * configuration, if one exists and is still relevant.
 *
 * <p>The persisted certificate is compared by serial number with the last
 * certificate of the SCM trust chain:
 * <ul>
 *   <li>no persisted configuration: nothing to do;</li>
 *   <li>persisted serial smaller than the current one: the stale entry is
 *       deleted;</li>
 *   <li>persisted serial bigger than the current one: treated as an error;
 *   </li>
 *   <li>equal serials: post-processing is entered for the remainder of the
 *       root certificate polling interval, measured from the certificate's
 *       not-before time, or the entry is deleted if that interval has
 *       already elapsed.</li>
 * </ul>
 *
 * @throws IOException if the stateful configuration cannot be read or
 *                     deleted
 * @throws CertificateException if the persisted certificate cannot be
 *                              decoded
 */
private void checkAndHandlePostProcessing() throws IOException,
    CertificateException {
  final CertInfoProto persisted = readConfiguration(CertInfoProto.class);
  if (persisted == null) {
    LOG.info("No {} configuration found in stateful storage",
        getServiceName());
    return;
  }

  final X509Certificate storedCert =
      CertificateCodec.getX509Certificate(persisted.getX509Certificate());

  final List<X509Certificate> trustChain = scmCertClient.getTrustChain();
  Preconditions.checkArgument(trustChain.size() > 1);
  // The root certificate is taken from the end of the trust chain.
  final X509Certificate currentRoot = trustChain.get(trustChain.size() - 1);

  final int order =
      currentRoot.getSerialNumber().compareTo(storedCert.getSerialNumber());

  if (order < 0) {
    // this should not happen
    throw new RuntimeException("Root CA certificate ID "
        + storedCert.getSerialNumber()
        + " in stateful storage is bigger than "
        + "current scm's root CA certificate ID "
        + currentRoot.getSerialNumber());
  }

  if (order > 0) {
    // this could happen if the previous stateful configuration is not deleted
    LOG.warn("Root CA certificate ID {} in stateful storage is smaller than"
        + " current scm's root certificate ID {}",
        storedCert.getSerialNumber(), currentRoot.getSerialNumber());
    deleteConfiguration();
    LOG.warn("Stateful configuration is deleted");
    return;
  }

  final Date notBefore = currentRoot.getNotBefore();
  final Date current = Calendar.getInstance().getTime();
  // Negative while the polling interval since issuance has not elapsed yet.
  final Duration overshoot = Duration
      .between(notBefore.toInstant(), current.toInstant())
      .minus(rootCertPollInterval);

  if (!overshoot.isNegative()) {
    // this could happen if the service stopped for a long and restarts
    LOG.info("Root CA certificate ID {} in stateful storage has already "
        + "come out of post-processing state", storedCert.getSerialNumber());
    deleteConfiguration();
    return;
  }

  enterPostProcessing(-overshoot.toMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService.newReflectiveBlockingService;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdds.scm.ScmUtils.checkIfCertSignRequestAllowed;
import static org.apache.hadoop.hdds.scm.ha.HASecurityUtils.createSCMRatisTLSConfig;
import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
Expand All @@ -131,14 +132,14 @@ public class SCMClientProtocolServer implements
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
private final StorageContainerManager scm;
private final OzoneConfiguration config;
private final ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics;

public SCMClientProtocolServer(
OzoneConfiguration conf,
public SCMClientProtocolServer(OzoneConfiguration conf,
StorageContainerManager scm,
ReconfigurationHandler reconfigurationHandler
) throws IOException {
ReconfigurationHandler reconfigurationHandler) throws IOException {
this.scm = scm;
this.config = conf;
final int handlerCount =
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);
Expand Down Expand Up @@ -803,12 +804,8 @@ public void transferLeadership(String newLeaderId)
throw new SCMException("SCM HA not enabled.", ResultCodes.INTERNAL_ERROR);
}

if (scm.getRootCARotationManager() != null &&
scm.getRootCARotationManager().isRotationInProgress()) {
throw new SCMException(("Root CA and Sub CA rotation is in-progress." +
" Please try the operation later again."),
ResultCodes.CA_ROTATION_IN_PROGRESS);
}
checkIfCertSignRequestAllowed(scm.getRootCARotationManager(),
false, config, "transferLeadership");

boolean auditSuccess = true;
final Map<String, String> auditMap = Maps.newHashMap();
Expand Down
Loading

0 comments on commit bd8bb39

Please sign in to comment.