import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerApp { public static void main(String[] args) { Properties props = new Properties(); KafkaConsumerCommon.configure(props); // The consumer Group ID is now required on subscribe(). props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerCommon.GROUP); KafkaConsumer myConsumer = new KafkaConsumer(props); /* Let the consumer handle the partitions */ myConsumer.subscribe(Arrays.asList(KafkaCommon.TOPIC)); try { while (true) { KafkaConsumerCommon.process(myConsumer); } } catch (Exception e) { e.printStackTrace(); } finally { myConsumer.close(); } } private static void subscribeExamples(KafkaConsumer c) { // In tutorial but does not compile: needs a rebalance callback // c.subscribe("my-*"); // Maybe that way ? // c.subscribe(Pattern.compile("my-[\\w]+"), null); // Typical initial subscription. c.subscribe(Arrays.asList(KafkaCommon.TOPIC)); // Replaces the current subscription set, does not extend it c.subscribe(Arrays.asList("another-topic")); // Better for incremental cases. ArrayList topics = new ArrayList(); topics.add(KafkaCommon.TOPIC); topics.add("my-other-topic"); topics.add("yetAnotherTopic"); c.subscribe(topics); // Unsubcribe all topics. c.unsubscribe(); // Alternative topics.clear(); c.subscribe(topics); } }