
06 Consuming messages with Kafka Consumers and Consumer Groups

Very similar to the producer.

Creating a consumer

(Un)Subscribing

  • A single consumer can subscribe to any number of topics
  • subscribe takes either a list of strings or a regex string,
    • each call replaces the previous subscription list
  • A consumer can also unsubscribe from all topics at once (unsubscribe()).

consumer.subscribe()

  • automatic/dynamic partition assignment
    • that single consumer listens to every partition on every listed topic
    • entirely managed by the KafkaConsumer
      • will get newly added partitions to subscribed topics (SubscriptionState)
  • now needs a valid group.id == ConsumerConfig.GROUP_ID_CONFIG
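
A minimal sketch of both subscription styles, assuming a local broker at localhost:9092 and placeholder topic and group names:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // subscribe() needs a group.id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // List form: replaces any previous subscription.
        consumer.subscribe(Arrays.asList("my-topic", "my-other-topic"));

        // Regex form (also replaces the previous list):
        // consumer.subscribe(java.util.regex.Pattern.compile("my-.*"));

        // Drop every subscription at once.
        consumer.unsubscribe();
        consumer.close();
    }
}
```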

consumer.assign()

  • used to subscribe to explicit partitions
    • one or more partitions, regardless of topic
    • manual, self-administering mode
  • more of an advanced case
  • does not need a group.id
  • blocks instead of erroring if one of the assigned partitions does not exist, inside updateFetchPositions() (see the poll loop below).
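
For comparison, a sketch of manual assignment; note there is no group.id, and the partitions (topic names and numbers are placeholders) can come from different topics:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // No GROUP_ID_CONFIG is needed for assign().
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Explicit, self-administered partition assignment across topics.
        consumer.assign(Arrays.asList(
                new TopicPartition("my-topic", 0),
                new TopicPartition("my-other-topic", 2)));

        consumer.close();
    }
}
```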

The poll loop

  • primary function of the KafkaConsumer
  • continuously poll the brokers for data
  • single API for handling all Consumer <-> Broker interactions
    • a lot of interactions beyond message retrieval
  • when consumer.assign() or .subscribe() is called, the contents of the topics and partitions collections in metadata are used to set fields within the SubscriptionState consumer.subscriptions field
    • this value is the source of truth for everything the consumer is assigned to
    • most consuming operations interact with it and the ConsumerCoordinator consumer.coordinator field
  • when consumer.poll() is invoked, it uses properties (starting with the bootstrap.servers) to request metadata about the cluster.
  • the Fetcher<K, V> consumer.fetcher performs several fetch-related operations between consumer and cluster
    • it delegates actual communication to the ConsumerNetworkClient consumer.client
    • that client, in addition to sending the actual requests, also performs the heartbeats that inform the cluster of client health
    • the Fetcher also requests (via the CNC) the cluster metadata, initially and then periodically afterwards
    • it obtains information about what topics/partitions are available from the SubscriptionState
  • the ConsumerCoordinator consumer.coordinator uses the metadata to coordinate the consumer.
    • handles automatic/dynamic partition reassignments, propagating them to the SubscriptionState
    • commits offsets to the cluster, so the cluster is aware of the subscription state for each topic and partition.
  • the timeout applies to the time the CNC spends polling the cluster for messages to return. It does not apply to the initial setup operations (hence poll remaining stuck on nonexistent partitions during the initial update)
    • it is a minimum amount of time the cycle will take, not a maximum
  • when the timeout expires, a batch of records is returned
    • they are parsed, deserialized, and stored by topic and partition internally in the Fetcher
    • once this is done, the fetcher returns this result for application processing.

The poll() process is a single-threaded operation:

  • 1 poll loop per-consumer
  • 1 thread per-consumer

This keeps the consumer internals simple and pushes parallel consumption out to the application (typically by running more consumers).
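
A bare-bones single-threaded poll loop, assuming a consumer configured and subscribed as in the sketches above (poll(Duration) is the newer client API; older clients used poll(long)):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// assuming `consumer` is a configured, subscribed KafkaConsumer<String, String>
try {
    while (true) {
        // The timeout is a minimum cycle time, not a maximum, and does not cover the initial setup.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // Nothing else runs in this thread while the records are processed here.
            System.out.printf("%s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), record.value());
        }
    }
} finally {
    consumer.close();
}
```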

Processing messages

The poll() method returns a ConsumerRecords, a collection of ConsumerRecord:

```java
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
    /* ... */
}
```
  • Since poll() runs in a single thread, nothing else runs while the returned records are being processed
  • The more topics and partitions a single process subscribes to, the more it has to do within that single polling loop, which can make the consumer slow
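
ConsumerRecords can also be walked per partition, which helps when processing must stay partition-ordered; a sketch reusing the records variable from the poll loop above:

```java
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

for (TopicPartition tp : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        // Records within a partition arrive in offset order.
        System.out.printf("%s-%d@%d: key=%s value=%s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
```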

Offsets and positions

Just because something is read does not mean it is committed:

  • There are different categories of offsets, representing the stage they're in:
    • a consumer needs to know what it has vs has not read
    • what it confirms it has read (and processed) is the last committed offset
    • this is whence consuming starts for a given partition, subject to offset reset
  • Each partition is exclusive with regard to consumer offsets: for any topic, a consumer is tracking one offset per partition it is subscribed to
  • Consumers read at the current position towards the log-end offset.
  • Offsets from the last committed to the current position are uncommitted.
  • Two optional offset commit properties control these offsets:
    • enable.auto.commit lets Kafka decide when current position offsets are upgraded to full committed offsets. Defaults to true.
    • Since it has no knowledge of what work is actually performed, it can only rely on a delay. That delay is auto.commit.interval.ms, defaulting to 5000 msec, which is a lot for high-throughput cases.
The extent to which your system can tolerate eventual consistency is determined by its reliability requirements.

By default, consumers start reading from a new partition at the latest committed offset.

  • Optional property:
    • auto.offset.reset can be earliest, latest (default), or none which throws an exception and lets code decide. See "rebalancing" below.
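
A configuration sketch for the commit-related properties above (the broker address and values are illustrative, not recommendations):

```java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
props.put("group.id", "my-group");
props.put("enable.auto.commit", "true");          // let the consumer promote positions to committed offsets
props.put("auto.commit.interval.ms", "1000");     // default is 5000 ms; lower it for tighter consistency
props.put("auto.offset.reset", "earliest");       // earliest / latest (default) / none
```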

Offset choice is different depending on whether the topology has a single consumer, or a ConsumerGroup.

  • Kafka stores offsets in a special topic called __consumer_offsets, with 50 partitions.
  • They are produced by the ConsumerCoordinator, which means a Consumer is also a Producer, but for just that topic. Once they have been committed, the coordinator updates the SubscriptionState accordingly, allowing the Fetch to know which offsets it should be retrieving.

Manual committing

There are two methods to commit:

  • commitSync(): used to achieve exact control of when the record is truly processed; when one doesn't want to process newer records until the older ones are committed.
    • It should be called after processing a batch of ConsumerRecord values, not after every single record, which would only add latency
    • It blocks until it receives a response from the cluster
    • It may fail, meaning a recovery process must be started. It retries automatically until it succeeds, or it receives an unrecoverable error.
    • The retry.backoff.ms == RETRY_BACKOFF_MS_CONFIG (default: 100) property defines the retry delay, similarly to the producer backoff (for good reason)
    • Using this manual committing is a tradeoff between throughput/performance (less) and control over consistency (more), as this adds some latency to the polling process.
  • commitAsync()
    • does not retry automatically, because it can't know whether the previous commit succeeded or failed, which could lead to ordering and duplication issues
    • instead, it accepts a callback, triggered on receiving the commit response from the cluster.
    • better throughput because it doesn't block the polling process.
    • Don't use it without registering the callback and handling it as needed.
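
A manual-committing sketch with auto-commit disabled, assuming a consumer configured as in the earlier sketches; the topic name is a placeholder:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// assuming `consumer` was created with enable.auto.commit=false
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // ... application processing of the record goes here ...
        System.out.printf("processed %s-%d@%d%n", record.topic(), record.partition(), record.offset());
    }

    // Option 1: block (and retry) until the cluster acknowledges the batch's offsets.
    consumer.commitSync();

    // Option 2: non-blocking, no automatic retry; always register the callback.
    // consumer.commitAsync((offsets, exception) -> {
    //     if (exception != null) {
    //         // the commit failed and will not be retried: log, alert, or fall back to commitSync()
    //     }
    // });
}
```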

Deciding on a commit strategy

  • Consistency control:
    • need to know when something is "done" (define "done" first)
  • Atomicity:
    • ability to treat consumption and processing as a single atomic operation
    • obtaining exactly-once semantics instead of at-least-once

Scaling out consumers

  • Scaling a single-threaded, single-consumer app to the bandwidth, number of topics and partitions of a full Kafka cluster is not realistic
  • The solution is to scale out consuming to more consumers, but they can't just consume anything without synchronizing in some way
  • This is the reason for Consumer groups: a collection of independent Consumers working as a team, i.e. declaring the same group.id.
    • This allows them to share the message consumption and processing load, with more parallelism.
    • It allows more redundancy: failure or limitations of a given consumer are automatically handled and balanced by Kafka.
    • It offers more performance, with the ability to support a large backlog
  • A Consumer group is created when individual consumers
    • with a common group.id...
    • invoke the subscribe() method...
    • and pass a common topics list.
  • One of the brokers gets elected as the GroupCoordinator for that group.
    • Its job is to monitor and maintain group membership.
    • It collaborates with the ClusterCoordinator and ZooKeeper to monitor and assign partitions within a topic to individual consumers in the group.
  • As soon as a Consumer group is formed, each consumer is sending heartbeats,
    • configured by properties:
      • heartbeat.interval.ms == HEARTBEAT_INTERVAL_MS_CONFIG (default 3000 msec): the interval between heartbeat sends
      • session.timeout.ms == SESSION_TIMEOUT_MS_CONFIG (default 30000 msec)
    • the group coordinator relies on these heartbeats to evaluate whether the consumer is alive and able to participate in the group
    • if the coordinator does not receive heartbeats within session.timeout.ms, it considers the consumer failed and takes corrective action, following its priority: ensuring that the purpose of the group (sharing the load of consuming those topics) is being met.
    • these corrections are consumer rebalance operations, which is complex
      • remaining consumers now need to absorb the workload no longer handled by the failed consumer
      • they need to find up to where the failed consumer had worked (commit offset) for all partitions, and catch up without creating duplicates.
      • the ability to perform these rebalances is critical to cluster health.
      • example 1: the failed consumer handled messages but could not commit them; in that case the new consumers are likely to re-process them, possibly introducing duplicates
      • example 2: a new consumer joins the group
      • example 3: a new partition is added to the topic
  • Applications are configured with a consumer group to handle their topics
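
A group-membership sketch: every instance started with this configuration (names and values are placeholders) joins the same group and shares the partition load:

```java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
props.put("group.id", "order-processors");        // identical group.id across instances forms the group
props.put("heartbeat.interval.ms", "3000");       // how often each member heartbeats to the GroupCoordinator
props.put("session.timeout.ms", "30000");         // no heartbeat within this window => member treated as failed
// Start several processes with this same configuration, all subscribing to the same topics;
// the GroupCoordinator spreads the topic partitions across them and rebalances on membership changes.
```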

Rebalancing

The offset at which a new consumer in a group starts consuming is defined by the auto.offset.reset == AUTO_OFFSET_RESET_CONFIG property.

If the rebalance was triggered at a point when a previous consumer had already read but not yet committed some offset, the new consumer is likely to read it again.

The primary purpose of the Group Coordinator is to evenly balance available consumers across partitions.

  • If possible, it will assign a 1:1 consumer/partition ratio.
  • If there are more consumers than partitions, it will let the extra consumers idle, leading to over-provisioning
    • Even if there are more partitions than consumers, a partition is never shared across multiple consumers in the same group
  • If a new partition becomes available, or a consumer fails, or is added, the Group Coordinator initiates the rebalancing protocol, engaging each Consumer coordinator

Consumer configuration: performance and efficiency

  • fetch.min.bytes: minimum amount of data returned from a poll. Analogous to the batch.size setting on the producer
  • fetch.max.wait.ms: maximum time to wait if that minimum has not yet been met. Analogous to the linger.ms setting on the producer
  • max.partition.fetch.bytes: maximum number of bytes per partition returned in a poll() call, to avoid spending too much time on processing before calling poll() again.
  • max.poll.records: similar, but in terms of records rather than bytes, and not per partition
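
A tuning sketch for these fetch properties (values are illustrative only):

```java
import java.util.Properties;

Properties props = new Properties();
props.put("fetch.min.bytes", "1024");              // wait until at least 1 KB is available...
props.put("fetch.max.wait.ms", "500");             // ...or this much time has passed, whichever comes first
props.put("max.partition.fetch.bytes", "1048576"); // cap on bytes returned per partition per poll
props.put("max.poll.records", "500");              // cap on total records returned per poll
```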

Advanced topics not covered

  • Consumer position control
    • seek(TopicPartition, long): specify the offset from which to read on a given partition
    • seekToBeginning(Collection<TopicPartition>): seek to the beginning of a set of partitions
    • seekToEnd(Collection<TopicPartition>): its opposite
  • Flow control allows pausing/resuming subscription to some topics or partitions
    • pause(TopicPartition)
    • resume(TopicPartition)
    • mostly useful when a single consumer has to read multiple topics and partitions
  • Rebalance listeners: get notified when rebalances occur in the cluster, to adjust in consumer code how to handle the offsets.
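
A small sketch combining position control with a rebalance listener, assuming an already-configured consumer and a placeholder topic name and offset:

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// assuming `consumer` is an already-configured KafkaConsumer<String, String>
consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // last chance to commit offsets for partitions this consumer is about to lose
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // adjust positions on the partitions just received, e.g. replay them from the start:
        consumer.seekToBeginning(partitions);
        // or jump to a specific offset on one partition:
        // consumer.seek(new TopicPartition("my-topic", 0), 42L);
    }
});
```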