diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 4d06f3b1aab..7e66f370682 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -25,6 +25,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Stream; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.admin.TabletAvailability; @@ -669,4 +670,24 @@ default void removeBulkLoadInProgressFlag(String path) { default ScanServerRefStore scanServerRefs() { throw new UnsupportedOperationException(); } + + record OrphanedCompaction(ExternalCompactionId id, TableId table, String dir) { + } + + /** + * Tracks compactions that were removed from the metadata table but may still be running on + * compactors. The tmp files associated with these compactions can eventually be removed when the + * compaction is no longer running. + */ + interface OrphanedCompactionStore { + Stream list(); + + void add(Collection orphanedCompactions); + + void delete(Collection orphanedCompactions); + } + + default OrphanedCompactionStore orphanedCompactions() { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 39e1c35e3b0..fa7a98ce38f 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -547,4 +547,36 @@ public static String getRowPrefix() { } + /** + * Holds information about compactions that were deleted from tablets metadata by split or merge + * operations. 
These may still be running and may have tmp files that need to be cleaned up. + */ + public static class OrphanedCompactionSection { + private static final Section section = + new Section(RESERVED_PREFIX + "ocomp", true, RESERVED_PREFIX + "ocomq", false); + + public static Range getRange() { + return section.getRange(); + } + + public static String getRowPrefix() { + return section.getRowPrefix(); + } + + public static Ample.OrphanedCompaction decodeRow(String row) { + String[] fields = row.split("#"); + Preconditions.checkArgument(fields.length == 4); + Preconditions.checkArgument(getRowPrefix().equals(fields[0])); + return new Ample.OrphanedCompaction(ExternalCompactionId.from(fields[1]), + TableId.of(fields[2]), fields[3]); + } + + public static String encodeRow(Ample.OrphanedCompaction rc) { + // put the compaction id first in the row because its uuid will spread out nicely and avoid + // hot spotting + return getRowPrefix() + "#" + rc.id().canonical() + "#" + rc.table().canonical() + "#" + + rc.dir(); + + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 149c12e12be..4e842f3681a 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -24,7 +24,6 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -262,7 +261,7 @@ public static List getCompactionsRunningOnCompactors(ClientContext con } } - public static Collection + public static Set getCompactionIdsRunningOnCompactors(ClientContext context) { final ExecutorService executor = ThreadPools.getServerThreadPools() 
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/OrphanedCompactionStoreImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/OrphanedCompactionStoreImpl.java new file mode 100644 index 00000000000..4fe09966b90 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/OrphanedCompactionStoreImpl.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.metadata; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.OrphanedCompactionSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.ServerContext; + +import com.google.common.base.Preconditions; + +public class OrphanedCompactionStoreImpl implements Ample.OrphanedCompactionStore { + private final ServerContext context; + + public OrphanedCompactionStoreImpl(ServerContext context) { + this.context = context; + } + + private Stream createStream(String tableName) { + Scanner scanner = null; + try { + scanner = context.createScanner(tableName, Authorizations.EMPTY); + } catch (TableNotFoundException e) { + throw new IllegalStateException(e); + } + scanner.setRange(OrphanedCompactionSection.getRange()); + return scanner.stream().map(e -> e.getKey().getRowData().toString()) + .map(OrphanedCompactionSection::decodeRow).onClose(scanner::close); + } + + @Override + public Stream list() { + return Stream.concat(createStream(Ample.DataLevel.METADATA.metaTable()), + createStream(Ample.DataLevel.USER.metaTable())); + } + + private void write(Collection orphanedCompactions, + Function converter) { + if (orphanedCompactions.isEmpty()) { + return; + } + + Map> byLevel = orphanedCompactions.stream() + .collect(Collectors.groupingBy(rc -> Ample.DataLevel.of(rc.table()))); + // Do not expect the root to split or merge so it should never have this data + 
Preconditions.checkArgument(!byLevel.containsKey(Ample.DataLevel.ROOT)); + byLevel.forEach((dl, removed) -> { + try (var writer = context.createBatchWriter(dl.metaTable())) { + for (var rc : removed) { + writer.addMutation(converter.apply(rc)); + } + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new IllegalStateException(e); + } + }); + } + + @Override + public void add(Collection orphanedCompactions) { + write(orphanedCompactions, oc -> { + Mutation m = new Mutation(OrphanedCompactionSection.encodeRow(oc)); + m.put("", "", ""); + return m; + }); + + } + + @Override + public void delete(Collection orphanedCompactions) { + write(orphanedCompactions, oc -> { + Mutation m = new Mutation(OrphanedCompactionSection.encodeRow(oc)); + m.putDelete("", ""); + return m; + }); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index a4eb470c664..ee17d962dee 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -281,6 +281,11 @@ public ScanServerRefStore scanServerRefs() { return scanServerRefStore; } + @Override + public OrphanedCompactionStore orphanedCompactions() { + return new OrphanedCompactionStoreImpl(getContext()); + } + @VisibleForTesting protected ServerContext getContext() { return context; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java index 06755d1807b..7064b12de37 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java @@ -18,11 +18,13 @@ */ package org.apache.accumulo.server.util; +import 
java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -31,8 +33,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ServerOpts; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -46,6 +51,7 @@ import org.apache.accumulo.start.spi.CommandGroups; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +59,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.auto.service.AutoService; +import com.google.common.util.concurrent.MoreExecutors; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -127,6 +134,7 @@ public static Set<Path> findTempFiles(ServerContext context, String tableId) }); } } + LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches); return matches; } @@ -137,57 +145,107 @@ public static class DeleteStats { public int error = 0; } - public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete) - throws InterruptedException { + /** + * Finds compaction tmp files in a tablet directory. For each volume, lists the files under + * the table's tablet dir whose names end with {@code _tmp_<ecid>} for any of the given ids and + * passes each path to the consumer. A missing directory is skipped; other I/O errors are + * logged and the volume is skipped. + */ + public static void findTmpFiles(ServerContext ctx, TableId tableId, String dirName, + Set<ExternalCompactionId> ecidsForTablet, Consumer<Path> findConsumer) { + final Collection<Volume> vols = ctx.getVolumeManager().getVolumes(); +
for (Volume vol : vols) { + try { + final String volPath = vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + tableId.canonical() + Path.SEPARATOR + dirName; + final FileSystem fs = vol.getFileSystem(); + for (ExternalCompactionId ecid : ecidsForTablet) { + final String fileSuffix = "_tmp_" + ecid.canonical(); + FileStatus[] files = null; + try { + files = fs.listStatus(new Path(volPath), (path) -> path.getName().endsWith(fileSuffix)); + } catch (FileNotFoundException e) { + LOG.trace("Failed to list tablet dir {}", volPath, e); + } + if (files != null) { + for (FileStatus file : files) { + findConsumer.accept(file.getPath()); + } + } + } + } catch (IOException e) { + LOG.error("Exception finding compaction tmp files for table: {}", tableId, e); + } + } + } + + private static boolean deleteTmpFile(ServerContext context, Path p) throws IOException { + if (context.getVolumeManager().exists(p)) { + boolean result = context.getVolumeManager().delete(p); + if (result) { + LOG.debug("Removed old temp file {}", p); + } else { + LOG.error("Unable to remove old temp file {}, operation returned false with no exception", + p); + } + return result; + } + return true; + } + + public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete) { + + final ExecutorService delSvc; + if (filesToDelete.size() < 4) { + // Do not bother creating a thread pool and threads for a few files.
+ delSvc = MoreExecutors.newDirectExecutorService(); + } else { + delSvc = Executors.newFixedThreadPool(8); + } - final ExecutorService delSvc = Executors.newFixedThreadPool(8); - final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size()); final DeleteStats stats = new DeleteStats(); + // use a linked list to make removal from the middle of the list quick + final List<Future<Boolean>> futures = new LinkedList<>(); + filesToDelete.forEach(p -> { - futures.add(delSvc.submit(() -> { - if (context.getVolumeManager().exists(p)) { - boolean result = context.getVolumeManager().delete(p); - if (result) { - LOG.debug("Removed old temp file {}", p); - } else { - LOG.error( - "Unable to remove old temp file {}, operation returned false with no exception", p); - } - return result; - } - return true; - })); + futures.add(delSvc.submit(() -> deleteTmpFile(context, p))); }); delSvc.shutdown(); - int expectedResponses = filesToDelete.size(); - while (expectedResponses > 0) { - Iterator<Future<Boolean>> iter = futures.iterator(); - while (iter.hasNext()) { - Future<Boolean> future = iter.next(); - if (future.isDone()) { - expectedResponses--; - iter.remove(); - try { - if (future.get()) { - stats.success++; - } else { - stats.failure++; + try { + int expectedResponses = filesToDelete.size(); + while (expectedResponses > 0) { + Iterator<Future<Boolean>> iter = futures.iterator(); + while (iter.hasNext()) { + Future<Boolean> future = iter.next(); + if (future.isDone()) { + expectedResponses--; + iter.remove(); + try { + if (future.get()) { + stats.success++; + } else { + stats.failure++; + } + } catch (ExecutionException e) { + stats.error++; + LOG.error("Error deleting a compaction tmp file", e); } - } catch (ExecutionException e) { - stats.error++; - LOG.error("Error deleting a compaction tmp file", e); } } + if (expectedResponses > 0) { + LOG.debug("Waiting on {} background delete operations", expectedResponses); + UtilWaitThread.sleep(1_000); + } } - LOG.debug("Waiting on {} background delete operations", expectedResponses); - if
(expectedResponses > 0) { - UtilWaitThread.sleep(3_000); - } + delSvc.awaitTermination(10, TimeUnit.MINUTES); + return stats; + } catch (InterruptedException e) { + // restore the interrupt status so callers can still observe the interruption + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); } - delSvc.awaitTermination(10, TimeUnit.MINUTES); - return stats; } public FindCompactionTmpFiles() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index cf157fe8ca6..fe36b4a3c96 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -93,8 +93,6 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.compaction.CompactionJob; @@ -108,7 +106,6 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; @@ -122,8 +119,8 @@ import org.apache.accumulo.server.compaction.CompactionPluginUtils; import org.apache.accumulo.server.security.AuditedSecurityOperation; import
org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.accumulo.server.util.FindCompactionTmpFiles; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -765,6 +762,23 @@ void compactionsFailed(Map compactions) { void compactionFailedForLevel(Map> compactions) { + // CompactionFailed is called from the Compactor when either a compaction fails or is cancelled + // and it's called from the DeadCompactionDetector. Remove compaction tmp files from the tablet + // directory that have a corresponding ecid in the name. Must delete any tmp files before + // removing compaction entry from metadata table. This ensures that in the event of process + // death that the dead compaction will be detected in the future and the files removed then. + try (var tablets = ctx.getAmple().readTablets() + .forTablets(compactions.keySet(), Optional.empty()).fetch(ColumnType.DIR).build()) { + Set tmpFilesToDelete = new HashSet<>(); + for (TabletMetadata tm : tablets) { + var extent = tm.getExtent(); + var ecidsForTablet = compactions.get(extent); + FindCompactionTmpFiles.findTmpFiles(ctx, extent.tableId(), tm.getDirName(), ecidsForTablet, + tmpFilesToDelete::add); + } + FindCompactionTmpFiles.deleteTempFiles(ctx, tmpFilesToDelete); + } + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { compactions.forEach((extent, ecids) -> { try { @@ -791,78 +805,18 @@ public boolean test(TabletMetadata tabletMetadata) { } }); - final List ecidsForTablet = new ArrayList<>(); tabletsMutator.process().forEach((extent, result) -> { if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { - // this should try again later when the dead compaction detector runs, lets log it in case // its a persistent problem if (LOG.isDebugEnabled()) { LOG.debug("Unable to remove failed compaction {} {}", extent, compactions.get(extent)); } 
- } else { - // compactionFailed is called from the Compactor when either a compaction fails or - // is cancelled and it's called from the DeadCompactionDetector. This block is - // entered when the conditional mutator above successfully deletes an ecid from - // the tablet metadata. Remove compaction tmp files from the tablet directory - // that have a corresponding ecid in the name. - - ecidsForTablet.clear(); - ecidsForTablet.addAll(compactions.get(extent)); - - if (!ecidsForTablet.isEmpty()) { - final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); - if (tm != null) { - final Collection vols = ctx.getVolumeManager().getVolumes(); - for (Volume vol : vols) { - try { - final String volPath = - vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR - + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName(); - final FileSystem fs = vol.getFileSystem(); - for (ExternalCompactionId ecid : ecidsForTablet) { - final String fileSuffix = "_tmp_" + ecid.canonical(); - FileStatus[] files = null; - try { - files = fs.listStatus(new Path(volPath), - (path) -> path.getName().endsWith(fileSuffix)); - } catch (FileNotFoundException e) { - LOG.trace("Failed to list tablet dir {}", volPath, e); - } - if (files != null) { - for (FileStatus file : files) { - if (!fs.delete(file.getPath(), false)) { - LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath()); - } else { - LOG.debug("Deleted ecid tmp file: {}", file.getPath()); - } - } - } - } - } catch (IOException e) { - LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e); - } - } - } else { - // TabletMetadata does not exist for the extent. This could be due to a merge or - // split operation. 
Use the utility to find tmp files at the table level - deadCompactionDetector.addTableId(extent.tableId()); - } - } } }); } } - protected Set readExternalCompactionIds() { - try (TabletsMetadata tabletsMetadata = - this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) - .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) { - return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) - .collect(Collectors.toSet()); - } - } - /* Method exists to be called from test */ public CompactionJobQueues getJobQueues() { return jobQueues; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index ce04296a615..7f09a3efbcb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -18,11 +18,13 @@ */ package org.apache.accumulo.manager.compaction.coordinator; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -33,11 +35,11 @@ import java.util.stream.Stream; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateClient; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -49,7 +51,6 @@ import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.FindCompactionTmpFiles; -import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +63,6 @@ public class DeadCompactionDetector { private final CompactionCoordinator coordinator; private final ScheduledThreadPoolExecutor schedExecutor; private final ConcurrentHashMap deadCompactions; - private final Set tablesWithUnreferencedTmpFiles = new HashSet<>(); private final Function> fateClients; public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator, @@ -75,12 +75,6 @@ public DeadCompactionDetector(ServerContext context, CompactionCoordinator coord this.fateClients = fateClients; } - public void addTableId(TableId tableWithUnreferencedTmpFiles) { - synchronized (tablesWithUnreferencedTmpFiles) { - tablesWithUnreferencedTmpFiles.add(tableWithUnreferencedTmpFiles); - } - } - private void detectDeadCompactions() { /* @@ -162,6 +156,35 @@ private void detectDeadCompactions() { }); } + // Get the list of compaction entries that were removed from the metadata table by a split or + // merge operation. Must get this data before getting the running set of compactions. 
+ List orphanedCompactions; + try (Stream listing = + context.getAmple().orphanedCompactions().list()) { + orphanedCompactions = listing.collect(Collectors.toCollection(ArrayList::new)); + } + + // Must get the set of running compactions after reading compaction ids from the metadata table + Set running = null; + if (!orphanedCompactions.isEmpty() || !tabletCompactions.isEmpty()) { + running = ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); + } + + // Delete any tmp files related to compaction metadata entries that were removed by split or + // merge and are no longer running. + if (!orphanedCompactions.isEmpty()) { + var runningSet = Objects.requireNonNull(running); + orphanedCompactions.removeIf(rc -> runningSet.contains(rc.id())); + Set tmpFilesToDelete = new HashSet<>(); + orphanedCompactions.forEach(rc -> { + log.trace("attempting to find tmp files for removed compaction {}", rc); + FindCompactionTmpFiles.findTmpFiles(context, rc.table(), rc.dir(), Set.of(rc.id()), + tmpFilesToDelete::add); + }); + FindCompactionTmpFiles.deleteTempFiles(context, tmpFilesToDelete); + context.getAmple().orphanedCompactions().delete(orphanedCompactions); + } + if (tabletCompactions.isEmpty()) { // Clear out dead compactions, tservers don't think anything is running log.trace("Clearing the dead compaction map, no tablets have compactions running"); @@ -183,9 +206,6 @@ private void detectDeadCompactions() { // In order for this overall algorithm to be correct and avoid race conditions, the compactor // must return ids covering the time period from before reservation until after commit. If the // ids do not cover this time period then legitimate running compactions could be canceled. 
- Collection running = - ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); - running.forEach(ecid -> { if (tabletCompactions.remove(ecid) != null) { log.debug("Ignoring compaction {} that is running on a compactor", ecid); @@ -230,37 +250,6 @@ private void detectDeadCompactions() { coordinator.compactionsFailed(tabletCompactions); this.deadCompactions.keySet().removeAll(toFail); } - - // Find and delete compaction tmp files that are unreferenced - if (!tablesWithUnreferencedTmpFiles.isEmpty()) { - - Set copy = new HashSet<>(); - synchronized (tablesWithUnreferencedTmpFiles) { - copy.addAll(tablesWithUnreferencedTmpFiles); - tablesWithUnreferencedTmpFiles.clear(); - } - - log.debug("Tables that may have unreferenced compaction tmp files: {}", copy); - for (TableId tid : copy) { - try { - final Set matches = FindCompactionTmpFiles.findTempFiles(context, tid.canonical()); - log.debug("Found the following compaction tmp files for table {}:", tid); - matches.forEach(p -> log.debug("{}", p)); - - if (!matches.isEmpty()) { - log.debug("Deleting compaction tmp files for table {}...", tid); - DeleteStats stats = FindCompactionTmpFiles.deleteTempFiles(context, matches); - log.debug( - "Deletion of compaction tmp files for table {} complete. 
Success:{}, Failure:{}, Error:{}", - tid, stats.success, stats.failure, stats.error); - } - } catch (InterruptedException e) { - log.error("Interrupted while finding compaction tmp files for table: {}", tid.canonical(), - e); - } - } - } - } public void start() { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index 8e8241fb92d..d184b39cb4e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -75,6 +75,7 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { Map newFiles = new HashMap<>(); TabletMetadata firstTabletMeta = null; TabletMetadata lastTabletMeta = null; + List orphanedCompactions = new ArrayList<>(); try (var tabletsMetadata = env.getContext().getAmple().readTablets().forTable(range.tableId()) .overlapping(range.prevEndRow(), range.endRow()).build()) { @@ -141,6 +142,19 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { dirs.clear(); } } + + // These compaction metadata entries will be deleted, queue up removal of the tmp file once + // the compaction is no longer running + tabletMeta.getExternalCompactions().keySet().stream() + .map(ecid -> new Ample.OrphanedCompaction(ecid, tabletMeta.getExtent().tableId(), + tabletMeta.getDirName())) + .forEach(orphanedCompactions::add); + if (orphanedCompactions.size() > 1000 && tabletsSeen > 1) { + orphanedCompactions + .forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc)); + env.getContext().getAmple().orphanedCompactions().add(orphanedCompactions); + orphanedCompactions.clear(); + } } if (tabletsSeen == 1) { @@ -154,6 +168,9 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { lastTabletMeta); } + orphanedCompactions.forEach(rc -> log.trace("{} adding removed 
compaction {}", fateId, rc)); + env.getContext().getAmple().orphanedCompactions().add(orphanedCompactions); + log.info("{} merge low tablet {}", fateId, firstTabletMeta.getExtent()); log.info("{} merge high tablet {}", fateId, lastTabletMeta.getExtent()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index c12386a3721..f4d2d6df00f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -266,6 +266,16 @@ private void addNewTablets(FateId fateId, FateEnv env, TabletMetadata tabletMeta private void updateExistingTablet(FateId fateId, ServerContext ctx, TabletMetadata tabletMetadata, TabletOperationId opid, NavigableMap newTablets, Map> newTabletsFiles) { + + // queue up the tmp files related to these compaction metadata entries to be eventually deleted + // once the compaction is no longer running + var removedCompactions = tabletMetadata.getExternalCompactions().keySet().stream() + .map(ecid -> new Ample.OrphanedCompaction(ecid, tabletMetadata.getExtent().tableId(), + tabletMetadata.getDirName())) + .toList(); + removedCompactions.forEach(rc -> log.trace("{} adding removed compaction {}", fateId, rc)); + ctx.getAmple().orphanedCompactions().add(removedCompactions); + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var newExtent = newTablets.navigableKeySet().last(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 7bd6824f2f6..002ffbeea80 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -44,11 +44,15 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.UNSPLITTABLE; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; import static org.apache.accumulo.manager.tableOps.split.UpdateTabletsTest.newSTF; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -56,6 +60,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.data.NamespaceId; @@ -202,9 +207,12 @@ public void testManyColumns() throws Exception { EasyMock.expect(lastTabletMeta.getUnSplittable()).andReturn(unsplittableMeta).atLeastOnce(); EasyMock.expect(lastTabletMeta.getTabletMergeability()).andReturn(mergeability).atLeastOnce(); EasyMock.expect(lastTabletMeta.getMigration()).andReturn(migration).atLeastOnce(); + EasyMock.expect(lastTabletMeta.getDirName()).andReturn("td3").atLeastOnce(); EasyMock.replay(lastTabletMeta, compactions); + Set orphanedCompactions = new HashSet<>(); + testMerge(List.of(tablet1, tablet2, lastTabletMeta), tableId, null, null, tabletMutator -> { EasyMock.expect(tabletMutator.putTime(MetadataTime.parse("L30"))).andReturn(tabletMutator) .once(); @@ -243,9 +251,13 @@ public void testManyColumns() throws Exception { .andReturn(tabletMutator).once(); EasyMock.expect(tabletMutator.deleteMigration()).andReturn(tabletMutator); - }); + }, 
orphanedCompactions::add); EasyMock.verify(lastTabletMeta, compactions); + + assertEquals(Set.of(new Ample.OrphanedCompaction(cid1, tableId, "td3"), + new Ample.OrphanedCompaction(cid2, tableId, "td3"), + new Ample.OrphanedCompaction(cid3, tableId, "td3")), orphanedCompactions); } @Test @@ -420,6 +432,12 @@ public void testTime() throws Exception { private static void testMerge(List inputTablets, TableId tableId, String start, String end, Consumer expectationsSetter) throws Exception { + testMerge(inputTablets, tableId, start, end, expectationsSetter, removedCompaction -> fail()); + } + + private static void testMerge(List inputTablets, TableId tableId, String start, + String end, Consumer expectationsSetter, + Consumer orphanedCompactionConsumer) throws Exception { MergeInfo mergeInfo = new MergeInfo(tableId, NamespaceId.of("1"), start == null ? null : start.getBytes(UTF_8), end == null ? null : end.getBytes(UTF_8), MergeInfo.Operation.MERGE); @@ -434,6 +452,24 @@ private static void testMerge(List inputTablets, TableId tableId EasyMock.mock(ConditionalTabletsMutatorImpl.class); ConditionalTabletMutatorImpl tabletMutator = EasyMock.mock(ConditionalTabletMutatorImpl.class); + Ample.OrphanedCompactionStore orphanedCompactionStore = new Ample.OrphanedCompactionStore() { + @Override + public Stream list() { + throw new UnsupportedOperationException(); + } + + @Override + public void add(Collection removedCompactions) { + removedCompactions.forEach(orphanedCompactionConsumer); + } + + @Override + public void delete(Collection removedCompactions) { + throw new UnsupportedOperationException(); + } + }; + EasyMock.expect(ample.orphanedCompactions()).andReturn(orphanedCompactionStore); + ServiceLock managerLock = EasyMock.mock(ServiceLock.class); EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java index dad40a0cb13..13ad3bde70e 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java @@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -37,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletMergeability; @@ -250,6 +253,24 @@ public void testManyColumns() throws Exception { EasyMock.expect(fateEnv.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce(); EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(100_000, TimeUnit.SECONDS)) .atLeastOnce(); + Set orphanedCompactionSet = new HashSet<>(); + Ample.OrphanedCompactionStore ocs = new Ample.OrphanedCompactionStore() { + @Override + public Stream list() { + throw new UnsupportedOperationException(); + } + + @Override + public void add(Collection removedCompactions) { + orphanedCompactionSet.addAll(removedCompactions); + } + + @Override + public void delete(Collection removedCompactions) { + throw new UnsupportedOperationException(); + } + }; + EasyMock.expect(ample.orphanedCompactions()).andReturn(ocs).atLeastOnce(); ServiceLock managerLock = EasyMock.mock(ServiceLock.class); EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes(); @@ -289,6 +310,7 @@ public void testManyColumns() throws Exception { UnSplittableMetadata.toUnSplittable(origExtent, 1000, 1001, 1002, tabletFiles.keySet()); 
EasyMock.expect(tabletMeta.getUnSplittable()).andReturn(usm).atLeastOnce(); EasyMock.expect(tabletMeta.getMigration()).andReturn(migration).atLeastOnce(); + EasyMock.expect(tabletMeta.getDirName()).andReturn("td1").atLeastOnce(); EasyMock.expect(ample.readTablet(origExtent)).andReturn(tabletMeta); @@ -408,6 +430,10 @@ public void testManyColumns() throws Exception { EasyMock.verify(fateEnv, context, ample, tabletMeta, fileRangeCache, tabletsMutator, tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions); + + assertEquals(Set.of(new Ample.OrphanedCompaction(cid1, tableId, "td1"), + new Ample.OrphanedCompaction(cid2, tableId, "td1"), + new Ample.OrphanedCompaction(cid3, tableId, "td1")), orphanedCompactionSet); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java index ad7b19325bb..fd4d228f6ce 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2ITBase.java @@ -138,7 +138,7 @@ public void testSplitCancelsExternalCompaction() throws Exception { // Verify that the tmp file are cleaned up Wait.waitFor(() -> FindCompactionTmpFiles - .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0, 60_000); + .findTempFiles(getCluster().getServerContext(), tid.canonical()).isEmpty(), 60_000); } } @@ -200,7 +200,7 @@ public void testUserCompactionCancellation() throws Exception { // Verify that the tmp file are cleaned up Wait.waitFor(() -> FindCompactionTmpFiles - .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0); + .findTempFiles(getCluster().getServerContext(), tid.canonical()).isEmpty()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java index 0101865b3fa..b011f3fb9f1 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java @@ -150,14 +150,16 @@ public void testMergeCancelsExternalCompaction() throws Exception { .collect(Collectors.toSet()); } } - // We need to cancel the compaction or delete the table here because we initiate a user - // compaction above in the test. Even though the external compaction was cancelled - // because we split the table, FaTE will continue to queue up a compaction - client.tableOperations().delete(table1); // Verify that the tmp file are cleaned up Wait.waitFor(() -> FindCompactionTmpFiles - .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0); + .findTempFiles(getCluster().getServerContext(), tid.canonical()).isEmpty()); + + // We need to cancel the compaction or delete the table here because we initiate a user + // compaction above in the test. Even though the external compaction was cancelled + // because we merged the table, FaTE would continue to queue up a compaction + client.tableOperations().cancelCompaction(table1); + } finally { getCluster().getClusterControl().stop(ServerType.COMPACTOR); getCluster().getClusterControl().start(ServerType.COMPACTOR);