// KafkaConsumerApp.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
  10. public class KafkaConsumerApp {
  11. public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
  12. public static final String TOPIC = "my-topic";
  13. public static final Integer TIMEOUT = 200;
  14. public static void main(String[] args) {
  15. Properties props = new Properties();
  16. // We use "put" since there are strings. Otherwise, use setProperty.
  17. // These are the 3 required properties.
  18. // 1. Cluster membership: partition leaders, etc
  19. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
  20. // 2. How keys and values are deserialized.
  21. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  22. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  23. // 3. The consumer Group ID is now required.
  24. props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
  25. KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
  26. myConsumer.subscribe(Arrays.asList(TOPIC));
  27. Boolean started = false;
  28. Integer pass = 0;
  29. try {
  30. while (true) {
  31. ConsumerRecords<String, String> records = myConsumer.poll(TIMEOUT);
  32. if (!started) {
  33. started = true;
  34. System.out.printf("Started");
  35. }
  36. // for (ConsumerRecord<String, String> cr : records.records(TOPIC)) {
  37. // System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value());
  38. // }
  39. for (ConsumerRecord<String, String> cr : records) {
  40. System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
  41. pass, records.count(),
  42. cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
  43. }
  44. pass++;
  45. }
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. } finally {
  49. myConsumer.close();
  50. }
  51. }
  52. private static void assignDemo(KafkaConsumer<String, String> c) {
  53. TopicPartition partition0 = new TopicPartition(TOPIC, 0);
  54. ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
  55. partitions.add(partition0);
  56. c.assign(partitions); // NOT incremental !
  57. }
  58. private static void subscribeExamples(KafkaConsumer<String, String> c) {
  59. // In tutorial but does not compile: needs a rebalance callback
  60. // c.subscribe("my-*");
  61. // Maybe that way ?
  62. // c.subscribe(Pattern.compile("my-[\\w]+"), null);
  63. // Typical initial subscription.
  64. c.subscribe(Arrays.asList(TOPIC));
  65. // Replaces the current subscription set, does not extend it
  66. c.subscribe(Arrays.asList("another-topic"));
  67. // Better for incremental cases.
  68. ArrayList<String> topics = new ArrayList<String>();
  69. topics.add(TOPIC);
  70. topics.add("my-other-topic");
  71. topics.add("yetAnotherTopic");
  72. c.subscribe(topics);
  73. // Unsubcribe all topics.
  74. c.unsubscribe();
  75. // Alternative
  76. topics.clear();
  77. c.subscribe(topics);
  78. }
  79. }