Skip to content

Commit 46f9a19

Browse files
committed
Store multiple elements in a value to reduce size on disk.
1 parent 83371cb commit 46f9a19

1 file changed

Lines changed: 134 additions & 28 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java

Lines changed: 134 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.openDatabase;
1515
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.readTransaction;
1616
import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.transaction;
17+
import static org.eclipse.rdf4j.sail.lmdb.Varint.firstToLength;
1718
import static org.lwjgl.system.MemoryStack.stackPush;
1819
import static org.lwjgl.system.MemoryUtil.NULL;
1920
import static org.lwjgl.util.lmdb.LMDB.MDB_CREATE;
21+
import static org.lwjgl.util.lmdb.LMDB.MDB_CURRENT;
2022
import static org.lwjgl.util.lmdb.LMDB.MDB_DUPSORT;
2123
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST;
2224
import static org.lwjgl.util.lmdb.LMDB.MDB_FIRST_DUP;
@@ -31,12 +33,15 @@
3133
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTFOUND;
3234
import static org.lwjgl.util.lmdb.LMDB.MDB_NOTLS;
3335
import static org.lwjgl.util.lmdb.LMDB.MDB_PREV;
36+
import static org.lwjgl.util.lmdb.LMDB.MDB_SET;
3437
import static org.lwjgl.util.lmdb.LMDB.MDB_SET_RANGE;
3538
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
3639
import static org.lwjgl.util.lmdb.LMDB.mdb_cmp;
3740
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_close;
41+
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_del;
3842
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_get;
3943
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_open;
44+
import static org.lwjgl.util.lmdb.LMDB.mdb_cursor_put;
4045
import static org.lwjgl.util.lmdb.LMDB.mdb_dbi_close;
4146
import static org.lwjgl.util.lmdb.LMDB.mdb_dbi_open;
4247
import static org.lwjgl.util.lmdb.LMDB.mdb_dcmp;
@@ -178,27 +183,6 @@ class TripleStore implements Closeable {
178183

179184
private TxnRecordCache recordCache = null;
180185

181-
static final Comparator<ByteBuffer> COMPARATOR = new Comparator<ByteBuffer>() {
182-
@Override
183-
public int compare(ByteBuffer b1, ByteBuffer b2) {
184-
int b1Len = b1.remaining();
185-
int b2Len = b2.remaining();
186-
int diff = compareRegion(b1, b1.position(), b2, b2.position(), Math.min(b1Len, b2Len));
187-
if (diff != 0) {
188-
return diff;
189-
}
190-
return b1Len > b2Len ? 1 : -1;
191-
}
192-
193-
public int compareRegion(ByteBuffer array1, int startIdx1, ByteBuffer array2, int startIdx2, int length) {
194-
int result = 0;
195-
for (int i = 0; result == 0 && i < length; i++) {
196-
result = (array1.get(startIdx1 + i) & 0xff) - (array2.get(startIdx2 + i) & 0xff);
197-
}
198-
return result;
199-
}
200-
};
201-
202186
TripleStore(File dir, LmdbStoreConfig config, ValueStore valueStore) throws IOException, SailException {
203187
this.dir = dir;
204188
this.forceSync = config.getForceSync();
@@ -920,6 +904,112 @@ private boolean requiresResize() {
920904
}
921905
}
922906

907+
private int merge(long cursor, int elements, MDBVal keyVal, MDBVal dataVal, ByteBuffer newValueBuf,
908+
ByteBuffer target) throws IOException {
909+
int rc = mdb_cursor_get(cursor, keyVal, dataVal, MDB_SET);
910+
int newValuesSize = 0;
911+
if (rc == MDB_SUCCESS) {
912+
rc = E(mdb_cursor_get(cursor, keyVal, dataVal, MDB_GET_BOTH_RANGE));
913+
if (rc == MDB_SUCCESS) {
914+
ByteBuffer existing = dataVal.mv_data();
915+
916+
newValuesSize = newValueBuf.remaining();
917+
int existingValuesSize = existing.remaining();
918+
int diff = -1;
919+
while (existing.hasRemaining() &&
920+
(diff = compareRegion(newValueBuf, 0, existing, existing.position(),
921+
Math.min(newValuesSize, existing.remaining()))) > 0) {
922+
for (int i = 0; i < elements; i++) {
923+
skipVarint(existing);
924+
}
925+
}
926+
if (diff == 0) {
927+
return MDB_KEYEXIST;
928+
}
929+
int insertPos = existing.position();
930+
if (insertPos > 0) {
931+
// copy existing elements and insert new elements in between
932+
target.clear();
933+
target.put(existing.slice().flip());
934+
target.put(newValueBuf);
935+
if (existing.hasRemaining()) {
936+
target.put(existing);
937+
}
938+
// delete existing entry
939+
E(mdb_cursor_del(cursor, 0));
940+
// store one or more new entries
941+
int totalSize = target.position();
942+
target.flip();
943+
if (insertPos + newValuesSize > 500) {
944+
target.limit(insertPos);
945+
dataVal.mv_data(target);
946+
E(mdb_cursor_put(cursor, keyVal, dataVal, 0));
947+
target.position(insertPos);
948+
target.limit(totalSize);
949+
}
950+
if (target.remaining() > 500) {
951+
target.limit(insertPos + newValuesSize);
952+
dataVal.mv_data(target);
953+
E(mdb_cursor_put(cursor, keyVal, dataVal, 0));
954+
target.position(insertPos + newValuesSize);
955+
target.limit(totalSize);
956+
}
957+
if (target.hasRemaining()) {
958+
dataVal.mv_data(target);
959+
E(mdb_cursor_put(cursor, keyVal, dataVal, 0));
960+
}
961+
} else {
962+
if (existingValuesSize + newValuesSize <= 500) {
963+
target.clear();
964+
target.put(newValueBuf);
965+
target.put(existing);
966+
target.flip();
967+
dataVal.mv_data(target);
968+
E(mdb_cursor_del(cursor, 0));
969+
E(mdb_cursor_put(cursor, keyVal, dataVal, 0));
970+
} else {
971+
dataVal.mv_data(newValueBuf);
972+
E(mdb_cursor_put(cursor, keyVal, dataVal, 0));
973+
}
974+
}
975+
return MDB_SUCCESS;
976+
} else {
977+
E(mdb_cursor_get(cursor, keyVal, dataVal, MDB_LAST_DUP));
978+
if (dataVal.mv_data().remaining() + newValuesSize <= 500) {
979+
// append to last existing value
980+
ByteBuffer existing = dataVal.mv_data();
981+
target.clear();
982+
target.put(existing);
983+
target.put(newValueBuf);
984+
target.flip();
985+
E(mdb_cursor_del(cursor, 0));
986+
dataVal.mv_data(target);
987+
E(mdb_cursor_put(cursor, keyVal, dataVal, 0));
988+
return MDB_SUCCESS;
989+
}
990+
}
991+
}
992+
dataVal.mv_data(newValueBuf);
993+
E(mdb_cursor_put(cursor, keyVal, dataVal, 0));
994+
return MDB_SUCCESS;
995+
}
996+
997+
private static int compareRegion(ByteBuffer bb1, int startIdx1, ByteBuffer bb2, int startIdx2, int length) {
998+
int result = 0;
999+
for (int i = 0; result == 0 && i < length; i++) {
1000+
result = (bb1.get(startIdx1 + i) & 0xff) - (bb2.get(startIdx2 + i) & 0xff);
1001+
}
1002+
return result;
1003+
}
1004+
1005+
private void skipVarint(ByteBuffer other) {
1006+
int i = firstToLength(other.get()) - 1;
1007+
assert i >= 0;
1008+
if (i > 0) {
1009+
other.position(i + other.position());
1010+
}
1011+
}
1012+
9231013
public boolean storeTriple(long subj, long pred, long obj, long context, boolean explicit) throws IOException {
9241014
TripleIndex mainIndex = indexes.get(0);
9251015
boolean stAdded;
@@ -929,11 +1019,13 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
9291019
MDBVal dataVal = MDBVal.calloc(stack);
9301020
ByteBuffer keyBuf = stack.malloc(MAX_KEY_LENGTH);
9311021
ByteBuffer valueBuf = stack.malloc(MAX_KEY_LENGTH);
1022+
ByteBuffer mergedBuf = stack.malloc(500 + MAX_KEY_LENGTH);
9321023
mainIndex.toEntry(keyBuf, valueBuf, subj, pred, obj, context);
9331024
keyBuf.flip();
9341025
keyVal.mv_data(keyBuf);
9351026
valueBuf.flip();
9361027
dataVal.mv_data(valueBuf);
1028+
PointerBuffer pCursor = stack.mallocPointer(1);
9371029

9381030
if (recordCache == null) {
9391031
if (requiresResize()) {
@@ -953,14 +1045,21 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
9531045
return recordCache.storeRecord(quad, explicit);
9541046
}
9551047

956-
int rc = mdb_put(writeTxn, mainIndex.getDB(explicit), keyVal, dataVal, MDB_NODUPDATA);
957-
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
958-
throw new IOException(mdb_strerror(rc));
1048+
mdb_cursor_open(writeTxn, mainIndex.getDB(explicit), pCursor);
1049+
long cursor = pCursor.get(0);
1050+
try {
1051+
int rc = merge(cursor, 4 - mainIndex.indexSplitPosition, keyVal, dataVal, valueBuf, mergedBuf);
1052+
if (rc != MDB_SUCCESS && rc != MDB_KEYEXIST) {
1053+
throw new IOException(mdb_strerror(rc));
1054+
}
1055+
stAdded = rc == MDB_SUCCESS;
1056+
} finally {
1057+
mdb_cursor_close(cursor);
9591058
}
960-
stAdded = rc == MDB_SUCCESS;
1059+
9611060
boolean foundImplicit = false;
9621061
if (explicit && stAdded) {
963-
foundImplicit = mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal) == MDB_SUCCESS;
1062+
// foundImplicit = mdb_del(writeTxn, mainIndex.getDB(false), keyVal, dataVal) == MDB_SUCCESS;
9641063
}
9651064

9661065
if (stAdded) {
@@ -977,9 +1076,16 @@ public boolean storeTriple(long subj, long pred, long obj, long context, boolean
9771076
dataVal.mv_data(valueBuf);
9781077

9791078
if (foundImplicit) {
980-
E(mdb_del(writeTxn, index.getDB(false), keyVal, dataVal));
1079+
// E(mdb_del(writeTxn, index.getDB(false), keyVal, dataVal));
1080+
}
1081+
1082+
mdb_cursor_open(writeTxn, index.getDB(explicit), pCursor);
1083+
cursor = pCursor.get(0);
1084+
try {
1085+
merge(cursor, 4 - index.indexSplitPosition, keyVal, dataVal, valueBuf, mergedBuf);
1086+
} finally {
1087+
mdb_cursor_close(cursor);
9811088
}
982-
E(mdb_put(writeTxn, index.getDB(explicit), keyVal, dataVal, 0));
9831089
}
9841090

9851091
incrementContext(stack, context);

0 commit comments

Comments
 (0)