KafkaProducerApp.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package producer;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.time.Instant;
  6. import java.util.Properties;
  7. public class KafkaProducerApp {
  8. public static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
  9. public static final String TOPIC = "my-topic";
  10. public static void main(String[] args) {
  11. Properties props = new Properties();
  12. // These are the 3 required properties.
  13. // 1. Cluster membership: partition leaders, etc
  14. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093");
  15. // 2. How keys and values are serialized.
  16. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
  17. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
  18. KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);
  19. try {
  20. for (int i = 0; i < 100; i++) {
  21. Instant ts = Instant.now();
  22. Double ss = ts.toEpochMilli() + ts.getNano() / 1E9;
  23. ProducerRecord myMessage = new ProducerRecord(TOPIC, String.format("%3d : %09.3f", i, ss));
  24. myProducer.send(myMessage); // Best practice: wrap in try..catch.
  25. }
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. } finally {
  29. myProducer.close();
  30. }
  31. }
  32. public static void createMessagesExample() {
  33. // Message with only the required properties.
  34. ProducerRecord myMessage = new ProducerRecord(
  35. TOPIC, // Topic to which the record will be sent.
  36. "My Message 1" // Message content, matching the serializer type for value
  37. );
  38. // Non-matching type: runtime exception
  39. // ProducerRecord myBadMessage = new ProducerRecord(TOPIC, 3.14159);
  40. ProducerRecord myPartitionedMessage = new ProducerRecord(
  41. TOPIC, // String Topic
  42. 1, // Integer Partition
  43. "My Message 1" // String Message
  44. );
  45. ProducerRecord myKeyedMessage = new ProducerRecord(
  46. TOPIC, // String Topic
  47. "Course-001", // K key
  48. "My Message 1" // String Message
  49. );
  50. // Adding optional properties
  51. ProducerRecord msg3 = new ProducerRecord(
  52. TOPIC, // String Topic
  53. 1, // Integer Partition
  54. 124535353325L, // Long timestamp, added in Kafka 0.10.
  55. "Course-001", // K key: basis to determine partitioning strategy. Don't use blank or NULL.
  56. // Key may contain additional message information, but adds overhead, depends on serializer.
  57. "My Message 1" // V value
  58. );
  59. // The actual TS being send with the message is defined in server.properties:
  60. // log.message.timestamp.type = [CreateTime, LogAppendTime]
  61. // - CreateTime: producer-set timestamp is used
  62. // - LogAppendtime: broker-set to the time when the message is appended to the commit log. Overrides the producet-set one.
  63. }
  64. }