diff --git a/src/main/java/com/splunk/hecclient/Event.java b/src/main/java/com/splunk/hecclient/Event.java
index 0a21ebcd..c44a2e69 100644
--- a/src/main/java/com/splunk/hecclient/Event.java
+++ b/src/main/java/com/splunk/hecclient/Event.java
@@ -240,15 +240,15 @@ public final Object getTied() {
         return tied;
     }
 
-    public Event addFields(final Map<String, String> fields) {
+    public Event addFields(final Map<String, Object> fields) {
         return this;
     }
 
-    public Event setFields(final Map<String, String> fields) {
+    public Event setFields(final Map<String, Object> fields) {
         return this;
     }
 
-    public Map<String, String> getFields() {
+    public Map<String, Object> getFields() {
         return null;
     }
 
diff --git a/src/main/java/com/splunk/hecclient/EventBatch.java b/src/main/java/com/splunk/hecclient/EventBatch.java
index 352108f6..5319f6af 100644
--- a/src/main/java/com/splunk/hecclient/EventBatch.java
+++ b/src/main/java/com/splunk/hecclient/EventBatch.java
@@ -47,7 +47,7 @@ public abstract class EventBatch {
     public abstract void add(Event event);
     public abstract EventBatch createFromThis();
 
-    public final void addExtraFields(final Map<String, String> fields) {
+    public final void addExtraFields(final Map<String, Object> fields) {
         // recalculate the batch length since we inject more meta data to each event
         int newLength = 0;
         for (final Event event: events) {
diff --git a/src/main/java/com/splunk/hecclient/HecChannel.java b/src/main/java/com/splunk/hecclient/HecChannel.java
index c7f49805..ac78f972 100644
--- a/src/main/java/com/splunk/hecclient/HecChannel.java
+++ b/src/main/java/com/splunk/hecclient/HecChannel.java
@@ -22,7 +22,7 @@ final class HecChannel {
     private String id;
-    private Map<String, String> chField;
+    private Map<String, Object> chField;
     private IndexerInf indexer;
     private boolean isAvailable;
diff --git a/src/main/java/com/splunk/hecclient/JsonEvent.java b/src/main/java/com/splunk/hecclient/JsonEvent.java
index da24c8e7..5707ad3f 100644
--- a/src/main/java/com/splunk/hecclient/JsonEvent.java
+++ b/src/main/java/com/splunk/hecclient/JsonEvent.java
@@ -33,7 +33,7 @@
  */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public final class JsonEvent extends Event {
-    private Map<String, String> fields;
+    private Map<String, Object> fields;
 
     /**
      * Creates a new json event.
@@ -67,7 +67,7 @@ public JsonEvent(Object data, Object tied) {
      * @since 1.0
      */
     @Override
-    public JsonEvent addFields(final Map<String, String> extraFields) {
+    public JsonEvent addFields(final Map<String, Object> extraFields) {
         if (extraFields == null || extraFields.isEmpty()) {
             return this;
         }
@@ -93,7 +93,7 @@ public JsonEvent addFields(final Map<String, String> extraFields) {
      * @since 1.0
      */
     @Override
-    public JsonEvent setFields(final Map<String, String> extraFields) {
+    public JsonEvent setFields(final Map<String, Object> extraFields) {
         fields = extraFields;
         invalidate();
         return this;
@@ -108,7 +108,7 @@ public JsonEvent setFields(final Map<String, String> extraFields) {
      * @since 1.0
      */
     @Override
-    public Map<String, String> getFields() {
+    public Map<String, Object> getFields() {
         return fields;
     }
 
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index 0859a825..3a2b28a5 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -245,7 +245,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
     final String lineBreaker;
     final boolean useRecordTimestamp;
-    final Map<String, String> enrichments;
+    final Map<String, Object> enrichments;
     final boolean trackData;
     final boolean hasTrustStorePath;
@@ -461,13 +461,13 @@ private static String[] split(String data, String sep) {
         return null;
     }
 
-    private static Map<String, String> parseEnrichments(String enrichment) {
+    private static Map<String, Object> parseEnrichments(String enrichment) {
         String[] kvs = split(enrichment, ",");
         if (kvs == null) {
             return null;
         }
 
-        Map<String, String> enrichmentKvs = new HashMap<>();
+        Map<String, Object> enrichmentKvs = new HashMap<>();
         for (final String kv : kvs) {
             String[] kvPairs = split(kv, "=");
             if (kvPairs.length != 2) {
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
index f238c433..abe27e5c 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -404,7 +404,7 @@ private Event createHecEventFrom(final SinkRecord record) {
                 event.setTied(record);
                 event.addFields(connectorConfig.enrichments);
             } catch(Exception e) {
-                log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString());
+                log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString(), e);
                 event = createHECEventNonFormatted(record);
             }
         } else {
@@ -416,7 +416,7 @@ private Event createHecEventFrom(final SinkRecord record) {
         }
 
         if (connectorConfig.trackData) {
-            Map<String, String> trackMetas = new HashMap<>();
+            Map<String, Object> trackMetas = new HashMap<>();
             trackMetas.put("kafka_offset", String.valueOf(record.kafkaOffset()));
             trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
             trackMetas.put("kafka_topic", record.topic());
@@ -459,7 +459,7 @@ private Event addHeaders(Event event, SinkRecord record) {
         // "custom_header_1,custom_header_2,custom_header_3"
         if (!connectorConfig.headerCustom.isEmpty()) {
             String[] customHeaders = connectorConfig.headerCustom.split(",\\s?");
-            Map<String, String> headerMap = new HashMap<>();
+            Map<String, Object> headerMap = new HashMap<>();
             for (String header : customHeaders) {
                 Header customHeader = headers.lastWithName(header);
                 if (customHeader != null) {
diff --git a/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java b/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java
index 29fefbf1..a70f968e 100644
--- a/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java
+++ b/src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java
@@ -113,7 +113,7 @@ public void setterGetter() {
         Assert.assertEquals(1, events.size());
 
         // Add extra fields
-        Map<String, String> fields = new HashMap<>();
+        Map<String, Object> fields = new HashMap<>();
         fields.put("hello", "world");
         batch.addExtraFields(fields);
 
diff --git a/src/test/java/com/splunk/hecclient/JsonEventTest.java b/src/test/java/com/splunk/hecclient/JsonEventTest.java
index 5c27b6c4..5e07fa6a 100644
--- a/src/test/java/com/splunk/hecclient/JsonEventTest.java
+++ b/src/test/java/com/splunk/hecclient/JsonEventTest.java
@@ -65,14 +65,14 @@ public void addFields() {
         Assert.assertNull(event.getFields());
 
         // empty extra fields
-        Map<String, String> fields = new HashMap<>();
+        Map<String, Object> fields = new HashMap<>();
         event.addFields(fields);
         Assert.assertNull(event.getFields());
 
         // one item
         fields.put("ni", "hao");
         event.addFields(fields);
-        Map<String, String> fieldsGot = event.getFields();
+        Map<String, Object> fieldsGot = event.getFields();
         Assert.assertNotNull(fieldsGot);
         Assert.assertEquals(false, fieldsGot.isEmpty());
         Assert.assertEquals(1, fieldsGot.size());
@@ -180,15 +180,15 @@ public void getterSetter() {
         event.setTied("hao");
         Assert.assertEquals("hao", event.getTied());
 
-        Map<String, String> fields = new HashMap<>();
+        Map<String, Object> fields = new HashMap<>();
         fields.put("hello", "world");
         event.setFields(fields);
         Assert.assertEquals(fields, event.getFields());
 
-        Map<String, String> moreFields = new HashMap<>();
+        Map<String, Object> moreFields = new HashMap<>();
         moreFields.put("ni", "hao");
         event.addFields(moreFields);
-        Map<String, String> got = event.getFields();
+        Map<String, Object> got = event.getFields();
         Assert.assertNotNull(got);
         Assert.assertEquals(2, got.size());
         Assert.assertEquals("world", got.get("hello"));
@@ -219,7 +219,7 @@ private void doSerialize(Object data, SerialAndDeserial sad) {
         String tied = "tied";
         Event event = new JsonEvent(data, tied);
 
-        Map<String, String> fields = new HashMap<>();
+        Map<String, Object> fields = new HashMap<>();
         fields.put("ni", "hao");
         event.addFields(fields);
         event.setHost("localhost");
@@ -239,7 +239,7 @@ private void doSerialize(Object data, SerialAndDeserial sad) {
         Assert.assertEquals("test-sourcetype", deserialized.getSourcetype());
         Assert.assertEquals(event.getTime(), deserialized.getTime());
 
-        Map<String, String> fieldsGot = deserialized.getFields();
+        Map<String, Object> fieldsGot = deserialized.getFields();
         Assert.assertEquals("hao", fieldsGot.get("ni"));
     }
 }
diff --git a/src/test/java/com/splunk/hecclient/RawEventTest.java b/src/test/java/com/splunk/hecclient/RawEventTest.java
index 4fd789c2..97903208 100644
--- a/src/test/java/com/splunk/hecclient/RawEventTest.java
+++ b/src/test/java/com/splunk/hecclient/RawEventTest.java
@@ -191,7 +191,7 @@ private void writeTo(final String lineBreaker) {
     @Test
     public void getterSetter() {
         Event event = new RawEvent("ni", null);
-        Map m = new HashMap();
+        Map<String, Object> m = new HashMap<>();
         m.put("hello", "world");
         event.setFields(m);
         Assert.assertNull(event.getFields()); // we ignore extra fields for raw event
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
index 2198fae6..f74d4583 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
@@ -21,14 +21,12 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.*;
@@ -267,6 +265,52 @@ public void putWithRawAndAck() {
         putWithSuccess(true, true);
     }
 
+    @Test
+    public void checkFormattedEvent() {
+
+        SplunkSinkTask task = new SplunkSinkTask();
+        UnitUtil uu = new UnitUtil(0);
+        Map<String, String> config = uu.createTaskConfig();
+        config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
+        config.put(SplunkSinkConnectorConfig.HEC_EVENT_FORMATTED_CONF, String.valueOf(true));
+
+        Collection<SinkRecord> record = createSinkRecords(
+            1, "{"
+            + "\"index\":\"main\","
+            + "\"event\":\"Hello, world!\","
+            + "\"host\":\"host-01\","
+            + "\"source\":\"bu\","
+            + "\"fields\":{\"foo\":\"bar\",\"CLASS\":\"class1\",\"cust_id\":[\"000013934\",\"000013935\"]}}"
+        );
+
+        HecMock hec = new HecMock(task);
+        hec.setSendReturnResult(HecMock.success);
+        task.setHec(hec);
+        task.start(config);
+        task.put(record);
+
+        List<EventBatch> batches = hec.getBatches();
+        for (Iterator<EventBatch> iter = batches.listIterator(); iter.hasNext();) {
+            EventBatch batch = iter.next();
+            List<Event> event_list = batch.getEvents();
+            Iterator<Event> iterator = event_list.listIterator();
+            Event event = iterator.next();
+
+            Assert.assertEquals("host-01", event.getHost());
+
+            Assert.assertEquals("bar", event.getFields().get("foo"));
+
+            Object custIdObject = event.getFields().get("cust_id");
+            Assert.assertTrue(custIdObject instanceof List);
+            @SuppressWarnings("unchecked")
+            List<String> custIdList = (List<String>) custIdObject;
+            final List<String> expectedCustIdList = Arrays.asList("000013934", "000013935");
+            Assert.assertEquals(expectedCustIdList, custIdList);
+            break;
+        }
+        task.stop();
+    }
+
     @Test
     public void checkExtractedTimestamp() {
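
Note: the sketch below is not part of the patch. It is a minimal, self-contained illustration (assuming Jackson's ObjectMapper, which the hecclient already uses for JSON events; the class name FieldsTypeSketch is invented for this example) of why the fields map is widened to Object values: HEC multivalue fields such as cust_id arrive as JSON arrays, which cannot be bound to a Map<String, String>.

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.List;
    import java.util.Map;

    // Illustrative only: shows how a pre-formatted HEC event with a multivalue
    // field binds cleanly to Map<String, Object>, whereas a Map<String, String>
    // target would fail on the JSON array value.
    public class FieldsTypeSketch {
        public static void main(String[] args) throws Exception {
            String preFormatted = "{"
                + "\"event\":\"Hello, world!\","
                + "\"fields\":{\"foo\":\"bar\",\"cust_id\":[\"000013934\",\"000013935\"]}"
                + "}";

            ObjectMapper mapper = new ObjectMapper();
            // Deserialize the whole event into a generic map.
            Map<String, Object> hecEvent = mapper.readValue(
                preFormatted, new TypeReference<Map<String, Object>>() { });

            @SuppressWarnings("unchecked")
            Map<String, Object> fields = (Map<String, Object>) hecEvent.get("fields");

            System.out.println(fields.get("foo"));                     // bar
            System.out.println(fields.get("cust_id") instanceof List); // true
        }
    }

With the widened type, scalar field values still come back as Strings while multivalue fields come back as Lists, which is what the new checkFormattedEvent test asserts for cust_id.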