diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java index d59d0268..c6a3fffc 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext; import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest; import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; import io.openmessaging.storage.dledger.protocol.GetEntriesRequest; @@ -37,6 +38,7 @@ import io.openmessaging.storage.dledger.protocol.RequestOrResponse; import io.openmessaging.storage.dledger.protocol.VoteRequest; import io.openmessaging.storage.dledger.protocol.VoteResponse; +import io.openmessaging.storage.dledger.remoting.header.AppendHeader; import io.openmessaging.storage.dledger.utils.DLedgerUtils; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) { this(dLedgerServer, null, null, null); } - public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) { + public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, + NettyClientConfig nettyClientConfig) { this(dLedgerServer, nettyServerConfig, nettyClientConfig, null); } - public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { + public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, + NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { this.dLedgerServer = dLedgerServer; this.memberState = dLedgerServer.getMemberState(); NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() { @@ -254,7 +258,7 @@ public CompletableFuture push(PushEntryRequest request) throw @Override public CompletableFuture leadershipTransfer( - LeadershipTransferRequest request) throws Exception { + LeadershipTransferRequest request) throws Exception { CompletableFuture future = new CompletableFuture<>(); try { RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null); @@ -283,7 +287,7 @@ public CompletableFuture leadershipTransfer( } private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request, - ChannelHandlerContext ctx) { + ChannelHandlerContext ctx) { RemotingCommand response = null; try { if (t != null) { @@ -325,7 +329,10 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand break; } case APPEND: { - AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class); + AppendHeader appendHeader = (AppendHeader) request.decodeCommandCustomHeader(AppendHeader.class); + AppendEntryRequest appendEntryRequest = ((appendHeader == null) | !appendHeader.isBatch()) ? + JSON.parseObject(request.getBody(), AppendEntryRequest.class) : + JSON.parseObject(request.getBody(), BatchAppendEntryRequest.class); CompletableFuture future = handleAppend(appendEntryRequest); future.whenCompleteAsync((x, y) -> { writeResponse(x, y, request, ctx); @@ -379,7 +386,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand future.whenCompleteAsync((x, y) -> { writeResponse(x, y, request, ctx); logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms", - request, x, DLedgerUtils.elapsed(start)); + request, x, DLedgerUtils.elapsed(start)); }, futureExecutor); break; } @@ -392,7 +399,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand @Override public CompletableFuture handleLeadershipTransfer( - LeadershipTransferRequest leadershipTransferRequest) throws Exception { + LeadershipTransferRequest leadershipTransferRequest) throws Exception { return dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest); } diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index 723e1c9a..c81500e6 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -211,7 +211,7 @@ public CompletableFuture handleAppend(AppendEntryRequest re // record positions to return; long[] positions = new long[batchRequest.getBatchMsgs().size()]; DLedgerEntry resEntry = null; - // split bodys to append + // split bodies to append int index = 0; Iterator iterator = batchRequest.getBatchMsgs().iterator(); while (iterator.hasNext()) { @@ -227,7 +227,7 @@ public CompletableFuture handleAppend(AppendEntryRequest re return batchAppendFuture; } throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" + - " with empty bodys"); + " with empty bodies"); } else { DLedgerEntry dLedgerEntry = new DLedgerEntry(); dLedgerEntry.setBody(request.getBody()); diff --git a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java index 4d45171c..9c4e3dd4 100644 --- a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java +++ b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java @@ -19,6 +19,7 @@ import io.openmessaging.storage.dledger.ShutdownAbleThread; import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; import io.openmessaging.storage.dledger.protocol.GetEntriesRequest; import io.openmessaging.storage.dledger.protocol.GetEntriesResponse; @@ -28,6 +29,8 @@ import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; import io.openmessaging.storage.dledger.utils.DLedgerUtils; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -55,6 +58,18 @@ public DLedgerClient(String group, String peers) { } public AppendEntryResponse append(byte[] body) { + return batchAppend(Collections.singletonList(body)); + } + + public AppendEntryResponse batchAppend(List bodies) { + + if (null == bodies || bodies.size() == 0) { + logger.warn("Batch append data is empty"); + AppendEntryResponse appendEntryResponse = new AppendEntryResponse(); + appendEntryResponse.setCode(DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()); + return appendEntryResponse; + } + try { waitOnUpdatingMetadata(1500, false); if (leaderId == null) { @@ -62,10 +77,16 @@ public AppendEntryResponse append(byte[] body) { appendEntryResponse.setCode(DLedgerResponseCode.METADATA_ERROR.getCode()); return appendEntryResponse; } - AppendEntryRequest appendEntryRequest = new AppendEntryRequest(); + AppendEntryRequest appendEntryRequest; + if (bodies.size() == 1) { + appendEntryRequest = new AppendEntryRequest(); + appendEntryRequest.setBody(bodies.get(0)); + } else { + appendEntryRequest = new BatchAppendEntryRequest(); + ((BatchAppendEntryRequest) appendEntryRequest).setBatchMsgs(bodies); + } appendEntryRequest.setGroup(group); appendEntryRequest.setRemoteId(leaderId); - appendEntryRequest.setBody(body); AppendEntryResponse response = dLedgerClientRpcService.append(appendEntryRequest).get(); if (response.getCode() == DLedgerResponseCode.NOT_LEADER.getCode()) { waitOnUpdatingMetadata(1500, true); @@ -82,6 +103,7 @@ public AppendEntryResponse append(byte[] body) { appendEntryResponse.setCode(DLedgerResponseCode.INTERNAL_ERROR.getCode()); return appendEntryResponse; } + } public GetEntriesResponse get(long index) { diff --git a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClientRpcNettyService.java b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClientRpcNettyService.java index 465b562c..e7809300 100644 --- a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClientRpcNettyService.java +++ b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClientRpcNettyService.java @@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON; import io.openmessaging.storage.dledger.protocol.AppendEntryRequest; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest; import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode; import io.openmessaging.storage.dledger.protocol.GetEntriesRequest; import io.openmessaging.storage.dledger.protocol.GetEntriesResponse; @@ -26,6 +27,7 @@ import io.openmessaging.storage.dledger.protocol.MetadataResponse; import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse; import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; +import io.openmessaging.storage.dledger.remoting.header.AppendHeader; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; @@ -41,7 +43,13 @@ public DLedgerClientRpcNettyService() { @Override public CompletableFuture append(AppendEntryRequest request) throws Exception { - RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.APPEND.getCode(), null); + + AppendHeader appendHeader = null; + if (request instanceof BatchAppendEntryRequest) { + appendHeader = new AppendHeader(); + appendHeader.setBatch(true); + } + RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.APPEND.getCode(), appendHeader); wrapperRequest.setBody(JSON.toJSONBytes(request)); RemotingCommand wrapperResponse = this.remotingClient.invokeSync(getPeerAddr(request.getRemoteId()), wrapperRequest, 3000); AppendEntryResponse response = JSON.parseObject(wrapperResponse.getBody(), AppendEntryResponse.class); @@ -58,7 +66,8 @@ public CompletableFuture metadata(MetadataRequest request) thr } @Override - public CompletableFuture leadershipTransfer(LeadershipTransferRequest request) throws Exception { + public CompletableFuture leadershipTransfer( + LeadershipTransferRequest request) throws Exception { RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null); wrapperRequest.setBody(JSON.toJSONBytes(request)); RemotingCommand wrapperResponse = this.remotingClient.invokeSync(getPeerAddr(request.getRemoteId()), wrapperRequest, 10000); diff --git a/src/main/java/io/openmessaging/storage/dledger/cmdline/AppendCommand.java b/src/main/java/io/openmessaging/storage/dledger/cmdline/AppendCommand.java index 5ef36d8a..526b11ba 100644 --- a/src/main/java/io/openmessaging/storage/dledger/cmdline/AppendCommand.java +++ b/src/main/java/io/openmessaging/storage/dledger/cmdline/AppendCommand.java @@ -18,11 +18,16 @@ import com.alibaba.fastjson.JSON; import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; import io.openmessaging.storage.dledger.client.DLedgerClient; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Parameters(commandDescription = "append data to DLedger server") public class AppendCommand extends BaseCommand { private static Logger logger = LoggerFactory.getLogger(AppendCommand.class); @@ -34,18 +39,32 @@ public class AppendCommand extends BaseCommand { private String peers = "n0-localhost:20911"; @Parameter(names = {"--data", "-d"}, description = "the data to append") - private String data = "Hello"; + private List data = new ArrayList<>(); @Parameter(names = {"--count", "-c"}, description = "append several times") private int count = 1; @Override public void doCommand() { + + if (null == data || data.isEmpty()) { + logger.warn("Not data append to dledger server"); + return; + } DLedgerClient dLedgerClient = new DLedgerClient(group, peers); dLedgerClient.startup(); - for (int i = 0; i < count; i++) { - AppendEntryResponse response = dLedgerClient.append(data.getBytes()); - logger.info("Append Result:{}", JSON.toJSONString(response)); + if (data.size() == 1) { + byte[] dataBytes = data.get(0).getBytes(); + for (int i = 0; i < count; i++) { + AppendEntryResponse response = dLedgerClient.append(dataBytes); + logger.info("Append Result:{}", JSON.toJSONString(response)); + } + } else { + List dataList = data.stream().map(String::getBytes).collect(Collectors.toList()); + for (int i = 0; i < count; i++) { + AppendEntryResponse response = dLedgerClient.batchAppend(dataList); + logger.info("Append Result:{}", JSON.toJSONString(response)); + } } dLedgerClient.shutdown(); } diff --git a/src/main/java/io/openmessaging/storage/dledger/remoting/header/AppendHeader.java b/src/main/java/io/openmessaging/storage/dledger/remoting/header/AppendHeader.java new file mode 100644 index 00000000..784ae36f --- /dev/null +++ b/src/main/java/io/openmessaging/storage/dledger/remoting/header/AppendHeader.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017-2022 The DLedger Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.remoting.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AppendHeader implements CommandCustomHeader { + + private boolean batch = false; + + public boolean isBatch() { + return batch; + } + + public void setBatch(boolean batch) { + this.batch = batch; + } + + @Override + public void checkFields() throws RemotingCommandException { + //nothing to do + } +} diff --git a/src/test/java/io/openmessaging/storage/dledger/client/DLedgerClientTest.java b/src/test/java/io/openmessaging/storage/dledger/client/DLedgerClientTest.java new file mode 100644 index 00000000..509f925b --- /dev/null +++ b/src/test/java/io/openmessaging/storage/dledger/client/DLedgerClientTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2017-2022 The DLedger Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.client; + +import io.openmessaging.storage.dledger.DLedgerConfig; +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.ServerTestHarness; +import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; +import io.openmessaging.storage.dledger.protocol.GetEntriesResponse; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DLedgerClientTest extends ServerTestHarness { + + @Test + void testAppendSingleServer() { + + String group = UUID.randomUUID().toString(); + String selfId = "n0"; + String peers = "n0-localhost:10001"; + launchServer(group, peers, selfId, selfId, DLedgerConfig.FILE); + DLedgerClient dLedgerClient = launchClient(group, peers); + AppendEntryResponse append = dLedgerClient.append(group.getBytes(StandardCharsets.UTF_8)); + GetEntriesResponse response = dLedgerClient.get(append.getIndex()); + Assertions.assertEquals(group, new String(response.getEntries().get(0).getBody())); + } + + @Test + void testAppendThreeServer() throws InterruptedException { + + String group = UUID.randomUUID().toString(); + String peers = "n0-localhost:10006;n1-localhost:10007;n2-localhost:10008"; + DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.FILE); + DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.FILE); + DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.FILE); + DLedgerClient dLedgerClient = launchClient(group, peers); + + for (int i = 0; i < 10; i++) { + AppendEntryResponse appendEntryResponse = dLedgerClient.append((group + i).getBytes(StandardCharsets.UTF_8)); + Assertions.assertEquals(appendEntryResponse.getCode(), DLedgerResponseCode.SUCCESS.getCode()); + Assertions.assertEquals(i, appendEntryResponse.getIndex()); + } + + Thread.sleep(100); + Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); + Assertions.assertEquals(9, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); + Assertions.assertEquals(9, dLedgerServer2.getDLedgerStore().getLedgerEndIndex()); + + for (int i = 0; i < 10; i++) { + GetEntriesResponse getEntriesResponse = dLedgerClient.get(i); + Assertions.assertEquals(1, getEntriesResponse.getEntries().size()); + Assertions.assertEquals(i, getEntriesResponse.getEntries().get(0).getIndex()); + Assertions.assertArrayEquals((group + i).getBytes(), getEntriesResponse.getEntries().get(0).getBody()); + } + } + + @Test + void testBatchAppendSingleServer() { + String group = UUID.randomUUID().toString(); + String selfId = "n0"; + String peers = "n0-localhost:10001"; + launchServer(group, peers, selfId, selfId, DLedgerConfig.FILE); + DLedgerClient dLedgerClient = launchClient(group, peers); + List bodies = new ArrayList<>(); + bodies.add("1".getBytes(StandardCharsets.UTF_8)); + bodies.add("2".getBytes(StandardCharsets.UTF_8)); + bodies.add("3".getBytes(StandardCharsets.UTF_8)); + dLedgerClient.batchAppend(bodies); + GetEntriesResponse response0 = dLedgerClient.get(0); + GetEntriesResponse response1 = dLedgerClient.get(1); + GetEntriesResponse response2 = dLedgerClient.get(2); + Assertions.assertEquals("1", new String(response0.getEntries().get(0).getBody())); + Assertions.assertEquals("2", new String(response1.getEntries().get(0).getBody())); + Assertions.assertEquals("3", new String(response2.getEntries().get(0).getBody())); + } + + @Test + void testBatchAppendThreeServer() throws InterruptedException { + + String group = UUID.randomUUID().toString(); + String peers = "n0-localhost:10006;n1-localhost:10007;n2-localhost:10008"; + DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.FILE); + DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.FILE); + DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.FILE); + DLedgerClient dLedgerClient = launchClient(group, peers); + List bodies = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + bodies.add((group + i).getBytes(StandardCharsets.UTF_8)); + } + AppendEntryResponse appendEntryResponse = dLedgerClient.batchAppend(bodies); + Assertions.assertEquals(appendEntryResponse.getCode(), DLedgerResponseCode.SUCCESS.getCode()); + Assertions.assertEquals(9L, appendEntryResponse.getIndex()); + Thread.sleep(100); + Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); + Assertions.assertEquals(9, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); + Assertions.assertEquals(9, dLedgerServer2.getDLedgerStore().getLedgerEndIndex()); + + for (int i = 0; i < 10; i++) { + GetEntriesResponse getEntriesResponse = dLedgerClient.get(i); + Assertions.assertEquals(1, getEntriesResponse.getEntries().size()); + Assertions.assertEquals(i, getEntriesResponse.getEntries().get(0).getIndex()); + Assertions.assertArrayEquals((group + i).getBytes(), getEntriesResponse.getEntries().get(0).getBody()); + } + } +} \ No newline at end of file