Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.admin.TabletAvailability;
Expand Down Expand Up @@ -669,4 +670,24 @@ default void removeBulkLoadInProgressFlag(String path) {
default ScanServerRefStore scanServerRefs() {
throw new UnsupportedOperationException();
}

/**
 * An external compaction whose entry was removed from a tablet's metadata (for example by a split
 * or merge operation) while the compaction may still be running on a compactor.
 *
 * @param id the external compaction id
 * @param table the table the compaction belonged to
 * @param dir the tablet directory name where the compaction's tmp file may exist
 */
record RemovedCompaction(ExternalCompactionId id, TableId table, String dir) {}

/**
* Tracks compactions that were removed from the metadata table but may still be running on
* compactors. The tmp files associated with these compactions can eventually be removed when the
* compaction is no longer running.
*/
interface RemovedCompactionStore {
Stream<RemovedCompaction> list();

void add(Collection<RemovedCompaction> removedCompactions);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a few minutes to understand what a RemovedCompaction represented. My understanding of these changes are that when a tablet is merged or split, the ECID entries in the tablet metadata for the tablets involved are removed. However, the compaction is likely running on a Compactor. Is that right?

I was having trouble understanding the context given the name. I wonder if a different name might better reflect the situation. The compaction itself is not removed, it's OBE. It's been orphaned from it's parent tablet or it's like a dangling ref in a database. Would OrphanedCompaction be a better name?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OrphanedCompaction is a much better name, will change to that. Used removed in the name because it was removed from the metadata table.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that right?

Yes that is all correct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed in db57334


void delete(Collection<RemovedCompaction> removedCompactions);
}

/**
 * @return the store used to track compactions removed from the metadata table; the default
 *         implementation is unsupported and is expected to be overridden by server-side
 *         implementations
 */
default RemovedCompactionStore removedCompactions() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,36 @@ public static String getRowPrefix() {

}

/**
 * Holds information about compactions that were deleted from tablets metadata by split or merge
 * operations. These may have tmp files that need to be cleaned up. Rows are encoded as
 * {@code <prefix>#<ecid>#<tableId>#<dir>}.
 */
public static class RemovedCompactionSection {
  private static final Section section =
      new Section(RESERVED_PREFIX + "rcomp", true, RESERVED_PREFIX + "rcomq", false);

  public static Range getRange() {
    return section.getRange();
  }

  public static String getRowPrefix() {
    return section.getRowPrefix();
  }

  /**
   * Decodes a row produced by {@link #encodeRow(Ample.RemovedCompaction)} back into a
   * {@link Ample.RemovedCompaction}.
   *
   * @throws IllegalArgumentException if the row does not have the expected prefix or field count
   */
  public static Ample.RemovedCompaction decodeRow(String row) {
    // limit the split so a '#' inside the trailing dir field cannot change the field count,
    // keeping decode a true inverse of encode
    String[] fields = row.split("#", 4);
    Preconditions.checkArgument(fields.length == 4, "Unexpected row format %s", row);
    Preconditions.checkArgument(getRowPrefix().equals(fields[0]), "Unexpected row prefix %s",
        row);
    return new Ample.RemovedCompaction(ExternalCompactionId.from(fields[1]),
        TableId.of(fields[2]), fields[3]);
  }

  public static String encodeRow(Ample.RemovedCompaction rc) {
    // put the compaction id first in the row because its uuid will spread out nicely and avoid
    // hot spotting
    return getRowPrefix() + "#" + rc.id().canonical() + "#" + rc.table().canonical() + "#"
        + rc.dir();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -262,7 +261,7 @@ public static List<ServerId> getCompactionsRunningOnCompactors(ClientContext con
}
}

public static Collection<ExternalCompactionId>
public static Set<ExternalCompactionId>
getCompactionIdsRunningOnCompactors(ClientContext context) {
final ExecutorService executor = ThreadPools.getServerThreadPools()
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.metadata;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.RemovedCompactionSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.ServerContext;

import com.google.common.base.Preconditions;

/**
 * Stores {@link Ample.RemovedCompaction} entries in the metadata tables of the METADATA and USER
 * data levels, encoding and decoding rows via {@link RemovedCompactionSection}.
 */
public class RemovedCompactionStoreImpl implements Ample.RemovedCompactionStore {
  private final ServerContext context;

  public RemovedCompactionStoreImpl(ServerContext context) {
    this.context = context;
  }

  /**
   * Creates a stream of removed compactions over the given metadata table. Closing the returned
   * stream closes the underlying scanner.
   */
  private Stream<Ample.RemovedCompaction> createStream(String tableName) {
    final Scanner scanner;
    try {
      scanner = context.createScanner(tableName, Authorizations.EMPTY);
    } catch (TableNotFoundException e) {
      throw new IllegalStateException(e);
    }
    try {
      scanner.setRange(RemovedCompactionSection.getRange());
      return scanner.stream().map(e -> e.getKey().getRowData().toString())
          .map(RemovedCompactionSection::decodeRow).onClose(scanner::close);
    } catch (RuntimeException e) {
      // do not leak the scanner if anything fails before ownership is handed to the stream
      scanner.close();
      throw e;
    }
  }

  /**
   * @return a stream over removed compactions from both metadata tables; callers must close the
   *         returned stream to release the underlying scanners
   */
  @Override
  public Stream<Ample.RemovedCompaction> list() {
    Stream<Ample.RemovedCompaction> metaStream =
        createStream(Ample.DataLevel.METADATA.metaTable());
    try {
      // closing the concatenated stream closes both input streams
      return Stream.concat(metaStream, createStream(Ample.DataLevel.USER.metaTable()));
    } catch (RuntimeException e) {
      // avoid leaking the first scanner when creating the second stream fails
      metaStream.close();
      throw e;
    }
  }

  /**
   * Writes one mutation per removed compaction to the metadata table of its data level.
   *
   * @param converter produces the mutation (insert or delete) for each removed compaction
   * @throws IllegalArgumentException if any compaction belongs to the ROOT data level
   */
  private void write(Collection<Ample.RemovedCompaction> removedCompactions,
      Function<Ample.RemovedCompaction,Mutation> converter) {
    if (removedCompactions.isEmpty()) {
      return;
    }

    Map<Ample.DataLevel,List<Ample.RemovedCompaction>> byLevel = removedCompactions.stream()
        .collect(Collectors.groupingBy(rc -> Ample.DataLevel.of(rc.table())));
    // Do not expect the root to split or merge so it should never have this data
    Preconditions.checkArgument(!byLevel.containsKey(Ample.DataLevel.ROOT));
    byLevel.forEach((dl, removed) -> {
      try (var writer = context.createBatchWriter(dl.metaTable())) {
        for (var rc : removed) {
          writer.addMutation(converter.apply(rc));
        }
      } catch (TableNotFoundException | MutationsRejectedException e) {
        throw new IllegalStateException(e);
      }
    });
  }

  @Override
  public void add(Collection<Ample.RemovedCompaction> removedCompactions) {
    write(removedCompactions, rc -> {
      Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc));
      m.put("", "", "");
      return m;
    });
  }

  @Override
  public void delete(Collection<Ample.RemovedCompaction> removedCompactions) {
    write(removedCompactions, rc -> {
      Mutation m = new Mutation(RemovedCompactionSection.encodeRow(rc));
      m.putDelete("", "");
      return m;
    });
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ public ScanServerRefStore scanServerRefs() {
return scanServerRefStore;
}

// Returns a new store instance on each call; the store holds no state beyond the server context.
@Override
public RemovedCompactionStore removedCompactions() {
return new RemovedCompactionStoreImpl(getContext());
}

@VisibleForTesting
protected ServerContext getContext() {
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.accumulo.server.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
Expand All @@ -31,8 +33,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ServerOpts;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
Expand All @@ -46,13 +51,15 @@
import org.apache.accumulo.start.spi.CommandGroups;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.auto.service.AutoService;
import com.google.common.util.concurrent.MoreExecutors;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
Expand Down Expand Up @@ -127,6 +134,7 @@ public static Set<Path> findTempFiles(ServerContext context, String tableId)
});
}
}

LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
return matches;
}
Expand All @@ -137,57 +145,99 @@ public static class DeleteStats {
public int error = 0;
}

public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
throws InterruptedException {
/**
 * Searches every volume's directory {@code <volume>/tables/<tableId>/<dirName>} for compaction
 * tmp files belonging to any of the given external compaction ids and passes each match to the
 * consumer. Missing tablet directories are tolerated; other I/O errors are logged per volume.
 *
 * @param ctx server context used to obtain the volumes
 * @param tableId table whose directories are searched
 * @param dirName tablet directory name within the table directory
 * @param ecidsForTablet compaction ids whose tmp files should be found
 * @param findConsumer invoked once per matching tmp file path
 */
public static void findTmpFiles(ServerContext ctx, TableId tableId, String dirName,
    Set<ExternalCompactionId> ecidsForTablet, Consumer<Path> findConsumer) {
  final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
  for (Volume vol : vols) {
    try {
      final String volPath = vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
          + tableId.canonical() + Path.SEPARATOR + dirName;
      // hoisted out of the loop below; the directory is the same for every compaction id
      final Path tabletDir = new Path(volPath);
      final FileSystem fs = vol.getFileSystem();
      for (ExternalCompactionId ecid : ecidsForTablet) {
        final String fileSuffix = "_tmp_" + ecid.canonical();
        FileStatus[] files = null;
        try {
          files = fs.listStatus(tabletDir, (path) -> path.getName().endsWith(fileSuffix));
        } catch (FileNotFoundException e) {
          // the tablet dir may legitimately not exist on this volume
          LOG.trace("Failed to list tablet dir {}", volPath, e);
        }
        if (files != null) {
          for (FileStatus file : files) {
            findConsumer.accept(file.getPath());
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Exception searching for compaction tmp files for table: {}", tableId, e);
    }
  }
}

/**
 * Deletes a single compaction tmp file via the volume manager.
 *
 * @return true if the file was deleted or did not exist, false if the delete call reported
 *         failure without throwing
 */
private static boolean deleteTmpFile(ServerContext context, Path p) throws IOException {
  var volMgr = context.getVolumeManager();
  if (!volMgr.exists(p)) {
    // a missing file is treated the same as a successful delete
    return true;
  }
  boolean deleted = volMgr.delete(p);
  if (deleted) {
    LOG.debug("Removed old temp file {}", p);
  } else {
    LOG.error("Unable to remove old temp file {}, operation returned false with no exception", p);
  }
  return deleted;
}

public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete) {

final ExecutorService delSvc;
if (filesToDelete.size() < 4) {
// Do not bother creating a thread pool and threads for a few files.
delSvc = MoreExecutors.newDirectExecutorService();
} else {
delSvc = Executors.newFixedThreadPool(8);
}

final ExecutorService delSvc = Executors.newFixedThreadPool(8);
final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
final DeleteStats stats = new DeleteStats();

// use a linked list to make removal from the middle of the list quick
final List<Future<Boolean>> futures = new LinkedList<>();

filesToDelete.forEach(p -> {
futures.add(delSvc.submit(() -> {
if (context.getVolumeManager().exists(p)) {
boolean result = context.getVolumeManager().delete(p);
if (result) {
LOG.debug("Removed old temp file {}", p);
} else {
LOG.error(
"Unable to remove old temp file {}, operation returned false with no exception", p);
}
return result;
}
return true;
}));
futures.add(delSvc.submit(() -> deleteTmpFile(context, p)));
});
delSvc.shutdown();

int expectedResponses = filesToDelete.size();
while (expectedResponses > 0) {
Iterator<Future<Boolean>> iter = futures.iterator();
while (iter.hasNext()) {
Future<Boolean> future = iter.next();
if (future.isDone()) {
expectedResponses--;
iter.remove();
try {
if (future.get()) {
stats.success++;
} else {
stats.failure++;
try {
int expectedResponses = filesToDelete.size();
while (expectedResponses > 0) {
Iterator<Future<Boolean>> iter = futures.iterator();
while (iter.hasNext()) {
Future<Boolean> future = iter.next();
if (future.isDone()) {
expectedResponses--;
iter.remove();
try {
if (future.get()) {
stats.success++;
} else {
stats.failure++;
}
} catch (ExecutionException e) {
stats.error++;
LOG.error("Error deleting a compaction tmp file", e);
}
} catch (ExecutionException e) {
stats.error++;
LOG.error("Error deleting a compaction tmp file", e);
}
}
if (expectedResponses > 0) {
LOG.debug("Waiting on {} background delete operations", expectedResponses);
UtilWaitThread.sleep(1_000);
}
}
LOG.debug("Waiting on {} background delete operations", expectedResponses);
if (expectedResponses > 0) {
UtilWaitThread.sleep(3_000);
}
delSvc.awaitTermination(10, TimeUnit.MINUTES);
return stats;
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
delSvc.awaitTermination(10, TimeUnit.MINUTES);
return stats;
}

public FindCompactionTmpFiles() {
Expand Down
Loading