import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import java.util.ArrayList; import java.util.Properties; public class KafkaAssignApp { public static void main(String[] args) { Properties props = new Properties(); // We use "put" since there are strings. Otherwise, use setProperty. // These are the 3 required properties. // 1. Cluster membership: partition leaders, etc props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093"); // 2. How keys and values are deserialized. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER); // 3. The consumer Group ID is NOT required for assign; // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo"); KafkaConsumer myConsumer = new KafkaConsumer(props); ArrayList partitions = new ArrayList(); TopicPartition myTopicPart0 = new TopicPartition(KafkaCommon.TOPIC, 0); TopicPartition myTopicPart1 = new TopicPartition(KafkaCommon.TOPIC, 1); partitions.add(myTopicPart0); // Will block poll if the partition does not exist. partitions.add(myTopicPart1); /* Don't let the consumer handle the partitions */ myConsumer.assign(partitions); try { while (true) { ConsumerRecords records = myConsumer.poll(KafkaConsumerCommon.TIMEOUT); KafkaConsumerCommon.process(records); } } catch (Exception e) { e.printStackTrace(); } finally { myConsumer.close(); } } private static void assignExamples(KafkaConsumer c) { TopicPartition partition0 = new TopicPartition(KafkaCommon.TOPIC, 0); ArrayList partitions = new ArrayList(); partitions.add(partition0); c.assign(partitions); // NOT incremental ! } }