Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/com/splunk/hecclient/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,15 @@ public final Object getTied() {
return tied;
}

public Event addFields(final Map<String, String> fields) {
public Event addFields(final Map<String, Object> fields) {
return this;
}

public Event setFields(final Map<String, String> fields) {
public Event setFields(final Map<String, Object> fields) {
return this;
}

public Map<String, String> getFields() {
public Map<String, Object> getFields() {
return null;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/EventBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract class EventBatch {
public abstract void add(Event event);
public abstract EventBatch createFromThis();

public final void addExtraFields(final Map<String, String> fields) {
public final void addExtraFields(final Map<String, Object> fields) {
// recalculate the batch length since we inject more meta data to each event
int newLength = 0;
for (final Event event: events) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/HecChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

final class HecChannel {
private String id;
private Map<String, String> chField;
private Map<String, Object> chField;
private IndexerInf indexer;
private boolean isAvailable;

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/splunk/hecclient/JsonEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public final class JsonEvent extends Event {
private Map<String, String> fields;
private Map<String, Object> fields;

/**
* Creates a new json event.
Expand Down Expand Up @@ -67,7 +67,7 @@ public JsonEvent(Object data, Object tied) {
* @since 1.0
*/
@Override
public JsonEvent addFields(final Map<String, String> extraFields) {
public JsonEvent addFields(final Map<String, Object> extraFields) {
if (extraFields == null || extraFields.isEmpty()) {
return this;
}
Expand All @@ -93,7 +93,7 @@ public JsonEvent addFields(final Map<String, String> extraFields) {
* @since 1.0
*/
@Override
public JsonEvent setFields(final Map<String, String> extraFields) {
public JsonEvent setFields(final Map<String, Object> extraFields) {
fields = extraFields;
invalidate();
return this;
Expand All @@ -108,7 +108,7 @@ public JsonEvent setFields(final Map<String, String> extraFields) {
* @since 1.0
*/
@Override
public Map<String, String> getFields() {
public Map<String, Object> getFields() {
return fields;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {

final String lineBreaker;
final boolean useRecordTimestamp;
final Map<String, String> enrichments;
final Map<String, Object> enrichments;
final boolean trackData;

final boolean hasTrustStorePath;
Expand Down Expand Up @@ -461,13 +461,13 @@ private static String[] split(String data, String sep) {
return null;
}

private static Map<String, String> parseEnrichments(String enrichment) {
private static Map<String, Object> parseEnrichments(String enrichment) {
String[] kvs = split(enrichment, ",");
if (kvs == null) {
return null;
}

Map<String, String> enrichmentKvs = new HashMap<>();
Map<String, Object> enrichmentKvs = new HashMap<>();
for (final String kv : kvs) {
String[] kvPairs = split(kv, "=");
if (kvPairs.length != 2) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ private Event createHecEventFrom(final SinkRecord record) {
event.setTied(record);
event.addFields(connectorConfig.enrichments);
} catch(Exception e) {
log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString());
log.error("event does not follow correct HEC pre-formatted format: {}", record.value().toString(), e);
event = createHECEventNonFormatted(record);
}
} else {
Expand All @@ -416,7 +416,7 @@ private Event createHecEventFrom(final SinkRecord record) {
}

if (connectorConfig.trackData) {
Map<String, String> trackMetas = new HashMap<>();
Map<String, Object> trackMetas = new HashMap<>();
trackMetas.put("kafka_offset", String.valueOf(record.kafkaOffset()));
trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
trackMetas.put("kafka_topic", record.topic());
Expand Down Expand Up @@ -459,7 +459,7 @@ private Event addHeaders(Event event, SinkRecord record) {
// "custom_header_1,custom_header_2,custom_header_3"
if (!connectorConfig.headerCustom.isEmpty()) {
String[] customHeaders = connectorConfig.headerCustom.split(",\\s?");
Map<String, String> headerMap = new HashMap<>();
Map<String, Object> headerMap = new HashMap<>();
for (String header : customHeaders) {
Header customHeader = headers.lastWithName(header);
if (customHeader != null) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void setterGetter() {
Assert.assertEquals(1, events.size());

// Add extra fields
Map<String, String> fields = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
fields.put("hello", "world");
batch.addExtraFields(fields);

Expand Down
14 changes: 7 additions & 7 deletions src/test/java/com/splunk/hecclient/JsonEventTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ public void addFields() {
Assert.assertNull(event.getFields());

// empty extra fields
Map<String, String> fields = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
event.addFields(fields);
Assert.assertNull(event.getFields());

// one item
fields.put("ni", "hao");
event.addFields(fields);
Map<String, String> fieldsGot = event.getFields();
Map<String, Object> fieldsGot = event.getFields();
Assert.assertNotNull(fieldsGot);
Assert.assertEquals(false, fieldsGot.isEmpty());
Assert.assertEquals(1, fieldsGot.size());
Expand Down Expand Up @@ -180,15 +180,15 @@ public void getterSetter() {
event.setTied("hao");
Assert.assertEquals("hao", event.getTied());

Map<String, String> fields = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
fields.put("hello", "world");
event.setFields(fields);
Assert.assertEquals(fields, event.getFields());

Map<String, String> moreFields = new HashMap<>();
Map<String, Object> moreFields = new HashMap<>();
moreFields.put("ni", "hao");
event.addFields(moreFields);
Map<String, String> got = event.getFields();
Map<String, Object> got = event.getFields();
Assert.assertNotNull(got);
Assert.assertEquals(2, got.size());
Assert.assertEquals("world", got.get("hello"));
Expand Down Expand Up @@ -219,7 +219,7 @@ private void doSerialize(Object data, SerialAndDeserial sad) {
String tied = "tied";
Event event = new JsonEvent(data, tied);

Map<String, String> fields = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
fields.put("ni", "hao");
event.addFields(fields);
event.setHost("localhost");
Expand All @@ -239,7 +239,7 @@ private void doSerialize(Object data, SerialAndDeserial sad) {
Assert.assertEquals("test-sourcetype", deserialized.getSourcetype());
Assert.assertEquals(event.getTime(), deserialized.getTime());

Map<String, String> fieldsGot = deserialized.getFields();
Map<String, Object> fieldsGot = deserialized.getFields();
Assert.assertEquals("hao", fieldsGot.get("ni"));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/splunk/hecclient/RawEventTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private void writeTo(final String lineBreaker) {
@Test
public void getterSetter() {
Event event = new RawEvent("ni", null);
Map<String, String> m = new HashMap<String, String>();
Map<String, Object> m = new HashMap<>();
m.put("hello", "world");
event.setFields(m);
Assert.assertNull(event.getFields()); // we ignore extra fields for raw event
Expand Down
48 changes: 46 additions & 2 deletions src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Test;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;
Expand Down Expand Up @@ -267,6 +265,52 @@ public void putWithRawAndAck() {
putWithSuccess(true, true);
}

@Test
public void checkFormattedEvent() {

SplunkSinkTask task = new SplunkSinkTask();
UnitUtil uu = new UnitUtil(0);
Map<String, String> config = uu.createTaskConfig();
config.put(SplunkSinkConnectorConfig.RAW_CONF, String.valueOf(false));
config.put(SplunkSinkConnectorConfig.HEC_EVENT_FORMATTED_CONF, String.valueOf(true));

Collection<SinkRecord> record = createSinkRecords(
1, "{" +
"\"index\":\"main\"," +
"\"event\":\"Hello, world!\"," +
"\"host\":\"host-01\"," +
"\"source\":\"bu\"," +
"\"fields\":{\"foo\":\"bar\",\"CLASS\":\"class1\",\"cust_id\":[\"000013934\",\"000013935\"]}}"
);

HecMock hec = new HecMock(task);
hec.setSendReturnResult(HecMock.success);
task.setHec(hec);
task.start(config);
task.put(record);

List<EventBatch> batches = hec.getBatches();
for (Iterator<EventBatch> iter = batches.listIterator(); iter.hasNext();) {
EventBatch batch = iter.next();
List<Event> event_list = batch.getEvents();
Iterator<Event> iterator = event_list.listIterator() ;
Event event = iterator.next();

Assert.assertEquals("host-01", event.getHost());

Assert.assertEquals("bar", event.getFields().get("foo"));

Object custIdObject = event.getFields().get("cust_id");
Assert.assertTrue(custIdObject instanceof List);
@SuppressWarnings("unchecked")
List<String> custIdList = (List<String>) custIdObject;
final List<String> expectedCustIdList = Arrays.asList("000013934", "000013935");
Assert.assertEquals(expectedCustIdList, custIdList);
break;
}
task.stop();
}

@Test
public void checkExtractedTimestamp() {

Expand Down
Loading