Skip to content

Fix ObjectMapper configuration to support java 8 date-time classes #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>com.stimulussoft</groupId>
<artifactId>filequeue</artifactId>
<name>file queue project</name>
<version>1.3.0</version>
<version>1.4.0</version>
<packaging>jar</packaging>
<description>Light weight, high performance, simple, reliable and persistent queue</description>
<url>https://github.com/stimulussoft/filequeue</url>
Expand Down Expand Up @@ -232,6 +232,11 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/stimulussoft/filequeue/FileQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.stimulussoft.filequeue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.stimulussoft.filequeue.processor.Consumer;
Expand Down Expand Up @@ -370,6 +371,19 @@ public int getMaxQueueSize() {
return builder.getMaxQueueSize();
}

/**
* Set a custom {@link ObjectMapper} to use for serializing and deserializing {@link FileQueueItem}
* @param objectMapper ObjectMapper instance
* @return config configuration
*/
public Config objectMapper(ObjectMapper objectMapper) {
builder.objectMapper(objectMapper);
return this;
}

public ObjectMapper getObjectMapper() {
return builder.getObjectMapper();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public enum RetryDelayAlgorithm {FIXED, EXPONENTIAL}
if (builder.queuePath == null) throw new IllegalArgumentException("queue path must be specified");
if (builder.type == null) throw new IllegalArgumentException("item type must be specified");
if (builder.consumer == null) throw new IllegalArgumentException("consumer must be specified");
objectMapper = createObjectMapper();
objectMapper = Optional.ofNullable(builder.objectMapper).orElse(createObjectMapper());
if (!objectMapper.canSerialize(objectMapper.constructType(builder.type).getClass()))
throw new IllegalArgumentException("The given type is not serializable. it cannot be serialized by jackson");
this.queueName = builder.queueName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ public QueueProcessorBuilder objectMapper(ObjectMapper objectMapper) {
return this;
}

public ObjectMapper getObjectMapper() {
return this.objectMapper;
}

public <T1 extends T> QueueProcessor build() throws IOException, IllegalStateException, IllegalArgumentException, InterruptedException {
return new QueueProcessor<>(this);
}
Expand Down
75 changes: 73 additions & 2 deletions src/test/java/com/stimulussoft/filequeue/FileQueueTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.stimulussoft.filequeue;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Maps;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
Expand All @@ -12,13 +15,17 @@
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.fail;
Expand Down Expand Up @@ -107,6 +114,50 @@ public void testWithoutRetries() throws Exception {
MoreFiles.deleteDirectoryContents(db, RecursiveDeleteOption.ALLOW_INSECURE);

}
@Test
public void testItemWithJava8DateTimeWithRetries() throws Exception {
String queueName = "testItemWithJava8DateTimeWithoutRetries";
Path db = setup("filequeue test java8 datetime with retries", queueName, producedTestWithRetries, processedTestWithRetries);
retryTestWithRetries.clear();

MoreFiles.deleteDirectoryContents(db, RecursiveDeleteOption.ALLOW_INSECURE);
FileQueue<FileQueueItem> queue = FileQueue.fileQueue();
FileQueue.Config config = FileQueue.config(queueName, db, TestFileQueueItemWithDateTime.class, new TestRetryConsumer2(), executorService)
.maxQueueSize(MAXQUEUESIZE).maxTries(RETRIES)
.retryDelay(RETRYDELAY).retryDelayUnit(RetryDelayTimeUnit).persistRetryDelay(1);

JsonMapper jsonMapper = JsonMapper.builder()
.findAndAddModules()
.addModule(new JavaTimeModule())
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.build();

config.objectMapper(jsonMapper);

queue.startQueue(config);
Assert.assertEquals(queue.getConfig().getMaxTries(), RETRIES);
Assert.assertEquals(queue.getConfig().getRetryDelay(), RETRYDELAY);
Assert.assertEquals(queue.getConfig().getRetryDelayUnit(), RetryDelayTimeUnit);

// we will use a thread pool here to test if queueItem() method is thread-safe.
ExecutorService executor = Executors.newFixedThreadPool(6);
for (int i = 0; i < ROUNDS; i++) {
final int no = i;
executor.execute(() -> {
producedTestWithRetries.incrementAndGet();
try {
queue.queueItem(new TestFileQueueItemWithDateTime(no, ZonedDateTime.now()), 1, TimeUnit.MINUTES);
} catch (Exception e) {
fail("failed push items with 60 seconds");
}
});
}
done(queue, producedTestWithRetries, processedTestWithRetries, retryTestWithRetries, ROUNDS);
executor.shutdown();
queue.stopQueue();
MoreFiles.deleteDirectoryContents(db, RecursiveDeleteOption.ALLOW_INSECURE);
}


@Test
public void testWithRetries() throws Exception {
Expand Down Expand Up @@ -391,6 +442,26 @@ public Integer getId() {

}

static class TestFileQueueItemWithDateTime extends TestFileQueueItem {
ZonedDateTime dateTime;

public TestFileQueueItemWithDateTime() {
}

public TestFileQueueItemWithDateTime(Integer id, ZonedDateTime dateTime) {
super(id);
this.dateTime = dateTime;
}

public ZonedDateTime getDateTime() {
return dateTime;
}

public void setDateTime(ZonedDateTime dateTime) {
this.dateTime = dateTime;
}
}


/* Implement Throwable Queue Item */

Expand Down