Skip to content

Commit 6bfff36

Browse files
authored
Improved plugin start in case of a high number of databases (#528)
1 parent 11f0ff1 commit 6bfff36

File tree

7 files changed

+107
-73
lines changed

7 files changed

+107
-73
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

+25-33
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,20 @@ package streams.config
33
import kotlinx.coroutines.runBlocking
44
import kotlinx.coroutines.sync.Mutex
55
import kotlinx.coroutines.sync.withLock
6-
import org.neo4j.dbms.api.DatabaseManagementService
76
import org.neo4j.kernel.internal.GraphDatabaseAPI
87
import org.neo4j.logging.Log
98
import org.neo4j.logging.internal.LogService
109
import org.neo4j.plugin.configuration.ConfigurationLifecycle
1110
import org.neo4j.plugin.configuration.ConfigurationLifecycleUtils
1211
import org.neo4j.plugin.configuration.EventType
1312
import org.neo4j.plugin.configuration.listners.ConfigurationLifecycleListener
14-
import streams.extensions.databaseManagementService
15-
import streams.extensions.getDefaultDbName
16-
import streams.extensions.isAvailable
1713
import streams.utils.Neo4jUtils
18-
import streams.utils.ProcedureUtils
1914
import streams.utils.StreamsUtils
2015
import java.io.File
2116
import java.util.concurrent.ConcurrentHashMap
2217
import java.util.concurrent.atomic.AtomicReference
2318

24-
class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementService) {
19+
class StreamsConfig(private val log: Log) {
2520

2621
companion object {
2722
private const val SUN_JAVA_COMMAND = "sun.java.command"
@@ -41,6 +36,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
4136
const val POLL_INTERVAL = "streams.sink.poll.interval"
4237
const val INSTANCE_WAIT_TIMEOUT = "streams.wait.timeout"
4338
const val INSTANCE_WAIT_TIMEOUT_VALUE = 120000L
39+
const val CONFIG_WAIT_FOR_AVAILABLE = "streams.wait.for.available"
40+
const val CONFIG_WAIT_FOR_AVAILABLE_VALUE = true
4441

4542
private const val DEFAULT_TRIGGER_PERIOD: Int = 10000
4643

@@ -59,8 +56,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
5956

6057
fun getInstance(db: GraphDatabaseAPI): StreamsConfig = cache.computeIfAbsent(StreamsUtils.getName(db)) {
6158
StreamsConfig(log = db.dependencyResolver
62-
.resolveDependency(LogService::class.java)
63-
.getUserLog(StreamsConfig::class.java), db.databaseManagementService())
59+
.resolveDependency(LogService::class.java)
60+
.getUserLog(StreamsConfig::class.java))
6461
}
6562

6663
fun removeInstance(db: GraphDatabaseAPI) {
@@ -83,11 +80,13 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
8380
fun getSystemDbWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT, SYSTEM_DB_WAIT_TIMEOUT_VALUE).toString().toLong()
8481

8582
fun getInstanceWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(INSTANCE_WAIT_TIMEOUT, INSTANCE_WAIT_TIMEOUT_VALUE).toString().toLong()
83+
84+
fun isWaitForAvailable(config: Map<String, Any?>) = config.getOrDefault(CONFIG_WAIT_FOR_AVAILABLE, CONFIG_WAIT_FOR_AVAILABLE_VALUE).toString().toBoolean()
8685
}
8786

8887
private val configLifecycle: ConfigurationLifecycle
8988

90-
private enum class Status {RUNNING, STOPPED, CLOSED, UNKNOWN}
89+
enum class Status {RUNNING, STARTING, STOPPED, CLOSED, UNKNOWN}
9190

9291
private val status = AtomicReference(Status.UNKNOWN)
9392

@@ -100,37 +99,30 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
10099
true, log, true, "streams.", "kafka.")
101100
}
102101

103-
fun start() = runBlocking {
102+
fun start(db: GraphDatabaseAPI) = runBlocking {
104103
if (log.isDebugEnabled) {
105104
log.debug("Starting StreamsConfig")
106105
}
107106
mutex.withLock {
108-
if (status.get() == Status.RUNNING) return@runBlocking
107+
if (setOf(Status.RUNNING, Status.STARTING).contains(status.get())) return@runBlocking
109108
try {
110-
// wait for all database to be ready
111-
val isInstanceReady = StreamsUtils.blockUntilFalseOrTimeout(getInstanceWaitTimeout()) {
112-
if (log.isDebugEnabled) {
113-
log.debug("Waiting for the Neo4j instance to be ready...")
114-
}
115-
dbms.isAvailable(100)
116-
}
117-
if (!isInstanceReady) {
118-
log.warn("${getInstanceWaitTimeout()} ms have passed and the instance is not online, the Streams plugin will not started")
119-
return@runBlocking
120-
}
121-
if (ProcedureUtils.isCluster(dbms)) {
122-
log.info("We're in cluster instance waiting for the ${StreamsUtils.LEADER}s to be elected in each database")
123-
// in case is a cluster we wait for the correct cluster formation => LEADER elected
124-
Neo4jUtils.waitForTheLeaders(dbms, log) { configStart() }
109+
if (isWaitForAvailable()) {
110+
status.set(Status.STARTING)
111+
Neo4jUtils.waitForAvailable(db, log, getInstanceWaitTimeout(), { status.set(Status.UNKNOWN) }) { configStart() }
125112
} else {
126113
configStart()
127114
}
128115
} catch (e: Exception) {
129116
log.warn("Cannot start StreamsConfig because of the following exception:", e)
117+
status.set(Status.UNKNOWN)
130118
}
131119
}
132120
}
133121

122+
fun startEager() = runBlocking {
123+
configStart()
124+
}
125+
134126
private fun configStart() = try {
135127
configLifecycle.start()
136128
status.set(Status.RUNNING)
@@ -139,14 +131,16 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
139131
log.error("Cannot start the StreamsConfig because of the following exception", e)
140132
}
141133

134+
fun status(): Status = status.get()
135+
142136
fun stop(shutdown: Boolean = false) = runBlocking {
143137
if (log.isDebugEnabled) {
144138
log.debug("Stopping StreamsConfig")
145139
}
146140
mutex.withLock {
147-
val status = getStopStatus(shutdown)
148-
if (this@StreamsConfig.status.get() == status) return@runBlocking
149-
configStop(shutdown, status)
141+
val stopStatus = getStopStatus(shutdown)
142+
if (status.get() == stopStatus) return@runBlocking
143+
configStop(shutdown, stopStatus)
150144
}
151145
}
152146

@@ -201,10 +195,6 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
201195

202196
fun getConfiguration(): Map<String, Any> = ConfigurationLifecycleUtils.toMap(configLifecycle.configuration)
203197

204-
fun defaultDbName() = this.dbms.getDefaultDbName()
205-
206-
fun isDefaultDb(dbName: String) = this.defaultDbName() == dbName
207-
208198
fun isSourceGloballyEnabled() = Companion.isSourceGloballyEnabled(getConfiguration())
209199

210200
fun isSourceEnabled(dbName: String) = Companion.isSourceEnabled(getConfiguration(), dbName)
@@ -221,4 +211,6 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
221211

222212
fun getInstanceWaitTimeout() = Companion.getInstanceWaitTimeout(getConfiguration())
223213

214+
fun isWaitForAvailable() = Companion.isWaitForAvailable(getConfiguration())
215+
224216
}

common/src/main/kotlin/streams/config/StreamsConfigProcedures.kt

+36-6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
package streams.configuration
1+
package streams.config
22

33
import org.neo4j.graphdb.GraphDatabaseService
44
import org.neo4j.kernel.internal.GraphDatabaseAPI
55
import org.neo4j.logging.Log
66
import org.neo4j.procedure.*
7-
import streams.config.StreamsConfig
87
import streams.events.KeyValueResult
98
import java.util.stream.Stream
109

@@ -24,7 +23,7 @@ class StreamsConfigProcedures {
2423
var db: GraphDatabaseService? = null
2524

2625
@Admin
27-
@Procedure
26+
@Procedure("streams.configuration.set")
2827
@Description("""
2928
streams.configuration.set(<properties_map>, <config_map>) YIELD name, value
3029
""")
@@ -35,14 +34,13 @@ class StreamsConfigProcedures {
3534
}
3635
val map = properties.mapValues { it.value.toString() }
3736
val instance = StreamsConfig.getInstance(db!! as GraphDatabaseAPI)
38-
println("Instance hash: ${instance.hashCode()}")
3937
val cfg = StreamsConfigProceduresConfiguration(config)
4038
instance.setProperties(map, cfg.save)
4139
return get()
4240
}
4341

4442
@Admin
45-
@Procedure
43+
@Procedure("streams.configuration.remove")
4644
@Description("""
4745
streams.configuration.remove(<properties_list>, <config_map>) YIELD name, value
4846
""")
@@ -58,7 +56,7 @@ class StreamsConfigProcedures {
5856
}
5957

6058
@Admin
61-
@Procedure
59+
@Procedure("streams.configuration.get")
6260
@Description("""
6361
streams.configuration.get() YIELD name, value
6462
""")
@@ -67,4 +65,36 @@ class StreamsConfigProcedures {
6765
.entries
6866
.map { KeyValueResult(it.key, it.value) }
6967
.stream()
68+
69+
@Admin
70+
@Procedure("streams.configuration.status")
71+
@Description("""
72+
streams.configuration.status() YIELD status
73+
""")
74+
fun status(): Stream<KeyValueResult> = Stream.of(KeyValueResult("status", StreamsConfig.getInstance(db!! as GraphDatabaseAPI).status().toString()))
75+
76+
@Admin
77+
@Procedure("streams.configuration.start")
78+
@Description("""
79+
streams.configuration.start() YIELD status
80+
""")
81+
fun start(): Stream<KeyValueResult> {
82+
val streamsConfig = StreamsConfig.getInstance(db!! as GraphDatabaseAPI)
83+
if (!setOf(StreamsConfig.Status.RUNNING, StreamsConfig.Status.STARTING)
84+
.contains(streamsConfig.status())) {
85+
streamsConfig.startEager()
86+
}
87+
return Stream.of(KeyValueResult("status", streamsConfig.status().toString()))
88+
}
89+
90+
@Admin
91+
@Procedure("streams.configuration.stop")
92+
@Description("""
93+
streams.configuration.stop() YIELD status
94+
""")
95+
fun stop(): Stream<KeyValueResult> {
96+
val streamsConfig = StreamsConfig.getInstance(db!! as GraphDatabaseAPI)
97+
streamsConfig.stop()
98+
return Stream.of(KeyValueResult("status", streamsConfig.status().toString()))
99+
}
70100
}

common/src/main/kotlin/streams/extensions/DatabaseManagementServiceExtensions.kt

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package streams.extensions
33
import org.neo4j.dbms.api.DatabaseManagementService
44
import org.neo4j.kernel.internal.GraphDatabaseAPI
55
import streams.utils.StreamsUtils
6-
import java.util.concurrent.TimeUnit
76

87
fun DatabaseManagementService.getSystemDb() = this.database(StreamsUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI
98

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

+42-29
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,18 @@ import kotlinx.coroutines.delay
66
import kotlinx.coroutines.launch
77
import org.neo4j.configuration.Config
88
import org.neo4j.configuration.GraphDatabaseSettings
9-
import org.neo4j.dbms.api.DatabaseManagementService
109
import org.neo4j.graphdb.QueryExecutionException
1110
import org.neo4j.kernel.internal.GraphDatabaseAPI
1211
import org.neo4j.logging.Log
1312
import streams.extensions.execute
14-
import java.lang.reflect.InvocationTargetException
1513
import kotlin.streams.toList
1614

1715
object Neo4jUtils {
1816
fun isWriteableInstance(db: GraphDatabaseAPI, availableAction: () -> Boolean = { true }): Boolean {
1917
try {
20-
val isSlave = StreamsUtils.ignoreExceptions(
21-
{
22-
val hadb = Class.forName("org.neo4j.kernel.ha.HighlyAvailableGraphDatabase")
23-
hadb.isInstance(db) && !(hadb.getMethod("isMaster").invoke(db) as Boolean)
24-
}, ClassNotFoundException::class.java, IllegalAccessException::class.java,
25-
InvocationTargetException::class.java, NoSuchMethodException::class.java)
26-
if (isSlave != null && isSlave) {
27-
return false
28-
}
29-
30-
return availableAction() && ProcedureUtils.clusterMemberRole(db).equals(StreamsUtils.LEADER, ignoreCase = true)
18+
return availableAction() && ProcedureUtils
19+
.clusterMemberRole(db)
20+
.equals(StreamsUtils.LEADER, ignoreCase = true)
3121
} catch (e: QueryExecutionException) {
3222
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
3323
return availableAction()
@@ -55,11 +45,8 @@ object Neo4jUtils {
5545
.toList()
5646
.contains(StreamsUtils.LEADER)
5747
}
58-
} catch (e: QueryExecutionException) {
59-
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
60-
false
61-
}
62-
throw e
48+
} catch (e: Exception) {
49+
false
6350
}
6451

6552
fun <T> executeInWriteableInstance(db: GraphDatabaseAPI,
@@ -70,24 +57,50 @@ object Neo4jUtils {
7057
null
7158
}
7259

73-
fun isClusterCorrectlyFormed(dbms: DatabaseManagementService) = dbms.listDatabases()
74-
.filterNot { it == StreamsUtils.SYSTEM_DATABASE_NAME }
75-
.map { dbms.database(it) as GraphDatabaseAPI }
76-
.all { clusterHasLeader(it) }
60+
fun waitForTheLeaders(db: GraphDatabaseAPI, log: Log, timeout: Long = 240000, onFailure: () -> Unit = {}, action: () -> Unit) {
61+
GlobalScope.launch(Dispatchers.IO) {
62+
val start = System.currentTimeMillis()
63+
val delay: Long = 2000
64+
var isClusterCorrectlyFormed: Boolean
65+
do {
66+
isClusterCorrectlyFormed = clusterHasLeader(db)
67+
if (!isClusterCorrectlyFormed) {
68+
log.info("${StreamsUtils.LEADER} not found, new check comes in $delay milliseconds...")
69+
delay(delay)
70+
}
71+
} while (!isClusterCorrectlyFormed && System.currentTimeMillis() - start < timeout)
72+
if (isClusterCorrectlyFormed) {
73+
log.debug("${StreamsUtils.LEADER} has been found")
74+
action()
75+
} else {
76+
log.warn("$timeout ms have passed and the ${StreamsUtils.LEADER} has not been elected, the Streams plugin will not started")
77+
onFailure()
78+
}
79+
}
80+
}
7781

78-
fun waitForTheLeaders(dbms: DatabaseManagementService, log: Log, timeout: Long = 120000, action: () -> Unit) {
82+
fun waitForAvailable(db: GraphDatabaseAPI, log: Log, timeout: Long = 240000, onFailure: () -> Unit = {}, action: () -> Unit) {
7983
GlobalScope.launch(Dispatchers.IO) {
8084
val start = System.currentTimeMillis()
8185
val delay: Long = 2000
82-
while (!isClusterCorrectlyFormed(dbms) && System.currentTimeMillis() - start < timeout) {
83-
log.info("${StreamsUtils.LEADER} not found, new check comes in $delay milliseconds...")
84-
delay(delay)
86+
var isAvailable: Boolean
87+
do {
88+
isAvailable = db.isAvailable(delay)
89+
if (!isAvailable) {
90+
log.debug("Waiting for Neo4j to be ready...")
91+
}
92+
} while (!isAvailable && System.currentTimeMillis() - start < timeout)
93+
if (isAvailable) {
94+
log.debug("Neo4j is ready")
95+
action()
96+
} else {
97+
log.warn("$timeout ms have passed and Neo4j is not online, the Streams plugin will not started")
98+
onFailure()
8599
}
86-
action()
87100
}
88101
}
89102

90103
fun isReadReplica(db: GraphDatabaseAPI): Boolean = db.dependencyResolver
91-
.resolveDependency(Config::class.java)
92-
.let { it.get(GraphDatabaseSettings.mode).name == "READ_REPLICA" }
104+
.resolveDependency(Config::class.java)
105+
.let { it.get(GraphDatabaseSettings.mode).name == "READ_REPLICA" }
93106
}

consumer/src/main/kotlin/streams/StreamsEventSinkAvailabilityListener.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class StreamsEventSinkAvailabilityListener(dependencies: StreamsEventSinkExtensi
2727
override fun available() = runBlocking {
2828
mutex.withLock {
2929
setAvailable(db, true)
30-
streamsConfig.start()
30+
streamsConfig.start(db)
3131
}
3232
}
3333

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1307,8 +1307,8 @@ class Neo4jSinkTaskTest {
13071307
}
13081308
}
13091309

1310-
@Test()
1311-
@Ignore("Ignore, flaky")
1310+
@Test
1311+
@Ignore("flaky")
13121312
fun `should stop the query and fails with small timeout and vice versa`() {
13131313
val myTopic = "foo"
13141314
val props = mutableMapOf<String, String>()

producer/src/main/kotlin/streams/StreamsEventRouterAvailabilityListener.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class StreamsEventRouterAvailabilityListener(private val db: GraphDatabaseAPI,
2727
override fun available() = runBlocking {
2828
mutex.withLock {
2929
setAvailable(db, true)
30-
streamsConfig.start()
30+
streamsConfig.start(db)
3131
}
3232
}
3333

0 commit comments

Comments
 (0)