1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 |
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import java.util.ArrayList;
- import java.util.Properties;
- public class KafkaAssignApp {
- public static void main(String[] args) {
- Properties props = new Properties();
- KafkaConsumerCommon.configure(props);
- // The consumer Group ID is NOT required for assign;
- // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
- KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
- ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
- TopicPartition myTopicPart0 = new TopicPartition(KafkaCommon.TOPIC, 0);
- TopicPartition myTopicPart1 = new TopicPartition(KafkaCommon.TOPIC, 1);
- partitions.add(myTopicPart0);
- // Will block poll if the partition does not exist.
- partitions.add(myTopicPart1);
- /* Don't let the consumer handle the partitions */
- myConsumer.assign(partitions);
- try {
- while (true) {
- KafkaConsumerCommon.process(myConsumer);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- myConsumer.close();
- }
- }
- private static void assignExamples(KafkaConsumer<String, String> c) {
- TopicPartition partition0 = new TopicPartition(KafkaCommon.TOPIC, 0);
- ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
- partitions.add(partition0);
- c.assign(partitions); // NOT incremental !
- }
- }
|