Browse Source

05 Consumer: Subscribe/Assign.

Frederic G. MARAND 3 years ago
parent
commit
6a84b0f42f

+ 5 - 0
.editorconfig

@@ -0,0 +1,5 @@
+root = true
+
+[*.md]
+indent_size = 4
+indent_style = space

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+target/*
+/kafka*

+ 3 - 0
.idea/.gitignore

@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml

+ 5 - 0
.idea/codeStyles/codeStyleConfig.xml

@@ -0,0 +1,5 @@
+<component name="ProjectCodeStyleConfiguration">
+  <state>
+    <option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
+  </state>
+</component>

+ 13 - 0
.idea/compiler.xml

@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="CompilerConfiguration">
+    <annotationProcessing>
+      <profile name="Maven default annotation processors profile" enabled="true">
+        <sourceOutputDir name="target/generated-sources/annotations" />
+        <sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
+        <outputRelativeToContentRoot value="true" />
+        <module name="GettingStartedWithKafka" />
+      </profile>
+    </annotationProcessing>
+  </component>
+</project>

+ 1 - 0
.idea/description.html

@@ -0,0 +1 @@
+<html>Simple <b>Java</b> application that includes a class with <code>main()</code> method</html>

+ 6 - 0
.idea/encodings.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Encoding">
+    <file url="PROJECT" charset="UTF-8" />
+  </component>
+</project>

+ 25 - 0
.idea/jarRepositories.xml

@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="RemoteRepositoriesConfiguration">
+    <remote-repository>
+      <option name="id" value="central" />
+      <option name="name" value="Central Repository" />
+      <option name="url" value="https://repo.maven.apache.org/maven2" />
+    </remote-repository>
+    <remote-repository>
+      <option name="id" value="repo.jenkins-ci.org" />
+      <option name="name" value="repo.jenkins-ci.org" />
+      <option name="url" value="https://repo.jenkins-ci.org/public/" />
+    </remote-repository>
+    <remote-repository>
+      <option name="id" value="central" />
+      <option name="name" value="Maven Central repository" />
+      <option name="url" value="https://repo1.maven.org/maven2" />
+    </remote-repository>
+    <remote-repository>
+      <option name="id" value="jboss.community" />
+      <option name="name" value="JBoss Community repository" />
+      <option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
+    </remote-repository>
+  </component>
+</project>

+ 14 - 0
.idea/misc.xml

@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="MavenProjectsManager">
+    <option name="originalFiles">
+      <list>
+        <option value="$PROJECT_DIR$/pom.xml" />
+      </list>
+    </option>
+  </component>
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 9 - 0
.idea/runConfigurations/KafkaAssignApp.xml

@@ -0,0 +1,9 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="KafkaAssignApp" type="Application" factoryName="Application">
+    <option name="MAIN_CLASS_NAME" value="KafkaAssignApp" />
+    <module name="GettingStartedWithKafka" />
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>

+ 9 - 0
.idea/runConfigurations/KafkaConsmumerApp.xml

@@ -0,0 +1,9 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="KafkaConsmumerApp" type="Application" factoryName="Application">
+    <option name="MAIN_CLASS_NAME" value="KafkaConsumerApp" />
+    <module name="GettingStartedWithKafka" />
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>

+ 11 - 0
.idea/runConfigurations/KafkaProducerApp.xml

@@ -0,0 +1,11 @@
+<component name="ProjectRunConfigurationManager">
+  <configuration default="false" name="KafkaProducerApp" type="Application" factoryName="Application">
+    <option name="MAIN_CLASS_NAME" value="KafkaProducerApp" />
+    <module name="GettingStartedWithKafka" />
+    <RunnerSettings RunnerId="Run" />
+    <ConfigurationWrapper RunnerId="Run" />
+    <method v="2">
+      <option name="Make" enabled="true" />
+    </method>
+  </configuration>
+</component>

+ 124 - 0
.idea/uiDesigner.xml

@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Palette2">
+    <group name="Swing">
+      <item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
+      </item>
+      <item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
+      </item>
+      <item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
+      </item>
+      <item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.png" removable="false" auto-create-binding="false" can-attach-label="true">
+        <default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
+      </item>
+      <item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
+        <initial-values>
+          <property name="text" value="Button" />
+        </initial-values>
+      </item>
+      <item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
+        <initial-values>
+          <property name="text" value="RadioButton" />
+        </initial-values>
+      </item>
+      <item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
+        <initial-values>
+          <property name="text" value="CheckBox" />
+        </initial-values>
+      </item>
+      <item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
+        <initial-values>
+          <property name="text" value="Label" />
+        </initial-values>
+      </item>
+      <item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
+          <preferred-size width="150" height="-1" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
+          <preferred-size width="150" height="-1" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
+          <preferred-size width="150" height="-1" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
+          <preferred-size width="150" height="50" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
+          <preferred-size width="150" height="50" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
+          <preferred-size width="150" height="50" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
+      </item>
+      <item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
+          <preferred-size width="150" height="50" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
+          <preferred-size width="150" height="50" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
+          <preferred-size width="150" height="50" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
+          <preferred-size width="200" height="200" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
+          <preferred-size width="200" height="200" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.png" removable="false" auto-create-binding="true" can-attach-label="true">
+        <default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
+      </item>
+      <item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
+      </item>
+      <item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
+      </item>
+      <item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
+      </item>
+      <item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
+          <preferred-size width="-1" height="20" />
+        </default-constraints>
+      </item>
+      <item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.png" removable="false" auto-create-binding="false" can-attach-label="false">
+        <default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
+      </item>
+      <item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.png" removable="false" auto-create-binding="true" can-attach-label="false">
+        <default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
+      </item>
+    </group>
+  </component>
+</project>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 2 - 0
GettingStartedWithKafka.iml

@@ -0,0 +1,2 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4" />

File diff suppressed because it is too large
+ 36 - 0
docs/04 Understanding Topics, Partitions and Brokers.md


+ 92 - 0
docs/05 Producing messages with Kafka producers.md

@@ -0,0 +1,92 @@
+# 05 Producing messages with Kafka producers
+
+- 2 types centraux:
+    - `KafkaProducer`
+    - `ProducerRecord`
+
+## Creating a producer
+
+- Properties: 
+    - `bootstrap.servers`
+    - `key.serializer`
+    - `value.serializer`
+    - and others
+    - => `KafkaProducer`
+    - [Producer config doc](http://kafka.apache.org/documentation.html#producerconfigs)
+- See [KafkaProducerApp](../src/main/java/KafkaProducerApp.java)
+  
+## Creating messages
+
+- K calls messages "ProducerRecord"
+    - 2 requires properties:
+        - Topic
+        - Value
+    - Additional properties:
+        - Partition
+        - Timestamp
+        - Key
+- KP instances can only send PRs that match the key and value
+  serializer types they are configured with.
+  
+## Sending messages
+
+When the producer sends:
+
+- it connects to the cluster using the boostrap.servers to discover the cluster
+  membership, returned as "metadata"
+    - topics
+    - partitions
+    - brokers handling them
+- it creates a Metadata object within itself, which it will refresh in the background
+- it passes the message through the serializer(s) (K, V)
+- it passes it to the partitioner, which will choose a partition based on the message
+  and the cluster information in Metadata. Strategies:
+    - direct (is record contains a valid (see cluster) partition ID)
+    - round-robin (in `DefaultPartitioner` class), if it has no key
+    - key mod-hash (if it has a key but no customer partitioner),
+      a Murmur hash in DefaultPartitioner. See `DefaultPartitioner.partition`.
+    - custom (defined in `PARTITIONER_CLASS_CONFIG == "partitioner.class"` property)
+- it pushes the message to an in-memory queue, the `RecordAccumulator`
+    - micro-batching: as scale, efficiency is everything. 
+    - Use on producer, broker, and consumer.
+    - Also used in the OS (page cache, Linux sendfile() syscall).
+    - Amortizes the constant cost of sends
+    - Collection of `RecordBatch` objects, one by partition.
+    - Choice of batch size is complex, influence by advanced configuration settings.
+        - The size of `RecordBatch` instances used by Message buffering is defined by `batch.size` in bytes, not number of messages
+        - Their number is determined by how many fit into `buffer.memory`
+- When more is needed, the `max.block.ms` setting determines how long the
+  producer `send` method will be blocked, forcing back-pressure on the producer
+    - The goal is to leave time for the messages to be transmitted, freeing up buffer memory
+- When a `RecordBatch` gets full, it is sent immediately
+    - When none is full, `linger.ms` defines how long a not-full buffer can wait before being sent
+- When a batch is sent, the broker returns `RecordMetadata`, providing success/failure info
+
+## Delivery guarantees
+
+- Producer can define what level of acknowledgment it expects from the brokers, with setting `acks`
+    - 0: fire and forget. By far the fastest but riskiest, as the broker may not even log any issue
+    - 1: leader acknowledged. Mid-ground.
+    - 2: replication quorum acknowleldged. Slowest, safest. May be unpredictable due to
+      topology changes on broker failures
+- Broker errors can be configured too:
+    - `retries` defines how many times the producer will retry to send a failed message
+    - `retry.backoff.ms` is a fixed delay between retries
+- Ordering guarantees:
+    - Only preserved within a given partition
+    - No global order across partitions of a topic
+    - Can get complicated with errors, especially with retries
+    - Can be alleviated entirely by setting `max.in.flight.request.per.connection` to 1,
+      but that has a high cost.
+- Delivery semantics result from these settings:
+    - At-least once
+    - At-most once
+    - Exactly once
+    
+## Advanced topics
+
+- Cusom serializers
+- Custom Partitioners
+- Asynchronous send
+- Compression
+- Advanced settings

+ 48 - 0
docs/06 Consuming messages with Kafka Consumers and Consumer groups

@@ -0,0 +1,48 @@
+# 06 Consuming messages with Kafka Consumers and Consumer Groups
+
+Very similar to producer.
+
+## Creating a consumer
+
+- Properties:
+    - `bootstrap.servers`
+    - `key.serializer`
+    - `value.serializer`
+    - and others
+    - => `KafkaProducer`
+    - [Consumer config doc](http://kafka.apache.org/documentation.html#consumerconfigs)
+- See [KafkaConsumerApp](../src/main/java/KafkaConsumerApp.java)
+
+## (Un)Subscribing
+
+- A single consumer can subscribe to any number of topics
+- subscribe takes either a list of strings or a regex string,
+    - each call replaces the previous subscription list
+- They can also unsubscribe all topics at once.
+
+## Subscribing/assigning
+### consumer.subscribe()
+
+- automatic/dynamic partition assignment
+    - that single consumer listens to every partition on every listed topic
+    - entirely managed by the `KafkaConsumer`
+        - will get newly added partitions to subscribed topics (`SubscriptionState`)
+- now needs a valid `group.id` == `ConsumerConfig.GROUP_ID_CONFIG`
+
+### consumer.assign()
+
+- used to subscribe to explicit partitions
+    - one or more partitions, regardless of topic
+    - manual, self-administering mode
+- more of an advanced case
+- does not needs a `group.id`
+- blocks instead of erroring if one of the partitions does not exist, in 
+  `updateFetchPositions.updateFetchPositions()`.
+
+## The poll loop
+
+- primary function of the `KafkaConsumer`
+- continuously poll the brokers for data
+- single API for handling all Consumer <-> Broker interactions
+    - a lot of interactions beyond message retrieval
+    

BIN
docs/images/ConsumerGlobal.png


BIN
docs/images/ProducerExplicitPartition.png


BIN
docs/images/ProducerInternals.png


BIN
docs/images/ProducerSendGlobal.png


+ 2 - 0
kafkasamples.iml

@@ -0,0 +1,2 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4" />

+ 23 - 0
pom.xml

@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>fr.osinet.ps.kafka</groupId>
+    <artifactId>samples</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.1</version>
+        </dependency>
+    </dependencies>
+</project>

+ 57 - 0
src/main/java/KafkaAssignApp.java

@@ -0,0 +1,57 @@
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaAssignApp {
+
+    public static final String TOPIC = "my-topic";
+    public static final Integer TIMEOUT = 10;
+
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        // We use "put" since there are strings. Otherwise, use setProperty.
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
+
+        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
+
+        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
+        TopicPartition myTopicPart0 = new TopicPartition(TOPIC, 0);
+        TopicPartition myTopicPart1 = new TopicPartition(TOPIC, 1);
+        partitions.add(myTopicPart0);
+        // Will block poll if the partition does not exist.
+        partitions.add(myTopicPart1);
+        myConsumer.assign(partitions);
+
+        Boolean started = false;
+        Integer pass = 0;
+        try {
+            while (true) {
+                ConsumerRecords<String, String> records = myConsumer.poll(TIMEOUT);
+                if (!started) {
+                    started = true;
+                    System.out.printf("Started");
+                }
+                for (ConsumerRecord<String, String> cr : records) {
+                    System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
+                            pass, records.count(),
+                            cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
+                }
+                pass++;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            myConsumer.close();
+        }
+    }
+}

+ 95 - 0
src/main/java/KafkaConsumerApp.java

@@ -0,0 +1,95 @@
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaConsumerApp {
+
+    public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
+    public static final String TOPIC = "my-topic";
+    public static final Integer TIMEOUT = 200;
+
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        // We use "put" since there are strings. Otherwise, use setProperty.
+        // These are the 3 required properties.
+        // 1. Cluster membership: partition leaders, etc
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
+
+        // 2. How keys and values are deserialized.
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+        // 3. The consumer Group ID is now required.
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
+
+        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
+        myConsumer.subscribe(Arrays.asList(TOPIC));
+
+        Boolean started = false;
+        Integer pass = 0;
+        try {
+            while (true) {
+                ConsumerRecords<String, String> records = myConsumer.poll(TIMEOUT);
+                if (!started) {
+                    started = true;
+                    System.out.printf("Started");
+                }
+//                for (ConsumerRecord<String, String> cr : records.records(TOPIC)) {
+//                    System.out.printf("\t\tKey: %s Value: %s\n", cr.key(), cr.value());
+//                }
+                for (ConsumerRecord<String, String> cr : records) {
+                    System.out.printf("Pass: %d/%d Topic: %s Partition: %d, Offset: %d, Key: %s Value: %s\n",
+                            pass, records.count(),
+                            cr.key(), cr.partition(), cr.offset(), cr.key(), cr.value());
+                }
+                pass++;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            myConsumer.close();
+        }
+    }
+
+    private static void assignDemo(KafkaConsumer<String, String> c) {
+        TopicPartition partition0 = new TopicPartition(TOPIC, 0);
+        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
+        partitions.add(partition0);
+
+        c.assign(partitions); // NOT incremental !
+    }
+
+    private static void subscribeExamples(KafkaConsumer<String, String> c) {
+        // In tutorial but does not compile: needs a rebalance callback
+        // c.subscribe("my-*");
+        // Maybe that way ?
+        // c.subscribe(Pattern.compile("my-[\\w]+"), null);
+
+        // Typical initial subscription.
+        c.subscribe(Arrays.asList(TOPIC));
+
+        // Replaces the current subscription set, does not extend it
+        c.subscribe(Arrays.asList("another-topic"));
+
+        // Better for incremental cases.
+        ArrayList<String> topics = new ArrayList<String>();
+        topics.add(TOPIC);
+        topics.add("my-other-topic");
+        topics.add("yetAnotherTopic");
+        c.subscribe(topics);
+
+        // Unsubcribe all topics.
+        c.unsubscribe();
+
+        // Alternative
+        topics.clear();
+        c.subscribe(topics);
+    }
+}

+ 76 - 0
src/main/java/KafkaProducerApp.java

@@ -0,0 +1,76 @@
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.time.Instant;
+import java.util.Properties;
+
+public class KafkaProducerApp {
+
+    public static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+    public static final String TOPIC = "my-topic";
+
+    public static void main(String[] args) {
+        Properties props = new Properties();
+
+        // These are the 3 required properties.
+        // 1. Cluster membership: partition leaders, etc
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
+
+        // 2. How keys and values are serialized.
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
+
+        KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);
+
+        try {
+            for (int i = 0; i < 100; i++) {
+                Instant ts = Instant.now();
+                Double ss = ts.toEpochMilli() + ts.getNano() / 1E9;
+                ProducerRecord myMessage = new ProducerRecord(TOPIC, String.format("%3d : %09.3f", i, ss));
+                myProducer.send(myMessage); // Best practice: wrap in try..catch.
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            myProducer.close();
+        }
+    }
+
+    public static void createMessagesExample() {
+        // Message with only the required properties.
+        ProducerRecord myMessage = new ProducerRecord(
+                TOPIC, // Topic to which the record will be sent.
+                "My Message 1" // Message content, matching the serializer type for value
+        );
+
+        // Non-matching type: runtime exception
+        // ProducerRecord myBadMessage = new ProducerRecord(TOPIC, 3.14159);
+
+        ProducerRecord myPartitionedMessage = new ProducerRecord(
+                TOPIC, // String Topic
+                1,  // Integer Partition
+                "My Message 1" // String Message
+        );
+
+        ProducerRecord myKeyedMessage = new ProducerRecord(
+                TOPIC, // String Topic
+                "Course-001",  // K key
+                "My Message 1" // String Message
+        );
+
+        // Adding optional properties
+        ProducerRecord msg3 = new ProducerRecord(
+                TOPIC, // String Topic
+                1, // Integer Partition
+                124535353325L, // Long timestamp, added in Kafka 0.10.
+                "Course-001", // K key: basis to determine partitioning strategy. Don't use blank or NULL.
+                // Key may contain additional message information, but adds overhead, depends on serializer.
+                "My Message 1" // V value
+        );
+        // The actual TS being send with the message is defined in server.properties:
+        // log.message.timestamp.type = [CreateTime, LogAppendTime]
+        // - CreateTime: producer-set timestamp is used
+        // - LogAppendtime: broker-set to the time when the message is appended to the commit log. Overrides the producet-set one.
+    }
+}

Some files were not shown because too many files changed in this diff