Skip to content

Commit 85a46e3

Browse files
author
Suraj Nagaraja Kasi
authored
Add retry to query retention time for destination topic (#863)
* Add retry to query retention time for destination topic. Since migrating to AdminClient to create the destination topic, it was observed that there is sometimes a lag before topic metadata is synced to all brokers. This may result in getting an incorrect (null) retention time. Adding retry logic to account for the delay in topic metadata sync. * Adding some extra log messages
1 parent 38d399a commit 85a46e3

2 files changed

Lines changed: 22 additions & 23 deletions

File tree

datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProviderAdmin.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.linkedin.datastream.common.DatastreamRuntimeException;
4040
import com.linkedin.datastream.common.DatastreamSource;
4141
import com.linkedin.datastream.common.DatastreamUtils;
42+
import com.linkedin.datastream.common.PollUtils;
4243
import com.linkedin.datastream.common.VerifiableProperties;
4344
import com.linkedin.datastream.metrics.BrooklinMetricInfo;
4445
import com.linkedin.datastream.server.DatastreamTask;
@@ -71,6 +72,8 @@ public class KafkaTransportProviderAdmin implements TransportProviderAdmin {
7172
private static final int DEFAULT_NUMBER_PARTITIONS = 1;
7273
private static final String DEFAULT_MIN_INSYNC_REPLICAS_CONFIG_VALUE = "2";
7374
private static final String METADATA_KAFKA_BROKERS = DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX + "KafkaBrokers";
75+
private static final int GET_RETENTION_RETRY_PERIOD_MS = 2000;
76+
private static final int GET_RETENTION_RETRY_TIMEOUT_MS = 10000;
7477

7578
private final String _transportProviderMetricsNamesPrefix;
7679
private final int _numProducersPerConnector;
@@ -250,23 +253,26 @@ public Duration getRetention(Datastream datastream) {
250253
Validate.notNull(destination, "null destination URI");
251254
String topicName = KafkaTransportProviderUtils.getTopicName(destination);
252255

253-
try {
254-
// TODO: In rare cases it may happen that even though topic has been created, its metadata hasn't synced
255-
// to all the brokers yet and adminClient might query such a broker in which case topic retention will
256-
// not be present in the topic config. To circumvent this we need to query only the controller node in
257-
// Kafka cluster which will always return successful result, but that is more involved.
258-
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
259-
Map<ConfigResource, Config> topicConfigMap = adminClient.describeConfigs(Collections.singletonList(topicResource)).all().get();
260-
Config topicConfig = topicConfigMap.get(topicResource);
261-
ConfigEntry entry = topicConfig.get(TOPIC_RETENTION_MS);
262-
if (entry != null) {
263-
return Duration.ofMillis(Long.parseLong(entry.value()));
256+
// In rare cases it may happen that even though topic has been created, its metadata hasn't synced
257+
// to all the brokers yet and adminClient might query such a broker in which case topic retention will
258+
// not be present in the topic config. Therefore we retry a few times before giving up.
259+
return PollUtils.poll(() -> {
260+
try {
261+
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
262+
Map<ConfigResource, Config> topicConfigMap =
263+
adminClient.describeConfigs(Collections.singletonList(topicResource)).all().get();
264+
Config topicConfig = topicConfigMap.get(topicResource);
265+
ConfigEntry entry = topicConfig.get(TOPIC_RETENTION_MS);
266+
if (entry != null) {
267+
LOG.info("Retention time for topic {} is {}", topicName, entry.value());
268+
return Duration.ofMillis(Long.parseLong(entry.value()));
269+
}
270+
} catch (ExecutionException e) {
271+
LOG.warn("Failed to retrieve config for topic {}.", topicName, e);
264272
}
265-
} catch (InterruptedException | ExecutionException e) {
266-
LOG.warn("Failed to retrieve config for topic {}.", topicName, e);
267-
}
268-
269-
return null;
273+
LOG.warn("Failed to retrieve retention time for topic {}", topicName);
274+
return null;
275+
}, Objects::nonNull, GET_RETENTION_RETRY_PERIOD_MS, GET_RETENTION_RETRY_TIMEOUT_MS).orElse(null);
270276
}
271277

272278
/**

datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,13 +2334,6 @@ private TestSetup createTestCoordinator() throws Exception {
23342334
}
23352335

23362336
private void validateRetention(Datastream stream, DatastreamResources resource, Duration expectedRetention) {
2337-
// Adding a sleep to simulate time required for topic metadata to be synced across all brokers of destination
2338-
// kafka cluster.
2339-
try {
2340-
Thread.sleep(5000);
2341-
} catch (Exception e) {
2342-
}
2343-
23442337
Datastream queryStream = resource.get(stream.getName());
23452338
Assert.assertNotNull(queryStream.getDestination());
23462339
StringMap metadata = queryStream.getMetadata();

0 commit comments

Comments
 (0)