Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bruceyan/issue monitor 20230624 #1382

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agent/hippo4j-agent-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-kernel-monitor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.event.DynamicThreadPoolRefreshListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ConfigurableApplicationContext;

import java.lang.reflect.Method;

/**
* Event publishing started interceptor
*/
@Slf4j
public class EventPublishingStartedInterceptor implements InstanceMethodsAroundInterceptor {

private static final ILog LOGGER = LogManager.getLogger(EventPublishingStartedInterceptor.class);
Expand All @@ -57,6 +59,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
dynamicRefresh.registerListener();
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH,
new DynamicThreadPoolRefreshListener());

return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ public static class Apollo {
public static List<String> NAMESPACE = Arrays.asList("application");
}

/**
* Monitor
*/
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Monitor {

public static Boolean enable = Boolean.TRUE;

public static String collectTypes = "micrometer";

public static String threadPoolTypes = "dynamic";

public static Long initialDelay = 10000L;

public static Long collectInterval = 5000L;
}

public static String CONFIG_FILE_TYPE;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.agent.plugin.spring.common.define;

import cn.hippo4j.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import cn.hippo4j.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import cn.hippo4j.agent.core.plugin.match.ClassMatch;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;

import static cn.hippo4j.agent.core.plugin.match.NameMatch.byName;
import static net.bytebuddy.matcher.ElementMatchers.named;

public class SpringApplicationInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

private static final String ENHANCE_CLASS = "org.springframework.boot.SpringApplication";

private static final String RUN_INTERCEPTOR = "cn.hippo4j.agent.plugin.spring.common.interceptor.SpringApplicationRunInterceptor";

@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}

@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}

@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {

@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("run");
}

@Override
public String getMethodsInterceptor() {
return RUN_INTERCEPTOR;
}

@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}

@Override
protected String[] witnessClasses() {
return new String[]{"org.springframework.boot.SpringApplication"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.handler.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.BooleanUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,13 +65,12 @@ public static void registerThreadPoolInstances(ApplicationContext context) {
for (Map.Entry<String, Executor> entry : beansWithAnnotation.entrySet()) {
String beanName = entry.getKey();
Executor bean = entry.getValue();
ThreadPoolExecutor executor = (ThreadPoolExecutor) bean;
// TODO
// if (DynamicThreadPoolAdapterChoose.match(bean)) {
// executor = DynamicThreadPoolAdapterChoose.unwrap(bean);
// } else {
// executor = (ThreadPoolExecutor) bean;
// }
ThreadPoolExecutor executor;
if (DynamicThreadPoolAdapterChoose.match(bean)) {
executor = DynamicThreadPoolAdapterChoose.unwrap(bean);
} else {
executor = (ThreadPoolExecutor) bean;
}
if (executor == null) {
LOGGER.warn("[Hippo4j-Agent] Thread pool is null, ignore bean registration. beanName={}, beanClass={}", beanName, bean.getClass().getName());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cn.hippo4j.core.executor.support.adpter;
package cn.hippo4j.common.api;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.common.handler;

import cn.hippo4j.common.api.DynamicThreadPoolAdapter;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.toolkit.CollectionUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Dynamic thread pool adapter choose.
*/
public class DynamicThreadPoolAdapterChoose {

private static final List<DynamicThreadPoolAdapter> DYNAMIC_THREAD_POOL_ADAPTERS = new ArrayList<>();

static {
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter());
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter());
DYNAMIC_THREAD_POOL_ADAPTERS.add(new ZipkinExecutorAdapter());
loadCustomerAdapter();
}

/**
* Check if the object contains thread pool information.
*
* @param executor objects where there may be instances
* of dynamic thread pools
* @return matching results
*/
public static boolean match(Object executor) {
return DYNAMIC_THREAD_POOL_ADAPTERS.stream().anyMatch(each -> each.match(executor));
}

/**
* Get the dynamic thread pool reference in the object.
*
* @param executor objects where there may be instances
* of dynamic thread pools
* @return get the real dynamic thread pool instance
*/
public static ThreadPoolExecutor unwrap(Object executor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
return dynamicThreadPoolAdapterOptional.map(each -> each.unwrap(executor)).orElse(null);
}

/**
* If the {@link DynamicThreadPoolAdapter#match(Object)} conditions are met,
* the thread pool is replaced with a dynamic thread pool.
*
* @param executor objects where there may be instances
* of dynamic thread pools
* @param dynamicThreadPoolExecutor dynamic thread-pool executor
*/
public static void replace(Object executor, Executor dynamicThreadPoolExecutor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
dynamicThreadPoolAdapterOptional.ifPresent(dynamicThreadPoolAdapter -> dynamicThreadPoolAdapter.replace(executor, dynamicThreadPoolExecutor));
}

/**
* Load SPI customer adapter.
*/
private static void loadCustomerAdapter() {
ServiceLoaderRegistry.register(DynamicThreadPoolAdapter.class);
Collection<DynamicThreadPoolAdapter> instances = ServiceLoaderRegistry.getSingletonServiceInstances(DynamicThreadPoolAdapter.class);
if (CollectionUtil.isEmpty(instances)) {
return;
}
for (DynamicThreadPoolAdapter instance : instances) {
if (instance != null) {
DYNAMIC_THREAD_POOL_ADAPTERS.add(instance);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package cn.hippo4j.core.executor.state;
package cn.hippo4j.common.handler;

import cn.hippo4j.common.toolkit.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.common.handler;

import cn.hippo4j.common.api.DynamicThreadPoolAdapter;
import cn.hippo4j.common.toolkit.ReflectUtil;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Transmittable thread local executor adapter.
*/
public class TransmittableThreadLocalExecutorAdapter implements DynamicThreadPoolAdapter {

private static final String MATCH_CLASS_NAME = "ExecutorTtlWrapper";

private static final String FIELD_NAME = "executor";

@Override
public boolean match(Object executor) {
return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName());
}

@Override
public ThreadPoolExecutor unwrap(Object executor) {
return (ThreadPoolExecutor) ReflectUtil.getFieldValue(executor, FIELD_NAME);
}

@Override
public void replace(Object executor, Executor dynamicThreadPoolExecutor) {
ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.hippo4j.common.handler;

import cn.hippo4j.common.api.DynamicThreadPoolAdapter;
import cn.hippo4j.common.toolkit.ReflectUtil;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Transmittable thread local executor service adapter.
*/
public class TransmittableThreadLocalExecutorServiceAdapter implements DynamicThreadPoolAdapter {

private static final String MATCH_CLASS_NAME = "ExecutorServiceTtlWrapper";

private static final String FIELD_NAME = "executorService";

@Override
public boolean match(Object executor) {
return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName());
}

@Override
public ThreadPoolExecutor unwrap(Object executor) {
return (ThreadPoolExecutor) ReflectUtil.getFieldValue(executor, FIELD_NAME);
}

@Override
public void replace(Object executor, Executor dynamicThreadPoolExecutor) {
ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor);
}
}
Loading