06 Consuming messages with Kafka Consumers and Consumer Groups
Creating a consumer
Very similar to creating a producer.
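A minimal creation sketch, assuming String keys/values; the broker address and group id are placeholders (imports: org.apache.kafka.clients.consumer.*, org.apache.kafka.common.serialization.StringDeserializer, java.util.Properties):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder broker list
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");           // placeholder group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);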
(Un)Subscribing
- A single consumer can subscribe to any number of topics
- subscribe takes either a list of strings or a regex string
- each call replaces the previous subscription list
- a consumer can also unsubscribe from all topics at once (unsubscribe())
consumer.subscribe()
- automatic/dynamic partition assignment
- that single consumer listens to every partition on every listed topic
- entirely managed by the KafkaConsumer
- will get newly added partitions to subscribed topics (SubscriptionState)
- now needs a valid group.id == ConsumerConfig.GROUP_ID_CONFIG
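A sketch of the subscribe variants, reusing the consumer created above; topic names are placeholders, and the single-argument Pattern overload assumes a reasonably recent client (older ones require a ConsumerRebalanceListener argument):
consumer.subscribe(Arrays.asList("orders", "payments"));   // list of topic names
consumer.subscribe(Pattern.compile("orders\\..*"));        // regex; replaces the previous subscription
consumer.unsubscribe();                                     // drops the whole subscription at once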
consumer.assign()
- used to subscribe to explicit partitions
- one or more partitions, regardless of topic
- manual, self-administering mode
- more of an advanced case
- does not need a group.id
- blocks instead of erroring if one of the partitions does not exist, in updateFetchPositions() (see poll loop below).
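A minimal assign() sketch; topic names and partition numbers are placeholders, and note the absence of group.id:
TopicPartition ordersP0 = new TopicPartition("orders", 0);
TopicPartition auditP3  = new TopicPartition("audit", 3);   // partitions from different topics are fine
consumer.assign(Arrays.asList(ordersP0, auditP3));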
The poll loop
- primary function of the KafkaConsumer
- continuously poll the brokers for data
- single API for handling all Consumer <-> Broker interactions
- a lot of interactions beyond message retrieval
- when consumer.assign() or consumer.subscribe() is called, the contents of the topics and partitions collections in metadata are used to set fields within the SubscriptionState consumer.subscriptions field
- this value is the source of truth for everything the consumer is assigned to
- most consuming operations interact with it and the ConsumerCoordinator consumer.coordinator field
- when consumer.poll() is invoked, it uses properties (starting with bootstrap.servers) to request metadata about the cluster.
- the Fetcher<K, V> consumer.fetcher performs several fetch-related operations between consumer and cluster
- it delegates actual communication to the ConsumerNetworkClient consumer.client
- that client, in addition to the actual requests, performs the heartbeats informing the cluster of the client's health
- the fetcher also requests (via the ConsumerNetworkClient) the cluster metadata, initially and then periodically afterwards
- it obtains information about what topics/partitions are available from the SubscriptionState
- the ConsumerCoordinator consumer.coordinator uses the metadata to coordinate the consumer:
- it handles automatic/dynamic partition reassignment by notifying the SubscriptionState of the changes
- it commits offsets to the cluster so the cluster is aware of the state of subscriptions, for each topic and partition.
- the timeout applies to the time the ConsumerNetworkClient spends polling the cluster for messages to return. It does not apply to the initial setup operations (hence poll remaining stuck on nonexistent partitions during the initial update)
- it is a minimum amount of time the cycle will take, not a maximum
- when the timeout expires, a batch of records is returned
- they are parsed, deserialized, and stored by topic and partition internally in the Fetcher
- once this is done, the fetcher returns this result for application processing.
The poll() process is a single-threaded operation:
- 1 poll loop per consumer
- 1 thread per consumer
This is for internal simplicity; parallel consumption has to be achieved by other means (typically more consumers).
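A typical poll loop, as a sketch: poll(Duration) is the form on recent Java clients (older ones take a long timeout in msec); the topic name and the running shutdown flag are assumptions.
consumer.subscribe(Collections.singletonList("orders"));
try {
    while (running) {                                          // assumed volatile shutdown flag
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s-%d @ %d: %s%n",
                    record.topic(), record.partition(), record.offset(), record.value());
        }
    }
} finally {
    consumer.close();                                          // leaves the group cleanly
}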
Processing messages
The poll() method returns a ConsumerRecords, a collection of ConsumerRecord:
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
/* ... */
}
- Since poll() runs in a single thread, nothing else runs while the records are being processed
- The more topics and partitions a single process subscribes to, the more it has to do within that single polling loop, which can make the consumer slow
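The returned batch can also be walked per partition, which keeps the per-partition ordering explicit; a sketch using the records variable from the loop above:
for (TopicPartition tp : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
    // process partitionRecords in offset order for this partition
}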
Offsets and positions
Just because something is read does not mean it is committed:
- There are different categories of offsets, representing the stage they're in:
- a consumer needs to know what it has vs has not read
- what it confirms it has read (and processed) is the last committed offset
- this is whence consuming starts for a given partition, subject to offset reset
- Each partition is exclusive with regard to consumer offsets: for any topic, a
consumer is tracking one offset per partition it is subscribed to
- Consumers read at the current position towards the log-end offset.
- Offsets from the last committed to the current position are uncommitted.
- Two optional offset commit properties control these offsets:
enable.auto.commit lets Kafka decide when current position offsets are upgraded to full committed offsets. Defaults to true.
- Since it has no knowledge of what work is actually performed, it can only use a delay for that. That delay is auto.commit.interval.ms, in msec, which defaults to 5000, a lot for high-throughput cases.
The extent to which your system can be tolerant of eventual consistency is determined by its reliability requirements.
By default, consumers start reading from a new partition at the latest
committed offset.
- Optional property: auto.offset.reset can be earliest, latest (default), or none, which throws an exception and lets code decide. See "rebalancing" below.
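As a sketch, the corresponding properties on the props object from the creation example (values are illustrative, not recommendations):
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");         // default
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");    // default is 5000
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");      // earliest | latest (default) | none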
Offset choice is different depending on whether the topology has a single consumer,
or a ConsumerGroup.
- Kafka stores offsets in a special topic called __consumer_offsets, with 50 partitions.
- They are produced by the ConsumerCoordinator, which means a Consumer is also a Producer, but just for that topic. Once offsets have been committed, the coordinator updates the SubscriptionState accordingly, allowing the Fetcher to know which offsets it should be retrieving.
Manual committing
There are two methods to commit:
commitSync(): used to achieve exact control of when the record is truly processed; for when one doesn't want to process newer records until the older ones are committed.
- It should be called after processing a batch of ConsumerRecord, not after every single record, which would increase latency to no avail
- It blocks until it receives a response from the cluster
- It may fail, meaning a recovery process must be started. It retries automatically until it succeeds or receives an unrecoverable error.
- The retry.backoff.ms == RETRY_BACKOFF_MS_CONFIG (default: 100) property defines the retry delay, similarly to the producer backoff (for good reason)
- Using this manual committing is a tradeoff between throughput/performance (less)
and control over consistency (more), as this adds some latency to the polling
process.
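A commitSync() sketch inside the poll loop; process(record) stands for whatever the application does and is an assumption:
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        process(record);                        // hypothetical application processing
    }
    try {
        consumer.commitSync();                  // blocks; commits the offsets of the batch just polled
    } catch (CommitFailedException e) {
        // only thrown once automatic retries are exhausted or the error is unrecoverable
    }
}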
commitAsync()
- does not retry automatically, because it can't know whether the previous
commit succeeded or failed, which could lead to ordering and duplication issues
- instead, it accepts a callback, triggered on receiving the commit response
from the cluster.
- better throughput because it doesn't block the polling process.
- Don't use it without registering the callback and handling it as needed.
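A commitAsync() sketch with the callback registered; the error handling shown is only illustrative:
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // the commit was not applied; log it and decide whether to alert or fall back to a sync commit
        System.err.println("Async commit failed for " + offsets + ": " + exception);
    }
});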
Deciding on a commit strategy
- Consistency control:
- need to know when something is "done" (define "done" first)
- Atomicity:
- ability to treat consumption and processing as a single atomic operation
- obtaining exactly-once semantics instead of at-least-once
Scaling out consumers
- Scaling a single-threaded, single-consumer app to the bandwidth, number of topics and partitions of a full Kafka cluster is not realistic
- The solution is to scale out consuming to more consumers, but they can't just consume anything without synchronizing in some way
- This is the reason for Consumer groups: a collection of independent Consumers working as a team, i.e. declaring the same group.id.
- This allows them to share the message consumption and processing load, with more parallelism.
- It allows more redundancy: failure or limitations of a given consumer are
automatically handled and balanced by K.
- It offers more performance, with the ability to support a large backlog
- A Consumer group is created when individual consumers
- with a common group.id...
- invoke the subscribe() method...
- and pass a common topics list.
- One of the brokers gets elected as the GroupCoordinator for that topic.
- Its job is to monitor and maintain group membership.
- It collaborates with the ClusterCoordinator and ZooKeeper to monitor and assign partitions within a topic to individual consumers in the group.
- As soon as a Consumer group is formed, each consumer sends heartbeats,
- configured by two properties (see the config sketch after this list):
heartbeat.interval.ms == HEARTBEAT_INTERVAL_MS_CONFIG (default 3000 msec): the interval between heartbeat sends
session.timeout.ms == SESSION_TIMEOUT_MS_CONFIG (default 30000 msec)
- the group coordinator relies on these heartbeats to evaluate whether the consumer is alive and able to participate in the group
- if the coordinator does not receive a heartbeat for a period longer than session.timeout.ms, it will consider the consumer failed and take corrective action, following its priority: ensuring that the purpose of the group (sharing the load of consuming those topics) is being met.
- these corrections are consumer rebalance operations, which are complex:
- remaining consumers now need to absorb the workload no longer handled by the failed consumer
- they need to find out how far the failed consumer had gotten (committed offset) for all partitions, and catch up without creating duplicates.
- the ability to perform these rebalances is critical to cluster health.
- example 1: the failed consumer handled messages but could not commit them; in that case the new consumers are likely to re-process them, possibly introducing duplicates
- example 2: a new consumer joins the group
- example 3: a new partition is added to the topic
- Applications are configured with a consumer group to handle their topics
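As referenced above, a config sketch for the two heartbeat-related properties; the values follow the defaults cited in these notes (defaults vary across client versions), and heartbeat.interval.ms is usually kept to no more than a third of session.timeout.ms:
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");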
Rebalancing
The offset at which a new consumer in a group starts consuming is defined by the auto.offset.reset == AUTO_OFFSET_RESET_CONFIG property.
If the rebalance was triggered at a point when a previous consumer had already
read but not yet committed some offset, the new consumer is likely to read it again.
The primary purpose of the Group Coordinator is to evenly balance available consumers to partitions.
- If possible, it will assign a 1:1 consumer/partition ratio.
- If there are more consumers than partitions, it will let the extra consumers idle,
leading to over-provisioning
- Even if there are more partitions than consumers, it will not share a partition across multiple consumers in the same group; a consumer simply owns several partitions
- If a new partition becomes available, a consumer fails, or a consumer is added, the Group Coordinator initiates the rebalance protocol, engaging each Consumer coordinator
Consumer configuration: performance and efficiency
fetch.min.bytes: minimum number of bytes returned from a poll. Analogous to the batch.size setting on the producer
fetch.max.wait.ms: maximum time to wait if that threshold has not yet been met. Analogous to the linger.ms setting on the producer
max.partition.fetch.bytes: maximum number of bytes per partition returned in a poll() call, to avoid spending too much time on processing before calling poll() again.
max.poll.records: similar, but in terms of records rather than bytes, and not per partition
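A config sketch for these four properties, again on the assumed props object; the values are illustrative only:
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");               // wait for at least 1 KB...
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");              // ...but for at most 500 ms
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576");  // 1 MB cap per partition per fetch
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");               // cap on records returned by one poll()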
Advanced topics not covered
- Consumer position control
seek(TopicPartition, long): specify the offset from which to read on a given partition
seekToBeginning(Collection<TopicPartition>): seek to the beginning of a group of topics and partitions
seekToEnd(Collection<TopicPartition>): its opposite
- Flow control allows pausing/resuming subscription to some topics or partitions
pause(TopicPartition)
resume(TopicPartition)
- mostly useful when a single consumer has to read multiple topics and partitions
- Rebalance listeners: get notified when rebalances occur in the cluster, to adjust
in consumer code how to handle the offsets.
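A sketch combining the two ideas: subscribe with a ConsumerRebalanceListener, commit before losing partitions and reposition after receiving new ones; the topic name is a placeholder and lookupOffsetFor() is a hypothetical helper backed by an external offset store.
consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();                        // flush progress before the partitions move away
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, lookupOffsetFor(tp));   // hypothetical helper returning a stored offset
        }
    }
});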