// KafkaAssignApp.java (2.1 KB)

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;
  7. public class KafkaAssignApp {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. // We use "put" since there are strings. Otherwise, use setProperty.
  11. // These are the 3 required properties.
  12. // 1. Cluster membership: partition leaders, etc
  13. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
  14. // 2. How keys and values are deserialized.
  15. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
  16. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
  17. // 3. The consumer Group ID is NOT required for assign;
  18. // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
  19. KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
  20. ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
  21. TopicPartition myTopicPart0 = new TopicPartition(KafkaCommon.TOPIC, 0);
  22. TopicPartition myTopicPart1 = new TopicPartition(KafkaCommon.TOPIC, 1);
  23. partitions.add(myTopicPart0);
  24. // Will block poll if the partition does not exist.
  25. partitions.add(myTopicPart1);
  26. /* Don't let the consumer handle the partitions */
  27. myConsumer.assign(partitions);
  28. try {
  29. while (true) {
  30. ConsumerRecords<String, String> records = myConsumer.poll(KafkaConsumerCommon.TIMEOUT);
  31. KafkaConsumerCommon.process(records);
  32. }
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. } finally {
  36. myConsumer.close();
  37. }
  38. }
  39. private static void assignExamples(KafkaConsumer<String, String> c) {
  40. TopicPartition partition0 = new TopicPartition(KafkaCommon.TOPIC, 0);
  41. ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
  42. partitions.add(partition0);
  43. c.assign(partitions); // NOT incremental !
  44. }
  45. }