KafkaAssignApp.java 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import org.apache.kafka.common.TopicPartition;
  6. import org.apache.kafka.common.serialization.StringDeserializer;
  7. import java.util.ArrayList;
  8. import java.util.Arrays;
  9. import java.util.Properties;
  10. public class KafkaAssignApp {
  11. public static final String TOPIC = "my-topic";
  12. public static final Integer TIMEOUT = 10;
  13. public static void main(String[] args) {
  14. Properties props = new Properties();
  15. // We use "put" since there are strings. Otherwise, use setProperty.
  16. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  17. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  18. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  19. // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
  20. KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
  21. ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
  22. TopicPartition myTopicPart0 = new TopicPartition(TOPIC, 0);
  23. TopicPartition myTopicPart1 = new TopicPartition(TOPIC, 1);
  24. partitions.add(myTopicPart0);
  25. // Will block poll if the partition does not exist.
  26. partitions.add(myTopicPart1);
  27. myConsumer.assign(partitions);
  28. Boolean started = false;
  29. Integer pass = 0;
  30. try {
  31. while (true) {
  32. ConsumerRecords<String, String> records = myConsumer.poll(TIMEOUT);
  33. if (!started) {
  34. started = true;
  35. System.out.printf("Started");
  36. }
  37. for (ConsumerRecord<String, String> cr : records) {
  38. System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
  39. pass, records.count(),
  40. cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
  41. }
  42. pass++;
  43. }
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. } finally {
  47. myConsumer.close();
  48. }
  49. }
  50. }