
06 Consuming messages with Kafka Consumers and Consumer Groups

The consumer API is very similar to the producer API.

Creating a consumer
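
A minimal sketch of creating a consumer, assuming String keys and values; the broker address, group id, and class name below are placeholders, not values from the notes.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSetup {
    static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        // Minimum configuration: where to find the cluster and how to deserialize records.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id is required for subscribe(); assign() works without it (see below).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "notes-consumer-group");
        return new KafkaConsumer<>(props);
    }
}
```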

(Un)Subscribing

  • A single consumer can subscribe to any number of topics
  • subscribe takes either a collection of topic names or a regex Pattern (see the sketch after this list),
    • each call replaces the previous subscription list
  • A consumer can also unsubscribe from all topics at once with unsubscribe().
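
A quick sketch of those calls, reusing the consumer created above; the topic names and the regex are placeholders, and the single-argument Pattern overload assumes a reasonably recent client version.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Each subscribe() call replaces the previous subscription list.
consumer.subscribe(Arrays.asList("topic-a", "topic-b"));
// Regex subscription: every topic matching the pattern.
consumer.subscribe(Pattern.compile("events\\..*"));
// Drop all subscribed topics at once.
consumer.unsubscribe();
```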

consumer.subscribe()

  • automatic/dynamic partition assignment
    • that single consumer listens to every partition of every subscribed topic
    • entirely managed by the KafkaConsumer
      • newly added partitions of subscribed topics are picked up automatically (tracked in SubscriptionState)
  • requires a valid group.id (ConsumerConfig.GROUP_ID_CONFIG); see the sketch below
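
A sketch of dynamic assignment with a rebalance listener, so the consumer is told which partitions it gains or loses; the topic name is a placeholder.

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Requires group.id: the group coordinator decides which partitions this consumer gets.
consumer.subscribe(Collections.singletonList("topic-a"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }
});
```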

consumer.assign()

  • used to subscribe to explicit partitions
    • one or more partitions, regardless of topic
    • manual, self-administering mode
  • more of an advanced use case (see the sketch after this list)
  • does not need a group.id
  • blocks instead of raising an error if one of the assigned partitions does not exist; the hang occurs in updateFetchPositions() (see the poll loop below).
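
A sketch of manual assignment; the topic names and partition numbers are placeholders.

```java
import java.util.Arrays;

import org.apache.kafka.common.TopicPartition;

// Explicit partitions, possibly from different topics; no group.id and no rebalancing.
consumer.assign(Arrays.asList(
        new TopicPartition("topic-a", 0),
        new TopicPartition("topic-b", 2)));
```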

The poll loop

  • primary function of the KafkaConsumer
  • continuously poll the brokers for data
  • single API for handling all Consumer <-> Broker interactions
    • a lot of interactions beyond message retrieval
  • when consumer.assign() or .subscribe() is called, the topic and partition collections passed in are used to set fields within the SubscriptionState object held in the consumer.subscriptions field
    • this object is the source of truth for everything the consumer is assigned to
    • most consuming operations interact with it and with the ConsumerCoordinator held in the consumer.coordinator field
  • when consumer.poll() is invoked, it uses properties (starting with the bootstrap.servers) to request metadata about the cluster.
  • the Fetcher<K, V> consumer.fetcher performs several fetch-related operations between consumer and cluster
    • it delegates actual communication to the ConsumerNetworkClient consumer.client
    • the ConsumerNetworkClient, in addition to the actual requests, performs the heartbeats that inform the cluster of the client's health
    • the Fetcher also requests (via the ConsumerNetworkClient) the cluster metadata, initially and then periodically afterwards
    • it obtains information about what topics/partitions are available from the SubscriptionState
  • the ConsumerCoordinator consumer.coordinator uses the metadata to coordinate the consumer.
    • handles automatic/dynamic partition reassignment, notifying the SubscriptionState of the new assignments
    • commits offsets to the cluster so that it is aware of the consumption state for each topic and partition
  • the timeout applies to the time the ConsumerNetworkClient spends polling the cluster for messages to return. It does not apply to the initial setup operations (hence poll can remain stuck on nonexistent partitions during the initial position update)
    • it is a minimum amount of time the cycle will take, not a maximum
  • when the timeout expires, a batch of records is returned
    • they are parsed, deserialized, and stored by topic and partition internally in the Fetcher
    • once this is done, the fetcher returns this result for application processing (see the poll loop sketch below).
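
A minimal poll loop tying the pieces above together, reusing the consumer created earlier; the 100 ms timeout is arbitrary, and the poll(Duration) overload assumes a Kafka 2.0+ client.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

try {
    while (true) {
        // poll() drives fetching, metadata refreshes, rebalancing and offset management.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s-%d offset=%d key=%s value=%s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
        // Commit the offsets of the records just processed back to the cluster.
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
```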