Skip to content

Commit

Permalink
HTTP Sink Reporter (#99)
Browse files Browse the repository at this point in the history
* Initial commit - common classes

Utility classes (for byte conversion) and Reports generating classes
finished, starting to work on ConfigDef classes

* Wrapped up ConfigDef, abstracted RecordReport, initial Sender impl

* small fixes, now to tests and integration

* Javadocs added, alloting to test ReportHolder

* Http sink reporter handling

* Unit tests for common reporting modules

* starting/closing the Controller, added some tests

* fixing type mismatches, scalafmt

* fixing unit tests

* fixing IT and Functionals hopefully, small stability

* Fixed error reporting

---------

Co-authored-by: David Sloan <[email protected]>
  • Loading branch information
GoMati-MU and davidsloan authored Sep 20, 2024
1 parent 7fc7c4e commit e39ec93
Show file tree
Hide file tree
Showing 37 changed files with 1,908 additions and 217 deletions.
17 changes: 17 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lazy val subProjects: Seq[Project] = Seq(
`test-utils`,
`query-language`,
`java-common`,
`sink-reporting`,
`gcp-common`,
common,
`sql-common`,
Expand Down Expand Up @@ -95,6 +96,21 @@ lazy val `java-common` = (project in file("java-connectors/kafka-connect-common"
.configureAssembly(false)
.configureTests(javaCommonTestDeps)

lazy val `sink-reporting` = (project in file("java-connectors/kafka-connect-sink-reporting"))
.dependsOn(`java-common`)
.dependsOn(`test-utils` % "test->test")
.settings(
settings ++
Seq(
name := "kafka-connect-sink-reporting",
description := "Common reporting components from java",
libraryDependencies ++= javaCommonDeps,
publish / skip := true,
),
)
.configureAssembly(false)
.configureTests(javaCommonTestDeps)

lazy val `gcp-common` = (project in file("java-connectors/kafka-connect-gcp-common"))
.dependsOn(`java-common`)
.dependsOn(`test-utils` % "test->test")
Expand Down Expand Up @@ -324,6 +340,7 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))

lazy val http = (project in file("kafka-connect-http"))
.dependsOn(common)
.dependsOn(`sink-reporting`)
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.util;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
* Class with converters to/from bytes.
*/
public class ByteConverters {

/**
* Converts Object to byte array.
*
* @param obj {@link Object} to convert.
* @return array of bytes that represents the object or IOException if operation failed
*/
public static byte[] toBytes(Object obj) throws IOException {
ByteArrayOutputStream boas = new ByteArrayOutputStream();

try (ObjectOutputStream ois = new ObjectOutputStream(boas)) {
ois.writeObject(obj);
return boas.toByteArray();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.util;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;

import cyclops.control.Either;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;

class ByteConvertersTest {

@Test
void toBytesShouldNaturallyConvertStringToBytes() throws IOException {
//given
String textToConvert = "SOME_TEXT_TO_CONVERT";
byte[] fullTextObjectInBytes = getBytesIncludingHeader(textToConvert);

//when
byte[] bytesFromText = ByteConverters.toBytes(textToConvert);

//then
assertArrayEquals(fullTextObjectInBytes, bytesFromText);
}

@Test
void toBytesShouldReturnIOExceptionIfConversionFailed() throws IOException {
//given
String textToConvert = "TEXT_THAT_FAILS";
IOException badTimesException = new IOException("BAD TIMES");

//when
Either<IOException, byte[]> result;
try (MockedConstruction<ObjectOutputStream> ignored =
Mockito.mockConstruction(ObjectOutputStream.class,
(mock, context) -> doThrow(badTimesException).when(mock).writeObject(textToConvert))) {
assertThrows(IOException.class, () -> ByteConverters.toBytes(textToConvert));
}
}

private static byte[] getBytesIncludingHeader(String textToConvert) {
byte[] textInBytes = textToConvert.getBytes(StandardCharsets.UTF_8);
byte[] stringByteHeader = new byte[]{-84, -19, 0, 5, 116, 0, (byte) textToConvert.length()};
byte[] fullTextObjectInBytes = new byte[stringByteHeader.length + textInBytes.length];

for (int i = 0; i < fullTextObjectInBytes.length; i++) {
if (i < stringByteHeader.length) {
fullTextObjectInBytes[i] = stringByteHeader[i];
} else {
fullTextObjectInBytes[i] = textInBytes[i - stringByteHeader.length];
}
}

return fullTextObjectInBytes;
}
}
12 changes: 12 additions & 0 deletions java-connectors/kafka-connect-sink-reporting/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
project(':kafka-connect-sink-reporting') {


test {
maxParallelForks = 1
}

dependencies {
implementation project(':kafka-connect-common')
testImplementation(project(path: ':test-utils', configuration: 'testArtifacts'))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.reporting;

import cyclops.control.Try;
import io.lenses.streamreactor.connect.reporting.exceptions.ReportingException;
import io.lenses.streamreactor.connect.reporting.model.RecordReport;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReportHolder {

private static final int DEFAULT_QUEUES_SIZE = 1000;
private static final int DEFAULT_OFFER_TIME_MILLIS = 300;
private static final int DEFAULT_POLL_TIME_MILLIS = 100;
private final BlockingQueue<RecordReport> reportsToSend;

public ReportHolder(BlockingQueue<RecordReport> recordReportsQueue) {
this.reportsToSend =
Optional.ofNullable(recordReportsQueue)
.orElse(new ArrayBlockingQueue<>(DEFAULT_QUEUES_SIZE));
}

/**
* Offers Report to be queued for ReportSender to send. Since reporting is non-critical operation,
* if it fails, the connector just leaves it.
*/
public void enqueueReport(RecordReport recordReport) {
Try.withCatch(() -> reportsToSend.offer(recordReport, DEFAULT_OFFER_TIME_MILLIS, TimeUnit.MILLISECONDS));
}

/**
* Polls for report to send.
*
* @return RecordReport instance or null if the specified waiting time elapses before an element is available
*/
public RecordReport pollReport() {
try {
return reportsToSend.poll(DEFAULT_POLL_TIME_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new ReportingException("InterruptedException happened:", e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.reporting;

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class ReportableSinkTask extends SinkTask {

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
reportPreviousCommitSuccesses();
reportPreviousCommitFailures();
return super.preCommit(currentOffsets);
}

protected abstract void reportPreviousCommitFailures();

protected abstract void reportPreviousCommitSuccesses();
}
Loading

0 comments on commit e39ec93

Please sign in to comment.