Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ public Result getResult() {
return result;
}

/**
 * Builds a text form of this object consisting of the {@code server} field followed by the
 * {@code result} field, each prefixed with its label.
 *
 * @return a string of the form {@code server:<server> result:<result>}
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder();
  text.append("server:").append(server);
  text.append(" result:").append(result);
  return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,14 +50,31 @@ public class ScanServerAttemptsImpl {

/**
 * Creates a reporter that records each reported result as an attempt for the given tablet and
 * server.
 *
 * @param server the server that is stored with every recorded attempt
 * @param tablet the tablet under which the recorded attempts are filed
 * @return a reporter that appends a new attempt to the list kept for {@code tablet}
 */
ScanServerAttemptReporter createReporter(String server, TabletId tablet) {
  return result -> {
    // Log once per report, including the tablet and server so the trace line is self-contained.
    LOG.trace("Received result: {} {} {}", result, tablet, server);
    // All mutation of the attempts map happens while holding its monitor.
    synchronized (attempts) {
      attempts.computeIfAbsent(tablet, k -> new ArrayList<>())
          .add(new ScanServerAttemptImpl(result, server));
    }
  };
}

/**
 * Callback used to report one scan attempt result for a whole set of extents at once. It has a
 * single abstract method so that it can be implemented with a lambda.
 */
@FunctionalInterface
public interface BatchAttemptReporter {
  /**
   * Reports a result for a batch of extents.
   *
   * @param extents the extents the result applies to
   * @param result the result to report for those extents
   */
  void report(Set<KeyExtent> extents, ScanServerAttempt.Result result);
}

/**
 * Creates a reporter that records one reported result against every extent of a batch for the
 * given server.
 *
 * @param server the server that is stored with the recorded attempt
 * @return a reporter that files the same attempt under each extent it is given
 */
BatchAttemptReporter createReporter(String server) {
  return (tablets, result) -> {
    LOG.trace("Received result: {} {} {}", result, tablets, server);
    synchronized (attempts) {
      // One attempt instance is shared by every tablet in the batch.
      ScanServerAttemptImpl sharedAttempt = new ScanServerAttemptImpl(result, server);
      for (KeyExtent extent : tablets) {
        attempts.computeIfAbsent(new TabletIdImpl(extent), k -> new ArrayList<>())
            .add(sharedAttempt);
      }
    }
  };
}

/**
* Creates and returns a snapshot of {@link ScanServerAttempt} objects that were added before this
* call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.clientImpl.ScanServerAttemptsImpl.BatchAttemptReporter;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -365,12 +366,12 @@ private class QueryTask implements Runnable {
private final List<Column> columns;
private int semaphoreSize;
private final long busyTimeout;
private final ScanServerAttemptReporter reporter;
private final BatchAttemptReporter reporter;
private final Duration scanServerSelectorDelay;

QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns,
long busyTimeout, ScanServerAttemptReporter reporter, Duration scanServerSelectorDelay) {
long busyTimeout, BatchAttemptReporter reporter, Duration scanServerSelectorDelay) {
this.tsLocation = tsLocation;
this.tabletsRanges = tabletsRanges;
this.receiver = receiver;
Expand Down Expand Up @@ -401,6 +402,10 @@ public void run() {
options, authorizations, timeoutTracker, busyTimeout);

if (!tsFailures.isEmpty()) {
// Routine failures that occur on tservers, like a tablet not being served or a tablet
// closing, are not expected on scan servers. So for scan servers, record any failure seen as
// an error.
reporter.report(tsFailures.keySet(), ScanServerAttempt.Result.ERROR);
locator.invalidateCache(tsFailures.keySet());
synchronized (failures) {
failures.putAll(tsFailures);
Expand All @@ -422,7 +427,7 @@ public void run() {
if (e.getCause() instanceof ScanServerBusyException) {
result = ScanServerAttempt.Result.BUSY;
}
reporter.report(result);
reporter.report(tabletsRanges.keySet(), result);
} catch (AccumuloSecurityException e) {
e.setTableInfo(getTableInfo());
log.debug("AccumuloSecurityException thrown", e);
Expand Down Expand Up @@ -496,7 +501,6 @@ public void run() {
}
}
}

}

private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
Expand All @@ -506,7 +510,7 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,

long busyTimeout = 0;
Duration scanServerSelectorDelay = null;
Map<String,ScanServerAttemptReporter> reporters = Map.of();
Map<String,BatchAttemptReporter> reporters = Map.of();

if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
var scanServerData = rebinToScanServers(binnedRanges, startTime);
Expand Down Expand Up @@ -564,7 +568,7 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns,
busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
busyTimeout, reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay);
queryTasks.add(queryTask);
} else {
HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
Expand All @@ -573,15 +577,16 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
if (tabletSubset.size() >= maxTabletsPerRequest) {
QueryTask queryTask =
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout,
reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay);
queryTasks.add(queryTask);
tabletSubset = new HashMap<>();
}
}

if (!tabletSubset.isEmpty()) {
QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns,
busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
QueryTask queryTask =
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout,
reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay);
queryTasks.add(queryTask);
}
}
Expand All @@ -599,7 +604,7 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
/**
 * Simple holder for the values produced when ranges are rebinned to scan servers.
 */
private static class ScanServerData {
  // Ranges per extent, grouped under a string key (looked up by server location in doLookups).
  Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
  // The selections made for this set of tablets.
  ScanServerSelections actions;
  // One batch reporter per server, keyed by the server string. The field is declared only once,
  // with the batch reporter type that createReporter(server) produces.
  Map<String,BatchAttemptReporter> reporters;
}

private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
Expand Down Expand Up @@ -652,7 +657,7 @@ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWa

Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();

Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
Map<String,BatchAttemptReporter> reporters = new HashMap<>();

for (TabletIdImpl tabletId : tabletIds) {
KeyExtent extent = tabletId.toKeyExtent();
Expand All @@ -672,7 +677,7 @@ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWa
rangeMap.put(extent, ranges);

var server = serverToUse;
reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server, tabletId));
reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server));
}

ScanServerData ssd = new ScanServerData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -82,8 +84,7 @@ public class ConfigurableScanServerHostSelector extends ConfigurableScanServerSe
/**
* @return map of previous failures keyed by host name, with a set of servers per host
*/
Map<String,Set<String>> computeFailuresByHost(TabletId tablet, SelectorParameters params) {
var attempts = params.getAttempts(tablet);
Map<String,Set<String>> computeFailuresByHost(Collection<? extends ScanServerAttempt> attempts) {
if (attempts.isEmpty()) {
return Map.of();
}
Expand Down Expand Up @@ -152,13 +153,25 @@ private List<String> getServersForHostAttempt(int hostAttempt, TabletId tablet,
}

@Override
int selectServers(SelectorParameters params, Profile profile, RendezvousHasher rhasher,
Map<TabletId,String> serversToUse) {
ScanServerSelections selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
RendezvousHasher rhasher) {

int maxHostAttempt = 0;
int maxTabletErrors = 0;

HashMap<TabletId,String> serversToUse = new HashMap<>();

for (TabletId tablet : params.getTablets()) {
Map<String,Set<String>> prevFailures = computeFailuresByHost(tablet, params);
var attempts = params.getAttempts(tablet);
Map<String,Set<String>> prevFailures = computeFailuresByHost(attempts);

int tabletErrors = 0;
for (var attempt : attempts) {
if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
tabletErrors++;
}
}
maxTabletErrors = Math.max(tabletErrors, maxTabletErrors);

for (int hostAttempt = 0; hostAttempt < profile.getAttemptPlans().size(); hostAttempt++) {
maxHostAttempt = Math.max(hostAttempt, maxHostAttempt);
Expand All @@ -183,6 +196,24 @@ int selectServers(SelectorParameters params, Profile profile, RendezvousHasher r
}
}

return maxHostAttempt;
Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxHostAttempt));
Duration delay = computeDelay(maxTabletErrors);

return new ScanServerSelections() {
@Override
public String getScanServer(TabletId tabletId) {
return serversToUse.get(tabletId);
}

@Override
public Duration getDelay() {
return delay;
}

@Override
public Duration getBusyTimeout() {
return busyTO;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -424,34 +425,27 @@ public Duration getBusyTimeout() {
};
}

Map<TabletId,String> serversToUse = new HashMap<>();
return selectServers(params, profile, rhasher);
}

int maxAttempts = selectServers(params, profile, rhasher, serversToUse);
/**
 * Computes the delay to apply for a given number of error attempts. Zero errors give no delay;
 * otherwise the delay starts at 100 milliseconds, doubles with each additional error, and is
 * capped at 30 seconds.
 *
 * @param errorAttempts the number of attempts that ended in an error
 * @return the delay for that number of errors
 */
protected Duration computeDelay(int errorAttempts) {
  if (errorAttempts == 0) {
    return Duration.ZERO;
  }
  // 100ms for the first error, doubling for every further one.
  double backoffMillis = 100 * Math.pow(2, errorAttempts - 1);
  // Never wait longer than 30 seconds.
  double cappedMillis = Math.min(30_000, backoffMillis);
  return Duration.ofMillis((long) cappedMillis);
}

Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxAttempts));
ScanServerSelections selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
RendezvousHasher rhasher) {
int attempts = 0;
int errorAttempts = 0;

return new ScanServerSelections() {
@Override
public String getScanServer(TabletId tabletId) {
return serversToUse.get(tabletId);
}
HashMap<TabletId,String> serversToUse = new HashMap<>();

@Override
public Duration getDelay() {
return Duration.ZERO;
}

@Override
public Duration getBusyTimeout() {
return busyTO;
}
};
}

int selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
RendezvousHasher rhasher, Map<TabletId,String> serversToUse) {
int attempts = params.getTablets().stream()
.mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0);
for (TabletId tablet : params.getTablets()) {
attempts = Math.max(attempts, params.getAttempts(tablet).size());
}

int numServers = profile.getNumServers(attempts,
rhasher.getSnapshot().getServersForGroup(profile.group).size());
Expand All @@ -461,9 +455,17 @@ int selectServers(ScanServerSelector.SelectorParameters params, Profile profile,

var tabletAttempts = params.getAttempts(tablet);
if (!tabletAttempts.isEmpty()) {
HashSet<String> attemptServers = new HashSet<>();
int errorCount = 0;
for (var attempt : tabletAttempts) {
attemptServers.add(attempt.getServer());
if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
errorCount++;
}
}
errorAttempts = Math.max(errorCount, errorAttempts);
// remove servers that failed in previous attempts
var attemptServers =
tabletAttempts.stream().map(ScanServerAttempt::getServer).collect(Collectors.toSet());

var copy = rendezvousServers.stream().filter(server -> !attemptServers.contains(server))
.collect(Collectors.toList());
if (!copy.isEmpty()) {
Expand All @@ -475,6 +477,25 @@ int selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
String serverToUse = rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size()));
serversToUse.put(tablet, serverToUse);
}
return attempts;

Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(attempts));
Duration delay = computeDelay(errorAttempts);

return new ScanServerSelections() {
@Override
public String getScanServer(TabletId tabletId) {
return serversToUse.get(tabletId);
}

@Override
public Duration getDelay() {
return delay;
}

@Override
public Duration getBusyTimeout() {
return busyTO;
}
};
}
}
Loading