Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Parallelized map and optimize Database search API #2669

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ import com.google.common.truth.Truth.assertThat
import java.math.BigDecimal
import java.time.Instant
import java.util.Date
import junit.framework.TestCase.assertTrue
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -5169,6 +5172,24 @@ class DatabaseImplTest {
assertThat(localChangeResourceReferences.size).isEqualTo(locallyCreatedPatients.size)
}

@Test
fun `pmap_shouldExecuteMappingTasksInParallel`() = runBlocking {
val numberList = listOf(2, 3)
var squaredNumberList: List<Int>

val timeTaken = measureTimeMillis {
squaredNumberList =
numberList.pmap {
delay(1000L)
it * 2
}
}
assertTrue(squaredNumberList.isNotEmpty())
assertThat(squaredNumberList.first()).isEqualTo(4)
assertThat(squaredNumberList.last()).isEqualTo(6)
assertTrue(timeTaken < 2000)
}

private companion object {
const val mockEpochTimeStamp = 1628516301000
const val TEST_PATIENT_1_ID = "test_patient_1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import androidx.annotation.VisibleForTesting
import androidx.room.Room
import androidx.room.withTransaction
import androidx.sqlite.db.SimpleSQLiteQuery
import ca.uhn.fhir.context.FhirContext
import ca.uhn.fhir.parser.IParser
import ca.uhn.fhir.util.FhirTerser
import com.google.android.fhir.DatabaseErrorStrategy
Expand All @@ -42,6 +43,10 @@ import com.google.android.fhir.toLocalChange
import com.google.android.fhir.updateMeta
import java.time.Instant
import java.util.UUID
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import org.hl7.fhir.r4.model.IdType
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
Expand Down Expand Up @@ -227,8 +232,11 @@ internal class DatabaseImpl(
query: SearchQuery,
): List<ResourceWithUUID<R>> {
return db.withTransaction {
resourceDao.getResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())).map {
ResourceWithUUID(it.uuid, iParser.parseResource(it.serializedResource) as R)
resourceDao.getResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())).pmap {
ResourceWithUUID(
it.uuid,
FhirContext.forR4Cached().newJsonParser().parseResource(it.serializedResource) as R,
)
}
}
}
Expand All @@ -239,11 +247,12 @@ internal class DatabaseImpl(
return db.withTransaction {
resourceDao
.getForwardReferencedResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray()))
.map {
.pmap {
ForwardIncludeSearchResult(
it.matchingIndex,
it.baseResourceUUID,
iParser.parseResource(it.serializedResource) as Resource,
FhirContext.forR4Cached().newJsonParser().parseResource(it.serializedResource)
as Resource,
)
}
}
Expand All @@ -255,11 +264,12 @@ internal class DatabaseImpl(
return db.withTransaction {
resourceDao
.getReverseReferencedResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray()))
.map {
.pmap {
ReverseIncludeSearchResult(
it.matchingIndex,
it.baseResourceTypeAndId,
iParser.parseResource(it.serializedResource) as Resource,
FhirContext.forR4Cached().newJsonParser().parseResource(it.serializedResource)
as Resource,
)
}
}
Expand Down Expand Up @@ -460,6 +470,11 @@ internal class DatabaseImpl(
}
}

/** Implementation of a parallelized map */
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you rename pmap to something which recommends to pass functions doing CPU intensive work.
May be "pmapCPU" ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes total sense because of the Dispatcher constraint

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had restricted it for use in the DB search API class but with the rename I could potentially move it out to the generic Utils class for reuse elsewhere.

map { async(Dispatchers.Default) { f(it) } }.awaitAll()
}

internal data class DatabaseConfig(
val inMemory: Boolean,
val enableEncryption: Boolean,
Expand Down
2 changes: 1 addition & 1 deletion engine/src/test/java/com/google/android/fhir/UtilTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Google LLC
* Copyright 2022-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading