Skip to content

Commit e09ab56

Browse files
authored
Revert "Avoid filtering by lastSeenFile where a post process action is configured (#192)" (#210)
This reverts commit 771ed92
1 parent 8651362 commit e09ab56

File tree

3 files changed

+16
-83
lines changed

3 files changed

+16
-83
lines changed

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/files/CloudSourceFileQueue.scala

+10-28
Original file line number | Diff line number | Diff line change
@@ -14,14 +14,14 @@
1414
* limitations under the License.
1515
*/
1616
package io.lenses.streamreactor.connect.cloud.common.source.files
17+
1718
import cats.implicits.catsSyntaxEitherId
1819
import cats.implicits.catsSyntaxOptionId
1920
import cats.implicits.toShow
2021
import com.typesafe.scalalogging.LazyLogging
2122
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
2223
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
2324
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
24-
import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction
2525
import io.lenses.streamreactor.connect.cloud.common.storage.FileListError
2626
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
2727
import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
@@ -32,14 +32,6 @@ trait SourceFileQueue {
3232
def next(): Either[FileListError, Option[CloudLocation]]
3333
}
3434

35-
/**
36-
* A tracker for the last seen file.
37-
*
38-
* @param lastSeenFile An optional metadata object representing the last seen file.
39-
* @tparam SM The type of the file metadata.
40-
*/
41-
case class LastSeenFileTracker[SM <: FileMetadata](var lastSeenFile: Option[SM])
42-
4335
/**
4436
* Blocking processor for queues of operations. Used to ensure consistency.
4537
* Will block any further writes by the current file until the remote has caught up.
@@ -57,7 +49,7 @@ class CloudSourceFileQueue[SM <: FileMetadata] private (
5749
* files will be cleaned up after processing, removing the need to track the last seen file.
5850
* In such cases, this parameter will be None.
5951
*/
60-
private var lastSeenFileTracker: Option[LastSeenFileTracker[SM]],
52+
private var lastSeenFile: Option[SM],
6153
)(
6254
implicit
6355
cloudLocationValidator: CloudLocationValidator,
@@ -70,7 +62,7 @@ class CloudSourceFileQueue[SM <: FileMetadata] private (
7062
)(
7163
implicit
7264
cloudLocationValidator: CloudLocationValidator,
73-
) = this(taskId, batchListerFn, Seq.empty, Some(LastSeenFileTracker[SM](None)))
65+
) = this(taskId, batchListerFn, Seq.empty, None)
7466

7567
override def next(): Either[FileListError, Option[CloudLocation]] =
7668
files match {
@@ -90,11 +82,10 @@ class CloudSourceFileQueue[SM <: FileMetadata] private (
9082

9183
private def retrieveNextFile(
9284
): Either[FileListError, Option[CloudLocation]] = {
93-
val nextBatch: Either[FileListError, Option[ListResponse[String, SM]]] =
94-
batchListerFn(lastSeenFileTracker.flatMap(_.lastSeenFile))
85+
val nextBatch: Either[FileListError, Option[ListResponse[String, SM]]] = batchListerFn(lastSeenFile)
9586
nextBatch.flatMap {
9687
case Some(ListOfKeysResponse(bucket, prefix, value, meta)) =>
97-
lastSeenFileTracker = lastSeenFileTracker.map(_.copy(lastSeenFile = meta.some))
88+
lastSeenFile = meta.some
9889
files = value.map(path =>
9990
CloudLocation(
10091
bucket,
@@ -116,11 +107,10 @@ class CloudSourceFileQueue[SM <: FileMetadata] private (
116107

117108
object CloudSourceFileQueue {
118109
def from[SM <: FileMetadata](
119-
batchListerFn: Option[SM] => Either[FileListError, Option[ListOfKeysResponse[SM]]],
120-
storageInterface: StorageInterface[SM],
121-
startingFile: CloudLocation,
122-
taskId: ConnectorTaskId,
123-
maybePostProcessAction: Option[PostProcessAction],
110+
batchListerFn: Option[SM] => Either[FileListError, Option[ListOfKeysResponse[SM]]],
111+
storageInterface: StorageInterface[SM],
112+
startingFile: CloudLocation,
113+
taskId: ConnectorTaskId,
124114
)(
125115
implicit
126116
cloudLocationValidator: CloudLocationValidator,
@@ -133,15 +123,7 @@ object CloudSourceFileQueue {
133123
case _ =>
134124
Option.empty[SM]
135125
}
136-
new CloudSourceFileQueue[SM](
137-
taskId,
138-
batchListerFn,
139-
Seq(startingFile),
140-
// Creates an instance of LastSeenFileTracker if no PostProcessAction is set
141-
// If PostProcessAction is set then files will be cleaned up after processing,
142-
// which removes the requirement to seek through to the last seen file.
143-
Option.when(maybePostProcessAction.isEmpty)(LastSeenFileTracker[SM](lastSeen)),
144-
)
126+
new CloudSourceFileQueue(taskId, batchListerFn, Seq(startingFile), lastSeen)
145127
}
146128

147129
}

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/state/ReaderManagerBuilder.scala

-1
Original file line number | Diff line number | Diff line change
@@ -72,7 +72,6 @@ object ReaderManagerBuilder extends LazyLogging {
7272
storageInterface,
7373
location,
7474
connectorTaskId,
75-
adaptedSbo.postProcessAction,
7675
)
7776
}
7877
sourceFileQueue <- IO.fromEither(

kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/source/files/CloudSourceFileQueueTest.scala

+6-54
Original file line number | Diff line number | Diff line change
@@ -22,13 +22,11 @@ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
2222
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
2323
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
2424
import io.lenses.streamreactor.connect.cloud.common.sink.seek.TestFileMetadata
25-
import io.lenses.streamreactor.connect.cloud.common.source.config.DeletePostProcessAction
2625
import io.lenses.streamreactor.connect.cloud.common.storage.FileListError
2726
import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
2827
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
2928
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData
3029
import org.mockito.ArgumentMatchers._
31-
import org.mockito.InOrder
3230
import org.mockito.MockitoSugar
3331
import org.scalatest.BeforeAndAfter
3432
import org.scalatest.flatspec.AnyFlatSpec
@@ -85,7 +83,12 @@ class CloudSourceFileQueueTest extends AnyFlatSpec with Matchers with MockitoSug
8583
any[Option[TestFileMetadata]],
8684
)
8785
// file 0 = 0.json
88-
verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)
86+
sourceFileQueue.next() should be(Right(Some(fileLocs(0).atLine(-1).withTimestamp(lastModified))))
87+
order.verify(batchListerFn)(none)
88+
89+
// file 1 = 1.json
90+
sourceFileQueue.next() should be(Right(Some(fileLocs(1).atLine(-1).withTimestamp(lastModified))))
91+
order.verifyNoMoreInteractions()
8992

9093
// file 2 = 2.json
9194
sourceFileQueue.next() should be(Right(Some(fileLocs(2).atLine(-1).withTimestamp(lastModified))))
@@ -124,7 +127,6 @@ class CloudSourceFileQueueTest extends AnyFlatSpec with Matchers with MockitoSug
124127
mockStorageIface,
125128
fileLocs(2).atLine(1000).withTimestamp(lastModified),
126129
taskId,
127-
Option.empty,
128130
)
129131

130132
val order = inOrder(batchListerFn)
@@ -173,54 +175,4 @@ class CloudSourceFileQueueTest extends AnyFlatSpec with Matchers with MockitoSug
173175
sourceFileQueue.next() shouldBe expected
174176
}
175177

176-
"list" should "not pass lastSeenFile to batchListerFn when PostProcessAction is set" in {
177-
178-
val batchListerFn =
179-
mock[Option[TestFileMetadata] => Either[FileListError, Option[ListOfKeysResponse[TestFileMetadata]]]]
180-
181-
val mockStorageIface = mock[StorageInterface[TestFileMetadata]]
182-
when(
183-
mockStorageIface.seekToFile(
184-
anyString(),
185-
anyString(),
186-
any[Option[Instant]],
187-
),
188-
).thenAnswer((_: String, file: String, _: Option[Instant]) => TestFileMetadata(file, lastModified).some)
189-
190-
val sourceFileQueue =
191-
CloudSourceFileQueue.from(
192-
batchListerFn,
193-
mockStorageIface,
194-
fileLocs(1).atLine(1000).withTimestamp(lastModified),
195-
taskId,
196-
Some(new DeletePostProcessAction(true)),
197-
)
198-
199-
doAnswer(x => listBatch(x)).when(batchListerFn)(
200-
any[Option[TestFileMetadata]],
201-
)
202-
203-
val order = inOrder(batchListerFn)
204-
205-
// we are starting from a previously read file, so we use fileLocs(1) - this doesn't cause a read to the storage layer to discover next file
206-
sourceFileQueue.next() should be(Right(Some(fileLocs(1).atLine(1000).withTimestamp(lastModified))))
207-
order.verifyNoMoreInteractions()
208-
209-
verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)
210-
verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)
211-
verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)
212-
213-
}
214-
215-
private def verifyRefreshAndRollover(
216-
batchListerFn: Option[TestFileMetadata] => Either[FileListError, Option[ListOfKeysResponse[TestFileMetadata]]],
217-
sourceFileQueue: CloudSourceFileQueue[TestFileMetadata],
218-
order: InOrder,
219-
): Unit = {
220-
// REFRESH
221-
sourceFileQueue.next() should be(Right(Some(fileLocs(0).atLine(-1).withTimestamp(lastModified))))
222-
order.verify(batchListerFn)(none)
223-
sourceFileQueue.next() should be(Right(Some(fileLocs(1).atLine(-1).withTimestamp(lastModified))))
224-
order.verifyNoMoreInteractions()
225-
}
226178
}

0 commit comments

Comments
 (0)