123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Properties;
- public class KafkaConsumerApp {
- public static void main(String[] args) {
- Properties props = new Properties();
- KafkaConsumerCommon.configure(props);
- // The consumer Group ID is now required on subscribe().
- props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerCommon.GROUP);
- KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
- /* Let the consumer handle the partitions */
- myConsumer.subscribe(Arrays.asList(KafkaCommon.TOPIC));
- try {
- while (true) {
- KafkaConsumerCommon.process(myConsumer);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- myConsumer.close();
- }
- }
- private static void subscribeExamples(KafkaConsumer<String, String> c) {
- // In tutorial but does not compile: needs a rebalance callback
- // c.subscribe("my-*");
- // Maybe that way ?
- // c.subscribe(Pattern.compile("my-[\\w]+"), null);
- // Typical initial subscription.
- c.subscribe(Arrays.asList(KafkaCommon.TOPIC));
- // Replaces the current subscription set, does not extend it
- c.subscribe(Arrays.asList("another-topic"));
- // Better for incremental cases.
- ArrayList<String> topics = new ArrayList<String>();
- topics.add(KafkaCommon.TOPIC);
- topics.add("my-other-topic");
- topics.add("yetAnotherTopic");
- c.subscribe(topics);
- // Unsubcribe all topics.
- c.unsubscribe();
- // Alternative
- topics.clear();
- c.subscribe(topics);
- }
- }
|