Skip to content

Commit

Permalink
http2 watch
Browse files Browse the repository at this point in the history
  • Loading branch information
PleaseGiveMeTheCoke committed Oct 13, 2024
1 parent 6d94a2c commit 8b68d8a
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 55 deletions.
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ public interface ConfigurationKeys {

String TRANSPORT_PROTOCOL = TRANSPORT_PREFIX + "protocol";

/**
* http1 or http2
*/
String HTTP_VERSION = TRANSPORT_PREFIX + "httpVersion";

/**
* The constant ENABLE_TM_CLIENT_BATCH_SEND_REQUEST
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.seata.common;

import org.apache.http.HttpVersion;

import java.time.Duration;

public interface DefaultValues {
Expand Down Expand Up @@ -64,6 +66,7 @@ public interface DefaultValues {
String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker";
String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler";
String DEFAULT_PROTOCOL = "seata";
String DEFAULT_HTTP_VERSION = "http1";

boolean DEFAULT_TRANSPORT_HEARTBEAT = true;
boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.util;

import io.netty.handler.codec.http.QueryStringEncoder;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.http.client.methods.CloseableHttpResponse;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class Http2ClientUtil {


public static Response doPost(String url, Map<String, String> params, Map<String, String> header,
int timeout) throws IOException {
OkHttpClient client = new OkHttpClient()
.newBuilder()
.readTimeout(timeout, TimeUnit.SECONDS)
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
.build();
QueryStringEncoder queryStringEncoder = new QueryStringEncoder(url);
params.forEach(queryStringEncoder::addParam);
Request.Builder builder = new Request.Builder();
header.forEach(builder::addHeader);
Request request = builder.url(queryStringEncoder.toString()).build();
return client.newCall(request).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
Expand All @@ -35,7 +34,6 @@
import io.netty.handler.codec.http.multipart.InterfaceHttpData;

import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicReference;

public class HttpDispatchHandler extends SimpleChannelInboundHandler<HttpRequest> {

Expand All @@ -46,6 +44,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest)
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode requestDataNode = objectMapper.createObjectNode();
requestDataNode.putIfAbsent("param", ParameterParser.convertParamMap(queryStringDecoder.parameters()));
requestDataNode.putPOJO("channel", ctx.channel());

if (httpRequest.method() == HttpMethod.POST) {
ObjectNode bodyDataNode = objectMapper.createObjectNode();
Expand All @@ -63,13 +62,11 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest)
HttpInvocation httpInvocation = ControllerManager.getHttpInvocation(path);
Object httpController = httpInvocation.getController();
Method handleMethod = httpInvocation.getMethod();
AtomicReference<Channel> channel = new AtomicReference<>(ctx.channel());
Object[] args = ParameterParser.getArgValues(channel, httpInvocation.getParamMetaData(), handleMethod, requestDataNode);
Object[] args = ParameterParser.getArgValues(httpInvocation.getParamMetaData(), handleMethod, requestDataNode);
Object result = handleMethod.invoke(httpController, args);
if (channel.get() == null) {
if (requestDataNode.get("channel") == null) {
return;
}

FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(objectMapper.writeValueAsBytes(result)));
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.writeAndFlush(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.lang.reflect.Parameter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -55,20 +54,20 @@ public static ObjectNode convertParamMap(Map<String, List<String>> paramMap) thr
return paramNode;
}

public static Object[] getArgValues(AtomicReference<Channel> channel, ParamMetaData[] paramMetaDatas, Method handleMethod, ObjectNode paramMap) throws JsonProcessingException {
public static Object[] getArgValues(ParamMetaData[] paramMetaDatas, Method handleMethod, ObjectNode paramMap) throws JsonProcessingException {
Class<?>[] parameterTypes = handleMethod.getParameterTypes();
Parameter[] parameters = handleMethod.getParameters();
return getParameters(channel, parameterTypes, paramMetaDatas, parameters, paramMap);
return getParameters(parameterTypes, paramMetaDatas, parameters, paramMap);
}

private static Object[] getParameters(AtomicReference<Channel> channel, Class<?>[] parameterTypes, ParamMetaData[] paramMetaDatas, Parameter[] parameters, ObjectNode paramMap) throws JsonProcessingException {
private static Object[] getParameters(Class<?>[] parameterTypes, ParamMetaData[] paramMetaDatas, Parameter[] parameters, ObjectNode paramMap) throws JsonProcessingException {
int length = parameterTypes.length;
Object[] ret = new Object[length];
for (int i = 0; i < length; i++) {
Class<?> parameterType = parameterTypes[i];
String parameterName = parameters[i].getName();
ParamMetaData paramMetaData = paramMetaDatas[i];
ret[i] = getArgValue(channel, parameterType, parameterName, paramMetaData, paramMap);
ret[i] = getArgValue(parameterType, parameterName, paramMetaData, paramMap);
if (!parameterType.isAssignableFrom(ret[i].getClass())) {
LOGGER.error("[HttpDispatchHandler] not compatible parameter type, expect {}, but {}", parameterType, ret[i].getClass());
ret[i] = null;
Expand All @@ -79,13 +78,13 @@ private static Object[] getParameters(AtomicReference<Channel> channel, Class<?>
}


private static Object getArgValue(AtomicReference<Channel> channel, Class<?> parameterType, String parameterName, ParamMetaData paramMetaData, ObjectNode paramMap) throws JsonProcessingException {
private static Object getArgValue(Class<?> parameterType, String parameterName, ParamMetaData paramMetaData, ObjectNode paramMap) throws JsonProcessingException {
ParamMetaData.ParamConvertType paramConvertType = paramMetaData.getParamConvertType();
ObjectMapper objectMapper = new ObjectMapper();
if (parameterType.equals(Channel.class)) {
Channel ret = channel.get();
channel.set(null);
return ret;
JsonNode jsonNode = paramMap.get("channel");
paramMap.putPOJO("channel", null);
return objectMapper.convertValue(jsonNode, Channel.class);
} else if (ParamMetaData.ParamConvertType.MODEL_ATTRIBUTE.equals(paramConvertType)) {
JsonNode param = paramMap.get("param");
return objectMapper.convertValue(param, parameterType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.QueryStringDecoder;
Expand All @@ -39,7 +38,6 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class Http2DispatchHandler extends ChannelDuplexHandler {

Expand All @@ -64,25 +62,26 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ObjectMapper objectMapper = new ObjectMapper();
requestDataNode = objectMapper.createObjectNode();
requestDataNode.putIfAbsent("param", ParameterParser.convertParamMap(queryStringDecoder.parameters()));
requestDataNode.putPOJO("channel", ctx.channel());
} else if (msg instanceof Http2DataFrame) {
ObjectMapper objectMapper = new ObjectMapper();
Http2DataFrame http2DataFrame = (Http2DataFrame) msg;
ByteBuf byteBuf = http2DataFrame.content();
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
requestDataNode.putIfAbsent("body", objectMapper.readTree(bytes));
invoke(ctx, path, objectMapper);
invoke(ctx);
}
}

private void invoke(ChannelHandlerContext ctx, String path, ObjectMapper objectMapper) throws JsonProcessingException, IllegalAccessException, InvocationTargetException {
private void invoke(ChannelHandlerContext ctx) throws JsonProcessingException, IllegalAccessException, InvocationTargetException {
ObjectMapper objectMapper = new ObjectMapper();
HttpInvocation httpInvocation = ControllerManager.getHttpInvocation(path);
Object httpController = httpInvocation.getController();
Method handleMethod = httpInvocation.getMethod();
AtomicReference<Channel> channel = new AtomicReference<>(ctx.channel());
Object[] args = ParameterParser.getArgValues(channel, httpInvocation.getParamMetaData(), handleMethod, requestDataNode);
Object[] args = ParameterParser.getArgValues(httpInvocation.getParamMetaData(), handleMethod, requestDataNode);
Object result = handleMethod.invoke(httpController, args);
if (channel.get() == null) {
if (requestDataNode.get("channel") == null) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.Response;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.exception.AuthenticationFailedException;
import org.apache.seata.common.exception.RetryableException;
Expand All @@ -44,6 +45,7 @@
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.Http2ClientUtil;
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.ConfigChangeListener;
Expand All @@ -56,9 +58,12 @@
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
import org.apache.http.protocol.HTTP;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.seata.common.DefaultValues.DEFAULT_HTTP_VERSION;

/**
* The type File registry service.
*
Expand Down Expand Up @@ -313,24 +318,44 @@ private static boolean watch() throws RetryableException {
if (StringUtils.isNotBlank(jwtToken)) {
header.put(AUTHORIZATION_HEADER, jwtToken);
}
try (CloseableHttpResponse response =
HttpClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, header, 30000)) {
if (response != null) {
StatusLine statusLine = response.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
if (StringUtils.isNotBlank(USERNAME) && StringUtils.isNotBlank(PASSWORD)) {
throw new RetryableException("Authentication failed!");
} else {
throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password.");
}
String httpVersion = CONFIG.getConfig(org.apache.seata.common.ConfigurationKeys.TRANSPORT_PROTOCOL, DEFAULT_HTTP_VERSION);
if (httpVersion.equals(DEFAULT_HTTP_VERSION)) {
return watchWithHttp(header, param, tcAddress);
} else {
return watchWithHttp2(header, param, tcAddress);
}
}
return false;
}

private static Boolean watchWithHttp2(Map<String, String> header, Map<String, String> param, String tcAddress) throws RetryableException {
try (Response response =
Http2ClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, header, 30000)) {
int code = response.code();
return code == HttpStatus.SC_OK;
} catch (IOException e) {
LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, e.getMessage());
throw new RetryableException(e.getMessage(), e);
}
}

private static Boolean watchWithHttp(Map<String, String> header, Map<String, String> param, String tcAddress) throws RetryableException {
try (CloseableHttpResponse response =
HttpClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, header, 30000)) {
if (response != null) {
StatusLine statusLine = response.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
if (StringUtils.isNotBlank(USERNAME) && StringUtils.isNotBlank(PASSWORD)) {
throw new RetryableException("Authentication failed!");
} else {
throw new AuthenticationFailedException("Authentication failed! you should configure the correct username and password.");
}
return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK;
}
} catch (IOException e) {
LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, e.getMessage());
throw new RetryableException(e.getMessage(), e);
return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK;
}
break;
} catch (IOException e) {
LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, e.getMessage());
throw new RetryableException(e.getMessage(), e);
}
return false;
}
Expand Down Expand Up @@ -447,7 +472,7 @@ public List<InetSocketAddress> lookup(String key) throws Exception {
CURRENT_TRANSACTION_SERVICE_GROUP = key;
CURRENT_TRANSACTION_CLUSTER_NAME = clusterName;
if (!METADATA.containsGroup(clusterName)) {
String raftClusterAddress = CONFIG.getConfig(getRaftAddrFileKey());
String raftClusterAddress = CONFIG.getConfig("httpVersion");
if (StringUtils.isNotBlank(raftClusterAddress)) {
List<InetSocketAddress> list = new ArrayList<>();
String[] addresses = raftClusterAddress.split(",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import org.apache.seata.common.thread.NamedThreadFactory;
Expand Down Expand Up @@ -62,7 +65,7 @@ public void init() {
for (String group : WATCHERS.keySet()) {
Optional.ofNullable(WATCHERS.remove(group))
.ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
if (watcher.getTimeout() != -1L && System.currentTimeMillis() >= watcher.getTimeout()) {
if (System.currentTimeMillis() >= watcher.getTimeout()) {
HttpServletResponse httpServletResponse =
(HttpServletResponse)((AsyncContext)watcher.getAsyncContext()).getResponse();
watcher.setDone(true);
Expand Down Expand Up @@ -94,21 +97,15 @@ private void notify(Watcher<?> watcher) {
Channel channel = (Channel) watcher.getAsyncContext();
if (channel instanceof Http2StreamChannel) {
// http2
String group = watcher.getGroup();
// No need to remove it, just put it back.
WATCHERS.computeIfAbsent(group, value -> new ConcurrentLinkedQueue<>()).add(watcher);
channel.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer("changed\n".getBytes())));
return;
} else {
// http
channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
}

AsyncContext asyncContext = (AsyncContext)watcher.getAsyncContext();
HttpServletResponse httpServletResponse = (HttpServletResponse)asyncContext.getResponse();
watcher.setDone(true);
if (logger.isDebugEnabled()) {
logger.debug("notify cluster change event to: {}", asyncContext.getRequest().getRemoteAddr());
logger.debug("notify cluster change event to: {}", channel.remoteAddress());
}
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
asyncContext.complete();
}

public void registryWatcher(Watcher<?> watcher) {
Expand Down
Loading

0 comments on commit 8b68d8a

Please sign in to comment.