diff --git a/pom.xml b/pom.xml index 25baeb2..ef83f98 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.stimulussoft filequeue file queue project - 1.3.0 + 1.4.0 jar Light weight, high performance, simple, reliable and persistent queue https://github.com/stimulussoft/filequeue @@ -232,6 +232,11 @@ jackson-databind ${jackson} + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson} + com.google.guava guava diff --git a/src/main/java/com/stimulussoft/filequeue/FileQueue.java b/src/main/java/com/stimulussoft/filequeue/FileQueue.java index 1530faf..c225b7d 100755 --- a/src/main/java/com/stimulussoft/filequeue/FileQueue.java +++ b/src/main/java/com/stimulussoft/filequeue/FileQueue.java @@ -14,6 +14,7 @@ package com.stimulussoft.filequeue; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.stimulussoft.filequeue.processor.Consumer; @@ -370,6 +371,19 @@ public int getMaxQueueSize() { return builder.getMaxQueueSize(); } + /** + * Set a custom {@link ObjectMapper} to use for serializing and deserializing {@link FileQueueItem} + * @param objectMapper ObjectMapper instance + * @return config configuration + */ + public Config objectMapper(ObjectMapper objectMapper) { + builder.objectMapper(objectMapper); + return this; + } + + public ObjectMapper getObjectMapper() { + return builder.getObjectMapper(); + } } /** diff --git a/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessor.java b/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessor.java index 9642d8e..0c51257 100644 --- a/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessor.java +++ b/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessor.java @@ -91,7 +91,7 @@ public enum RetryDelayAlgorithm {FIXED, EXPONENTIAL} if (builder.queuePath == null) throw new IllegalArgumentException("queue path must be specified"); if (builder.type == null) throw new IllegalArgumentException("item type must be specified"); if (builder.consumer == null) throw new IllegalArgumentException("consumer must be specified"); - objectMapper = createObjectMapper(); + objectMapper = Optional.ofNullable(builder.objectMapper).orElse(createObjectMapper()); if (!objectMapper.canSerialize(objectMapper.constructType(builder.type).getClass())) throw new IllegalArgumentException("The given type is not serializable. it cannot be serialized by jackson"); this.queueName = builder.queueName; diff --git a/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessorBuilder.java b/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessorBuilder.java index 411dbbc..b0fff0f 100644 --- a/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessorBuilder.java +++ b/src/main/java/com/stimulussoft/filequeue/processor/QueueProcessorBuilder.java @@ -270,6 +270,10 @@ public QueueProcessorBuilder objectMapper(ObjectMapper objectMapper) { return this; } + public ObjectMapper getObjectMapper() { + return this.objectMapper; + } + public QueueProcessor build() throws IOException, IllegalStateException, IllegalArgumentException, InterruptedException { return new QueueProcessor<>(this); } diff --git a/src/test/java/com/stimulussoft/filequeue/FileQueueTest.java b/src/test/java/com/stimulussoft/filequeue/FileQueueTest.java index 43a1a08..d002df5 100644 --- a/src/test/java/com/stimulussoft/filequeue/FileQueueTest.java +++ b/src/test/java/com/stimulussoft/filequeue/FileQueueTest.java @@ -1,5 +1,8 @@ package com.stimulussoft.filequeue; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.common.collect.Maps; import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; @@ -12,13 +15,17 @@ import org.junit.Test; import java.io.File; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.NotDirectoryException; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.ZonedDateTime; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.fail; @@ -107,6 +114,50 @@ public void testWithoutRetries() throws Exception { MoreFiles.deleteDirectoryContents(db, RecursiveDeleteOption.ALLOW_INSECURE); } + @Test + public void testItemWithJava8DateTimeWithRetries() throws Exception { + String queueName = "testItemWithJava8DateTimeWithoutRetries"; + Path db = setup("filequeue test java8 datetime with retries", queueName, producedTestWithRetries, processedTestWithRetries); + retryTestWithRetries.clear(); + + MoreFiles.deleteDirectoryContents(db, RecursiveDeleteOption.ALLOW_INSECURE); + FileQueue queue = FileQueue.fileQueue(); + FileQueue.Config config = FileQueue.config(queueName, db, TestFileQueueItemWithDateTime.class, new TestRetryConsumer2(), executorService) + .maxQueueSize(MAXQUEUESIZE).maxTries(RETRIES) + .retryDelay(RETRYDELAY).retryDelayUnit(RetryDelayTimeUnit).persistRetryDelay(1); + + JsonMapper jsonMapper = JsonMapper.builder() + .findAndAddModules() + .addModule(new JavaTimeModule()) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + .build(); + + config.objectMapper(jsonMapper); + + queue.startQueue(config); + Assert.assertEquals(queue.getConfig().getMaxTries(), RETRIES); + Assert.assertEquals(queue.getConfig().getRetryDelay(), RETRYDELAY); + Assert.assertEquals(queue.getConfig().getRetryDelayUnit(), RetryDelayTimeUnit); + + // we will use a thread pool here to test if queueItem() method is thread-safe. + ExecutorService executor = Executors.newFixedThreadPool(6); + for (int i = 0; i < ROUNDS; i++) { + final int no = i; + executor.execute(() -> { + producedTestWithRetries.incrementAndGet(); + try { + queue.queueItem(new TestFileQueueItemWithDateTime(no, ZonedDateTime.now()), 1, TimeUnit.MINUTES); + } catch (Exception e) { + fail("failed push items with 60 seconds"); + } + }); + } + done(queue, producedTestWithRetries, processedTestWithRetries, retryTestWithRetries, ROUNDS); + executor.shutdown(); + queue.stopQueue(); + MoreFiles.deleteDirectoryContents(db, RecursiveDeleteOption.ALLOW_INSECURE); + } + @Test public void testWithRetries() throws Exception { @@ -391,6 +442,26 @@ public Integer getId() { } + static class TestFileQueueItemWithDateTime extends TestFileQueueItem { + ZonedDateTime dateTime; + + public TestFileQueueItemWithDateTime() { + } + + public TestFileQueueItemWithDateTime(Integer id, ZonedDateTime dateTime) { + super(id); + this.dateTime = dateTime; + } + + public ZonedDateTime getDateTime() { + return dateTime; + } + + public void setDateTime(ZonedDateTime dateTime) { + this.dateTime = dateTime; + } + } + /* Implement Throwable Queue Item */