Skip to content

Commit 89a73e1

Browse files
committed
refactor: deduplicate bloom filter logic and simplify serialization
Consolidate duplicated buildBloomFilter and leading-component switch logic from S3SailStore and Compactor into BloomFilter. Replace hand-rolled byte serialization with ByteBuffer. Merge queryQuads overloads and eliminate double shouldCompact evaluation in compaction.
1 parent 80fd7f7 commit 89a73e1

4 files changed

Lines changed: 75 additions & 135 deletions

File tree

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3SailStore.java

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ private void writeParquetFiles(long epoch, List<QuadEntry> allEntries, QuadStats
287287
ParquetSchemas.SortOrder sortOrder = ParquetSchemas.SortOrder.fromSuffix(sortSuffix);
288288
byte[] parquetData = ParquetFileBuilder.build(sorted, sortOrder);
289289

290-
BloomFilter bloom = buildBloomFilter(sorted, sortSuffix);
290+
BloomFilter bloom = BloomFilter.buildForEntries(sorted, sortSuffix);
291291

292292
String s3Key = Catalog.dataKey(0, epoch, sortSuffix);
293293
objectStore.put(s3Key, parquetData);
@@ -303,32 +303,6 @@ private void writeParquetFiles(long epoch, List<QuadEntry> allEntries, QuadStats
303303
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
304304
}
305305

306-
/**
307-
* Builds a bloom filter for the leading component of the given sort order.
308-
*/
309-
static BloomFilter buildBloomFilter(List<QuadEntry> entries, String sortSuffix) {
310-
BloomFilter bloom = new BloomFilter(Math.max(1, entries.size()), 0.01);
311-
for (QuadEntry entry : entries) {
312-
bloom.add(leadingComponent(entry, sortSuffix));
313-
}
314-
return bloom;
315-
}
316-
317-
private static long leadingComponent(QuadEntry entry, String sortSuffix) {
318-
switch (sortSuffix.charAt(0)) {
319-
case 's':
320-
return entry.subject;
321-
case 'o':
322-
return entry.object;
323-
case 'c':
324-
return entry.context;
325-
case 'p':
326-
return entry.predicate;
327-
default:
328-
return entry.subject;
329-
}
330-
}
331-
332306
private void persistMetadata(long epoch) {
333307
// Save catalog first: if we crash after catalog but before values,
334308
// on restart we have new nextValueId but old values — IDs are gaps (safe).
@@ -365,14 +339,14 @@ private void runCompactionIfNeeded() {
365339

366340
pendingCompaction = CompletableFuture.runAsync(() -> {
367341
try {
368-
doCompaction();
342+
doCompaction(needsL0, needsL1);
369343
} catch (Exception e) {
370344
logger.error("Background compaction failed", e);
371345
}
372346
}, compactionExecutor);
373347
}
374348

375-
private void doCompaction() {
349+
private void doCompaction(boolean compactL0, boolean compactL1) {
376350
List<Compactor.CompactionResult> results = new ArrayList<>();
377351

378352
// Snapshot file list under synchronization
@@ -382,7 +356,7 @@ private void doCompaction() {
382356
}
383357

384358
// L0→L1 compaction
385-
if (compactionPolicy.shouldCompact(files, 0)) {
359+
if (compactL0) {
386360
List<Catalog.ParquetFileInfo> l0Files = CompactionPolicy.filesAtLevel(files, 0);
387361
long compactEpoch = epochCounter.getAndIncrement();
388362
results.add(compactor.compact(l0Files, 0, 1, compactEpoch, catalog));
@@ -391,8 +365,8 @@ private void doCompaction() {
391365
}
392366
}
393367

394-
// L1→L2 compaction
395-
if (compactionPolicy.shouldCompact(files, 1)) {
368+
// L1→L2 compaction (re-check after L0 compaction may have produced new L1 files)
369+
if (compactL1 || (compactL0 && compactionPolicy.shouldCompact(files, 1))) {
396370
List<Catalog.ParquetFileInfo> l1Files = CompactionPolicy.filesAtLevel(files, 1);
397371
long compactEpoch = epochCounter.getAndIncrement();
398372
results.add(compactor.compact(l1Files, 1, 2, compactEpoch, catalog));
@@ -423,16 +397,8 @@ private boolean hasPersistence() {
423397
}
424398

425399
/**
426-
* Queries quads using the best available source (merged Parquet + MemTable, or MemTable only).
427-
*/
428-
private Iterator<long[]> queryQuads(long s, long p, long o, long c, boolean explicit) {
429-
return hasPersistence()
430-
? createMergedIterator(s, p, o, c, explicit, null)
431-
: memTable.scan(s, p, o, c, explicit);
432-
}
433-
434-
/**
435-
* Queries quads with a preferred index hint.
400+
* Queries quads using the best available source (merged Parquet + MemTable, or MemTable only). If
401+
* {@code preferredIndex} is non-null, it is used instead of automatic index selection.
436402
*/
437403
private Iterator<long[]> queryQuads(long s, long p, long o, long c, boolean explicit,
438404
QuadIndex preferredIndex) {
@@ -500,7 +466,7 @@ CloseableIteration<? extends Statement> createStatementIterator(
500466
ArrayList<CloseableIteration<? extends Statement>> perContextIterList = new ArrayList<>(contextIDList.size());
501467

502468
for (long contextID : contextIDList) {
503-
Iterator<long[]> quads = queryQuads(subjID, predID, objID, contextID, explicit);
469+
Iterator<long[]> quads = queryQuads(subjID, predID, objID, contextID, explicit, null);
504470
perContextIterList.add(new QuadToStatementIteration(quads, valueStore));
505471
}
506472

@@ -746,7 +712,7 @@ private long removeStatements(Resource subj, IRI pred, Value obj, boolean explic
746712

747713
long removeCount = 0;
748714
for (long contextId : contextIds) {
749-
Iterator<long[]> iter = queryQuads(subjID, predID, objID, contextId, explicit);
715+
Iterator<long[]> iter = queryQuads(subjID, predID, objID, contextId, explicit, null);
750716

751717
// Buffer results before removing to avoid ConcurrentModificationException
752718
// when the iterator is backed by the MemTable's own map

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/BloomFilter.java

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
*******************************************************************************/
1111
package org.eclipse.rdf4j.sail.s3.storage;
1212

13+
import java.nio.ByteBuffer;
1314
import java.util.Base64;
15+
import java.util.List;
1416

1517
/**
1618
* A simple bit-array bloom filter for long values. Uses two independent hash functions derived from a single
@@ -78,31 +80,81 @@ public boolean mightContain(long value) {
7880
return true;
7981
}
8082

83+
/**
84+
* Builds a bloom filter for the leading component of the given sort order.
85+
*/
86+
public static BloomFilter buildForEntries(List<QuadEntry> entries, String sortSuffix) {
87+
BloomFilter bloom = new BloomFilter(Math.max(1, entries.size()), 0.01);
88+
for (QuadEntry entry : entries) {
89+
bloom.add(leadingComponent(entry, sortSuffix));
90+
}
91+
return bloom;
92+
}
93+
94+
/**
95+
* Extracts the leading component value from a quad entry based on the sort order suffix.
96+
*/
97+
static long leadingComponent(QuadEntry entry, String sortSuffix) {
98+
switch (sortSuffix.charAt(0)) {
99+
case 's':
100+
return entry.subject;
101+
case 'o':
102+
return entry.object;
103+
case 'c':
104+
return entry.context;
105+
case 'p':
106+
return entry.predicate;
107+
default:
108+
return entry.subject;
109+
}
110+
}
111+
112+
/**
113+
* Extracts the leading component value from raw quad IDs based on the sort order suffix.
114+
*/
115+
static long leadingComponent(long s, long p, long o, long c, String sortOrder) {
116+
if (sortOrder == null) {
117+
return -1;
118+
}
119+
switch (sortOrder.charAt(0)) {
120+
case 's':
121+
return s;
122+
case 'p':
123+
return p;
124+
case 'o':
125+
return o;
126+
case 'c':
127+
return c;
128+
default:
129+
return -1;
130+
}
131+
}
132+
81133
/**
82134
* Serializes this bloom filter to a Base64-encoded string for JSON storage.
83135
*/
84136
public String toBase64() {
85137
// Format: [numBits (4 bytes)] [numHashFunctions (4 bytes)] [bits array (8 bytes each)]
86-
byte[] data = new byte[8 + bits.length * 8];
87-
writeInt(data, 0, numBits);
88-
writeInt(data, 4, numHashFunctions);
89-
for (int i = 0; i < bits.length; i++) {
90-
writeLong(data, 8 + i * 8, bits[i]);
138+
ByteBuffer buf = ByteBuffer.allocate(8 + bits.length * 8);
139+
buf.putInt(numBits);
140+
buf.putInt(numHashFunctions);
141+
for (long word : bits) {
142+
buf.putLong(word);
91143
}
92-
return Base64.getEncoder().encodeToString(data);
144+
return Base64.getEncoder().encodeToString(buf.array());
93145
}
94146

95147
/**
96148
* Deserializes a bloom filter from a Base64-encoded string.
97149
*/
98150
public static BloomFilter fromBase64(String encoded) {
99-
byte[] data = Base64.getDecoder().decode(encoded);
100-
int numBits = readInt(data, 0);
101-
int numHash = readInt(data, 4);
102-
int arrayLen = (data.length - 8) / 8;
151+
ByteBuffer buf = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
152+
int numBits = buf.getInt();
153+
int numHash = buf.getInt();
154+
int arrayLen = buf.remaining() / 8;
103155
long[] bits = new long[arrayLen];
104156
for (int i = 0; i < arrayLen; i++) {
105-
bits[i] = readLong(data, 8 + i * 8);
157+
bits[i] = buf.getLong();
106158
}
107159
return new BloomFilter(bits, numBits, numHash);
108160
}
@@ -125,33 +177,4 @@ private static int optimalNumHashFunctions(int n, int m) {
125177
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
126178
}
127179

128-
private static void writeInt(byte[] buf, int offset, int value) {
129-
buf[offset] = (byte) (value >>> 24);
130-
buf[offset + 1] = (byte) (value >>> 16);
131-
buf[offset + 2] = (byte) (value >>> 8);
132-
buf[offset + 3] = (byte) value;
133-
}
134-
135-
private static void writeLong(byte[] buf, int offset, long value) {
136-
buf[offset] = (byte) (value >>> 56);
137-
buf[offset + 1] = (byte) (value >>> 48);
138-
buf[offset + 2] = (byte) (value >>> 40);
139-
buf[offset + 3] = (byte) (value >>> 32);
140-
buf[offset + 4] = (byte) (value >>> 24);
141-
buf[offset + 5] = (byte) (value >>> 16);
142-
buf[offset + 6] = (byte) (value >>> 8);
143-
buf[offset + 7] = (byte) value;
144-
}
145-
146-
private static int readInt(byte[] buf, int offset) {
147-
return ((buf[offset] & 0xFF) << 24) | ((buf[offset + 1] & 0xFF) << 16)
148-
| ((buf[offset + 2] & 0xFF) << 8) | (buf[offset + 3] & 0xFF);
149-
}
150-
151-
private static long readLong(byte[] buf, int offset) {
152-
return ((long) (buf[offset] & 0xFF) << 56) | ((long) (buf[offset + 1] & 0xFF) << 48)
153-
| ((long) (buf[offset + 2] & 0xFF) << 40) | ((long) (buf[offset + 3] & 0xFF) << 32)
154-
| ((long) (buf[offset + 4] & 0xFF) << 24) | ((long) (buf[offset + 5] & 0xFF) << 16)
155-
| ((long) (buf[offset + 6] & 0xFF) << 8) | (long) (buf[offset + 7] & 0xFF);
156-
}
157180
}

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Catalog.java

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -415,28 +415,6 @@ public void setBloomFilter(BloomFilter filter) {
415415
this.bloomFilterBase64 = filter != null ? filter.toBase64() : null;
416416
}
417417

418-
/**
419-
* Returns the leading component's filter value for this file's sort order. SPOC → subject, OPSC → object, CSPO
420-
* → context.
421-
*/
422-
private long getLeadingFilterValue(long s, long p, long o, long c) {
423-
if (sortOrder == null) {
424-
return -1;
425-
}
426-
switch (sortOrder.charAt(0)) {
427-
case 's':
428-
return s;
429-
case 'o':
430-
return o;
431-
case 'c':
432-
return c;
433-
case 'p':
434-
return p;
435-
default:
436-
return -1;
437-
}
438-
}
439-
440418
/**
441419
* Tests whether this file's statistics allow it to contain a quad matching the given pattern. Bound components
442420
* (>= 0) are checked against the file's min/max range; unbound components (< 0) are wildcards. Also checks the
@@ -458,7 +436,7 @@ public boolean mayContain(long s, long p, long o, long c) {
458436
// Check bloom filter for the leading component
459437
BloomFilter bf = getBloomFilter();
460438
if (bf != null) {
461-
long leadingVal = getLeadingFilterValue(s, p, o, c);
439+
long leadingVal = BloomFilter.leadingComponent(s, p, o, c, sortOrder);
462440
if (leadingVal >= 0 && !bf.mightContain(leadingVal)) {
463441
return false;
464442
}

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Compactor.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,7 @@ public CompactionResult compact(List<Catalog.ParquetFileInfo> sourceFiles,
112112
cache.writeThrough(s3Key, parquetData);
113113
}
114114

115-
// Build bloom filter for the leading component
116-
BloomFilter bloom = buildBloomFilter(merged, suffix);
115+
BloomFilter bloom = BloomFilter.buildForEntries(merged, suffix);
117116

118117
QuadStats stats = QuadStats.fromEntries(merged);
119118
newFiles.add(new Catalog.ParquetFileInfo(s3Key, targetLevel, suffix, merged.size(),
@@ -159,32 +158,6 @@ private List<QuadEntry> mergeEntries(List<RawEntrySource> sources, QuadIndex qua
159158
return new ArrayList<>(deduped.values());
160159
}
161160

162-
private static BloomFilter buildBloomFilter(List<QuadEntry> entries, String sortSuffix) {
163-
BloomFilter bloom = new BloomFilter(Math.max(1, entries.size()), 0.01);
164-
for (QuadEntry entry : entries) {
165-
long val;
166-
switch (sortSuffix.charAt(0)) {
167-
case 's':
168-
val = entry.subject;
169-
break;
170-
case 'o':
171-
val = entry.object;
172-
break;
173-
case 'c':
174-
val = entry.context;
175-
break;
176-
case 'p':
177-
val = entry.predicate;
178-
break;
179-
default:
180-
val = entry.subject;
181-
break;
182-
}
183-
bloom.add(val);
184-
}
185-
return bloom;
186-
}
187-
188161
private static class CompactKey implements Comparable<CompactKey> {
189162
final byte[] key;
190163

0 commit comments

Comments
 (0)