17
17
18
18
import java .io .IOException ;
19
19
import java .util .concurrent .CompletableFuture ;
20
+ import java .util .function .Consumer ;
20
21
import java .util .zip .CRC32 ;
21
22
22
23
import org .apache .lucene .store .IndexOutput ;
25
26
import com .apple .foundationdb .TransactionContext ;
26
27
import com .apple .foundationdb .async .AsyncUtil ;
27
28
import com .apple .foundationdb .subspace .Subspace ;
29
+ import com .cloudant .fdblucene .FDBDirectory .FileMetaData ;
28
30
29
31
public final class FDBIndexOutput extends IndexOutput {
30
32
@@ -59,6 +61,7 @@ private static byte[] pageKey(final Subspace subspace, final long pos, final int
59
61
60
62
private final FDBDirectory dir ;
61
63
private final TransactionContext txc ;
64
+ private final byte [] metaKey ;
62
65
private final Subspace subspace ;
63
66
private byte [] txnBuffer ;
64
67
@@ -74,10 +77,12 @@ private static byte[] pageKey(final Subspace subspace, final long pos, final int
74
77
private final int txnSize ;
75
78
76
79
FDBIndexOutput (final FDBDirectory dir , final String resourceDescription , final String name ,
77
- final TransactionContext txc , final Subspace subspace , final int pageSize , final int txnSize ) {
80
+ final TransactionContext txc , final byte [] metaKey , final Subspace subspace , final int pageSize ,
81
+ final int txnSize ) {
78
82
super (resourceDescription , name );
79
83
this .dir = dir ;
80
84
this .txc = txc ;
85
+ this .metaKey = metaKey ;
81
86
this .subspace = subspace ;
82
87
this .readVersionCache = new ReadVersionCache ();
83
88
this .pageSize = pageSize ;
@@ -97,7 +102,7 @@ public void close() throws IOException {
97
102
flushTxnBuffer (subspace , txn , txnBuffer , txnBufferOffset , pointer , pageSize );
98
103
txn .options ().setNextWriteNoWriteConflictRange ();
99
104
100
- dir . setFileLength (txn , getName () , pointer );
105
+ setFileLength (txn , pointer );
101
106
return null ;
102
107
});
103
108
}
@@ -149,7 +154,9 @@ private void flushTxnBuffer() {
149
154
lastFlushFuture = txc .runAsync (txn -> {
150
155
readVersionCache .setReadVersion (txn );
151
156
txn .options ().setTransactionLoggingEnable (String .format ("%s,out,flush,%d" , getName (), pointer ));
152
- flushTxnBuffer (subspace , txn , txnBuffer , txnBufferOffset , pointer , pageSize );
157
+ applyIfExists (txn , value -> {
158
+ flushTxnBuffer (subspace , txn , txnBuffer , txnBufferOffset , pointer , pageSize );
159
+ });
153
160
return AsyncUtil .DONE ;
154
161
});
155
162
}
@@ -160,4 +167,23 @@ private void flushTxnBufferIfFull() {
160
167
}
161
168
}
162
169
170
+ private void setFileLength (final TransactionContext txc , final long length ) {
171
+ txc .run (txn -> {
172
+ applyIfExists (txn , value -> {
173
+ final FileMetaData meta = new FileMetaData (value ).setFileLength (length );
174
+ txn .set (metaKey , meta .pack ());
175
+ });
176
+ return null ;
177
+ });
178
+ }
179
+
180
+ private void applyIfExists (final Transaction txn , final Consumer <byte []> fun ) {
181
+ txn .get (metaKey ).thenApply (value -> {
182
+ if (value != null ) {
183
+ fun .accept (value );
184
+ }
185
+ return null ;
186
+ }).join ();
187
+ }
188
+
163
189
}
0 commit comments