06 Consuming messages with Kafka Consumers and Consumer Groups
Very similar to the producer.
Creating a consumer
(Un)Subscribing
- A single consumer can subscribe to any number of topics
- subscribe takes either a collection of topic names or a regex Pattern
- each call replaces the previous subscription list
- A consumer can also unsubscribe from all topics at once.
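
The replace-on-each-call rule is easy to misread as "append", so here is a minimal sketch of it. SubscriptionModel is a hypothetical stand-in written for these notes, not the Kafka client or its SubscriptionState; it only models the two behaviours listed above.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Toy stand-in for a consumer's topic subscription; NOT the Kafka client. */
public class SubscriptionModel {
    private final Set<String> topics = new TreeSet<>();

    /** Replaces (does not extend) the current subscription. */
    public void subscribe(Collection<String> newTopics) {
        topics.clear();
        topics.addAll(newTopics);
    }

    /** Drops every subscribed topic at once. */
    public void unsubscribe() {
        topics.clear();
    }

    /** Returns a copy so callers cannot mutate the internal state. */
    public Set<String> subscription() {
        return new TreeSet<>(topics);
    }
}
```

With this model, subscribing to two topics and then to a third leaves only the third in the subscription, which is the behaviour to keep in mind when calling subscribe more than once on a real consumer.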
consumer.subscribe()
- automatic/dynamic partition assignment
- that single consumer listens to every partition on every listed topic
- entirely managed by the KafkaConsumer
- will get newly added partitions to subscribed topics (SubscriptionState)
- now needs a valid group.id (ConsumerConfig.GROUP_ID_CONFIG)
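
As a sketch of where group.id fits, the properties below use plain string keys so the block needs nothing beyond the JDK. The broker address localhost:9092, the class name ConsumerProps, and the choice of string deserializers are assumptions for illustration only.

```java
import java.util.Properties;

public class ConsumerProps {
    /** Builds consumer properties for the given group (hypothetical helper). */
    public static Properties forGroup(String groupId) {
        Properties props = new Properties();
        // Assumed local broker; replace with the real bootstrap server list.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Needed when using subscribe(); same key as ConsumerConfig.GROUP_ID_CONFIG.
        props.setProperty("group.id", groupId);
        // Assumed String keys and values for this example.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With the kafka-clients library on the classpath, these properties would be passed to the KafkaConsumer constructor before calling subscribe.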
consumer.assign()
- used to subscribe to explicit partitions
- one or more partitions, regardless of topic
- manual, self-administering mode
- more of an advanced case
- does not need a group.id
- blocks instead of erroring if one of the partitions does not exist, in
updateFetchPositions.updateFetchPositions() (see poll loop below).
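
To contrast the two modes for a single consumer, the following toy model computes what each would listen to. AssignmentModel and the topic names are hypothetical, and partitions are written as "topic-partition" strings instead of Kafka's TopicPartition class to keep the block free of dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy comparison of subscribe()-style and assign()-style selection. */
public class AssignmentModel {
    /** subscribe()-style: every partition of every listed topic. */
    public static List<String> allPartitions(Map<String, Integer> partitionCounts,
                                             List<String> topics) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            int count = partitionCounts.getOrDefault(topic, 0);
            for (int p = 0; p < count; p++) {
                result.add(topic + "-" + p);
            }
        }
        return result;
    }

    /** assign()-style: exactly the partitions the caller names, across any topics. */
    public static List<String> explicitPartitions(String... topicPartitions) {
        return new ArrayList<>(List.of(topicPartitions));
    }
}
```

The first method depends on cluster metadata (the partition counts), which is why that mode can pick up newly added partitions; the second depends only on what the caller passes in.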
The poll loop
- primary function of the KafkaConsumer
- continuously poll the brokers for data
- single API for handling all Consumer <-> Broker interactions
- a lot of interactions beyond message retrieval
- when consumer.assign() or .subscribe() is called, the contents of the topics
and partitions collections in metadata are used to set fields within the
SubscriptionState consumer.subscriptions field
- this value is the source of truth for everything the consumer is assigned to
- most consuming operations interact with it and the
ConsumerCoordinator consumer.coordinator field
- when consumer.poll() is invoked, it uses properties (starting with
bootstrap.servers) to request metadata about the cluster.
- the Fetcher<K, V> consumer.fetcher performs several fetch-related operations
between consumer and cluster
- it delegates actual communication to the ConsumerNetworkClient consumer.client (CNC)
- that client, in addition to the actual requests, performs the heartbeats
informing the cluster of the client's health
- the fetcher also requests (via the CNC) the cluster metadata, initially and then
periodically afterwards
- it obtains information about what topics/partitions are available from the
SubscriptionState
- the ConsumerCoordinator consumer.coordinator uses the metadata to coordinate
the consumer.
- it handles automatic/dynamic partition reassignment by notifying the
SubscriptionState of assignment changes
- it commits offsets to the cluster so the cluster is aware of the state of
subscriptions, for each topic and partition.
- the timeout applies to the time the CNC spends polling the cluster for messages
to return. It does not apply to the initial setup operations (hence poll remaining
stuck on nonexistent partitions during the initial update)
- it bounds only the wait for data, so it is not a maximum for the whole cycle:
setup operations can make a poll call take longer
- when data is available or the timeout expires, a batch of records (possibly
empty) is returned
- they are parsed, deserialized, and stored by topic and partition internally
in the Fetcher
- once this is done, the fetcher returns this result for application processing.
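
The last two steps (store by topic and partition, then hand the result back) can be sketched with a small buffer. FetchBufferModel is a hypothetical toy written for these notes and says nothing about how the real Fetcher is implemented; values are plain strings standing in for deserialized records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy buffer grouping records by "topic-partition"; NOT Kafka's Fetcher. */
public class FetchBufferModel {
    private final Map<String, List<String>> buffered = new LinkedHashMap<>();

    /** Stores one already-deserialized record under its topic-partition key. */
    public void add(String topicPartition, String value) {
        buffered.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(value);
    }

    /** Returns everything buffered so far and empties the buffer, like one poll result. */
    public Map<String, List<String>> drain() {
        Map<String, List<String>> batch = new LinkedHashMap<>(buffered);
        buffered.clear();
        return batch;
    }
}
```

An application loop would call the equivalent of drain repeatedly and process each partition's list in order, which mirrors how a poll result is usually iterated per partition.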