
06 Consumer: offsets, commits, coordinator.

Frederic G. MARAND 3 years ago
parent
commit
2fb9734057

+ 15 - 1
.editorconfig

@@ -1,5 +1,19 @@
 root = true
 
+[*]
+charset                  = utf-8
+end_of_line              = LF
+indent_size              = 2
+indent_style             = space
+insert_final_newline     = true
+trim_trailing_whitespace = true
+
+[{Makefile,*.mak}]
+indent_style = tab
+
+[*.go]
+indent_size  = 4
+indent_style = tab
+
 [*.md]
 indent_size = 4
-indent_style = space

+ 1 - 1
.idea/runConfigurations/KafkaAssignApp.xml

@@ -1,7 +1,7 @@
 <component name="ProjectRunConfigurationManager">
   <configuration default="false" name="KafkaAssignApp" type="Application" factoryName="Application">
     <option name="MAIN_CLASS_NAME" value="KafkaAssignApp" />
-    <module name="GettingStartedWithKafka" />
+    <module name="kafkasamples" />
     <method v="2">
       <option name="Make" enabled="true" />
     </method>

+ 2 - 2
.idea/runConfigurations/KafkaConsmumerApp.xml → .idea/runConfigurations/KafkaConsumerApp.xml

@@ -1,7 +1,7 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="KafkaConsmumerApp" type="Application" factoryName="Application">
+  <configuration default="false" name="KafkaConsumerApp" type="Application" factoryName="Application" nameIsGenerated="true">
     <option name="MAIN_CLASS_NAME" value="KafkaConsumerApp" />
-    <module name="GettingStartedWithKafka" />
+    <module name="kafkasamples" />
     <method v="2">
       <option name="Make" enabled="true" />
     </method>

+ 2 - 1
.idea/runConfigurations/KafkaProducerApp.xml

@@ -1,7 +1,8 @@
 <component name="ProjectRunConfigurationManager">
   <configuration default="false" name="KafkaProducerApp" type="Application" factoryName="Application">
+    <option name="ALTERNATIVE_JRE_PATH" value="1.8" />
     <option name="MAIN_CLASS_NAME" value="KafkaProducerApp" />
-    <module name="GettingStartedWithKafka" />
+    <module name="kafkasamples" />
     <RunnerSettings RunnerId="Run" />
     <ConfigurationWrapper RunnerId="Run" />
     <method v="2">

+ 110 - 24
docs/06 Consuming messages.md

@@ -35,43 +35,129 @@ Very similar to producer.
     - manual, self-administering mode
 - more of an advanced case
- does not need a `group.id`
-- blocks instead of erroring if one of the partitions does not exist, in 
+- blocks instead of erroring if one of the partitions does not exist, in
   `updateFetchPositions.updateFetchPositions()` (see poll loop below).
 
-
 ## The poll loop
 
 - primary function of the `KafkaConsumer`
 - continuously poll the brokers for data
 - single API for handling all Consumer <-> Broker interactions
     - a lot of interactions beyond message retrieval
-- when `consumer.assign()` or `.subscribe()` is called, the contents of the topics
-  and partitions collections in metadata are used to set fields within the 
+- when `consumer.assign()` or `.subscribe()` is called, the contents of the topics and partitions collections in
+  metadata are used to set fields within the
   `SubscriptionState consumer.subscriptions` field
-  - this value is the source of truth for everything the consumer is assigned to
-  - most consuming operations interact with it and the `ConsumerCoordinator consumer.coordinator` field  
-- when `consumer.poll()` is invoked, it uses properties (starting with the `bootstrap.servers`) to
-  request metadata about the cluster.
-- the `Fetcher<K, V> consumer.fetcher` performs several fetch-related operations
-  between consumer and cluster  
+    - this value is the source of truth for everything the consumer is assigned to
+    - most consuming operations interact with it and the `ConsumerCoordinator consumer.coordinator` field
+- when `consumer.poll()` is invoked, it uses properties (starting with the `bootstrap.servers`) to request metadata
+  about the cluster.
+- the `Fetcher<K, V> consumer.fetcher` performs several fetch-related operations between consumer and cluster
     - it delegates actual communication to the `ConsumerNetworkClient consumer.client`
-    - that consumer, in addition to the actual requests, performs the heartbeats
-      informing the cluster of the client health
-    - the fetch also requests (via the CNC) the cluster metadata, initially then
-      periodically afterwards
+    - that consumer, in addition to the actual requests, performs the heartbeats informing the cluster of the client
+      health
+    - the fetch also requests (via the CNC) the cluster metadata, initially then periodically afterwards
     - it obtains information about what topics/partitions are available from the
       `SubscriptionState`
-- the `ConsumerCoordinator consumer.coordinator` uses the metadata to coordinate
-  the consumer.
+- the `ConsumerCoordinator consumer.coordinator` uses the metadata to coordinate the consumer.
     - handles automatic/dynamic partition reassignment by notifying them to the SubscriptionState
-    - committing offsets to the cluster so the cluster is aware of the state of subscriptions,
-      for each topic and partition.
-- the timeout applies to the time the CNC spends polling the cluster for messages
-  to return. It does not apply to the initial setup operations (hence poll remaining
-  stuck on nonexistent partitions during the initial update)
+    - committing offsets to the cluster so the cluster is aware of the state of subscriptions, for each topic and
+      partition.
+- the timeout applies to the time the CNC spends polling the cluster for messages to return. It does not apply to the
+  initial setup operations (hence poll remaining stuck on nonexistent partitions during the initial update)
     - it is a minimum amount of time the cycle will take, not a maximum
 - when the timeout expires, a batch of records are returned
-    - they are parsed, deserialized, and stored by topic and partition internally
-      in the Fetcher
+    - they are parsed, deserialized, and stored by topic and partition internally in the Fetcher
     - once this is done, the fetcher returns this result for application processing.
-    
+
+The `poll()` process is a single-threaded operation:
+
+- 1 poll loop per consumer
+- 1 thread per consumer
+
+This keeps the consumer internals simple; parallel consumption has to be achieved by other means, i.e. more consumers, each on its own thread (see the sketch below).
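+
+A minimal sketch, not from the course, of that one consumer / one thread / one loop shape, assuming a local broker and the `my-topic` topic used in the sample apps:
+
+```java
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class MinimalPollLoop {
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
+
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+    consumer.subscribe(Arrays.asList("my-topic"));
+    try {
+      // One consumer, one thread, one loop: every consumer <-> broker interaction goes through poll().
+      while (true) {
+        ConsumerRecords<String, String> records = consumer.poll(200);
+        for (ConsumerRecord<String, String> cr : records) {
+          System.out.printf("%s-%d@%d %s=%s%n", cr.topic(), cr.partition(), cr.offset(), cr.key(), cr.value());
+        }
+      }
+    } finally {
+      consumer.close();
+    }
+  }
+}
+```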
+
+## Processing messages
+
+The `poll()` method returns a `ConsumerRecords`, a collection of `ConsumerRecord`:
+
+```java
+public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+    /* ... */
+}
+```
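+
+Since the records are already grouped by topic and partition inside the Fetcher, they can also be walked per partition; a small sketch, not from the course:
+
+```java
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+
+class PerPartitionRecords {
+  static void process(ConsumerRecords<String, String> records) {
+    // partitions() only lists the partitions that actually returned records in this batch.
+    for (TopicPartition tp : records.partitions()) {
+      for (ConsumerRecord<String, String> cr : records.records(tp)) {
+        System.out.printf("%s-%d offset %d: %s%n", tp.topic(), tp.partition(), cr.offset(), cr.value());
+      }
+    }
+  }
+}
+```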
+
+- Since the `poll()` runs in a single thread, there is nothing running while the records are being processed
+- The more topics and partitions a single process subscribes to, the more it has to do within that single polling loop,
+  which can make the consumer slow
+
+## Offsets and positions
+
+Just because something is _read_ does not mean it is _committed_:
+
+- There are different categories of offsets, representing the stage they're in:
+    - a consumer needs to know what it has vs has not read
+    - what it confirms it has read (and processed) is the _last committed offset_
+    - this is whence consuming starts for a given partition, subject to offset reset
+- Each partition is exclusive with regard to consumer offsets: for any topic, a
+  consumer is tracking one offset per partition it is subscribed to
+- Consumers read at the _current position_ towards the _log-end offset_.
+- Offsets from the last committed to the current position are _uncommitted_.
+- Two **optional** offset commit properties control these offsets:
+    - `enable.auto.commit` lets Kafka decide when current position offsets are
+      upgraded to full committed offsets. Defaults to `true`.
+    - Since it has no knowledge of what work is actually performed, it can only
+      use a delay for that. That delay is `auto.commit.interval.ms`, in milliseconds,
+      which defaults to 5000; that is a lot for high-throughput cases (see the config
+      sketch after this list).
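+
+A sketch of the corresponding configuration, reusing the `props` / `ConsumerConfig` style of the sample apps (values shown are the defaults, not recommendations):
+
+```java
+// Let the consumer promote current positions to committed offsets on its own.
+props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // default
+// How often auto-commit runs; lower it for tighter consistency, at the cost of more commit traffic.
+props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // default, in ms
+```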
+
+<blockquote>
+The extent to which your system can be tolerant of eventual consistency
+is determined by its reliability.
+</blockquote>
+
+By default, when a consumer has no committed offset for a partition, it starts reading at the `latest` offset.
+
+- Optional property:
+    - `auto.offset.reset` can be `earliest`, `latest` (default), or `none`, which
+      throws an exception and lets code decide (see the sketch below).
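+
+For example, to replay a topic from the beginning when the group has no usable committed offset yet (a sketch, same `props` as above):
+
+```java
+// Applies only when there is no committed offset (or it is out of range) for this group.
+props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // "latest" is the default; "none" throws
+```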
+
+Offset choice is different depending on whether the topology has a single consumer,
+or a ConsumerGroup.
+
+- Kafka stores offsets in a special topic called `__consumer_offsets`, with 50 partitions.
+- They are produced by the `ConsumerCoordinator`, which means a Consumer is also
+  a Producer, but for just that topic. Once they have been committed, the coordinator
+  updates the SubscriptionState accordingly, allowing the Fetcher to know which
+  offsets it should be retrieving (see the sketch below).
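+
+A small sketch, not from the course, showing the two numbers side by side. It assumes a configured `KafkaConsumer<String, String>` and a partition currently assigned to it (`position()` throws otherwise), plus the `OffsetAndMetadata` and `TopicPartition` imports:
+
+```java
+static void logOffsets(KafkaConsumer<String, String> consumer, TopicPartition tp) {
+  // Last committed offset for this group, read back from __consumer_offsets (null if nothing committed yet).
+  OffsetAndMetadata committed = consumer.committed(tp);
+  // Current position: the offset of the next record poll() would return for this partition.
+  long position = consumer.position(tp);
+  System.out.printf("%s committed=%s position=%d%n", tp, committed, position);
+}
+```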
+
+## Manual committing
+
+There are two methods to commit:
+
+- `commitSync()`: used to achieve exact control of when the record is truly processed;
+  when one doesn't want to process newer records until the older ones are committed.
+    - It should be called after processing a **batch** of ConsumerRecord, not after each
+      single record, which would add latency for no benefit
+    - It blocks until it receives a response from the cluster
+    - It may fail, meaning a recovery process must be started. It retries automatically
+      until it succeeds, or it receives an unrecoverable error.
+    - The `retry.backoff.ms` == `RETRY_BACKOFF_MS_CONFIG` (default: 100) property defines the retry delay, similarly
+      to the producer backoff (for good reason)
+    - Using this manual committing is a tradeoff between throughput/performance (less)
+      and control over consistency (more), as this adds some latency to the polling
+      process.
+- `commitAsync()`
+    - does not retry automatically, because it can't know whether the previous
+      commit succeeded or failed, which could lead to ordering and duplication issues
+    - instead, it accepts a callback, triggered on receiving the commit response
+      from the cluster.
+    - better throughput because it doesn't block the polling process.
+    - Don't use it without registering the callback and handling its outcome as needed (both variants are sketched below).
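+
+A sketch of the two calls, meant to run right after processing a batch in the poll loop, with auto-commit disabled; use one or the other, not both (assumes the usual consumer plus `OffsetCommitCallback`, `OffsetAndMetadata`, `TopicPartition` and `java.util.Map` imports):
+
+```java
+// Either: block until the cluster acknowledges the commit (retried internally until it
+// succeeds or fails with an unrecoverable error).
+consumer.commitSync();
+
+// Or: fire-and-check, better throughput, but no automatic retry, so inspect the outcome.
+consumer.commitAsync(new OffsetCommitCallback() {
+  @Override
+  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+    if (exception != null) {
+      // Decide here whether to log, retry, or fail: blind retries can reorder commits.
+      System.err.println("Async commit failed for " + offsets + ": " + exception);
+    }
+  }
+});
+```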
+
+## Deciding on a commit strategy
+
+- Consistency control:
+    - need to know when something is "done" (define "done" first)
+- Atomicity:
+    - ability to treat consumption and processing as a single atomic operation (see the per-partition commit sketch below)
+    - obtaining _exactly-once_ semantics instead of _at-least-once_
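+
+One way to make "done" explicit is to commit offsets per partition right after processing that partition's slice of the batch. A sketch built on the `records` returned by `poll()`; `handle()` is a hypothetical processing step, `Collections` and `List` come from `java.util`, and this is still at-least-once, it just narrows the redelivery window:
+
+```java
+for (TopicPartition tp : records.partitions()) {
+  List<ConsumerRecord<String, String>> batch = records.records(tp);
+  for (ConsumerRecord<String, String> cr : batch) {
+    handle(cr); // hypothetical business processing
+  }
+  long lastProcessed = batch.get(batch.size() - 1).offset();
+  // The committed offset is the offset of the next record to read, hence the + 1.
+  consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessed + 1)));
+}
+```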

BIN
docs/images/ConsumerCommittingOffsets.png


BIN
docs/images/ConsumerOffsetAndPosition.png


+ 45 - 45
src/main/java/KafkaAssignApp.java

@@ -1,57 +1,57 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Properties;
 
 public class KafkaAssignApp {
 
-    public static final String TOPIC = "my-topic";
-    public static final Integer TIMEOUT = 10;
-
-    public static void main(String[] args) {
-        Properties props = new Properties();
-        // We use "put" since there are strings. Otherwise, use setProperty.
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
-
-        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
-
-        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
-        TopicPartition myTopicPart0 = new TopicPartition(TOPIC, 0);
-        TopicPartition myTopicPart1 = new TopicPartition(TOPIC, 1);
-        partitions.add(myTopicPart0);
-        // Will block poll if the partition does not exist.
-        partitions.add(myTopicPart1);
-        myConsumer.assign(partitions);
-
-        Boolean started = false;
-        Integer pass = 0;
-        try {
-            while (true) {
-                ConsumerRecords<String, String> records = myConsumer.poll(TIMEOUT);
-                if (!started) {
-                    started = true;
-                    System.out.printf("Started");
-                }
-                for (ConsumerRecord<String, String> cr : records) {
-                    System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
-                            pass, records.count(),
-                            cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
-                }
-                pass++;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            myConsumer.close();
-        }
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    // We use "put" since there are strings. Otherwise, use setProperty.
+    // These are the 3 required properties.
+    // 1. Cluster membership: partition leaders, etc
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
+
+    // 2. How keys and values are deserialized.
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
+
+    // 3. The consumer Group ID is NOT required for assign;
+    // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
+
+    KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
+
+    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
+    TopicPartition myTopicPart0 = new TopicPartition(KafkaCommon.TOPIC, 0);
+    TopicPartition myTopicPart1 = new TopicPartition(KafkaCommon.TOPIC, 1);
+    partitions.add(myTopicPart0);
+    // Will block poll if the partition does not exist.
+    partitions.add(myTopicPart1);
+
+    /* Don't let the consumer handle the partitions */
+    myConsumer.assign(partitions);
+
+    try {
+      while (true) {
+        ConsumerRecords<String, String> records = myConsumer.poll(KafkaConsumerCommon.TIMEOUT);
+        KafkaConsumerCommon.process(records);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      myConsumer.close();
     }
+  }
+
+  private static void assignExamples(KafkaConsumer<String, String> c) {
+    TopicPartition partition0 = new TopicPartition(KafkaCommon.TOPIC, 0);
+    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
+    partitions.add(partition0);
+
+    c.assign(partitions); // NOT incremental !
+  }
+
 }

+ 3 - 0
src/main/java/KafkaCommon.java

@@ -0,0 +1,3 @@
+public class KafkaCommon {
+  public static final String TOPIC = "my-topic";
+}

+ 56 - 83
src/main/java/KafkaConsumerApp.java

@@ -1,9 +1,6 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -11,85 +8,61 @@ import java.util.Properties;
 
 public class KafkaConsumerApp {
 
-    public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
-    public static final String TOPIC = "my-topic";
-    public static final Integer TIMEOUT = 200;
-
-    public static void main(String[] args) {
-        Properties props = new Properties();
-        // We use "put" since there are strings. Otherwise, use setProperty.
-        // These are the 3 required properties.
-        // 1. Cluster membership: partition leaders, etc
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
-
-        // 2. How keys and values are deserialized.
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-        // 3. The consumer Group ID is now required.
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
-
-        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
-        myConsumer.subscribe(Arrays.asList(TOPIC));
-
-        Boolean started = false;
-        Integer pass = 0;
-        try {
-            while (true) {
-                ConsumerRecords<String, String> records = myConsumer.poll(TIMEOUT);
-                if (!started) {
-                    started = true;
-                    System.out.printf("Started");
-                }
-//                for (ConsumerRecord<String, String> cr : records.records(TOPIC)) {
-//                    System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value());
-//                }
-                for (ConsumerRecord<String, String> cr : records) {
-                    System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
-                            pass, records.count(),
-                            cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
-                }
-                pass++;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            myConsumer.close();
-        }
-    }
-
-    private static void assignDemo(KafkaConsumer<String, String> c) {
-        TopicPartition partition0 = new TopicPartition(TOPIC, 0);
-        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
-        partitions.add(partition0);
-
-        c.assign(partitions); // NOT incremental !
-    }
-
-    private static void subscribeExamples(KafkaConsumer<String, String> c) {
-        // In tutorial but does not compile: needs a rebalance callback
-        // c.subscribe("my-*");
-        // Maybe that way ?
-        // c.subscribe(Pattern.compile("my-[\\w]+"), null);
-
-        // Typical initial subscription.
-        c.subscribe(Arrays.asList(TOPIC));
-
-        // Replaces the current subscription set, does not extend it
-        c.subscribe(Arrays.asList("another-topic"));
-
-        // Better for incremental cases.
-        ArrayList<String> topics = new ArrayList<String>();
-        topics.add(TOPIC);
-        topics.add("my-other-topic");
-        topics.add("yetAnotherTopic");
-        c.subscribe(topics);
-
-        // Unsubcribe all topics.
-        c.unsubscribe();
-
-        // Alternative
-        topics.clear();
-        c.subscribe(topics);
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    // We use "put" since there are strings. Otherwise, use setProperty.
+    // These are the 3 required properties.
+    // 1. Cluster membership: partition leaders, etc
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
+
+    // 2. How keys and values are deserialized.
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
+
+    // 3. The consumer Group ID is now required.
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
+
+    KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
+
+    /* Let the consumer handle the partitions */
+    myConsumer.subscribe(Arrays.asList(KafkaCommon.TOPIC));
+
+    try {
+      while (true) {
+        ConsumerRecords<String, String> records = myConsumer.poll(KafkaConsumerCommon.TIMEOUT);
+        KafkaConsumerCommon.process(records);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      myConsumer.close();
     }
+  }
+
+  private static void subscribeExamples(KafkaConsumer<String, String> c) {
+    // In tutorial but does not compile: needs a rebalance callback
+    // c.subscribe("my-*");
+    // Maybe this way?
+    // c.subscribe(Pattern.compile("my-[\\w]+"), null);
+
+    // Typical initial subscription.
+    c.subscribe(Arrays.asList(KafkaCommon.TOPIC));
+
+    // Replaces the current subscription set, does not extend it
+    c.subscribe(Arrays.asList("another-topic"));
+
+    // Better for incremental cases.
+    ArrayList<String> topics = new ArrayList<String>();
+    topics.add(KafkaCommon.TOPIC);
+    topics.add("my-other-topic");
+    topics.add("yetAnotherTopic");
+    c.subscribe(topics);
+
+    // Unsubscribe from all topics.
+    c.unsubscribe();
+
+    // Alternative
+    topics.clear();
+    c.subscribe(topics);
+  }
 }

+ 26 - 0
src/main/java/KafkaConsumerCommon.java

@@ -0,0 +1,26 @@
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+public class KafkaConsumerCommon {
+  public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
+  public static final Integer TIMEOUT = 200;
+  protected static Boolean started = false;
+  protected static Integer pass = 0;
+
+  static void process(ConsumerRecords<String, String> records) {
+    if (!started) {
+      started = true;
+      System.out.printf("Started");
+    }
+//    for (ConsumerRecord<String, String> cr : records.records(KafkaCommon.TOPIC)) {
+//      System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value());
+//    }
+    for (ConsumerRecord<String, String> cr : records) {
+      System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
+        pass, records.count(),
+        cr.topic(), cr.partition(), cr.offset(), cr.key(), cr.value());
+    }
+    pass++;
+  }
+}