Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6355200
Added function to parallelise uploads. Still need to address non-thre…
k0raph Jan 11, 2026
6aef506
Fixed notification manager updates jumping around and improved thread…
k0raph Jan 11, 2026
1a5bac6
fixed FileUploadHelper to make sure uploads are correctly reflecting …
k0raph Jan 11, 2026
df66baa
removed magic number
k0raph Jan 12, 2026
a13d265
Merge branch 'master' into parallelise-uploads
k0raph Jan 12, 2026
032f7cf
extracted various classes, favouring dependency injection to make tes…
k0raph Jan 18, 2026
61ea286
Merge branch 'parallelise-uploads' of github.com:k0raph/nextcloud-and…
k0raph Jan 18, 2026
cdbfd9f
extracted various classes, favouring dependency injection to make tes…
k0raph Jan 18, 2026
3c31dbe
added a prefer param to allow the user to configure the number of con…
k0raph Jan 18, 2026
99ae6f8
Merge pull request #1 from k0raph/user-configurable-max-concurrent-up…
k0raph Jan 18, 2026
ac3f5c1
Merge branch 'master' into parallelise-uploads
k0raph Jan 18, 2026
0100b1f
addressed codacy issues
k0raph Jan 19, 2026
4d641dc
Merge branch 'parallelise-uploads' of github.com:k0raph/nextcloud-and…
k0raph Jan 19, 2026
fea4862
Merge branch 'master' into parallelise-uploads
k0raph Jan 19, 2026
40188f2
ran spotless apply
k0raph Jan 20, 2026
a1f1232
Merge branch 'master' into parallelise-uploads
k0raph Jan 20, 2026
cf8eb86
Merge branch 'master' into parallelise-uploads
k0raph Jan 20, 2026
67bdf0b
Explorting alternative solution - parallelising workers - tests added
k0raph Jan 20, 2026
bda6728
Merge branch 'parallelise-uploads' of github.com:k0raph/nextcloud-and…
k0raph Jan 20, 2026
94872b2
fixed FileUploadWorker to correctly update progress of uploading file…
k0raph Jan 20, 2026
d92b579
ran spotless apply
k0raph Jan 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.nextcloud.client.database.entity.toUploadEntity
import com.nextcloud.client.device.BatteryStatus
import com.nextcloud.client.device.PowerManagementService
import com.nextcloud.client.jobs.BackgroundJobManager
import com.nextcloud.client.jobs.upload.FileUploadWorker.Companion.currentUploadFileOperation
import com.nextcloud.client.jobs.upload.FileUploadWorker.Companion.activeUploadFileOperations
import com.nextcloud.client.network.Connectivity
import com.nextcloud.client.network.ConnectivityService
import com.nextcloud.utils.extensions.getUploadIds
Expand Down Expand Up @@ -360,17 +360,12 @@ class FileUploadHelper {

@Suppress("ReturnCount")
fun isUploadingNow(upload: OCUpload?): Boolean {
val currentUploadFileOperation = currentUploadFileOperation
if (currentUploadFileOperation == null || currentUploadFileOperation.user == null) return false
if (upload == null || upload.accountName != currentUploadFileOperation.user.accountName) return false

return if (currentUploadFileOperation.oldFile != null) {
// For file conflicts check old file remote path
upload.remotePath == currentUploadFileOperation.remotePath ||
upload.remotePath == currentUploadFileOperation.oldFile!!
.remotePath
} else {
upload.remotePath == currentUploadFileOperation.remotePath
upload ?: return false

return activeUploadFileOperations.values.any { operation ->
operation.user?.accountName == upload.accountName &&
(upload.remotePath == operation.remotePath ||
upload.remotePath == operation.oldFile?.remotePath)
}
}

Expand Down
184 changes: 124 additions & 60 deletions app/src/main/java/com/nextcloud/client/jobs/upload/FileUploadWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ import com.owncloud.android.operations.UploadFileOperation
import com.owncloud.android.ui.notifications.NotificationUtils
import com.owncloud.android.utils.theme.ViewThemeUtils
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random

@Suppress("LongParameterList", "TooGenericExceptionCaught")
Expand All @@ -73,10 +81,15 @@ class FileUploadWorker(
const val UPLOAD_IDS = "uploads_ids"
const val CURRENT_BATCH_INDEX = "batch_index"
const val TOTAL_UPLOAD_SIZE = "total_upload_size"
const val SHOW_SAME_FILE_ALREADY_EXISTS_NOTIFICATION = "show_same_file_already_exists_notification"

var currentUploadFileOperation: UploadFileOperation? = null
/**
* The maximum number of concurrent parallel uploads
*/
const val MAX_CONCURRENT_UPLOADS = 10

const val SHOW_SAME_FILE_ALREADY_EXISTS_NOTIFICATION = "show_same_file_already_exists_notification"

val activeUploadFileOperations = ConcurrentHashMap<String, UploadFileOperation>()
private const val UPLOADS_ADDED_MESSAGE = "UPLOADS_ADDED"
private const val UPLOAD_START_MESSAGE = "UPLOAD_START"
private const val UPLOAD_FINISH_MESSAGE = "UPLOAD_FINISH"
Expand All @@ -103,20 +116,18 @@ class FileUploadWorker(
fun getUploadFinishMessage(): String = FileUploadWorker::class.java.name + UPLOAD_FINISH_MESSAGE

fun cancelCurrentUpload(remotePath: String, accountName: String, onCompleted: () -> Unit) {
currentUploadFileOperation?.let {
activeUploadFileOperations.values.forEach {
if (it.remotePath == remotePath && it.user.accountName == accountName) {
it.cancel(ResultCode.USER_CANCELLED)
onCompleted()
}
}
onCompleted()
}

fun isUploading(remotePath: String?, accountName: String?): Boolean {
currentUploadFileOperation?.let {
return it.remotePath == remotePath && it.user.accountName == accountName
return activeUploadFileOperations.values.any {
it.remotePath == remotePath && it.user.accountName == accountName
}

return false
}

fun getUploadAction(action: String): Int = when (action) {
Expand All @@ -127,7 +138,9 @@ class FileUploadWorker(
}
}

private var lastPercent = 0
private val lastPercents = ConcurrentHashMap<String, Int>()
private val lastUpdateTimes = ConcurrentHashMap<String, Long>()

private val notificationId = Random.nextInt()
private val notificationManager = UploadNotificationManager(context, viewThemeUtils, notificationId)
private val intents = FileUploaderIntents(context)
Expand Down Expand Up @@ -202,7 +215,7 @@ class FileUploadWorker(
Log_OC.e(TAG, "FileUploadWorker stopped")

setIdleWorkerState()
currentUploadFileOperation?.cancel(null)
activeUploadFileOperations.values.forEach { it.cancel(null) }
notificationManager.dismissNotification()
}

Expand All @@ -211,7 +224,8 @@ class FileUploadWorker(
}

private fun setIdleWorkerState() {
WorkerStateObserver.send(WorkerState.FileUploadCompleted(currentUploadFileOperation?.file))
val lastOp = activeUploadFileOperations.values.lastOrNull()
WorkerStateObserver.send(WorkerState.FileUploadCompleted(lastOp?.file))
}

@Suppress("ReturnCount", "LongMethod", "DEPRECATION")
Expand Down Expand Up @@ -253,52 +267,95 @@ class FileUploadWorker(
val ocAccount = OwnCloudAccount(user.toPlatformAccount(), context)
val client = OwnCloudClientManagerFactory.getDefaultSingleton().getClientFor(ocAccount, context)

for ((index, upload) in uploads.withIndex()) {
ensureActive()
return@withContext parallelUpload(
uploads,
user,
previouslyUploadedFileSize,
totalUploadSize,
client,
accountName
)
}

if (preferences.isGlobalUploadPaused) {
Log_OC.d(TAG, "Upload is paused, skip uploading files!")
notificationManager.notifyPaused(
intents.openUploadListIntent(null)
)
return@withContext Result.success()
}
private suspend fun parallelUpload(
uploads: List<OCUpload>?,
user: User,
previouslyUploadedFileSize: Int,
totalUploadSize: Int,
client: OwnCloudClient,
accountName: String
): Result {
if (uploads.isNullOrEmpty()) {
return Result.success()
}

if (canExitEarly()) {
notificationManager.showConnectionErrorNotification()
return@withContext Result.failure()
}
val semaphore = Semaphore(MAX_CONCURRENT_UPLOADS)
val quotaExceeded = AtomicBoolean(false)
val completedCount = AtomicInteger(0)
val storageManager = FileDataStorageManager(user, context.contentResolver)

coroutineScope {
for (upload in uploads) {
if (quotaExceeded.get()) break
ensureActive()

launch {
if (preferences.isGlobalUploadPaused) {
Log_OC.d(TAG, "Upload is paused, skip uploading files!")
notificationManager.notifyPaused(intents.openUploadListIntent(null))
return@launch
}

setWorkerState(user)
val operation = createUploadFileOperation(upload, user)
currentUploadFileOperation = operation
semaphore.withPermit {
if (quotaExceeded.get() || isStopped) return@launch

val currentIndex = (index + 1)
val currentUploadIndex = (currentIndex + previouslyUploadedFileSize)
notificationManager.prepareForStart(
operation,
startIntent = intents.openUploadListIntent(operation),
currentUploadIndex = currentUploadIndex,
totalUploadSize = totalUploadSize
)
if (canExitEarly()) {
notificationManager.showConnectionErrorNotification()
return@launch
}

val result = withContext(Dispatchers.IO) {
upload(operation, user, client)
}
val entity = uploadsStorageManager.uploadDao.getUploadById(upload.uploadId, accountName)
uploadsStorageManager.updateStatus(entity, result.isSuccess)
currentUploadFileOperation = null

if (result.code == ResultCode.QUOTA_EXCEEDED) {
Log_OC.w(TAG, "Quota exceeded, stopping uploads")
notificationManager.showQuotaExceedNotification(operation)
break
}
setWorkerState(user)
val operation = createUploadFileOperation(upload, user, storageManager)
activeUploadFileOperations[operation.originalStoragePath] = operation

try {
val currentUploadIndex = previouslyUploadedFileSize + completedCount.incrementAndGet()

// Synchronize notification updates
synchronized(notificationManager) {
notificationManager.prepareForStart(
operation,
startIntent = intents.openUploadListIntent(operation),
currentUploadIndex = currentUploadIndex,
totalUploadSize = totalUploadSize
)
}

val result = upload(operation, user, client)

val entity = uploadsStorageManager.uploadDao.getUploadById(upload.uploadId, accountName)
uploadsStorageManager.updateStatus(entity, result.isSuccess)

if (result.code == ResultCode.QUOTA_EXCEEDED) {
Log_OC.w(TAG, "Quota exceeded, stopping uploads")
notificationManager.showQuotaExceedNotification(operation)
quotaExceeded.set(true)
this@coroutineScope.cancel("Quota exceeded")
return@launch
}

sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
} finally {
activeUploadFileOperations.remove(operation.originalStoragePath)
lastPercents.remove(operation.originalStoragePath)
lastUpdateTimes.remove(operation.originalStoragePath)
}
}
}
}
}

return@withContext Result.success()
return if (quotaExceeded.get()) Result.failure() else Result.success()
}

private fun sendUploadFinishEvent(
Expand Down Expand Up @@ -336,7 +393,11 @@ class FileUploadWorker(
return result
}

private fun createUploadFileOperation(upload: OCUpload, user: User): UploadFileOperation = UploadFileOperation(
private fun createUploadFileOperation(
upload: OCUpload,
user: User,
storageManager: FileDataStorageManager
): UploadFileOperation = UploadFileOperation(
uploadsStorageManager,
connectivityService,
powerManagementService,
Expand All @@ -349,7 +410,7 @@ class FileUploadWorker(
upload.isUseWifiOnly,
upload.isWhileChargingOnly,
true,
FileDataStorageManager(user, context.contentResolver)
storageManager
).apply {
addDataTransferProgressListener(this@FileUploadWorker)
}
Expand Down Expand Up @@ -410,20 +471,24 @@ class FileUploadWorker(
totalToTransfer: Long,
fileAbsoluteName: String
) {
val operation = activeUploadFileOperations[fileAbsoluteName] ?: return
val percent = getPercent(totalTransferredSoFar, totalToTransfer)
val currentTime = System.currentTimeMillis()

val lastPercent = lastPercents[fileAbsoluteName] ?: 0
val lastUpdateTime = lastUpdateTimes[fileAbsoluteName] ?: 0L

if (percent != lastPercent && (currentTime - lastUpdateTime) >= minProgressUpdateInterval) {
notificationManager.run {
val accountName = currentUploadFileOperation?.user?.accountName
val remotePath = currentUploadFileOperation?.remotePath
synchronized(notificationManager) {
val accountName = operation.user.accountName
val remotePath = operation.remotePath

updateUploadProgress(percent, currentUploadFileOperation)
notificationManager.updateUploadProgress(percent, operation)

if (accountName != null && remotePath != null) {
val key: String = FileUploadHelper.buildRemoteName(accountName, remotePath)
val boundListener = FileUploadHelper.mBoundListeners[key]
val filename = currentUploadFileOperation?.fileName ?: ""
val filename = operation.fileName ?: ""

boundListener?.onTransferProgress(
progressRate,
Expand All @@ -433,11 +498,10 @@ class FileUploadWorker(
)
}

dismissOldErrorNotification(currentUploadFileOperation)
notificationManager.dismissOldErrorNotification(operation)
}
lastUpdateTime = currentTime
lastUpdateTimes[fileAbsoluteName] = currentTime
lastPercents[fileAbsoluteName] = percent
}

lastPercent = percent
}
}