diff --git a/.github/workflows/copilot-setup-steps.yml b/.github/workflows/copilot-setup-steps.yml index 57142783665..3850a61278e 100644 --- a/.github/workflows/copilot-setup-steps.yml +++ b/.github/workflows/copilot-setup-steps.yml @@ -22,6 +22,17 @@ jobs: distribution: 'temurin' cache: maven + - name: Prepare custom Maven repository directory + run: mkdir -p .m2_repo + + - name: Cache custom Maven repository + uses: actions/cache@v4 + with: + path: .m2_repo + key: ${{ runner.os }}-m2-repo-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-m2-repo- + - name: Validate formatting configuration run: mvn -B -T 2C -Dmaven.repo.local=.m2_repo formatter:validate impsort:check xml-format:xml-check diff --git a/core/common/io/src/main/java/org/eclipse/rdf4j/common/io/NioFile.java b/core/common/io/src/main/java/org/eclipse/rdf4j/common/io/NioFile.java index d2b0e3ecc45..b14e45d37d2 100644 --- a/core/common/io/src/main/java/org/eclipse/rdf4j/common/io/NioFile.java +++ b/core/common/io/src/main/java/org/eclipse/rdf4j/common/io/NioFile.java @@ -11,6 +11,7 @@ package org.eclipse.rdf4j.common.io; import java.io.Closeable; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -22,6 +23,7 @@ import java.nio.file.StandardOpenOption; import java.util.EnumSet; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * File wrapper that protects against concurrent file closing events due to e.g. {@link Thread#interrupt() thread @@ -45,6 +47,13 @@ public final class NioFile implements Closeable { private volatile FileChannel fc; + /** + * Disable strict guards via system property to maintain legacy behavior without additional exceptions. 
Property: + * org.eclipse.rdf4j.common.io.niofile.disableStrictGuards (default: false) + */ + private static final boolean STRICT_GUARDS = !Boolean + .getBoolean("org.eclipse.rdf4j.common.io.niofile.disableStrictGuards"); + private volatile boolean explictlyClosed; /** @@ -238,12 +247,24 @@ public long transferTo(long position, long count, WritableByteChannel target) th * @throws IOException */ public int write(ByteBuffer buf, long offset) throws IOException { + final int startPosition = buf.position(); while (true) { try { - return fc.write(buf, offset); + // Ensure the entire buffer is written, even if the underlying channel performs a partial write + while (buf.hasRemaining()) { + long position = offset + (buf.position() - startPosition); + int n = fc.write(buf, position); + if (n == 0) { + // Avoid tight spin in pathological cases: reattempt write + // FileChannel positional writes may occasionally return 0 without progress + continue; + } + } + return buf.position() - startPosition; } catch (ClosedByInterruptException e) { throw e; } catch (ClosedChannelException e) { + // Preserve already-consumed bytes on retry reopen(e); } } @@ -258,12 +279,29 @@ public int write(ByteBuffer buf, long offset) throws IOException { * @throws IOException */ public int read(ByteBuffer buf, long offset) throws IOException { + final int startPosition = buf.position(); while (true) { try { - return fc.read(buf, offset); + while (buf.hasRemaining()) { + long position = offset + (buf.position() - startPosition); + int n = fc.read(buf, position); + if (n < 0) { + // Preserve FileChannel contract: if no bytes were read and EOF is reached, return -1 + if (buf.position() == startPosition) { + return -1; + } + break; // EOF after having read some bytes + } + if (n == 0) { + // Avoid tight spin; allow retry in case of transient 0-byte read + continue; + } + } + return buf.position() - startPosition; } catch (ClosedByInterruptException e) { throw e; } catch (ClosedChannelException e) { + // 
Preserve already-consumed bytes on retry reopen(e); } } @@ -277,7 +315,10 @@ public int read(ByteBuffer buf, long offset) throws IOException { * @throws IOException */ public void writeBytes(byte[] value, long offset) throws IOException { - write(ByteBuffer.wrap(value), offset); + int write = write(ByteBuffer.wrap(value), offset); + if (STRICT_GUARDS && write != value.length) { + throw new IOException("Incomplete writeBytes: expected " + value.length + ", wrote " + write); + } } /** @@ -290,7 +331,10 @@ public void writeBytes(byte[] value, long offset) throws IOException { */ public byte[] readBytes(long offset, int length) throws IOException { ByteBuffer buf = ByteBuffer.allocate(length); - read(buf, offset); + int read = read(buf, offset); + if (STRICT_GUARDS && read < length) { + throw new EOFException("Unexpected EOF in readBytes: expected " + length + ", read " + read); + } return buf.array(); } @@ -326,7 +370,10 @@ public byte readByte(long offset) throws IOException { public void writeLong(long value, long offset) throws IOException { ByteBuffer buf = ByteBuffer.allocate(8); buf.putLong(0, value); - write(buf, offset); + int write = write(buf, offset); + if (STRICT_GUARDS && write != 8) { + throw new IOException("Incomplete writeLong: wrote " + write); + } } /** @@ -338,7 +385,10 @@ public void writeLong(long value, long offset) throws IOException { */ public long readLong(long offset) throws IOException { ByteBuffer buf = ByteBuffer.allocate(8); - read(buf, offset); + int read = read(buf, offset); + if (STRICT_GUARDS && read < 8) { + throw new EOFException("Unexpected EOF in readLong: read " + read); + } return buf.getLong(0); } @@ -352,7 +402,10 @@ public long readLong(long offset) throws IOException { public void writeInt(int value, long offset) throws IOException { ByteBuffer buf = ByteBuffer.allocate(4); buf.putInt(0, value); - write(buf, offset); + int write = write(buf, offset); + if (STRICT_GUARDS && write != 4) { + throw new IOException("Incomplete 
writeInt: wrote " + write); + } } /** @@ -364,7 +417,10 @@ public void writeInt(int value, long offset) throws IOException { */ public int readInt(long offset) throws IOException { ByteBuffer buf = ByteBuffer.allocate(4); - read(buf, offset); + int read = read(buf, offset); + if (STRICT_GUARDS && read < 4) { + throw new EOFException("Unexpected EOF in readInt: read " + read); + } return buf.getInt(0); } } diff --git a/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFileEOFContractTest.java b/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFileEOFContractTest.java new file mode 100644 index 00000000000..32e549bf5c2 --- /dev/null +++ b/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFileEOFContractTest.java @@ -0,0 +1,43 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.common.io; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Verifies the EOF contract for NioFile#read(ByteBuffer,long): when the underlying channel is at EOF and no bytes were + * read, the method must return -1 (EOF sentinel), not 0. 
+ */ +public class NioFileEOFContractTest { + + @TempDir + File tmp; + + @Test + public void readReturnsMinusOneAtEofWhenNoBytesRead() throws Exception { + Path p = tmp.toPath().resolve("empty.dat"); + Files.write(p, new byte[0]); // empty file + + try (NioFile nf = new NioFile(p.toFile())) { + ByteBuffer buf = ByteBuffer.allocate(16); + int n = nf.read(buf, 0); + assertEquals(-1, n, "EOF sentinel -1 expected when no bytes were read"); + } + } +} diff --git a/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFileLargeReadGuardRegressionTest.java b/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFileLargeReadGuardRegressionTest.java new file mode 100644 index 00000000000..3a1ddb98723 --- /dev/null +++ b/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFileLargeReadGuardRegressionTest.java @@ -0,0 +1,89 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.common.io; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Regression test for heap guard misclassification: NioFile.read(ByteBuffer,long) must not preemptively throw + * IOException based solely on buffer size when the buffer is already allocated. 
Previously, a guard attempted to + * compare the requested read size to free heap and could throw before performing any IO, breaking legitimate large + * heap-buffer reads. + */ +public class NioFileLargeReadGuardRegressionTest { + + @TempDir + File tmp; + + @Test + public void largeHeapBufferReadDoesNotPreemptivelyThrow() throws Exception { + // Prepare a small file; the size/content are irrelevant to the regression + Path p = tmp.toPath().resolve("small.dat"); + byte[] small = new byte[1024]; + Files.write(p, small); + + // Allocate a heap buffer just above the previous guard threshold (128MB) + final int size = 129 * 1024 * 1024; // 129MB + final ByteBuffer buf; + try { + buf = ByteBuffer.allocate(size); + } catch (OutOfMemoryError oom) { + // Not enough heap available in this environment to run the check; skip rather than fail + Assumptions.assumeTrue(false, "Insufficient heap to allocate 129MB test buffer"); + return; // unreachable, but keeps compiler happy + } + + // Reduce observed free heap below the requested read size so that a pre-check (if present) would misclassify + // this legitimate large buffer read as risky and throw. We allocate temporary blocks to lower free heap. + // If we cannot safely get below the threshold, skip the test to avoid flakiness across environments. 
+ if (!saturateHeapToBelow(size)) { + Assumptions.assumeTrue(false, "Could not reduce free heap below threshold deterministically"); + return; + } + + try (NioFile nf = new NioFile(p.toFile())) { + int n = nf.read(buf, 0); + // Success is simply: no IOException thrown; value can be -1 (EOF) or >=0 bytes read + assertTrue(n >= -1, "read() returned unexpected value"); + } + } + + private static boolean saturateHeapToBelow(long bytes) { + final Runtime rt = Runtime.getRuntime(); + final java.util.List blocks = new java.util.ArrayList<>(); + final int block = 8 * 1024 * 1024; // 8MB steps to avoid big spikes + try { + for (int i = 0; i < 512; i++) { // cap allocations defensively + long alloc = rt.totalMemory() - rt.freeMemory(); + long free = rt.maxMemory() - alloc; + if (free <= bytes) { + return true; + } + int size = (int) Math.min(block, Math.max(1, free - bytes)); + blocks.add(new byte[size]); + } + } catch (OutOfMemoryError oom) { + // Best-effort: after OOM, the VM may still have reduced free; treat as inconclusive + } + long alloc = rt.totalMemory() - rt.freeMemory(); + long free = rt.maxMemory() - alloc; + return free <= bytes; + } +} diff --git a/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFilePartialIOTest.java b/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFilePartialIOTest.java new file mode 100644 index 00000000000..1da95c507ff --- /dev/null +++ b/core/common/io/src/test/java/org/eclipse/rdf4j/common/io/NioFilePartialIOTest.java @@ -0,0 +1,449 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.common.io; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Tests that NioFile loops to complete full writes/reads even when the underlying FileChannel performs partial IO. + */ +public class NioFilePartialIOTest { + + @TempDir + File tmp; + + @Test + public void writeBytesLoopsOnPartialWrite() throws Exception { + Path p = tmp.toPath().resolve("wbytes.dat"); + NioFile nf = new NioFile(p.toFile()); + injectChoppyChannel(nf, 1); // every write call becomes partial + + byte[] data = new byte[64 * 1024 + 123]; + new Random(42).nextBytes(data); + nf.writeBytes(data, 0); + nf.close(); + + byte[] readBack = Files.readAllBytes(p); + assertArrayEquals(data, readBack); + } + + @Test + public void writePrimitivesLoopOnPartialWrite() throws Exception { + Path p = tmp.toPath().resolve("wprim.dat"); + NioFile nf = new NioFile(p.toFile()); + injectChoppyChannel(nf, 1); + + nf.writeByte((byte) 0x7F, 0); + nf.writeInt(0xCAFEBABE, 1); + nf.writeLong(0x0123456789ABCDEFL, 1 + 4); + nf.close(); + + try (FileChannel fc = FileChannel.open(p, StandardOpenOption.READ)) { + ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 8); + fc.read(buf); + buf.rewind(); + assertEquals((byte) 0x7F, buf.get()); + assertEquals(0xCAFEBABE, buf.getInt()); + assertEquals(0x0123456789ABCDEFL, buf.getLong()); + } + } + 
+ @Test + public void readBytesLoopsOnPartialRead() throws Exception { + Path p = tmp.toPath().resolve("rbytes.dat"); + byte[] data = new byte[32 * 1024 + 17]; + new Random(24).nextBytes(data); + Files.write(p, data); + + NioFile nf = new NioFile(p.toFile()); + injectChoppyChannel(nf, 1, true, false); // partial reads only + + byte[] got = nf.readBytes(0, data.length); + nf.close(); + assertArrayEquals(data, got); + } + + @Test + public void readPrimitivesLoopOnPartialRead() throws Exception { + Path p = tmp.toPath().resolve("rprim.dat"); + try (FileChannel fc = FileChannel.open(p, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 8); + buf.put((byte) 0x12); + buf.putInt(0xDEADBEEF); + buf.putLong(0x0F1E2D3C4B5A6978L); + buf.rewind(); + fc.write(buf); + } + + NioFile nf = new NioFile(p.toFile()); + injectChoppyChannel(nf, 1, true, false); + + assertEquals((byte) 0x12, nf.readByte(0)); + assertEquals(0xDEADBEEF, nf.readInt(1)); + assertEquals(0x0F1E2D3C4B5A6978L, nf.readLong(1 + 4)); + nf.close(); + } + + @Test + public void writeBytesResumesAfterChannelClose() throws Exception { + Path p = tmp.toPath().resolve("wbytes-close.dat"); + byte[] data = new byte[4096 + 37]; + new Random(11).nextBytes(data); + + try (NioFile nf = new NioFile(p.toFile())) { + injectClosingChannel(nf, 64, false, true); + nf.writeBytes(data, 0); + } + + byte[] readBack = Files.readAllBytes(p); + assertArrayEquals(data, readBack); + } + + @Test + public void readBytesResumesAfterChannelClose() throws Exception { + Path p = tmp.toPath().resolve("rbytes-close.dat"); + byte[] data = new byte[2048 + 19]; + new Random(29).nextBytes(data); + Files.write(p, data); + + byte[] got; + try (NioFile nf = new NioFile(p.toFile())) { + injectClosingChannel(nf, 32, true, false); + got = nf.readBytes(0, data.length); + } + + assertArrayEquals(data, got); + } + + // --- helpers --- + + private static void injectChoppyChannel(NioFile nf, int frequency) { + 
injectChoppyChannel(nf, frequency, false, false); + } + + private static void injectChoppyChannel(NioFile nf, int frequency, boolean choppyRead, boolean choppyWrite) { + try { + Field fcField = NioFile.class.getDeclaredField("fc"); + fcField.setAccessible(true); + FileChannel original = (FileChannel) fcField.get(nf); + fcField.set(nf, new ChoppyFileChannel(original, frequency, choppyRead, choppyWrite)); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + private static void injectClosingChannel(NioFile nf, int partialSize, boolean closeOnRead, boolean closeOnWrite) { + try { + Field fcField = NioFile.class.getDeclaredField("fc"); + fcField.setAccessible(true); + FileChannel original = (FileChannel) fcField.get(nf); + fcField.set(nf, new ClosingFileChannel(original, partialSize, closeOnRead, closeOnWrite)); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + static class ChoppyFileChannel extends FileChannel { + private final FileChannel delegate; + private final int frequency; + private final boolean choppyRead; + private final boolean choppyWrite; + private int op; + + ChoppyFileChannel(FileChannel delegate, int frequency, boolean choppyRead, boolean choppyWrite) { + this.delegate = delegate; + this.frequency = Math.max(1, frequency); + this.choppyRead = choppyRead; + this.choppyWrite = choppyWrite; + } + + @Override + public int write(ByteBuffer src, long position) throws IOException { + op++; + if (choppyWrite && op % frequency == 0) { + int rem = src.remaining(); + int part = Math.max(1, rem / 2); + ByteBuffer slice = src.slice(); + slice.limit(part); + int n = delegate.write(slice, position); + src.position(src.position() + n); + return n; + } + return delegate.write(src, position); + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + op++; + if (choppyRead && op % frequency == 0) { + int want = dst.remaining(); + int part = Math.max(1, want / 2); + 
ByteBuffer slice = ByteBuffer.allocate(part); + int n = delegate.read(slice, position); + slice.rewind(); + if (n > 0) { + int lim = Math.min(n, part); + dst.put((ByteBuffer) slice.limit(lim)); + } + return n; + } + return delegate.read(dst, position); + } + + // Straight delegates for other abstract methods + @Override + public int write(ByteBuffer src) throws IOException { + return delegate.write(src); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + return delegate.write(srcs, offset, length); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return delegate.read(dst); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return delegate.read(dsts, offset, length); + } + + @Override + public long position() throws IOException { + return delegate.position(); + } + + @Override + public FileChannel position(long newPosition) throws IOException { + delegate.position(newPosition); + return this; + } + + @Override + public long size() throws IOException { + return delegate.size(); + } + + @Override + public FileChannel truncate(long size) throws IOException { + delegate.truncate(size); + return this; + } + + @Override + public void force(boolean metaData) throws IOException { + delegate.force(metaData); + } + + @Override + public long transferTo(long position, long count, java.nio.channels.WritableByteChannel target) + throws IOException { + return delegate.transferTo(position, count, target); + } + + @Override + public long transferFrom(java.nio.channels.ReadableByteChannel src, long position, long count) + throws IOException { + return delegate.transferFrom(src, position, count); + } + + @Override + public java.nio.MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + return delegate.map(mode, position, size); + } + + @Override + public java.nio.channels.FileLock lock(long position, long size, boolean shared) 
throws IOException { + return delegate.lock(position, size, shared); + } + + @Override + public java.nio.channels.FileLock tryLock(long position, long size, boolean shared) throws IOException { + return delegate.tryLock(position, size, shared); + } + + @Override + protected void implCloseChannel() throws IOException { + delegate.close(); + } + } + + static class ClosingFileChannel extends FileChannel { + private final FileChannel delegate; + private final int partialSize; + private final boolean closeOnRead; + private final boolean closeOnWrite; + private final AtomicBoolean readClosed = new AtomicBoolean(); + private final AtomicBoolean writeClosed = new AtomicBoolean(); + + ClosingFileChannel(FileChannel delegate, int partialSize, boolean closeOnRead, boolean closeOnWrite) { + this.delegate = delegate; + this.partialSize = Math.max(1, partialSize); + this.closeOnRead = closeOnRead; + this.closeOnWrite = closeOnWrite; + } + + @Override + public int write(ByteBuffer src, long position) throws IOException { + if (closeOnWrite && writeClosed.compareAndSet(false, true)) { + int requested = Math.min(partialSize, src.remaining()); + ByteBuffer slice = src.slice(); + slice.limit(requested); + int written = delegate.write(slice, position); + src.position(src.position() + written); + if (written > 0) { + doClose(); + throw new ClosedChannelException(); + } + writeClosed.set(false); + } + ensureOpen(); + return delegate.write(src, position); + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + if (closeOnRead && readClosed.compareAndSet(false, true)) { + int requested = Math.min(partialSize, dst.remaining()); + ByteBuffer tmp = ByteBuffer.allocate(requested); + int n = delegate.read(tmp, position); + if (n > 0) { + tmp.flip(); + tmp.limit(n); + dst.put(tmp); + doClose(); + throw new ClosedChannelException(); + } + readClosed.set(false); + return n; + } + ensureOpen(); + return delegate.read(dst, position); + } + + private void 
ensureOpen() throws ClosedChannelException { + if (!delegate.isOpen() || !isOpen()) { + throw new ClosedChannelException(); + } + } + + private void doClose() throws IOException { + // close wrapper first so AbstractInterruptibleChannel marks it as closed + if (isOpen()) { + close(); + } else if (delegate.isOpen()) { + delegate.close(); + } + } + + @Override + public int write(ByteBuffer src) throws IOException { + ensureOpen(); + return delegate.write(src); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + ensureOpen(); + return delegate.write(srcs, offset, length); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + ensureOpen(); + return delegate.read(dst); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + ensureOpen(); + return delegate.read(dsts, offset, length); + } + + @Override + public long position() throws IOException { + return delegate.position(); + } + + @Override + public FileChannel position(long newPosition) throws IOException { + delegate.position(newPosition); + return this; + } + + @Override + public long size() throws IOException { + return delegate.size(); + } + + @Override + public FileChannel truncate(long size) throws IOException { + delegate.truncate(size); + return this; + } + + @Override + public void force(boolean metaData) throws IOException { + delegate.force(metaData); + } + + @Override + public long transferTo(long position, long count, java.nio.channels.WritableByteChannel target) + throws IOException { + return delegate.transferTo(position, count, target); + } + + @Override + public long transferFrom(java.nio.channels.ReadableByteChannel src, long position, long count) + throws IOException { + return delegate.transferFrom(src, position, count); + } + + @Override + public java.nio.MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + return delegate.map(mode, position, 
size); + } + + @Override + public java.nio.channels.FileLock lock(long position, long size, boolean shared) throws IOException { + return delegate.lock(position, size, shared); + } + + @Override + public java.nio.channels.FileLock tryLock(long position, long size, boolean shared) throws IOException { + return delegate.tryLock(position, size, shared); + } + + @Override + protected void implCloseChannel() throws IOException { + delegate.close(); + } + } +} diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java index fe8013124ac..3f7e85f22f9 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/TxnStatusFile.java @@ -12,6 +12,7 @@ import static java.nio.charset.StandardCharsets.US_ASCII; +import java.io.EOFException; import java.io.File; import java.io.IOException; @@ -127,8 +128,13 @@ public TxnStatus getTxnStatus() throws IOException { if (disabled) { return TxnStatus.NONE; } - - byte[] bytes = nioFile.readBytes(0, 1); + byte[] bytes; + try { + bytes = nioFile.readBytes(0, 1); + } catch (EOFException e) { + // empty file = NONE status + return TxnStatus.NONE; + } TxnStatus status; diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/ValueStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/ValueStore.java index 3f1e72c3d79..9c4786bf27e 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/ValueStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/ValueStore.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Optional; import org.eclipse.rdf4j.common.annotation.InternalUseOnly; @@ -283,6 
+284,9 @@ public T getIRI(int id) throws IOException { * @throws IOException If an I/O error occurred. */ public int getID(Value value) throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("getID start thread={} value={}", threadName(), describeValue(value)); + } // Try to get the internal ID from the value itself boolean isOwnValue = isOwnValue(value); @@ -293,6 +297,10 @@ public int getID(Value value) throws IOException { int id = nativeValue.getInternalID(); if (id != NativeValue.UNKNOWN_ID) { + if (logger.isDebugEnabled()) { + logger.debug("getID returning cached internal id {} for value={} thread={}", id, + describeValue(value), threadName()); + } return id; } } @@ -309,6 +317,11 @@ public int getID(Value value) throws IOException { ((NativeValue) value).setInternalID(id, revision); } + if (logger.isDebugEnabled()) { + logger.debug("getID returning cache id {} for value={} thread={}", id, describeValue(value), + threadName()); + } + return id; } @@ -320,6 +333,10 @@ public int getID(Value value) throws IOException { } if (data != null) { + if (logger.isDebugEnabled()) { + logger.debug("getID querying datastore for value={} thread={} dataSummary={}", describeValue(value), + threadName(), summarize(data)); + } int id = dataStore.getID(data); if (id == NativeValue.UNKNOWN_ID && value instanceof Literal) { @@ -336,14 +353,49 @@ public int getID(Value value) throws IOException { nv.setInternalID(id, revision); valueIDCache.put(nv, Integer.valueOf(id)); } + + if (logger.isDebugEnabled()) { + logger.debug("getID resolved value={} id={} thread={}", describeValue(value), id, threadName()); + } } return id; } + if (logger.isDebugEnabled()) { + logger.debug("getID returning UNKNOWN for value={} thread={}", describeValue(value), threadName()); + } + return NativeValue.UNKNOWN_ID; } + private static String summarize(byte[] data) { + if (data == null) { + return "null"; + } + return "len=" + data.length + ",hash=" + Arrays.hashCode(data); + } + + private 
static String threadName() { + return Thread.currentThread().getName(); + } + + private static String describeValue(Value value) { + if (value == null) { + return "null"; + } + String lexical; + try { + lexical = value.stringValue(); + } catch (Exception e) { + lexical = String.valueOf(value); + } + if (lexical.length() > 120) { + lexical = lexical.substring(0, 117) + "..."; + } + return value.getClass().getSimpleName() + '[' + lexical + ']'; + } + /** * Stores the supplied value and returns the ID that has been assigned to it. In case the value was already present, * the value will not be stored again and the ID of the existing value is returned. @@ -352,7 +404,10 @@ public int getID(Value value) throws IOException { * @return The ID that has been assigned to the value. * @throws IOException If an I/O error occurred. */ - public int storeValue(Value value) throws IOException { + public synchronized int storeValue(Value value) throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("storeValue start thread={} value={}", threadName(), describeValue(value)); + } // Try to get the internal ID from the value itself boolean isOwnValue = isOwnValue(value); @@ -364,6 +419,10 @@ public int storeValue(Value value) throws IOException { int id = nativeValue.getInternalID(); if (id != NativeValue.UNKNOWN_ID) { + if (logger.isDebugEnabled()) { + logger.debug("storeValue returning cached internal id {} for value={} thread={}", id, + describeValue(value), threadName()); + } return id; } } @@ -380,6 +439,11 @@ public int storeValue(Value value) throws IOException { ((NativeValue) value).setInternalID(id, revision); } + if (logger.isDebugEnabled()) { + logger.debug("storeValue returning cached id {} for value={} thread={}", id, describeValue(value), + threadName()); + } + return id; } @@ -387,6 +451,13 @@ public int storeValue(Value value) throws IOException { // store which will handle duplicates byte[] valueData = value2data(value, true); + if (valueData == null) { + 
if (logger.isDebugEnabled()) { + logger.debug("storeValue computed no data for value={} thread={}", describeValue(value), threadName()); + } + return NativeValue.UNKNOWN_ID; + } + int id = dataStore.storeData(valueData); NativeValue nv = isOwnValue ? (NativeValue) value : getNativeValue(value); @@ -397,6 +468,11 @@ public int storeValue(Value value) throws IOException { // Update cache valueIDCache.put(nv, id); + if (logger.isDebugEnabled()) { + logger.debug("storeValue stored value={} assigned id={} thread={} dataSummary={}", describeValue(nv), id, + threadName(), summarize(valueData)); + } + return id; } @@ -453,6 +529,10 @@ public void checkConsistency() throws SailException, IOException { for (int id = 1; id <= maxID; id++) { try { byte[] data = dataStore.getData(id); + if (data == null || data.length == 0) { + // Defensive guard against truncated/empty records which otherwise cause AIOOBE in isNamespaceData + throw new SailException("Empty data array for value with id " + id); + } if (isNamespaceData(data)) { String namespace = data2namespace(data); try { @@ -463,6 +543,8 @@ public void checkConsistency() throws SailException, IOException { } catch (IllegalArgumentException e) { // throw SailException } + logger.error("Inconsistent namespace data for id {} (also id {}): {}", id, + getNamespaceID(namespace, false), namespace); throw new SailException( "Store must be manually exported and imported to fix namespaces like " + namespace); } else { @@ -513,20 +595,33 @@ private boolean revisionIsCurrent(NativeValue value) { } private byte[] value2data(Value value, boolean create) throws IOException { + byte[] data; if (value instanceof IRI) { - return uri2data((IRI) value, create); + data = uri2data((IRI) value, create); } else if (value instanceof BNode) { - return bnode2data((BNode) value, create); + data = bnode2data((BNode) value, create); } else if (value instanceof Literal) { - return literal2data((Literal) value, create); + data = literal2data((Literal) 
value, create); } else { throw new IllegalArgumentException("value parameter should be a URI, BNode or Literal"); } + + if (logger.isDebugEnabled()) { + logger.debug("value2data thread={} value={} create={} summary={}", threadName(), describeValue(value), + create, summarize(data)); + } + + return data; } private byte[] uri2data(IRI uri, boolean create) throws IOException { int nsID = getNamespaceID(uri.getNamespace(), create); + if (logger.isDebugEnabled()) { + logger.debug("uri2data thread={} namespace='{}' nsId={} create={}", threadName(), uri.getNamespace(), nsID, + create); + } + if (nsID == -1) { // Unknown namespace means unknown URI return null; @@ -541,6 +636,11 @@ private byte[] uri2data(IRI uri, boolean create) throws IOException { ByteArrayUtil.putInt(nsID, uriData, 1); ByteArrayUtil.put(localNameData, uriData, 5); + if (logger.isDebugEnabled()) { + logger.debug("uri2data produced len={} summary={} thread={}", uriData.length, summarize(uriData), + threadName()); + } + return uriData; } @@ -582,12 +682,21 @@ private byte[] literal2data(String label, Optional lang, IRI dt, boolean } } + if (logger.isDebugEnabled()) { + logger.debug("literal2data thread={} valueLength={} langPresent={} datatype={} datatypeId={} create={}", + threadName(), label.length(), lang.isPresent(), dt, datatypeID, create); + } + // Get language tag in UTF-8 byte[] langData = null; int langDataLength = 0; if (lang.isPresent()) { langData = lang.get().getBytes(StandardCharsets.UTF_8); langDataLength = langData.length; + if (langDataLength > 255) { + throw new IllegalArgumentException( + "Language tag too long (length " + langDataLength + " > maximum 255): " + lang.get()); + } } // Get label in UTF-8 @@ -597,12 +706,17 @@ private byte[] literal2data(String label, Optional lang, IRI dt, boolean byte[] literalData = new byte[6 + langDataLength + labelData.length]; literalData[0] = LITERAL_VALUE; ByteArrayUtil.putInt(datatypeID, literalData, 1); - literalData[5] = (byte) langDataLength; + 
literalData[5] = (byte) (langDataLength & 0xFF); if (langData != null) { ByteArrayUtil.put(langData, literalData, 6); } ByteArrayUtil.put(labelData, literalData, 6 + langDataLength); + if (logger.isDebugEnabled()) { + logger.debug("literal2data produced len={} summary={} thread={}", literalData.length, + summarize(literalData), threadName()); + } + return literalData; } @@ -675,7 +789,7 @@ private T data2literal(int id, byte[] data) th // Get language tag String lang = null; - int langLength = data[5]; + int langLength = data[5] & 0xFF; if (langLength > 0) { lang = new String(data, 6, langLength, StandardCharsets.UTF_8); } @@ -705,8 +819,15 @@ private String data2namespace(byte[] data) { } private int getNamespaceID(String namespace, boolean create) throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("getNamespaceID thread={} namespace='{}' create={}", threadName(), namespace, create); + } Integer cacheID = namespaceIDCache.get(namespace); if (cacheID != null) { + if (logger.isDebugEnabled()) { + logger.debug("getNamespaceID cache hit namespace='{}' id={} thread={}", namespace, cacheID, + threadName()); + } return cacheID; } @@ -721,6 +842,11 @@ private int getNamespaceID(String namespace, boolean create) throws IOException if (id != -1) { namespaceIDCache.put(namespace, id); + if (logger.isDebugEnabled()) { + logger.debug("getNamespaceID resolved namespace='{}' id={} thread={}", namespace, id, threadName()); + } + } else if (logger.isDebugEnabled()) { + logger.debug("getNamespaceID unresolved namespace='{}' thread={}", namespace, threadName()); } return id; diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataFile.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataFile.java index 550954750a7..a4448f9840a 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataFile.java +++ 
b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataFile.java @@ -18,7 +18,9 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import org.eclipse.rdf4j.common.annotation.InternalUseOnly; import org.eclipse.rdf4j.common.io.NioFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +52,12 @@ public class DataFile implements Closeable { private static final long HEADER_LENGTH = MAGIC_NUMBER.length + 1; + // Guard parameters + private static final int FALLBACK_LARGE_READ_THRESHOLD = 128 * 1024 * 1024; // 128MB fallback + public static final String LARGE_READ_THRESHOLD_PROPERTY = "org.eclipse.rdf4j.sail.nativerdf.datastore.DataFile.largeReadThresholdBytes"; + public static final int LARGE_READ_THRESHOLD = getConfiguredLargeReadThreshold(); + private static final int SOFT_FAIL_CAP_BYTES = 32 * 1024 * 1024; // 32MB + /*-----------* * Variables * *-----------*/ @@ -147,7 +155,7 @@ public byte[] tryRecoverBetweenOffsets(long startOffset, long endOffset) throws * @param data The data to store, must not be null. * @return The byte-offset in the file at which the data was stored. */ - public long storeData(byte[] data) throws IOException { + synchronized public long storeData(byte[] data) throws IOException { assert data != null : "data must not be null"; long offset = nioFileSize; @@ -197,7 +205,7 @@ synchronized private void flush() throws IOException { } - private int remainingBufferCapacity() { + synchronized private int remainingBufferCapacity() { return buffer.capacity() - buffer.position(); } @@ -227,16 +235,8 @@ public byte[] getData(long offset) throws IOException { (data[2] << 8) & 0x0000ff00 | (data[3]) & 0x000000ff; - // If the data length is larger than 750MB, we are likely reading the wrong data. Probably data corruption. 
The - // limit of 750MB was chosen based on results from experimenting in the NativeSailStoreCorruptionTest class. - if (dataLength > 128 * 1024 * 1024) { - if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) { - logger.error( - "Data length is {}MB which is larger than 750MB. This is likely data corruption. Truncating length to 32 MB.", - dataLength / ((1024 * 1024))); - dataLength = 32 * 1024 * 1024; - } - } + // Validate and possibly reduce the length before allocating a large array + dataLength = guardedDataLength(dataLength); try { @@ -248,16 +248,22 @@ public byte[] getData(long offset) throws IOException { // adjust the approximate average with 1 part actual length and 99 parts previous average up to a // sensible // max of 200 - dataLengthApproximateAverage = (int) (Math.min(200, + dataLengthApproximateAverage = (int) Math.max(0, Math.min(200, ((dataLengthApproximateAverage / 100.0) * 99) + (dataLength / 100.0))); - return Arrays.copyOfRange(data, 4, dataLength + 4); + int i = dataLength + 4; + if (i < 0 || i > data.length) { + throw new IOException("Corrupt data record at offset " + offset + ". Data length: " + dataLength); + } + + return Arrays.copyOfRange(data, 4, i); } else { // adjust the approximate average, but favour the actual dataLength since dataLength predictions misses // are costly - dataLengthApproximateAverage = Math.min(200, (dataLengthApproximateAverage + dataLength) / 2); + dataLengthApproximateAverage = Math.max(0, + Math.min(200, (dataLengthApproximateAverage + dataLength) / 2)); // we didn't read enough data so we need to execute a new read data = new byte[dataLength]; @@ -267,7 +273,7 @@ public byte[] getData(long offset) throws IOException { return data; } } catch (OutOfMemoryError e) { - if (dataLength > 128 * 1024 * 1024) { + if (dataLength > LARGE_READ_THRESHOLD) { logger.error( "Trying to read large amounts of data may be a sign of data corruption. 
Consider setting the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true"); } @@ -276,12 +282,109 @@ public byte[] getData(long offset) throws IOException { } + /** + * For very large reads, ensure there appears to be sufficient free heap to allocate the requested record. If soft + * fail mode is enabled and insufficient memory is observed, returns a reduced cap to allow recovery; otherwise + * throws an IOException with guidance for remediation. + */ + private int guardedDataLength(int requested) throws IOException { + if (requested <= 0) { + return requested; + } + + // Soft-fail corruption cap remains in effect for oversized claims + if (requested > LARGE_READ_THRESHOLD && SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) { + logger.error( + "Data length is {}MB which is larger than {}MB. This is likely data corruption. Truncating length to {} MB.", + requested / (1024 * 1024), LARGE_READ_THRESHOLD / (1024 * 1024), + SOFT_FAIL_CAP_BYTES / (1024 * 1024)); + return SOFT_FAIL_CAP_BYTES; + } + + if (requested <= LARGE_READ_THRESHOLD) { + return requested; + } + + Runtime rt = Runtime.getRuntime(); + for (int i = 0; i < 6; i++) { // initial check + up to 5 GC attempts + long free = getFreeMemory(rt); + if (free >= requested) { + return requested; + } + if (i < 5) { + System.gc(); + try { + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + + long free = getFreeMemory(rt); + if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) { + logger.error( + "Attempt to read {} MB but only {} MB free heap available. Truncating to {} MB due to soft-fail mode.", + requested / (1024 * 1024), free / (1024 * 1024), SOFT_FAIL_CAP_BYTES / (1024 * 1024)); + return SOFT_FAIL_CAP_BYTES; + } + throw new IOException("Attempt to read " + (requested / (1024 * 1024)) + " MB but only " + + (free / (1024 * 1024)) + + " MB free heap available. This may indicate corrupted data length. 
Consider enabling soft-fail mode via system property 'org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes'=true to attempt recovery."); + } + + @InternalUseOnly + public long getFreeMemory(Runtime rt) { + // this method is overridden in tests to simulate low-heap conditions + long allocated = rt.totalMemory() - rt.freeMemory(); + return (rt.maxMemory() - allocated); + } + + private static int getConfiguredLargeReadThreshold() { + int defaultThreshold = defaultLargeReadThreshold(); + String configured = System.getProperty(LARGE_READ_THRESHOLD_PROPERTY); + if (configured == null || configured.isBlank()) { + logger.debug( + "Using default large read threshold of {} MB. To configure, set system property {} to a positive integer value in bytes.", + defaultThreshold / (1024 * 1024), LARGE_READ_THRESHOLD_PROPERTY); + return defaultThreshold; + } + try { + int parsed = Integer.parseInt(configured.trim()); + if (parsed <= 0) { + logger.warn( + "Ignoring non-positive value {} for system property {}. Falling back to {} MB.", + configured, LARGE_READ_THRESHOLD_PROPERTY, defaultThreshold / (1024 * 1024)); + return defaultThreshold; + } + return parsed; + } catch (NumberFormatException e) { + logger.warn( + "Ignoring non-numeric value {} for system property {}. Falling back to {} MB.", + configured, LARGE_READ_THRESHOLD_PROPERTY, defaultThreshold / (1024 * 1024)); + return defaultThreshold; + } + } + + private static int defaultLargeReadThreshold() { + long maxMemory = Runtime.getRuntime().maxMemory(); + if (maxMemory <= 0) { + return FALLBACK_LARGE_READ_THRESHOLD; + } + long threshold = maxMemory / 16L; + if (threshold <= 0) { + return FALLBACK_LARGE_READ_THRESHOLD; + } + return Math.toIntExact(Math.min(threshold, Integer.MAX_VALUE)); + } + /** * Discards all stored data. * * @throws IOException If an I/O error occurred. 
*/ - public void clear() throws IOException { + synchronized public void clear() throws IOException { nioFile.truncate(HEADER_LENGTH); nioFileSize = HEADER_LENGTH; buffer.clear(); @@ -290,7 +393,7 @@ public void clear() throws IOException { /** * Syncs any unstored data to the hash file. */ - public void sync() throws IOException { + synchronized public void sync() throws IOException { flush(); if (forceSync) { @@ -298,7 +401,7 @@ public void sync() throws IOException { } } - public void sync(boolean force) throws IOException { + synchronized public void sync(boolean force) throws IOException { flush(); nioFile.force(force); @@ -310,9 +413,9 @@ public void sync(boolean force) throws IOException { * @throws IOException */ @Override - public void close() throws IOException { + synchronized public void close() throws IOException { flush(); - + nioFile.force(true); nioFile.close(); } diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStore.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStore.java index b8e775c49e1..90c46561c40 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStore.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStore.java @@ -35,13 +35,14 @@ public class DataStore implements Closeable { * Variables * *-----------*/ + private static final Logger logger = LoggerFactory.getLogger(DataStore.class); + private final DataFile dataFile; private final IDFile idFile; private final HashFile hashFile; - private static final Logger logger = LoggerFactory.getLogger(DataStore.class); private ValueStore valueStore; /*--------------* @@ -77,115 +78,139 @@ public DataStore(File dataDir, String filePrefix, boolean forceSync, ValueStore public byte[] getData(int id) throws IOException { assert id > 0 : "id must be larger than 0, is: " + id; - // Data not in cache or cache not used, fetch from file long 
offset = idFile.getOffset(id); if (offset != 0L) { byte[] data = dataFile.getData(offset); + + assert data != null : "data must not be null"; + + if (logger.isDebugEnabled()) { + logger.debug("getData thread={} id={} offset={} resultLength={}", threadName(), id, offset, + data.length); + } + if (data.length == 0 && NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) { - try { - long offsetNoCache = idFile.getOffsetNoCache(id); - if (offset != offsetNoCache) { - logger.error("IDFile cache mismatch for id {}: cached={}, raw={}. Using raw.", id, offset, - offsetNoCache); - offset = offsetNoCache; - data = dataFile.getData(offset); - } - } catch (IOException e) { - // If raw read fails, keep cached offset - } + data = attemptToRecoverCorruptData(id, offset, data); + } + return data; + } - // Attempt recovery by using neighboring offsets to infer the bounds - long startData = offset + 4; // default start if no previous valid entry - // Find previous entry end: prevOffset + 4 + prevLength - int prev = id - 1; - for (; prev >= 1; prev--) { - long po = idFile.getOffset(prev); - try { - long poRaw = idFile.getOffsetNoCache(prev); - if (po != poRaw) { - logger.error("IDFile cache mismatch for prev id {}: cached={}, raw={}. Using raw.", prev, - po, poRaw); - po = poRaw; - } - } catch (IOException e) { - // use cached po if raw read fails - } - if (po > 0L) { + return null; + } + + /** + * Attempts to recover a record whose ID maps to a non-zero offset but whose data read back empty (suspected corruption). + * @param id the internal ID of the suspect record + * @param offset the cached file offset for the record + * @param data the empty byte array that was read for the record + * @return the record data after a successful re-read following an IDFile cache mismatch, otherwise the original empty array + * @throws IOException if reading the underlying data or ID files fails + * @throws RecoveredDataException if recovery using neighboring offsets was successful; carries the recovered bytes + */ + private byte[] attemptToRecoverCorruptData(int id, long offset, byte[] data) throws IOException { + // Data corruption detected: ID points to an offset, but no data could be read. Attempt to recover. + try { + long offsetNoCache = idFile.getOffsetNoCache(id); + if (offset != offsetNoCache) { + logger.error("IDFile cache mismatch for id {}: cached={}, raw={}. 
Using raw.", id, offset, + offsetNoCache); + offset = offsetNoCache; + data = dataFile.getData(offset); + } + } catch (IOException e) { + // If raw read fails, keep cached offset + } + + // Attempt recovery by using neighboring offsets to infer the bounds + long startData = offset + 4; // default start if no previous valid entry + // Find previous entry end: prevOffset + 4 + prevLength + int prev = id - 1; + for (; prev >= 1; prev--) { + long po = idFile.getOffset(prev); + try { + long poRaw = idFile.getOffsetNoCache(prev); + if (po != poRaw) { + logger.error("IDFile cache mismatch for prev id {}: cached={}, raw={}. Using raw.", prev, + po, poRaw); + po = poRaw; + } + } catch (IOException e) { + // use cached po if raw read fails + } + if (po > 0L) { + try { + byte[] prevData = dataFile.getData(po); + if (prevData != null && prevData.length > 0) { try { - byte[] prevData = dataFile.getData(po); - if (prevData != null && prevData.length > 0) { - try { - if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) { - NativeValue nativeValue = valueStore.data2value(prev, prevData); - logger.warn("Data in previous ID ({}) is: {}", prev, nativeValue); - } else { - logger.warn("Data in previous ID ({}) is: {}", prev, - new String(prevData, StandardCharsets.UTF_8)); - } - } catch (Exception ignored) { - } - startData = po + 4L + prevData.length; - break; + if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) { + NativeValue nativeValue = valueStore.data2value(prev, prevData); + logger.warn("Data in previous ID ({}) is: {}", prev, nativeValue); + } else { + logger.warn("Data in previous ID ({}) is: {}", prev, + new String(prevData, StandardCharsets.UTF_8)); } } catch (Exception ignored) { } + startData = po + 4L + prevData.length; + break; } + } catch (Exception ignored) { } + } + } - // Find next entry start as the end bound - long endOffset = 0L; - int maxId = idFile.getMaxID(); - int next = id + 1; - for (; next <= maxId; 
next++) { - long no = idFile.getOffset(next); - try { - long noRaw = idFile.getOffsetNoCache(next); - if (no != noRaw) { - logger.error("IDFile cache mismatch for next id {}: cached={}, raw={}. Using raw.", next, - no, noRaw); - no = noRaw; - } - } catch (IOException e) { - // use cached value if raw read fails - } - if (no > 0L) { + // Find next entry start as the end bound + long endOffset = 0L; + int maxId = idFile.getMaxID(); + int next = id + 1; + for (; next <= maxId; next++) { + long no = idFile.getOffset(next); + try { + long noRaw = idFile.getOffsetNoCache(next); + if (no != noRaw) { + logger.error("IDFile cache mismatch for next id {}: cached={}, raw={}. Using raw.", next, + no, noRaw); + no = noRaw; + } + } catch (IOException e) { + // use cached value if raw read fails + } + if (no > 0L) { + try { + byte[] nextData = dataFile.getData(no); + if (nextData != null && nextData.length > 0) { try { - byte[] nextData = dataFile.getData(no); - if (nextData != null && nextData.length > 0) { - try { - if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) { - NativeValue nativeValue = valueStore.data2value(next, nextData); - logger.warn("Data in next ID ({}) is: {}", next, nativeValue); - } else { - logger.warn("Data in next ID ({}) is: {}", next, - new String(nextData, StandardCharsets.UTF_8)); - } - } catch (Exception ignored) { - } - endOffset = no; - break; + if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) { + NativeValue nativeValue = valueStore.data2value(next, nextData); + logger.warn("Data in next ID ({}) is: {}", next, nativeValue); + } else { + logger.warn("Data in next ID ({}) is: {}", next, + new String(nextData, StandardCharsets.UTF_8)); } - } catch (Exception e) { + } catch (Exception ignored) { } - + endOffset = no; + break; } + } catch (Exception e) { } - if (endOffset == 0L) { - // Fallback: use current file size as end bound - endOffset = dataFile.getFileSize(); - } - if (endOffset > 
startData) { - // tryRecoverBetweenOffsets expects an offset to a 4-byte length, so pass (startData - 4) - byte[] recovered = dataFile.tryRecoverBetweenOffsets(Math.max(0L, startData - 4L), endOffset); - throw new RecoveredDataException(id, recovered); - } + } - return data; + } + if (endOffset == 0L) { + // Fallback: use current file size as end bound + endOffset = dataFile.getFileSize(); + } + if (endOffset > startData) { + // tryRecoverBetweenOffsets expects an offset to a 4-byte length, so pass (startData - 4) + byte[] recovered = dataFile.tryRecoverBetweenOffsets(Math.max(0L, startData - 4L), endOffset); + throw new RecoveredDataException(id, recovered); } - return null; + assert data.length == 0; + return data; } /** @@ -200,16 +225,25 @@ public int getID(byte[] queryData) throws IOException { int id; - // Value not in cache or cache not used, fetch from file int hash = getDataHash(queryData); + if (logger.isDebugEnabled()) { + logger.debug("getID start thread={} hash={} summary={}", threadName(), hash, summarize(queryData)); + } + HashFile.IDIterator iter = hashFile.getIDIterator(hash); try { while ((id = iter.next()) >= 0) { long offset = idFile.getOffset(id); byte[] data = dataFile.getData(offset); + boolean match = Arrays.equals(queryData, data); + + if (logger.isDebugEnabled()) { + logger.debug( + "getID candidate thread={} hash={} candidateId={} offset={} match={} candidateSummary={}", + threadName(), hash, id, offset, match, summarize(data)); + } - if (Arrays.equals(queryData, data)) { - // Matching data found + if (match) { break; } } @@ -217,6 +251,10 @@ public int getID(byte[] queryData) throws IOException { iter.close(); } + if (logger.isDebugEnabled()) { + logger.debug("getID result thread={} hash={} id={}", threadName(), hash, id); + } + return id; } @@ -237,16 +275,29 @@ public int getMaxID() { * @return The ID that has been assigned to the value. * @throws IOException If an I/O error occurred. 
*/ - public int storeData(byte[] data) throws IOException { + public synchronized int storeData(byte[] data) throws IOException { assert data != null : "data must not be null"; + if (logger.isDebugEnabled()) { + int hash = getDataHash(data); + logger.debug("storeData start thread={} hash={} summary={}", threadName(), hash, summarize(data)); + } + int id = getID(data); if (id == -1) { - // Data not stored yet, store it under a new ID. + int hash = getDataHash(data); long offset = dataFile.storeData(data); id = idFile.storeOffset(offset); - hashFile.storeID(getDataHash(data), id); + hashFile.storeID(hash, id); + if (logger.isDebugEnabled()) { + logger.debug("storeData stored thread={} hash={} id={} offset={} summary={}", threadName(), hash, id, + offset, summarize(data)); + } + } else if (logger.isDebugEnabled()) { + int hash = getDataHash(data); + logger.debug("storeData reuse thread={} hash={} existingId={} summary={}", threadName(), hash, id, + summarize(data)); } return id; @@ -258,6 +309,9 @@ public int storeData(byte[] data) throws IOException { * @throws IOException If an I/O error occurred. */ public void sync() throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("sync thread={} invoked", threadName()); + } hashFile.sync(); idFile.sync(); dataFile.sync(); @@ -269,6 +323,9 @@ public void sync() throws IOException { * @throws IOException If an I/O error occurred. */ public void clear() throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("clear thread={} invoked", threadName()); + } try { hashFile.clear(); } finally { @@ -305,6 +362,17 @@ public void close() throws IOException { * @param data The data to calculate the hash code for. * @return A hash code for the supplied data. 
*/ + private static String summarize(byte[] data) { + if (data == null) { + return "null"; + } + return "len=" + data.length + ",hash=" + Arrays.hashCode(data); + } + + private static String threadName() { + return Thread.currentThread().getName(); + } + private int getDataHash(byte[] data) { CRC32 crc32 = new CRC32(); crc32.update(data); diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/HashFile.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/HashFile.java index 594caa5872b..cdb17e02a3b 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/HashFile.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/HashFile.java @@ -297,12 +297,20 @@ public void sync() throws IOException { public void sync(boolean force) throws IOException { sync(); + // Always honor explicit requests to force metadata to disk while preserving data durability for sync(false) nioFile.force(force); } @Override public void close() throws IOException { - nioFile.close(); + // Persist current header (bucketCount, bucketSize, itemCount) before closing + // to ensure readers after reopen see the correct table layout even if no + // explicit sync() was called after structural changes (e.g., rehash). 
+ try { + sync(true); + } finally { + nioFile.close(); + } } /*-----------------* @@ -494,6 +502,7 @@ private void increaseHashTable() throws IOException { // Discard the temp file } tmpFile.delete(); + sync(true); } /*------------------------* diff --git a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFile.java b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFile.java index 805ce7382cf..92b2cf305d1 100644 --- a/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFile.java +++ b/core/sail/nativerdf/src/main/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFile.java @@ -264,7 +264,7 @@ public void sync() throws IOException { } public void sync(boolean force) throws IOException { - nioFile.force(false); + nioFile.force(force); } /** diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileDirectWriteFlushRaceReproducerTestIT.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileDirectWriteFlushRaceReproducerTestIT.java new file mode 100644 index 00000000000..274c39ccf4f --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileDirectWriteFlushRaceReproducerTestIT.java @@ -0,0 +1,185 @@ +/** + ******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + ******************************************************************************* + */ +package org.eclipse.rdf4j.sail.nativerdf; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Reproduces a corruption scenario where a mix of small (buffered) and large (direct-write) values are written while a + * concurrent reader forces frequent DataFile.flush() via reads. The race between flush() offset calculation and + * direct-write size updates can misplace buffered bytes, corrupting records. 
+ */ +@Tag("slow") +@Isolated +public class DataFileDirectWriteFlushRaceReproducerTestIT { + + @TempDir + File dataDir; + + private ValueStore valueStore; + + @BeforeEach + public void setup() throws IOException { + valueStore = new ValueStore(dataDir); + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + + @AfterEach + public void tearDown() throws IOException { + try { + if (valueStore != null) { + valueStore.close(); + } + } finally { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + } + + @Test + public void mixedDirectAndBufferedWritesWithConcurrentFlushCauseCorruption() throws Exception { + // Multiple writers with occasional large literals to trigger direct-write path + int writerThreads = 4; // small-value writers (buffered path) + int valuesPerWriter = 40_000; + + ExecutorService pool = Executors.newFixedThreadPool(writerThreads + 1); + CountDownLatch start = new CountDownLatch(1); + + DataStore ds = (DataStore) reflect(valueStore, "dataStore"); + + List> futures = new ArrayList<>(); + + for (int i = 0; i < writerThreads; i++) { + final int seed = 7777 + i; + futures.add(pool.submit(() -> { + Random rnd = new Random(seed); + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + for (int j = 0; j < valuesPerWriter; j++) { + try { + // Always write a small IRI first to exercise buffered writes + String ns = "http://ns/" + rnd.nextInt(64) + "/"; + String local = "s" + rnd.nextInt(50_000); + valueStore.storeValue(valueStore.createIRI(ns, local)); + + // Small literal keeps buffer hot (buffered path) + valueStore.storeValue(valueStore.createLiteral("v" + rnd.nextInt(10000))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + })); + } + + // Big writer: continuously write large literals to trigger direct-write path + futures.add(pool.submit(() -> { + Random rnd = new Random(9999); + try { + 
start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + for (int i = 0; i < valuesPerWriter; i++) { + try { + String big = buildString(12_000 + rnd.nextInt(6000)); + valueStore.storeValue(valueStore.createLiteral(big)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + })); + + // Flusher thread: keep forcing getData(1) and random ids to trigger flush() frequently + futures.add(pool.submit(() -> { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + Random r = new Random(1337); + long until = System.nanoTime() + TimeUnit.SECONDS.toNanos(30); + while (System.nanoTime() < until) { + try { + int max = ds.getMaxID(); + if (max >= 1) { + // alternate between a stable low id and a random high id + ds.getData(1); + ds.getData(1 + r.nextInt(Math.max(1, max))); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + Thread.onSpinWait(); + } + return null; + })); + + start.countDown(); + + try { + for (Future f : futures) { + f.get(); + } + } finally { + pool.shutdownNow(); + } + + valueStore.close(); + valueStore = new ValueStore(dataDir); + + valueStore.checkConsistency(); + } + + private static String buildString(int targetLength) { + StringBuilder b = new StringBuilder(targetLength); + while (b.length() < targetLength) { + b.append('x'); + } + return b.toString(); + } + + private static Object reflect(Object target, String field) { + try { + Field f = target.getClass().getDeclaredField(field); + f.setAccessible(true); + return f.get(target); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileFlushRaceReproducerTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileFlushRaceReproducerTest.java new file mode 
100644 index 00000000000..0516056f511 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileFlushRaceReproducerTest.java @@ -0,0 +1,179 @@ +/** + ******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + ******************************************************************************* + */ +package org.eclipse.rdf4j.sail.nativerdf; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Attempts to deterministically reproduce corruption caused by a race between {@code DataFile.flush()} and concurrent + * buffered writes. The test hammers the {@link ValueStore} with small writes while a separate thread forces frequent + * {@code DataFile.getData(...)} calls (which invoke {@code flush()}). Finally, after closing and reopening the store, a + * consistency check is expected to fail if corruption has occurred. 
+ * + * The workload and explicit flusher increase the likelihood of the following race: - Writer increases the in-memory + * write buffer and {@code nioFileSize} without flushing - Concurrent read forces {@code flush()}, which computes the + * flush offset using the current {@code nioFileSize} while copying a smaller snapshot of the buffer content, producing + * misaligned writes. + */ +@Isolated +public class DataFileFlushRaceReproducerTest { + + @TempDir + File dataDir; + + private ValueStore valueStore; + + @BeforeEach + public void setup() throws IOException { + valueStore = new ValueStore(dataDir); + // surface corruption as exceptions (not soft-fail) + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + + @AfterEach + public void tearDown() throws IOException { + try { + if (valueStore != null) { + valueStore.close(); + } + } finally { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + } + + @Test + public void flushDuringConcurrentWritesTriggersCorruption() throws Exception { + int writerThreads = 8; // DataStore serializes storeData, but this keeps the buffer hot + int valuesPerWriter = 50_000; // small values -> buffered path, many flush opportunities + + ExecutorService pool = Executors.newFixedThreadPool(writerThreads + 1); + CountDownLatch start = new CountDownLatch(1); + + // reflectively access the underlying DataStore so we can force getData(id) to + // call DataFile.flush() on every iteration (ValueStore caches may avoid reads otherwise) + DataStore ds = (DataStore) reflect(valueStore, "dataStore"); + + List> futures = new ArrayList<>(); + + // Writers + for (int i = 0; i < writerThreads; i++) { + final int seed = 4242 + i; + futures.add(pool.submit(() -> { + Random rnd = new Random(seed); + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + for (int j = 0; j < valuesPerWriter; j++) { + try { + // keep values small to 
stay on buffered write path + String ns = "http://ex/" + rnd.nextInt(64) + "/"; + String local = "l" + rnd.nextInt(20_000); + valueStore.storeValue(valueStore.createIRI(ns + local)); + + switch (rnd.nextInt(3)) { + case 0: + valueStore.storeValue(valueStore.createLiteral("v" + rnd.nextInt(10_000))); + break; + case 1: + valueStore.storeValue(valueStore.createLiteral("v" + rnd.nextInt(10_000), "en")); + break; + default: + valueStore.storeValue(valueStore + .createLiteral("v" + rnd.nextInt(10_000), + valueStore.createIRI("http://dt/" + rnd.nextInt(128)))); + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + })); + } + + // Flusher: repeatedly read an early id to force DataFile.getData(...)->flush() + futures.add(pool.submit(() -> { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + // spin for roughly the duration of writers + long until = System.nanoTime() + TimeUnit.SECONDS.toNanos(25); + while (System.nanoTime() < until) { + try { + int max = ds.getMaxID(); + if (max >= 1) { + // focus on a small id to avoid cache churn and ensure frequent flushes + ds.getData(1); + } + } catch (IOException e) { + // propagate to fail the test + throw new RuntimeException(e); + } + // minimal pause to yield + Thread.onSpinWait(); + } + return null; + })); + + // go! 
+ start.countDown(); + + try { + for (Future f : futures) { + f.get(); + } + } finally { + pool.shutdownNow(); + } + + // Close and reopen to ensure we load from disk only + valueStore.close(); + valueStore = new ValueStore(dataDir); + + valueStore.checkConsistency(); + } + + private static Object reflect(Object target, String field) { + try { + Field f = target.getClass().getDeclaredField(field); + f.setAccessible(true); + return f.get(target); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileHeapGuardTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileHeapGuardTest.java new file mode 100644 index 00000000000..2ce06a0217a --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/DataFileHeapGuardTest.java @@ -0,0 +1,73 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; + +import org.eclipse.rdf4j.common.io.NioFile; +import org.eclipse.rdf4j.sail.nativerdf.datastore.DataFile; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Verifies that DataFile enforces a large-allocation guard during reads and that the guard can be tuned through system + * properties. + */ +@Isolated +public class DataFileHeapGuardTest { + + @TempDir + File tmp; + + @BeforeEach + public void setUp() { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + + @Test + public void getData_largeLength_triggersGuardIOException() throws Exception { + System.out.println("DataFile.LARGE_READ_THRESHOLD " + DataFile.LARGE_READ_THRESHOLD); + System.out.println("DataFile.LARGE_READ_THRESHOLD " + DataFile.LARGE_READ_THRESHOLD / 1024 / 1024 + "MB"); + Path p = tmp.toPath().resolve("guard.dat"); + + try (DataFile df = new DataFileWithSimulatedLowHeap(p.toFile())) { + // write header + } + + try (NioFile nf = new NioFile(p.toFile())) { + long headerOffset = 4; // MAGIC_NUMBER(3) + version(1) + int huge = DataFile.LARGE_READ_THRESHOLD + 1; + nf.writeInt(huge, headerOffset); + } + + try (DataFile df = new DataFileWithSimulatedLowHeap(p.toFile())) { + assertThrows(IOException.class, () -> df.getData(4)); + } + } + + static class DataFileWithSimulatedLowHeap extends DataFile { + public DataFileWithSimulatedLowHeap(File file) throws IOException { + super(file); + } + + @Override + public long getFreeMemory(Runtime rt) { + return DataFile.LARGE_READ_THRESHOLD / 2; + } + } + +} diff --git 
a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStoreCorruptionTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStoreCorruptionTestIT.java similarity index 98% rename from core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStoreCorruptionTest.java rename to core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStoreCorruptionTestIT.java index 12119ceb50b..2f75229837e 100644 --- a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStoreCorruptionTest.java +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeSailStoreCorruptionTestIT.java @@ -39,17 +39,21 @@ import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Tests how the NativeStore handles corruption in the data files. 
*/ -public class NativeSailStoreCorruptionTest { +@Tag("slow") +@Isolated +public class NativeSailStoreCorruptionTestIT { - private static final Logger logger = LoggerFactory.getLogger(NativeSailStoreCorruptionTest.class); + private static final Logger logger = LoggerFactory.getLogger(NativeSailStoreCorruptionTestIT.class); @TempDir File tempFolder; diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConcurrentValueStoreCorruptionTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConcurrentValueStoreCorruptionTest.java new file mode 100644 index 00000000000..731cd3911a1 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreConcurrentValueStoreCorruptionTest.java @@ -0,0 +1,125 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Exercises the {@link ValueStore} obtained from an initialized {@link NativeStore} with highly concurrent writes. 
The + * workload mirrors {@link ValueStoreConcurrentWriteCorruptionTest} but goes through {@link NativeStore} to demonstrate + * that the embedded {@link ValueStore} can still become corrupted. + */ +@Isolated +public class NativeStoreConcurrentValueStoreCorruptionTest { + + @TempDir + File dataDir; + + @AfterEach + public void resetSoftFailFlag() { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = true; + } + + @Test + public void concurrentNativeValueStoreWritesTriggerCorruption() throws Exception { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + + NativeStore store = new NativeStore(dataDir, "spoc,posc"); + store.init(); + + ValueStore valueStore = (ValueStore) store.getValueFactory(); + + int writers = 16; + int valuesPerWriter = 1000; + + ExecutorService pool = Executors.newFixedThreadPool(writers); + CountDownLatch start = new CountDownLatch(1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < writers; i++) { + final int seed = 42 + i; + futures.add(pool.submit(() -> { + Random random = new Random(seed); + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + for (int j = 0; j < valuesPerWriter; j++) { + try { + storeRandomValue(random, valueStore); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + return null; + })); + } + + start.countDown(); + + try { + for (Future future : futures) { + future.get(); + } + } finally { + pool.shutdownNow(); + } + + store.shutDown(); + + NativeStore reopened = new NativeStore(dataDir, "spoc,posc"); + reopened.init(); + + ValueStore reopenedValueStore = (ValueStore) reopened.getValueFactory(); + + reopenedValueStore.checkConsistency(); + + reopened.shutDown(); + } + + private static void storeRandomValue(Random random, ValueStore valueStore) throws IOException { + String namespace = "http://example.org/ns/" + random.nextInt(200) + "/"; + String localName = "s" + 
random.nextInt(50_000); + valueStore.storeValue(valueStore.createIRI(namespace, localName)); + + switch (random.nextInt(3)) { + case 0: + valueStore.storeValue(valueStore.createLiteral("value-" + random.nextInt(100_000))); + break; + case 1: + valueStore.storeValue(valueStore.createLiteral("value-" + random.nextInt(100_000), "en")); + break; + default: + valueStore.storeValue(valueStore.createLiteral( + "value-" + random.nextInt(100_000), + valueStore.createIRI("http://example.org/dt/" + random.nextInt(256)))); + break; + } + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreRepositoryCorruptionReproducerTestIT.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreRepositoryCorruptionReproducerTestIT.java new file mode 100644 index 00000000000..33ca07ef208 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreRepositoryCorruptionReproducerTestIT.java @@ -0,0 +1,191 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.rdf4j.common.transaction.IsolationLevels; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.eclipse.rdf4j.repository.RepositoryException; +import org.eclipse.rdf4j.repository.RepositoryResult; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Reproduces a corruption scenario using only public repository APIs. Multiple writer threads perform rapid + * begin/add/commit cycles with {@link IsolationLevels#NONE} while a concurrent reader thread continuously iterates over + * all statements. This stresses NativeStore's value writes (buffered and direct-write) and flush-on-read behavior + * without manipulating files directly. + * + * The test expects that, with soft-fail disabled, iterating statements after the workload can throw a + * {@link RepositoryException} if values have become corrupted on disk. This is a reproducer; it is expected to fail on + * affected implementations. 
+ */ +@Tag("slow") +@Isolated +public class NativeStoreRepositoryCorruptionReproducerTestIT { + + @TempDir + File dataDir; + + @AfterEach + public void resetSoftFail() { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + + @Test + public void concurrentAddAndReadMayCorrupt() throws Exception { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; // surface corruption + + NativeStore store = new NativeStore(dataDir, "spoc,posc"); + store.init(); + SailRepository repo = new SailRepository(store); + repo.init(); + final SailRepository repoRef = repo; // effectively final for lambdas + + int writers = 8; + int perWriterOps = 15_000; + + ExecutorService pool = Executors.newFixedThreadPool(writers + 2); + CountDownLatch start = new CountDownLatch(1); + List> futures = new ArrayList<>(); + + // Writer threads: add small and occasional large values to exercise both buffered and direct-write paths + for (int i = 0; i < writers; i++) { + final int seed = 1234 + i; + futures.add(pool.submit(() -> { + Random rnd = new Random(seed); + try (RepositoryConnection conn = repoRef.getConnection()) { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + ValueFactory vf = conn.getValueFactory(); + for (int j = 0; j < perWriterOps; j++) { + conn.begin(IsolationLevels.NONE); + try { + // small IRI + literal + String ns = "http://ex/" + rnd.nextInt(128) + "/"; + String local = "l" + rnd.nextInt(50_000); + conn.add(vf.createIRI(ns, local), vf.createIRI("urn:p"), vf.createLiteral("v" + j)); + + // occasionally write larger literal to hit direct write path + if ((j % 200) == 0) { + String big = buildString(12_000 + rnd.nextInt(4000)); + conn.add(vf.createIRI("urn:s" + j), vf.createIRI("urn:p"), vf.createLiteral(big)); + } + conn.commit(); + } catch (Throwable t) { + try { + conn.rollback(); + } catch (Throwable ignore) { + } + // surface failures in worker + throw new 
RuntimeException(t); + } + } + } + return null; + })); + } + + // Reader thread: continuously iterate, forcing flush-on-read in DataFile through ValueStore + futures.add(pool.submit(() -> { + try (RepositoryConnection conn = repoRef.getConnection()) { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + conn.begin(IsolationLevels.NONE); + try { + long until = System.nanoTime() + TimeUnit.SECONDS.toNanos(30); + while (System.nanoTime() < until) { + try (RepositoryResult statements = conn.getStatements(null, null, null, false)) { + statements.forEachRemaining(st -> { + st.toString(); + // no-op; force materialization + }); + } + Thread.onSpinWait(); + } + conn.commit(); + } catch (Throwable t) { + try { + conn.rollback(); + } catch (Throwable ignore) { + } + throw new RuntimeException(t); + } + } + return null; + })); + + // kick off + start.countDown(); + try { + for (Future f : futures) { + f.get(); + } + } finally { + pool.shutdownNow(); + pool.awaitTermination(10, TimeUnit.SECONDS); + } + + // Close and reopen to ensure disk state is reloaded + repo.shutDown(); + store.shutDown(); + + store = new NativeStore(dataDir, "spoc,posc"); + store.init(); + repo = new SailRepository(store); + repo.init(); + + try (RepositoryConnection conn = repo.getConnection()) { + // If corruption occurred, iterating statements should throw RepositoryException + + try (RepositoryResult statements = conn.getStatements(null, null, null, false)) { + statements.forEachRemaining(st -> { + st.toString(); + }); + } + } + + repo.shutDown(); + store.shutDown(); + } + + private static String buildString(int len) { + StringBuilder sb = new StringBuilder(len); + while (sb.length() < len) { + sb.append('x'); + } + return sb.toString(); + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreUtf8ExtremesCorruptionTestIT.java 
b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreUtf8ExtremesCorruptionTestIT.java new file mode 100644 index 00000000000..1228738c683 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreUtf8ExtremesCorruptionTestIT.java @@ -0,0 +1,240 @@ +/** + ******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Attempts to reproduce a corrupt NativeStore by iteratively storing IRIs and BNodes whose UTF-8 encodings exercise + * extremes and boundary lengths (1/2/3/4-byte code points; sizes near small and page-sized thresholds). 
A concurrent + * reader thread repeatedly calls into the underlying DataStore to force frequent DataFile.flush() operations while + * writes happen, increasing the chance of a flush/write race corrupting values. + * + * If corruption occurs, the subsequent checkConsistency() call will throw, causing this test to fail. This test is + * designed as a reproducer (expected to fail when corruption is possible) rather than an assertion of correctness. + */ +@Tag("slow") +@Isolated +public class NativeStoreUtf8ExtremesCorruptionTestIT { + + @TempDir + File dataDir; + + private NativeStore store; + + private ValueStore valueStore; + + @BeforeEach + public void setup() throws Exception { + // surface corruption as exceptions + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + store = new NativeStore(dataDir, "spoc,posc"); + store.init(); + valueStore = (ValueStore) store.getValueFactory(); + } + + @AfterEach + public void tearDown() throws Exception { + try { + if (store != null) { + store.shutDown(); + } + } finally { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + } + + @Test + public void utf8ExtremesForIrisAndBNodesMayCorrupt() throws Exception { + // Reflect underlying DataStore to drive flushes via getData(id) + DataStore ds = (DataStore) reflect(valueStore, "dataStore"); + + ExecutorService pool = Executors.newFixedThreadPool(3); + CountDownLatch start = new CountDownLatch(1); + + List> futures = new ArrayList<>(); + + // Writer: iterate across code point classes and length boundaries + futures.add(pool.submit(() -> { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + String ns = "http://example.org/ns/"; + + // code point classes: ASCII(1B), U+00A2(2B), U+20AC(3B), U+1F600(4B) + String[] cps = new String[] { "a", "\u00A2", "\u20AC", "\uD83D\uDE00" }; + + // length targets (bytes) to exercise small/medium/page-ish boundaries + int[] 
byteTargets = new int[] { + 1, 2, 3, 4, 5, + 15, 31, 63, 127, 255, 256, 257, + 512 - 8, 512 - 1, 512, 512 + 1, + 1024 - 8, 1024 - 1, 1024, 1024 + 1, + 2048 - 8, 2048 - 1, 2048, 2048 + 1, + 4096 - 16, 4096 - 8, 4096 - 5, 4096 - 4, 4096 - 1, 4096, 4096 + 1, (64 * 1024) - 1, (64 * 1024), + (64 * 1024) + 1, + (1024 * 1024) - 1, (1024 * 1024), (1024 * 1024) + 1 + }; + + try { + // prime namespace so ns id is stable + valueStore.storeValue(valueStore.createIRI(ns, "seed")); + + for (String cp : cps) { + for (int targetBytes : byteTargets) { + // Build local name approximately targetBytes long in UTF-8 + String local = buildUtf8Length(cp, targetBytes); + valueStore.storeValue(valueStore.createIRI(ns, local)); + + // Also add a BNode id with similar UTF-8 size + String bnodeId = buildUtf8Length(cp, targetBytes); + valueStore.storeValue(valueStore.createBNode(bnodeId)); + + // Interleave small literal writes to keep buffered path hot + valueStore.storeValue(valueStore.createLiteral("v" + targetBytes)); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + })); + + // Flusher: aggressively force reads to trigger DataFile.flush() + futures.add(pool.submit(() -> { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + long until = System.nanoTime() + Duration.ofSeconds(30).toNanos(); + while (System.nanoTime() < until) { + try { + int max = ds.getMaxID(); + if (max >= 1) { + // alternate between a stable low id and a random-ish recent id + ds.getData(1); + ds.getData(Math.max(1, max)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + Thread.onSpinWait(); + } + return null; + })); + + // Big IRI writer: periodically write very large IRI/BNode values to trigger direct-write path + futures.add(pool.submit(() -> { + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new 
RuntimeException(e); + } + String ns = "http://example.org/big/"; + try { + // prime + valueStore.storeValue(valueStore.createIRI(ns, "start")); + for (int i = 0; i < 2000; i++) { + String bigLocal = buildUtf8Length("a", 12_000 + (i % 200)); + valueStore.storeValue(valueStore.createIRI(ns, bigLocal)); + // large bnode ids too + String bigBNode = buildUtf8Length("\uD83D\uDE00", 10_000 + (i % 300)); + valueStore.storeValue(valueStore.createBNode(bigBNode)); + // keep buffer hot in between + valueStore.storeValue(valueStore.createLiteral("keep" + i)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + })); + + // go! + start.countDown(); + try { + for (Future f : futures) { + f.get(); + } + } finally { + pool.shutdownNow(); + pool.awaitTermination(5, TimeUnit.SECONDS); + } + + // Close and reopen to force load-from-disk + store.shutDown(); + store = new NativeStore(dataDir, "spoc,posc"); + store.init(); + valueStore = (ValueStore) store.getValueFactory(); + + // If corruption occurred this should throw, causing the test to fail (reproducer) + valueStore.checkConsistency(); + } + + private static Object reflect(Object target, String field) { + try { + Field f = target.getClass().getDeclaredField(field); + f.setAccessible(true); + return f.get(target); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + private static String buildUtf8Length(String unit, int targetBytes) { + if (targetBytes <= 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + int unitBytes = unit.getBytes(StandardCharsets.UTF_8).length; + if (unitBytes <= 0) { + unitBytes = 1; + } + int reps = Math.max(1, targetBytes / unitBytes); + for (int i = 0; i < reps; i++) { + sb.append(unit); + } + // if we overshot, trim by bytes (safe: only append ASCII 'x' when under) + while (sb.toString().getBytes(StandardCharsets.UTF_8).length > targetBytes && sb.length() > 0) { + sb.setLength(sb.length() - 1); + } + while 
(sb.toString().getBytes(StandardCharsets.UTF_8).length < targetBytes) { + sb.append('x'); + } + return sb.toString(); + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreValueStoreCorruptionReproductionTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreValueStoreCorruptionReproductionTest.java new file mode 100644 index 00000000000..9f16a21f76b --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreValueStoreCorruptionReproductionTest.java @@ -0,0 +1,122 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.model.util.Values; +import org.eclipse.rdf4j.model.vocabulary.RDF; +import org.eclipse.rdf4j.model.vocabulary.RDFS; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.eclipse.rdf4j.repository.RepositoryException; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import 
org.junit.jupiter.api.parallel.Isolated; + +/** + * Reproduces a corrupt ValueStore by tampering with the values.dat type byte and verifies that reading statements fails + * when soft-fail is disabled. + */ +@Isolated +public class NativeStoreValueStoreCorruptionReproductionTest { + + @TempDir + File tempFolder; + + private SailRepository repo; + + private File dataDir; + + private final ValueFactory F = SimpleValueFactory.getInstance(); + + @BeforeEach + public void setup() { + dataDir = new File(tempFolder, "dbmodel"); + dataDir.mkdir(); + repo = new SailRepository(new NativeStore(dataDir, "spoc,posc")); + repo.init(); + + // Insert the same base dataset used by NativeSailStoreCorruptionTest to ensure stable file layout + IRI CTX_1 = F.createIRI("urn:one"); + IRI CTX_2 = F.createIRI("urn:two"); + + Statement S0 = F.createStatement(F.createIRI("http://example.org/a0"), RDFS.LABEL, F.createLiteral("zero")); + Statement S1 = F.createStatement(F.createIRI("http://example.org/b1"), RDFS.LABEL, F.createLiteral("one")); + Statement S2 = F.createStatement(F.createIRI("http://example.org/c2"), RDFS.LABEL, F.createLiteral("two")); + Statement S3 = F.createStatement(Values.bnode(), RDF.TYPE, Values.bnode()); + Statement S4 = F.createStatement(F.createIRI("http://example.org/c2"), RDFS.LABEL, + F.createLiteral("two", "en")); + Statement S5 = F.createStatement(F.createIRI("http://example.org/c2"), RDFS.LABEL, F.createLiteral(1.2)); + + try (RepositoryConnection conn = repo.getConnection()) { + conn.add(S0); + conn.add(S1, CTX_1); + conn.add(S2, CTX_2); + conn.add(S2, CTX_2); + conn.add(S3, CTX_2); + conn.add(S4, CTX_2); + conn.add(S5, CTX_2); + } + } + + @AfterEach + public void tearDown() { + repo.shutDown(); + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + + @Test + public void corruptValuesDatInvalidTypeShouldBreakReads() throws IOException { + // Disable soft-fail to surface corruption as an exception + 
NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + + // Close repo to release files for mutation + repo.shutDown(); + + // Flip a byte in values.dat at a position that maps to a value type marker + // This offset mirrors NativeSailStoreCorruptionTest.testCorruptValuesDatFileInvalidTypeError + File valuesFile = new File(dataDir, "values.dat"); + overwriteByte(valuesFile, 174, 0x0); + + // Reopen; attempting to read statements should now throw a RepositoryException + repo.init(); + try (RepositoryConnection conn = repo.getConnection()) { + assertThrows(RepositoryException.class, () -> { + conn.getStatements(null, null, null, false).forEachRemaining(s -> { + // Force materialization of all statements + }); + }); + } + } + + private static void overwriteByte(File file, long pos, int newVal) throws IOException { + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) { + long fileLength = raf.length(); + if (pos >= fileLength) { + throw new IOException( + "Attempt to write outside the existing file bounds: " + pos + " >= " + fileLength); + } + raf.seek(pos); + raf.writeByte(newVal); + } + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreValueStoreCorruptionTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreValueStoreCorruptionTest.java new file mode 100644 index 00000000000..60c6c580fa5 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/NativeStoreValueStoreCorruptionTest.java @@ -0,0 +1,272 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + ******************************************************************************/ + +package org.eclipse.rdf4j.sail.nativerdf; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.File; +import java.util.List; + +import org.eclipse.rdf4j.common.iteration.Iterations; +import org.eclipse.rdf4j.common.transaction.IsolationLevels; +import org.eclipse.rdf4j.model.Literal; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.eclipse.rdf4j.repository.RepositoryException; +import org.eclipse.rdf4j.repository.RepositoryResult; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Demonstrates how data written through normal repository operations can corrupt the underlying {@link ValueStore}. 
+ */ +public class NativeStoreValueStoreCorruptionTest { + + @TempDir + File dataDir; + + @Test + public void longLanguageTagShouldNotCorruptValueStore() throws Exception { + SailRepository repo = new SailRepository(new NativeStore(dataDir)); + repo.init(); + + String lang = buildLanguageTag(256); + + assertThrows(RepositoryException.class, () -> { + + try (RepositoryConnection connection = repo.getConnection()) { + connection.begin(IsolationLevels.NONE); + connection.add(connection.getValueFactory().createIRI("urn:subj"), + connection.getValueFactory().createIRI("urn:pred"), + connection.getValueFactory().createLiteral("value", lang)); + connection.commit(); + } + }); + + repo.shutDown(); + + SailRepository reopened = new SailRepository(new NativeStore(dataDir)); + reopened.init(); + + try (RepositoryConnection connection = reopened.getConnection()) { + try (RepositoryResult statements = connection.getStatements(null, null, null, true)) { + List list = Iterations.asList(statements); + assertEquals(0, list.size()); + } + } + + reopened.shutDown(); + } + + @Test + public void longLanguageTagShouldNotCorruptValueStoreIncremental() throws Exception { + SailRepository repo = new SailRepository(new NativeStore(dataDir)); + repo.init(); + + for (int i = 0; i < 256; i++) { + String lang = buildLanguageTag(i); + + try (RepositoryConnection connection = repo.getConnection()) { + connection.begin(IsolationLevels.NONE); + connection.add(connection.getValueFactory().createIRI("urn:subj"), + connection.getValueFactory().createIRI("urn:pred"), + connection.getValueFactory().createLiteral("value", lang)); + connection.commit(); + } + + repo.shutDown(); + + SailRepository reopened = new SailRepository(new NativeStore(dataDir)); + reopened.init(); + + try (RepositoryConnection connection = reopened.getConnection()) { + try (RepositoryResult statements = connection.getStatements(null, null, null, true)) { + List list = Iterations.asList(statements); + assertEquals(1, list.size()); + 
Literal literal = (Literal) list.get(0).getObject(); + assertEquals(lang, literal.getLanguage().orElseThrow()); + } + } + + try (SailRepositoryConnection connection = reopened.getConnection()) { + connection.clear(); + } + + reopened.shutDown(); + } + } + + @Test + public void longDatatypeShouldNotCorruptValueStore() throws Exception { + SailRepository repo = new SailRepository(new NativeStore(dataDir)); + repo.init(); + + String longDatatype = buildIRIOfLength(5000); + + try (RepositoryConnection connection = repo.getConnection()) { + connection.begin(IsolationLevels.NONE); + connection.add(connection.getValueFactory().createIRI("urn:subj"), + connection.getValueFactory().createIRI("urn:pred"), + connection.getValueFactory() + .createLiteral("value", + connection.getValueFactory().createIRI(longDatatype))); + connection.commit(); + } + + repo.shutDown(); + + SailRepository reopened = new SailRepository(new NativeStore(dataDir)); + reopened.init(); + + try (RepositoryConnection connection = reopened.getConnection()) { + try (RepositoryResult statements = connection.getStatements(null, null, null, true)) { + List list = Iterations.asList(statements); + assertEquals(1, list.size()); + Literal literal = (Literal) list.get(0).getObject(); + assertEquals(longDatatype, literal.getDatatype().stringValue()); + } + } + + reopened.shutDown(); + } + + @Test + public void longLabelShouldNotCorruptValueStore() throws Exception { + SailRepository repo = new SailRepository(new NativeStore(dataDir)); + repo.init(); + + String longLabel = buildString(20000); + + try (RepositoryConnection connection = repo.getConnection()) { + connection.begin(IsolationLevels.NONE); + connection.add(connection.getValueFactory().createIRI("urn:subj"), + connection.getValueFactory().createIRI("urn:pred"), + connection.getValueFactory().createLiteral(longLabel)); + connection.commit(); + } + + repo.shutDown(); + + SailRepository reopened = new SailRepository(new NativeStore(dataDir)); + 
reopened.init(); + + try (RepositoryConnection connection = reopened.getConnection()) { + try (RepositoryResult statements = connection.getStatements(null, null, null, true)) { + List list = Iterations.asList(statements); + assertEquals(1, list.size()); + Literal literal = (Literal) list.get(0).getObject(); + assertEquals(longLabel, literal.getLabel()); + } + } + + reopened.shutDown(); + } + + @Test + public void longIRIShouldNotCorruptValueStore() throws Exception { + SailRepository repo = new SailRepository(new NativeStore(dataDir)); + repo.init(); + + String longIri = buildIRIOfLength(5000); + + try (RepositoryConnection connection = repo.getConnection()) { + connection.begin(IsolationLevels.NONE); + connection.add(connection.getValueFactory().createIRI(longIri), + connection.getValueFactory().createIRI("urn:pred"), + connection.getValueFactory().createLiteral("value")); + connection.commit(); + } + + repo.shutDown(); + + SailRepository reopened = new SailRepository(new NativeStore(dataDir)); + reopened.init(); + + try (RepositoryConnection connection = reopened.getConnection()) { + try (RepositoryResult statements = connection.getStatements(null, null, null, true)) { + List list = Iterations.asList(statements); + assertEquals(1, list.size()); + Statement st = list.get(0); + assertEquals(longIri, st.getSubject().stringValue()); + } + } + + reopened.shutDown(); + } + + @Test + public void longBNodeShouldNotCorruptValueStore() throws Exception { + SailRepository repo = new SailRepository(new NativeStore(dataDir)); + repo.init(); + + String longBnodeId = buildString(10000); + + try (RepositoryConnection connection = repo.getConnection()) { + connection.begin(IsolationLevels.NONE); + connection.add(connection.getValueFactory().createBNode(longBnodeId), + connection.getValueFactory().createIRI("urn:pred"), + connection.getValueFactory().createLiteral("value")); + connection.commit(); + } + + repo.shutDown(); + + SailRepository reopened = new SailRepository(new 
NativeStore(dataDir)); + reopened.init(); + + try (RepositoryConnection connection = reopened.getConnection()) { + try (RepositoryResult statements = connection.getStatements(null, null, null, true)) { + List list = Iterations.asList(statements); + assertEquals(1, list.size()); + Statement st = list.get(0); + // ensure subject is a BNode with the long id + assertEquals(longBnodeId, st.getSubject().stringValue()); + } + } + + reopened.shutDown(); + } + + private String buildString(int targetLength) { + StringBuilder builder = new StringBuilder(targetLength); + while (builder.length() < targetLength) { + builder.append('x'); + } + return builder.toString(); + } + + private String buildIRIOfLength(int targetLength) { + String base = "http://example.org/"; + StringBuilder builder = new StringBuilder(targetLength); + builder.append(base); + while (builder.length() < targetLength) { + builder.append('a'); + } + return builder.toString(); + } + + private String buildLanguageTag(int targetLength) { + StringBuilder builder = new StringBuilder(targetLength); + builder.append("en"); + while (builder.length() < targetLength) { + builder.append('-'); + int segmentLength = Math.min(8, targetLength - builder.length()); + for (int i = 0; i < segmentLength; i++) { + builder.append('a'); + } + } + return builder.toString(); + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/ValueStoreConcurrentWriteCorruptionTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/ValueStoreConcurrentWriteCorruptionTest.java new file mode 100644 index 00000000000..09618446592 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/ValueStoreConcurrentWriteCorruptionTest.java @@ -0,0 +1,132 @@ +/** + ******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + ******************************************************************************* + */ +package org.eclipse.rdf4j.sail.nativerdf; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Induces concurrent writes directly against {@link ValueStore} to demonstrate a race condition that corrupts the + * underlying values data files. This does not go through {@link NativeStore}'s internal locking and therefore + * deliberately stresses {@link org.eclipse.rdf4j.sail.nativerdf.datastore.DataFile} concurrent write behavior. 
+ */ +@Isolated +public class ValueStoreConcurrentWriteCorruptionTest { + + @TempDir + File dataDir; + + private ValueStore valueStore; + + private final ValueFactory vf = SimpleValueFactory.getInstance(); + + @BeforeEach + public void setup() throws IOException { + valueStore = new ValueStore(dataDir); + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + + @AfterEach + public void tearDown() throws IOException { + try { + if (valueStore != null) { + valueStore.close(); + } + } finally { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + } + + @Test + public void concurrentStoreValueShouldCorrupt() throws Exception { + int writers = 16; + int valuesPerWriter = 8000; + + ExecutorService pool = Executors.newFixedThreadPool(writers); + CountDownLatch start = new CountDownLatch(1); + + try { + List> futures = new ArrayList<>(); + + for (int i = 0; i < writers; i++) { + final int seed = 42 + i; + futures.add(pool.submit(() -> writeLoad(start, seed, valuesPerWriter))); + } + + // start all writers concurrently + start.countDown(); + + // wait for completion (propagate any exception from workers) + for (Future f : futures) { + f.get(); + } + } finally { + pool.shutdownNow(); + } + + // Close and reopen the ValueStore to force data reload from disk + valueStore.close(); + valueStore = new ValueStore(dataDir); + + valueStore.checkConsistency(); + + } + + private void writeLoad(CountDownLatch start, int seed, int count) { + try { + start.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + Random rnd = new Random(seed); + for (int i = 0; i < count; i++) { + try { + // small values to exercise buffered write path + String ns = "http://ex/" + rnd.nextInt(100) + "/"; + String local = "l" + rnd.nextInt(50_000); + valueStore.storeValue(vf.createIRI(ns + local)); + + switch (rnd.nextInt(3)) { + case 0: + valueStore.storeValue(vf.createLiteral("v" + rnd.nextInt(10_000))); + break; + case 1: + 
valueStore.storeValue(vf.createLiteral("v" + rnd.nextInt(10_000), "en")); + break; + default: + valueStore.storeValue( + vf.createLiteral("v" + rnd.nextInt(10_000), vf.createIRI("http://dt/" + rnd.nextInt(100)))); + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/ValueStorePartialWriteCorruptionTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/ValueStorePartialWriteCorruptionTest.java new file mode 100644 index 00000000000..689c3b02a57 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/ValueStorePartialWriteCorruptionTest.java @@ -0,0 +1,289 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; + +/** + * Induces ValueStore corruption by simulating partial file writes at the channel level. This test replaces the + * FileChannel inside the internal NioFile used by ValueStore's DataFile with a wrapper that occasionally reports fewer + * bytes written than requested. Since DataFile/NioFile do not loop to ensure full writes, this can leave truncated + * records (e.g., zeroed payloads or invalid length prefixes), reproducing the user-observed symptoms. 
+ */ +@Isolated +public class ValueStorePartialWriteCorruptionTest { + + @TempDir + File dataDir; + + private ValueStore valueStore; + + @BeforeEach + public void setup() throws IOException { + valueStore = new ValueStore(dataDir); + // Disable soft-fail to surface corruption as exceptions in checkConsistency + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + injectChoppyChannel(valueStore, /* partialWriteFrequency= */1); + } + + @AfterEach + public void tearDown() throws IOException { + try { + if (valueStore != null) { + valueStore.close(); + } + } finally { + NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; + } + } + + @Test + public void partialWritesCauseValueStoreInconsistency() throws Exception { + int writers = 8; + int valuesPerWriter = 20_000; + + ExecutorService pool = Executors.newFixedThreadPool(writers); + CountDownLatch start = new CountDownLatch(1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < writers; i++) { + final int seed = 2025 + i; + futures.add(pool.submit(() -> { + Random rnd = new Random(seed); + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + for (int j = 0; j < valuesPerWriter; j++) { + try { + // Mix of small IRIs and occasionally large literals to trigger both buffered and direct writes + String ns = "http://ex/" + rnd.nextInt(64) + "/"; + String local = "s" + rnd.nextInt(50_000); + valueStore.storeValue(valueStore.createIRI(ns, local)); + if ((j % 200) == 0) { + String big = buildString(12_000 + rnd.nextInt(6000)); + valueStore.storeValue(valueStore.createLiteral(big)); + } else { + valueStore.storeValue(valueStore.createLiteral("v" + j)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + })); + } + + start.countDown(); + for (Future f : futures) { + f.get(); + } + pool.shutdownNow(); + + // Close and reopen to force reload from disk only + 
valueStore.close(); + valueStore = new ValueStore(dataDir); + + // After the write-loop fix in NioFile, partial low-level writes are retried until complete. + // Therefore, checkConsistency should not throw anymore. + valueStore.checkConsistency(); + } + + private static String buildString(int len) { + StringBuilder sb = new StringBuilder(len); + while (sb.length() < len) { + sb.append('x'); + } + return sb.toString(); + } + + /** + * Replace the internal FileChannel of the ValueStore's DataFile with a wrapper that sometimes performs partial + * writes. + */ + private static void injectChoppyChannel(ValueStore vs, int partialWriteFrequency) { + try { + Field dsField = ValueStore.class.getDeclaredField("dataStore"); + dsField.setAccessible(true); + DataStore ds = (DataStore) dsField.get(vs); + + Field dfField = DataStore.class.getDeclaredField("dataFile"); + dfField.setAccessible(true); + Object dataFile = dfField.get(ds); + + Field nioField = dataFile.getClass().getDeclaredField("nioFile"); + nioField.setAccessible(true); + Object nioFile = nioField.get(dataFile); + + Field fcField = nioFile.getClass().getDeclaredField("fc"); + fcField.setAccessible(true); + FileChannel original = (FileChannel) fcField.get(nioFile); + + fcField.set(nioFile, new ChoppyFileChannel(original, partialWriteFrequency)); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + /** + * A FileChannel wrapper that intermittently writes fewer bytes than requested. 
+ */ + static class ChoppyFileChannel extends FileChannel { + private final FileChannel delegate; + private final int frequency; + private int opCount = 0; + + ChoppyFileChannel(FileChannel delegate, int frequency) { + this.delegate = delegate; + this.frequency = Math.max(1, frequency); + } + + @Override + public int write(ByteBuffer src, long position) throws IOException { + opCount++; + if (opCount % frequency == 0) { + int remaining = src.remaining(); + if (remaining > 4) { + int partial = Math.max(1, remaining / 2); + // write only a part + ByteBuffer slice = src.slice(); + slice.limit(partial); + int written = delegate.write(slice, position); + // advance original buffer by 'written' bytes but leave unwritten bytes unflushed + src.position(src.position() + written); + return written; + } + } + return delegate.write(src, position); + } + + @Override + public int write(ByteBuffer src) throws IOException { + opCount++; + if (opCount % frequency == 0) { + int remaining = src.remaining(); + if (remaining > 4) { + int partial = Math.max(1, remaining / 2); + ByteBuffer slice = src.slice(); + slice.limit(partial); + int written = delegate.write(slice); + src.position(src.position() + written); + return written; + } + } + return delegate.write(src); + } + + // Delegations / stubs for other abstract methods + @Override + public int read(ByteBuffer dst) throws IOException { + return delegate.read(dst); + } + + // remove duplicate signature (below there is another read(ByteBuffer,long)) + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return delegate.read(dsts, offset, length); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + return delegate.write(srcs, offset, length); + } + + @Override + public long position() throws IOException { + return delegate.position(); + } + + @Override + public FileChannel position(long newPosition) throws IOException { + 
delegate.position(newPosition); + return this; + } + + @Override + public long size() throws IOException { + return delegate.size(); + } + + @Override + public FileChannel truncate(long size) throws IOException { + delegate.truncate(size); + return this; + } + + @Override + public void force(boolean metaData) throws IOException { + delegate.force(metaData); + } + + @Override + public long transferTo(long position, long count, java.nio.channels.WritableByteChannel target) + throws IOException { + return delegate.transferTo(position, count, target); + } + + @Override + public long transferFrom(java.nio.channels.ReadableByteChannel src, long position, long count) + throws IOException { + return delegate.transferFrom(src, position, count); + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + return delegate.read(dst, position); + } + + @Override + public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + return delegate.map(mode, position, size); + } + + @Override + public FileLock lock(long position, long size, boolean shared) throws IOException { + return delegate.lock(position, size, shared); + } + + @Override + public FileLock tryLock(long position, long size, boolean shared) throws IOException { + return delegate.tryLock(position, size, shared); + } + + @Override + protected void implCloseChannel() throws IOException { + delegate.close(); + } + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStoreRecoveryTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStoreRecoveryTest.java index 1c2a9ffab98..9c2d7675796 100644 --- a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStoreRecoveryTest.java +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/DataStoreRecoveryTest.java @@ -20,11 +20,13 @@ import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.parallel.Isolated; /** * Tests recovery in DataStore.getData when the stored data length is zero but neighboring ID offsets exist. The * recovery uses the next ID's offset to infer the correct data length. */ +@Isolated public class DataStoreRecoveryTest { @TempDir diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/HashFileSyncBehaviorTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/HashFileSyncBehaviorTest.java new file mode 100644 index 00000000000..95d07a6d684 --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/HashFileSyncBehaviorTest.java @@ -0,0 +1,219 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf.datastore; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +import org.eclipse.rdf4j.common.io.NioFile; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Verifies that HashFile.sync(boolean) honors the caller's request to force metadata to disk by passing {@code true} to + * the underlying FileChannel#force(boolean) call. This must also hold when HashFile is constructed with + * {@code forceSync=true}. + */ +public class HashFileSyncBehaviorTest { + + @TempDir + File tempDir; + + private File hashFilePath; + + @BeforeEach + public void setup() { + hashFilePath = new File(tempDir, "values.hash"); + } + + @AfterEach + public void tearDown() { + // nothing to close because individual tests close files explicitly when needed + } + + @Test + public void syncFalseForcesFileContent_whenForceSyncDisabled() throws Exception { + try (HashFile hf = new HashFile(hashFilePath, /* forceSync= */ false, /* initialSize= */ 16)) { + TrackingFileChannel tracker = injectTrackingChannel(hf); + + // Act + hf.sync(false); + + // Assert: even without metadata, content must be flushed + assertThat(tracker.forceFalseCount) + .as("HashFile.sync(false) should call force(false) on the underlying channel when forceSync=false") + .isGreaterThan(0); + } + } + + @Test + public void syncTrueHonorsMetadataFlag_whenForceSyncDisabled() throws Exception { + try (HashFile hf = new HashFile(hashFilePath, /* forceSync= */ false, /* initialSize= */ 16)) { + TrackingFileChannel 
tracker = injectTrackingChannel(hf); + + // Act + hf.sync(true); + + // Assert: must have at least one force(true) call + assertThat(tracker.forceTrueCount) + .as("HashFile.sync(true) should call force(true) on the underlying channel when forceSync=false") + .isGreaterThan(0); + } + } + + @Test + public void syncTrueHonorsMetadataFlag_whenForceSyncEnabled() throws Exception { + try (HashFile hf = new HashFile(hashFilePath, /* forceSync= */ true, /* initialSize= */ 16)) { + TrackingFileChannel tracker = injectTrackingChannel(hf); + + // Act + hf.sync(true); + + // Assert: must have at least one force(true) call + assertThat(tracker.forceTrueCount) + .as("HashFile.sync(true) should call force(true) on the underlying channel when forceSync=true") + .isGreaterThan(0); + } + } + + private static TrackingFileChannel injectTrackingChannel(HashFile hf) throws Exception { + // Access private final NioFile field on HashFile + Field nioFileField = HashFile.class.getDeclaredField("nioFile"); + nioFileField.setAccessible(true); + NioFile nio = (NioFile) nioFileField.get(hf); + + // Access private volatile FileChannel field on NioFile + Field fcField = NioFile.class.getDeclaredField("fc"); + fcField.setAccessible(true); + FileChannel delegate = (FileChannel) fcField.get(nio); + + TrackingFileChannel tracking = new TrackingFileChannel(delegate); + fcField.set(nio, tracking); + return tracking; + } + + /** + * Delegating channel that tracks calls to force(boolean). 
+ */ + static class TrackingFileChannel extends FileChannel { + final FileChannel delegate; + volatile int forceTrueCount = 0; + volatile int forceFalseCount = 0; + + TrackingFileChannel(FileChannel delegate) { + this.delegate = delegate; + } + + @Override + public void force(boolean metaData) throws IOException { + if (metaData) { + forceTrueCount++; + } else { + forceFalseCount++; + } + delegate.force(metaData); + } + + // Delegations for abstract methods / other operations + @Override + public int read(ByteBuffer dst) throws IOException { + return delegate.read(dst); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return delegate.read(dsts, offset, length); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return delegate.write(src); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + return delegate.write(srcs, offset, length); + } + + @Override + public long position() throws IOException { + return delegate.position(); + } + + @Override + public FileChannel position(long newPosition) throws IOException { + delegate.position(newPosition); + return this; + } + + @Override + public long size() throws IOException { + return delegate.size(); + } + + @Override + public FileChannel truncate(long size) throws IOException { + delegate.truncate(size); + return this; + } + + @Override + public void implCloseChannel() throws IOException { + delegate.close(); + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + return delegate.read(dst, position); + } + + @Override + public int write(ByteBuffer src, long position) throws IOException { + return delegate.write(src, position); + } + + @Override + public long transferTo(long position, long count, java.nio.channels.WritableByteChannel target) + throws IOException { + return delegate.transferTo(position, count, target); + } + + @Override + public long 
transferFrom(java.nio.channels.ReadableByteChannel src, long position, long count) + throws IOException { + return delegate.transferFrom(src, position, count); + } + + @Override + public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + return delegate.map(mode, position, size); + } + + @Override + public FileLock lock(long position, long size, boolean shared) throws IOException { + return delegate.lock(position, size, shared); + } + + @Override + public FileLock tryLock(long position, long size, boolean shared) throws IOException { + return delegate.tryLock(position, size, shared); + } + } +} diff --git a/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFileSyncBehaviorTest.java b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFileSyncBehaviorTest.java new file mode 100644 index 00000000000..13e48ad329d --- /dev/null +++ b/core/sail/nativerdf/src/test/java/org/eclipse/rdf4j/sail/nativerdf/datastore/IDFileSyncBehaviorTest.java @@ -0,0 +1,166 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.nativerdf.datastore; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; + +import org.eclipse.rdf4j.common.io.NioFile; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Verifies that IDFile.sync(boolean) passes the caller's boolean flag through to FileChannel#force(boolean), ensuring + * metadata can be flushed when requested. + */ +public class IDFileSyncBehaviorTest { + + @TempDir + File tempDir; + + @Test + public void syncTrueHonorsMetadataFlag() throws Exception { + File file = new File(tempDir, "values.id"); + try (IDFile id = new IDFile(file, /* forceSync= */ false)) { + TrackingFileChannel tracker = injectTrackingChannel(id); + + id.sync(true); + + assertThat(tracker.forceTrueCount) + .as("IDFile.sync(true) should call force(true) on the underlying channel") + .isGreaterThan(0); + } + } + + private static TrackingFileChannel injectTrackingChannel(IDFile id) throws Exception { + Field nioFileField = IDFile.class.getDeclaredField("nioFile"); + nioFileField.setAccessible(true); + NioFile nio = (NioFile) nioFileField.get(id); + + Field fcField = NioFile.class.getDeclaredField("fc"); + fcField.setAccessible(true); + FileChannel delegate = (FileChannel) fcField.get(nio); + + TrackingFileChannel tracking = new TrackingFileChannel(delegate); + fcField.set(nio, tracking); + return tracking; + } + + static class TrackingFileChannel extends FileChannel { + final FileChannel delegate; + volatile int forceTrueCount = 0; + volatile int forceFalseCount = 0; + + TrackingFileChannel(FileChannel delegate) { + this.delegate = delegate; + } + + @Override + public void 
force(boolean metaData) throws java.io.IOException { + if (metaData) { + forceTrueCount++; + } else { + forceFalseCount++; + } + delegate.force(metaData); + } + + @Override + public int read(ByteBuffer dst) throws java.io.IOException { + return delegate.read(dst); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws java.io.IOException { + return delegate.read(dsts, offset, length); + } + + @Override + public int write(ByteBuffer src) throws java.io.IOException { + return delegate.write(src); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws java.io.IOException { + return delegate.write(srcs, offset, length); + } + + @Override + public long position() throws java.io.IOException { + return delegate.position(); + } + + @Override + public FileChannel position(long newPosition) throws java.io.IOException { + delegate.position(newPosition); + return this; + } + + @Override + public long size() throws java.io.IOException { + return delegate.size(); + } + + @Override + public FileChannel truncate(long size) throws java.io.IOException { + delegate.truncate(size); + return this; + } + + @Override + protected void implCloseChannel() throws java.io.IOException { + delegate.close(); + } + + @Override + public int read(ByteBuffer dst, long position) throws java.io.IOException { + return delegate.read(dst, position); + } + + @Override + public int write(ByteBuffer src, long position) throws java.io.IOException { + return delegate.write(src, position); + } + + @Override + public long transferTo(long position, long count, java.nio.channels.WritableByteChannel target) + throws java.io.IOException { + return delegate.transferTo(position, count, target); + } + + @Override + public long transferFrom(java.nio.channels.ReadableByteChannel src, long position, long count) + throws java.io.IOException { + return delegate.transferFrom(src, position, count); + } + + @Override + public MappedByteBuffer map(MapMode mode, 
long position, long size) throws java.io.IOException { + return delegate.map(mode, position, size); + } + + @Override + public FileLock lock(long position, long size, boolean shared) throws java.io.IOException { + return delegate.lock(position, size, shared); + } + + @Override + public FileLock tryLock(long position, long size, boolean shared) throws java.io.IOException { + return delegate.tryLock(position, size, shared); + } + } +}