|
24 | 24 | import java.util.concurrent.Future; |
25 | 25 | import java.util.concurrent.TimeUnit; |
26 | 26 | import java.util.function.Consumer; |
| 27 | +import java.util.Collection; |
27 | 28 | import java.util.regex.Pattern; |
28 | 29 | import java.util.List; |
29 | 30 | import java.util.ArrayList; |
|
32 | 33 | import okhttp3.OkHttpClient; |
33 | 34 | import okhttp3.Request; |
34 | 35 | import okhttp3.Response; |
| 36 | +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; |
35 | 37 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
36 | 38 | import org.apache.kafka.clients.consumer.ConsumerRecords; |
37 | 39 | import org.apache.kafka.clients.consumer.KafkaConsumer; |
38 | | -import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; |
39 | 40 | import org.apache.kafka.clients.producer.Callback; |
40 | 41 | import org.apache.kafka.clients.producer.KafkaProducer; |
41 | 42 | import org.apache.kafka.clients.producer.Producer; |
@@ -270,7 +271,17 @@ public void run() { |
270 | 271 | consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
271 | 272 | consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
272 | 273 | KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); |
273 | | - consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener()); |
| 274 | + consumer.subscribe(topicPattern, new ConsumerRebalanceListener() { |
| 275 | + @Override |
| 276 | + public void onPartitionsRevoked(Collection<TopicPartition> collection) { |
| 277 | + |
| 278 | + } |
| 279 | + |
| 280 | + @Override |
| 281 | + public void onPartitionsAssigned(Collection<TopicPartition> collection) { |
| 282 | + |
| 283 | + } |
| 284 | + }); |
274 | 285 | while (true) { |
275 | 286 | if (pollAndInvoke(consumer)) break; |
276 | 287 | } |
|
0 commit comments