import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerApp { public static final String STRING_DESERIALIZER = StringDeserializer.class.getName(); public static final String TOPIC = "my-topic"; public static final Integer TIMEOUT = 200; public static void main(String[] args) { Properties props = new Properties(); // We use "put" since there are strings. Otherwise, use setProperty. // These are the 3 required properties. // 1. Cluster membership: partition leaders, etc props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093"); // 2. How keys and values are deserialized. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 3. The consumer Group ID is now required. props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo"); KafkaConsumer myConsumer = new KafkaConsumer(props); myConsumer.subscribe(Arrays.asList(TOPIC)); Boolean started = false; Integer pass = 0; try { while (true) { ConsumerRecords records = myConsumer.poll(TIMEOUT); if (!started) { started = true; System.out.printf("Started"); } // for (ConsumerRecord cr : records.records(TOPIC)) { // System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value()); // } for (ConsumerRecord cr : records) { System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n", pass, records.count(), cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value()); } pass++; } } catch (Exception e) { e.printStackTrace(); } finally { myConsumer.close(); } } private static void assignDemo(KafkaConsumer c) { TopicPartition partition0 = new TopicPartition(TOPIC, 0); ArrayList partitions = new ArrayList(); partitions.add(partition0); c.assign(partitions); // NOT incremental ! } private static void subscribeExamples(KafkaConsumer c) { // In tutorial but does not compile: needs a rebalance callback // c.subscribe("my-*"); // Maybe that way ? // c.subscribe(Pattern.compile("my-[\\w]+"), null); // Typical initial subscription. c.subscribe(Arrays.asList(TOPIC)); // Replaces the current subscription set, does not extend it c.subscribe(Arrays.asList("another-topic")); // Better for incremental cases. ArrayList topics = new ArrayList(); topics.add(TOPIC); topics.add("my-other-topic"); topics.add("yetAnotherTopic"); c.subscribe(topics); // Unsubcribe all topics. c.unsubscribe(); // Alternative topics.clear(); c.subscribe(topics); } }