Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2261]: feat(Coordinator): Introduce banned id manager and checker #2255

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class AccessManager {
private final CoordinatorConf coordinatorConf;
private final ClusterManager clusterManager;
private final QuotaManager quotaManager;
private final BannedManager bannedManager;
private final Configuration hadoopConf;
private List<AccessChecker> accessCheckers = Lists.newArrayList();

Expand All @@ -53,6 +54,7 @@ public AccessManager(
this.clusterManager = clusterManager;
this.hadoopConf = hadoopConf;
this.quotaManager = quotaManager;
this.bannedManager = new BannedManager(coordinatorConf);
init();
}

Expand Down Expand Up @@ -103,6 +105,10 @@ public QuotaManager getQuotaManager() {
return quotaManager;
}

public BannedManager getBannedManager() {
return bannedManager;
}

public void close() throws IOException {
for (AccessChecker checker : accessCheckers) {
checker.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator;

import java.util.Collections;
import java.util.Set;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** BannedManager is a manager for ban the abnormal app. */
public class BannedManager {
private static final Logger LOG = LoggerFactory.getLogger(BannedManager.class);
// version, bannedIds
private volatile Pair<String, Set<String>> bannedInfo = Pair.of("0", Collections.emptySet());

public BannedManager(CoordinatorConf conf) {
LOG.info("BannedManager initialized successfully.");
}

public boolean checkBanned(String id) {
return bannedInfo.getValue().contains(id);
}

public void reloadBannedIdsFromRest(Pair<String, Set<String>> newBannedIds) {
if (newBannedIds.getKey().equals(bannedInfo.getKey())) {
LOG.warn("receive bannedIds from rest with the same version: {}", newBannedIds.getKey());
}
bannedInfo = newBannedIds;
}

public String getVersion() {
return bannedInfo.getKey();
}

public Pair<String, Set<String>> getBannedInfo() {
return bannedInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,16 @@ public class CoordinatorConf extends RssBaseConf {
.asList()
.defaultValues("appHeartbeat", "heartbeat")
.withDescription("Exclude record rpc audit operation list, separated by ','");
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER =
ConfigOptions.key("rss.coordinator.access.bannedIdProvider")
.stringType()
.noDefaultValue()
.withDescription("Get the banned id from Access banned id provider ");
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN =
ConfigOptions.key("rss.coordinator.access.bannedIdProviderPattern")
.stringType()
.defaultValue("(.*)")
.withDescription("The regular banned id pattern to extract");

public CoordinatorConf() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.access.checker;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.ReconfigurableRegistry;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;

/**
* AccessBannedChecker maintain a list of banned id and update it periodically, it checks the banned
* id in the access request and reject if the id is in the banned list.
*/
public class AccessBannedChecker extends AbstractAccessChecker {
private static final Logger LOG = LoggerFactory.getLogger(AccessBannedChecker.class);
private final AccessManager accessManager;
private String bannedIdProviderKey;
private Pattern bannedIdProviderPattern;

public AccessBannedChecker(AccessManager accessManager) throws Exception {
super(accessManager);
this.accessManager = accessManager;
CoordinatorConf conf = accessManager.getCoordinatorConf();
bannedIdProviderKey = conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER);
updateBannedIdProviderPattern(conf);

LOG.info(
"Construct BannedChecker. BannedIdProviderKey is {}, pattern is {}",
bannedIdProviderKey,
bannedIdProviderPattern.pattern());
ReconfigurableRegistry.register(
Sets.newHashSet(
CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER.key(),
CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN.key()),
(theConf, changedProperties) -> {
if (changedProperties == null) {
return;
}
if (changedProperties.contains(
CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER.key())) {
this.bannedIdProviderKey =
conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER);
}
if (changedProperties.contains(
CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER.key())) {
updateBannedIdProviderPattern(conf);
}
});
}

@Override
public AccessCheckResult check(AccessInfo accessInfo) {
if (accessInfo.getExtraProperties() != null
&& bannedIdProviderKey != null
&& accessInfo.getExtraProperties().containsKey(bannedIdProviderKey)) {
String bannedIdPropertyValue = accessInfo.getExtraProperties().get(bannedIdProviderKey);
Matcher matcher = bannedIdProviderPattern.matcher(bannedIdPropertyValue);
if (matcher.find()) {
String bannedId = matcher.group(1);
if (accessManager.getBannedManager() != null
&& accessManager.getBannedManager().checkBanned(bannedId)) {
String msg = String.format("Denied by BannedChecker, accessInfo[%s].", accessInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("BannedIdPropertyValue is {}, {}", bannedIdPropertyValue, msg);
}
CoordinatorMetrics.counterTotalBannedDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
}
}

return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
}

private void updateBannedIdProviderPattern(RssConf conf) {
String bannedIdProviderRegex =
conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN);
bannedIdProviderPattern = Pattern.compile(bannedIdProviderRegex);
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CoordinatorMetrics {
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
private static final String TOTAL_BANNED_DENIED_REQUEST = "total_banned_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
public static final String APP_NUM_TO_USER = "app_num";
public static final String USER_LABEL = "user_name";
Expand All @@ -57,6 +58,7 @@ public class CoordinatorMetrics {
public static Counter counterTotalCandidatesDeniedRequest;
public static Counter counterTotalQuotaDeniedRequest;
public static Counter counterTotalLoadDeniedRequest;
public static Counter counterTotalBannedDeniedRequest;
public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap();

private static MetricsManager metricsManager;
Expand Down Expand Up @@ -118,5 +120,6 @@ private static void setUpMetrics() {
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
counterTotalBannedDeniedRequest = metricsManager.addCounter(TOTAL_BANNED_DENIED_REQUEST);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.web.resource;

import java.util.Set;
import javax.servlet.ServletContext;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hbase.thirdparty.javax.ws.rs.Consumes;
import org.apache.hbase.thirdparty.javax.ws.rs.GET;
import org.apache.hbase.thirdparty.javax.ws.rs.POST;
import org.apache.hbase.thirdparty.javax.ws.rs.Path;
import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.web.resource.BaseResource;
import org.apache.uniffle.common.web.resource.Response;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.BannedManager;
import org.apache.uniffle.coordinator.web.vo.BannedReloadVO;

@Path("/banned")
public class BannedResource extends BaseResource {
private static final Logger LOG = LoggerFactory.getLogger(BannedResource.class);
@Context protected ServletContext servletContext;

@Consumes(MediaType.APPLICATION_JSON)
@POST
@Path("/reload")
public Response<String> reload(BannedReloadVO bannedReloadVo) {
BannedManager bannedManager = getAccessManager().getBannedManager();
if (bannedManager != null && bannedReloadVo != null) {
bannedManager.reloadBannedIdsFromRest(
Pair.of(bannedReloadVo.getVersion(), bannedReloadVo.getIds()));
LOG.info("reload {} banned ids.", bannedReloadVo.getIds().size());
return Response.success("success");
} else {
return Response.fail("bannedManager is not initialized or bannedIds is null.");
}
}

@GET
@Path("version")
public Response<String> version() {
BannedManager bannedManager = getAccessManager().getBannedManager();
if (bannedManager != null) {
String version = bannedManager.getVersion();
LOG.info("Get version of banned ids is {}.", version);
return Response.success(version);
} else {
return Response.fail("bannedManager is not initialized.");
}
}

@GET
@Path("get")
public Response<Pair<String, Set<String>>> get() {
BannedManager bannedManager = getAccessManager().getBannedManager();
if (bannedManager != null) {
Pair<String, Set<String>> bannedInfo = bannedManager.getBannedInfo();
LOG.info(
"Get version:{} include {} bannedIds ",
bannedInfo.getKey(),
bannedInfo.getValue().size());
return Response.success(bannedInfo);
} else {
return Response.fail("bannedManager is not initialized.");
}
}

private AccessManager getAccessManager() {
return (AccessManager) servletContext.getAttribute(AccessManager.class.getCanonicalName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ public String getCoordinatorStacks() {
public Class<ConfOpsResource> getConfOps() {
return ConfOpsResource.class;
}

@Path("/banned")
public Class<BannedResource> getBannedResource() {
return BannedResource.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.web.vo;

import java.util.Collections;
import java.util.Set;

public class BannedReloadVO {
private String version;
private Set<String> ids = Collections.emptySet();

public String getVersion() {
return version;
}

public Set<String> getIds() {
return ids;
}

public void setIds(Set<String> ids) {
if (ids == null) {
ids = Collections.emptySet();
}
this.ids = ids;
}

public void setVersion(String version) {
this.version = version;
}

@Override
public String toString() {
return "BannedIdsVO{"
+ "versionId='"
+ version
+ '\''
+ ", size of bannedIds="
+ ids.size()
+ '}';
}
}
Loading
Loading