Skip to content

Commit 4da2241

Browse files
committed
refactor: simplify and optimize S3 SAIL internals
- Eliminate double serialization of values/namespaces on flush
- Make MemTable.approximateSizeInBytes() O(1) via AtomicLong counter
- Precompute field indices in sort comparator to avoid hot-loop switch
- Remove duplicate rowGroupSize/pageSize fields from S3SailStore/Compactor
- Centralize storage key literals into named constants
- Add named type discriminator constants in S3ValueStore
- Unify QuadStats accumulation with shared Accumulator inner class
- Centralize data key generation in Catalog.dataKey()
- Restrict ParquetFileInfo 14-param constructor to package-private
1 parent fde0b71 commit 4da2241

7 files changed

Lines changed: 109 additions & 92 deletions

File tree

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3NamespaceStore.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import java.io.IOException;
1414
import java.io.UncheckedIOException;
15+
import java.util.ArrayList;
1516
import java.util.Iterator;
1617
import java.util.LinkedHashMap;
1718
import java.util.List;
@@ -27,6 +28,8 @@
2728
*/
2829
class S3NamespaceStore implements Iterable<SimpleNamespace> {
2930

31+
static final String NAMESPACES_KEY = "namespaces/current";
32+
3033
private final Map<String, SimpleNamespace> namespacesMap = new LinkedHashMap<>(16);
3134

3235
public synchronized String getNamespace(String prefix) {
@@ -61,7 +64,7 @@ public synchronized void clear() {
6164

6265
@SuppressWarnings("unchecked")
6366
synchronized void deserialize(ObjectStore objectStore, ObjectMapper mapper) {
64-
byte[] data = objectStore.get("namespaces/current");
67+
byte[] data = objectStore.get(NAMESPACES_KEY);
6568
if (data == null) {
6669
return;
6770
}
@@ -79,14 +82,14 @@ synchronized void deserialize(ObjectStore objectStore, ObjectMapper mapper) {
7982

8083
synchronized void serialize(ObjectStore objectStore, ObjectMapper mapper) {
8184
try {
82-
List<Map<String, String>> entries = new java.util.ArrayList<>();
85+
List<Map<String, String>> entries = new ArrayList<>();
8386
for (SimpleNamespace ns : namespacesMap.values()) {
8487
Map<String, String> entry = new LinkedHashMap<>();
8588
entry.put("prefix", ns.getPrefix());
8689
entry.put("name", ns.getName());
8790
entries.add(entry);
8891
}
89-
objectStore.put("namespaces/current", mapper.writeValueAsBytes(entries));
92+
objectStore.put(NAMESPACES_KEY, mapper.writeValueAsBytes(entries));
9093
} catch (IOException e) {
9194
throw new UncheckedIOException("Failed to serialize namespaces", e);
9295
}

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3SailStore.java

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,6 @@ class S3SailStore implements SailStore {
9595
ALL_INDEXES = List.copyOf(indexes);
9696
}
9797

98-
private static final int DEFAULT_ROW_GROUP_SIZE = 8 * 1024 * 1024; // 8 MiB
99-
private static final int DEFAULT_PAGE_SIZE = 64 * 1024; // 64 KiB
100-
10198
private final S3ValueStore valueStore;
10299
private final S3NamespaceStore namespaceStore;
103100

@@ -114,8 +111,6 @@ class S3SailStore implements SailStore {
114111
private final TieredCache cache;
115112
private final CompactionPolicy compactionPolicy;
116113
private final Compactor compactor;
117-
private final int rowGroupSize;
118-
private final int pageSize;
119114

120115
/**
121116
* A lock to control concurrent access by {@link S3SailSink} to the stores.
@@ -147,8 +142,6 @@ private static ObjectStore createObjectStore(S3StoreConfig config) {
147142
this.namespaceStore = new S3NamespaceStore();
148143
this.objectStore = objectStore;
149144
this.memTableFlushSize = config.getMemTableSize();
150-
this.rowGroupSize = DEFAULT_ROW_GROUP_SIZE;
151-
this.pageSize = DEFAULT_PAGE_SIZE;
152145

153146
// Single SPOC index for the MemTable
154147
this.memTable = new MemTable(SPOC_INDEX);
@@ -165,7 +158,7 @@ private static ObjectStore createObjectStore(S3StoreConfig config) {
165158
config.getDiskCacheSize(), objectStore);
166159

167160
this.compactionPolicy = new CompactionPolicy();
168-
this.compactor = new Compactor(objectStore, cache, rowGroupSize, pageSize);
161+
this.compactor = new Compactor(objectStore, cache);
169162

170163
// Deserialize value store and namespaces
171164
if (catalog.getNextValueId() > 0) {
@@ -227,12 +220,11 @@ private void flushToObjectStore() {
227220
return;
228221
}
229222

230-
// Always persist namespaces and values (they may have changed without any quad writes)
231-
valueStore.serialize(objectStore);
232-
namespaceStore.serialize(objectStore, jsonMapper);
233-
234223
if (memTable.size() == 0) {
235-
return; // no quads to flush — avoid wasting epoch numbers and S3 writes
224+
// No quads to flush — still persist namespaces/values (they may have changed)
225+
valueStore.serialize(objectStore);
226+
namespaceStore.serialize(objectStore, jsonMapper);
227+
return;
236228
}
237229

238230
long epoch = epochCounter.getAndIncrement();
@@ -252,16 +244,14 @@ private void flushToObjectStore() {
252244

253245
private static List<long[]> collectQuads(MemTable frozen) {
254246
List<long[]> allQuads = new ArrayList<>(frozen.size());
255-
long[] quad = new long[4];
247+
long[] scratch = new long[4];
256248
for (Map.Entry<byte[], byte[]> entry : frozen.getData().entrySet()) {
257-
long[] q = new long[5]; // s, p, o, c, flag
258-
frozen.getIndex().keyToQuad(entry.getKey(), quad);
259-
q[0] = quad[QuadIndex.SUBJ_IDX];
260-
q[1] = quad[QuadIndex.PRED_IDX];
261-
q[2] = quad[QuadIndex.OBJ_IDX];
262-
q[3] = quad[QuadIndex.CONTEXT_IDX];
263-
q[4] = entry.getValue()[0];
264-
allQuads.add(q);
249+
frozen.getIndex().keyToQuad(entry.getKey(), scratch);
250+
allQuads.add(new long[] {
251+
scratch[QuadIndex.SUBJ_IDX], scratch[QuadIndex.PRED_IDX],
252+
scratch[QuadIndex.OBJ_IDX], scratch[QuadIndex.CONTEXT_IDX],
253+
entry.getValue()[0]
254+
});
265255
}
266256
return allQuads;
267257
}
@@ -272,10 +262,9 @@ private void writeParquetFiles(long epoch, List<long[]> allQuads, QuadStats stat
272262
List<QuadEntry> sorted = sortQuadEntries(allQuads, sortIndex);
273263

274264
ParquetSchemas.SortOrder sortOrder = ParquetSchemas.SortOrder.fromSuffix(sortSuffix);
275-
byte[] parquetData = ParquetFileBuilder.build(sorted, ParquetSchemas.QUAD_SCHEMA,
276-
sortOrder, rowGroupSize, pageSize);
265+
byte[] parquetData = ParquetFileBuilder.build(sorted, sortOrder);
277266

278-
String s3Key = "data/L0-" + String.format("%05d", epoch) + "-" + sortSuffix + ".parquet";
267+
String s3Key = Catalog.dataKey(0, epoch, sortSuffix);
279268
objectStore.put(s3Key, parquetData);
280269

281270
if (cache != null) {
@@ -304,15 +293,24 @@ private void persistMetadata(long epoch) {
304293
private static List<QuadEntry> sortQuadEntries(List<long[]> quads, QuadIndex sortIndex) {
305294
List<long[]> sorted = new ArrayList<>(quads);
306295
String seq = sortIndex.getFieldSeqString();
296+
int i0 = QuadIndex.fieldCharToIdx(seq.charAt(0));
297+
int i1 = QuadIndex.fieldCharToIdx(seq.charAt(1));
298+
int i2 = QuadIndex.fieldCharToIdx(seq.charAt(2));
299+
int i3 = QuadIndex.fieldCharToIdx(seq.charAt(3));
307300
sorted.sort((a, b) -> {
308-
for (int i = 0; i < 4; i++) {
309-
int idx = QuadIndex.fieldCharToIdx(seq.charAt(i));
310-
int cmp = Long.compare(a[idx], b[idx]);
311-
if (cmp != 0) {
312-
return cmp;
313-
}
301+
int cmp = Long.compare(a[i0], b[i0]);
302+
if (cmp != 0) {
303+
return cmp;
304+
}
305+
cmp = Long.compare(a[i1], b[i1]);
306+
if (cmp != 0) {
307+
return cmp;
308+
}
309+
cmp = Long.compare(a[i2], b[i2]);
310+
if (cmp != 0) {
311+
return cmp;
314312
}
315-
return 0;
313+
return Long.compare(a[i3], b[i3]);
316314
});
317315

318316
List<QuadEntry> result = new ArrayList<>(sorted.size());

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3ValueStore.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
class S3ValueStore extends AbstractValueFactory {
3636

3737
static final long UNKNOWN_ID = -1;
38+
static final String VALUES_KEY = "values/current";
39+
40+
private static final byte TYPE_IRI = 0;
41+
private static final byte TYPE_LITERAL = 1;
42+
private static final byte TYPE_BNODE = 2;
3843

3944
private final ConcurrentHashMap<Value, Long> valueToId = new ConcurrentHashMap<>();
4045
private final ConcurrentHashMap<Long, Value> idToValue = new ConcurrentHashMap<>();
@@ -115,24 +120,24 @@ void serialize(ObjectStore objectStore) {
115120
Value val = entry.getValue();
116121

117122
if (val instanceof IRI) {
118-
out.writeByte(0);
123+
out.writeByte(TYPE_IRI);
119124
writeBytes(out, buf, val.stringValue().getBytes(StandardCharsets.UTF_8));
120125
} else if (val instanceof Literal) {
121-
out.writeByte(1);
126+
out.writeByte(TYPE_LITERAL);
122127
Literal lit = (Literal) val;
123128
writeBytes(out, buf, lit.getLabel().getBytes(StandardCharsets.UTF_8));
124129
writeBytes(out, buf, lit.getDatatype().stringValue().getBytes(StandardCharsets.UTF_8));
125130
writeBytes(out, buf, lit.getLanguage().orElse("").getBytes(StandardCharsets.UTF_8));
126131
} else if (val instanceof BNode) {
127-
out.writeByte(2);
132+
out.writeByte(TYPE_BNODE);
128133
writeBytes(out, buf, ((BNode) val).getID().getBytes(StandardCharsets.UTF_8));
129134
} else {
130135
throw new IllegalStateException("Unsupported value type: " + val.getClass());
131136
}
132137
}
133138

134139
out.flush();
135-
objectStore.put("values/current", baos.toByteArray());
140+
objectStore.put(VALUES_KEY, baos.toByteArray());
136141
} catch (IOException e) {
137142
throw new UncheckedIOException(e);
138143
}
@@ -142,7 +147,7 @@ void serialize(ObjectStore objectStore) {
142147
* Deserializes the value store from the object store.
143148
*/
144149
void deserialize(ObjectStore objectStore, long nextValueId) {
145-
byte[] data = objectStore.get("values/current");
150+
byte[] data = objectStore.get(VALUES_KEY);
146151
if (data == null) {
147152
return;
148153
}
@@ -157,14 +162,14 @@ void deserialize(ObjectStore objectStore, long nextValueId) {
157162

158163
Value val;
159164
switch (type) {
160-
case 0: { // IRI
165+
case TYPE_IRI: {
161166
int len = (int) Varint.readUnsigned(bb);
162167
byte[] payload = new byte[len];
163168
bb.get(payload);
164169
val = createIRI(new String(payload, StandardCharsets.UTF_8));
165170
break;
166171
}
167-
case 1: { // Literal
172+
case TYPE_LITERAL: {
168173
int labelLen = (int) Varint.readUnsigned(bb);
169174
byte[] labelBytes = new byte[labelLen];
170175
bb.get(labelBytes);
@@ -188,7 +193,7 @@ void deserialize(ObjectStore objectStore, long nextValueId) {
188193
}
189194
break;
190195
}
191-
case 2: { // BNode
196+
case TYPE_BNODE: {
192197
int len = (int) Varint.readUnsigned(bb);
193198
byte[] payload = new byte[len];
194199
bb.get(payload);

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Catalog.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@
4949
@JsonIgnoreProperties(ignoreUnknown = true)
5050
public class Catalog {
5151

52+
static final String CATALOG_POINTER_KEY = "catalog/current";
53+
private static final String CATALOG_DIR = "catalog/";
54+
5255
@JsonProperty("version")
5356
private int version = 3;
5457

@@ -108,11 +111,11 @@ public void setFiles(List<ParquetFileInfo> files) {
108111
* @return the loaded catalog, or an empty catalog if none exists
109112
*/
110113
public static Catalog load(ObjectStore store, ObjectMapper mapper) {
111-
byte[] pointer = store.get("catalog/current");
114+
byte[] pointer = store.get(CATALOG_POINTER_KEY);
112115
if (pointer == null) {
113116
return new Catalog();
114117
}
115-
String catalogKey = "catalog/" + new String(pointer, StandardCharsets.UTF_8).trim();
118+
String catalogKey = CATALOG_DIR + new String(pointer, StandardCharsets.UTF_8).trim();
116119
byte[] json = store.get(catalogKey);
117120
if (json == null) {
118121
return new Catalog();
@@ -140,8 +143,8 @@ public void save(ObjectStore store, ObjectMapper mapper, long epoch) {
140143
try {
141144
String versionedKey = "v" + epoch + ".json";
142145
byte[] json = mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(this);
143-
store.put("catalog/" + versionedKey, json);
144-
store.put("catalog/current", versionedKey.getBytes(StandardCharsets.UTF_8));
146+
store.put(CATALOG_DIR + versionedKey, json);
147+
store.put(CATALOG_POINTER_KEY, versionedKey.getBytes(StandardCharsets.UTF_8));
145148
} catch (IOException e) {
146149
throw new UncheckedIOException("Failed to save catalog", e);
147150
}
@@ -187,6 +190,13 @@ public List<ParquetFileInfo> getFilesForSortOrder(String sortOrder) {
187190
return result;
188191
}
189192

193+
/**
194+
* Generates the S3 key for a data file at the given level, epoch, and sort suffix.
195+
*/
196+
public static String dataKey(int level, long epoch, String sortSuffix) {
197+
return "data/L" + level + "-" + String.format("%05d", epoch) + "-" + sortSuffix + ".parquet";
198+
}
199+
190200
/**
191201
* Metadata about a single Parquet file in the catalog, including its location, sort order, size, and min/max
192202
* statistics for subject, predicate, object, and context columns.
@@ -246,7 +256,7 @@ public ParquetFileInfo(String s3Key, int level, String sortOrder, long rowCount,
246256
stats.minObject, stats.maxObject, stats.minContext, stats.maxContext);
247257
}
248258

249-
public ParquetFileInfo(String s3Key, int level, String sortOrder, long rowCount,
259+
ParquetFileInfo(String s3Key, int level, String sortOrder, long rowCount,
250260
long epoch, long sizeBytes,
251261
long minSubject, long maxSubject,
252262
long minPredicate, long maxPredicate,

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Compactor.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,10 @@ public class Compactor {
3838

3939
private final ObjectStore objectStore;
4040
private final TieredCache cache;
41-
private final int rowGroupSize;
42-
private final int pageSize;
4341

44-
public Compactor(ObjectStore objectStore, TieredCache cache, int rowGroupSize, int pageSize) {
42+
public Compactor(ObjectStore objectStore, TieredCache cache) {
4543
this.objectStore = objectStore;
4644
this.cache = cache;
47-
this.rowGroupSize = rowGroupSize;
48-
this.pageSize = pageSize;
4945
}
5046

5147
/**
@@ -107,11 +103,9 @@ public CompactionResult compact(List<Catalog.ParquetFileInfo> sourceFiles,
107103
}
108104

109105
// Write merged Parquet file
110-
String s3Key = "data/L" + targetLevel + "-"
111-
+ String.format("%05d", epoch) + "-" + suffix + ".parquet";
106+
String s3Key = Catalog.dataKey(targetLevel, epoch, suffix);
112107

113-
byte[] parquetData = ParquetFileBuilder.build(merged, ParquetSchemas.QUAD_SCHEMA,
114-
sortOrder, rowGroupSize, pageSize);
108+
byte[] parquetData = ParquetFileBuilder.build(merged, sortOrder);
115109

116110
objectStore.put(s3Key, parquetData);
117111
if (cache != null) {

0 commit comments

Comments
 (0)