import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties; public class KafkaConsumerCommon { public static final String GROUP = "test-group"; public static final String STRING_DESERIALIZER = StringDeserializer.class.getName(); public static final Integer TIMEOUT = 10; protected static Boolean started = false; protected static Integer pass = 0; static void configure(Properties props) { // We use "put" since there are strings. Otherwise, use setProperty. // These are the 3 required properties. // 1. Cluster membership: partition leaders, etc props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommon.BROKERS); // 2. How keys and values are deserialized. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER); } static void process(KafkaConsumer c) { ConsumerRecords records = c.poll(KafkaConsumerCommon.TIMEOUT); if (!started) { started = true; System.out.printf("Started"); } // for (ConsumerRecord cr : records.records(TOPIC)) { // System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value()); // } for (ConsumerRecord cr : records) { System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n", pass, records.count(), cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value()); } pass++; } }