Skip to content

Commit

Permalink
Retain directories after post-processing (#197)
Browse files Browse the repository at this point in the history
* Retain directories after post-processing
Customers have noticed that when the Lenses S3 Source Connector deletes files on S3, then the entire path is deleted. Whilst this is actually due to the way S3 works, we can actually do something about this in the connector.

This adds a new boolean KCQL property to the S3 source:
`post.process.action.retain.dirs`
(default value: `false`)

If this is set to `true`, then upon moving/deleting files within the source post processing, then first a zero-byte object will be created to ensure that the path will still be represented on S3.
  • Loading branch information
davidsloan authored Jan 24, 2025
1 parent c3920a3 commit 7e6e823
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class AwsS3StorageInterface(
.contents()
.asScala
.filterNot(AwsS3StorageFilter.filterOut)
.filter(_.size() > 0)
.map(o => S3FileMetadata(o.key(), o.lastModified()))
.filter(md => extensionFilter.forall(_.filter(md)))

Expand Down Expand Up @@ -123,9 +124,11 @@ class AwsS3StorageInterface(
bucket,
prefix,
pagReq.iterator().asScala.flatMap(
_.contents().asScala.filterNot(AwsS3StorageFilter.filterOut).toSeq.map(o =>
S3FileMetadata(o.key(), o.lastModified()),
).filter(md => extensionFilter.forall(_.filter(md))),
_.contents().asScala.filterNot(AwsS3StorageFilter.filterOut)
.filter(_.size() > 0)
.toSeq.map(o => S3FileMetadata(o.key(), o.lastModified())).filter(md =>
extensionFilter.forall(_.filter(md)),
),
).toSeq,
)
}.toEither.leftMap {
Expand Down Expand Up @@ -325,4 +328,35 @@ class AwsS3StorageInterface(
}
}

/**
* Creates a directory in the specified S3 bucket if it does not already exist.
*
* @param bucket The name of the S3 bucket.
* @param path The path of the directory to create.
* @return Either a FileCreateError if the directory could not be created,
* or Unit if the directory was created successfully or already exists.
*/
override def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit] = Try {
def ensureEndsWithSlash(input: String): String =
if (input.endsWith("/")) input else input + "/"

val putObjectRequest = PutObjectRequest
.builder()
.ifNoneMatch("*")
.bucket(bucket)
.key(ensureEndsWithSlash(path))
.contentLength(0)
.build()

s3Client.putObject(putObjectRequest, RequestBody.empty())
}
.toEither
.void
// If the object already exists, the "ifNoneMatch" condition will fail, triggering this recovery clause
.recover {
case ex: S3Exception if "PreconditionFailed".equals(ex.awsErrorDetails().errorCode()) =>
()
}
.leftMap(ex => FileCreateError(ex, "empty object file"))

}
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,6 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
Try(client.getFileSystemClient(oldBucket).getFileClient(oldPath).rename(newBucket, newPath)).toEither.leftMap(
FileMoveError(_, oldPath, newPath),
).void

override def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit] = ().asRight
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ object PropsKeyEnum extends Enum[PropsKeyEntry] {

case object PostProcessActionPrefix extends PropsKeyEntry("post.process.action.prefix")

case object PostProcessActionRetain extends PropsKeyEntry("post.process.action.retain.dirs")

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ case class CloudLocation(

def prefixOrDefault(): String = prefix.getOrElse("")

def pathToLowestDirectory(): Option[String] = path.map(p => p.substring(0, p.lastIndexOf("/")))

private def validate(): Validated[Throwable, CloudLocation] =
cloudLocationValidator.validate(this)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2017-2025 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.source.config

import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface

import scala.collection.mutable

/**
* A cache for tracking directories that have been created.
*
* @param storageInterface The storage interface used to create directories.
*/
class DirectoryCache(storageInterface: StorageInterface[_]) {

// A mutable set to keep track of created directories.
private val directoriesCreated = mutable.Set[(String, String)]()

/**
* Ensures that a directory exists in the specified bucket and path.
*
* @param bucket The bucket in which the directory should exist.
* @param path The path of the directory to check or create.
* @return Either a FileCreateError if the directory creation failed, or Unit if the directory exists or was created successfully.
*/
def ensureDirectoryExists(bucket: String, path: String): Either[FileCreateError, Unit] =
if (directoriesCreated.contains((bucket, path))) {
().asRight
} else {
storageInterface.createDirectoryIfNotExists(bucket, path) match {
case Left(value) => value.asLeft
case Right(_) =>
directoriesCreated.add((bucket, path))
().asRight
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.lenses.streamreactor.connect.cloud.common.source.config
import cats.effect.IO
import cats.implicits.catsSyntaxEitherId
import cats.implicits.toBifunctorOps
import cats.implicits.toTraverseOps
import com.typesafe.scalalogging.LazyLogging
Expand All @@ -24,6 +23,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnt
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionBucket
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionPrefix
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PostProcessActionRetain
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEntry
import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.PostProcessActionEnum
Expand All @@ -32,11 +32,50 @@ import io.lenses.streamreactor.connect.cloud.common.source.config.kcqlprops.Post
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

/**
* Trait representing a post-process action to be performed on cloud storage.
*/
trait PostProcessAction {

/**
* Runs the post-process action.
*
* @param storageInterface The storage interface used to interact with the cloud storage.
* @param directoryCache The cache used to track created directories.
* @param cloudLocation The location in the cloud storage where the action should be performed.
* @return An IO effect representing the completion of the action.
*/
def run(
cloudLocation: CloudLocation,
storageInterface: StorageInterface[_],
directoryCache: DirectoryCache,
cloudLocation: CloudLocation,
): IO[Unit]

/**
* Prepares the paths needed for the post-process action.
*
* @param cloudLocation The location in the cloud storage.
* @param directoryCache The cache used to track created directories.
* @return An IO effect containing a tuple with the path and the directory path.
*/
protected def preparePaths(
retainDirs: Boolean,
cloudLocation: CloudLocation,
directoryCache: DirectoryCache,
): IO[(String, String)] =
for {
path <- IO.fromOption(cloudLocation.path)(
new IllegalArgumentException("Cannot proceed without a path, this is probably a logic error"),
)
dirPath <- IO.fromOption(cloudLocation.pathToLowestDirectory())(
new IllegalArgumentException("Cannot proceed without a path, this is probably a logic error"),
)
_ <- if (retainDirs) {
IO.fromEither(directoryCache.ensureDirectoryExists(cloudLocation.bucket, dirPath).leftMap(_.exception))
} else {
IO.unit
}
} yield (path, dirPath)
}

object PostProcessAction {
Expand All @@ -49,12 +88,16 @@ object PostProcessAction {
)
.map {
case Delete =>
new DeletePostProcessAction().asRight
for {
retainDirs: Boolean <- kcqlProperties.getBooleanOrDefault(PostProcessActionRetain, default = false)
} yield new DeletePostProcessAction(retainDirs)

case Move => {
for {
destBucket <- kcqlProperties.getString(PostProcessActionBucket)
destPrefix <- kcqlProperties.getString(PostProcessActionPrefix)
} yield MovePostProcessAction(prefix, dropEndSlash(destBucket), dropEndSlash(destPrefix))
retainDirs <- kcqlProperties.getBooleanOrDefault(PostProcessActionRetain, default = false).toOption
} yield MovePostProcessAction(retainDirs, prefix, dropEndSlash(destBucket), dropEndSlash(destPrefix))
}
.toRight(new IllegalArgumentException("A bucket and a path must be specified for moving files to."))
}
Expand All @@ -65,31 +108,37 @@ object PostProcessAction {
def dropLastCharacterIfPresent(s: String, char: Char): String = if (s.lastOption.contains(char)) s.dropRight(1) else s
}

class DeletePostProcessAction extends PostProcessAction with LazyLogging {
class DeletePostProcessAction(retainDirs: Boolean) extends PostProcessAction with LazyLogging {

def run(
cloudLocation: CloudLocation,
storageInterface: StorageInterface[_],
directoryCache: DirectoryCache,
cloudLocation: CloudLocation,
): IO[Unit] =
for {
_ <- IO.delay(logger.debug("Running delete for {}", cloudLocation))
path <- IO.fromOption(cloudLocation.path)(
new IllegalArgumentException("Cannot delete without a path, this is probably a logic error"),
)
_ <- IO.delay(logger.debug("Running delete for {}", cloudLocation))
(path, _) <- preparePaths(retainDirs, cloudLocation, directoryCache)

del <- IO.fromEither(storageInterface.deleteFiles(cloudLocation.bucket, Seq(path)).leftMap(_.exception))
} yield del
}

case class MovePostProcessAction(originalPrefix: Option[String], newBucket: String, newPrefix: String)
extends PostProcessAction
case class MovePostProcessAction(
retainDirs: Boolean,
originalPrefix: Option[String],
newBucket: String,
newPrefix: String,
) extends PostProcessAction
with StrictLogging {

override def run(
cloudLocation: CloudLocation,
storageInterface: StorageInterface[_],
directoryCache: DirectoryCache,
cloudLocation: CloudLocation,
): IO[Unit] =
for {
path <- IO.fromOption(cloudLocation.path)(
new IllegalArgumentException("Cannot move without a path, this is probably a logic error"),
)
(path, _) <- preparePaths(retainDirs, cloudLocation, directoryCache)

newPath = originalPrefix.map(o => path.replace(o, newPrefix)).getOrElse(path)
_ = logger.info(s"Moving file from ${cloudLocation.bucket}/$path to $newBucket/$newPath newPrefix: $newPrefix")
mov <- IO.fromEither(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ object CloudSourcePropsSchema {
PostProcessAction -> EnumPropsSchema(PostProcessActionEnum),
PostProcessActionBucket -> StringPropsSchema,
PostProcessActionPrefix -> StringPropsSchema,
PostProcessActionRetain -> BooleanPropsSchema,
)

val schema = KcqlPropsSchema(PropsKeyEnum, keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.source.CommitWatermark
import io.lenses.streamreactor.connect.cloud.common.source.config.DirectoryCache
import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction
import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
Expand All @@ -45,6 +46,8 @@ class ReaderManager(
maybePostProcessAction: Option[PostProcessAction],
) extends LazyLogging {

val directoryCache = new DirectoryCache(storageInterface)

def poll(): IO[Vector[SourceRecord]] = {
def fromNexFile(pollResults: Vector[SourceRecord], allLimit: Int): IO[Vector[SourceRecord]] =
for {
Expand Down Expand Up @@ -135,7 +138,7 @@ class ReaderManager(
maybePostProcessAction match {
case Some(action) =>
logger.info("PostProcess for {}", commitWatermark)
action.run(commitWatermark.cloudLocation, storageInterface)
action.run(storageInterface, directoryCache, commitWatermark.cloudLocation)
case None =>
logger.info("No PostProcess for {}", commitWatermark)
IO.unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ trait StorageInterface[SM <: FileMetadata] extends ResultProcessors {
def deleteFiles(bucket: String, files: Seq[String]): Either[FileDeleteError, Unit]

def mvFile(oldBucket: String, oldPath: String, newBucket: String, newPath: String): Either[FileMoveError, Unit]

/**
* Creates a directory if it does not already exist.
*
* @param bucket The name of the bucket where the directory should be created.
* @param path The path of the directory to create.
* @return Either a FileCreateError if the directory creation failed, or Unit if the directory was created successfully or already exists.
*/
def createDirectoryIfNotExists(bucket: String, path: String): Either[FileCreateError, Unit]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2017-2025 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.source.config

import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import org.mockito.MockitoSugar
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers

class DirectoryCacheTest extends AnyFunSuiteLike with Matchers with MockitoSugar {

test("ensureDirectoryExists should return Right(Unit) and add the directory to the cache if it does not exist") {
val storageInterface = mock[StorageInterface[_]]
when(storageInterface.createDirectoryIfNotExists("bucket", "path")).thenReturn(Right(()))
val cache = new DirectoryCache(storageInterface)
cache.ensureDirectoryExists("bucket", "path") should be(Right(()))
cache.ensureDirectoryExists("bucket", "path") should be(Right(()))
verify(storageInterface, times(1)).createDirectoryIfNotExists("bucket", "path")
}

test(
"ensureDirectoryExists should return Left(FileCreateError) and not add to the cache if creating the directory fails",
) {
val storageInterface = mock[StorageInterface[_]]
val error = FileCreateError(new IllegalStateException("Bad"), "data")
when(storageInterface.createDirectoryIfNotExists("bucket", "path")).thenReturn(Left(error))
val cache = new DirectoryCache(storageInterface)
cache.ensureDirectoryExists("bucket", "path") should be(Left(error))
cache.ensureDirectoryExists("bucket", "path") should be(Left(error))
verify(storageInterface, times(2)).createDirectoryIfNotExists("bucket", "path")
}

test("ensureDirectoryExists should not add the directory to the cache if it already exists") {
val storageInterface = mock[StorageInterface[_]]
when(storageInterface.createDirectoryIfNotExists("bucket", "path")).thenReturn(Right(()))
val cache = new DirectoryCache(storageInterface)
cache.ensureDirectoryExists("bucket", "path") should be(Right(()))
cache.ensureDirectoryExists("bucket", "path") should be(Right(()))
verify(storageInterface, times(1)).createDirectoryIfNotExists("bucket", "path")
cache.ensureDirectoryExists("bucket", "path") should be(Right(()))
verify(storageInterface, times(1)).createDirectoryIfNotExists("bucket", "path")
}
}
Loading

0 comments on commit 7e6e823

Please sign in to comment.