Skip to content

Commit e4aba0d

Browse files
mroiter-larusconker84
andauthoredNov 22, 2022
Issue #542: Add a strategy to merge edge nodes properties when creating relationships using Relationship pattern strategy (#544)
* Issue #542: Add a strategy to merge edge nodes properties when creating relationships using Relationship pattern strategy * some refactoring Co-authored-by: Andrea Santurbano <[email protected]>
1 parent 571cdf3 commit e4aba0d

File tree

15 files changed

+166
-130
lines changed

15 files changed

+166
-130
lines changed
 

‎.gitignore

-1
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,3 @@ Thumbs.db
3131
bin
3232
doc/node
3333
doc/node_modules
34-

‎common/src/main/kotlin/streams/service/sink/strategy/NodePatternIngestionStrategy.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePat
1515
|MERGE (n${getLabelsAsString(nodePatternConfiguration.labels)}{${
1616
getNodeMergeKeys("keys", nodePatternConfiguration.keys)
1717
}})
18-
|SET n = event.properties
18+
|SET n ${if (nodePatternConfiguration.mergeProperties) "+" else ""}= event.properties
1919
|SET n += event.keys
2020
""".trimMargin()
2121

‎common/src/main/kotlin/streams/service/sink/strategy/PatternConfiguration.kt

+12-11
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ private fun cleanProperties(type: PatternConfigurationType, properties: List<Str
4242
interface PatternConfiguration
4343

4444
data class NodePatternConfiguration(val keys: Set<String>, val type: PatternConfigurationType,
45-
val labels: List<String>, val properties: List<String>): PatternConfiguration {
45+
val labels: List<String>, val properties: List<String>, val mergeProperties: Boolean): PatternConfiguration {
4646
companion object {
4747

4848
// (:LabelA{!id,foo,bar})
4949
@JvmStatic private val cypherNodePatternConfigured = """\((:\w+\s*(?::\s*(?:\w+)\s*)*)\s*(?:\{\s*(-?[\w!\.]+\s*(?:,\s*-?[!\w\*\.]+\s*)*)\})?\)$""".toRegex()
5050
// LabelA{!id,foo,bar}
5151
@JvmStatic private val simpleNodePatternConfigured = """^(\w+\s*(?::\s*(?:\w+)\s*)*)\s*(?:\{\s*(-?[\w!\.]+\s*(?:,\s*-?[!\w\*\.]+\s*)*)\})?$""".toRegex()
52-
fun parse(pattern: String): NodePatternConfiguration {
52+
fun parse(pattern: String, mergeProperties: Boolean): NodePatternConfiguration {
5353
val isCypherPattern = pattern.startsWith("(")
5454
val regex = if (isCypherPattern) cypherNodePatternConfigured else simpleNodePatternConfigured
5555
val matcher = regex.matchEntire(pattern)
@@ -75,25 +75,26 @@ data class NodePatternConfiguration(val keys: Set<String>, val type: PatternConf
7575
val cleanedProperties = cleanProperties(type, properties)
7676

7777
return NodePatternConfiguration(keys = keys, type = type,
78-
labels = labels, properties = cleanedProperties)
78+
labels = labels, properties = cleanedProperties, mergeProperties)
7979
}
8080
}
8181
}
82+
8283
}
8384

8485

8586
data class RelationshipPatternConfiguration(val start: NodePatternConfiguration, val end: NodePatternConfiguration,
8687
val relType: String, val type: PatternConfigurationType,
87-
val properties: List<String>): PatternConfiguration {
88+
val properties: List<String>, val mergeProperties: Boolean): PatternConfiguration {
8889
companion object {
8990

9091
// we don't allow ALL for start/end nodes in rels
9192
// it's public for testing purpose
92-
fun getNodeConf(pattern: String): NodePatternConfiguration {
93-
val start = NodePatternConfiguration.parse(pattern)
93+
fun getNodeConf(pattern: String, mergeProperties: Boolean): NodePatternConfiguration {
94+
val start = NodePatternConfiguration.parse(pattern, mergeProperties)
9495
return if (start.type == PatternConfigurationType.ALL) {
9596
NodePatternConfiguration(keys = start.keys, type = PatternConfigurationType.INCLUDE,
96-
labels = start.labels, properties = start.properties)
97+
labels = start.labels, properties = start.properties, mergeProperties)
9798
} else {
9899
start
99100
}
@@ -148,7 +149,7 @@ data class RelationshipPatternConfiguration(val start: NodePatternConfiguration,
148149
}
149150
}
150151

151-
fun parse(pattern: String): RelationshipPatternConfiguration {
152+
fun parse(pattern: String, mergeNodeProps: Boolean, mergeRelProps: Boolean): RelationshipPatternConfiguration {
152153
val isCypherPattern = pattern.startsWith("(")
153154
val regex = if (isCypherPattern) {
154155
cypherRelationshipPatternConfigured
@@ -169,20 +170,20 @@ data class RelationshipPatternConfiguration(val start: NodePatternConfiguration,
169170
val metadata = RelationshipPatternMetaData.create(isCypherPattern, isLeftToRight, matcher.groupValues)
170171

171172
val start = try {
172-
getNodeConf(metadata.startPattern)
173+
getNodeConf(metadata.startPattern, mergeNodeProps)
173174
} catch (e: Exception) {
174175
throw IllegalArgumentException("The Relationship pattern $pattern is invalid")
175176
}
176177
val end = try {
177-
getNodeConf(metadata.endPattern)
178+
getNodeConf(metadata.endPattern, mergeNodeProps)
178179
} catch (e: Exception) {
179180
throw IllegalArgumentException("The Relationship pattern $pattern is invalid")
180181
}
181182
val type = getPatternConfiguredType(metadata.properties)
182183
isHomogeneousPattern(type, metadata.properties, pattern, "Relationship")
183184
val cleanedProperties = cleanProperties(type, metadata.properties)
184185
return RelationshipPatternConfiguration(start = start, end = end, relType = metadata.relType,
185-
properties = cleanedProperties, type = type)
186+
properties = cleanedProperties, type = type, mergeProperties = mergeRelProps)
186187
}
187188
}
188189

‎common/src/main/kotlin/streams/service/sink/strategy/RelationshipPatternIngestionStrategy.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ class RelationshipPatternIngestionStrategy(private val relationshipPatternConfig
1515
|MERGE (start${getLabelsAsString(relationshipPatternConfiguration.start.labels)}{${
1616
getNodeMergeKeys("start.keys", relationshipPatternConfiguration.start.keys)
1717
}})
18-
|SET start = event.start.properties
18+
|SET start ${if (relationshipPatternConfiguration.mergeProperties) "+" else ""}= event.start.properties
1919
|SET start += event.start.keys
2020
|MERGE (end${getLabelsAsString(relationshipPatternConfiguration.end.labels)}{${
2121
getNodeMergeKeys("end.keys", relationshipPatternConfiguration.end.keys)
2222
}})
23-
|SET end = event.end.properties
23+
|SET end ${if (relationshipPatternConfiguration.mergeProperties) "+" else ""}= event.end.properties
2424
|SET end += event.end.keys
2525
|MERGE (start)-[r:${relationshipPatternConfiguration.relType}]->(end)
26-
|SET r = event.properties
26+
|SET r ${if (relationshipPatternConfiguration.mergeProperties) "+" else ""}= event.properties
2727
""".trimMargin()
2828

2929
private val deleteRelationshipTemplate: String = """

‎common/src/main/kotlin/streams/service/sink/strategy/SourceIdIngestionStrategy.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import streams.utils.IngestionUtils.getLabelsAsString
77
import streams.utils.SchemaUtils
88
import streams.utils.StreamsUtils
99

10-
data class SourceIdIngestionStrategyConfig(val labelName: String = "SourceEvent", val idName: String = "sourceId")
10+
data class SourceIdIngestionStrategyConfig(val labelName: String = "SourceEvent", val idName: String = "sourceId") {
11+
companion object {
12+
val DEFAULT = SourceIdIngestionStrategyConfig()
13+
}
14+
}
1115

1216
class SourceIdIngestionStrategy(config: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()): IngestionStrategy {
1317

‎common/src/test/kotlin/streams/service/sink/strategy/NodePatternIngestionStrategyTest.kt

+9-9
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class NodePatternIngestionStrategyTest {
1010
@Test
1111
fun `should get all properties`() {
1212
// given
13-
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id})")
13+
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id})", true)
1414
val strategy = NodePatternIngestionStrategy(config)
1515
val data = mapOf("id" to 1, "foo" to "foo", "bar" to "bar", "foobar" to "foobar")
1616

@@ -22,7 +22,7 @@ class NodePatternIngestionStrategyTest {
2222
assertEquals("""
2323
|${StreamsUtils.UNWIND}
2424
|MERGE (n:LabelA:LabelB{id: event.keys.id})
25-
|SET n = event.properties
25+
|SET n += event.properties
2626
|SET n += event.keys
2727
""".trimMargin(), queryEvents[0].query)
2828
assertEquals(listOf(mapOf("keys" to mapOf("id" to 1),
@@ -37,7 +37,7 @@ class NodePatternIngestionStrategyTest {
3737
@Test
3838
fun `should get nested properties`() {
3939
// given
40-
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id, foo.bar})")
40+
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id, foo.bar})", false)
4141
val strategy = NodePatternIngestionStrategy(config)
4242
val data = mapOf("id" to 1, "foo" to mapOf("bar" to "bar", "foobar" to "foobar"))
4343

@@ -65,7 +65,7 @@ class NodePatternIngestionStrategyTest {
6565
@Test
6666
fun `should exclude nested properties`() {
6767
// given
68-
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id, -foo})")
68+
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id, -foo})", false)
6969
val strategy = NodePatternIngestionStrategy(config)
7070
val map = mapOf("id" to 1, "foo" to mapOf("bar" to "bar", "foobar" to "foobar"), "prop" to 100)
7171

@@ -93,7 +93,7 @@ class NodePatternIngestionStrategyTest {
9393
@Test
9494
fun `should include nested properties`() {
9595
// given
96-
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id, foo})")
96+
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id, foo})", true)
9797
val strategy = NodePatternIngestionStrategy(config)
9898
val data = mapOf("id" to 1, "foo" to mapOf("bar" to "bar", "foobar" to "foobar"), "prop" to 100)
9999

@@ -106,7 +106,7 @@ class NodePatternIngestionStrategyTest {
106106
assertEquals("""
107107
|${StreamsUtils.UNWIND}
108108
|MERGE (n:LabelA:LabelB{id: event.keys.id})
109-
|SET n = event.properties
109+
|SET n += event.properties
110110
|SET n += event.keys
111111
""".trimMargin(),
112112
queryEvents[0].query)
@@ -121,7 +121,7 @@ class NodePatternIngestionStrategyTest {
121121
@Test
122122
fun `should exclude the properties`() {
123123
// given
124-
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id,-foo,-bar})")
124+
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id,-foo,-bar})", false)
125125
val strategy = NodePatternIngestionStrategy(config)
126126
val data = mapOf("id" to 1, "foo" to "foo", "bar" to "bar", "foobar" to "foobar")
127127

@@ -146,7 +146,7 @@ class NodePatternIngestionStrategyTest {
146146
@Test
147147
fun `should include the properties`() {
148148
// given
149-
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id,foo,bar})")
149+
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id,foo,bar})", false)
150150
val strategy = NodePatternIngestionStrategy(config)
151151
val data = mapOf("id" to 1, "foo" to "foo", "bar" to "bar", "foobar" to "foobar")
152152

@@ -170,7 +170,7 @@ class NodePatternIngestionStrategyTest {
170170
@Test
171171
fun `should delete the node`() {
172172
// given
173-
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id})")
173+
val config = NodePatternConfiguration.parse("(:LabelA:LabelB{!id})", true)
174174
val strategy = NodePatternIngestionStrategy(config)
175175
val data = mapOf("id" to 1, "foo" to "foo", "bar" to "bar", "foobar" to "foobar")
176176

‎common/src/test/kotlin/streams/service/sink/strategy/PatternConfigurationTest.kt

+66-66
Large diffs are not rendered by default.

‎common/src/test/kotlin/streams/service/sink/strategy/RelationshipPatternIngestionStrategyTest.kt

+11-11
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class RelationshipPatternIngestionStrategyTest {
1313
val startPattern = "LabelA{!idStart}"
1414
val endPattern = "LabelB{!idEnd}"
1515
val pattern = "(:$startPattern)-[:REL_TYPE]->(:$endPattern)"
16-
val config = RelationshipPatternConfiguration.parse(pattern)
16+
val config = RelationshipPatternConfiguration.parse(pattern, mergeNodeProps = false, mergeRelProps = true)
1717
val strategy = RelationshipPatternIngestionStrategy(config)
1818
val data = mapOf("idStart" to 1, "idEnd" to 2,
1919
"foo" to "foo",
@@ -28,13 +28,13 @@ class RelationshipPatternIngestionStrategyTest {
2828
assertEquals("""
2929
|${StreamsUtils.UNWIND}
3030
|MERGE (start:LabelA{idStart: event.start.keys.idStart})
31-
|SET start = event.start.properties
31+
|SET start += event.start.properties
3232
|SET start += event.start.keys
3333
|MERGE (end:LabelB{idEnd: event.end.keys.idEnd})
34-
|SET end = event.end.properties
34+
|SET end += event.end.properties
3535
|SET end += event.end.keys
3636
|MERGE (start)-[r:REL_TYPE]->(end)
37-
|SET r = event.properties
37+
|SET r += event.properties
3838
""".trimMargin(), queryEvents[0].query)
3939
assertEquals(listOf(mapOf("start" to mapOf("keys" to mapOf("idStart" to 1), "properties" to emptyMap()),
4040
"end" to mapOf("keys" to mapOf("idEnd" to 2), "properties" to emptyMap()),
@@ -50,7 +50,7 @@ class RelationshipPatternIngestionStrategyTest {
5050
val startPattern = "LabelA{!idStart}"
5151
val endPattern = "LabelB{!idEnd}"
5252
val pattern = "$startPattern REL_TYPE $endPattern"
53-
val config = RelationshipPatternConfiguration.parse(pattern)
53+
val config = RelationshipPatternConfiguration.parse(pattern, mergeNodeProps = false, mergeRelProps = false)
5454
val strategy = RelationshipPatternIngestionStrategy(config)
5555
val data = mapOf("idStart" to 1, "idEnd" to 2,
5656
"foo" to "foo",
@@ -87,7 +87,7 @@ class RelationshipPatternIngestionStrategyTest {
8787
val startPattern = "LabelA{!idStart}"
8888
val endPattern = "LabelB{!idEnd}"
8989
val pattern = "(:$endPattern)<-[:REL_TYPE]-(:$startPattern)"
90-
val config = RelationshipPatternConfiguration.parse(pattern)
90+
val config = RelationshipPatternConfiguration.parse(pattern, mergeNodeProps = false, mergeRelProps = false)
9191
val strategy = RelationshipPatternIngestionStrategy(config)
9292
val data = mapOf("idStart" to 1, "idEnd" to 2,
9393
"foo" to "foo",
@@ -124,7 +124,7 @@ class RelationshipPatternIngestionStrategyTest {
124124
val startPattern = "LabelA{!idStart, foo.mapFoo}"
125125
val endPattern = "LabelB{!idEnd, bar.mapBar}"
126126
val pattern = "(:$startPattern)-[:REL_TYPE]->(:$endPattern)"
127-
val config = RelationshipPatternConfiguration.parse(pattern)
127+
val config = RelationshipPatternConfiguration.parse(pattern, mergeNodeProps = false, mergeRelProps = true)
128128
val strategy = RelationshipPatternIngestionStrategy(config)
129129
val data = mapOf("idStart" to 1, "idEnd" to 2,
130130
"foo" to mapOf("mapFoo" to "mapFoo"),
@@ -141,13 +141,13 @@ class RelationshipPatternIngestionStrategyTest {
141141
assertEquals("""
142142
|${StreamsUtils.UNWIND}
143143
|MERGE (start:LabelA{idStart: event.start.keys.idStart})
144-
|SET start = event.start.properties
144+
|SET start += event.start.properties
145145
|SET start += event.start.keys
146146
|MERGE (end:LabelB{idEnd: event.end.keys.idEnd})
147-
|SET end = event.end.properties
147+
|SET end += event.end.properties
148148
|SET end += event.end.keys
149149
|MERGE (start)-[r:REL_TYPE]->(end)
150-
|SET r = event.properties
150+
|SET r += event.properties
151151
""".trimMargin(), queryEvents[0].query)
152152
assertEquals(listOf(
153153
mapOf("start" to mapOf("keys" to mapOf("idStart" to 1), "properties" to mapOf("foo.mapFoo" to "mapFoo")),
@@ -165,7 +165,7 @@ class RelationshipPatternIngestionStrategyTest {
165165
val startPattern = "LabelA{!idStart}"
166166
val endPattern = "LabelB{!idEnd}"
167167
val pattern = "(:$startPattern)-[:REL_TYPE]->(:$endPattern)"
168-
val config = RelationshipPatternConfiguration.parse(pattern)
168+
val config = RelationshipPatternConfiguration.parse(pattern, mergeNodeProps = false, mergeRelProps = false)
169169
val strategy = RelationshipPatternIngestionStrategy(config)
170170
val data = mapOf("idStart" to 1, "idEnd" to 2,
171171
"foo" to "foo",

‎doc/package-lock.json

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

+19-3
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): Neo4jConnectorConfig(confi
2828
val kafkaBrokerProperties: Map<String, Any?>
2929

3030
init {
31-
val sourceIdStrategyConfig = SourceIdIngestionStrategyConfig(getString(TOPIC_CDC_SOURCE_ID_LABEL_NAME), getString(TOPIC_CDC_SOURCE_ID_ID_NAME))
3231
topics = Topics.from(originals as Map<String, Any?>, "streams.sink." to "neo4j.")
33-
strategyMap = TopicUtils.toStrategyMap(topics, sourceIdStrategyConfig)
32+
strategyMap = TopicUtils.toStrategyMap(topics)
3433

3534
parallelBatches = getBoolean(BATCH_PARALLELIZE)
3635
val kafkaPrefix = "kafka."
@@ -68,9 +67,18 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): Neo4jConnectorConfig(confi
6867
const val TOPIC_CDC_SOURCE_ID_ID_NAME = "neo4j.topic.cdc.sourceId.idName"
6968
const val TOPIC_PATTERN_NODE_PREFIX = "neo4j.topic.pattern.node."
7069
const val TOPIC_PATTERN_RELATIONSHIP_PREFIX = "neo4j.topic.pattern.relationship."
70+
const val TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = "neo4j.topic.pattern.merge.node.properties.enabled"
71+
const val TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = "neo4j.topic.pattern.merge.relationship.properties.enabled"
7172
const val TOPIC_CDC_SCHEMA = "neo4j.topic.cdc.schema"
7273
const val TOPIC_CUD = "neo4j.topic.cud"
7374

75+
76+
const val DEFAULT_BATCH_PARALLELIZE = true
77+
const val DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED = false
78+
const val DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED = false
79+
80+
81+
7482
private val sourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()
7583

7684
fun config(): ConfigDef = Neo4jConnectorConfig.config()
@@ -92,11 +100,19 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): Neo4jConnectorConfig(confi
92100
.build())
93101
.define(ConfigKeyBuilder.of(BATCH_PARALLELIZE, ConfigDef.Type.BOOLEAN)
94102
.documentation(PropertiesUtil.getProperty(BATCH_PARALLELIZE)).importance(ConfigDef.Importance.MEDIUM)
95-
.defaultValue(true).group(ConfigGroup.BATCH)
103+
.defaultValue(DEFAULT_BATCH_PARALLELIZE).group(ConfigGroup.BATCH)
96104
.build())
97105
.define(ConfigKeyBuilder.of(TOPIC_CUD, ConfigDef.Type.STRING)
98106
.documentation(PropertiesUtil.getProperty(TOPIC_CUD)).importance(ConfigDef.Importance.HIGH)
99107
.defaultValue("").group(ConfigGroup.TOPIC_CYPHER_MAPPING)
100108
.build())
109+
.define(ConfigKeyBuilder.of(TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED, ConfigDef.Type.BOOLEAN)
110+
.documentation(PropertiesUtil.getProperty(TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED)).importance(ConfigDef.Importance.MEDIUM)
111+
.defaultValue(DEFAULT_TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED).group(ConfigGroup.TOPIC_CYPHER_MAPPING)
112+
.build())
113+
.define(ConfigKeyBuilder.of(TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED, ConfigDef.Type.BOOLEAN)
114+
.documentation(PropertiesUtil.getProperty(TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED)).importance(ConfigDef.Importance.MEDIUM)
115+
.defaultValue(DEFAULT_TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED).group(ConfigGroup.TOPIC_CYPHER_MAPPING)
116+
.build())
101117
}
102118
}

‎kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jStrategyStorage.kt

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class Neo4jStrategyStorage(val config: Neo4jSinkConnectorConfig): StreamsStrateg
1818
when (val topicConfig = topicConfigMap.getOrDefault(topicType, emptyList<Any>())) {
1919
is Collection<*> -> topicConfig.contains(topic)
2020
is Map<*, *> -> topicConfig.containsKey(topic)
21+
is Pair<*, *> -> (topicConfig.first as Set<String>).contains(topic)
2122
else -> false
2223
}
2324
}

‎common/src/main/kotlin/streams/service/Topics.kt ‎kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/utils/Topics.kt

+30-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package streams.service
22

3+
import streams.kafka.connect.sink.Neo4jSinkConnectorConfig
34
import streams.service.sink.strategy.*
45
import kotlin.reflect.jvm.javaType
56

@@ -11,29 +12,22 @@ private fun TopicType.replaceKeyBy(replacePrefix: Pair<String, String>) = if (re
1112
this.key.replace(replacePrefix.first, replacePrefix.second)
1213

1314
data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
14-
val cdcSourceIdTopics: Set<String> = emptySet(),
15+
val cdcSourceIdTopics: Pair<Set<String>, SourceIdIngestionStrategyConfig> = (emptySet<String>() to SourceIdIngestionStrategyConfig()),
1516
val cdcSchemaTopics: Set<String> = emptySet(),
1617
val cudTopics: Set<String> = emptySet(),
1718
val nodePatternTopics: Map<String, NodePatternConfiguration> = emptyMap(),
1819
val relPatternTopics: Map<String, RelationshipPatternConfiguration> = emptyMap(),
1920
val invalid: List<String> = emptyList()) {
2021

21-
operator fun plus(other: Topics): Topics {
22-
return Topics(cypherTopics = this.cypherTopics + other.cypherTopics,
23-
cdcSourceIdTopics = this.cdcSourceIdTopics + other.cdcSourceIdTopics,
24-
cdcSchemaTopics = this.cdcSchemaTopics + other.cdcSchemaTopics,
25-
cudTopics = this.cudTopics + other.cudTopics,
26-
nodePatternTopics = this.nodePatternTopics + other.nodePatternTopics,
27-
relPatternTopics = this.relPatternTopics + other.relPatternTopics,
28-
invalid = this.invalid + other.invalid)
29-
}
30-
3122
fun allTopics(): List<String> = this.asMap()
3223
.map {
33-
if (it.key.group == TopicTypeGroup.CDC || it.key.group == TopicTypeGroup.CUD) {
34-
(it.value as Set<String>).toList()
35-
} else {
36-
(it.value as Map<String, Any>).keys.toList()
24+
when (it.key.group) {
25+
TopicTypeGroup.CDC, TopicTypeGroup.CUD -> if (it.key != TopicType.CDC_SOURCE_ID) {
26+
(it.value as Set<String>).toList()
27+
} else {
28+
(it.value as Pair<Set<String>, SourceIdIngestionStrategyConfig>).first
29+
}
30+
else -> (it.value as Map<String, Any>).keys.toList()
3731
}
3832
}
3933
.flatten()
@@ -43,7 +37,10 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
4337
TopicType.PATTERN_NODE to nodePatternTopics, TopicType.PATTERN_RELATIONSHIP to relPatternTopics)
4438

4539
companion object {
46-
fun from(map: Map<String, Any?>, replacePrefix: Pair<String, String> = ("" to ""), dbName: String = "", invalidTopics: List<String> = emptyList()): Topics {
40+
fun from(map: Map<String, Any?>,
41+
replacePrefix: Pair<String, String> = ("" to ""),
42+
dbName: String = "",
43+
invalidTopics: List<String> = emptyList()): Topics {
4744
val config = map
4845
.filterKeys { if (dbName.isNotBlank()) it.toLowerCase().endsWith(".to.$dbName") else !it.contains(".to.") }
4946
.mapKeys { if (dbName.isNotBlank()) it.key.replace(".to.$dbName", "", true) else it.key }
@@ -54,16 +51,25 @@ data class Topics(val cypherTopics: Map<String, String> = emptyMap(),
5451
val nodePatterKey = TopicType.PATTERN_NODE.replaceKeyBy(replacePrefix)
5552
val relPatterKey = TopicType.PATTERN_RELATIONSHIP.replaceKeyBy(replacePrefix)
5653
val cypherTopics = TopicUtils.filterByPrefix(config, cypherTopicPrefix)
54+
val mergeNodeProperties = map[Neo4jSinkConnectorConfig.TOPIC_PATTERN_MERGE_NODE_PROPERTIES_ENABLED]
55+
.toString()
56+
.toBoolean()
57+
val mergeRelProperties = map[Neo4jSinkConnectorConfig.TOPIC_PATTERN_MERGE_RELATIONSHIP_PROPERTIES_ENABLED]
58+
.toString()
59+
.toBoolean()
5760
val nodePatternTopics = TopicUtils
5861
.filterByPrefix(config, nodePatterKey, invalidTopics)
59-
.mapValues { NodePatternConfiguration.parse(it.value) }
62+
.mapValues { NodePatternConfiguration.parse(it.value,mergeNodeProperties) }
6063
val relPatternTopics = TopicUtils
6164
.filterByPrefix(config, relPatterKey, invalidTopics)
62-
.mapValues { RelationshipPatternConfiguration.parse(it.value) }
65+
.mapValues { RelationshipPatternConfiguration.parse(it.value, mergeNodeProperties, mergeRelProperties) }
6366
val cdcSourceIdTopics = TopicUtils.splitTopics(config[sourceIdKey] as? String, invalidTopics)
6467
val cdcSchemaTopics = TopicUtils.splitTopics(config[schemaKey] as? String, invalidTopics)
6568
val cudTopics = TopicUtils.splitTopics(config[cudKey] as? String, invalidTopics)
66-
return Topics(cypherTopics, cdcSourceIdTopics, cdcSchemaTopics, cudTopics, nodePatternTopics, relPatternTopics)
69+
val sourceIdStrategyConfig = SourceIdIngestionStrategyConfig(
70+
map.getOrDefault(Neo4jSinkConnectorConfig.TOPIC_CDC_SOURCE_ID_LABEL_NAME, SourceIdIngestionStrategyConfig.DEFAULT.labelName).toString(),
71+
map.getOrDefault(Neo4jSinkConnectorConfig.TOPIC_CDC_SOURCE_ID_ID_NAME, SourceIdIngestionStrategyConfig.DEFAULT.idName).toString())
72+
return Topics(cypherTopics, (cdcSourceIdTopics to sourceIdStrategyConfig), cdcSchemaTopics, cudTopics, nodePatternTopics, relPatternTopics)
6773
}
6874
}
6975
}
@@ -104,12 +110,15 @@ object TopicUtils {
104110
}
105111
}
106112

107-
fun toStrategyMap(topics: Topics, sourceIdStrategyConfig: SourceIdIngestionStrategyConfig): Map<TopicType, Any> {
113+
fun toStrategyMap(topics: Topics): Map<TopicType, Any> {
108114
return topics.asMap()
109115
.filterKeys { it != TopicType.CYPHER }
110116
.mapValues { (type, config) ->
111117
when (type) {
112-
TopicType.CDC_SOURCE_ID -> SourceIdIngestionStrategy(sourceIdStrategyConfig)
118+
TopicType.CDC_SOURCE_ID -> {
119+
val (topics, sourceIdStrategyConfig) = (config as Pair<Set<String>, SourceIdIngestionStrategyConfig>)
120+
SourceIdIngestionStrategy(sourceIdStrategyConfig)
121+
}
113122
TopicType.CDC_SCHEMA -> SchemaIngestionStrategy()
114123
TopicType.CUD -> CUDIngestionStrategy()
115124
TopicType.PATTERN_NODE -> {

‎kafka-connect-neo4j/src/main/resources/kafka-connect-neo4j.properties

+5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ neo4j.topic.cdc.schema=Type: String;\nDescription: The topic list (separated by
4545
neo4j.batch.parallelize=Type: Boolean;\nDescription: If enabled messages are processed concurrently in the sink. \
4646
Non concurrent execution supports in-order processing, e.g. for CDC (default true)
4747
neo4j.topic.cud=Type: String;\nDescription: The topic list (separated by semicolon) that manages CUD events
48+
neo4j.topic.pattern.merge.node.properties.enabled=Type: Boolean;\nDescription: If enabled nodes properties will be merged when \
49+
using Sink `Node pattern` strategy (default false). In case of using Sink `Relationship pattern` strategy edge nodes properties will be merged when \
50+
creating relationships (default false)
51+
neo4j.topic.pattern.merge.relationship.properties.enabled=Type: Boolean;\nDescription: If enabled relationships properties will be merged when creating relationships \
52+
using Sink `Relationship pattern` strategy (default false)
4853

4954
## Source Properties
5055
topic=Type: String;\nDescription: The topic where the Source will publish the data

‎pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
<jackson.version>2.13.4</jackson.version>
5858
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
5959
<neo4j.java.driver.version>4.4.9</neo4j.java.driver.version>
60-
<testcontainers.version>1.15.1</testcontainers.version>
60+
<testcontainers.version>1.17.6</testcontainers.version>
6161
<avro.version>1.8.2</avro.version>
6262
<mokito.version>4.8.0</mokito.version>
6363
<junit.version>4.13.2</junit.version>

‎test-support/src/main/kotlin/streams/Neo4jContainerExtension.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private class DatabasesWaitStrategy(private val auth: AuthToken): AbstractWaitSt
5757
}
5858

5959
class Neo4jContainerExtension(dockerImage: String): Neo4jContainer<Neo4jContainerExtension>(dockerImage) {
60-
constructor(): this("neo4j:4.4.10-enterprise")
60+
constructor(): this("neo4j:5.1-enterprise")
6161
private val logger = LoggerFactory.getLogger(Neo4jContainerExtension::class.java)
6262
var driver: Driver? = null
6363
var session: Session? = null
@@ -100,7 +100,7 @@ class Neo4jContainerExtension(dockerImage: String): Neo4jContainer<Neo4jContaine
100100
}
101101

102102
fun withKafka(kafka: KafkaContainer): Neo4jContainerExtension {
103-
return withKafka(kafka.network, kafka.networkAliases.map { "$it:9092" }.joinToString(","))
103+
return withKafka(kafka.network!!, kafka.networkAliases.map { "$it:9092" }.joinToString(","))
104104
}
105105

106106
fun withKafka(network: Network, bootstrapServers: String): Neo4jContainerExtension {

0 commit comments

Comments
 (0)
Please sign in to comment.