Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,39 @@ public abstract class Changeset implements SailSink, ModelFactory {

private boolean closed;

public static boolean isOrderIndependent(Changeset changeset1, Changeset changeset2) {
Objects.requireNonNull(changeset1, "changeset1");
Objects.requireNonNull(changeset2, "changeset2");

boolean c1StatementChanges = hasStatementChanges(changeset1);
boolean c2StatementChanges = hasStatementChanges(changeset2);
boolean c1NamespaceChanges = hasNamespaceChanges(changeset1);
boolean c2NamespaceChanges = hasNamespaceChanges(changeset2);

// Both changesets are order independent if they don't add or remove anything that the other changeset modifies.
// To simplify the check we will assume that if one changeset only modifies namespaces and the other only
// statements then they are order independent.
if ((c1NamespaceChanges && !c1StatementChanges && c2StatementChanges && !c2NamespaceChanges)
|| (c2NamespaceChanges && !c2StatementChanges && c1StatementChanges && !c1NamespaceChanges)) {
return true;
}

return false;
}

private static boolean hasNamespaceChanges(Changeset changeset) {
Map<String, String> addedNamespaces = changeset.getAddedNamespaces();
Set<String> removedPrefixes = changeset.getRemovedPrefixes();
return changeset.namespaceCleared || (addedNamespaces != null && !addedNamespaces.isEmpty())
|| (removedPrefixes != null && !removedPrefixes.isEmpty());
}

private static boolean hasStatementChanges(Changeset changeset) {
Set<Resource> deprecatedContexts = changeset.getDeprecatedContexts();
return changeset.statementCleared || changeset.hasApproved() || changeset.hasDeprecated()
|| (deprecatedContexts != null && !deprecatedContexts.isEmpty());
}

@Override
public void close() throws SailException {
closed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,38 @@ void merge(Changeset change) {
void compressChanges() {
try {
semaphore.lock();

if (changes.size() == 2) {
boolean swap = false;
Changeset[] array = changes.toArray(new Changeset[0]);
if (!(array[0].hasDeprecated() || array[0].hasApproved())
&& (array[1].hasDeprecated() || array[1].hasApproved())) {

if (Changeset.isOrderIndependent(array[0], array[1])) {

// The last change contains all the approved and deprecated statements, faster to swap before
// merging. This is typically the case when parsing a file with namespaces at the beginning.

Changeset second = changes.removeLast();
if (changes.peekLast().isRefback()) {
changes.addLast(second);
return;
}
Changeset first = changes.removeLast();

changes.addFirst(second);
changes.addLast(first);
}

}

}

while (changes.size() > 1) {
Changeset pop = changes.removeLast();
if (changes.peekLast().isRefback()) {
changes.addLast(pop);
break;
return;
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
*******************************************************************************/
package org.eclipse.rdf4j.sail.base;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand All @@ -33,6 +36,90 @@ public class ChangesetTest {
SimpleValueFactory vf = SimpleValueFactory.getInstance();
Resource[] allGraph = {};

@Test
public void testOrderIndependentNamespacesVsStatements() {
Changeset namespacesOnly = getChangeset();
namespacesOnly.setNamespace("ex", "http://example.org/");

Changeset statementsOnly = getChangeset();
statementsOnly.approve(vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
vf.createLiteral("o")));

assertTrue(Changeset.isOrderIndependent(namespacesOnly, statementsOnly));
assertTrue(Changeset.isOrderIndependent(statementsOnly, namespacesOnly));
}

@Test
public void testOrderDependentOnOverlappingStatements() {
Changeset change1 = getChangeset();
Changeset change2 = getChangeset();

Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
vf.createLiteral("o"));

change1.approve(statement);
change2.deprecate(statement);

assertFalse(Changeset.isOrderIndependent(change1, change2));
assertFalse(Changeset.isOrderIndependent(change2, change1));
}

@Test
public void testOrderIndependentWhenStatementsCleared() {
Changeset statementCleared = getChangeset();
statementCleared.clear();

Changeset namespacesCleared = getChangeset();
namespacesCleared.clearNamespaces();

assertTrue(Changeset.isOrderIndependent(statementCleared, namespacesCleared));
assertTrue(Changeset.isOrderIndependent(namespacesCleared, statementCleared));
}

@Test
public void testOrderIndependentWhenDeprecatedStatements() {
Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
vf.createLiteral("o"));

Changeset deprecatedOnly = getChangeset();
deprecatedOnly.deprecate(statement);

Changeset namespacesCleared = getChangeset();
namespacesCleared.clearNamespaces();

assertTrue(Changeset.isOrderIndependent(deprecatedOnly, namespacesCleared));
assertTrue(Changeset.isOrderIndependent(namespacesCleared, deprecatedOnly));
}

@Test
public void testOrderIndependentWhenClearingContexts() {
Changeset contextCleared = getChangeset();
contextCleared.clear(vf.createIRI("urn:ctx"));

Changeset namespaceRemoved = getChangeset();
namespaceRemoved.removeNamespace("ex");

assertTrue(Changeset.isOrderIndependent(contextCleared, namespaceRemoved));
assertTrue(Changeset.isOrderIndependent(namespaceRemoved, contextCleared));
}

@Test
public void testOrderDependentWhenMixingNamespacesAndStatements() {
Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
vf.createLiteral("o"));

Changeset mixedChanges = getChangeset();
mixedChanges.approve(statement);
mixedChanges.setNamespace("ex", "http://example.org/");

Changeset statementsOnly = getChangeset();
statementsOnly.approve(vf.createStatement(vf.createIRI("urn:s2"), vf.createIRI("urn:p2"),
vf.createLiteral("o2")));

assertFalse(Changeset.isOrderIndependent(mixedChanges, statementsOnly));
assertFalse(Changeset.isOrderIndependent(statementsOnly, mixedChanges));
}

@Test
public void testConcurrency() {

Expand All @@ -47,68 +134,72 @@ public void testConcurrency() {
t.setDaemon(true);
return t;
});
try {

Runnable addingData = () -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 100000; i++) {
changeset.approve(vf.createStatement(vf.createBNode(), RDF.TYPE, RDFS.RESOURCE));
}

Runnable addingData = () -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 100000; i++) {
changeset.approve(vf.createStatement(vf.createBNode(), RDF.TYPE, RDFS.RESOURCE));
}

};

Runnable readingData = () -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 100000; i++) {
changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph);
}
};

Runnable readingDataIterator = () -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

while (!changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph)) {
Thread.yield();
}
};

for (int i = 0; i < 100; i++) {
Iterator<Statement> approvedStatements = changeset.getApprovedStatements(null, RDF.TYPE, null, allGraph)
.iterator();
Runnable readingData = () -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 100000; i++) {
changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph);
}
};

Runnable readingDataIterator = () -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

for (int j = 0; j < 100 && approvedStatements.hasNext(); j++) {
approvedStatements.next();
while (!changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph)) {
Thread.yield();
}

}
};
for (int i = 0; i < 100; i++) {
Iterator<Statement> approvedStatements = changeset
.getApprovedStatements(null, RDF.TYPE, null, allGraph)
.iterator();

Stream.of(addingData, readingData, readingDataIterator)
.map(executorService::submit)
.collect(Collectors.toList())
.forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
for (int j = 0; j < 100 && approvedStatements.hasNext(); j++) {
approvedStatements.next();
}
});

executorService.shutdown();
executorService.shutdownNow();
}
};

Stream.of(addingData, readingData, readingDataIterator)
.map(executorService::submit)
.collect(Collectors.toList())
.forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} finally {
executorService.shutdown();
executorService.shutdownNow();
}

}

private Changeset getChangeset() {
Expand Down
Loading
Loading