unpartitionedFiles) {
+ this.unpartitionedFiles = unpartitionedFiles;
+ }
+
+ /**
+ * Loads the catalog from the object store.
+ *
+ * <p>
+ * Reads the {@code catalog/current} pointer to find the active catalog version, then parses the corresponding JSON
+ * file. Returns an empty catalog if no pointer or catalog file exists.
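+ *
+ * <p>
+ * Example (illustrative):
+ *
+ * <pre>{@code
+ * Catalog catalog = Catalog.load(store, new ObjectMapper());
+ * }</pre>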
+ *
+ * @param store the object store to read from
+ * @param mapper the Jackson ObjectMapper for JSON parsing
+ * @return the loaded catalog, or an empty catalog if none exists
+ */
+ public static Catalog load(ObjectStore store, ObjectMapper mapper) {
+ byte[] pointer = store.get("catalog/current");
+ if (pointer == null) {
+ return new Catalog();
+ }
+ String catalogKey = "catalog/" + new String(pointer, StandardCharsets.UTF_8).trim();
+ byte[] json = store.get(catalogKey);
+ if (json == null) {
+ return new Catalog();
+ }
+ try {
+ return mapper.readValue(json, Catalog.class);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to parse catalog", e);
+ }
+ }
+
+ /**
+ * Saves this catalog to the object store.
+ *
+ * <p>
+ * Writes the catalog JSON to {@code catalog/v{epoch}.json} and updates the {@code catalog/current} pointer. The
+ * epoch field is set to the given value before saving.
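+ *
+ * <p>
+ * Example (illustrative): persisting version 42 writes {@code catalog/v42.json} and points
+ * {@code catalog/current} at it:
+ *
+ * <pre>{@code
+ * catalog.save(store, mapper, 42L);
+ * }</pre>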
+ *
+ * @param store the object store to write to
+ * @param mapper the Jackson ObjectMapper for JSON serialization
+ * @param epoch the epoch number for this catalog version
+ */
+ public void save(ObjectStore store, ObjectMapper mapper, long epoch) {
+ this.epoch = epoch;
+ try {
+ String versionedKey = "v" + epoch + ".json";
+ byte[] json = mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(this);
+ store.put("catalog/" + versionedKey, json);
+ store.put("catalog/current", versionedKey.getBytes(StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to save catalog", e);
+ }
+ }
+
+ /**
+ * Returns the set of predicate IDs that have partitioned files.
+ *
+ * @return set of predicate IDs parsed from the partition keys
+ */
+ public Set<Long> getPredicateIds() {
+ return predicatePartitions.keySet()
+ .stream()
+ .map(Long::parseLong)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Returns the list of Parquet files for the given predicate ID.
+ *
+ * @param predicateId the predicate value ID
+ * @return the list of file info entries, or an empty list if no files exist for this predicate
+ */
+ public List<ParquetFileInfo> getFilesForPredicate(long predicateId) {
+ return predicatePartitions.getOrDefault(String.valueOf(predicateId), Collections.emptyList());
+ }
+
+ /**
+ * Adds a Parquet file to the partition for the given predicate.
+ *
+ * @param predicateId the predicate value ID
+ * @param info the file info to add
+ */
+ public void addFile(long predicateId, ParquetFileInfo info) {
+ predicatePartitions.computeIfAbsent(String.valueOf(predicateId), k -> new ArrayList<>()).add(info);
+ }
+
+ /**
+ * Removes Parquet files from the partition for the given predicate by their S3 keys.
+ *
+ * @param predicateId the predicate value ID
+ * @param s3Keys the set of S3 keys to remove
+ */
+ public void removeFiles(long predicateId, Set<String> s3Keys) {
+ List<ParquetFileInfo> files = predicatePartitions.get(String.valueOf(predicateId));
+ if (files != null) {
+ files.removeIf(f -> s3Keys.contains(f.getS3Key()));
+ }
+ }
+
+ /**
+ * Metadata about a single Parquet file in the catalog, including its location, sort order, size, and min/max
+ * statistics for subject, object, and context columns.
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class ParquetFileInfo {
+
+ @JsonProperty("s3Key")
+ private String s3Key;
+
+ @JsonProperty("level")
+ private int level;
+
+ @JsonProperty("sortOrder")
+ private String sortOrder;
+
+ @JsonProperty("rowCount")
+ private long rowCount;
+
+ @JsonProperty("epoch")
+ private long epoch;
+
+ @JsonProperty("sizeBytes")
+ private long sizeBytes;
+
+ @JsonProperty("minSubject")
+ private long minSubject;
+
+ @JsonProperty("maxSubject")
+ private long maxSubject;
+
+ @JsonProperty("minObject")
+ private long minObject;
+
+ @JsonProperty("maxObject")
+ private long maxObject;
+
+ @JsonProperty("minContext")
+ private long minContext;
+
+ @JsonProperty("maxContext")
+ private long maxContext;
+
+ public ParquetFileInfo() {
+ }
+
+ public ParquetFileInfo(String s3Key, int level, String sortOrder, long rowCount,
+ long epoch, long sizeBytes,
+ long minSubject, long maxSubject,
+ long minObject, long maxObject,
+ long minContext, long maxContext) {
+ this.s3Key = s3Key;
+ this.level = level;
+ this.sortOrder = sortOrder;
+ this.rowCount = rowCount;
+ this.epoch = epoch;
+ this.sizeBytes = sizeBytes;
+ this.minSubject = minSubject;
+ this.maxSubject = maxSubject;
+ this.minObject = minObject;
+ this.maxObject = maxObject;
+ this.minContext = minContext;
+ this.maxContext = maxContext;
+ }
+
+ public String getS3Key() {
+ return s3Key;
+ }
+
+ public void setS3Key(String s3Key) {
+ this.s3Key = s3Key;
+ }
+
+ public int getLevel() {
+ return level;
+ }
+
+ public void setLevel(int level) {
+ this.level = level;
+ }
+
+ public String getSortOrder() {
+ return sortOrder;
+ }
+
+ public void setSortOrder(String sortOrder) {
+ this.sortOrder = sortOrder;
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public void setRowCount(long rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ public long getEpoch() {
+ return epoch;
+ }
+
+ public void setEpoch(long epoch) {
+ this.epoch = epoch;
+ }
+
+ public long getSizeBytes() {
+ return sizeBytes;
+ }
+
+ public void setSizeBytes(long sizeBytes) {
+ this.sizeBytes = sizeBytes;
+ }
+
+ public long getMinSubject() {
+ return minSubject;
+ }
+
+ public void setMinSubject(long minSubject) {
+ this.minSubject = minSubject;
+ }
+
+ public long getMaxSubject() {
+ return maxSubject;
+ }
+
+ public void setMaxSubject(long maxSubject) {
+ this.maxSubject = maxSubject;
+ }
+
+ public long getMinObject() {
+ return minObject;
+ }
+
+ public void setMinObject(long minObject) {
+ this.minObject = minObject;
+ }
+
+ public long getMaxObject() {
+ return maxObject;
+ }
+
+ public void setMaxObject(long maxObject) {
+ this.maxObject = maxObject;
+ }
+
+ public long getMinContext() {
+ return minContext;
+ }
+
+ public void setMinContext(long minContext) {
+ this.minContext = minContext;
+ }
+
+ public long getMaxContext() {
+ return maxContext;
+ }
+
+ public void setMaxContext(long maxContext) {
+ this.maxContext = maxContext;
+ }
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/CompactionPolicy.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/CompactionPolicy.java
new file mode 100644
index 00000000000..bd749b3904b
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/CompactionPolicy.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Determines when compaction should be triggered for a predicate partition. Counts distinct epochs at each level and
+ * compares against configurable thresholds.
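+ *
+ * <p>
+ * Example (illustrative):
+ *
+ * <pre>{@code
+ * CompactionPolicy policy = new CompactionPolicy(); // thresholds: 8 L0 epochs, 4 L1 epochs
+ * List<Catalog.ParquetFileInfo> files = catalog.getFilesForPredicate(predicateId);
+ * if (policy.shouldCompactL0(files)) {
+ *     // schedule an L0 -> L1 compaction for this partition
+ * }
+ * }</pre>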
+ */
+public class CompactionPolicy {
+
+ /** Default number of L0 epochs before triggering L0→L1 compaction. */
+ public static final int DEFAULT_L0_THRESHOLD = 8;
+
+ /** Default number of L1 epochs before triggering L1→L2 compaction. */
+ public static final int DEFAULT_L1_THRESHOLD = 4;
+
+ private final int l0Threshold;
+ private final int l1Threshold;
+
+ public CompactionPolicy() {
+ this(DEFAULT_L0_THRESHOLD, DEFAULT_L1_THRESHOLD);
+ }
+
+ public CompactionPolicy(int l0Threshold, int l1Threshold) {
+ this.l0Threshold = l0Threshold;
+ this.l1Threshold = l1Threshold;
+ }
+
+ /**
+ * Checks if L0→L1 compaction should run for the given predicate partition files.
+ *
+ * @param files all files in the predicate partition
+ * @return true if the number of distinct L0 epochs >= l0Threshold
+ */
+ public boolean shouldCompactL0(List<Catalog.ParquetFileInfo> files) {
+ return countEpochsAtLevel(files, 0) >= l0Threshold;
+ }
+
+ /**
+ * Checks if L1→L2 compaction should run for the given predicate partition files.
+ *
+ * @param files all files in the predicate partition
+ * @return true if the number of distinct L1 epochs >= l1Threshold
+ */
+ public boolean shouldCompactL1(List<Catalog.ParquetFileInfo> files) {
+ return countEpochsAtLevel(files, 1) >= l1Threshold;
+ }
+
+ private static int countEpochsAtLevel(List<Catalog.ParquetFileInfo> files, int level) {
+ Set<Long> epochs = new HashSet<>();
+ for (Catalog.ParquetFileInfo f : files) {
+ if (f.getLevel() == level) {
+ epochs.add(f.getEpoch());
+ }
+ }
+ return epochs.size();
+ }
+
+ /**
+ * Returns the files at the given level for a predicate partition.
+ *
+ * @param files all files in the partition
+ * @param level the target level (0, 1, or 2)
+ * @return files at that level
+ */
+ public static List<Catalog.ParquetFileInfo> filesAtLevel(List<Catalog.ParquetFileInfo> files, int level) {
+ return files.stream().filter(f -> f.getLevel() == level).toList();
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Compactor.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Compactor.java
new file mode 100644
index 00000000000..0e83f4fbb29
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Compactor.java
@@ -0,0 +1,256 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.eclipse.rdf4j.sail.s3.cache.TieredCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs merge compaction on Parquet files within a predicate partition. Merges files at a source level into one set
+ * of files at the target level, per sort order.
+ *
+ *
+ * <ul>
+ * <li>L0→L1: merge all L0 files per sort order, tombstones preserved</li>
+ * <li>L1→L2: merge all L1 files per sort order, tombstones suppressed (L2 = highest level)</li>
+ * </ul>
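+ *
+ * <p>
+ * Example (illustrative; epoch allocation and catalog persistence are handled by the caller):
+ *
+ * <pre>{@code
+ * Compactor compactor = new Compactor(objectStore, cache, 8 * 1024 * 1024, 64 * 1024);
+ * List<Catalog.ParquetFileInfo> l0Files = CompactionPolicy.filesAtLevel(files, 0);
+ * Compactor.CompactionResult result = compactor.compact(predicateId, l0Files, 0, 1, epoch, catalog);
+ * }</pre>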
+ */
+public class Compactor {
+
+ private static final Logger logger = LoggerFactory.getLogger(Compactor.class);
+ private static final String[] SORT_ORDERS = { "soc", "osc", "cso" };
+
+ private final ObjectStore objectStore;
+ private final TieredCache cache;
+ private final int rowGroupSize;
+ private final int pageSize;
+
+ public Compactor(ObjectStore objectStore, TieredCache cache, int rowGroupSize, int pageSize) {
+ this.objectStore = objectStore;
+ this.cache = cache;
+ this.rowGroupSize = rowGroupSize;
+ this.pageSize = pageSize;
+ }
+
+ /**
+ * Compacts files at the source level into a single set of files at the target level.
+ *
+ * @param predicateId the predicate partition being compacted
+ * @param sourceFiles all files at the source level in this partition
+ * @param sourceLevel the source level (0 or 1)
+ * @param targetLevel the target level (1 or 2)
+ * @param epoch the epoch for the new compacted files
+ * @param catalog the catalog to update
+ * @return result containing new files created and old files removed
+ */
+ public CompactionResult compact(long predicateId, List<Catalog.ParquetFileInfo> sourceFiles,
+ int sourceLevel, int targetLevel, long epoch, Catalog catalog) {
+
+ boolean suppressTombstones = (targetLevel == 2);
+ List<Catalog.ParquetFileInfo> newFiles = new ArrayList<>();
+ Set<String> oldKeys = new HashSet<>();
+
+ for (String sortOrder : SORT_ORDERS) {
+ // Collect source files for this sort order, ordered newest-first (highest epoch first)
+ List<Catalog.ParquetFileInfo> sortOrderFiles = sourceFiles.stream()
+ .filter(f -> sortOrder.equals(f.getSortOrder()))
+ .sorted(Comparator.comparingLong(Catalog.ParquetFileInfo::getEpoch).reversed())
+ .toList();
+
+ if (sortOrderFiles.isEmpty()) {
+ continue;
+ }
+
+ // Collect old keys for cleanup
+ for (Catalog.ParquetFileInfo f : sortOrderFiles) {
+ oldKeys.add(f.getS3Key());
+ }
+
+ // Build merge sources from Parquet files (newest first)
+ List<RawEntrySource> sources = new ArrayList<>();
+ for (Catalog.ParquetFileInfo fileInfo : sortOrderFiles) {
+ byte[] fileData = cache != null ? cache.get(fileInfo.getS3Key()) : objectStore.get(fileInfo.getS3Key());
+ if (fileData == null) {
+ logger.warn("Missing Parquet file during compaction: {}", fileInfo.getS3Key());
+ continue;
+ }
+ sources.add(new ParquetQuadSource(fileData, sortOrder));
+ }
+
+ if (sources.isEmpty()) {
+ continue;
+ }
+
+ // Merge and collect entries
+ List<MemTable.QuadEntry> merged = mergeEntries(sources, suppressTombstones);
+
+ if (merged.isEmpty()) {
+ continue;
+ }
+
+ // Convert to ParquetFileBuilder.QuadEntry
+ List<ParquetFileBuilder.QuadEntry> parquetEntries = new ArrayList<>();
+ for (MemTable.QuadEntry e : merged) {
+ parquetEntries.add(new ParquetFileBuilder.QuadEntry(e.subject, e.object, e.context, e.flag));
+ }
+
+ // Write merged Parquet file
+ ParquetSchemas.SortOrder parsedSortOrder = ParquetSchemas.SortOrder.valueOf(sortOrder.toUpperCase());
+ String s3Key = "data/predicates/" + predicateId + "/L" + targetLevel + "-"
+ + String.format("%05d", epoch) + "-" + sortOrder + ".parquet";
+
+ byte[] parquetData = ParquetFileBuilder.build(parquetEntries, ParquetSchemas.PARTITIONED_SCHEMA,
+ parsedSortOrder, predicateId, rowGroupSize, pageSize);
+
+ objectStore.put(s3Key, parquetData);
+ if (cache != null) {
+ cache.writeThrough(s3Key, parquetData);
+ }
+
+ // Compute stats from sorted entries
+ long minSubject = Long.MAX_VALUE, maxSubject = Long.MIN_VALUE;
+ long minObject = Long.MAX_VALUE, maxObject = Long.MIN_VALUE;
+ long minContext = Long.MAX_VALUE, maxContext = Long.MIN_VALUE;
+ for (MemTable.QuadEntry e : merged) {
+ minSubject = Math.min(minSubject, e.subject);
+ maxSubject = Math.max(maxSubject, e.subject);
+ minObject = Math.min(minObject, e.object);
+ maxObject = Math.max(maxObject, e.object);
+ minContext = Math.min(minContext, e.context);
+ maxContext = Math.max(maxContext, e.context);
+ }
+
+ newFiles.add(new Catalog.ParquetFileInfo(s3Key, targetLevel, sortOrder, merged.size(),
+ epoch, parquetData.length,
+ minSubject, maxSubject, minObject, maxObject, minContext, maxContext));
+ }
+
+ // Update catalog: remove old files, add new ones
+ catalog.removeFiles(predicateId, oldKeys);
+ for (Catalog.ParquetFileInfo newFile : newFiles) {
+ catalog.addFile(predicateId, newFile);
+ }
+
+ // Delete old S3 files and invalidate cache
+ for (String key : oldKeys) {
+ objectStore.delete(key);
+ if (cache != null) {
+ cache.invalidate(key);
+ }
+ }
+
+ logger.info("Compacted predicate {} L{}→L{}: {} files merged into {} files",
+ predicateId, sourceLevel, targetLevel, oldKeys.size(), newFiles.size());
+
+ return new CompactionResult(newFiles, oldKeys);
+ }
+
+ private List<MemTable.QuadEntry> mergeEntries(List<RawEntrySource> sources, boolean suppressTombstones) {
+ List<MemTable.QuadEntry> result = new ArrayList<>();
+
+ // Simple merge: each source is already sorted, and sources are ordered newest-first.
+ // Rather than a streaming k-way merge, read everything into a TreeMap keyed by the
+ // encoded quad key; the first insertion for a key wins, so newer entries shadow older
+ // ones. Since compaction is a background operation, this is acceptable.
+ java.util.TreeMap<CompactKey, MemTable.QuadEntry> deduped = new java.util.TreeMap<>();
+ for (RawEntrySource source : sources) {
+ while (source.hasNext()) {
+ byte[] key = source.peekKey();
+ byte flag = source.peekFlag();
+ // Only insert if not already present (first = newest wins)
+ CompactKey ck = new CompactKey(key);
+ if (!deduped.containsKey(ck)) {
+ // Decode the key to get quad values
+ // The key format from ParquetQuadSource encodes (subject, object, context) as varints
+ java.nio.ByteBuffer bb = java.nio.ByteBuffer.wrap(key);
+ long v1 = Varint.readUnsigned(bb);
+ long v2 = Varint.readUnsigned(bb);
+ long v3 = Varint.readUnsigned(bb);
+ // Record tombstones too, so a newer tombstone shadows older values for the same
+ // key; tombstones are filtered out below when suppressTombstones is set.
+ deduped.put(ck, new MemTable.QuadEntry(v1, v2, v3, flag));
+ }
+ source.advance();
+ }
+ }
+
+ if (suppressTombstones) {
+ for (MemTable.QuadEntry e : deduped.values()) {
+ if (e.flag != MemTable.FLAG_TOMBSTONE) {
+ result.add(e);
+ }
+ }
+ } else {
+ result.addAll(deduped.values());
+ }
+
+ return result;
+ }
+
+ private static class CompactKey implements Comparable<CompactKey> {
+ final byte[] key;
+
+ CompactKey(byte[] key) {
+ this.key = key.clone();
+ }
+
+ @Override
+ public int compareTo(CompactKey other) {
+ return java.util.Arrays.compareUnsigned(this.key, other.key);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CompactKey)) {
+ return false;
+ }
+ return java.util.Arrays.equals(key, ((CompactKey) o).key);
+ }
+
+ @Override
+ public int hashCode() {
+ return java.util.Arrays.hashCode(key);
+ }
+ }
+
+ /**
+ * Result of a compaction operation.
+ */
+ public static class CompactionResult {
+ private final List<Catalog.ParquetFileInfo> newFiles;
+ private final Set<String> deletedKeys;
+
+ public CompactionResult(List<Catalog.ParquetFileInfo> newFiles, Set<String> deletedKeys) {
+ this.newFiles = newFiles;
+ this.deletedKeys = deletedKeys;
+ }
+
+ public List<Catalog.ParquetFileInfo> getNewFiles() {
+ return newFiles;
+ }
+
+ public Set<String> getDeletedKeys() {
+ return deletedKeys;
+ }
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Manifest.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Manifest.java
deleted file mode 100644
index 0e72137bcd3..00000000000
--- a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/Manifest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2025 Eclipse RDF4J contributors.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Distribution License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- *******************************************************************************/
-package org.eclipse.rdf4j.sail.s3.storage;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * JSON manifest tracking which SSTables exist in the object store.
- *
- * <h2>S3 Layout</h2>
- *
- * <pre>
- * manifest/current -> plain text "v{epoch}.json"
- * manifest/v{epoch}.json -> JSON manifest
- * sstables/L0-{epoch}-{indexName}.sst
- * values/current -> serialized value store
- * namespaces/current -> JSON namespace map
- * </pre>
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Manifest {
-
- @JsonProperty("version")
- private int version = 1;
-
- @JsonProperty("nextValueId")
- private long nextValueId;
-
- @JsonProperty("sstables")
- private List<SSTableInfo> sstables = new ArrayList<>();
-
- public Manifest() {
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public long getNextValueId() {
- return nextValueId;
- }
-
- public void setNextValueId(long nextValueId) {
- this.nextValueId = nextValueId;
- }
-
- public List<SSTableInfo> getSstables() {
- return sstables;
- }
-
- public void setSstables(List<SSTableInfo> sstables) {
- this.sstables = sstables;
- }
-
- public static Manifest load(ObjectStore store, ObjectMapper mapper) {
- byte[] pointer = store.get("manifest/current");
- if (pointer == null) {
- return new Manifest();
- }
- String manifestKey = "manifest/" + new String(pointer, StandardCharsets.UTF_8).trim();
- byte[] json = store.get(manifestKey);
- if (json == null) {
- return new Manifest();
- }
- try {
- return mapper.readValue(json, Manifest.class);
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to parse manifest", e);
- }
- }
-
- public void save(ObjectStore store, ObjectMapper mapper, long epoch) {
- try {
- String versionedKey = "v" + epoch + ".json";
- byte[] json = mapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(this);
- store.put("manifest/" + versionedKey, json);
- store.put("manifest/current", versionedKey.getBytes(StandardCharsets.UTF_8));
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to save manifest", e);
- }
- }
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- public static class SSTableInfo {
-
- @JsonProperty("s3Key")
- private String s3Key;
-
- @JsonProperty("level")
- private int level;
-
- @JsonProperty("indexName")
- private String indexName;
-
- @JsonProperty("minKeyHex")
- private String minKeyHex;
-
- @JsonProperty("maxKeyHex")
- private String maxKeyHex;
-
- @JsonProperty("entryCount")
- private long entryCount;
-
- @JsonProperty("epoch")
- private long epoch;
-
- public SSTableInfo() {
- }
-
- public SSTableInfo(String s3Key, int level, String indexName, String minKeyHex, String maxKeyHex,
- long entryCount, long epoch) {
- this.s3Key = s3Key;
- this.level = level;
- this.indexName = indexName;
- this.minKeyHex = minKeyHex;
- this.maxKeyHex = maxKeyHex;
- this.entryCount = entryCount;
- this.epoch = epoch;
- }
-
- public String getS3Key() {
- return s3Key;
- }
-
- public void setS3Key(String s3Key) {
- this.s3Key = s3Key;
- }
-
- public int getLevel() {
- return level;
- }
-
- public void setLevel(int level) {
- this.level = level;
- }
-
- public String getIndexName() {
- return indexName;
- }
-
- public void setIndexName(String indexName) {
- this.indexName = indexName;
- }
-
- public String getMinKeyHex() {
- return minKeyHex;
- }
-
- public void setMinKeyHex(String minKeyHex) {
- this.minKeyHex = minKeyHex;
- }
-
- public String getMaxKeyHex() {
- return maxKeyHex;
- }
-
- public void setMaxKeyHex(String maxKeyHex) {
- this.maxKeyHex = maxKeyHex;
- }
-
- public long getEntryCount() {
- return entryCount;
- }
-
- public void setEntryCount(long entryCount) {
- this.entryCount = entryCount;
- }
-
- public long getEpoch() {
- return epoch;
- }
-
- public void setEpoch(long epoch) {
- this.epoch = epoch;
- }
- }
-}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/MemTable.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/MemTable.java
index d0ce15a7155..bd5f4b7e512 100644
--- a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/MemTable.java
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/MemTable.java
@@ -10,9 +10,12 @@
*******************************************************************************/
package org.eclipse.rdf4j.sail.s3.storage;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -247,6 +250,132 @@ public void advance() {
}
}
+ /**
+ * Returns a {@link RawEntrySource} over MemTable entries matching the given predicate, encoded as 3-varint keys in
+ * the specified partition sort order. Used by {@link PartitionMergeIterator} to merge MemTable entries with Parquet
+ * partition files.
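+ *
+ * <p>
+ * Example (illustrative):
+ *
+ * <pre>{@code
+ * // All entries for predicate 7, re-encoded as "soc" partition keys
+ * RawEntrySource source = memTable.asPartitionRawSource(7L, -1, -1, -1, "soc");
+ * }</pre>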
+ *
+ * @param predId the predicate ID to filter by
+ * @param subj subject filter, or -1 for wildcard
+ * @param obj object filter, or -1 for wildcard
+ * @param ctx context filter, or -1 for wildcard
+ * @param sortOrder the partition sort order ("soc", "osc", or "cso")
+ * @return a RawEntrySource with 3-varint keys in the specified sort order
+ */
+ public RawEntrySource asPartitionRawSource(long predId, long subj, long obj, long ctx, String sortOrder) {
+ // Scan the SPOC MemTable for entries matching the given predicate
+ byte[] minKey = index.getMinKeyBytes(subj <= 0 ? 0 : subj, predId, obj <= 0 ? 0 : obj,
+ ctx < 0 ? 0 : ctx);
+ byte[] maxKey = index.getMaxKeyBytes(subj <= 0 ? Long.MAX_VALUE : subj, predId,
+ obj <= 0 ? Long.MAX_VALUE : obj, ctx < 0 ? Long.MAX_VALUE : ctx);
+ ConcurrentNavigableMap<byte[], byte[]> range = data.subMap(minKey, true, maxKey, true);
+
+ // Collect matching entries, re-encode as 3-varint partition keys
+ List<PartitionEntry> entries = new ArrayList<>();
+ long[] quad = new long[4];
+ for (Map.Entry<byte[], byte[]> entry : range.entrySet()) {
+ index.keyToQuad(entry.getKey(), quad);
+ // Verify predicate matches (range scan may include adjacent predicates)
+ if (quad[QuadIndex.PRED_IDX] != predId) {
+ continue;
+ }
+ // Apply additional filters
+ if (subj >= 0 && quad[QuadIndex.SUBJ_IDX] != subj) {
+ continue;
+ }
+ if (obj >= 0 && quad[QuadIndex.OBJ_IDX] != obj) {
+ continue;
+ }
+ if (ctx >= 0 && quad[QuadIndex.CONTEXT_IDX] != ctx) {
+ continue;
+ }
+ byte[] partitionKey = ParquetQuadSource.encodeKey(sortOrder,
+ quad[QuadIndex.SUBJ_IDX], quad[QuadIndex.OBJ_IDX], quad[QuadIndex.CONTEXT_IDX]);
+ entries.add(new PartitionEntry(partitionKey, entry.getValue()[0]));
+ }
+
+ // Sort by partition key (entries may not be in partition sort order)
+ entries.sort((a, b) -> java.util.Arrays.compareUnsigned(a.key, b.key));
+
+ return new PartitionRawSourceImpl(entries);
+ }
+
+ private static class PartitionEntry {
+ final byte[] key;
+ final byte flag;
+
+ PartitionEntry(byte[] key, byte flag) {
+ this.key = key;
+ this.flag = flag;
+ }
+ }
+
+ private static class PartitionRawSourceImpl implements RawEntrySource {
+ private final List<PartitionEntry> entries;
+ private int pos;
+
+ PartitionRawSourceImpl(List<PartitionEntry> entries) {
+ this.entries = entries;
+ this.pos = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return pos < entries.size();
+ }
+
+ @Override
+ public byte[] peekKey() {
+ return entries.get(pos).key;
+ }
+
+ @Override
+ public byte peekFlag() {
+ return entries.get(pos).flag;
+ }
+
+ @Override
+ public void advance() {
+ pos++;
+ }
+ }
+
+ /**
+ * Partitions entries by predicate ID. Returns a map from predicate ID to a list of {@link QuadEntry} records
+ * containing (subject, object, context, flag). Used during Parquet flush to write per-predicate partition files.
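+ *
+ * <p>
+ * Example (illustrative; the flush step consuming each partition is elided):
+ *
+ * <pre>{@code
+ * Map<Long, List<MemTable.QuadEntry>> partitions = memTable.partitionByPredicate();
+ * for (Map.Entry<Long, List<MemTable.QuadEntry>> e : partitions.entrySet()) {
+ *     // write one Parquet file per predicate partition from e.getValue()
+ * }
+ * }</pre>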
+ *
+ * @return map from predicate ID to list of quad entries (without predicate column)
+ */
+ public Map<Long, List<QuadEntry>> partitionByPredicate() {
+ Map<Long, List<QuadEntry>> result = new LinkedHashMap<>();
+ long[] quad = new long[4];
+ for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
+ index.keyToQuad(entry.getKey(), quad);
+ long predId = quad[QuadIndex.PRED_IDX];
+ result.computeIfAbsent(predId, k -> new ArrayList<>())
+ .add(new QuadEntry(quad[QuadIndex.SUBJ_IDX], quad[QuadIndex.OBJ_IDX],
+ quad[QuadIndex.CONTEXT_IDX], entry.getValue()[0]));
+ }
+ return result;
+ }
+
+ /**
+ * A quad entry with predicate removed (implicit in partition). Used for Parquet file writing.
+ */
+ public static class QuadEntry {
+ public final long subject;
+ public final long object;
+ public final long context;
+ public final byte flag;
+
+ public QuadEntry(long subject, long object, long context, byte flag) {
+ this.subject = subject;
+ this.object = object;
+ this.context = context;
+ this.flag = flag;
+ }
+ }
+
private void checkNotFrozen() {
if (frozen.get()) {
throw new IllegalStateException("MemTable is frozen and cannot accept writes");
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetFileBuilder.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetFileBuilder.java
new file mode 100644
index 00000000000..d2f4379b3d9
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetFileBuilder.java
@@ -0,0 +1,249 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.conf.PlainParquetConfiguration;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Writes quad entries to a Parquet file in memory and returns the serialized bytes.
+ *
+ * <p>
+ * This builder uses Parquet's {@link OutputFile} API to avoid Hadoop filesystem dependencies. Entries should already be
+ * sorted by the caller according to the specified {@link ParquetSchemas.SortOrder}.
+ *
+ *
+ * <p>
+ * Example usage:
+ *
+ * <pre>{@code
+ * List<QuadEntry> entries = ...;
+ * byte[] parquetBytes = ParquetFileBuilder.build(entries, SortOrder.SOC);
+ * }</pre>
+ */
+public final class ParquetFileBuilder {
+
+ /** Default row group size: 8 MiB. */
+ private static final int DEFAULT_ROW_GROUP_SIZE = 8 * 1024 * 1024;
+
+ /** Default page size: 64 KiB. */
+ private static final int DEFAULT_PAGE_SIZE = 64 * 1024;
+
+ private ParquetFileBuilder() {
+ // utility class
+ }
+
+ /**
+ * A quad entry to be written to a Parquet file.
+ *
+ * <p>
+ * For partitioned schemas the predicate is implicit in the partition path, so only subject, object, context, and
+ * flag are stored. For unpartitioned schemas, the predicate field is also written.
+ */
+ public static class QuadEntry {
+ public final long subject;
+ public final long predicate;
+ public final long object;
+ public final long context;
+ public final byte flag;
+
+ /**
+ * Creates a quad entry for partitioned files (predicate implicit in path).
+ *
+ * @param subject the subject value ID
+ * @param object the object value ID
+ * @param context the context value ID
+ * @param flag the entry flag (e.g. insert vs tombstone)
+ */
+ public QuadEntry(long subject, long object, long context, byte flag) {
+ this(subject, -1, object, context, flag);
+ }
+
+ /**
+ * Creates a quad entry for unpartitioned files (predicate stored explicitly).
+ *
+ * @param subject the subject value ID
+ * @param predicate the predicate value ID
+ * @param object the object value ID
+ * @param context the context value ID
+ * @param flag the entry flag (e.g. insert vs tombstone)
+ */
+ public QuadEntry(long subject, long predicate, long object, long context, byte flag) {
+ this.subject = subject;
+ this.predicate = predicate;
+ this.object = object;
+ this.context = context;
+ this.flag = flag;
+ }
+ }
+
+ /**
+ * Builds a Parquet file from the given entries using default settings.
+ *
+ * <p>
+ * Uses {@link ParquetSchemas#PARTITIONED_SCHEMA}, 8 MiB row group size, and 64 KiB page size.
+ *
+ * @param entries the quad entries to write (must already be sorted)
+ * @param sortOrder the sort order of the entries
+ * @return the serialized Parquet file as a byte array
+ */
+ public static byte[] build(List<QuadEntry> entries, ParquetSchemas.SortOrder sortOrder) {
+ return build(entries, ParquetSchemas.PARTITIONED_SCHEMA, sortOrder, -1,
+ DEFAULT_ROW_GROUP_SIZE, DEFAULT_PAGE_SIZE);
+ }
+
+ /**
+ * Builds a Parquet file from the given entries with full control over parameters.
+ *
+ * @param entries the quad entries to write (must already be sorted)
+ * @param schema the Parquet schema to use
+ * @param sortOrder the sort order of the entries
+ * @param predicateId the predicate ID for partitioned files (ignored for unpartitioned)
+ * @param rowGroupSize the row group size in bytes
+ * @param pageSize the page size in bytes
+ * @return the serialized Parquet file as a byte array
+ */
+ public static byte[] build(List<QuadEntry> entries, MessageType schema,
+ ParquetSchemas.SortOrder sortOrder, long predicateId,
+ int rowGroupSize, int pageSize) {
+ try {
+ ByteArrayOutputFile outputFile = new ByteArrayOutputFile();
+
+ try (ParquetWriter<QuadEntry> writer = new QuadEntryWriterBuilder(outputFile, schema)
+ .withConf(new PlainParquetConfiguration())
+ .withCodecFactory(SimpleCodecFactory.INSTANCE)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withDictionaryEncoding(true)
+ .build()) {
+ for (QuadEntry entry : entries) {
+ writer.write(entry);
+ }
+ }
+
+ return outputFile.toByteArray();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to build Parquet file", e);
+ }
+ }
+
+ /**
+ * Custom {@link WriteSupport} that writes {@link QuadEntry} records to Parquet.
+ */
+ private static class QuadEntryWriteSupport extends WriteSupport<QuadEntry> {
+
+ private final MessageType schema;
+ private final boolean hasPredicateColumn;
+ private RecordConsumer recordConsumer;
+
+ QuadEntryWriteSupport(MessageType schema) {
+ this.schema = schema;
+ this.hasPredicateColumn = schema.containsField(ParquetSchemas.COL_PREDICATE);
+ }
+
+ @Override
+ public WriteContext init(org.apache.hadoop.conf.Configuration configuration) {
+ return new WriteContext(schema, new HashMap<>());
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
+ return new WriteContext(schema, new HashMap<>());
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ @Override
+ public void write(QuadEntry entry) {
+ recordConsumer.startMessage();
+
+ int fieldIndex = 0;
+
+ // subject
+ recordConsumer.startField(ParquetSchemas.COL_SUBJECT, fieldIndex);
+ recordConsumer.addLong(entry.subject);
+ recordConsumer.endField(ParquetSchemas.COL_SUBJECT, fieldIndex);
+ fieldIndex++;
+
+ // predicate (only for unpartitioned schema)
+ if (hasPredicateColumn) {
+ recordConsumer.startField(ParquetSchemas.COL_PREDICATE, fieldIndex);
+ recordConsumer.addLong(entry.predicate);
+ recordConsumer.endField(ParquetSchemas.COL_PREDICATE, fieldIndex);
+ fieldIndex++;
+ }
+
+ // object
+ recordConsumer.startField(ParquetSchemas.COL_OBJECT, fieldIndex);
+ recordConsumer.addLong(entry.object);
+ recordConsumer.endField(ParquetSchemas.COL_OBJECT, fieldIndex);
+ fieldIndex++;
+
+ // context
+ recordConsumer.startField(ParquetSchemas.COL_CONTEXT, fieldIndex);
+ recordConsumer.addLong(entry.context);
+ recordConsumer.endField(ParquetSchemas.COL_CONTEXT, fieldIndex);
+ fieldIndex++;
+
+ // flag
+ recordConsumer.startField(ParquetSchemas.COL_FLAG, fieldIndex);
+ recordConsumer.addInteger(entry.flag);
+ recordConsumer.endField(ParquetSchemas.COL_FLAG, fieldIndex);
+
+ recordConsumer.endMessage();
+ }
+ }
+
+ /**
+ * Builder for creating a {@link ParquetWriter} that writes {@link QuadEntry} records. Uses
+ * {@link PlainParquetConfiguration} to avoid Hadoop runtime dependencies.
+ */
+ private static class QuadEntryWriterBuilder
+ extends ParquetWriter.Builder<QuadEntry, QuadEntryWriterBuilder> {
+
+ private final MessageType schema;
+
+ QuadEntryWriterBuilder(OutputFile file, MessageType schema) {
+ super(file);
+ this.schema = schema;
+ }
+
+ @Override
+ protected QuadEntryWriterBuilder self() {
+ return this;
+ }
+
+ @Override
+ protected WriteSupport<QuadEntry> getWriteSupport(
+ org.apache.hadoop.conf.Configuration conf) {
+ return new QuadEntryWriteSupport(schema);
+ }
+
+ @Override
+ protected WriteSupport<QuadEntry> getWriteSupport(ParquetConfiguration conf) {
+ return new QuadEntryWriteSupport(schema);
+ }
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetFilterBuilder.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetFilterBuilder.java
new file mode 100644
index 00000000000..5f32dfe01c7
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetFilterBuilder.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+
+/**
+ * Builds Parquet {@link FilterPredicate}s from quad query patterns. Bound components (>= 0) become equality filters;
+ * unbound components (-1) are omitted.
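+ *
+ * <p>
+ * Example (illustrative):
+ *
+ * <pre>{@code
+ * // Within a partition: subject 42, any object, any context
+ * FilterCompat.Filter filter = ParquetFilterBuilder.buildPartitionedFilter(42L, -1L, -1L);
+ * }</pre>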
+ */
+public class ParquetFilterBuilder {
+
+ /**
+ * Builds a Parquet filter for a within-partition query (predicate is implicit).
+ *
+ * @param subject subject ID, or -1 for wildcard
+ * @param object object ID, or -1 for wildcard
+ * @param context context ID, or -1 for wildcard
+ * @return a FilterCompat.Filter, or FilterCompat.NOOP if no filters apply
+ */
+ public static FilterCompat.Filter buildPartitionedFilter(long subject, long object, long context) {
+ FilterPredicate predicate = null;
+
+ if (subject >= 0) {
+ predicate = eq(longColumn(ParquetSchemas.COL_SUBJECT), subject);
+ }
+ if (object >= 0) {
+ FilterPredicate objFilter = eq(longColumn(ParquetSchemas.COL_OBJECT), object);
+ predicate = predicate != null ? and(predicate, objFilter) : objFilter;
+ }
+ if (context >= 0) {
+ FilterPredicate ctxFilter = eq(longColumn(ParquetSchemas.COL_CONTEXT), context);
+ predicate = predicate != null ? and(predicate, ctxFilter) : ctxFilter;
+ }
+
+ return predicate != null ? FilterCompat.get(predicate) : FilterCompat.NOOP;
+ }
+
+ /**
+ * Builds a Parquet filter for an unpartitioned file query (all 4 components).
+ *
+ * @param subject subject ID, or -1 for wildcard
+ * @param predId predicate ID, or -1 for wildcard
+ * @param object object ID, or -1 for wildcard
+ * @param context context ID, or -1 for wildcard
+ * @return a FilterCompat.Filter, or FilterCompat.NOOP if no filters apply
+ */
+ public static FilterCompat.Filter buildUnpartitionedFilter(long subject, long predId, long object, long context) {
+ FilterPredicate predicate = null;
+
+ if (subject >= 0) {
+ predicate = eq(longColumn(ParquetSchemas.COL_SUBJECT), subject);
+ }
+ if (predId >= 0) {
+ FilterPredicate pFilter = eq(longColumn(ParquetSchemas.COL_PREDICATE), predId);
+ predicate = predicate != null ? and(predicate, pFilter) : pFilter;
+ }
+ if (object >= 0) {
+ FilterPredicate objFilter = eq(longColumn(ParquetSchemas.COL_OBJECT), object);
+ predicate = predicate != null ? and(predicate, objFilter) : objFilter;
+ }
+ if (context >= 0) {
+ FilterPredicate ctxFilter = eq(longColumn(ParquetSchemas.COL_CONTEXT), context);
+ predicate = predicate != null ? and(predicate, ctxFilter) : ctxFilter;
+ }
+
+ return predicate != null ? FilterCompat.get(predicate) : FilterCompat.NOOP;
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetQuadSource.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetQuadSource.java
new file mode 100644
index 00000000000..3824e9f1001
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetQuadSource.java
@@ -0,0 +1,185 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.conf.PlainParquetConfiguration;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * A {@link RawEntrySource} that reads entries from an in-memory Parquet file. Entries are sorted according to the
+ * file's sort order (soc, osc, cso) and emitted as varint-encoded byte[] keys with 1-byte flag values.
+ *
+ * <p>
+ * The key format encodes (value1, value2, value3) as varints in the sort order of the file. For example, an "soc" file
+ * produces keys as varint(subject)||varint(object)||varint(context).
+ *
+ */
+public class ParquetQuadSource implements RawEntrySource {
+
+ private final List<Entry> entries;
+ private int pos;
+
+ /**
+ * Creates a source from Parquet file bytes.
+ *
+ * @param parquetData the complete Parquet file as byte[]
+ * @param sortOrder the sort order of the file ("soc", "osc", or "cso")
+ */
+ public ParquetQuadSource(byte[] parquetData, String sortOrder) {
+ this.entries = readAllEntries(parquetData, sortOrder);
+ this.pos = 0;
+ }
+
+ /**
+ * Creates a source from Parquet file bytes with filtering.
+ *
+ * @param parquetData the complete Parquet file as byte[]
+ * @param sortOrder the sort order of the file
+ * @param subject subject filter, or -1 for wildcard
+ * @param object object filter, or -1 for wildcard
+ * @param context context filter, or -1 for wildcard
+ */
+ public ParquetQuadSource(byte[] parquetData, String sortOrder, long subject, long object, long context) {
+ List<Entry> all = readAllEntries(parquetData, sortOrder);
+ if (subject >= 0 || object >= 0 || context >= 0) {
+ List<Entry> filtered = new ArrayList<>();
+ for (Entry e : all) {
+ if ((subject >= 0 && e.subject != subject)
+ || (object >= 0 && e.object != object)
+ || (context >= 0 && e.context != context)) {
+ continue;
+ }
+ filtered.add(e);
+ }
+ this.entries = filtered;
+ } else {
+ this.entries = all;
+ }
+ this.pos = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return pos < entries.size();
+ }
+
+ @Override
+ public byte[] peekKey() {
+ return entries.get(pos).key;
+ }
+
+ @Override
+ public byte peekFlag() {
+ return entries.get(pos).flag;
+ }
+
+ @Override
+ public void advance() {
+ pos++;
+ }
+
+ private static List<Entry> readAllEntries(byte[] parquetData, String sortOrder) {
+ List<Entry> result = new ArrayList<>();
+ ByteArrayInputFile inputFile = new ByteArrayInputFile(parquetData);
+
+ try (ParquetFileReader reader = ParquetFileReader.open(inputFile,
+ new ParquetReadOptions.Builder(new PlainParquetConfiguration())
+ .withCodecFactory(SimpleCodecFactory.INSTANCE)
+ .build())) {
+ MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+ MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+
+ PageReadStore pages;
+ while ((pages = reader.readNextRowGroup()) != null) {
+ long rows = pages.getRowCount();
+ RecordReader<Group> recordReader = columnIO.getRecordReader(pages,
+ new GroupRecordConverter(schema));
+
+ for (long i = 0; i < rows; i++) {
+ Group group = recordReader.read();
+ long subject = group.getLong(ParquetSchemas.COL_SUBJECT, 0);
+ long object = group.getLong(ParquetSchemas.COL_OBJECT, 0);
+ long context = group.getLong(ParquetSchemas.COL_CONTEXT, 0);
+ int flag = group.getInteger(ParquetSchemas.COL_FLAG, 0);
+
+ byte[] key = encodeKey(sortOrder, subject, object, context);
+ result.add(new Entry(key, (byte) flag, subject, object, context));
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to read Parquet file", e);
+ }
+
+ return result;
+ }
+
+ /**
+ * Encodes a key in the given sort order as varints.
+ */
+ static byte[] encodeKey(String sortOrder, long subject, long object, long context) {
+ long v1, v2, v3;
+ switch (sortOrder) {
+ case "osc":
+ v1 = object;
+ v2 = subject;
+ v3 = context;
+ break;
+ case "cso":
+ v1 = context;
+ v2 = subject;
+ v3 = object;
+ break;
+ case "soc":
+ default:
+ v1 = subject;
+ v2 = object;
+ v3 = context;
+ break;
+ }
+
+ int len = Varint.calcLengthUnsigned(v1) + Varint.calcLengthUnsigned(v2) + Varint.calcLengthUnsigned(v3);
+ ByteBuffer bb = ByteBuffer.allocate(len);
+ Varint.writeUnsigned(bb, v1);
+ Varint.writeUnsigned(bb, v2);
+ Varint.writeUnsigned(bb, v3);
+ return bb.array();
+ }
+
+ private static class Entry {
+ final byte[] key;
+ final byte flag;
+ final long subject;
+ final long object;
+ final long context;
+
+ Entry(byte[] key, byte flag, long subject, long object, long context) {
+ this.key = key;
+ this.flag = flag;
+ this.subject = subject;
+ this.object = object;
+ this.context = context;
+ }
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetSchemas.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetSchemas.java
new file mode 100644
index 00000000000..d8c6e1d8b9c
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/ParquetSchemas.java
@@ -0,0 +1,126 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
+
+/**
+ * Parquet schema definitions for quad storage.
+ *
+ * <p>
+ * Two schemas are provided:
+ * <ul>
+ * <li>{@link #PARTITIONED_SCHEMA} - for files within {@code predicates/{id}/} directories, where the predicate is
+ * implicit in the partition path.</li>
+ * <li>{@link #UNPARTITIONED_SCHEMA} - for files in {@code _unpartitioned/}, which include an explicit predicate
+ * column.</li>
+ * </ul>
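+ *
+ * <p>
+ * Example (illustrative): {@code SortOrder.fromSuffix("osc")} yields {@link SortOrder#OSC}, whose
+ * {@code suffix()} appears in partition file names such as {@code L0-00001-osc.parquet}.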
+ */
+public final class ParquetSchemas {
+
+ /** Column name for subject ID. */
+ public static final String COL_SUBJECT = "subject";
+
+ /** Column name for predicate ID. */
+ public static final String COL_PREDICATE = "predicate";
+
+ /** Column name for object ID. */
+ public static final String COL_OBJECT = "object";
+
+ /** Column name for context (named graph) ID. */
+ public static final String COL_CONTEXT = "context";
+
+ /** Column name for the entry flag (e.g. insert vs tombstone). */
+ public static final String COL_FLAG = "flag";
+
+ /**
+ * Schema for partitioned Parquet files stored under {@code predicates/{id}/}. The predicate is implicit in the
+ * directory path and not stored as a column.
+ */
+ public static final MessageType PARTITIONED_SCHEMA = Types.buildMessage()
+ .required(PrimitiveTypeName.INT64)
+ .named(COL_SUBJECT)
+ .required(PrimitiveTypeName.INT64)
+ .named(COL_OBJECT)
+ .required(PrimitiveTypeName.INT64)
+ .named(COL_CONTEXT)
+ .required(PrimitiveTypeName.INT32)
+ .named(COL_FLAG)
+ .named("quad_partitioned");
+
+ /**
+ * Schema for unpartitioned Parquet files stored under {@code _unpartitioned/}. Includes an explicit predicate
+ * column.
+ */
+ public static final MessageType UNPARTITIONED_SCHEMA = Types.buildMessage()
+ .required(PrimitiveTypeName.INT64)
+ .named(COL_SUBJECT)
+ .required(PrimitiveTypeName.INT64)
+ .named(COL_PREDICATE)
+ .required(PrimitiveTypeName.INT64)
+ .named(COL_OBJECT)
+ .required(PrimitiveTypeName.INT64)
+ .named(COL_CONTEXT)
+ .required(PrimitiveTypeName.INT32)
+ .named(COL_FLAG)
+ .named("quad_unpartitioned");
+
+ /**
+ * Sort orders for quad entries within a Parquet file.
+ */
+ public enum SortOrder {
+ /** Subject-Object-Context ordering (partitioned). */
+ SOC("soc"),
+ /** Object-Subject-Context ordering (partitioned). */
+ OSC("osc"),
+ /** Context-Subject-Object ordering (partitioned). */
+ CSO("cso"),
+ /** Subject-Predicate-Object-Context ordering (unpartitioned). */
+ SPOC("spoc");
+
+ private final String suffix;
+
+ SortOrder(String suffix) {
+ this.suffix = suffix;
+ }
+
+ /**
+ * Returns the file-name suffix for this sort order.
+ *
+ * @return the suffix string
+ */
+ public String suffix() {
+ return suffix;
+ }
+
+ /**
+ * Returns the SortOrder for the given suffix string.
+ *
+ * @param suffix the suffix (e.g. "soc", "osc", "cso", "spoc")
+ * @return the matching SortOrder
+ * @throws IllegalArgumentException if no match found
+ */
+ public static SortOrder fromSuffix(String suffix) {
+ for (SortOrder so : values()) {
+ if (so.suffix.equals(suffix)) {
+ return so;
+ }
+ }
+ throw new IllegalArgumentException("Unknown sort order suffix: " + suffix);
+ }
+ }
+
+ private ParquetSchemas() {
+ // utility class
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionIndexSelector.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionIndexSelector.java
new file mode 100644
index 00000000000..58746fd0c6f
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionIndexSelector.java
@@ -0,0 +1,122 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+/**
+ * Selects the best sort order for queries within a predicate partition.
+ *
+ * <p>
+ * Within a predicate partition the predicate component is implicit, so query optimization selects among
+ * three-dimensional sort orders over the remaining components: subject, object, and context.
+ *
+ * <ul>
+ * <li>soc - sorted by (subject, object, context)</li>
+ * <li>osc - sorted by (object, subject, context)</li>
+ * <li>cso - sorted by (context, subject, object)</li>
+ * </ul>
+ *
+ * <p>
+ * The selection strategy counts leading bound components for each sort order and picks the order with the highest
+ * score. On ties, {@code soc} is preferred as the default.
+ */
+public class PartitionIndexSelector {
+
+ private PartitionIndexSelector() {
+ // utility class
+ }
+
+ /**
+ * Selects the best sort order for a within-partition query.
+ *
+ * <p>
+ * Within a partition, predicate is fixed, so we pick from:
+ * <ul>
+ * <li>soc: sort by (subject, object, context)</li>
+ * <li>osc: sort by (object, subject, context)</li>
+ * <li>cso: sort by (context, subject, object)</li>
+ * </ul>
+ *
+ * <p>
+ * Each sort order is scored by counting its leading bound components. The order with the highest score wins. Ties
+ * are broken in favor of {@code soc}.
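+ *
+ * <p>
+ * Worked example (illustrative): with only the object bound, {@code selectSortOrder(false, true, false)}
+ * scores soc=0, osc=1, cso=0 and therefore returns {@code "osc"}.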
+ *
+ * @param subjectBound true if the subject is bound in the query
+ * @param objectBound true if the object is bound in the query
+ * @param contextBound true if the context is bound in the query
+ * @return the sort order suffix string: "soc", "osc", or "cso"
+ */
+ public static String selectSortOrder(boolean subjectBound, boolean objectBound, boolean contextBound) {
+ // Score each sort order by counting leading bound components
+
+ // soc: subject -> object -> context
+ int socScore = 0;
+ if (subjectBound) {
+ socScore++;
+ if (objectBound) {
+ socScore++;
+ if (contextBound) {
+ socScore++;
+ }
+ }
+ }
+
+ // osc: object -> subject -> context
+ int oscScore = 0;
+ if (objectBound) {
+ oscScore++;
+ if (subjectBound) {
+ oscScore++;
+ if (contextBound) {
+ oscScore++;
+ }
+ }
+ }
+
+ // cso: context -> subject -> object
+ int csoScore = 0;
+ if (contextBound) {
+ csoScore++;
+ if (subjectBound) {
+ csoScore++;
+ if (objectBound) {
+ csoScore++;
+ }
+ }
+ }
+
+ // Pick highest score; ties prefer soc (default)
+ if (oscScore > socScore && oscScore > csoScore) {
+ return "osc";
+ }
+ if (csoScore > socScore && csoScore > oscScore) {
+ return "cso";
+ }
+ return "soc";
+ }
+
+ /**
+ * Returns the column order for a given sort order suffix. Used when sorting entries before writing to Parquet.
+ *
+ * @param sortOrder the sort order suffix: "soc", "osc", or "cso"
+ * @return array of column names in sort priority order
+ */
+ public static String[] getColumnOrder(String sortOrder) {
+ switch (sortOrder) {
+ case "soc":
+ return new String[] { "subject", "object", "context" };
+ case "osc":
+ return new String[] { "object", "subject", "context" };
+ case "cso":
+ return new String[] { "context", "subject", "object" };
+ default:
+ return new String[] { "subject", "object", "context" };
+ }
+ }
+}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionMergeIterator.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionMergeIterator.java
new file mode 100644
index 00000000000..3109503cb72
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionMergeIterator.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * K-way merge iterator for within-partition queries. Works with 3-varint keys (subject, object, context encoded in
+ * partition sort order) where the predicate is implicit in the partition directory.
+ *
+ * <p>
+ * Sources are ordered newest-to-oldest. Deduplicates entries with the same key (newest wins), suppresses tombstones,
+ * and filters by expected flag.
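+ *
+ * <p>
+ * Example (illustrative; {@code expectedFlag} would be one of the MemTable flag constants):
+ *
+ * <pre>{@code
+ * List<RawEntrySource> sources = List.of(memTableSource, newerParquetSource, olderParquetSource);
+ * PartitionMergeIterator it = new PartitionMergeIterator(sources, predicateId, "soc",
+ *         expectedFlag, -1, -1, -1);
+ * while (it.hasNext()) {
+ *     long[] quad = it.next(); // SPOC value IDs, predicate injected from the partition
+ * }
+ * }</pre>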
+ *
+ */
+public class PartitionMergeIterator implements Iterator<long[]> {
+
+ private final long predicateId;
+ private final String sortOrder;
+ private final byte expectedFlag;
+ private final long patternS, patternO, patternC;
+ private final PriorityQueue<SourceCursor> heap;
+ private long[] next;
+
+ /**
+ * @param sources list of sources ordered newest-to-oldest (index 0 = newest)
+ * @param predicateId the predicate ID for this partition (injected into results)
+ * @param sortOrder the sort order of all sources ("soc", "osc", or "cso")
+ * @param expectedFlag the flag to match (FLAG_EXPLICIT or FLAG_INFERRED)
+ * @param s subject pattern, or -1 for wildcard
+ * @param o object pattern, or -1 for wildcard
+ * @param c context pattern, or -1 for wildcard
+ */
+ public PartitionMergeIterator(List<RawEntrySource> sources, long predicateId, String sortOrder,
+ byte expectedFlag, long s, long o, long c) {
+ this.predicateId = predicateId;
+ this.sortOrder = sortOrder;
+ this.expectedFlag = expectedFlag;
+ this.patternS = s;
+ this.patternO = o;
+ this.patternC = c;
+ this.heap = new PriorityQueue<>();
+
+ for (int i = 0; i < sources.size(); i++) {
+ RawEntrySource src = sources.get(i);
+ if (src.hasNext()) {
+ heap.add(new SourceCursor(src, i));
+ }
+ }
+
+ advance();
+ }
+
+ private void advance() {
+ next = null;
+ while (!heap.isEmpty()) {
+ // Pop minimum key
+ SourceCursor min = heap.poll();
+ byte[] winningKey = min.source.peekKey().clone();
+ byte winningFlag = min.source.peekFlag();
+
+ // Advance the winning source
+ min.source.advance();
+ if (min.source.hasNext()) {
+ heap.add(min);
+ }
+
+ // Drain all sources with the same key (deduplication)
+ while (!heap.isEmpty() && Arrays.compareUnsigned(heap.peek().source.peekKey(), winningKey) == 0) {
+ SourceCursor dup = heap.poll();
+ dup.source.advance();
+ if (dup.source.hasNext()) {
+ heap.add(dup);
+ }
+ }
+
+ // Tombstone suppression
+ if (winningFlag == MemTable.FLAG_TOMBSTONE) {
+ continue;
+ }
+
+ // Flag filter
+ if (winningFlag != expectedFlag) {
+ continue;
+ }
+
+ // Decode 3-varint key to (subject, object, context) based on sort order
+ long[] quad = decodePartitionKey(winningKey, sortOrder, predicateId);
+
+ // Pattern filter
+ if ((patternS >= 0 && quad[QuadIndex.SUBJ_IDX] != patternS)
+ || (patternO >= 0 && quad[QuadIndex.OBJ_IDX] != patternO)
+ || (patternC >= 0 && quad[QuadIndex.CONTEXT_IDX] != patternC)) {
+ continue;
+ }
+
+ next = quad;
+ return;
+ }
+ }
+
+ /**
+ * Decodes a 3-varint partition key into a full SPOC quad array.
+ */
+ static long[] decodePartitionKey(byte[] key, String sortOrder, long predicateId) {
+ ByteBuffer bb = ByteBuffer.wrap(key);
+ long v1 = Varint.readUnsigned(bb);
+ long v2 = Varint.readUnsigned(bb);
+ long v3 = Varint.readUnsigned(bb);
+
+ long[] quad = new long[4];
+ quad[QuadIndex.PRED_IDX] = predicateId;
+
+ switch (sortOrder) {
+ case "osc":
+ quad[QuadIndex.OBJ_IDX] = v1;
+ quad[QuadIndex.SUBJ_IDX] = v2;
+ quad[QuadIndex.CONTEXT_IDX] = v3;
+ break;
+ case "cso":
+ quad[QuadIndex.CONTEXT_IDX] = v1;
+ quad[QuadIndex.SUBJ_IDX] = v2;
+ quad[QuadIndex.OBJ_IDX] = v3;
+ break;
+ case "soc":
+ default:
+ quad[QuadIndex.SUBJ_IDX] = v1;
+ quad[QuadIndex.OBJ_IDX] = v2;
+ quad[QuadIndex.CONTEXT_IDX] = v3;
+ break;
+ }
+
+ return quad;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override
+ public long[] next() {
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+ long[] result = next;
+ advance();
+ return result;
+ }
+
+ private static class SourceCursor implements Comparable<SourceCursor> {
+ final RawEntrySource source;
+ final int sourceIndex; // lower = newer
+
+ SourceCursor(RawEntrySource source, int sourceIndex) {
+ this.source = source;
+ this.sourceIndex = sourceIndex;
+ }
+
+ @Override
+ public int compareTo(SourceCursor other) {
+ int keyCmp = Arrays.compareUnsigned(this.source.peekKey(), other.source.peekKey());
+ if (keyCmp != 0) {
+ return keyCmp;
+ }
+ // Ties broken by source index: lower = newer = wins (poll first)
+ return Integer.compare(this.sourceIndex, other.sourceIndex);
+ }
+ }
+}
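To make the merge semantics concrete, here is a minimal sketch of tombstone suppression, assuming the RawEntrySource interface shown below (hasNext/peekKey/peekFlag/advance), the Varint.writeUnsigned and MemTable.FLAG_* members used elsewhere in this patch, and a hypothetical ListSource as test scaffolding:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    class ListSource implements RawEntrySource {
        private final List<byte[]> keys;
        private final List<Byte> flags;
        private int i;

        ListSource(List<byte[]> keys, List<Byte> flags) {
            this.keys = keys;
            this.flags = flags;
        }

        @Override public boolean hasNext() { return i < keys.size(); }
        @Override public byte[] peekKey() { return keys.get(i); }
        @Override public byte peekFlag() { return flags.get(i); }
        @Override public void advance() { i++; }
    }

    static byte[] key(long a, long b, long c) {
        ByteBuffer bb = ByteBuffer.allocate(30); // room for three varints
        Varint.writeUnsigned(bb, a);
        Varint.writeUnsigned(bb, b);
        Varint.writeUnsigned(bb, c);
        return Arrays.copyOf(bb.array(), bb.position());
    }

    // Newer source tombstones (s=7, o=8, c=0); the older source still stores it.
    RawEntrySource newer = new ListSource(
            List.of(key(7, 8, 0)),
            List.of(MemTable.FLAG_TOMBSTONE));
    RawEntrySource older = new ListSource(
            List.of(key(7, 8, 0), key(7, 9, 0)),
            List.of(MemTable.FLAG_EXPLICIT, MemTable.FLAG_EXPLICIT));

    PartitionMergeIterator iter = new PartitionMergeIterator(
            List.of(newer, older), 42L, "soc", MemTable.FLAG_EXPLICIT, -1, -1, -1);
    // Yields exactly one quad, (s=7, p=42, o=9, c=0): the tombstone from the
    // newer source wins the tie and suppresses (s=7, p=42, o=8, c=0).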
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/RawEntrySource.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/RawEntrySource.java
index 4a77d548753..9095094a319 100644
--- a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/RawEntrySource.java
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/RawEntrySource.java
@@ -11,7 +11,7 @@
package org.eclipse.rdf4j.sail.s3.storage;
/**
- * A source of raw key/flag entries for the {@link MergeIterator}. Both {@link MemTable} and {@link SSTable} expose this
+ * A source of raw key/flag entries for merge iterators. {@link MemTable} and {@link ParquetQuadSource} expose this
* interface over a key range.
*/
public interface RawEntrySource {
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SSTable.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SSTable.java
deleted file mode 100644
index 984617245c2..00000000000
--- a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SSTable.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2025 Eclipse RDF4J contributors.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Distribution License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- *******************************************************************************/
-package org.eclipse.rdf4j.sail.s3.storage;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * Reads and queries an immutable SSTable from its binary representation. The entire SSTable byte[] is held in memory
- * (Phase 1c). The block index enables binary search to find the starting block for range scans.
- */
-public class SSTable {
-
- private final byte[] raw;
- private final QuadIndex quadIndex;
-
- // Parsed from footer
- private final long blockIndexOffset;
- private final int blockIndexLength;
- private final long statsOffset;
- private final int statsLength;
-
- // Parsed from block index
- private final int blockCount;
- private final byte[][] blockFirstKeys;
- private final long[] blockOffsets;
- private final int[] blockLengths;
-
- // Parsed from stats
- private final byte[] minKey;
- private final byte[] maxKey;
- private final long entryCount;
-
- public SSTable(byte[] raw, QuadIndex quadIndex) {
- this.raw = raw;
- this.quadIndex = quadIndex;
-
- // Parse footer (last 32 bytes)
- ByteBuffer footer = ByteBuffer.wrap(raw, raw.length - SSTableWriter.FOOTER_SIZE, SSTableWriter.FOOTER_SIZE);
- int magic = footer.getInt();
- if (magic != SSTableWriter.MAGIC) {
- throw new IllegalArgumentException("Invalid SSTable magic: 0x" + Integer.toHexString(magic));
- }
- int version = footer.getInt();
- if (version != SSTableWriter.VERSION) {
- throw new IllegalArgumentException("Unsupported SSTable version: " + version);
- }
- this.blockIndexOffset = footer.getLong();
- this.blockIndexLength = footer.getInt();
- this.statsOffset = footer.getLong();
- this.statsLength = footer.getInt();
-
- // Parse block index
- ByteBuffer biBuffer = ByteBuffer.wrap(raw, (int) blockIndexOffset, blockIndexLength);
- this.blockCount = biBuffer.getInt();
- this.blockFirstKeys = new byte[blockCount][];
- this.blockOffsets = new long[blockCount];
- this.blockLengths = new int[blockCount];
- for (int i = 0; i < blockCount; i++) {
- int keyLen = (int) Varint.readUnsigned(biBuffer);
- blockFirstKeys[i] = new byte[keyLen];
- biBuffer.get(blockFirstKeys[i]);
- blockOffsets[i] = biBuffer.getLong();
- blockLengths[i] = biBuffer.getInt();
- }
-
- // Parse stats
- ByteBuffer statsBuffer = ByteBuffer.wrap(raw, (int) statsOffset, statsLength);
- int minKeyLen = (int) Varint.readUnsigned(statsBuffer);
- this.minKey = new byte[minKeyLen];
- statsBuffer.get(this.minKey);
- int maxKeyLen = (int) Varint.readUnsigned(statsBuffer);
- this.maxKey = new byte[maxKeyLen];
- statsBuffer.get(this.maxKey);
- this.entryCount = statsBuffer.getLong();
- }
-
- public byte[] getMinKey() {
- return minKey;
- }
-
- public byte[] getMaxKey() {
- return maxKey;
- }
-
- public long getEntryCount() {
- return entryCount;
- }
-
- /**
- * Scans for matching quads, filtering by flag (explicit/inferred) and pattern. Same contract as
- * {@link MemTable#scan(long, long, long, long, boolean)}.
- */
- public Iterator<long[]> scan(long s, long p, long o, long c, boolean explicit) {
- byte expectedFlag = explicit ? MemTable.FLAG_EXPLICIT : MemTable.FLAG_INFERRED;
- byte[] scanMinKey = quadIndex.getMinKeyBytes(s, p, o, c);
- byte[] scanMaxKey = quadIndex.getMaxKeyBytes(s, p, o, c);
- int startBlock = findStartBlock(scanMinKey);
-
- return new ScanIterator(startBlock, scanMinKey, scanMaxKey, expectedFlag, s, p, o, c);
- }
-
- /**
- * Returns a {@link RawEntrySource} over the given key range. Includes tombstones (no flag filtering). Used by
- * {@link MergeIterator}.
- */
- public RawEntrySource asRawSource(long s, long p, long o, long c) {
- byte[] scanMinKey = quadIndex.getMinKeyBytes(s, p, o, c);
- byte[] scanMaxKey = quadIndex.getMaxKeyBytes(s, p, o, c);
- int startBlock = findStartBlock(scanMinKey);
-
- return new RawSourceImpl(startBlock, scanMinKey, scanMaxKey);
- }
-
- /**
- * Binary search to find the block that could contain the given key.
- */
- private int findStartBlock(byte[] targetKey) {
- int lo = 0, hi = blockCount - 1;
- int result = 0;
- while (lo <= hi) {
- int mid = (lo + hi) >>> 1;
- int cmp = Arrays.compareUnsigned(blockFirstKeys[mid], targetKey);
- if (cmp <= 0) {
- result = mid;
- lo = mid + 1;
- } else {
- hi = mid - 1;
- }
- }
- return result;
- }
-
- /**
- * Reads the next entry from the data region at the given position.
- *
- * @return the position after the entry, or -1 if we've exceeded the data region
- */
- private int readEntry(int pos, byte[][] keyOut, byte[] flagOut) {
- if (pos >= blockIndexOffset) {
- return -1;
- }
- ByteBuffer bb = ByteBuffer.wrap(raw, pos, (int) blockIndexOffset - pos);
- int keyLen = (int) Varint.readUnsigned(bb);
- int newPos = pos + (bb.position() - 0) + keyLen + 1;
- // Recalculate from actual buffer position
- int absKeyStart = pos + (bb.position() - (int) 0);
-
- // Re-read properly
- bb = ByteBuffer.wrap(raw, pos, (int) blockIndexOffset - pos);
- keyLen = (int) Varint.readUnsigned(bb);
- byte[] key = new byte[keyLen];
- bb.get(key);
- byte flag = bb.get();
-
- keyOut[0] = key;
- flagOut[0] = flag;
- return pos + Varint.calcLengthUnsigned(keyLen) + keyLen + 1;
- }
-
- private class ScanIterator implements Iterator<long[]> {
- private int pos;
- private final byte[] scanMaxKey;
- private final byte expectedFlag;
- private final long patternS, patternP, patternO, patternC;
- private long[] next;
- private final byte[][] keyBuf = new byte[1][];
- private final byte[] flagBuf = new byte[1];
-
- ScanIterator(int startBlock, byte[] scanMinKey, byte[] scanMaxKey, byte expectedFlag,
- long s, long p, long o, long c) {
- this.pos = (int) blockOffsets[startBlock];
- this.scanMaxKey = scanMaxKey;
- this.expectedFlag = expectedFlag;
- this.patternS = s;
- this.patternP = p;
- this.patternO = o;
- this.patternC = c;
-
- // Skip entries before scanMinKey
- skipToMinKey(scanMinKey);
- advance();
- }
-
- private void skipToMinKey(byte[] scanMinKey) {
- while (pos < blockIndexOffset) {
- int savedPos = pos;
- int nextPos = readEntry(pos, keyBuf, flagBuf);
- if (nextPos < 0) {
- break;
- }
- if (Arrays.compareUnsigned(keyBuf[0], scanMinKey) >= 0) {
- pos = savedPos; // revert - this entry is in range
- return;
- }
- pos = nextPos;
- }
- }
-
- private void advance() {
- next = null;
- while (pos < blockIndexOffset) {
- int nextPos = readEntry(pos, keyBuf, flagBuf);
- if (nextPos < 0) {
- break;
- }
- pos = nextPos;
-
- byte[] key = keyBuf[0];
- byte flag = flagBuf[0];
-
- // Past max key?
- if (Arrays.compareUnsigned(key, scanMaxKey) > 0) {
- pos = (int) blockIndexOffset; // done
- return;
- }
-
- if (flag != expectedFlag) {
- continue;
- }
-
- long[] quad = new long[4];
- quadIndex.keyToQuad(key, quad);
-
- if ((patternS >= 0 && quad[QuadIndex.SUBJ_IDX] != patternS)
- || (patternP >= 0 && quad[QuadIndex.PRED_IDX] != patternP)
- || (patternO >= 0 && quad[QuadIndex.OBJ_IDX] != patternO)
- || (patternC >= 0 && quad[QuadIndex.CONTEXT_IDX] != patternC)) {
- continue;
- }
-
- next = quad;
- return;
- }
- }
-
- @Override
- public boolean hasNext() {
- return next != null;
- }
-
- @Override
- public long[] next() {
- if (next == null) {
- throw new NoSuchElementException();
- }
- long[] result = next;
- advance();
- return result;
- }
- }
-
- private class RawSourceImpl implements RawEntrySource {
- private int pos;
- private final byte[] scanMaxKey;
- private byte[] currentKey;
- private byte currentFlag;
- private boolean valid;
- private final byte[][] keyBuf = new byte[1][];
- private final byte[] flagBuf = new byte[1];
-
- RawSourceImpl(int startBlock, byte[] scanMinKey, byte[] scanMaxKey) {
- this.pos = (int) blockOffsets[startBlock];
- this.scanMaxKey = scanMaxKey;
- skipToMinKey(scanMinKey);
- }
-
- private void skipToMinKey(byte[] scanMinKey) {
- while (pos < blockIndexOffset) {
- int savedPos = pos;
- int nextPos = readEntry(pos, keyBuf, flagBuf);
- if (nextPos < 0) {
- valid = false;
- return;
- }
- if (Arrays.compareUnsigned(keyBuf[0], scanMinKey) >= 0) {
- currentKey = keyBuf[0];
- currentFlag = flagBuf[0];
- pos = nextPos;
- valid = Arrays.compareUnsigned(currentKey, scanMaxKey) <= 0;
- return;
- }
- pos = nextPos;
- }
- valid = false;
- }
-
- @Override
- public boolean hasNext() {
- return valid;
- }
-
- @Override
- public byte[] peekKey() {
- return currentKey;
- }
-
- @Override
- public byte peekFlag() {
- return currentFlag;
- }
-
- @Override
- public void advance() {
- if (pos >= blockIndexOffset) {
- valid = false;
- return;
- }
- int nextPos = readEntry(pos, keyBuf, flagBuf);
- if (nextPos < 0) {
- valid = false;
- return;
- }
- pos = nextPos;
- currentKey = keyBuf[0];
- currentFlag = flagBuf[0];
- if (Arrays.compareUnsigned(currentKey, scanMaxKey) > 0) {
- valid = false;
- }
- }
- }
-}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SSTableWriter.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SSTableWriter.java
deleted file mode 100644
index 6bcc272b7cf..00000000000
--- a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SSTableWriter.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2025 Eclipse RDF4J contributors.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Distribution License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- *******************************************************************************/
-package org.eclipse.rdf4j.sail.s3.storage;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Serializes a frozen {@link MemTable} into SSTable binary format.
- *
- * Format
- *
- *
- * <pre>
- * Per entry: [key_length varint][key bytes][flag 1 byte]
- * Block boundary when cumulative size exceeds blockSize
- *
- * [BLOCK INDEX]
- * [block_count: 4-byte int BE]
- * Per block: [first_key_length varint][first_key bytes][offset: 8-byte long BE][length: 4-byte int BE]
- *
- * [STATS]
- * [min_key_length varint][min_key bytes]
- * [max_key_length varint][max_key bytes]
- * [entry_count: 8-byte long BE]
- *
- * [FOOTER: 32 bytes]
- * [magic: 4 bytes = 0x53535431 "SST1"]
- * [version: 4 bytes = 1]
- * [block_index_offset: 8-byte long BE]
- * [block_index_length: 4-byte int BE]
- * [stats_offset: 8-byte long BE]
- * [stats_length: 4-byte int BE]
- * </pre>
- */
-public class SSTableWriter {
-
- static final int MAGIC = 0x53535431; // "SST1"
- static final int VERSION = 1;
- static final int FOOTER_SIZE = 32;
- static final int DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024; // 4 MiB
-
- public static byte[] write(MemTable memTable) {
- return write(memTable, DEFAULT_BLOCK_SIZE);
- }
-
- public static byte[] write(MemTable memTable, int blockSize) {
- try {
- Map<byte[], byte[]> data = memTable.getData();
- if (data.isEmpty()) {
- throw new IllegalArgumentException("Cannot write empty MemTable to SSTable");
- }
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
-
- // Track block boundaries
- List<BlockInfo> blocks = new ArrayList<>();
- byte[] firstKeyInBlock = null;
- long blockStartOffset = 0;
- long currentBlockSize = 0;
-
- byte[] minKey = null;
- byte[] maxKey = null;
- long entryCount = 0;
-
- for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
- byte[] key = entry.getKey();
- byte flag = entry.getValue()[0];
-
- if (minKey == null) {
- minKey = key;
- }
- maxKey = key;
- entryCount++;
-
- // Start a new block if needed
- if (firstKeyInBlock == null) {
- firstKeyInBlock = key;
- blockStartOffset = baos.size();
- currentBlockSize = 0;
- }
-
- // Write entry: [key_length varint][key bytes][flag 1 byte]
- writeVarint(out, key.length);
- out.write(key);
- out.write(flag);
- currentBlockSize += varintLength(key.length) + key.length + 1;
-
- // Check block boundary
- if (currentBlockSize >= blockSize) {
- long blockEnd = baos.size();
- blocks.add(new BlockInfo(firstKeyInBlock, blockStartOffset, (int) (blockEnd - blockStartOffset)));
- firstKeyInBlock = null;
- currentBlockSize = 0;
- }
- }
-
- // Finalize last block
- if (firstKeyInBlock != null) {
- long blockEnd = baos.size();
- blocks.add(new BlockInfo(firstKeyInBlock, blockStartOffset, (int) (blockEnd - blockStartOffset)));
- }
-
- // Write block index
- long blockIndexOffset = baos.size();
- out.writeInt(blocks.size());
- for (BlockInfo block : blocks) {
- writeVarint(out, block.firstKey.length);
- out.write(block.firstKey);
- out.writeLong(block.offset);
- out.writeInt(block.length);
- }
- int blockIndexLength = (int) (baos.size() - blockIndexOffset);
-
- // Write stats
- long statsOffset = baos.size();
- writeVarint(out, minKey.length);
- out.write(minKey);
- writeVarint(out, maxKey.length);
- out.write(maxKey);
- out.writeLong(entryCount);
- int statsLength = (int) (baos.size() - statsOffset);
-
- // Write footer (32 bytes)
- out.writeInt(MAGIC);
- out.writeInt(VERSION);
- out.writeLong(blockIndexOffset);
- out.writeInt(blockIndexLength);
- out.writeLong(statsOffset);
- out.writeInt(statsLength);
-
- out.flush();
- return baos.toByteArray();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- private static void writeVarint(DataOutputStream out, int value) throws IOException {
- // Simple varint for lengths (always non-negative int)
- ByteBuffer bb = ByteBuffer.allocate(5);
- Varint.writeUnsigned(bb, value);
- out.write(bb.array(), 0, bb.position());
- }
-
- private static int varintLength(int value) {
- return Varint.calcLengthUnsigned(value);
- }
-
- private static class BlockInfo {
- final byte[] firstKey;
- final long offset;
- final int length;
-
- BlockInfo(byte[] firstKey, long offset, int length) {
- this.firstKey = firstKey;
- this.offset = offset;
- this.length = length;
- }
- }
-}
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SimpleCodecFactory.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SimpleCodecFactory.java
new file mode 100644
index 00000000000..8f098c419ff
--- /dev/null
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/SimpleCodecFactory.java
@@ -0,0 +1,138 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.s3.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import com.github.luben.zstd.Zstd;
+
+/**
+ * A lightweight {@link CompressionCodecFactory} that handles ZSTD and UNCOMPRESSED codecs without any Hadoop
+ * dependencies. Uses {@code zstd-jni} directly for ZSTD compression/decompression.
+ */
+final class SimpleCodecFactory implements CompressionCodecFactory {
+
+ static final SimpleCodecFactory INSTANCE = new SimpleCodecFactory();
+
+ private SimpleCodecFactory() {
+ }
+
+ @Override
+ public BytesInputCompressor getCompressor(CompressionCodecName codec) {
+ switch (codec) {
+ case ZSTD:
+ return ZSTD_COMPRESSOR;
+ case UNCOMPRESSED:
+ return NOOP_COMPRESSOR;
+ default:
+ throw new UnsupportedOperationException("Unsupported compression codec: " + codec);
+ }
+ }
+
+ @Override
+ public BytesInputDecompressor getDecompressor(CompressionCodecName codec) {
+ switch (codec) {
+ case ZSTD:
+ return ZSTD_DECOMPRESSOR;
+ case UNCOMPRESSED:
+ return NOOP_DECOMPRESSOR;
+ default:
+ throw new UnsupportedOperationException("Unsupported compression codec: " + codec);
+ }
+ }
+
+ @Override
+ public void release() {
+ // no resources to release
+ }
+
+ private static final BytesInputCompressor ZSTD_COMPRESSOR = new BytesInputCompressor() {
+ @Override
+ public BytesInput compress(BytesInput bytes) throws IOException {
+ byte[] input = bytes.toByteArray();
+ byte[] compressed = Zstd.compress(input);
+ return BytesInput.from(compressed);
+ }
+
+ @Override
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.ZSTD;
+ }
+
+ @Override
+ public void release() {
+ }
+ };
+
+ private static final BytesInputDecompressor ZSTD_DECOMPRESSOR = new BytesInputDecompressor() {
+ @Override
+ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+ byte[] input = bytes.toByteArray();
+ byte[] decompressed = new byte[uncompressedSize];
+ Zstd.decompress(decompressed, input);
+ return BytesInput.from(decompressed);
+ }
+
+ @Override
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+ throws IOException {
+ byte[] compressedBytes = new byte[compressedSize];
+ input.get(compressedBytes);
+ byte[] decompressed = new byte[uncompressedSize];
+ Zstd.decompress(decompressed, compressedBytes);
+ output.put(decompressed);
+ }
+
+ @Override
+ public void release() {
+ }
+ };
+
+ private static final BytesInputCompressor NOOP_COMPRESSOR = new BytesInputCompressor() {
+ @Override
+ public BytesInput compress(BytesInput bytes) throws IOException {
+ return bytes;
+ }
+
+ @Override
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.UNCOMPRESSED;
+ }
+
+ @Override
+ public void release() {
+ }
+ };
+
+ private static final BytesInputDecompressor NOOP_DECOMPRESSOR = new BytesInputDecompressor() {
+ @Override
+ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+ return bytes;
+ }
+
+ @Override
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+ throws IOException {
+ byte[] data = new byte[compressedSize];
+ input.get(data);
+ output.put(data);
+ }
+
+ @Override
+ public void release() {
+ }
+ };
+}
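The factory's contract can be exercised as a simple round-trip. A sketch assuming Parquet's BytesInput API, placed in the same package (the class is package-private) inside a method that may throw IOException:

    byte[] original = "to be compressed".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    CompressionCodecFactory.BytesInputCompressor compressor =
            SimpleCodecFactory.INSTANCE.getCompressor(CompressionCodecName.ZSTD);
    byte[] compressed = compressor.compress(BytesInput.from(original)).toByteArray();

    CompressionCodecFactory.BytesInputDecompressor decompressor =
            SimpleCodecFactory.INSTANCE.getDecompressor(CompressionCodecName.ZSTD);
    byte[] restored = decompressor
            .decompress(BytesInput.from(compressed), original.length)
            .toByteArray();

    // restored now holds the same bytes as original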
diff --git a/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/ManifestTest.java b/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/ManifestTest.java
deleted file mode 100644
index 8a0e48a334a..00000000000
--- a/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/ManifestTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2025 Eclipse RDF4J contributors.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Distribution License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- *******************************************************************************/
-package org.eclipse.rdf4j.sail.s3.storage;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-class ManifestTest {
-
- @TempDir
- Path tempDir;
-
- @Test
- void roundTrip() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- FileSystemObjectStore store = new FileSystemObjectStore(tempDir);
-
- Manifest manifest = new Manifest();
- manifest.setNextValueId(42);
- List<Manifest.SSTableInfo> infos = new ArrayList<>();
- infos.add(new Manifest.SSTableInfo("sstables/L0-1-spoc.sst", 0, "spoc", "0102", "0304", 10, 1));
- infos.add(new Manifest.SSTableInfo("sstables/L0-1-posc.sst", 0, "posc", "0506", "0708", 10, 1));
- manifest.setSstables(infos);
-
- manifest.save(store, mapper, 1);
-
- Manifest loaded = Manifest.load(store, mapper);
- assertEquals(1, loaded.getVersion());
- assertEquals(42, loaded.getNextValueId());
- assertEquals(2, loaded.getSstables().size());
- assertEquals("sstables/L0-1-spoc.sst", loaded.getSstables().get(0).getS3Key());
- assertEquals("spoc", loaded.getSstables().get(0).getIndexName());
- assertEquals(10, loaded.getSstables().get(0).getEntryCount());
- assertEquals(1, loaded.getSstables().get(0).getEpoch());
- }
-
- @Test
- void loadReturnsEmptyManifestWhenNoneExists() {
- FileSystemObjectStore store = new FileSystemObjectStore(tempDir);
- ObjectMapper mapper = new ObjectMapper();
-
- Manifest loaded = Manifest.load(store, mapper);
- assertNotNull(loaded);
- assertEquals(0, loaded.getSstables().size());
- assertEquals(0, loaded.getNextValueId());
- }
-
- @Test
- void multipleVersions() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- FileSystemObjectStore store = new FileSystemObjectStore(tempDir);
-
- // Save version 1
- Manifest m1 = new Manifest();
- m1.setNextValueId(10);
- m1.save(store, mapper, 1);
-
- // Save version 2
- Manifest m2 = new Manifest();
- m2.setNextValueId(20);
- List<Manifest.SSTableInfo> infos = new ArrayList<>();
- infos.add(new Manifest.SSTableInfo("sstables/L0-2-spoc.sst", 0, "spoc", "01", "02", 5, 2));
- m2.setSstables(infos);
- m2.save(store, mapper, 2);
-
- // Load should return the latest (version 2)
- Manifest loaded = Manifest.load(store, mapper);
- assertEquals(20, loaded.getNextValueId());
- assertEquals(1, loaded.getSstables().size());
- }
-}
diff --git a/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/MergeIteratorTest.java b/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/MergeIteratorTest.java
index 77ff3896ecb..b5fa1076a1d 100644
--- a/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/MergeIteratorTest.java
+++ b/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/MergeIteratorTest.java
@@ -126,20 +126,18 @@ void patternFilter() {
}
@Test
- void mergeMemTableWithSSTable() {
- // MemTable (newer) + SSTable (older)
+ void mergeMemTableWithOlderSource() {
+ // MemTable (newer) + older MemTable source
MemTable memTable = new MemTable(spoc);
memTable.put(1, 2, 3, 0, true);
MemTable olderData = new MemTable(spoc);
olderData.put(2, 3, 4, 0, true);
olderData.put(4, 5, 6, 0, true);
- byte[] sstData = SSTableWriter.write(olderData);
- SSTable sst = new SSTable(sstData, spoc);
List<RawEntrySource> sources = Arrays.asList(
memTable.asRawSource(-1, -1, -1, -1),
- sst.asRawSource(-1, -1, -1, -1));
+ olderData.asRawSource(-1, -1, -1, -1));
MergeIterator iter = new MergeIterator(sources, spoc, MemTable.FLAG_EXPLICIT, -1, -1, -1, -1);
List<long[]> results = toList(iter);
@@ -147,20 +145,18 @@ void mergeMemTableWithSSTable() {
}
@Test
- void tombstoneInMemTableShadowsSSTable() {
- // SSTable has a value, MemTable deletes it
+ void tombstoneInNewerShadowsOlder() {
+ // Older source has a value, newer MemTable deletes it
MemTable olderData = new MemTable(spoc);
olderData.put(1, 2, 3, 0, true);
olderData.put(4, 5, 6, 0, true);
- byte[] sstData = SSTableWriter.write(olderData);
- SSTable sst = new SSTable(sstData, spoc);
MemTable memTable = new MemTable(spoc);
- memTable.remove(1, 2, 3, 0, true); // tombstone shadows SSTable entry
+ memTable.remove(1, 2, 3, 0, true); // tombstone shadows older entry
List<RawEntrySource> sources = Arrays.asList(
memTable.asRawSource(-1, -1, -1, -1),
- sst.asRawSource(-1, -1, -1, -1));
+ olderData.asRawSource(-1, -1, -1, -1));
MergeIterator iter = new MergeIterator(sources, spoc, MemTable.FLAG_EXPLICIT, -1, -1, -1, -1);
List<long[]> results = toList(iter);
diff --git a/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/SSTableWriterReaderTest.java b/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/SSTableWriterReaderTest.java
deleted file mode 100644
index be49b3f82b9..00000000000
--- a/core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/SSTableWriterReaderTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2025 Eclipse RDF4J contributors.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Distribution License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * SPDX-License-Identifier: BSD-3-Clause
- *******************************************************************************/
-package org.eclipse.rdf4j.sail.s3.storage;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-
-class SSTableWriterReaderTest {
-
- private final QuadIndex spoc = new QuadIndex("spoc");
-
- @Test
- void roundTrip_singleEntry() {
- MemTable mt = new MemTable(spoc);
- mt.put(1, 2, 3, 4, true);
-
- byte[] sstData = SSTableWriter.write(mt);
- SSTable sst = new SSTable(sstData, spoc);
-
- assertEquals(1, sst.getEntryCount());
-
- Iterator<long[]> iter = sst.scan(1, 2, 3, 4, true);
- assertTrue(iter.hasNext());
- long[] quad = iter.next();
- assertArrayEquals(new long[] { 1, 2, 3, 4 }, quad);
- assertFalse(iter.hasNext());
- }
-
- @Test
- void roundTrip_multipleEntries() {
- MemTable mt = new MemTable(spoc);
- mt.put(1, 2, 3, 0, true);
- mt.put(1, 2, 4, 0, true);
- mt.put(2, 3, 4, 0, true);
- mt.put(10, 20, 30, 40, true);
-
- byte[] sstData = SSTableWriter.write(mt);
- SSTable sst = new SSTable(sstData, spoc);
-
- assertEquals(4, sst.getEntryCount());
-
- // Wildcard scan
- List<long[]> results = toList(sst.scan(-1, -1, -1, -1, true));
- assertEquals(4, results.size());
- }
-
- @Test
- void roundTrip_patternFilter() {
- MemTable mt = new MemTable(spoc);
- mt.put(1, 2, 3, 0, true);
- mt.put(1, 2, 4, 0, true);
- mt.put(2, 3, 4, 0, true);
-
- byte[] sstData = SSTableWriter.write(mt);
- SSTable sst = new SSTable(sstData, spoc);
-
- // Filter by subject=1
- List<long[]> results = toList(sst.scan(1, -1, -1, -1, true));
- assertEquals(2, results.size());
- assertEquals(1, results.get(0)[0]);
- assertEquals(1, results.get(1)[0]);
-
- // Filter by subject=2
- results = toList(sst.scan(2, -1, -1, -1, true));
- assertEquals(1, results.size());
- assertEquals(2, results.get(0)[0]);
- }
-
- @Test
- void roundTrip_tombstonesFilteredInScan() {
- MemTable mt = new MemTable(spoc);
- mt.put(1, 2, 3, 0, true);
- mt.put(1, 2, 4, 0, true);
- mt.remove(1, 2, 3, 0, true); // tombstone
-
- byte[] sstData = SSTableWriter.write(mt);
- SSTable sst = new SSTable(sstData, spoc);
-
- // Tombstone entry is still in the SSTable (entryCount includes it)
- assertEquals(2, sst.getEntryCount());
-
- // But scan filters it out
- List<long[]> results = toList(sst.scan(-1, -1, -1, -1, true));
- assertEquals(1, results.size());
- assertArrayEquals(new long[] { 1, 2, 4, 0 }, results.get(0));
- }
-
- @Test
- void roundTrip_tombstonesVisibleInRawSource() {
- MemTable mt = new MemTable(spoc);
- mt.put(1, 2, 3, 0, true);
- mt.remove(1, 2, 3, 0, true);
-
- byte[] sstData = SSTableWriter.write(mt);
- SSTable sst = new SSTable(sstData, spoc);
-
- RawEntrySource source = sst.asRawSource(-1, -1, -1, -1);
- assertTrue(source.hasNext());
- // The tombstone should be visible
- assertEquals(MemTable.FLAG_TOMBSTONE, source.peekFlag());
- }
-
- @Test
- void roundTrip_explicitVsInferred() {
- MemTable mt = new MemTable(spoc);
- mt.put(1, 2, 3, 0, true); // explicit
- mt.put(4, 5, 6, 0, false); // inferred
-
- byte[] sstData = SSTableWriter.write(mt);
- SSTable sst = new SSTable(sstData, spoc);
-
- List<long[]> explicitResults = toList(sst.scan(-1, -1, -1, -1, true));
- assertEquals(1, explicitResults.size());
- assertArrayEquals(new long[] { 1, 2, 3, 0 }, explicitResults.get(0));
-
- List<long[]> inferredResults = toList(sst.scan(-1, -1, -1, -1, false));
- assertEquals(1, inferredResults.size());
- assertArrayEquals(new long[] { 4, 5, 6, 0 }, inferredResults.get(0));
- }
-
- @Test
- void roundTrip_smallBlockSize() {
- // Use a very small block size to test multi-block SSTables
- MemTable mt = new MemTable(spoc);
- for (long i = 1; i <= 100; i++) {
- mt.put(i, i + 1, i + 2, 0, true);
- }
-
- byte[] sstData = SSTableWriter.write(mt, 64); // tiny blocks
- SSTable sst = new SSTable(sstData, spoc);
-
- assertEquals(100, sst.getEntryCount());
-
- // Verify all entries are retrievable
- List<long[]> results = toList(sst.scan(-1, -1, -1, -1, true));
- assertEquals(100, results.size());
-
- // Verify range scan with block index seeking
- results = toList(sst.scan(50, -1, -1, -1, true));
- assertEquals(1, results.size());
- assertEquals(50, results.get(0)[0]);
- }
-
- @Test
- void roundTrip_largeIds() {
- MemTable mt = new MemTable(spoc);
- mt.put(100000, 200000, 300000, 400000, true);
-
- byte[] sstData = SSTableWriter.write(mt);
- SSTable sst = new SSTable(sstData, spoc);
-
- List<long[]> results = toList(sst.scan(-1, -1, -1, -1, true));
- assertEquals(1, results.size());
- assertArrayEquals(new long[] { 100000, 200000, 300000, 400000 }, results.get(0));
- }
-
- @Test
- void emptyMemTable_throwsException() {
- MemTable mt = new MemTable(spoc);
- assertThrows(IllegalArgumentException.class, () -> SSTableWriter.write(mt));
- }
-
- private List<long[]> toList(Iterator<long[]> iter) {
- List<long[]> list = new ArrayList<>();
- while (iter.hasNext()) {
- list.add(iter.next());
- }
- return list;
- }
-}
From 60707caf56aa0842f332cfdb1742852b6dff18aa Mon Sep 17 00:00:00 2001
From: Chengxu Bian
Date: Thu, 26 Feb 2026 21:11:10 -0500
Subject: [PATCH 04/10] feat: stats-based pruning, workbench UI, and
instance-level S3 config (Phase 3)
Replace predicate partitioning with flat files and per-file min/max stats
for pruning. Add S3 Store to RDF4J Workbench with creation form and TTL
config template. S3 connection settings (bucket, endpoint, credentials)
resolve from environment variables (RDF4J_S3_*) or system properties so
multiple repositories share a single bucket, each isolated by s3Prefix.
---
.../eclipse/rdf4j/repository/config/s3.ttl | 25 ++
core/sail/s3/docker-compose.yml | 32 ++
.../eclipse/rdf4j/sail/s3/S3SailStore.java | 316 ++++++++----------
.../rdf4j/sail/s3/config/S3StoreConfig.java | 65 +++-
.../rdf4j/sail/s3/storage/Catalog.java | 126 +++----
.../rdf4j/sail/s3/storage/Compactor.java | 77 ++---
.../rdf4j/sail/s3/storage/MemTable.java | 177 ++++------
.../sail/s3/storage/ParquetFileBuilder.java | 44 +--
.../sail/s3/storage/ParquetQuadSource.java | 91 ++---
.../rdf4j/sail/s3/storage/ParquetSchemas.java | 47 +--
.../s3/storage/PartitionIndexSelector.java | 122 -------
.../s3/storage/PartitionMergeIterator.java | 183 ----------
.../rdf4j/sail/s3/storage/QuadIndex.java | 8 +-
.../rdf4j/sail/s3/S3PersistenceTest.java | 156 +++++++++
.../rdf4j/sail/s3/storage/CatalogTest.java | 150 +++++++++
.../sail/s3/storage/MemTableReorderTest.java | 163 +++++++++
.../sail/s3/storage/ParquetRoundTripTest.java | 199 +++++++++++
.../s3/storage/QuadIndexSelectionTest.java | 106 ++++++
.../main/webapp/transformations/create-s3.xsl | 75 +++++
.../main/webapp/transformations/create.xsl | 1 +
20 files changed, 1301 insertions(+), 862 deletions(-)
create mode 100644 core/repository/api/src/main/resources/org/eclipse/rdf4j/repository/config/s3.ttl
create mode 100644 core/sail/s3/docker-compose.yml
delete mode 100644 core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionIndexSelector.java
delete mode 100644 core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/storage/PartitionMergeIterator.java
create mode 100644 core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/CatalogTest.java
create mode 100644 core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/MemTableReorderTest.java
create mode 100644 core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/ParquetRoundTripTest.java
create mode 100644 core/sail/s3/src/test/java/org/eclipse/rdf4j/sail/s3/storage/QuadIndexSelectionTest.java
create mode 100644 tools/workbench/src/main/webapp/transformations/create-s3.xsl
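The stats-based pruning this commit describes reduces to a min/max range check per file: a Parquet file can be skipped entirely when any bound position of the query pattern falls outside the file's recorded range. A sketch of the idea, where the getter names on Catalog.ParquetFileInfo are assumptions for illustration:

    // Hypothetical pruning check; getter names are assumed, not taken from the patch.
    static boolean mayContain(Catalog.ParquetFileInfo f, long s, long p, long o, long c) {
        return inRange(s, f.getMinSubject(), f.getMaxSubject())
                && inRange(p, f.getMinPredicate(), f.getMaxPredicate())
                && inRange(o, f.getMinObject(), f.getMaxObject())
                && inRange(c, f.getMinContext(), f.getMaxContext());
    }

    static boolean inRange(long pattern, long min, long max) {
        // -1 is a wildcard and never prunes
        return pattern < 0 || (pattern >= min && pattern <= max);
    }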
diff --git a/core/repository/api/src/main/resources/org/eclipse/rdf4j/repository/config/s3.ttl b/core/repository/api/src/main/resources/org/eclipse/rdf4j/repository/config/s3.ttl
new file mode 100644
index 00000000000..3806dc0bed4
--- /dev/null
+++ b/core/repository/api/src/main/resources/org/eclipse/rdf4j/repository/config/s3.ttl
@@ -0,0 +1,25 @@
+#
+# Configuration template for an S3Store
+#
+# S3 connection settings (bucket, endpoint, region, credentials) are configured
+# at the RDF4J instance level via environment variables or system properties:
+#
+# RDF4J_S3_BUCKET, RDF4J_S3_ENDPOINT, RDF4J_S3_REGION,
+# RDF4J_S3_ACCESS_KEY, RDF4J_S3_SECRET_KEY, RDF4J_S3_FORCE_PATH_STYLE
+#
+# Each repository uses s3Prefix to partition its data within the shared bucket.
+#
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix config: <tag:rdf4j.org,2023:config/> .
+@prefix s3: .
+
+[] a config:Repository ;
+ config:rep.id "{%Repository ID|s3%}" ;
+ rdfs:label "{%Repository title|S3 Store%}" ;
+ config:rep.impl [
+ config:rep.type "openrdf:SailRepository" ;
+ config:sail.impl [
+ config:sail.type "rdf4j:S3Store" ;
+ s3:s3Prefix "{%S3 Prefix|%}"
+ ]
+ ].
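The instance-level resolution the template describes (environment variable first, then a system property) might look like the following sketch; the system property names are an assumption, not part of the patch:

    // Hypothetical lookup helper for RDF4J_S3_* settings.
    static String resolveSetting(String envVar, String sysProp) {
        String value = System.getenv(envVar);
        if (value == null || value.isEmpty()) {
            value = System.getProperty(sysProp);
        }
        return value;
    }

    String bucket = resolveSetting("RDF4J_S3_BUCKET", "rdf4j.s3.bucket");
    String endpoint = resolveSetting("RDF4J_S3_ENDPOINT", "rdf4j.s3.endpoint");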
diff --git a/core/sail/s3/docker-compose.yml b/core/sail/s3/docker-compose.yml
new file mode 100644
index 00000000000..096bbf7fcb2
--- /dev/null
+++ b/core/sail/s3/docker-compose.yml
@@ -0,0 +1,32 @@
+services:
+ minio:
+ image: minio/minio:latest
+ ports:
+ - "9000:9000"
+ - "9001:9001"
+ environment:
+ MINIO_ROOT_USER: minioadmin
+ MINIO_ROOT_PASSWORD: minioadmin
+ command: server /data --console-address ":9001"
+ volumes:
+ - minio-data:/data
+ healthcheck:
+ test: ["CMD", "mc", "ready", "local"]
+ interval: 5s
+ timeout: 5s
+ retries: 5
+
+ createbucket:
+ image: minio/mc:latest
+ depends_on:
+ minio:
+ condition: service_healthy
+ entrypoint: >
+ /bin/sh -c "
+ mc alias set local http://minio:9000 minioadmin minioadmin;
+ mc mb --ignore-existing local/rdf4j-data;
+ echo 'Bucket ready';
+ "
+
+volumes:
+ minio-data:
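For local testing against this compose file, an S3 client can be pointed at MinIO with path-style addressing and the root credentials above. A sketch assuming the AWS SDK for Java v2 is on the classpath:

    import java.net.URI;
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;

    S3Client s3 = S3Client.builder()
            .endpointOverride(URI.create("http://localhost:9000"))
            .region(Region.US_EAST_1)
            .forcePathStyle(true) // MinIO serves buckets path-style
            .credentialsProvider(StaticCredentialsProvider.create(
                    AwsBasicCredentials.create("minioadmin", "minioadmin")))
            .build();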
diff --git a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3SailStore.java b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3SailStore.java
index 488fc60adc2..8aec14e4df3 100644
--- a/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3SailStore.java
+++ b/core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3SailStore.java
@@ -49,12 +49,11 @@
import org.eclipse.rdf4j.sail.s3.storage.CompactionPolicy;
import org.eclipse.rdf4j.sail.s3.storage.Compactor;
import org.eclipse.rdf4j.sail.s3.storage.MemTable;
+import org.eclipse.rdf4j.sail.s3.storage.MergeIterator;
import org.eclipse.rdf4j.sail.s3.storage.ObjectStore;
import org.eclipse.rdf4j.sail.s3.storage.ParquetFileBuilder;
import org.eclipse.rdf4j.sail.s3.storage.ParquetQuadSource;
import org.eclipse.rdf4j.sail.s3.storage.ParquetSchemas;
-import org.eclipse.rdf4j.sail.s3.storage.PartitionIndexSelector;
-import org.eclipse.rdf4j.sail.s3.storage.PartitionMergeIterator;
import org.eclipse.rdf4j.sail.s3.storage.QuadIndex;
import org.eclipse.rdf4j.sail.s3.storage.RawEntrySource;
import org.eclipse.rdf4j.sail.s3.storage.S3ObjectStore;
@@ -65,11 +64,11 @@
/**
* {@link SailStore} implementation that stores RDF quads using Parquet files on S3-compatible object storage with
- * predicate-based vertical partitioning.
+ * stats-based pruning (no predicate partitioning).
*
*
- * Architecture: single in-memory {@link MemTable} in SPOC order → on flush, partition by predicate and write 3 Parquet
- * files per partition (SOC, OSC, CSO sort orders) → multi-tier cache (Caffeine heap + disk) → compaction.
+ * Architecture: single in-memory {@link MemTable} in SPOC order → on flush, write 3 Parquet files per epoch (SPOC,
+ * OPSC, CSPO sort orders) → multi-tier cache (Caffeine heap + disk) → compaction.
*
*
*
@@ -80,15 +79,18 @@ class S3SailStore implements SailStore {
final Logger logger = LoggerFactory.getLogger(S3SailStore.class);
- private static final String[] SORT_ORDERS = { "soc", "osc", "cso" };
+ private static final QuadIndex SPOC_INDEX = new QuadIndex("spoc");
+ private static final QuadIndex OPSC_INDEX = new QuadIndex("opsc");
+ private static final QuadIndex CSPO_INDEX = new QuadIndex("cspo");
+ private static final List<QuadIndex> ALL_INDEXES = List.of(SPOC_INDEX, OPSC_INDEX, CSPO_INDEX);
+
private static final int DEFAULT_ROW_GROUP_SIZE = 8 * 1024 * 1024; // 8 MiB
private static final int DEFAULT_PAGE_SIZE = 64 * 1024; // 64 KiB
private final S3ValueStore valueStore;
private final S3NamespaceStore namespaceStore;
- // Single MemTable in SPOC order (new design: 1x memory, partition on flush)
- private final QuadIndex spocIndex;
+ // Single MemTable in SPOC order
private volatile MemTable memTable;
private volatile boolean mayHaveInferred;
@@ -129,8 +131,7 @@ class S3SailStore implements SailStore {
this.pageSize = DEFAULT_PAGE_SIZE;
// Single SPOC index for the MemTable
- this.spocIndex = new QuadIndex("spoc");
- this.memTable = new MemTable(spocIndex);
+ this.memTable = new MemTable(SPOC_INDEX);
// Initialize persistence
if (objectStore != null) {
@@ -198,7 +199,7 @@ public void close() throws SailException {
}
/**
- * Flushes active MemTable to Parquet files on the object store, partitioned by predicate.
+ * Flushes active MemTable to Parquet files on the object store. Writes one file per sort order (SPOC, OPSC, CSPO).
*/
private void flushToObjectStore() {
if (objectStore == null) {
@@ -221,62 +222,63 @@ private void flushToObjectStore() {
// Freeze active MemTable and swap in fresh one
MemTable frozen = memTable;
frozen.freeze();
- memTable = new MemTable(spocIndex);
-
- // Partition by predicate
- Map<Long, List<MemTable.QuadEntry>> partitions = frozen.partitionByPredicate();
-
- // For each predicate partition, write 3 Parquet files (all sort orders)
- for (Map.Entry<Long, List<MemTable.QuadEntry>> partEntry : partitions.entrySet()) {
- long predId = partEntry.getKey();
- List<MemTable.QuadEntry> entries = partEntry.getValue();
-
- // Set predicate label for debugging
- Value predValue = valueStore.getValue(predId);
- if (predValue != null) {
- catalog.getPredicateLabels().put(String.valueOf(predId), predValue.stringValue());
+ memTable = new MemTable(SPOC_INDEX);
+
+ // Collect all entries as full quads
+ List<long[]> allQuads = new ArrayList<>(frozen.size());
+ long[] quad = new long[4];
+ for (Map.Entry<byte[], byte[]> entry : frozen.getData().entrySet()) {
+ long[] q = new long[5]; // s, p, o, c, flag
+ frozen.getIndex().keyToQuad(entry.getKey(), quad);
+ q[0] = quad[QuadIndex.SUBJ_IDX];
+ q[1] = quad[QuadIndex.PRED_IDX];
+ q[2] = quad[QuadIndex.OBJ_IDX];
+ q[3] = quad[QuadIndex.CONTEXT_IDX];
+ q[4] = entry.getValue()[0];
+ allQuads.add(q);
+ }
+
+ // Compute stats across all entries
+ long minSubject = Long.MAX_VALUE, maxSubject = Long.MIN_VALUE;
+ long minPredicate = Long.MAX_VALUE, maxPredicate = Long.MIN_VALUE;
+ long minObject = Long.MAX_VALUE, maxObject = Long.MIN_VALUE;
+ long minContext = Long.MAX_VALUE, maxContext = Long.MIN_VALUE;
+ for (long[] q : allQuads) {
+ minSubject = Math.min(minSubject, q[0]);
+ maxSubject = Math.max(maxSubject, q[0]);
+ minPredicate = Math.min(minPredicate, q[1]);
+ maxPredicate = Math.max(maxPredicate, q[1]);
+ minObject = Math.min(minObject, q[2]);
+ maxObject = Math.max(maxObject, q[2]);
+ minContext = Math.min(minContext, q[3]);
+ maxContext = Math.max(maxContext, q[3]);
+ }
+
+ // For each sort order, sort and write one Parquet file
+ for (QuadIndex sortIndex : ALL_INDEXES) {
+ String sortSuffix = sortIndex.getFieldSeqString();
+
+ // Sort entries according to the sort order
+ List<long[]> sorted = sortQuadEntries(allQuads, sortIndex);
+
+ // Build Parquet file
+ ParquetSchemas.SortOrder sortOrder = ParquetSchemas.SortOrder.fromSuffix(sortSuffix);
+ byte[] parquetData = ParquetFileBuilder.build(sorted, ParquetSchemas.QUAD_SCHEMA,
+ sortOrder, rowGroupSize, pageSize);
+
+ String s3Key = "data/L0-" + String.format("%05d", epoch) + "-" + sortSuffix + ".parquet";
+
+ objectStore.put(s3Key, parquetData);
+
+ // Write-through to cache
+ if (cache != null) {
+ cache.writeThrough(s3Key, parquetData);
}
- for (String sortOrder : SORT_ORDERS) {
- // Sort entries according to sort order
- List<MemTable.QuadEntry> sorted = sortEntries(entries, sortOrder);
-
- // Build Parquet file
- List<ParquetFileBuilder.QuadEntry> pqEntries = new ArrayList<>(sorted.size());
- for (MemTable.QuadEntry e : sorted) {
- pqEntries.add(new ParquetFileBuilder.QuadEntry(e.subject, e.object, e.context, e.flag));
- }
-
- byte[] parquetData = ParquetFileBuilder.build(pqEntries, ParquetSchemas.PARTITIONED_SCHEMA,
- ParquetSchemas.SortOrder.fromSuffix(sortOrder), predId, rowGroupSize, pageSize);
-
- String s3Key = "data/predicates/" + predId + "/L0-"
- + String.format("%05d", epoch) + "-" + sortOrder + ".parquet";
-
- objectStore.put(s3Key, parquetData);
-
- // Write-through to cache
- if (cache != null) {
- cache.writeThrough(s3Key, parquetData);
- }
-
- // Compute stats
- long minSubject = Long.MAX_VALUE, maxSubject = Long.MIN_VALUE;
- long minObject = Long.MAX_VALUE, maxObject = Long.MIN_VALUE;
- long minContext = Long.MAX_VALUE, maxContext = Long.MIN_VALUE;
- for (MemTable.QuadEntry e : sorted) {
- minSubject = Math.min(minSubject, e.subject);
- maxSubject = Math.max(maxSubject, e.subject);
- minObject = Math.min(minObject, e.object);
- maxObject = Math.max(maxObject, e.object);
- minContext = Math.min(minContext, e.context);
- maxContext = Math.max(maxContext, e.context);
- }
-
- catalog.addFile(predId, new Catalog.ParquetFileInfo(
- s3Key, 0, sortOrder, sorted.size(), epoch, parquetData.length,
- minSubject, maxSubject, minObject, maxObject, minContext, maxContext));
- }
+ catalog.addFile(new Catalog.ParquetFileInfo(
+ s3Key, 0, sortSuffix, sorted.size(), epoch, parquetData.length,
+ minSubject, maxSubject, minPredicate, maxPredicate,
+ minObject, maxObject, minContext, maxContext));
}
// Persist value store and namespaces
@@ -293,31 +295,42 @@ private void flushToObjectStore() {
}
/**
- * Sorts entries according to the given sort order.
+ * Sorts quad entries according to the given sort index.
*/
- private static List<MemTable.QuadEntry> sortEntries(List<MemTable.QuadEntry> entries, String sortOrder) {
- List<MemTable.QuadEntry> sorted = new ArrayList<>(entries);
- Comparator<MemTable.QuadEntry> cmp;
- switch (sortOrder) {
- case "osc":
- cmp = Comparator.comparingLong((MemTable.QuadEntry e) -> e.object)
- .thenComparingLong(e -> e.subject)
- .thenComparingLong(e -> e.context);
- break;
- case "cso":
- cmp = Comparator.comparingLong((MemTable.QuadEntry e) -> e.context)
- .thenComparingLong(e -> e.subject)
- .thenComparingLong(e -> e.object);
- break;
- case "soc":
+ private static List<long[]> sortQuadEntries(List