Skip to content

Commit

Permalink
ARTEMIS-5308 Replicate journal after local checks
Browse files Browse the repository at this point in the history
Say there's an issue with the record on the local journal, it would fail on the local journal before reaching the replica.
  • Loading branch information
brusdev authored and clebertsuconic committed Feb 13, 2025
1 parent 00f69a1 commit 6dd8837
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,6 @@ public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage large
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(), logger.getName());

logger.debug("Message header too large for {}", largeMessage);
new Exception("Trace").printStackTrace();

throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, maxRecordSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public void appendAddRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("Append record id = {} recordType = {}", id, recordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
localJournal.appendAddRecord(id, recordType, persister, record, sync);
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
}

/**
Expand All @@ -137,8 +137,8 @@ public void appendAddRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("Append record id = {} recordType = {}", id, recordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
}

@Override
Expand All @@ -151,8 +151,8 @@ public void appendAddEvent(long id,
if (logger.isTraceEnabled()) {
logger.trace("Append record id = {} recordType = {}", id, recordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record);
localJournal.appendAddEvent(id, recordType, persister, record, sync, completionCallback);
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.EVENT, id, recordType, persister, record);
}

/**
Expand Down Expand Up @@ -188,8 +188,8 @@ public void appendAddRecordTransactional(final long txID,
if (logger.isTraceEnabled()) {
logger.trace("Append record txID={} recordType = {}", id, recordType);
}
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record);
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
}

/**
Expand All @@ -203,17 +203,17 @@ public void appendCommitRecord(final long txID, final boolean sync) throws Excep
if (logger.isTraceEnabled()) {
logger.trace("AppendCommit txID={}", txID);
}
replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync);
replicationManager.appendCommitRecord(journalID, txID, sync, true);
}

@Override
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("AppendCommit {}", txID);
}
replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync, callback);
replicationManager.appendCommitRecord(journalID, txID, sync, true);
}

@Override
Expand All @@ -224,8 +224,8 @@ public void appendCommitRecord(long txID,
if (logger.isTraceEnabled()) {
logger.trace("AppendCommit {}", txID);
}
replicationManager.appendCommitRecord(journalID, txID, sync, lineUpContext);
localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
replicationManager.appendCommitRecord(journalID, txID, sync, lineUpContext);
}

/**
Expand All @@ -239,8 +239,8 @@ public void appendDeleteRecord(final long id, final boolean sync) throws Excepti
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
replicationManager.appendDeleteRecord(journalID, id);
localJournal.appendDeleteRecord(id, sync);
replicationManager.appendDeleteRecord(journalID, id);
}

/**
Expand All @@ -254,8 +254,8 @@ public void tryAppendDeleteRecord(final long id, final JournalUpdateCallback upd
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
replicationManager.appendDeleteRecord(journalID, id);
localJournal.tryAppendDeleteRecord(id, updateCallback, sync);
replicationManager.appendDeleteRecord(journalID, id);
}

@Override
Expand All @@ -265,8 +265,8 @@ public void appendDeleteRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
replicationManager.appendDeleteRecord(journalID, id);
localJournal.appendDeleteRecord(id, sync, completionCallback);
replicationManager.appendDeleteRecord(journalID, id);
}

@Override
Expand All @@ -277,8 +277,8 @@ public void tryAppendDeleteRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete {}", id);
}
replicationManager.appendDeleteRecord(journalID, id);
localJournal.tryAppendDeleteRecord(id, sync, updateCallback, completionCallback);
replicationManager.appendDeleteRecord(journalID, id);
}
/**
* @param txID
Expand Down Expand Up @@ -306,8 +306,8 @@ public void appendDeleteRecordTransactional(final long txID,
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete txID={} id={}", txID, id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
localJournal.appendDeleteRecordTransactional(txID, id, record);
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
}

/**
Expand All @@ -321,8 +321,8 @@ public void appendDeleteRecordTransactional(final long txID, final long id) thro
if (logger.isTraceEnabled()) {
logger.trace("AppendDelete (noencoding) txID={} id={}", txID, id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
localJournal.appendDeleteRecordTransactional(txID, id);
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
}

/**
Expand Down Expand Up @@ -351,8 +351,8 @@ public void appendPrepareRecord(final long txID,
if (logger.isTraceEnabled()) {
logger.trace("AppendPrepare txID={}", txID);
}
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
localJournal.appendPrepareRecord(txID, transactionData, sync);
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
}

@Override
Expand All @@ -363,8 +363,8 @@ public void appendPrepareRecord(final long txID,
if (logger.isTraceEnabled()) {
logger.trace("AppendPrepare txID={}", txID);
}
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
localJournal.appendPrepareRecord(txID, transactionData, sync, callback);
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
}

/**
Expand All @@ -378,17 +378,17 @@ public void appendRollbackRecord(final long txID, final boolean sync) throws Exc
if (logger.isTraceEnabled()) {
logger.trace("AppendRollback {}", txID);
}
replicationManager.appendRollbackRecord(journalID, txID);
localJournal.appendRollbackRecord(txID, sync);
replicationManager.appendRollbackRecord(journalID, txID);
}

@Override
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("AppendRollback {}", txID);
}
replicationManager.appendRollbackRecord(journalID, txID);
localJournal.appendRollbackRecord(txID, sync, callback);
replicationManager.appendRollbackRecord(journalID, txID);
}

/**
Expand Down Expand Up @@ -435,8 +435,8 @@ public void appendUpdateRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, recordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
}

@Override
Expand All @@ -449,8 +449,8 @@ public void tryAppendUpdateRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, recordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync, replaceable);
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
}

@Override
Expand All @@ -463,8 +463,8 @@ public void appendUpdateRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, journalRecordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
}

@Override
Expand All @@ -479,8 +479,8 @@ public void tryAppendUpdateRecord(final long id,
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord id = {} , recordType = {}", id, journalRecordType);
}
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, replaceableUpdate, updateCallback, completionCallback);
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
}

/**
Expand Down Expand Up @@ -516,8 +516,8 @@ public void appendUpdateRecordTransactional(final long txID,
if (logger.isTraceEnabled()) {
logger.trace("AppendUpdateRecord txid={} id = {} , recordType = {}", txID, id, recordType);
}
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
localJournal.appendUpdateRecordTransactional(txID, id, recordType, persister, record);
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.replication;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.core.journal.Journal;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ReplicatedJournalTest {

private static final int NO_INVOCATION = 0;
private static final int JOURNAL_INVOCATION = 1;
private static final int REPLICA_INVOCATION = 2;

@Test
public void testAppendInvocationOrder() throws Exception {
AtomicInteger firstInvocation = new AtomicInteger(NO_INVOCATION);

Journal mockJournal = Mockito.mock(Journal.class, invocationOnMock -> {
if (invocationOnMock.getMethod().getName().startsWith("append") ||
invocationOnMock.getMethod().getName().startsWith("tryAppend")) {
firstInvocation.compareAndSet(NO_INVOCATION, JOURNAL_INVOCATION);
}
return null;
});

ReplicationManager replicationManager = Mockito.mock(ReplicationManager.class, invocationOnMock -> {
if (invocationOnMock.getMethod().getName().startsWith("append") ||
invocationOnMock.getMethod().getName().startsWith("tryAppend")) {
firstInvocation.compareAndSet(NO_INVOCATION, REPLICA_INVOCATION);
}
return null;
});

ReplicatedJournal replicatedJournal = new ReplicatedJournal((byte)0, mockJournal, replicationManager);

for (Method method : ReplicatedJournal.class.getDeclaredMethods()) {
if (method.getName().startsWith("append") ||
method.getName().startsWith("tryAppend")) {
List<Object> args = new ArrayList<>();
for (Class parameterType : method.getParameterTypes()) {
if (boolean.class.equals(parameterType)) {
args.add(false);
} else if (byte.class.equals(parameterType)) {
args.add((byte)0);
} else if (long.class.equals(parameterType)) {
args.add((long)0);
} else {
args.add(null);
}
}

method.invoke(replicatedJournal, args.toArray());

assertEquals(JOURNAL_INVOCATION, firstInvocation.get(), method.toString());

firstInvocation.set(NO_INVOCATION);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -363,6 +364,62 @@ public void testSendPacketsWithFailure() throws Exception {
}
}

@TestTemplate
public void testSendMessageWithLargeHeader() throws Exception {
setupServer(true, TestInterceptor.class.getName());

manager = liveServer.getReplicationManager();
waitForComponent(manager);

ClientSessionFactory sf = createSessionFactory(locator);

try (ClientSession producerSession = sf.createSession();
ClientSession consumerSession = sf.createSession()) {

producerSession.start();
producerSession.createQueue(QueueConfiguration.of(ADDRESS));
ClientProducer producer = producerSession.createProducer(ADDRESS);

consumerSession.start();
ClientConsumer consumer = consumerSession.createConsumer(ADDRESS);

ClientMessage messageBefore = producerSession.createMessage(true);
setBody(0, messageBefore);
messageBefore.putIntProperty("counter", 0);
producer.send(messageBefore);

ClientMessage messageReceivedBefore = consumer.receive(1000);
assertNotNull(messageReceivedBefore, "Message should exist!");
assertMessageBody(0, messageReceivedBefore);
assertEquals(0, messageReceivedBefore.getIntProperty("counter").intValue());
messageReceivedBefore.acknowledge();

ClientMessage messageWithLargeHeader = producerSession.createMessage(true);
setBody(1, messageWithLargeHeader);
messageWithLargeHeader.putIntProperty("counter", 1);
messageWithLargeHeader.putStringProperty("large-property", "z".repeat(512 * 1024));
try {
producer.send(messageWithLargeHeader);
fail();
} catch (Exception e) {
assertTrue(e.getMessage().contains("AMQ149005"));
}

ClientMessage messageAfter = producerSession.createMessage(true);
setBody(2, messageAfter);
messageAfter.putIntProperty("counter", 2);
producer.send(messageAfter);

ClientMessage messageReceivedAfter = consumer.receive(1000);
assertNotNull(messageReceivedAfter, "Message should exist!");
assertMessageBody(2, messageReceivedAfter);
assertEquals(2, messageReceivedAfter.getIntProperty("counter").intValue());
messageReceivedAfter.acknowledge();

assertNull(consumer.receiveImmediate());
}
}

@TestTemplate
public void testExceptionSettingActionBefore() throws Exception {
OperationContext ctx = OperationContextImpl.getContext(factory);
Expand Down

0 comments on commit 6dd8837

Please sign in to comment.