KafkaConsumerCommon.java 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import org.apache.kafka.common.serialization.StringDeserializer;
  6. import java.util.Properties;
  7. public class KafkaConsumerCommon {
  8. public static final String GROUP = "test-group";
  9. public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
  10. public static final Integer TIMEOUT = 10;
  11. protected static Boolean started = false;
  12. protected static Integer pass = 0;
  13. static void configure(Properties props) {
  14. // We use "put" since there are strings. Otherwise, use setProperty.
  15. // These are the 3 required properties.
  16. // 1. Cluster membership: partition leaders, etc
  17. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommon.BROKERS);
  18. // 2. How keys and values are deserialized.
  19. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
  20. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
  21. }
  22. static void process(KafkaConsumer<String, String> c) {
  23. ConsumerRecords<String, String> records = c.poll(KafkaConsumerCommon.TIMEOUT);
  24. if (!started) {
  25. started = true;
  26. System.out.printf("Started");
  27. }
  28. // for (ConsumerRecord<String, String> cr : records.records(TOPIC)) {
  29. // System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value());
  30. // }
  31. for (ConsumerRecord<String, String> cr : records) {
  32. System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
  33. pass, records.count(),
  34. cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
  35. }
  36. pass++;
  37. }
  38. }