|
15 | 15 |
|
16 | 16 | import java.io.File; |
17 | 17 | import java.lang.reflect.Field; |
18 | | -import java.util.concurrent.CountDownLatch; |
19 | 18 | import java.util.concurrent.TimeUnit; |
20 | | -import java.util.concurrent.atomic.AtomicReference; |
21 | 19 | import java.util.stream.Stream; |
22 | 20 |
|
23 | 21 | import org.eclipse.rdf4j.common.iteration.CloseableIteration; |
@@ -95,67 +93,35 @@ private void preloadStatements(LmdbStore store, int round) throws Exception { |
95 | 93 | } |
96 | 94 |
|
97 | 95 | private void runLongLivedReaderRound(LmdbStore store, IsolationLevel isolationLevel, int round) throws Exception { |
98 | | - CountDownLatch readerReady = new CountDownLatch(1); |
99 | | - CountDownLatch writerFinished = new CountDownLatch(1); |
100 | | - AtomicReference<Throwable> failure = new AtomicReference<>(); |
101 | | - |
102 | | - Thread reader = new Thread(() -> { |
103 | | - try (SailConnection connection = store.getConnection()) { |
104 | | - connection.begin(isolationLevel); |
105 | | - try (CloseableIteration<? extends Statement> statements = connection.getStatements(null, RDFS.LABEL, |
106 | | - null, |
107 | | - false)) { |
108 | | - assertThat(statements.hasNext()).isTrue(); |
109 | | - statements.next(); |
110 | | - readerReady.countDown(); |
111 | | - assertThat(writerFinished.await(10, TimeUnit.SECONDS)).isTrue(); |
112 | | - while (statements.hasNext()) { |
113 | | - statements.next(); |
114 | | - Thread.yield(); |
| 96 | + try (SailConnection reader = store.getConnection()) { |
| 97 | + reader.begin(isolationLevel); |
| 98 | + try (CloseableIteration<? extends Statement> statements = reader.getStatements(null, RDFS.LABEL, null, |
| 99 | + false)) { |
| 100 | + assertThat(statements.hasNext()).isTrue(); |
| 101 | + statements.next(); |
| 102 | + try (SailConnection writer = store.getConnection()) { |
| 103 | + writer.begin(isolationLevel); |
| 104 | + forceOverflowOnNextAdd(); |
| 105 | + for (int i = 0; i < REMOVED_PRELOADED_STATEMENTS; i++) { |
| 106 | + writer.removeStatements(subject("preloaded", round, i), RDFS.LABEL, null); |
| 107 | + } |
| 108 | + for (int i = 0; i < COMMITTED_OVERFLOW_STATEMENTS; i++) { |
| 109 | + writer.addStatement(subject("committed", round, i), COMMITTED_PREDICATE, |
| 110 | + VF.createLiteral("committed-" + round + "-" + i)); |
115 | 111 | } |
| 112 | + writer.commit(); |
| 113 | + } finally { |
| 114 | + clearOverflowFlag(); |
116 | 115 | } |
117 | | - try { |
118 | | - connection.commit(); |
119 | | - } catch (SailConflictException e) { |
120 | | - connection.rollback(); |
| 116 | + while (statements.hasNext()) { |
| 117 | + statements.next(); |
121 | 118 | } |
122 | | - } catch (Throwable t) { |
123 | | - failure.compareAndSet(null, t); |
124 | 119 | } |
125 | | - }, "lmdb-lifecycle-reader-" + isolationLevel + "-" + round); |
126 | | - |
127 | | - Thread writer = new Thread(() -> { |
128 | | - try (SailConnection connection = store.getConnection()) { |
129 | | - assertThat(readerReady.await(10, TimeUnit.SECONDS)).isTrue(); |
130 | | - connection.begin(isolationLevel); |
131 | | - forceOverflowOnNextAdd(); |
132 | | - for (int i = 0; i < REMOVED_PRELOADED_STATEMENTS; i++) { |
133 | | - connection.removeStatements(subject("preloaded", round, i), RDFS.LABEL, null); |
134 | | - } |
135 | | - for (int i = 0; i < COMMITTED_OVERFLOW_STATEMENTS; i++) { |
136 | | - connection.addStatement(subject("committed", round, i), COMMITTED_PREDICATE, |
137 | | - VF.createLiteral("committed-" + round + "-" + i)); |
138 | | - } |
139 | | - connection.commit(); |
140 | | - } catch (Throwable t) { |
141 | | - failure.compareAndSet(null, t); |
142 | | - } finally { |
143 | | - clearOverflowFlag(); |
144 | | - writerFinished.countDown(); |
| 120 | + try { |
| 121 | + reader.commit(); |
| 122 | + } catch (SailConflictException e) { |
| 123 | + reader.rollback(); |
145 | 124 | } |
146 | | - }, "lmdb-lifecycle-writer-" + isolationLevel + "-" + round); |
147 | | - |
148 | | - reader.start(); |
149 | | - writer.start(); |
150 | | - |
151 | | - reader.join(TimeUnit.SECONDS.toMillis(20)); |
152 | | - writer.join(TimeUnit.SECONDS.toMillis(20)); |
153 | | - |
154 | | - assertThat(reader.isAlive()).isFalse(); |
155 | | - assertThat(writer.isAlive()).isFalse(); |
156 | | - if (failure.get() != null) { |
157 | | - throw new AssertionError("Lifecycle round failed for " + isolationLevel + " round " + round, |
158 | | - failure.get()); |
159 | 125 | } |
160 | 126 | } |
161 | 127 |
|
|
0 commit comments