KafkaConsumerApp.java 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import java.util.ArrayList;
  5. import java.util.Arrays;
  6. import java.util.Properties;
  7. public class KafkaConsumerApp {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. KafkaConsumerCommon.configure(props);
  11. // The consumer Group ID is now required on subscribe().
  12. props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerCommon.GROUP);
  13. KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
  14. /* Let the consumer handle the partitions */
  15. myConsumer.subscribe(Arrays.asList(KafkaCommon.TOPIC));
  16. try {
  17. while (true) {
  18. KafkaConsumerCommon.process(myConsumer);
  19. }
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. } finally {
  23. myConsumer.close();
  24. }
  25. }
  26. private static void subscribeExamples(KafkaConsumer<String, String> c) {
  27. // In tutorial but does not compile: needs a rebalance callback
  28. // c.subscribe("my-*");
  29. // Maybe that way ?
  30. // c.subscribe(Pattern.compile("my-[\\w]+"), null);
  31. // Typical initial subscription.
  32. c.subscribe(Arrays.asList(KafkaCommon.TOPIC));
  33. // Replaces the current subscription set, does not extend it
  34. c.subscribe(Arrays.asList("another-topic"));
  35. // Better for incremental cases.
  36. ArrayList<String> topics = new ArrayList<String>();
  37. topics.add(KafkaCommon.TOPIC);
  38. topics.add("my-other-topic");
  39. topics.add("yetAnotherTopic");
  40. c.subscribe(topics);
  41. // Unsubcribe all topics.
  42. c.unsubscribe();
  43. // Alternative
  44. topics.clear();
  45. c.subscribe(topics);
  46. }
  47. }