|
36 | 36 | import org.slf4j.LoggerFactory; |
37 | 37 | import org.testcontainers.containers.KafkaContainer; |
38 | 38 | import org.testcontainers.containers.Network; |
| 39 | +import org.testcontainers.images.builder.Transferable; |
39 | 40 | import org.testcontainers.lifecycle.Startables; |
40 | 41 | import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; |
41 | 42 | import org.testcontainers.utility.DockerImageName; |
|
47 | 48 | import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; |
48 | 49 | import com.microsoft.azure.kusto.data.exceptions.DataClientException; |
49 | 50 | import com.microsoft.azure.kusto.data.exceptions.DataServiceException; |
| 51 | +import com.microsoft.azure.kusto.kafka.connect.sink.Utils; |
50 | 52 | import com.microsoft.azure.kusto.kafka.connect.sink.Version; |
51 | 53 | import com.microsoft.azure.kusto.kafka.connect.sink.it.containers.KustoKafkaConnectContainer; |
52 | 54 | import com.microsoft.azure.kusto.kafka.connect.sink.it.containers.ProxyContainer; |
@@ -103,14 +105,16 @@ public static void startContainers() throws Exception { |
103 | 105 | refreshDm(); |
104 | 106 | // Mount the libs |
105 | 107 | String mountPath = String.format( |
106 | | - "target/components/packages/microsoftcorporation-kafka-sink-azure-kusto-%s/microsoftcorporation-kafka-sink-azure-kusto-%s/lib", |
107 | | - Version.getVersion(), Version.getVersion()); |
108 | | - log.info("Creating connector jar with version {} and mounting it from {},", Version.getVersion(), mountPath); |
109 | | - connectContainer.withFileSystemBind(mountPath, "/kafka/connect/kafka-sink-azure-kusto"); |
| 108 | + "target/kafka-sink-azure-kusto-%s-jar-with-dependencies.jar", |
| 109 | + Version.getVersion()); |
| 110 | + log.info("Creating connector jar with version {} and mounting it from {}", Version.getVersion(), mountPath); |
| 111 | + Transferable source = MountableFile.forHostPath(mountPath); |
| 112 | + connectContainer.withCopyToContainer(source, Utils.getConnectPath()); |
110 | 113 | Startables.deepStart(Stream.of(kafkaContainer, schemaRegistryContainer, proxyContainer, connectContainer)).join(); |
111 | 114 | log.info("Started containers , copying scripts to container and executing them"); |
112 | 115 | connectContainer.withCopyToContainer(MountableFile.forClasspathResource("download-libs.sh", 744), // rwx--r--r-- |
113 | | - "/kafka/connect/kafka-sink-azure-kusto/download-libs.sh").execInContainer("sh", "/kafka/connect/kafka-sink-azure-kusto/download-libs.sh"); |
| 116 | + String.format("%s/download-libs.sh", Utils.getConnectPath())) |
| 117 | + .execInContainer("sh", String.format("%s/download-libs.sh", Utils.getConnectPath())); |
114 | 118 | // Logs of start up of the container gets published here. This will be handy in case we want to look at startup failures |
115 | 119 | log.debug(connectContainer.getLogs()); |
116 | 120 | } else { |
@@ -206,8 +210,9 @@ public void shouldHandleAllTypesOfEvents() { |
206 | 210 | log.info("Connector state for {} : {}. ", dataFormat, |
207 | 211 | connectContainer.getConnectorTaskState(String.format("adx-connector-%s", dataFormat), 0)); |
208 | 212 | try { |
| 213 | + Thread.sleep(10_000); |
209 | 214 | produceKafkaMessages(dataFormat); |
210 | | - Thread.sleep(10000); |
| 215 | + Thread.sleep(5_000); |
211 | 216 | } catch (IOException | InterruptedException e) { |
212 | 217 | throw new RuntimeException(e); |
213 | 218 | } |
|
0 commit comments