17
17
18
18
import java .io .IOException ;
19
19
import java .util .concurrent .CompletableFuture ;
20
+ import java .util .function .Consumer ;
20
21
import java .util .zip .CRC32 ;
21
22
22
23
import org .apache .lucene .store .IndexOutput ;
@@ -153,7 +154,9 @@ private void flushTxnBuffer() {
153
154
lastFlushFuture = txc .runAsync (txn -> {
154
155
readVersionCache .setReadVersion (txn );
155
156
txn .options ().setTransactionLoggingEnable (String .format ("%s,out,flush,%d" , getName (), pointer ));
156
- flushTxnBuffer (subspace , txn , txnBuffer , txnBufferOffset , pointer , pageSize );
157
+ applyIfExists (txn , value -> {
158
+ flushTxnBuffer (subspace , txn , txnBuffer , txnBufferOffset , pointer , pageSize );
159
+ });
157
160
return AsyncUtil .DONE ;
158
161
});
159
162
}
@@ -166,11 +169,21 @@ private void flushTxnBufferIfFull() {
166
169
167
170
private void setFileLength (final TransactionContext txc , final long length ) {
168
171
txc .run (txn -> {
169
- final byte [] value = txn .get (metaKey ).join ();
170
- final FileMetaData meta = new FileMetaData (value ).setFileLength (length );
171
- txn .set (metaKey , meta .pack ());
172
+ applyIfExists (txn , value -> {
173
+ final FileMetaData meta = new FileMetaData (value ).setFileLength (length );
174
+ txn .set (metaKey , meta .pack ());
175
+ });
172
176
return null ;
173
177
});
174
178
}
175
179
180
+ private void applyIfExists (final Transaction txn , final Consumer <byte []> fun ) {
181
+ txn .get (metaKey ).thenApply (value -> {
182
+ if (value != null ) {
183
+ fun .accept (value );
184
+ }
185
+ return null ;
186
+ }).join ();
187
+ }
188
+
176
189
}
0 commit comments