Skip to content

Commit e9df8d2

Browse files
authored
Introduce File Extension Filtering Logic for GCP Storage and AWS S3 Connectors (#64)
This PR introduces new filtering logic to the GCP Storage Source and AWS S3 Source connectors, allowing users to include or exclude files based on their extensions during the source file search process. This enhancement provides finer control over which files are processed, improving the flexibility and efficiency of data ingestion. GCP Storage Source Connector New Properties: connect.gcpstorage.source.extension.excludes: Description: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. Default: null (No filtering is enabled by default; all files are considered) connect.gcpstorage.source.extension.includes: Description: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. Default: null (All extensions are included by default) AWS S3 Source Connector New Properties: connect.s3.source.extension.excludes: Description: A comma-separated list of file extensions to exclude from the source file search. If this property is not configured, all files are considered. Default: null (No filtering is enabled by default; all files are considered) connect.s3.source.extension.includes: Description: A comma-separated list of file extensions to include in the source file search. If this property is not configured, all files are considered. Default: null (All extensions are included by default) How It Works Include Filtering: If the source.extension.includes property is set, only files with extensions listed in this property will be considered for processing. Exclude Filtering: If the source.extension.excludes property is set, files with extensions listed in this property will be ignored during processing. Combined Use: When both properties are set, the connector will only include files that match the includes property and do not match the excludes property. 
Use Cases: Inclusion: Users can specify certain file types to process (e.g., .csv, .json), ensuring that only relevant files are ingested. Exclusion: Users can exclude files with extensions that should not be processed (e.g., temporary files like .tmp or backup files like .bak). * Source extension filters: part 1 * Wiring in * Addressing review comments * Making documentation more specific
1 parent e902f7f commit e9df8d2

File tree

21 files changed

+460
-43
lines changed

21 files changed

+460
-43
lines changed

kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/utils/S3ProxyContainerTest.scala

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.lenses.streamreactor.connect.aws.s3.utils
22
import com.typesafe.scalalogging.LazyLogging
33
import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator
4-
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
54
import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
5+
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
66
import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig
77
import io.lenses.streamreactor.connect.aws.s3.sink.S3SinkTask
88
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
@@ -44,7 +44,13 @@ trait S3ProxyContainerTest
4444
override val prefix: String = "connect.s3"
4545

4646
override def createStorageInterface(client: S3Client): Either[Throwable, AwsS3StorageInterface] =
47-
Try(new AwsS3StorageInterface(connectorTaskId, client, true)).toEither
47+
Try(
48+
new AwsS3StorageInterface(connectorTaskId = connectorTaskId,
49+
s3Client = client,
50+
batchDelete = true,
51+
extensionFilter = Option.empty,
52+
),
53+
).toEither
4854

4955
override def createClient(): Either[Throwable, S3Client] = {
5056

kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ class S3SinkTask
5050
config: S3SinkConfig,
5151
cloudClient: S3Client,
5252
): AwsS3StorageInterface =
53-
new AwsS3StorageInterface(connectorTaskId, cloudClient, config.batchDelete)
53+
new AwsS3StorageInterface(connectorTaskId = connectorTaskId,
54+
s3Client = cloudClient,
55+
batchDelete = config.batchDelete,
56+
extensionFilter = Option.empty,
57+
)
5458

5559
override def createClient(config: S3ConnectionConfig): Either[Throwable, S3Client] =
5660
AwsS3ClientCreator.make(config)

kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ class S3SourceTask
4343
config: S3SourceConfig,
4444
s3Client: S3Client,
4545
): AwsS3StorageInterface =
46-
new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete)
46+
new AwsS3StorageInterface(connectorTaskId = connectorTaskId,
47+
s3Client = s3Client,
48+
batchDelete = config.batchDelete,
49+
extensionFilter = config.extensionFilter,
50+
)
4751

4852
override def createClient(config: S3SourceConfig): Either[Throwable, S3Client] =
4953
AwsS3ClientCreator.make(config.connectionConfig)

kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
2525
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
2626
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceBucketOptions
2727
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions
28+
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter
2829

2930
object S3SourceConfig extends PropsToConfigConverter[S3SourceConfig] {
3031

@@ -52,6 +53,7 @@ object S3SourceConfig extends PropsToConfigConverter[S3SourceConfig] {
5253
s3ConfigDefBuilder.getCompressionCodec(),
5354
s3ConfigDefBuilder.getPartitionSearcherOptions(parsedValues),
5455
s3ConfigDefBuilder.batchDelete(),
56+
s3ConfigDefBuilder.getSourceExtensionFilter,
5557
)
5658

5759
}
@@ -64,4 +66,5 @@ case class S3SourceConfig(
6466
compressionCodec: CompressionCodec,
6567
partitionSearcher: PartitionSearcherOptions,
6668
batchDelete: Boolean,
69+
extensionFilter: Option[ExtensionFilter],
6770
) extends CloudSourceConfig[S3FileMetadata]

kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigDef.scala

+1
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@ object S3SourceConfigDef extends S3CommonConfigDef with CloudSourceSettingsKeys
3030
addSourceOrderingSettings(settings)
3131
addSourcePartitionSearcherSettings(settings)
3232
addSourcePartitionExtractorSettings(settings)
33+
addSourceFilteringSettings(settings)
3334
}
3435
}

kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3StorageInterface.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
2020
import io.lenses.streamreactor.connect.cloud.common.config.ObjectMetadata
2121
import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile
2222
import io.lenses.streamreactor.connect.cloud.common.model.UploadableString
23+
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter
2324
import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError
2425
import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError
2526
import io.lenses.streamreactor.connect.cloud.common.storage.FileListError
@@ -49,6 +50,7 @@ class AwsS3StorageInterface(
4950
connectorTaskId: ConnectorTaskId,
5051
s3Client: S3Client,
5152
batchDelete: Boolean,
53+
extensionFilter: Option[ExtensionFilter],
5254
) extends StorageInterface[S3FileMetadata]
5355
with LazyLogging {
5456

@@ -74,6 +76,7 @@ class AwsS3StorageInterface(
7476
.asScala
7577
.filterNot(AwsS3StorageFilter.filterOut)
7678
.map(o => S3FileMetadata(o.key(), o.lastModified()))
79+
.filter(md => extensionFilter.forall(_.filter(md)))
7780

7881
processAsKey(
7982
bucket,
@@ -121,7 +124,7 @@ class AwsS3StorageInterface(
121124
pagReq.iterator().asScala.flatMap(
122125
_.contents().asScala.filterNot(AwsS3StorageFilter.filterOut).toSeq.map(o =>
123126
S3FileMetadata(o.key(), o.lastModified()),
124-
),
127+
).filter(md => extensionFilter.forall(_.filter(md))),
125128
).toSeq,
126129
)
127130
}.toEither.leftMap {

kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
3535
val configKeys =
3636
S3SinkConfigDef.config.configKeys().keySet().asScala ++ S3SourceConfigDef.config.configKeys().keySet().asScala
3737

38-
configKeys.size shouldBe 51
38+
configKeys.size shouldBe 53
3939
configKeys.foreach {
4040
k => k.toLowerCase should be(k)
4141
}

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala

+12
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketO
2222
import io.lenses.streamreactor.connect.cloud.common.sink.config.IndexOptions
2323
import io.lenses.streamreactor.connect.cloud.common.source.config.CloudSourceBucketOptions
2424
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions
25+
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter
2526
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
2627

2728
/**
@@ -99,4 +100,15 @@ trait CloudSourceConfig[MD <: FileMetadata] extends CloudConfig {
99100
* @return The partition searcher options for the cloud source.
100101
*/
101102
def partitionSearcher: PartitionSearcherOptions
103+
104+
/**
105+
* Retrieves the extension filter for the cloud source, if configured.
106+
*
107+
* The extension filter is used to include or exclude files
108+
* based on their extensions when reading from the cloud source.
109+
*
110+
* @return Option containing the extension filter for the cloud source.
111+
*/
112+
def extensionFilter: Option[ExtensionFilter]
113+
102114
}

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettings.scala

+30
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.lenses.streamreactor.common.config.base.traits.BaseSettings
1919
import io.lenses.streamreactor.connect.cloud.common.config.ConfigParse
2020
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEntry
2121
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum
22+
import io.lenses.streamreactor.connect.cloud.common.storage.ExtensionFilter
2223
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
2324
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties
2425

@@ -52,4 +53,33 @@ trait CloudSourceSettings extends BaseSettings with CloudSourceSettingsKeys {
5253
wildcardExcludes = getString(PARTITION_SEARCH_INDEX_EXCLUDES).split(',').toSet[String].map(_.trim),
5354
)
5455

56+
/**
 * Builds the optional extension filter for the source.
 *
 * A filter is only created when at least one of the include/exclude
 * extension properties has been configured; otherwise no filtering
 * is applied and all files are considered.
 *
 * @return Some(filter) when either property is set, None otherwise.
 */
def getSourceExtensionFilter: Option[ExtensionFilter] = {
  val maybeIncludes = extractSetFromProperty(SOURCE_EXTENSION_INCLUDES)
  val maybeExcludes = extractSetFromProperty(SOURCE_EXTENSION_EXCLUDES)
  if (maybeIncludes.isDefined || maybeExcludes.isDefined)
    Some(
      new ExtensionFilter(
        maybeIncludes.getOrElse(Set.empty),
        maybeExcludes.getOrElse(Set.empty),
      ),
    )
  else
    None
}
72+
73+
/**
 * Extracts the property value from the configuration and transforms it into a
 * normalised set of extensions.
 *
 * Each comma-separated entry is trimmed, lower-cased and, if it does not
 * already start with a dot, one is added (e.g. "csv" becomes ".csv").
 * Blank entries (from stray or trailing commas) are discarded — previously a
 * list such as "csv, json" produced the never-matching extension ". json",
 * and a trailing comma produced the match-everything extension ".".
 * Trimming follows the convention used elsewhere in this trait for
 * comma-separated settings (see the partition-search excludes parsing).
 *
 * @param propertyName The name of the property to extract.
 * @return An Option containing a set of strings if the property exists, None otherwise.
 */
private def extractSetFromProperty(propertyName: String): Option[Set[String]] =
  Option(getString(propertyName)).map(
    _.split(',')
      .map(_.trim.toLowerCase)
      .filter(_.nonEmpty)
      .map(ext => if (ext.startsWith(".")) ext else s".$ext")
      .toSet,
  )
5585
}

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/CloudSourceSettingsKeys.scala

+49-6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,49 @@ trait CloudSourceSettingsKeys extends WithConnectorPrefix {
3939
"Comma-separated list of directory prefixes to exclude from the partition search"
4040
private val PARTITION_SEARCH_INDEX_EXCLUDES_DEFAULT: String = ".indexes"
4141

42+
// Config key, documentation and default for the exclude-list of file extensions.
// The null default means "no exclude filtering" — all files are considered.
protected val SOURCE_EXTENSION_EXCLUDES: String = s"$connectorPrefix.source.extension.excludes"
private val SOURCE_EXTENSION_EXCLUDES_DOC: String =
  "Comma-separated list of file extensions to exclude from the source file search. If not configured, no files will be excluded. When used in conjunction with 'source.extension.includes', files must match the includes list and not match the excludes list to be considered."
private val SOURCE_EXTENSION_EXCLUDES_DEFAULT: String = null

// Config key, documentation and default for the include-list of file extensions.
// The null default means "include everything".
protected val SOURCE_EXTENSION_INCLUDES: String = s"$connectorPrefix.source.extension.includes"
private val SOURCE_EXTENSION_INCLUDES_DOC: String =
  "Comma-separated list of file extensions to include in the source file search. If not configured, all files are considered. When used in conjunction with 'source.extension.excludes', files must match the includes list and not match the excludes list to be considered."
private val SOURCE_EXTENSION_INCLUDES_DEFAULT: String = null
51+
52+
/**
 * Registers the source file-extension filtering properties on the supplied ConfigDef.
 *
 * Two string properties are added under the "Source Filtering" group: one
 * listing extensions to exclude and one listing extensions to include.
 *
 * @param configDef The ConfigDef to which the settings are added.
 * @return The ConfigDef with the added settings.
 */
def addSourceFilteringSettings(configDef: ConfigDef): ConfigDef = {
  val withExcludes = configDef.define(
    SOURCE_EXTENSION_EXCLUDES,
    Type.STRING,
    SOURCE_EXTENSION_EXCLUDES_DEFAULT,
    Importance.LOW,
    SOURCE_EXTENSION_EXCLUDES_DOC,
    "Source Filtering",
    2,
    ConfigDef.Width.LONG,
    SOURCE_EXTENSION_EXCLUDES,
  )
  withExcludes.define(
    SOURCE_EXTENSION_INCLUDES,
    Type.STRING,
    SOURCE_EXTENSION_INCLUDES_DEFAULT,
    Importance.LOW,
    SOURCE_EXTENSION_INCLUDES_DOC,
    "Source Filtering",
    1,
    ConfigDef.Width.LONG,
    SOURCE_EXTENSION_INCLUDES,
  )
}
84+
4285
def addSourceOrderingSettings(configDef: ConfigDef): ConfigDef =
4386
configDef
4487
.define(
@@ -100,15 +143,15 @@ trait CloudSourceSettingsKeys extends WithConnectorPrefix {
100143
)
101144

102145
val SOURCE_PARTITION_EXTRACTOR_TYPE = s"$connectorPrefix.source.partition.extractor.type"
103-
val SOURCE_PARTITION_EXTRACTOR_TYPE_DOC =
146+
private val SOURCE_PARTITION_EXTRACTOR_TYPE_DOC =
104147
"If you want to read to specific partitions when running the source. Options are 'hierarchical' (to match the sink's hierarchical file storage pattern) and 'regex' (supply a custom regex). Any other value will ignore original partitions and they should be evenly distributed through available partitions (Kafka dependent)."
105148

106-
val SOURCE_PARTITION_EXTRACTOR_REGEX = s"$connectorPrefix.source.partition.extractor.regex"
107-
val SOURCE_PARTITION_EXTRACTOR_REGEX_DOC = "If reading filename from regex, supply the regex here."
149+
val SOURCE_PARTITION_EXTRACTOR_REGEX = s"$connectorPrefix.source.partition.extractor.regex"
150+
private val SOURCE_PARTITION_EXTRACTOR_REGEX_DOC = "If reading filename from regex, supply the regex here."
108151

109-
val SOURCE_ORDERING_TYPE: String = s"$connectorPrefix.ordering.type"
110-
val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)"
111-
val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric"
152+
val SOURCE_ORDERING_TYPE: String = s"$connectorPrefix.ordering.type"
153+
private val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)"
154+
private val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric"
112155

113156
def addSourcePartitionExtractorSettings(configDef: ConfigDef): ConfigDef = configDef.define(
114157
SOURCE_PARTITION_EXTRACTOR_TYPE,

kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/storage/ExtensionFilter.scala

+63
New file added. (The deletion of S3SourceBucketOptions.scala, -15 lines, is a separate change in this commit whose contents are not shown here.)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2017-2024 Lenses.io Ltd
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.lenses.streamreactor.connect.cloud.common.storage
17+
18+
/**
 * Filters files by their extension, supporting both an allow-list and a
 * deny-list of extensions.
 *
 * @constructor create a new ExtensionFilter with allowed and excluded extensions.
 * @param allowedExtensions the set of extensions that are allowed; when empty,
 *                          every extension is permitted.
 * @param excludedExtensions the set of extensions that are excluded.
 */
class ExtensionFilter(
  val allowedExtensions: Set[String],
  val excludedExtensions: Set[String],
) {

  /**
   * Decides whether a file, described by its metadata, passes the filter.
   *
   * @param metadata the metadata of the file to be filtered.
   * @return true if the file passes the filter, false otherwise.
   */
  def filter[MD <: FileMetadata](metadata: MD): Boolean =
    ExtensionFilter.performFilterLogic(metadata.file.toLowerCase, allowedExtensions, excludedExtensions)

  /**
   * Decides whether a file, identified by name, passes the filter.
   *
   * @param fileName the name of the file to be filtered.
   * @return true if the file passes the filter, false otherwise.
   */
  def filter(fileName: String): Boolean =
    ExtensionFilter.performFilterLogic(fileName.toLowerCase, allowedExtensions, excludedExtensions)

}

object ExtensionFilter {

  /**
   * Core predicate: a (lower-cased) file name passes when it matches the
   * allow-list (or the allow-list is empty) and matches nothing on the
   * deny-list.
   */
  def performFilterLogic(
    fileName: String,
    allowedExtensions: Set[String],
    excludedExtensions: Set[String],
  ): Boolean = {
    val passesInclude = allowedExtensions.isEmpty || allowedExtensions.exists(fileName.endsWith)
    val passesExclude = !excludedExtensions.exists(fileName.endsWith)
    passesInclude && passesExclude
  }

}

0 commit comments

Comments
 (0)