import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.time.Instant; import java.util.Properties; public class KafkaProducerApp { public static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; public static final String TOPIC = "my-topic"; public static void main(String[] args) { Properties props = new Properties(); // These are the 3 required properties. // 1. Cluster membership: partition leaders, etc props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, BROKER-1:9092, BROKER-2:9093"); // 2. How keys and values are serialized. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER); KafkaProducer myProducer = new KafkaProducer(props); try { for (int i = 0; i < 100; i++) { Instant ts = Instant.now(); Double ss = ts.toEpochMilli() + ts.getNano() / 1E9; ProducerRecord myMessage = new ProducerRecord(TOPIC, String.format("%3d : %09.3f", i, ss)); myProducer.send(myMessage); // Best practice: wrap in try..catch. } } catch (Exception e) { e.printStackTrace(); } finally { myProducer.close(); } } public static void createMessagesExample() { // Message with only the required properties. ProducerRecord myMessage = new ProducerRecord( TOPIC, // Topic to which the record will be sent. "My Message 1" // Message content, matching the serializer type for value ); // Non-matching type: runtime exception // ProducerRecord myBadMessage = new ProducerRecord(TOPIC, 3.14159); ProducerRecord myPartitionedMessage = new ProducerRecord( TOPIC, // String Topic 1, // Integer Partition "My Message 1" // String Message ); ProducerRecord myKeyedMessage = new ProducerRecord( TOPIC, // String Topic "Course-001", // K key "My Message 1" // String Message ); // Adding optional properties ProducerRecord msg3 = new ProducerRecord( TOPIC, // String Topic 1, // Integer Partition 124535353325L, // Long timestamp, added in Kafka 0.10. "Course-001", // K key: basis to determine partitioning strategy. Don't use blank or NULL. // Key may contain additional message information, but adds overhead, depends on serializer. "My Message 1" // V value ); // The actual TS being send with the message is defined in server.properties: // log.message.timestamp.type = [CreateTime, LogAppendTime] // - CreateTime: producer-set timestamp is used // - LogAppendtime: broker-set to the time when the message is appended to the commit log. Overrides the producet-set one. } }