Skip to content

Commit abdb028

Browse files
committed
Automatically do initial refresh when adding sync methods
1 parent 009178a commit abdb028

File tree

5 files changed

+70
-7
lines changed

5 files changed

+70
-7
lines changed

sync-core/src/main/java/eu/darken/octi/sync/core/SyncExtensions.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package eu.darken.octi.sync.core
22

33
import kotlinx.coroutines.flow.Flow
4+
import kotlinx.coroutines.flow.first
45
import kotlinx.coroutines.flow.map
6+
import kotlinx.coroutines.flow.mapNotNull
57

68

79
inline fun <reified T : SyncConnector> SyncManager.getConnectorById(identifier: ConnectorId): Flow<T> {
@@ -27,4 +29,10 @@ fun Collection<SyncRead>.latestData(): Collection<SyncRead.Device> = this
2729
override val modules: Collection<SyncRead.Device.Module> = newestModules.values
2830
}
2931
}
30-
.toList()
32+
.toList()
33+
34+
suspend inline fun <reified T : SyncConnector> ConnectorHub.findConnector(crossinline matcher: (T) -> Boolean): T {
35+
return connectors.mapNotNull { connectors ->
36+
connectors.map { it as T }.singleOrNull { matcher(it) }
37+
}.first()
38+
}

syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/GDriveModule.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import dagger.Module
55
import dagger.hilt.InstallIn
66
import dagger.hilt.components.SingletonComponent
77
import dagger.multibindings.IntoSet
8+
import eu.darken.octi.sync.core.ConnectorHub
89
import eu.darken.octi.syncs.gdrive.core.GDriveHub
910

1011
@InstallIn(SingletonComponent::class)
@@ -13,5 +14,5 @@ abstract class GDriveModule {
1314

1415
@Binds
1516
@IntoSet
16-
abstract fun hub(hub: GDriveHub): eu.darken.octi.sync.core.ConnectorHub
17+
abstract fun hub(hub: GDriveHub): ConnectorHub
1718
}

syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,12 @@ class GDriveAppDataConnector @AssistedInject constructor(
127127
runDriveAction("delete-device: $deviceId") {
128128
appDataRoot.child(DEVICE_DATA_DIR_NAME)
129129
?.listFiles()
130-
?.onEach { log(TAG, DEBUG) { "deleteDevice(): Checking $it" } }
131-
?.singleOrNull { it.name == deviceId.id }
132-
?.onEach { log(TAG, INFO) { "deleteDevice(): Deleting $it" } }
130+
?.onEach { log(TAG, DEBUG) { "deleteDevice(): Checking device dir ${it.name}" } }
131+
?.singleOrNull { file ->
132+
(file.name == deviceId.id).also {
133+
if (it) log(TAG) { "deleteDevice(): Deleting device dir $file" }
134+
}
135+
}
133136
?.deleteAll()
134137
if (deviceId == syncSettings.deviceId) {
135138
log(TAG, WARN) { "We just deleted ourselves, this connector is dead now" }

syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveHub.kt

+27-2
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,28 @@ import com.google.api.client.googleapis.extensions.android.gms.auth.UserRecovera
44
import eu.darken.octi.common.coroutine.AppScope
55
import eu.darken.octi.common.coroutine.DispatcherProvider
66
import eu.darken.octi.common.debug.logging.Logging.Priority.ERROR
7+
import eu.darken.octi.common.debug.logging.Logging.Priority.INFO
8+
import eu.darken.octi.common.debug.logging.Logging.Priority.WARN
79
import eu.darken.octi.common.debug.logging.asLog
810
import eu.darken.octi.common.debug.logging.log
911
import eu.darken.octi.common.debug.logging.logTag
1012
import eu.darken.octi.common.flow.setupCommonEventHandlers
1113
import eu.darken.octi.common.flow.shareLatest
1214
import eu.darken.octi.sync.core.ConnectorHub
1315
import eu.darken.octi.sync.core.ConnectorId
14-
import eu.darken.octi.sync.core.SyncConnector
16+
import eu.darken.octi.sync.core.SyncOptions
1517
import eu.darken.octi.sync.core.SyncSettings
1618
import kotlinx.coroutines.CoroutineScope
1719
import kotlinx.coroutines.NonCancellable
1820
import kotlinx.coroutines.flow.Flow
21+
import kotlinx.coroutines.flow.catch
22+
import kotlinx.coroutines.flow.distinctUntilChangedBy
23+
import kotlinx.coroutines.flow.drop
1924
import kotlinx.coroutines.flow.first
25+
import kotlinx.coroutines.flow.launchIn
26+
import kotlinx.coroutines.flow.map
2027
import kotlinx.coroutines.flow.mapLatest
28+
import kotlinx.coroutines.flow.onEach
2129
import kotlinx.coroutines.plus
2230
import kotlinx.coroutines.withContext
2331
import javax.inject.Inject
@@ -39,7 +47,24 @@ class GDriveHub @Inject constructor(
3947
.setupCommonEventHandlers(TAG) { "connectors" }
4048
.shareLatest(scope + dispatcherProvider.Default)
4149

42-
override val connectors: Flow<Collection<SyncConnector>> = _connectors
50+
override val connectors: Flow<Collection<GDriveAppDataConnector>> = _connectors
51+
52+
init {
53+
_connectors
54+
.drop(1) // Initial launch
55+
.distinctUntilChangedBy { connectors -> connectors.map { it.account } }
56+
.map { connectors -> connectors.filter { it.data.first() == null } }
57+
.onEach { connectors ->
58+
// Connectors that have been added and have no data yet
59+
connectors.forEach { connector ->
60+
log(TAG, INFO) { "Syncing initial data for ${connector.account}" }
61+
connector.sync(SyncOptions())
62+
log(TAG, INFO) { "Initial data sync done for ${connector.account}" }
63+
}
64+
}
65+
.catch { log(TAG, WARN) { "Initial sync flow failed\n${it.asLog()}" } }
66+
.launchIn(scope)
67+
}
4368

4469
override suspend fun owns(connectorId: ConnectorId): Boolean {
4570
return _connectors.first().any { it.identifier == connectorId }

syncs-kserver/src/main/java/eu/darken/octi/syncs/kserver/core/KServerHub.kt

+26
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package eu.darken.octi.syncs.kserver.core
33
import eu.darken.octi.common.coroutine.AppScope
44
import eu.darken.octi.common.coroutine.DispatcherProvider
55
import eu.darken.octi.common.debug.logging.Logging.Priority.ERROR
6+
import eu.darken.octi.common.debug.logging.Logging.Priority.INFO
7+
import eu.darken.octi.common.debug.logging.Logging.Priority.WARN
68
import eu.darken.octi.common.debug.logging.asLog
79
import eu.darken.octi.common.debug.logging.log
810
import eu.darken.octi.common.debug.logging.logTag
@@ -11,12 +13,19 @@ import eu.darken.octi.common.flow.shareLatest
1113
import eu.darken.octi.sync.core.ConnectorHub
1214
import eu.darken.octi.sync.core.ConnectorId
1315
import eu.darken.octi.sync.core.SyncConnector
16+
import eu.darken.octi.sync.core.SyncOptions
1417
import eu.darken.octi.sync.core.SyncSettings
1518
import kotlinx.coroutines.CoroutineScope
1619
import kotlinx.coroutines.NonCancellable
1720
import kotlinx.coroutines.flow.Flow
21+
import kotlinx.coroutines.flow.catch
22+
import kotlinx.coroutines.flow.distinctUntilChangedBy
23+
import kotlinx.coroutines.flow.drop
1824
import kotlinx.coroutines.flow.first
25+
import kotlinx.coroutines.flow.launchIn
26+
import kotlinx.coroutines.flow.map
1927
import kotlinx.coroutines.flow.mapLatest
28+
import kotlinx.coroutines.flow.onEach
2029
import kotlinx.coroutines.plus
2130
import kotlinx.coroutines.withContext
2231
import javax.inject.Inject
@@ -41,6 +50,23 @@ class KServerHub @Inject constructor(
4150

4251
override val connectors: Flow<Collection<SyncConnector>> = _connectors
4352

53+
init {
54+
_connectors
55+
.drop(1) // Initial launch
56+
.distinctUntilChangedBy { connectors -> connectors.map { it.credentials } }
57+
.map { connectors -> connectors.filter { it.data.first() == null } }
58+
.onEach { connectors ->
59+
// Connectors that have been added and have no data yet
60+
connectors.forEach { connector ->
61+
log(TAG, INFO) { "Syncing initial data for ${connector.credentials}" }
62+
connector.sync(SyncOptions())
63+
log(TAG, INFO) { "Initial data sync done for ${connector.credentials}" }
64+
}
65+
}
66+
.catch { log(TAG, WARN) { "Initial sync flow failed\n${it.asLog()}" } }
67+
.launchIn(scope)
68+
}
69+
4470
override suspend fun owns(connectorId: ConnectorId): Boolean {
4571
return _connectors.first().any { it.identifier == connectorId }
4672
}

0 commit comments

Comments
 (0)