KafkaAssignApp.java 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.TopicPartition;
  3. import java.util.ArrayList;
  4. import java.util.Properties;
  5. public class KafkaAssignApp {
  6. public static void main(String[] args) {
  7. Properties props = new Properties();
  8. KafkaConsumerCommon.configure(props);
  9. // The consumer Group ID is NOT required for assign;
  10. // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
  11. KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
  12. ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
  13. TopicPartition myTopicPart0 = new TopicPartition(KafkaCommon.TOPIC, 0);
  14. TopicPartition myTopicPart1 = new TopicPartition(KafkaCommon.TOPIC, 1);
  15. partitions.add(myTopicPart0);
  16. // Will block poll if the partition does not exist.
  17. partitions.add(myTopicPart1);
  18. /* Don't let the consumer handle the partitions */
  19. myConsumer.assign(partitions);
  20. try {
  21. while (true) {
  22. KafkaConsumerCommon.process(myConsumer);
  23. }
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. } finally {
  27. myConsumer.close();
  28. }
  29. }
  30. private static void assignExamples(KafkaConsumer<String, String> c) {
  31. TopicPartition partition0 = new TopicPartition(KafkaCommon.TOPIC, 0);
  32. ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
  33. partitions.add(partition0);
  34. c.assign(partitions); // NOT incremental !
  35. }
  36. }