BatchProducerApp.java 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import org.apache.kafka.clients.producer.*;
  2. import java.text.*;
  3. import java.util.*;
  4. public class BatchProducerApp {
  5. public static void main(String[] args){
  6. // Create the Properties class to instantiate the Consumer with the desired settings:
  7. Properties props = new Properties();
  8. props.put("bootstrap.servers", "localhost:9092, localhost:9093");
  9. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  11. props.put("acks", "");
  12. props.put("buffer.memory", 33554432);
  13. props.put("compression.type", "none");
  14. props.put("retries", 0);
  15. props.put("batch.size", 16384);
  16. props.put("client.id", "");
  17. props.put("linger.ms", 0);
  18. props.put("max.block.ms", 60000);
  19. props.put("max.request.size", 1048576);
  20. props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
  21. props.put("request.timeout.ms", 30000);
  22. props.put("timeout.ms", 30000);
  23. props.put("max.in.flight.requests.per.connection", 5);
  24. props.put("retry.backoff.ms", 5);
  25. KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);
  26. DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
  27. String topic = "my-topic";
  28. try {
  29. int batchNumber = 1;
  30. int counter = 0;
  31. while (true) {
  32. do {
  33. myProducer.send(
  34. new ProducerRecord<String, String>(topic, String.format("Batch: %s || %s", Integer.toString(batchNumber), dtFormat.format(new Date())))
  35. );
  36. counter++; // Increase record counter...
  37. // Thread.sleep(500); // use if you want to add latency between record sends
  38. // Thread.sleep(new Random(1000).nextLong()); // use if you want to add random latency between record sends
  39. } while (counter < 10); // Number of records sent in a batch...
  40. counter = 0; // Reset the record counter...
  41. Thread.sleep(500); // Set how long before a new batch is sent...
  42. // Thread.sleep(new Random(5000).nextLong()); // use if you want to randomize the time between batch record sends
  43. batchNumber++; // Increase the batch number...
  44. }
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. } finally {
  48. myProducer.close();
  49. }
  50. }
  51. }