diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/exception/ApisixException.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/exception/ApisixException.java new file mode 100644 index 00000000..4f7e1b41 --- /dev/null +++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/exception/ApisixException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.apisix.plugin.runner.exception; + +public class ApisixException extends RuntimeException { + + private int code; + + public ApisixException(int code, String msg) { + super(msg); + this.code = code; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } +} diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/exception/ExceptionCaught.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/exception/ExceptionCaught.java new file mode 100644 index 00000000..c208a683 --- /dev/null +++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/exception/ExceptionCaught.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.apisix.plugin.runner.exception; + +import io.netty.channel.ChannelHandlerContext; + +public interface ExceptionCaught { + + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; + +} diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ExceptionCaughtHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ExceptionCaughtHandler.java index fb6bfe52..acd7bdb7 100644 --- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ExceptionCaughtHandler.java +++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/ExceptionCaughtHandler.java @@ -21,15 +21,33 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.apisix.plugin.runner.A6ErrResponse; +import org.apache.apisix.plugin.runner.exception.ExceptionCaught; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(ExceptionCaughtHandler.class); + List<ExceptionCaught> exceptionCaughtList = new ArrayList<ExceptionCaught>(); + + public ExceptionCaughtHandler() { + + } + + public ExceptionCaughtHandler(List<ExceptionCaught> exceptionCaughtList) { + this.exceptionCaughtList = exceptionCaughtList; + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("handle request error: ", cause); + if (!exceptionCaughtList.isEmpty()) { + exceptionCaughtList.get(0).exceptionCaught(ctx, cause); + return; + } A6ErrResponse errResponse = new A6ErrResponse(Code.SERVICE_UNAVAILABLE); ctx.writeAndFlush(errResponse); } diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java index f054ccfe..45eb4a4d 100644 --- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java +++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/handler/RpcCallHandler.java @@ -17,24 +17,15 @@ package org.apache.apisix.plugin.runner.handler; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.Set; - import com.google.common.cache.Cache; import io.github.api7.A6.Err.Code; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import lombok.RequiredArgsConstructor; import org.apache.apisix.plugin.runner.A6Conf; import org.apache.apisix.plugin.runner.A6ErrRequest; -import org.apache.apisix.plugin.runner.A6ErrResponse; import org.apache.apisix.plugin.runner.A6Request; import org.apache.apisix.plugin.runner.ExtraInfoRequest; import org.apache.apisix.plugin.runner.ExtraInfoResponse; @@ -42,14 +33,21 @@ import org.apache.apisix.plugin.runner.HttpResponse; import org.apache.apisix.plugin.runner.PostRequest; import org.apache.apisix.plugin.runner.PostResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.CollectionUtils; -import lombok.RequiredArgsConstructor; - +import org.apache.apisix.plugin.runner.exception.ApisixException; import org.apache.apisix.plugin.runner.constants.Constants; import org.apache.apisix.plugin.runner.filter.PluginFilter; import org.apache.apisix.plugin.runner.filter.PluginFilterChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; @RequiredArgsConstructor public class RpcCallHandler extends SimpleChannelInboundHandler<A6Request> { @@ -311,8 +309,7 @@ private void preReadReq() { } private void errorHandle(ChannelHandlerContext ctx, int code) { - A6ErrResponse errResponse = new A6ErrResponse(code); - ctx.writeAndFlush(errResponse); + throw new ApisixException(code, Code.name(code)); } private void cleanCtx() { diff --git a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java index 865983a4..041b05cb 100644 --- a/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java +++ b/runner-core/src/main/java/org/apache/apisix/plugin/runner/server/ApplicationRunner.java @@ -17,21 +17,6 @@ package org.apache.apisix.plugin.runner.server; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.CommandLineRunner; -import org.springframework.stereotype.Component; import com.google.common.cache.Cache; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; @@ -47,9 +32,9 @@ import io.netty.channel.unix.DomainSocketChannel; import io.netty.handler.logging.LoggingHandler; import lombok.RequiredArgsConstructor; - import org.apache.apisix.plugin.runner.A6Conf; import org.apache.apisix.plugin.runner.A6ConfigWatcher; +import org.apache.apisix.plugin.runner.exception.ExceptionCaught; import org.apache.apisix.plugin.runner.filter.PluginFilter; import org.apache.apisix.plugin.runner.handler.PrepareConfHandler; import org.apache.apisix.plugin.runner.handler.RpcCallHandler; @@ -57,6 +42,23 @@ import org.apache.apisix.plugin.runner.handler.BinaryProtocolDecoder; import org.apache.apisix.plugin.runner.handler.PayloadEncoder; import org.apache.apisix.plugin.runner.handler.ExceptionCaughtHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.ServiceLoader; +import java.util.stream.Collectors; @Component @RequiredArgsConstructor @@ -72,9 +74,16 @@ public class ApplicationRunner implements CommandLineRunner { private ObjectProvider<PluginFilter> filterProvider; private ObjectProvider<A6ConfigWatcher> watcherProvider; + public static final List<ExceptionCaught> EXCEPTION_LIST = new ArrayList<ExceptionCaught>(); + + static { + ServiceLoader<ExceptionCaught> serviceLoader = ServiceLoader.load(ExceptionCaught.class); + serviceLoader.forEach(EXCEPTION_LIST::add); + } + @Autowired public ApplicationRunner(Cache<Long, A6Conf> cache, - ObjectProvider<PluginFilter> filterProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) { + ObjectProvider<PluginFilter> filterProvider, ObjectProvider<A6ConfigWatcher> watcherProvider) { this.cache = cache; this.filterProvider = filterProvider; this.watcherProvider = watcherProvider; @@ -133,7 +142,7 @@ protected void initChannel(DomainSocketChannel channel) { .addLast("payloadDecoder", new PayloadDecoder()) .addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, filterProvider, watcherProvider)) .addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache)) - .addLast("exceptionCaughtHandler", new ExceptionCaughtHandler()); + .addLast("exceptionCaughtHandler", new ExceptionCaughtHandler(EXCEPTION_LIST)); } }); diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java index d7360cb1..4d171101 100644 --- a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java +++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/A6HttpCallHandlerTest.java @@ -29,11 +29,12 @@ import org.apache.apisix.plugin.runner.A6ConfigRequest; import org.apache.apisix.plugin.runner.A6ConfigResponse; import org.apache.apisix.plugin.runner.A6ConfigWatcher; -import org.apache.apisix.plugin.runner.A6ErrResponse; import org.apache.apisix.plugin.runner.HttpRequest; import org.apache.apisix.plugin.runner.HttpResponse; import org.apache.apisix.plugin.runner.filter.PluginFilter; import org.apache.apisix.plugin.runner.filter.PluginFilterChain; +import org.apache.apisix.plugin.runner.server.ApplicationRunner; +import org.apache.apisix.plugin.runner.PostResponse; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -173,7 +174,7 @@ public Boolean requiredBody() { A6ConfigRequest request = new A6ConfigRequest(req); prepareConfHandler = new PrepareConfHandler(cache, filters, watchers); - channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler); + channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, new ExceptionCaughtHandler(ApplicationRunner.EXCEPTION_LIST)); channel.writeInbound(request); channel.finish(); A6ConfigResponse response = channel.readOutbound(); @@ -181,7 +182,7 @@ public Boolean requiredBody() { prepareConfHandler = new PrepareConfHandler(cache, filters, watchers); rpcCallHandler = new RpcCallHandler(cache); - channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler); + channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler, new ExceptionCaughtHandler(ApplicationRunner.EXCEPTION_LIST)); } @AfterEach @@ -202,9 +203,9 @@ void testCannotFindConfToken() { HttpRequest request = new HttpRequest(req); channel.writeInbound(request); channel.finish(); - A6ErrResponse response = channel.readOutbound(); + PostResponse response = channel.readOutbound(); io.github.api7.A6.Err.Resp err = io.github.api7.A6.Err.Resp.getRootAsResp(response.encode()); - Assertions.assertEquals(err.code(), Code.CONF_TOKEN_NOT_FOUND); + Assertions.assertEquals(err.code(), Code.SERVICE_UNAVAILABLE); } @Test diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/GlobalException.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/GlobalException.java new file mode 100644 index 00000000..1e38afe3 --- /dev/null +++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/GlobalException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.apisix.plugin.runner.handler; + +import io.github.api7.A6.Err.Code; +import io.netty.channel.ChannelHandlerContext; +import org.apache.apisix.plugin.runner.PostResponse; +import org.apache.apisix.plugin.runner.exception.ExceptionCaught; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GlobalException implements ExceptionCaught { + + private final Logger logger = LoggerFactory.getLogger(GlobalException.class); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // expand exception + logger.warn("plugin expend"); + PostResponse errResponse = new PostResponse(Code.SERVICE_UNAVAILABLE); + errResponse.setStatusCode(10001); + errResponse.setBody("test exception"); + ctx.writeAndFlush(errResponse); + } +} diff --git a/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestGlobalExceptionExt.java b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestGlobalExceptionExt.java new file mode 100644 index 00000000..8ea9ec29 --- /dev/null +++ b/runner-core/src/test/java/org/apache/apisix/plugin/runner/handler/TestGlobalExceptionExt.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.apisix.plugin.runner.handler; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.flatbuffers.FlatBufferBuilder; +import io.github.api7.A6.Err.Code; +import io.github.api7.A6.TextEntry; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.apisix.plugin.runner.PostResponse; +import org.apache.apisix.plugin.runner.A6Conf; +import org.apache.apisix.plugin.runner.A6ConfigWatcher; +import org.apache.apisix.plugin.runner.A6ConfigRequest; +import org.apache.apisix.plugin.runner.PostRequest; +import org.apache.apisix.plugin.runner.A6ConfigResponse; +import org.apache.apisix.plugin.runner.exception.ApisixException; +import org.apache.apisix.plugin.runner.filter.PluginFilter; +import org.apache.apisix.plugin.runner.filter.PluginFilterChain; +import org.apache.apisix.plugin.runner.server.ApplicationRunner; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@DisplayName("test global exception passed SPI ") +public class TestGlobalExceptionExt { + + private PrintStream console = null; + private ByteArrayOutputStream bytes = null; + + RpcCallHandler rpcCallHandler; + + Cache<Long, A6Conf> cache; + + Map<String, PluginFilter> filters; + + List<A6ConfigWatcher> watchers; + + EmbeddedChannel channel; + + PrepareConfHandler prepareConfHandler; + + long confToken; + + @BeforeEach + void setUp() { + bytes = new ByteArrayOutputStream(); + console = System.out; + System.setOut(new PrintStream(bytes)); + + filters = new HashMap<>(); + filters.put("UpstreamFilter", new PluginFilter() { + @Override + public String name() { + return "UpstreamFilter"; + } + + @Override + public void postFilter(PostRequest request, PostResponse response, PluginFilterChain chain) { + throw new ApisixException(10001, "exception"); + } + } + ); + watchers = new ArrayList<>(); + cache = CacheBuilder.newBuilder().expireAfterWrite(3600, TimeUnit.SECONDS).maximumSize(1000).build(); + FlatBufferBuilder builder = new FlatBufferBuilder(); + + int foo = builder.createString("UpstreamFilter"); + int bar = builder.createString("{\"conf_key1\":\"conf_value1\",\"conf_key2\":2}"); + int filter = TextEntry.createTextEntry(builder, foo, bar); + + int confVector = io.github.api7.A6.PrepareConf.Req.createConfVector(builder, new int[]{filter}); + io.github.api7.A6.PrepareConf.Req.startReq(builder); + io.github.api7.A6.PrepareConf.Req.addConf(builder, confVector); + builder.finish(io.github.api7.A6.PrepareConf.Req.endReq(builder)); + io.github.api7.A6.PrepareConf.Req req = io.github.api7.A6.PrepareConf.Req.getRootAsReq(builder.dataBuffer()); + A6ConfigRequest request = new A6ConfigRequest(req); + prepareConfHandler = new PrepareConfHandler(cache, filters, watchers); + channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, new ExceptionCaughtHandler(ApplicationRunner.EXCEPTION_LIST)); + channel.writeInbound(request); + channel.finish(); + A6ConfigResponse response = channel.readOutbound(); + confToken = response.getConfToken(); + + prepareConfHandler = new PrepareConfHandler(cache, filters, watchers); + rpcCallHandler = new RpcCallHandler(cache); + channel = new EmbeddedChannel(new BinaryProtocolDecoder(), prepareConfHandler, rpcCallHandler, new ExceptionCaughtHandler(ApplicationRunner.EXCEPTION_LIST)); + } + + @AfterEach + void setDown() { + System.setOut(console); + } + + @Test + @DisplayName("test spiExceptionExt") + void doPostFilter() { + FlatBufferBuilder builder = new FlatBufferBuilder(); + + int headerKey = builder.createString("headerKey"); + int headerValue = builder.createString("headerValue"); + + int header = TextEntry.createTextEntry(builder, headerKey, headerValue); + int headerVector = + io.github.api7.A6.HTTPRespCall.Req.createHeadersVector(builder, new int[]{header}); + io.github.api7.A6.HTTPRespCall.Req.startReq(builder); + io.github.api7.A6.HTTPRespCall.Req.addId(builder, 8888L); + io.github.api7.A6.HTTPRespCall.Req.addConfToken(builder, confToken); + io.github.api7.A6.HTTPRespCall.Req.addStatus(builder, 418); + io.github.api7.A6.HTTPRespCall.Req.addHeaders(builder, headerVector); + builder.finish(io.github.api7.A6.HTTPRespCall.Req.endReq(builder)); + io.github.api7.A6.HTTPRespCall.Req req = io.github.api7.A6.HTTPRespCall.Req.getRootAsReq(builder.dataBuffer()); + PostRequest request = new PostRequest(req); + channel.writeInbound(request); + channel.finish(); + PostResponse response = channel.readOutbound(); + io.github.api7.A6.Err.Resp err = io.github.api7.A6.Err.Resp.getRootAsResp(response.encode()); + Assertions.assertEquals(err.code(), Code.SERVICE_UNAVAILABLE); + } +} diff --git a/runner-core/src/test/resources/META-INF/services/org.apache.apisix.plugin.runner.exception.ExceptionCaught b/runner-core/src/test/resources/META-INF/services/org.apache.apisix.plugin.runner.exception.ExceptionCaught new file mode 100644 index 00000000..c8d2a470 --- /dev/null +++ b/runner-core/src/test/resources/META-INF/services/org.apache.apisix.plugin.runner.exception.ExceptionCaught @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.apisix.plugin.runner.handler.GlobalException