KafkaConsumerApp.java 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import java.util.ArrayList;
  5. import java.util.Arrays;
  6. import java.util.Properties;
  7. public class KafkaConsumerApp {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. // We use "put" since there are strings. Otherwise, use setProperty.
  11. // These are the 3 required properties.
  12. // 1. Cluster membership: partition leaders, etc
  13. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
  14. // 2. How keys and values are deserialized.
  15. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
  16. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
  17. // 3. The consumer Group ID is now required.
  18. props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
  19. KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
  20. /* Let the consumer handle the partitions */
  21. myConsumer.subscribe(Arrays.asList(KafkaCommon.TOPIC));
  22. try {
  23. while (true) {
  24. ConsumerRecords<String, String> records = myConsumer.poll(KafkaConsumerCommon.TIMEOUT);
  25. KafkaConsumerCommon.process(records);
  26. }
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. } finally {
  30. myConsumer.close();
  31. }
  32. }
  33. private static void subscribeExamples(KafkaConsumer<String, String> c) {
  34. // In tutorial but does not compile: needs a rebalance callback
  35. // c.subscribe("my-*");
  36. // Maybe that way ?
  37. // c.subscribe(Pattern.compile("my-[\\w]+"), null);
  38. // Typical initial subscription.
  39. c.subscribe(Arrays.asList(KafkaCommon.TOPIC));
  40. // Replaces the current subscription set, does not extend it
  41. c.subscribe(Arrays.asList("another-topic"));
  42. // Better for incremental cases.
  43. ArrayList<String> topics = new ArrayList<String>();
  44. topics.add(KafkaCommon.TOPIC);
  45. topics.add("my-other-topic");
  46. topics.add("yetAnotherTopic");
  47. c.subscribe(topics);
  48. // Unsubcribe all topics.
  49. c.unsubscribe();
  50. // Alternative
  51. topics.clear();
  52. c.subscribe(topics);
  53. }
  54. }