Browse Source

Refactorings.

Frederic G. MARAND 3 years ago
parent
commit
f71f1bbaa6

+ 1 - 1
.idea/runConfigurations/KafkaProducerApp.xml

@@ -1,7 +1,7 @@
 <component name="ProjectRunConfigurationManager">
   <configuration default="false" name="KafkaProducerApp" type="Application" factoryName="Application">
     <option name="ALTERNATIVE_JRE_PATH" value="1.8" />
-    <option name="MAIN_CLASS_NAME" value="producer.KafkaProducerApp" />
+    <option name="MAIN_CLASS_NAME" value="KafkaProducerApp" />
     <module name="GettingStartedWithKafka" />
     <RunnerSettings RunnerId="Run" />
     <ConfigurationWrapper RunnerId="Run" />

+ 1 - 1
docs/05 Producing messages.md

@@ -13,7 +13,7 @@
     - and others
     - => `KafkaProducer`
     - [Producer config doc](http://kafka.apache.org/documentation.html#producerconfigs)
-- See [producer.KafkaProducerApp](../src/main/java/KafkaProducerApp.java)
+- See [KafkaProducerApp](../src/main/java/KafkaProducerApp.java)
 
 ## Creating messages
 

+ 3 - 13
src/main/java/KafkaAssignApp.java

@@ -1,5 +1,3 @@
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
@@ -10,16 +8,9 @@ public class KafkaAssignApp {
 
   public static void main(String[] args) {
     Properties props = new Properties();
-    // We use "put" since there are strings. Otherwise, use setProperty.
-    // These are the 3 required properties.
-    // 1. Cluster membership: partition leaders, etc
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
+    KafkaConsumerCommon.configure(props);
 
-    // 2. How keys and values are deserialized.
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
-
-    // 3. The consumer Group ID is NOT required for assign;
+    // The consumer Group ID is NOT required for assign;
     // props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
 
     KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
@@ -36,8 +27,7 @@ public class KafkaAssignApp {
 
     try {
       while (true) {
-        ConsumerRecords<String, String> records = myConsumer.poll(KafkaConsumerCommon.TIMEOUT);
-        KafkaConsumerCommon.process(records);
+        KafkaConsumerCommon.process(myConsumer);
       }
     } catch (Exception e) {
       e.printStackTrace();

+ 1 - 0
src/main/java/KafkaCommon.java

@@ -1,3 +1,4 @@
 public class KafkaCommon {
   public static final String TOPIC = "my-topic";
+  public static final String BROKERS = "localhost:9092, BROKER-1:9092, BROKER-2:9093";
 }

+ 4 - 12
src/main/java/KafkaConsumerApp.java

@@ -10,17 +10,10 @@ public class KafkaConsumerApp {
 
   public static void main(String[] args) {
     Properties props = new Properties();
-    // We use "put" since there are strings. Otherwise, use setProperty.
-    // These are the 3 required properties.
-    // 1. Cluster membership: partition leaders, etc
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
+    KafkaConsumerCommon.configure(props);
 
-    // 2. How keys and values are deserialized.
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaConsumerCommon.STRING_DESERIALIZER);
-
-    // 3. The consumer Group ID is now required.
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
+    // The consumer Group ID is now required on subscribe().
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerCommon.GROUP);
 
     KafkaConsumer<String, String> myConsumer = new KafkaConsumer<String, String>(props);
 
@@ -29,8 +22,7 @@ public class KafkaConsumerApp {
 
     try {
       while (true) {
-        ConsumerRecords<String, String> records = myConsumer.poll(KafkaConsumerCommon.TIMEOUT);
-        KafkaConsumerCommon.process(records);
+        KafkaConsumerCommon.process(myConsumer);
       }
     } catch (Exception e) {
       e.printStackTrace();

+ 19 - 2
src/main/java/KafkaConsumerCommon.java

@@ -1,14 +1,31 @@
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
+import java.util.Properties;
+
 public class KafkaConsumerCommon {
+  public static final String GROUP = "test-group";
   public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
-  public static final Integer TIMEOUT = 200;
+  public static final Integer TIMEOUT = 10;
   protected static Boolean started = false;
   protected static Integer pass = 0;
 
-  static void process(ConsumerRecords<String, String> records) {
+  static void configure(Properties props) {
+    // We use "put" since there are strings. Otherwise, use setProperty.
+    // These are the 3 required properties.
+    // 1. Cluster membership: partition leaders, etc
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommon.BROKERS);
+
+    // 2. How keys and values are deserialized.
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
+  }
+
+  static void process(KafkaConsumer<String, String> c) {
+    ConsumerRecords<String, String> records = c.poll(KafkaConsumerCommon.TIMEOUT);
     if (!started) {
       started = true;
       System.out.printf("Started");

+ 65 - 0
src/main/java/KafkaProducerApp.java

@@ -0,0 +1,65 @@
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.time.Instant;
+import java.util.Properties;
+
+public class KafkaProducerApp {
+
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    KafkaProducerCommon.configure(props);
+
+    KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);
+
+    try {
+      for (int i = 0; i < 100; i++) {
+        Instant ts = Instant.now();
+        Double ss = ts.toEpochMilli() + ts.getNano() / 1E9;
+        ProducerRecord myMessage = new ProducerRecord(KafkaCommon.TOPIC, String.format("%3d : %09.3f", i, ss));
+        myProducer.send(myMessage); // Best practice: wrap in try..catch.
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      myProducer.close();
+    }
+  }
+
+  public static void createMessagesExample() {
+    // Message with only the required properties.
+    ProducerRecord myMessage = new ProducerRecord(
+      KafkaCommon.TOPIC, // Topic to which the record will be sent.
+      "My Message 1" // Message content, matching the serializer type for value
+    );
+
+    // Non-matching type: runtime exception
+    // ProducerRecord myBadMessage = new ProducerRecord(TOPIC, 3.14159);
+
+    ProducerRecord myPartitionedMessage = new ProducerRecord(
+      KafkaCommon.TOPIC, // String Topic
+      1,  // Integer Partition
+      "My Message 1" // String Message
+    );
+
+    ProducerRecord myKeyedMessage = new ProducerRecord(
+      KafkaCommon.TOPIC, // String Topic
+      "Course-001",  // K key
+      "My Message 1" // String Message
+    );
+
+    // Adding optional properties
+    ProducerRecord msg3 = new ProducerRecord(
+      KafkaCommon.TOPIC, // String Topic
+      1, // Integer Partition
+      124535353325L, // Long timestamp, added in Kafka 0.10.
+      "Course-001", // K key: basis to determine partitioning strategy. Don't use blank or NULL.
+      // Key may contain additional message information, but adds overhead, depends on serializer.
+      "My Message 1" // V value
+    );
+    // The actual TS being sent with the message is defined in server.properties:
+    // log.message.timestamp.type = [CreateTime, LogAppendTime]
+    // - CreateTime: producer-set timestamp is used
+    // - LogAppendTime: broker-set to the time when the message is appended to the commit log. Overrides the producer-set one.
+  }
+}

+ 19 - 0
src/main/java/KafkaProducerCommon.java

@@ -0,0 +1,19 @@
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+
+public class KafkaProducerCommon {
+  public static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+  static void configure(Properties props) {
+    // We use "put" since there are strings. Otherwise, use setProperty.
+    // These are the 3 required properties.
+    // 1. Cluster membership: partition leaders, etc
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommon.BROKERS);
+
+    // 2. How keys and values are serialized.
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
+  }
+
+}

+ 0 - 78
src/main/java/producer/KafkaProducerApp.java

@@ -1,78 +0,0 @@
-package producer;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.time.Instant;
-import java.util.Properties;
-
-public class KafkaProducerApp {
-
-    public static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
-    public static final String TOPIC = "my-topic";
-
-    public static void main(String[] args) {
-        Properties props = new Properties();
-
-        // These are the 3 required properties.
-        // 1. Cluster membership: partition leaders, etc
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
-
-        // 2. How keys and values are serialized.
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
-
-        KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);
-
-        try {
-            for (int i = 0; i < 100; i++) {
-                Instant ts = Instant.now();
-                Double ss = ts.toEpochMilli() + ts.getNano() / 1E9;
-                ProducerRecord myMessage = new ProducerRecord(TOPIC, String.format("%3d : %09.3f", i, ss));
-                myProducer.send(myMessage); // Best practice: wrap in try..catch.
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            myProducer.close();
-        }
-    }
-
-    public static void createMessagesExample() {
-        // Message with only the required properties.
-        ProducerRecord myMessage = new ProducerRecord(
-                TOPIC, // Topic to which the record will be sent.
-                "My Message 1" // Message content, matching the serializer type for value
-        );
-
-        // Non-matching type: runtime exception
-        // ProducerRecord myBadMessage = new ProducerRecord(TOPIC, 3.14159);
-
-        ProducerRecord myPartitionedMessage = new ProducerRecord(
-                TOPIC, // String Topic
-                1,  // Integer Partition
-                "My Message 1" // String Message
-        );
-
-        ProducerRecord myKeyedMessage = new ProducerRecord(
-                TOPIC, // String Topic
-                "Course-001",  // K key
-                "My Message 1" // String Message
-        );
-
-        // Adding optional properties
-        ProducerRecord msg3 = new ProducerRecord(
-                TOPIC, // String Topic
-                1, // Integer Partition
-                124535353325L, // Long timestamp, added in Kafka 0.10.
-                "Course-001", // K key: basis to determine partitioning strategy. Don't use blank or NULL.
-                // Key may contain additional message information, but adds overhead, depends on serializer.
-                "My Message 1" // V value
-        );
-        // The actual TS being send with the message is defined in server.properties:
-        // log.message.timestamp.type = [CreateTime, LogAppendTime]
-        // - CreateTime: producer-set timestamp is used
-        // - LogAppendtime: broker-set to the time when the message is appended to the commit log. Overrides the producet-set one.
-    }
-}

+ 1 - 1
src/main/java/producer/META-INF/MANIFEST.MF

@@ -1,2 +1,2 @@
 Manifest-Version: 1.0
-Main-Class: producer.KafkaProducerApp
+Main-Class: KafkaProducerApp