Skip to content

Commit 143df50

Browse files
GH-4899 Introduce MapDB3 backed queue to the MapDb3CollectionFactory
1 parent 460329b commit 143df50

2 files changed

Lines changed: 210 additions & 9 deletions

File tree

core/collection-factory/mapdb3/src/main/java/org/eclipse/rdf4j/collection/factory/mapdb/MapDb3CollectionFactory.java

Lines changed: 138 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
package org.eclipse.rdf4j.collection.factory.mapdb;
1212

1313
import java.util.AbstractMap;
14+
import java.util.AbstractQueue;
1415
import java.util.AbstractSet;
1516
import java.util.ArrayList;
1617
import java.util.Collection;
1718
import java.util.HashSet;
1819
import java.util.Iterator;
1920
import java.util.List;
2021
import java.util.Map;
22+
import java.util.NoSuchElementException;
2123
import java.util.Queue;
2224
import java.util.Set;
2325
import java.util.function.BiConsumer;
@@ -41,11 +43,12 @@
4143
import org.mapdb.HTreeMap;
4244
import org.mapdb.Serializer;
4345
import org.mapdb.serializer.SerializerJava;
46+
import org.mapdb.serializer.SerializerLong;
4447

4548
public class MapDb3CollectionFactory implements CollectionFactory {
4649
// The size 16 seems like a nice starting value but others could well
4750
// be better.
48-
private static final int SWITCH_TO_DISK_BASED_SET_AT_SIZE = 16;
51+
private static final int DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE = 16;
4952
protected volatile DB db;
5053
protected volatile long colectionId = 0;
5154
protected final long iterationCacheSyncThreshold;
@@ -54,6 +57,63 @@ public class MapDb3CollectionFactory implements CollectionFactory {
5457
// So I am not going to worry about this.
5558
private static final boolean ON_32_BIT_VM = "32".equals(System.getProperty("sun.arch.data.model"));
5659

60+
private final class MapDb3BackedQueue<T> extends AbstractQueue<T> {
61+
private final Map<Long, T> m;
62+
private long tail;
63+
private long head;
64+
65+
private MapDb3BackedQueue(Map<Long, T> m) {
66+
this.m = m;
67+
}
68+
69+
@Override
70+
public boolean offer(T arg0) {
71+
m.put((Long) tail++, arg0);
72+
if (tail % iterationCacheSyncThreshold == 0)
73+
db.commit();
74+
return true;
75+
}
76+
77+
@Override
78+
public T peek() {
79+
return m.get(head);
80+
}
81+
82+
@Override
83+
public T poll() {
84+
T r = m.remove(head++);
85+
if (head % iterationCacheSyncThreshold == 0)
86+
db.commit();
87+
return r;
88+
}
89+
90+
@Override
91+
public Iterator<T> iterator() {
92+
return new Iterator<>() {
93+
long at = head;
94+
95+
@Override
96+
public boolean hasNext() {
97+
return at < tail;
98+
}
99+
100+
@Override
101+
public T next() {
102+
if (at >= tail) {
103+
throw new NoSuchElementException();
104+
}
105+
return m.get(at++);
106+
}
107+
108+
};
109+
}
110+
111+
@Override
112+
public int size() {
113+
return (int) (tail - head);
114+
}
115+
}
116+
57117
protected static final class RDF4jMapDB3Exception extends RDF4JException {
58118

59119
private static final long serialVersionUID = 1L;
@@ -112,7 +172,7 @@ public Set<BindingSet> createSetOfBindingSets(Supplier<MutableBindingSet> create
112172
init();
113173
Serializer<BindingSet> serializer = createBindingSetSerializer(create, getHas, getget, getSet);
114174
MemoryTillSizeXSet<BindingSet> set = new MemoryTillSizeXSet<>(colectionId++,
115-
delegate.createSetOfBindingSets(), serializer);
175+
delegate.createSetOfBindingSets(), serializer, DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE);
116176
return new CommitingSet<>(set, iterationCacheSyncThreshold, db);
117177
} else {
118178
return delegate.createSetOfBindingSets();
@@ -124,7 +184,8 @@ public <T> Set<T> createSet() {
124184
if (iterationCacheSyncThreshold > 0) {
125185
init();
126186
Serializer<T> serializer = createAnySerializer();
127-
MemoryTillSizeXSet<T> set = new MemoryTillSizeXSet<T>(colectionId++, delegate.createSet(), serializer);
187+
MemoryTillSizeXSet<T> set = new MemoryTillSizeXSet<T>(colectionId++, delegate.createSet(), serializer,
188+
DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE);
128189
return new CommitingSet<T>(set, iterationCacheSyncThreshold, db);
129190
} else {
130191
return delegate.createSet();
@@ -136,7 +197,8 @@ public Set<Value> createValueSet() {
136197
if (iterationCacheSyncThreshold > 0) {
137198
init();
138199
Serializer<Value> serializer = createValueSerializer();
139-
Set<Value> set = new MemoryTillSizeXSet<>(colectionId++, delegate.createValueSet(), serializer);
200+
Set<Value> set = new MemoryTillSizeXSet<>(colectionId++, delegate.createValueSet(), serializer,
201+
DEFAULT_SWITCH_TO_DISK_BASED_SET_AT_SIZE);
140202
return new CommitingSet<Value>(set, iterationCacheSyncThreshold, db);
141203
} else {
142204
return delegate.createValueSet();
@@ -173,12 +235,28 @@ public <V> Map<Value, V> createValueKeyedMap() {
173235

174236
@Override
175237
public <T> Queue<T> createQueue() {
176-
return delegate.createQueue();
238+
if (iterationCacheSyncThreshold > 0) {
239+
init();
240+
Serializer<T> s = createAnySerializer();
241+
Map<Long, T> m = db.hashMap(Long.toHexString(colectionId++), new SerializerLong(), s).create();
242+
243+
return new MemoryTillSizeXQueue<>(delegate.createQueue(), 128, () -> new MapDb3BackedQueue<>(m));
244+
} else {
245+
return delegate.createQueue();
246+
}
177247
}
178248

179249
@Override
180250
public Queue<Value> createValueQueue() {
181-
return delegate.createValueQueue();
251+
if (iterationCacheSyncThreshold > 0) {
252+
init();
253+
Serializer<Value> s = createValueSerializer();
254+
Map<Long, Value> m = db.hashMap(Long.toHexString(colectionId++), new SerializerLong(), s).create();
255+
return new MemoryTillSizeXQueue<>(delegate.createQueue(), 128, () -> new MapDb3BackedQueue<>(m));
256+
257+
} else {
258+
return delegate.createValueQueue();
259+
}
182260
}
183261

184262
@Override
@@ -303,17 +381,19 @@ protected class MemoryTillSizeXSet<V> extends AbstractSet<V> {
303381
private Set<V> wrapped;
304382
private final long setName;
305383
private final Serializer<V> valueSerializer;
384+
private final long switchToDiskAtSize;
306385

307-
public MemoryTillSizeXSet(long setName, Set<V> wrapped, Serializer<V> valueSerializer) {
386+
public MemoryTillSizeXSet(long setName, Set<V> wrapped, Serializer<V> valueSerializer, long switchToSize) {
308387
super();
309388
this.setName = setName;
310389
this.wrapped = wrapped;
311390
this.valueSerializer = valueSerializer;
391+
this.switchToDiskAtSize = switchToSize;
312392
}
313393

314394
@Override
315395
public boolean add(V e) {
316-
if (wrapped instanceof HashSet && wrapped.size() > SWITCH_TO_DISK_BASED_SET_AT_SIZE) {
396+
if (wrapped instanceof HashSet && wrapped.size() > switchToDiskAtSize) {
317397
Set<V> disk = db.hashSet(Long.toHexString(setName), valueSerializer).create();
318398
disk.addAll(wrapped);
319399
wrapped = disk;
@@ -323,7 +403,7 @@ public boolean add(V e) {
323403

324404
@Override
325405
public boolean addAll(Collection<? extends V> arg0) {
326-
if (wrapped instanceof HashSet && arg0.size() > SWITCH_TO_DISK_BASED_SET_AT_SIZE) {
406+
if (wrapped instanceof HashSet && arg0.size() > switchToDiskAtSize) {
327407
Set<V> disk = db.hashSet(Long.toHexString(setName), valueSerializer).create();
328408
disk.addAll(wrapped);
329409
wrapped = disk;
@@ -383,6 +463,55 @@ public int size() {
383463

384464
}
385465

466+
/**
467+
* Only create a disk based set once the contents are large enough that it starts to pay off.
468+
*
469+
* @param <T> of the contents of the set.
470+
*/
471+
protected class MemoryTillSizeXQueue<V> extends AbstractQueue<V> {
472+
private Queue<V> wrapped;
473+
private final long switchToDiskAtSize;
474+
private final Supplier<Queue<V>> supplier;
475+
476+
public MemoryTillSizeXQueue(Queue<V> wrapped, long switchToSize, Supplier<Queue<V>> supplier) {
477+
super();
478+
this.wrapped = wrapped;
479+
this.switchToDiskAtSize = switchToSize;
480+
this.supplier = supplier;
481+
}
482+
483+
@Override
484+
public int size() {
485+
return wrapped.size();
486+
}
487+
488+
@Override
489+
public boolean offer(V e) {
490+
if (!(wrapped instanceof MapDb3BackedQueue) && wrapped.size() > switchToDiskAtSize) {
491+
Queue<V> disk = supplier.get();
492+
disk.addAll(wrapped);
493+
wrapped = disk;
494+
}
495+
return wrapped.offer(e);
496+
}
497+
498+
@Override
499+
public V peek() {
500+
return wrapped.peek();
501+
}
502+
503+
@Override
504+
public V poll() {
505+
return wrapped.poll();
506+
}
507+
508+
@Override
509+
public Iterator<V> iterator() {
510+
return wrapped.iterator();
511+
}
512+
513+
}
514+
386515
/**
387516
* These methods should be overriding in case a store can deliver a better serialization protocol.
388517
*
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.collection.factory.mapdb;
12+
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertFalse;
15+
import static org.junit.jupiter.api.Assertions.assertNotNull;
16+
import static org.junit.jupiter.api.Assertions.assertTrue;
17+
import static org.junit.jupiter.api.Assertions.fail;
18+
19+
import java.util.Iterator;
20+
import java.util.NoSuchElementException;
21+
import java.util.Queue;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
public class MapDb3CollectionFactoryTest {
26+
27+
@Test
28+
void queuOfferAnd() {
29+
try (MapDb3CollectionFactory mapDb3CollectionFactory = new MapDb3CollectionFactory(1)) {
30+
Queue<String> q = mapDb3CollectionFactory.createQueue();
31+
int size = 1024;
32+
for (int i = 0; i < size; i++) {
33+
assertTrue(q.offer(Integer.toString(i)));
34+
}
35+
assertEquals(size, q.size());
36+
for (int i = 0; i < size; i++) {
37+
String p = q.peek();
38+
assertEquals(p, Integer.toString(i));
39+
String p2 = q.peek();
40+
assertEquals(p2, Integer.toString(i));
41+
String s = q.poll();
42+
assertEquals(s, Integer.toString(i));
43+
}
44+
assertEquals(0, q.size());
45+
}
46+
}
47+
48+
@Test
49+
void iterator() {
50+
try (MapDb3CollectionFactory mapDb3CollectionFactory = new MapDb3CollectionFactory(1)) {
51+
Queue<String> q = mapDb3CollectionFactory.createQueue();
52+
int size = 1024;
53+
for (int i = 0; i < size; i++) {
54+
assertTrue(q.offer(Integer.toString(i)));
55+
}
56+
assertEquals(size, q.size());
57+
Iterator<String> iter = q.iterator();
58+
for (int i = 0; i < size; i++) {
59+
assertTrue(iter.hasNext());
60+
assertEquals(iter.next(), Integer.toString(i));
61+
}
62+
assertFalse(iter.hasNext());
63+
assertEquals(size, q.size());
64+
try {
65+
iter.next();
66+
fail();
67+
} catch (NoSuchElementException e) {
68+
assertNotNull(e);
69+
}
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)