12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import java.util.Properties;
- public class KafkaConsumerCommon {
- public static final String GROUP = "test-group";
- public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
- public static final Integer TIMEOUT = 10;
- protected static Boolean started = false;
- protected static Integer pass = 0;
- static void configure(Properties props) {
- // We use "put" since there are strings. Otherwise, use setProperty.
- // These are the 3 required properties.
- // 1. Cluster membership: partition leaders, etc
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommon.BROKERS);
- // 2. How keys and values are deserialized.
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
- }
- static void process(KafkaConsumer<String, String> c) {
- ConsumerRecords<String, String> records = c.poll(KafkaConsumerCommon.TIMEOUT);
- if (!started) {
- started = true;
- System.out.printf("Started");
- }
- // for (ConsumerRecord<String, String> cr : records.records(TOPIC)) {
- // System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value());
- // }
- for (ConsumerRecord<String, String> cr : records) {
- System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
- pass, records.count(),
- cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
- }
- pass++;
- }
- }
|