1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Properties;
- public class KafkaConsumerApp {
- public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
- public static final String TOPIC = "my-topic";
- public static final Integer TIMEOUT = 200;
- public static void main(String[] args) {
- Properties props = new Properties();
- // We use "put" since there are strings. Otherwise, use setProperty.
- // These are the 3 required properties.
- // 1. Cluster membership: partition leaders, etc
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
- // 2. How keys and values are deserialized.
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- // 3. The consumer Group ID is now required.
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
- KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
- myConsumer.subscribe(Arrays.asList(TOPIC));
- Boolean started = false;
- Integer pass = 0;
- try {
- while (true) {
- ConsumerRecords<String, String> records = myConsumer.poll(TIMEOUT);
- if (!started) {
- started = true;
- System.out.printf("Started");
- }
- // for (ConsumerRecord<String, String> cr : records.records(TOPIC)) {
- // System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value());
- // }
- for (ConsumerRecord<String, String> cr : records) {
- System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
- pass, records.count(),
- cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
- }
- pass++;
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- myConsumer.close();
- }
- }
- private static void assignDemo(KafkaConsumer<String, String> c) {
- TopicPartition partition0 = new TopicPartition(TOPIC, 0);
- ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
- partitions.add(partition0);
- c.assign(partitions); // NOT incremental !
- }
- private static void subscribeExamples(KafkaConsumer<String, String> c) {
- // In tutorial but does not compile: needs a rebalance callback
- // c.subscribe("my-*");
- // Maybe that way ?
- // c.subscribe(Pattern.compile("my-[\\w]+"), null);
- // Typical initial subscription.
- c.subscribe(Arrays.asList(TOPIC));
- // Replaces the current subscription set, does not extend it
- c.subscribe(Arrays.asList("another-topic"));
- // Better for incremental cases.
- ArrayList<String> topics = new ArrayList<String>();
- topics.add(TOPIC);
- topics.add("my-other-topic");
- topics.add("yetAnotherTopic");
- c.subscribe(topics);
- // Unsubcribe all topics.
- c.unsubscribe();
- // Alternative
- topics.clear();
- c.subscribe(topics);
- }
- }
|