|
@@ -0,0 +1,77 @@
|
|
|
+# 06 Consuming messages with Kafka Consumers and Consumer Groups
|
|
|
+
|
|
|
+Very similar to producer.
|
|
|
+
|
|
|
+## Creating a consumer
|
|
|
+
|
|
|
+- Properties:
|
|
|
+ - `bootstrap.servers`
|
|
|
+ - `key.serializer`
|
|
|
+ - `value.serializer`
|
|
|
+ - and others
|
|
|
+ - => `KafkaProducer`
|
|
|
+ - [Consumer config doc](http://kafka.apache.org/documentation.html#consumerconfigs)
|
|
|
+- See [KafkaConsumerApp](../src/main/java/KafkaConsumerApp.java)
|
|
|
+
|
|
|
+## (Un)Subscribing
|
|
|
+
|
|
|
+- A single consumer can subscribe to any number of topics
|
|
|
+- subscribe takes either a list of strings or a regex string,
|
|
|
+ - each call replaces the previous subscription list
|
|
|
+- They can also unsubscribe all topics at once.
|
|
|
+
|
|
|
+### consumer.subscribe()
|
|
|
+
|
|
|
+- automatic/dynamic partition assignment
|
|
|
+ - that single consumer listens to every partition on every listed topic
|
|
|
+ - entirely managed by the `KafkaConsumer`
|
|
|
+ - will get newly added partitions to subscribed topics (`SubscriptionState`)
|
|
|
+- now needs a valid `group.id` == `ConsumerConfig.GROUP_ID_CONFIG`
|
|
|
+
|
|
|
+### consumer.assign()
|
|
|
+
|
|
|
+- used to subscribe to explicit partitions
|
|
|
+ - one or more partitions, regardless of topic
|
|
|
+ - manual, self-administering mode
|
|
|
+- more of an advanced case
|
|
|
+- does not needs a `group.id`
|
|
|
+- blocks instead of erroring if one of the partitions does not exist, in
|
|
|
+ `updateFetchPositions.updateFetchPositions()` (see poll loop below).
|
|
|
+
|
|
|
+
|
|
|
+## The poll loop
|
|
|
+
|
|
|
+- primary function of the `KafkaConsumer`
|
|
|
+- continuously poll the brokers for data
|
|
|
+- single API for handling all Consumer <-> Broker interactions
|
|
|
+ - a lot of interactions beyond message retrieval
|
|
|
+- when `consumer.assign()` or `.subscribe()` is called, the contents of the topics
|
|
|
+ and partitions collections in metadata are used to set fields within the
|
|
|
+ `SubscriptionState consumer.subscriptions` field
|
|
|
+ - this value is the source of truth for everything the consumer is assigned to
|
|
|
+ - most consuming operations interact with it and the `ConsumerCoordinator consumer.coordinator` field
|
|
|
+- when `consumer.poll()` is invoked, it uses properties (starting with the `bootstrap.servers`) to
|
|
|
+ request metadata about the cluster.
|
|
|
+- the `Fetcher<K, V> consumer.fetcher` performs several fetch-related operations
|
|
|
+ between consumer and cluster
|
|
|
+ - it delegates actual communication to the `ConsumerNetworkClient consumer.client`
|
|
|
+ - that consumer, in addition to the actual requests, performs the heartbeats
|
|
|
+ informing the cluster of the client health
|
|
|
+ - the fetch also requests (via the CNC) the cluster metadata, initially then
|
|
|
+ periodically afterwards
|
|
|
+ - it obtains information about what topics/partitions are available from the
|
|
|
+ `SubscriptionState`
|
|
|
+- the `ConsumerCoordinator consumer.coordinator` uses the metadata to coordinate
|
|
|
+ the consumer.
|
|
|
+ - handles automatic/dynamic partition reassignment by notifying them to the SubscriptionState
|
|
|
+ - committing offsets to the cluster so the cluster is aware of the state of subscriptions,
|
|
|
+ for each topic and partition.
|
|
|
+- the timeout applies to the time the CNC spends polling the cluster for messages
|
|
|
+ to return. It does not apply to the initial setup operations (hence poll remaining
|
|
|
+ stuck on nonexistent partitions during the initial update)
|
|
|
+ - it is a minimum amount of time the cycle will take, not a maximum
|
|
|
+- when the timeout expires, a batch of records are returned
|
|
|
+ - they are parsed, deserialized, and stored by topic and partition internally
|
|
|
+ in the Fetcher
|
|
|
+ - once this is done, the fetcher returns this result for application processing.
|
|
|
+
|