Skip to content

Commit 9c8fae2

Browse files
committed
GH-2998 add benchmarks
1 parent c58eb52 commit 9c8fae2

2 files changed

Lines changed: 480 additions & 0 deletions

File tree

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
12+
package org.eclipse.rdf4j.sail.lmdb.benchmark;
13+
14+
import java.io.File;
15+
import java.io.IOException;
16+
import java.io.InputStream;
17+
import java.io.StringWriter;
18+
import java.util.ArrayList;
19+
import java.util.HashSet;
20+
import java.util.List;
21+
import java.util.Random;
22+
import java.util.Set;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.stream.Collectors;
29+
30+
import org.apache.commons.io.FileUtils;
31+
import org.assertj.core.util.Files;
32+
import org.eclipse.rdf4j.model.IRI;
33+
import org.eclipse.rdf4j.model.Model;
34+
import org.eclipse.rdf4j.model.Statement;
35+
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
36+
import org.eclipse.rdf4j.model.util.Values;
37+
import org.eclipse.rdf4j.repository.RepositoryResult;
38+
import org.eclipse.rdf4j.repository.sail.SailRepository;
39+
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
40+
import org.eclipse.rdf4j.rio.RDFFormat;
41+
import org.eclipse.rdf4j.rio.Rio;
42+
import org.eclipse.rdf4j.sail.NotifyingSail;
43+
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
44+
import org.eclipse.rdf4j.sail.SailConnectionListener;
45+
import org.eclipse.rdf4j.sail.SailException;
46+
import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper;
47+
import org.eclipse.rdf4j.sail.helpers.NotifyingSailWrapper;
48+
import org.eclipse.rdf4j.sail.lmdb.LmdbStore;
49+
import org.openjdk.jmh.annotations.Benchmark;
50+
import org.openjdk.jmh.annotations.BenchmarkMode;
51+
import org.openjdk.jmh.annotations.Fork;
52+
import org.openjdk.jmh.annotations.Level;
53+
import org.openjdk.jmh.annotations.Measurement;
54+
import org.openjdk.jmh.annotations.Mode;
55+
import org.openjdk.jmh.annotations.OutputTimeUnit;
56+
import org.openjdk.jmh.annotations.Scope;
57+
import org.openjdk.jmh.annotations.Setup;
58+
import org.openjdk.jmh.annotations.State;
59+
import org.openjdk.jmh.annotations.Warmup;
60+
import org.openjdk.jmh.runner.Runner;
61+
import org.openjdk.jmh.runner.RunnerException;
62+
import org.openjdk.jmh.runner.options.Options;
63+
import org.openjdk.jmh.runner.options.OptionsBuilder;
64+
import org.slf4j.LoggerFactory;
65+
66+
import ch.qos.logback.classic.Logger;
67+
68+
/**
69+
* @author Håvard Ottestad
70+
*/
71+
@State(Scope.Benchmark)
72+
@Warmup(iterations = 0)
73+
@BenchmarkMode({ Mode.AverageTime })
74+
@Fork(value = 1, jvmArgs = { "-Xms1G", "-Xmx1G", "-XX:+UseParallelGC" })
75+
@Measurement(iterations = 10, batchSize = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS)
76+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
77+
public class OverflowBenchmarkConcurrent {
78+
79+
@Setup(Level.Trial)
80+
public void setup() {
81+
((Logger) (LoggerFactory
82+
.getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel")))
83+
.setLevel(ch.qos.logback.classic.Level.DEBUG);
84+
}
85+
86+
public static void main(String[] args) throws RunnerException {
87+
Options opt = new OptionsBuilder()
88+
.include("OverflowBenchmarkConcurrent") // adapt to run other benchmark tests
89+
.build();
90+
91+
new Runner(opt).run();
92+
}
93+
94+
@Benchmark
95+
public void manyConcurrentTransactions() throws IOException {
96+
File temporaryFolder = Files.newTemporaryFolder();
97+
SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper(
98+
new NotifySailWrapper(
99+
new NotifySailWrapper(new NotifySailWrapper(new LmdbStore(temporaryFolder)))))));
100+
ExecutorService executorService = Executors.newFixedThreadPool(10);
101+
102+
try {
103+
104+
Model parse;
105+
try (InputStream resourceAsStream = getClass().getClassLoader()
106+
.getResourceAsStream("benchmarkFiles/datagovbe-valid.ttl")) {
107+
parse = Rio.parse(resourceAsStream, RDFFormat.TURTLE);
108+
}
109+
110+
List<Future<?>> futureList = new ArrayList<>();
111+
112+
CountDownLatch countDownLatch = new CountDownLatch(1);
113+
114+
for (int i = 0; i < 38; i++) {
115+
var seed = i + 485924;
116+
{
117+
Future<?> submit = executorService.submit(() -> {
118+
try {
119+
countDownLatch.await();
120+
} catch (InterruptedException e) {
121+
throw new RuntimeException(e);
122+
}
123+
try (SailRepositoryConnection connection = sailRepository.getConnection()) {
124+
125+
int addSize = new Random(seed).nextInt(parse.size());
126+
IRI context = Values.iri("http://example.org/" + new Random(seed + 1).nextInt(10));
127+
List<Statement> collect = parse.stream()
128+
.skip(addSize)
129+
.limit(10_000)
130+
.map(s -> SimpleValueFactory.getInstance()
131+
.createStatement(s.getSubject(), s.getPredicate(), s.getObject(), context))
132+
.collect(Collectors.toList());
133+
StringWriter stringWriter = new StringWriter();
134+
Rio.write(collect, stringWriter, RDFFormat.TRIG);
135+
String string = stringWriter.toString();
136+
137+
connection.prepareUpdate("INSERT DATA { GRAPH " + string + " }").execute();
138+
139+
System.out.println("Added");
140+
}
141+
});
142+
futureList.add(submit);
143+
}
144+
{
145+
Future<?> submit = executorService.submit(() -> {
146+
try {
147+
countDownLatch.await();
148+
} catch (InterruptedException e) {
149+
throw new RuntimeException(e);
150+
}
151+
try (SailRepositoryConnection connection = sailRepository.getConnection()) {
152+
System.out.println("Waiting");
153+
long l = System.currentTimeMillis();
154+
while (!connection.isEmpty()) {
155+
try {
156+
Thread.sleep(1);
157+
} catch (InterruptedException e) {
158+
return;
159+
}
160+
if (System.currentTimeMillis() - l > 1000) {
161+
break;
162+
}
163+
}
164+
System.out.println("Removing");
165+
connection.begin();
166+
try (RepositoryResult<Statement> statements = connection.getStatements(null, null, null)) {
167+
statements.stream().limit(10_000).forEach(connection::remove);
168+
}
169+
connection.commit();
170+
171+
System.out.println("Removed");
172+
}
173+
});
174+
futureList.add(submit);
175+
}
176+
}
177+
178+
countDownLatch.countDown();
179+
180+
for (int i = 0; i < futureList.size(); i++) {
181+
Future<?> future = futureList.get(i);
182+
try {
183+
future.get();
184+
} catch (Exception e) {
185+
throw new RuntimeException(e);
186+
}
187+
System.out.println("Done: " + i);
188+
}
189+
190+
} finally {
191+
try {
192+
executorService.shutdownNow();
193+
} finally {
194+
try {
195+
sailRepository.shutDown();
196+
} finally {
197+
FileUtils.deleteDirectory(temporaryFolder);
198+
}
199+
}
200+
}
201+
202+
}
203+
204+
static class NotifySailWrapper extends NotifyingSailWrapper {
205+
206+
public NotifySailWrapper(NotifyingSail baseSail) {
207+
super(baseSail);
208+
}
209+
210+
@Override
211+
public NotifyingSailConnection getConnection() throws SailException {
212+
return new Connection(super.getConnection());
213+
}
214+
}
215+
216+
static class Connection extends NotifyingSailConnectionWrapper implements SailConnectionListener {
217+
218+
Set<Statement> addedStatements = new HashSet<>();
219+
Set<Statement> removedStatements = new HashSet<>();
220+
221+
public Connection(NotifyingSailConnection wrappedCon) {
222+
super(wrappedCon);
223+
addConnectionListener(this);
224+
}
225+
226+
@Override
227+
public void statementAdded(Statement st) {
228+
removedStatements.remove(st);
229+
addedStatements.add(st);
230+
}
231+
232+
@Override
233+
public void statementRemoved(Statement st) {
234+
addedStatements.remove(st);
235+
removedStatements.add(st);
236+
}
237+
238+
}
239+
240+
}

0 commit comments

Comments
 (0)