# 05 Producing messages with Kafka producers

- Two central types:
  - `KafkaProducer`
  - `ProducerRecord`

## Creating a producer

- Properties:
  - `bootstrap.servers`
  - `key.serializer`
  - `value.serializer`
  - and others
- => passed to a new `KafkaProducer`
- [Producer config doc](http://kafka.apache.org/documentation.html#producerconfigs)
- See [KafkaProducerApp](../src/main/java/KafkaProducerApp.java)

## Creating messages

- Kafka calls messages `ProducerRecord`s
- Two required properties:
  - Topic
  - Value
- Optional properties:
  - Partition
  - Timestamp
  - Key
- A `KafkaProducer` instance can only send `ProducerRecord`s whose key and value types match the key and value serializers it is configured with.

## Sending messages

When the producer sends:

- it connects to the cluster using `bootstrap.servers` to discover the cluster membership, returned as "metadata":
  - topics
  - partitions
  - the brokers handling them
- it creates a `Metadata` object within itself, which it refreshes in the background
- it passes the message through the key and value serializers
- it passes the message to the partitioner, which chooses a partition based on the message and the cluster information in `Metadata`. Strategies:
  - direct: if the record contains a partition ID that is valid for the cluster
  - round-robin (in the `DefaultPartitioner` class): if the record has no key (clients since Kafka 2.4 use a "sticky" partitioner for keyless records instead)
  - key hash modulo the number of partitions: if the record has a key and no custom partitioner is configured; `DefaultPartitioner` uses a Murmur2 hash. See `DefaultPartitioner.partition`.
  - custom: a class set in the `partitioner.class` property (`ProducerConfig.PARTITIONER_CLASS_CONFIG`)
- it pushes the message to an in-memory queue, the `RecordAccumulator`
  - micro-batching: at scale, efficiency is everything.
    - Used on the producer, the broker, and the consumer.
    - Also used in the OS (page cache, Linux `sendfile()` syscall).
    - Amortizes the constant cost of each send.
  - The accumulator is a collection of `RecordBatch` objects, one per partition.
  - Choosing the batch size is complex and is influenced by advanced configuration settings.
  - The size of each `RecordBatch` is capped by `batch.size`, in bytes, not in number of messages.
  - The total memory available for batches is set by `buffer.memory`, which bounds how many batches can exist at once.
  - When more memory is needed, `max.block.ms` determines how long the producer's `send` method blocks, applying back-pressure on the producing code.
    - The goal is to leave time for buffered messages to be transmitted, freeing up buffer memory.
  - When a `RecordBatch` is full, it is sent immediately.
  - When no batch is full, `linger.ms` defines how long a partially filled batch may wait for more records before being sent.
- once a batch has been sent, the broker's response is surfaced to the application as a `RecordMetadata` (topic, partition, offset, timestamp) for each record, or as an exception on failure

## Delivery guarantees

- The producer defines the level of acknowledgment it expects from the brokers with the `acks` setting:
  - `0`: fire and forget. By far the fastest but the riskiest: the producer gets no acknowledgment, so it never learns whether the broker actually stored the message.
  - `1`: leader acknowledged. A middle ground: the partition leader has written the message, but it can still be lost if the leader fails before the followers replicate it.
  - `all` (or `-1`): acknowledged by all in-sync replicas. Slowest, safest. Latency may be less predictable during topology changes caused by broker failures.
- Retries on broker errors can be configured too:
  - `retries` defines how many times the producer retries sending a failed message
  - `retry.backoff.ms` is the time to wait before retrying
- Ordering guarantees:
  - Only preserved within a given partition
  - No global order across the partitions of a topic
  - Can get complicated with errors, especially with retries: a retried batch can land after a later batch that succeeded on its first attempt
  - This can be avoided entirely by setting `max.in.flight.requests.per.connection` to 1, at a high cost in throughput.
- Delivery semantics result from these settings:
  - At-least-once
  - At-most-once
  - Exactly-once

## Advanced topics

- Custom serializers
- Custom partitioners
- Asynchronous send
- Compression
- Advanced settings
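The partitioning strategies listed under "Sending messages" can be sketched in plain Java. This is a simplified model loosely following the older (pre-2.4) `DefaultPartitioner` behavior described in these notes, not the Kafka client API. The class `PartitionerSketch` and its `partition` method are hypothetical. The `murmur2` and `toPositive` helpers port the Murmur2 hashing the client applies to keyed records. The round-robin branch ignores partition availability, which the real client takes into account.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified partition selection (not the Kafka client API).
class PartitionerSketch {
    // Counter for round-robin over keyless records.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Murmur2 hash of the key bytes (port of the hashing used for keyed records).
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (fall-through is intentional).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below never yields a negative partition.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // explicitPartition is the record's partition field, or null when unset.
    int partition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition " + explicitPartition);
            }
            return explicitPartition;  // direct
        }
        if (keyBytes == null) {
            return toPositive(counter.getAndIncrement()) % numPartitions;  // round-robin
        }
        return toPositive(murmur2(keyBytes)) % numPartitions;  // key hash mod partition count
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        byte[] key = "customer-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("explicit: " + p.partition(2, key, 6));
        System.out.println("keyed:    " + p.partition(null, key, 6));
        System.out.println("no key:   " + p.partition(null, null, 6) + ", " + p.partition(null, null, 6));
    }
}
```

Because the keyed branch depends only on the key bytes and the partition count, records with the same key always land in the same partition, which is what gives per-key ordering. Adding partitions to a topic changes that mapping.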
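The batching rules from "Sending messages" (one queue of batches per partition, `batch.size` in bytes, a shared `buffer.memory` pool, `linger.ms`) can be illustrated with a toy model. `ToyAccumulator`, `Batch`, `append` and `drain` are hypothetical names, and the model is far simpler than the real `RecordAccumulator`. Time is passed in explicitly. A full memory pool makes `append` return `false` instead of blocking for `max.block.ms`. Memory is released when a batch is drained rather than when the broker responds.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of producer-side micro-batching (hypothetical, not the Kafka API).
class ToyAccumulator {
    static final class Batch {
        final int partition;
        final long createdMs;
        final int reservedBytes;  // memory taken from the shared pool
        int sizeBytes = 0;        // bytes of records appended so far
        int recordCount = 0;

        Batch(int partition, long createdMs, int reservedBytes) {
            this.partition = partition;
            this.createdMs = createdMs;
            this.reservedBytes = reservedBytes;
        }
    }

    private final int batchSize;      // plays the role of batch.size (bytes)
    private final long lingerMs;      // plays the role of linger.ms
    private final long bufferMemory;  // plays the role of buffer.memory (bytes)
    private long allocatedBytes = 0;
    private final Map<Integer, Deque<Batch>> queues = new HashMap<>();  // one queue per partition

    ToyAccumulator(int batchSize, long lingerMs, long bufferMemory) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
        this.bufferMemory = bufferMemory;
    }

    // Appends to the partition's newest batch, opening a new one if the record does not fit.
    // Returns false when the pool is exhausted (the real send() would block up to max.block.ms).
    boolean append(int partition, int recordBytes, long nowMs) {
        Deque<Batch> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = queue.peekLast();
        if (last == null || last.sizeBytes + recordBytes > batchSize) {
            int reserve = Math.max(batchSize, recordBytes);  // an oversized record gets its own batch
            if (allocatedBytes + reserve > bufferMemory) {
                return false;
            }
            allocatedBytes += reserve;
            last = new Batch(partition, nowMs, reserve);
            queue.addLast(last);
        }
        last.sizeBytes += recordBytes;
        last.recordCount++;
        return true;
    }

    // Removes and returns ready batches: full (a newer batch is queued behind it,
    // or it reached batch.size) or older than linger.ms.
    List<Batch> drain(long nowMs) {
        List<Batch> ready = new ArrayList<>();
        for (Deque<Batch> queue : queues.values()) {
            while (!queue.isEmpty()) {
                Batch first = queue.peekFirst();
                boolean full = queue.size() > 1 || first.sizeBytes >= batchSize;
                boolean lingered = nowMs - first.createdMs >= lingerMs;
                if (!full && !lingered) {
                    break;
                }
                queue.pollFirst();
                allocatedBytes -= first.reservedBytes;  // simplification: freed on drain
                ready.add(first);
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        ToyAccumulator acc = new ToyAccumulator(100, 10, 1_000);
        acc.append(0, 40, 0);
        acc.append(0, 40, 1);
        acc.append(0, 40, 6);  // does not fit in the first batch, so a second batch is opened
        for (Batch b : acc.drain(7)) {
            System.out.println("partition " + b.partition + ": " + b.recordCount
                    + " records, " + b.sizeBytes + " bytes");
        }
    }
}
```

In the `main` run, the first batch is drained at 7 ms even though it holds only 80 of 100 bytes, because a newer batch is already queued behind it; the second batch has to wait until its `linger.ms` expires.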
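The settings in "Delivery guarantees" are plain producer configuration properties, so two of the resulting semantics can be sketched as `java.util.Properties` builders. The config keys are real producer settings. The `DeliveryConfigs` class, its method names, the broker address and the retry values are hypothetical and only illustrative. The resulting `Properties` would be passed to `new KafkaProducer<>(props)`. Exactly-once delivery needs more than these settings (the idempotent and transactional producer features) and is not shown.

```java
import java.util.Properties;

// Hypothetical helper: producer Properties for two delivery semantics.
// The keys are real producer config names; the helper and its values are illustrative.
class DeliveryConfigs {
    static Properties base(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // At-most-once: no acknowledgment and no retries. Messages may be lost, not duplicated.
    static Properties atMostOnce(String bootstrapServers) {
        Properties props = base(bootstrapServers);
        props.put("acks", "0");
        props.put("retries", "0");
        // Newer clients enable idempotence by default, which conflicts with acks=0,
        // so it is switched off explicitly here.
        props.put("enable.idempotence", "false");
        return props;
    }

    // At-least-once: wait for all in-sync replicas and retry failures (retries may duplicate).
    // A single in-flight request keeps per-partition order intact across retries.
    static Properties atLeastOnce(String bootstrapServers) {
        Properties props = base(bootstrapServers);
        props.put("acks", "all");
        props.put("retries", "10");
        props.put("retry.backoff.ms", "100");
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = atLeastOnce("localhost:9092");
        props.list(System.out);
    }
}
```

The at-least-once variant trades throughput for safety: waiting for all in-sync replicas and allowing only one in-flight request per connection both add latency to every send.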