Skip to content

Commit 9944173

Browse files
authored
GH-5578 Reorder change sets (#5579)
2 parents ee25854 + f52fe4a commit 9944173

4 files changed

Lines changed: 317 additions & 55 deletions

File tree

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,39 @@ public abstract class Changeset implements SailSink, ModelFactory {
123123

124124
private boolean closed;
125125

126+
public static boolean isOrderIndependent(Changeset changeset1, Changeset changeset2) {
127+
Objects.requireNonNull(changeset1, "changeset1");
128+
Objects.requireNonNull(changeset2, "changeset2");
129+
130+
boolean c1StatementChanges = hasStatementChanges(changeset1);
131+
boolean c2StatementChanges = hasStatementChanges(changeset2);
132+
boolean c1NamespaceChanges = hasNamespaceChanges(changeset1);
133+
boolean c2NamespaceChanges = hasNamespaceChanges(changeset2);
134+
135+
// Both changesets are order independent if they don't add or remove anything that the other changeset modifies.
136+
// To simplify the check we will assume that if one changeset only modifies namespaces and the other only
137+
// statements then they are order independent.
138+
if ((c1NamespaceChanges && !c1StatementChanges && c2StatementChanges && !c2NamespaceChanges)
139+
|| (c2NamespaceChanges && !c2StatementChanges && c1StatementChanges && !c1NamespaceChanges)) {
140+
return true;
141+
}
142+
143+
return false;
144+
}
145+
146+
private static boolean hasNamespaceChanges(Changeset changeset) {
147+
Map<String, String> addedNamespaces = changeset.getAddedNamespaces();
148+
Set<String> removedPrefixes = changeset.getRemovedPrefixes();
149+
return changeset.namespaceCleared || (addedNamespaces != null && !addedNamespaces.isEmpty())
150+
|| (removedPrefixes != null && !removedPrefixes.isEmpty());
151+
}
152+
153+
private static boolean hasStatementChanges(Changeset changeset) {
154+
Set<Resource> deprecatedContexts = changeset.getDeprecatedContexts();
155+
return changeset.statementCleared || changeset.hasApproved() || changeset.hasDeprecated()
156+
|| (deprecatedContexts != null && !deprecatedContexts.isEmpty());
157+
}
158+
126159
@Override
127160
public void close() throws SailException {
128161
closed = true;

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,11 +399,38 @@ void merge(Changeset change) {
399399
void compressChanges() {
400400
try {
401401
semaphore.lock();
402+
403+
if (changes.size() == 2) {
404+
boolean swap = false;
405+
Changeset[] array = changes.toArray(new Changeset[0]);
406+
if (!(array[0].hasDeprecated() || array[0].hasApproved())
407+
&& (array[1].hasDeprecated() || array[1].hasApproved())) {
408+
409+
if (Changeset.isOrderIndependent(array[0], array[1])) {
410+
411+
// The last change contains all the approved and deprecated statements, faster to swap before
412+
// merging. This is typically the case when parsing a file with namespaces at the beginning.
413+
414+
Changeset second = changes.removeLast();
415+
if (changes.peekLast().isRefback()) {
416+
changes.addLast(second);
417+
return;
418+
}
419+
Changeset first = changes.removeLast();
420+
421+
changes.addFirst(second);
422+
changes.addLast(first);
423+
}
424+
425+
}
426+
427+
}
428+
402429
while (changes.size() > 1) {
403430
Changeset pop = changes.removeLast();
404431
if (changes.peekLast().isRefback()) {
405432
changes.addLast(pop);
406-
break;
433+
return;
407434
}
408435

409436
try {

core/sail/base/src/test/java/org/eclipse/rdf4j/sail/base/ChangesetTest.java

Lines changed: 145 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
*******************************************************************************/
1111
package org.eclipse.rdf4j.sail.base;
1212

13+
import static org.junit.jupiter.api.Assertions.assertFalse;
14+
import static org.junit.jupiter.api.Assertions.assertTrue;
15+
1316
import java.util.Iterator;
1417
import java.util.concurrent.CountDownLatch;
1518
import java.util.concurrent.ExecutionException;
@@ -33,6 +36,90 @@ public class ChangesetTest {
3336
SimpleValueFactory vf = SimpleValueFactory.getInstance();
3437
Resource[] allGraph = {};
3538

39+
@Test
40+
public void testOrderIndependentNamespacesVsStatements() {
41+
Changeset namespacesOnly = getChangeset();
42+
namespacesOnly.setNamespace("ex", "http://example.org/");
43+
44+
Changeset statementsOnly = getChangeset();
45+
statementsOnly.approve(vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
46+
vf.createLiteral("o")));
47+
48+
assertTrue(Changeset.isOrderIndependent(namespacesOnly, statementsOnly));
49+
assertTrue(Changeset.isOrderIndependent(statementsOnly, namespacesOnly));
50+
}
51+
52+
@Test
53+
public void testOrderDependentOnOverlappingStatements() {
54+
Changeset change1 = getChangeset();
55+
Changeset change2 = getChangeset();
56+
57+
Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
58+
vf.createLiteral("o"));
59+
60+
change1.approve(statement);
61+
change2.deprecate(statement);
62+
63+
assertFalse(Changeset.isOrderIndependent(change1, change2));
64+
assertFalse(Changeset.isOrderIndependent(change2, change1));
65+
}
66+
67+
@Test
68+
public void testOrderIndependentWhenStatementsCleared() {
69+
Changeset statementCleared = getChangeset();
70+
statementCleared.clear();
71+
72+
Changeset namespacesCleared = getChangeset();
73+
namespacesCleared.clearNamespaces();
74+
75+
assertTrue(Changeset.isOrderIndependent(statementCleared, namespacesCleared));
76+
assertTrue(Changeset.isOrderIndependent(namespacesCleared, statementCleared));
77+
}
78+
79+
@Test
80+
public void testOrderIndependentWhenDeprecatedStatements() {
81+
Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
82+
vf.createLiteral("o"));
83+
84+
Changeset deprecatedOnly = getChangeset();
85+
deprecatedOnly.deprecate(statement);
86+
87+
Changeset namespacesCleared = getChangeset();
88+
namespacesCleared.clearNamespaces();
89+
90+
assertTrue(Changeset.isOrderIndependent(deprecatedOnly, namespacesCleared));
91+
assertTrue(Changeset.isOrderIndependent(namespacesCleared, deprecatedOnly));
92+
}
93+
94+
@Test
95+
public void testOrderIndependentWhenClearingContexts() {
96+
Changeset contextCleared = getChangeset();
97+
contextCleared.clear(vf.createIRI("urn:ctx"));
98+
99+
Changeset namespaceRemoved = getChangeset();
100+
namespaceRemoved.removeNamespace("ex");
101+
102+
assertTrue(Changeset.isOrderIndependent(contextCleared, namespaceRemoved));
103+
assertTrue(Changeset.isOrderIndependent(namespaceRemoved, contextCleared));
104+
}
105+
106+
@Test
107+
public void testOrderDependentWhenMixingNamespacesAndStatements() {
108+
Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
109+
vf.createLiteral("o"));
110+
111+
Changeset mixedChanges = getChangeset();
112+
mixedChanges.approve(statement);
113+
mixedChanges.setNamespace("ex", "http://example.org/");
114+
115+
Changeset statementsOnly = getChangeset();
116+
statementsOnly.approve(vf.createStatement(vf.createIRI("urn:s2"), vf.createIRI("urn:p2"),
117+
vf.createLiteral("o2")));
118+
119+
assertFalse(Changeset.isOrderIndependent(mixedChanges, statementsOnly));
120+
assertFalse(Changeset.isOrderIndependent(statementsOnly, mixedChanges));
121+
}
122+
36123
@Test
37124
public void testConcurrency() {
38125

@@ -47,68 +134,72 @@ public void testConcurrency() {
47134
t.setDaemon(true);
48135
return t;
49136
});
137+
try {
138+
139+
Runnable addingData = () -> {
140+
countDownLatch.countDown();
141+
try {
142+
countDownLatch.await();
143+
} catch (InterruptedException e) {
144+
e.printStackTrace();
145+
}
146+
for (int i = 0; i < 100000; i++) {
147+
changeset.approve(vf.createStatement(vf.createBNode(), RDF.TYPE, RDFS.RESOURCE));
148+
}
50149

51-
Runnable addingData = () -> {
52-
countDownLatch.countDown();
53-
try {
54-
countDownLatch.await();
55-
} catch (InterruptedException e) {
56-
e.printStackTrace();
57-
}
58-
for (int i = 0; i < 100000; i++) {
59-
changeset.approve(vf.createStatement(vf.createBNode(), RDF.TYPE, RDFS.RESOURCE));
60-
}
61-
62-
};
63-
64-
Runnable readingData = () -> {
65-
countDownLatch.countDown();
66-
try {
67-
countDownLatch.await();
68-
} catch (InterruptedException e) {
69-
e.printStackTrace();
70-
}
71-
for (int i = 0; i < 100000; i++) {
72-
changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph);
73-
}
74-
};
75-
76-
Runnable readingDataIterator = () -> {
77-
countDownLatch.countDown();
78-
try {
79-
countDownLatch.await();
80-
} catch (InterruptedException e) {
81-
e.printStackTrace();
82-
}
83-
84-
while (!changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph)) {
85-
Thread.yield();
86-
}
150+
};
87151

88-
for (int i = 0; i < 100; i++) {
89-
Iterator<Statement> approvedStatements = changeset.getApprovedStatements(null, RDF.TYPE, null, allGraph)
90-
.iterator();
152+
Runnable readingData = () -> {
153+
countDownLatch.countDown();
154+
try {
155+
countDownLatch.await();
156+
} catch (InterruptedException e) {
157+
e.printStackTrace();
158+
}
159+
for (int i = 0; i < 100000; i++) {
160+
changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph);
161+
}
162+
};
163+
164+
Runnable readingDataIterator = () -> {
165+
countDownLatch.countDown();
166+
try {
167+
countDownLatch.await();
168+
} catch (InterruptedException e) {
169+
e.printStackTrace();
170+
}
91171

92-
for (int j = 0; j < 100 && approvedStatements.hasNext(); j++) {
93-
approvedStatements.next();
172+
while (!changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph)) {
173+
Thread.yield();
94174
}
95175

96-
}
97-
};
176+
for (int i = 0; i < 100; i++) {
177+
Iterator<Statement> approvedStatements = changeset
178+
.getApprovedStatements(null, RDF.TYPE, null, allGraph)
179+
.iterator();
98180

99-
Stream.of(addingData, readingData, readingDataIterator)
100-
.map(executorService::submit)
101-
.collect(Collectors.toList())
102-
.forEach(future -> {
103-
try {
104-
future.get();
105-
} catch (InterruptedException | ExecutionException e) {
106-
throw new RuntimeException(e);
181+
for (int j = 0; j < 100 && approvedStatements.hasNext(); j++) {
182+
approvedStatements.next();
107183
}
108-
});
109184

110-
executorService.shutdown();
111-
executorService.shutdownNow();
185+
}
186+
};
187+
188+
Stream.of(addingData, readingData, readingDataIterator)
189+
.map(executorService::submit)
190+
.collect(Collectors.toList())
191+
.forEach(future -> {
192+
try {
193+
future.get();
194+
} catch (InterruptedException | ExecutionException e) {
195+
throw new RuntimeException(e);
196+
}
197+
});
198+
} finally {
199+
executorService.shutdown();
200+
executorService.shutdownNow();
201+
}
202+
112203
}
113204

114205
private Changeset getChangeset() {

0 commit comments

Comments
 (0)