// KafkaProducerApp.java

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import java.time.Instant;
  4. import java.util.Properties;
  5. public class KafkaProducerApp {
  6. public static void main(String[] args) {
  7. Properties props = new Properties();
  8. KafkaProducerCommon.configure(props);
  9. KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);
  10. try {
  11. for (int i = 0; i < 100; i++) {
  12. Instant ts = Instant.now();
  13. Double ss = ts.toEpochMilli() + ts.getNano() / 1E9;
  14. ProducerRecord myMessage = new ProducerRecord(KafkaCommon.TOPIC, String.format("%3d : %09.3f", i, ss));
  15. myProducer.send(myMessage); // Best practice: wrap in try..catch.
  16. }
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. } finally {
  20. myProducer.close();
  21. }
  22. }
  23. public static void createMessagesExample() {
  24. // Message with only the required properties.
  25. ProducerRecord myMessage = new ProducerRecord(
  26. KafkaCommon.TOPIC, // Topic to which the record will be sent.
  27. "My Message 1" // Message content, matching the serializer type for value
  28. );
  29. // Non-matching type: runtime exception
  30. // ProducerRecord myBadMessage = new ProducerRecord(TOPIC, 3.14159);
  31. ProducerRecord myPartitionedMessage = new ProducerRecord(
  32. KafkaCommon.TOPIC, // String Topic
  33. 1, // Integer Partition
  34. "My Message 1" // String Message
  35. );
  36. ProducerRecord myKeyedMessage = new ProducerRecord(
  37. KafkaCommon.TOPIC, // String Topic
  38. "Course-001", // K key
  39. "My Message 1" // String Message
  40. );
  41. // Adding optional properties
  42. ProducerRecord msg3 = new ProducerRecord(
  43. KafkaCommon.TOPIC, // String Topic
  44. 1, // Integer Partition
  45. 124535353325L, // Long timestamp, added in Kafka 0.10.
  46. "Course-001", // K key: basis to determine partitioning strategy. Don't use blank or NULL.
  47. // Key may contain additional message information, but adds overhead, depends on serializer.
  48. "My Message 1" // V value
  49. );
  50. // The actual TS being send with the message is defined in server.properties:
  51. // log.message.timestamp.type = [CreateTime, LogAppendTime]
  52. // - CreateTime: producer-set timestamp is used
  53. // - LogAppendtime: broker-set to the time when the message is appended to the commit log. Overrides the producet-set one.
  54. }
  55. }