Browse Source

06 Consumer: groups. JAR building.

Frederic G. MARAND 3 years ago
parent
commit
4398ec1cc7

+ 3 - 0
.editorconfig

@@ -17,3 +17,6 @@ indent_style = tab
 
 [*.md]
 indent_size = 4
+
+[MANIFEST.MF]
+end_of_line = lf

+ 1 - 0
.gitignore

@@ -1,2 +1,3 @@
 target/*
 /kafka*
+out/*

+ 15 - 0
.idea/artifacts/assign_jar.xml

@@ -0,0 +1,15 @@
+<component name="ArtifactManager">
+  <artifact type="jar" build-on-make="true" name="assign:jar">
+    <output-path>$PROJECT_DIR$/out/artifacts</output-path>
+    <root id="archive" name="assign.jar">
+      <element id="directory" name="META-INF">
+        <element id="file-copy" path="$PROJECT_DIR$/src/main/java/assign/META-INF/MANIFEST.MF" />
+      </element>
+      <element id="module-output" name="GettingStartedWithKafka" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar" path-in-jar="/" />
+    </root>
+  </artifact>
+</component>

+ 15 - 0
.idea/artifacts/consumer_jar.xml

@@ -0,0 +1,15 @@
+<component name="ArtifactManager">
+  <artifact type="jar" build-on-make="true" name="consumer:jar">
+    <output-path>$PROJECT_DIR$/out/artifacts</output-path>
+    <root id="archive" name="consumer.jar">
+      <element id="directory" name="META-INF">
+        <element id="file-copy" path="$PROJECT_DIR$/src/main/java/consumer/META-INF/MANIFEST.MF" />
+      </element>
+      <element id="module-output" name="GettingStartedWithKafka" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar" path-in-jar="/" />
+    </root>
+  </artifact>
+</component>

+ 15 - 0
.idea/artifacts/producer_jar.xml

@@ -0,0 +1,15 @@
+<component name="ArtifactManager">
+  <artifact type="jar" build-on-make="true" name="producer:jar">
+    <output-path>$PROJECT_DIR$/out/artifacts</output-path>
+    <root id="archive" name="producer.jar">
+      <element id="directory" name="META-INF">
+        <element id="file-copy" path="$PROJECT_DIR$/src/main/java/producer/META-INF/MANIFEST.MF" />
+      </element>
+      <element id="module-output" name="GettingStartedWithKafka" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.2.6/snappy-java-1.1.2.6.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar" path-in-jar="/" />
+      <element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/0.10.0.1/kafka-clients-0.10.0.1.jar" path-in-jar="/" />
+    </root>
+  </artifact>
+</component>

+ 1 - 1
.idea/runConfigurations/KafkaAssignApp.xml

@@ -1,7 +1,7 @@
 <component name="ProjectRunConfigurationManager">
   <configuration default="false" name="KafkaAssignApp" type="Application" factoryName="Application">
     <option name="MAIN_CLASS_NAME" value="KafkaAssignApp" />
-    <module name="kafkasamples" />
+    <module name="GettingStartedWithKafka" />
     <method v="2">
       <option name="Make" enabled="true" />
     </method>

+ 2 - 2
.idea/runConfigurations/KafkaConsumerApp.xml → .idea/runConfigurations/KafkaConsumerApp_1.xml

@@ -1,7 +1,7 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="KafkaConsumerApp" type="Application" factoryName="Application" nameIsGenerated="true">
+  <configuration default="false" name="KafkaConsumerApp 1" type="Application" factoryName="Application">
     <option name="MAIN_CLASS_NAME" value="KafkaConsumerApp" />
-    <module name="kafkasamples" />
+    <module name="GettingStartedWithKafka" />
     <method v="2">
       <option name="Make" enabled="true" />
     </method>

+ 9 - 0
.idea/runConfigurations/KafkaConsumerApp_2.xml

@@ -0,0 +1,9 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="KafkaConsumerApp 2" type="Application" factoryName="Application">
+    <option name="MAIN_CLASS_NAME" value="KafkaConsumerApp" />
+    <module name="GettingStartedWithKafka" />
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>

+ 2 - 2
.idea/runConfigurations/KafkaProducerApp.xml

@@ -1,8 +1,8 @@
 <component name="ProjectRunConfigurationManager">
   <configuration default="false" name="KafkaProducerApp" type="Application" factoryName="Application">
     <option name="ALTERNATIVE_JRE_PATH" value="1.8" />
-    <option name="MAIN_CLASS_NAME" value="KafkaProducerApp" />
-    <module name="kafkasamples" />
+    <option name="MAIN_CLASS_NAME" value="producer.KafkaProducerApp" />
+    <module name="GettingStartedWithKafka" />
     <RunnerSettings RunnerId="Run" />
     <ConfigurationWrapper RunnerId="Run" />
     <method v="2">

+ 7 - 7
docs/05 Producing messages.md

@@ -6,15 +6,15 @@
 
 ## Creating a producer
 
-- Properties: 
+- Properties:
     - `bootstrap.servers`
     - `key.serializer`
     - `value.serializer`
     - and others
     - => `KafkaProducer`
     - [Producer config doc](http://kafka.apache.org/documentation.html#producerconfigs)
-- See [KafkaProducerApp](../src/main/java/KafkaProducerApp.java)
-  
+- See [producer.KafkaProducerApp](../src/main/java/producer/KafkaProducerApp.java)
+
 ## Creating messages
 
 - K calls messages "ProducerRecord"
@@ -27,7 +27,7 @@
         - Key
 - KP instances can only send PRs that match the key and value
   serializer types they are configured with.
-  
+
 ## Sending messages
 
 When the producer sends:
@@ -47,7 +47,7 @@ When the producer sends:
       a Murmur hash in DefaultPartitioner. See `DefaultPartitioner.partition`.
     - custom (defined in `PARTITIONER_CLASS_CONFIG == "partitioner.class"` property)
 - it pushes the message to an in-memory queue, the `RecordAccumulator`
-    - micro-batching: as scale, efficiency is everything. 
+    - micro-batching: as scale, efficiency is everything.
     - Use on producer, broker, and consumer.
     - Also used in the OS (page cache, Linux sendfile() syscall).
     - Amortizes the constant cost of sends
@@ -82,11 +82,11 @@ When the producer sends:
     - At-least once
     - At-most once
     - Exactly once
-    
+
 ## Advanced topics
 
 - Cusom serializers
 - Custom Partitioners
 - Asynchronous send
 - Compression
-- Advanced settings
+- Advanced settings

+ 65 - 2
docs/06 Consuming messages.md

@@ -94,7 +94,6 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
 
 Just because something is _read_ does not mean it is _committed_:
 
-
 - There are different categories of offsets, representing the stage they're in:
     - a consumer needs to know what it has vs has not read
     - what it confirms it has read (and processed) is the _last committed offset_
@@ -119,7 +118,7 @@ By default, consumers start reading from a new partition at the `latest` committ
 
 - Optional property:
     - `auto.offset.reset` can be `earliest`, `latest` (default), or `none` which
-      throws an exception and lets code decide.
+      throws an exception and lets code decide. See "rebalancing" below.
 
 Offset choice is different depending on whether the topology has a single consumer,
 or a ConsumerGroup.
@@ -161,3 +160,67 @@ There are two methods to commit:
 - Atomicity:
     - ability to treat consumption and processing as a single atomic operation
     - obtaining _exactly-once_ semantics instead of _at-least-once_
+
+## Scaling out consumers
+
+- Scaling a single-thread, single-consumer app to the bandwidth, number of topics
+  and partitions of a full K cluster is not realistic
+- The solution is to scale out consuming to more consumers, but they can't just
+  consume anything without synchronizing in some way
+- This is the reason for Consumer groups: a collection of independent Consumers
+  working as a team, i.e. declaring the same `group.id`.
+    - This allows them to share the message consumption and processing load, with more parallelism.
+    - It allows more redundancy: failure or limitations of a given consumer are
+      automatically handled and balanced by K.
+    - It offers more performance, with the ability to support a large backlog
+- A Consumer group is created when individual consumers
+    - with a common `group.id`...
+    - invoke the `subscribe()` method...
+    - and pass a common topics list.
+- One of the brokers gets elected as the `GroupCoordinator` for that topic.
+    - Its job is to monitor and maintain group membership.
+    - It collaborates with the `ClusterCoordinator` and ZooKeeper to monitor and
+      assign partitions within a topic to individual consumers in the group.
+- As soon as a Consumer group is formed, each consumer is sending heartbeats,
+    - configured by properties:
+        - `heartbeat.interval.ms` == `HEARTBEAT_INTERVAL_MS_CONFIG` (default 3000 msec):
+          the interval between heartbeat sends
+        - `session.timeout.ms` == `SESSION_TIMEOUT_MS_CONFIG` (default 30000 msec)
+    - the CG coordinator relies on these heartbeats to evaluate whether the consumer
+      is alive and able to participate in the group
+    - if the coordinator does not receive a heartbeat during a "total time" (?)
+      larger than `session.timeout.ms`, it will consider the consumer failed and take
+      corrective action, following its priority: ensuring that the purpose of the
+      group (sharing the load of consuming those topics) is being met.
+    - these corrections are _consumer rebalance_ operations, which is complex
+        - remaining consumers now need to absorb the workload no longer handled
+          by the failed consumer
+        - they need to find up to where the failed consumer had worked (commit offset)
+          for all partitions, and catch up without creating duplicates.
+        - the ability to perform these rebalances is critical to cluster health.
+        - example 1: the failed consumer handled messages but could not commit
+          them; in that case the new consumers are likely to re-process them,
+          possibly introducing duplicates
+        - example 2: a new consumer joins the group
+        - example 3: a new partition is added to the topic
+- Applications are configured with a consumer group to handle their topics
+
+## Rebalancing
+
+The offset at which a new consumer in a group starts consuming is defined by
+the `auto.offset.reset` == `AUTO_OFFSET_RESET_CONFIG` property.
+
+If the rebalance was triggered at a point when a previous consumer had already
+read but not yet committed some offset, the new consumer is likely to read it again.
+
+The primary purpose of the Group Coordinator is to evenly balance available consumers
+to partitions.
+
+- If possible, it will assign a 1:1 consumer/partition ratio.
+- If there are more consumers than partitions, it will let the extra consumers idle,
+  leading to over-provisioning
+  - Apparently (to be confirmed), even if there are more partitions than consumers,
+    it will not share a partition across multiple consumers
+- If a new partition becomes available, or a consumer fails, or is added,
+  the Group Coordinator initiates the rebalancing protocol, engaging each Consumer coordinator
+

BIN
docs/images/ConsumerGroups.png


+ 2 - 0
src/main/java/assign/META-INF/MANIFEST.MF

@@ -0,0 +1,2 @@
+Manifest-Version: 1.0
+Main-Class: KafkaAssignApp

+ 2 - 0
src/main/java/consumer/META-INF/MANIFEST.MF

@@ -0,0 +1,2 @@
+Manifest-Version: 1.0
+Main-Class: KafkaConsumerApp

+ 2 - 0
src/main/java/KafkaProducerApp.java → src/main/java/producer/KafkaProducerApp.java

@@ -1,3 +1,5 @@
+package producer;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;

+ 2 - 0
src/main/java/producer/META-INF/MANIFEST.MF

@@ -0,0 +1,2 @@
+Manifest-Version: 1.0
+Main-Class: producer.KafkaProducerApp