Skip to content

Commit 4d38e1a

Browse files
author
jiangwh
committedSep 20, 2021
add
1 parent 16cd692 commit 4d38e1a

File tree

1 file changed

+165
-0
lines changed

1 file changed

+165
-0
lines changed
 

‎microservice/Hystrix.md

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Hystrix
2+
3+
## 简单的示例
4+
5+
```java
6+
public static void main(String[] args) throws InterruptedException {
7+
HystrixCommandGroupKey key = HystrixCommandGroupKey.Factory.asKey("cmd");
8+
HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey(key);
9+
/////////////////////////////////////////
10+
HystrixCommandProperties.Setter proSetter = HystrixCommandProperties.defaultSetter();
11+
proSetter.withCircuitBreakerEnabled(true);
12+
proSetter.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE);
13+
proSetter.withExecutionTimeoutEnabled(true);
14+
proSetter.withExecutionIsolationSemaphoreMaxConcurrentRequests(2);
15+
proSetter.withExecutionIsolationThreadInterruptOnTimeout(true);
16+
proSetter.withExecutionTimeoutInMilliseconds(5 * 1000);
17+
setter.andCommandPropertiesDefaults(proSetter);
18+
19+
/////////////////////////////////////////
20+
HystrixThreadPoolProperties.Setter threadSetter = HystrixThreadPoolProperties.defaultSetter();
21+
22+
setter.andThreadPoolPropertiesDefaults(threadSetter);
23+
24+
HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(setter) {
25+
@Override
26+
protected String run() throws Exception {
27+
return "finish";
28+
}
29+
30+
@Override
31+
protected String getFallback() {
32+
return "fallback";
33+
}
34+
};
35+
36+
String res = hystrixCommand.execute();
37+
System.out.println(res);
38+
39+
}
40+
```
41+
42+
利用hystrix可以实现限流、熔断、降低的操作,下面具体介绍下。
43+
44+
## 限流
45+
46+
### 限流策略
47+
48+
hystrix中提供了两种限流的策略。
49+
50+
- THREAD
51+
52+
- SEMAPHORE
53+
54+
Thread策略,就使用thread pool的队列size 以及 线程池数量来控制请求并发量。
55+
56+
```java
57+
/* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
58+
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);
59+
60+
private final HystrixThreadPoolProperties properties;
61+
private final BlockingQueue<Runnable> queue;
62+
private final ThreadPoolExecutor threadPool;
63+
private final HystrixThreadPoolMetrics metrics;
64+
private final int queueSize;
65+
66+
}
67+
//ThreadPool 创建的方式
68+
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
69+
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
70+
71+
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
72+
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
73+
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
74+
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
75+
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
76+
77+
if (allowMaximumSizeToDivergeFromCoreSize) {
78+
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
79+
if (dynamicCoreSize > dynamicMaximumSize) {
80+
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
81+
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
82+
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
83+
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
84+
} else {
85+
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
86+
}
87+
} else {
88+
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
89+
}
90+
}
91+
```
92+
93+
94+
95+
Semaphore策略,需要将thread pool的队列size设置为无限(Integer.MAX_VALUE),然后ConcurrentHashMap、TryableSemaphore.tryAcquire()方法来控制并发。
96+
97+
```java
98+
//获取semaphore
99+
protected TryableSemaphore getExecutionSemaphore() {
100+
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
101+
if (executionSemaphoreOverride == null) {
102+
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
103+
if (_s == null) {
104+
// we didn't find one cache so setup
105+
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
106+
// assign whatever got set (this or another thread)
107+
return executionSemaphorePerCircuit.get(commandKey.name());
108+
} else {
109+
return _s;
110+
}
111+
} else {
112+
return executionSemaphoreOverride;
113+
}
114+
} else {
115+
// return NoOp implementation since we're not using SEMAPHORE isolation
116+
return TryableSemaphoreNoOp.DEFAULT;
117+
}
118+
}
119+
//执行
120+
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
121+
// mark that we're starting execution on the ExecutionHook
122+
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
123+
executionHook.onStart(_cmd);
124+
125+
/* determine if we're allowed to execute */
126+
if (circuitBreaker.allowRequest()) {
127+
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
128+
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
129+
final Action0 singleSemaphoreRelease = new Action0() {
130+
@Override
131+
public void call() {
132+
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
133+
executionSemaphore.release();
134+
}
135+
}
136+
};
137+
138+
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
139+
@Override
140+
public void call(Throwable t) {
141+
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
142+
}
143+
};
144+
145+
if (executionSemaphore.tryAcquire()) {
146+
try {
147+
/* used to track userThreadExecutionTime */
148+
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
149+
return executeCommandAndObserve(_cmd)
150+
.doOnError(markExceptionThrown)
151+
.doOnTerminate(singleSemaphoreRelease)
152+
.doOnUnsubscribe(singleSemaphoreRelease);
153+
} catch (RuntimeException e) {
154+
return Observable.error(e);
155+
}
156+
} else {
157+
return handleSemaphoreRejectionViaFallback();
158+
}
159+
} else {
160+
return handleShortCircuitViaFallback();
161+
}
162+
}
163+
164+
```
165+

0 commit comments

Comments
 (0)
Please sign in to comment.