Skip to content

Commit 174dadd

Browse files
mapanmapan1984
authored andcommitted
Improve ChunkCache
Change-Id: I9c876701b14b72d476f7a700465c05f6b870b5c1
1 parent 5e8f4a0 commit 174dadd

File tree

1 file changed

+26
-37
lines changed

1 file changed

+26
-37
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCache.java

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.ForkJoinPool;
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.concurrent.TimeoutException;
29-
import java.util.concurrent.atomic.AtomicReference;
3029

3130
import org.apache.kafka.common.Configurable;
3231

@@ -79,44 +78,34 @@ public InputStream getChunk(final ObjectKey objectKey,
7978
final var currentChunk = manifest.chunkIndex().chunks().get(chunkId);
8079
startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize);
8180
final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
82-
final AtomicReference<InputStream> result = new AtomicReference<>();
83-
try {
84-
return cache.asMap()
85-
.compute(chunkKey, (key, val) -> {
86-
final CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
87-
if (val == null) {
88-
statsCounter.recordMiss();
89-
try {
90-
final InputStream chunk =
91-
chunkManager.getChunk(objectKey, manifest, chunkId);
92-
final T t = this.cacheChunk(chunkKey, chunk);
93-
result.getAndSet(cachedChunkToInputStream(t));
94-
return t;
95-
} catch (final StorageBackendException | IOException e) {
96-
throw new CompletionException(e);
97-
}
98-
} else {
99-
statsCounter.recordHit();
100-
try {
101-
final T cachedChunk = val.get();
102-
result.getAndSet(cachedChunkToInputStream(cachedChunk));
103-
return cachedChunk;
104-
} catch (final InterruptedException | ExecutionException e) {
105-
throw new CompletionException(e);
106-
}
107-
}
108-
}, executor);
10981

110-
future.whenComplete((r, ex) -> {
111-
if (ex != null) {
112-
cache.asMap().remove(key, future);
113-
}
114-
});
82+
final CompletableFuture<T> future = cache.asMap().compute(chunkKey, (key, existing) -> {
83+
if (existing != null) {
84+
statsCounter.recordHit();
85+
return existing;
86+
}
87+
88+
statsCounter.recordMiss();
89+
final CompletableFuture<T> created = CompletableFuture.supplyAsync(() -> {
90+
try (InputStream chunk = chunkManager.getChunk(objectKey, manifest, chunkId)) {
91+
return this.cacheChunk(chunkKey, chunk);
92+
} catch (final StorageBackendException | IOException e) {
93+
throw new CompletionException(e);
94+
}
95+
});
96+
97+
created.whenComplete((r, ex) -> {
98+
if (ex != null) {
99+
cache.asMap().remove(key, created);
100+
}
101+
});
115102

116-
return future;
117-
})
118-
.thenApplyAsync(t -> result.get())
119-
.get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
103+
return created;
104+
});
105+
106+
try {
107+
T t = future.get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
108+
return this.cachedChunkToInputStream(t);
120109
} catch (final ExecutionException e) {
121110
// Unwrap previously wrapped exceptions if possible.
122111
final Throwable cause = e.getCause();

0 commit comments

Comments
 (0)