Skip to content

Commit 80fd7f7

Browse files
committed
feat: parallel writes, streaming reads, bloom filters, and cardinality estimates
- Parallelize Parquet writes with CompletableFuture (3 files written concurrently)
- Optimize getContextIDs() to use CSPO index with last-seen dedup
- Move compaction to background single-thread executor
- Implement catalog-based cardinality estimation in S3EvaluationStatistics
- Refactor ParquetQuadSource to stream rows lazily instead of loading all into memory
- Add row group filtering using Parquet column statistics (min/max)
- Add BloomFilter class for leading-component filtering per Parquet file
- Add close() to RawEntrySource; MergeIterator closes sources when exhausted
1 parent 4da2241 commit 80fd7f7

10 files changed

Lines changed: 727 additions & 340 deletions

File tree

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3EvaluationStatistics.java

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,108 @@
1010
*******************************************************************************/
1111
package org.eclipse.rdf4j.sail.s3;
1212

13+
import java.util.List;
14+
15+
import org.eclipse.rdf4j.model.IRI;
16+
import org.eclipse.rdf4j.model.Resource;
17+
import org.eclipse.rdf4j.model.Value;
18+
import org.eclipse.rdf4j.query.algebra.StatementPattern;
19+
import org.eclipse.rdf4j.query.algebra.Var;
1320
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
21+
import org.eclipse.rdf4j.sail.s3.storage.Catalog;
1422

1523
/**
16-
* Evaluation statistics for the S3 sail. Currently uses the base class's default cardinality estimation. This can be
17-
* enhanced later to query the actual storage for more accurate estimates.
24+
* Evaluation statistics for the S3 SAIL. Uses catalog-level file statistics (row counts and min/max ranges) to estimate
25+
* statement pattern cardinality.
1826
*/
1927
class S3EvaluationStatistics extends EvaluationStatistics {
2028

29+
private final S3ValueStore valueStore;
30+
private final Catalog catalog;
31+
32+
S3EvaluationStatistics(S3ValueStore valueStore, Catalog catalog) {
33+
this.valueStore = valueStore;
34+
this.catalog = catalog;
35+
}
36+
2137
@Override
2238
protected CardinalityCalculator createCardinalityCalculator() {
2339
return new S3CardinalityCalculator();
2440
}
2541

2642
protected class S3CardinalityCalculator extends CardinalityCalculator {
27-
// Uses the default cardinality estimation from the base class.
28-
// Can be enhanced to consult S3ValueStore and storage for accurate estimates.
43+
44+
@Override
45+
protected double getCardinality(StatementPattern sp) {
46+
Value subj = getConstantValue(sp.getSubjectVar());
47+
if (subj != null && !(subj instanceof Resource)) {
48+
subj = null;
49+
}
50+
Value pred = getConstantValue(sp.getPredicateVar());
51+
if (pred != null && !(pred instanceof IRI)) {
52+
pred = null;
53+
}
54+
Value obj = getConstantValue(sp.getObjectVar());
55+
Value context = getConstantValue(sp.getContextVar());
56+
if (context != null && !(context instanceof Resource)) {
57+
context = null;
58+
}
59+
return estimateCardinality((Resource) subj, (IRI) pred, obj, (Resource) context);
60+
}
61+
62+
private Value getConstantValue(Var var) {
63+
return (var != null) ? var.getValue() : null;
64+
}
65+
}
66+
67+
private double estimateCardinality(Resource subj, IRI pred, Value obj, Resource context) {
68+
long subjID = S3ValueStore.UNKNOWN_ID;
69+
if (subj != null) {
70+
subjID = valueStore.getId(subj);
71+
if (subjID == S3ValueStore.UNKNOWN_ID) {
72+
return 0;
73+
}
74+
}
75+
76+
long predID = S3ValueStore.UNKNOWN_ID;
77+
if (pred != null) {
78+
predID = valueStore.getId(pred);
79+
if (predID == S3ValueStore.UNKNOWN_ID) {
80+
return 0;
81+
}
82+
}
83+
84+
long objID = S3ValueStore.UNKNOWN_ID;
85+
if (obj != null) {
86+
objID = valueStore.getId(obj);
87+
if (objID == S3ValueStore.UNKNOWN_ID) {
88+
return 0;
89+
}
90+
}
91+
92+
long contextID = S3ValueStore.UNKNOWN_ID;
93+
if (context != null) {
94+
contextID = valueStore.getId(context);
95+
if (contextID == S3ValueStore.UNKNOWN_ID) {
96+
return 0;
97+
}
98+
}
99+
100+
if (catalog == null) {
101+
return 1000;
102+
}
103+
104+
// Sum row counts from files whose stats allow matching the pattern,
105+
// then divide by number of sort orders since each triple is stored 3 times
106+
List<Catalog.ParquetFileInfo> files = catalog.getFiles();
107+
long totalMatchingRows = 0;
108+
for (Catalog.ParquetFileInfo file : files) {
109+
if (file.mayContain(subjID, predID, objID, contextID)) {
110+
totalMatchingRows += file.getRowCount();
111+
}
112+
}
113+
114+
int numSortOrders = 3;
115+
return (double) totalMatchingRows / numSortOrders;
29116
}
30117
}

0 commit comments

Comments
 (0)