05 Producing messages with Kafka producers
- 2 central types: KafkaProducer and ProducerRecord
Creating a producer
Creating messages
- Kafka calls messages "ProducerRecords"
- 2 required properties: topic and value
- Additional optional properties: partition, timestamp, key (and headers in newer clients)
- A KafkaProducer<K, V> can only send ProducerRecord<K, V> instances, i.e. records whose key and value types match the serializers it is configured with.
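A minimal configuration sketch for creating a producer, assuming String keys and values. It only builds the java.util.Properties that would be passed to the KafkaProducer constructor, so it compiles without the kafka-clients jar. The property names and the StringSerializer class name are the standard ones; the broker address, topic, key, and value are placeholders. The commented lines in main show roughly where KafkaProducer and ProducerRecord would come in with kafka-clients on the classpath.

```java
import java.util.Properties;

// Builds the configuration for a producer of String keys and String values.
// Uses only java.util.Properties, so it compiles without the kafka-clients jar.
final class ProducerConfigSketch {

    // Placeholder broker address, for illustration only.
    static final String BOOTSTRAP = "localhost:9092";

    static Properties stringProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Initial brokers the producer contacts to discover the rest of the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The serializers determine which key/value types the producer can actually send.
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = stringProducerProps(BOOTSTRAP);
        // With kafka-clients on the classpath, this is roughly where you would write
        // (topic, key and value are placeholders):
        //   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>("my-topic", "my-key", "my-value"));
        //   producer.close();
        System.out.println(props);
    }
}
```

Declaring the producer as KafkaProducer<String, String> to match these serializers means the compiler rejects, say, a ProducerRecord<String, Integer>. A mismatch between the declared types and the configured serializer classes would only show up at runtime, when serialization fails.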
Sending messages
When the producer sends:
- it connects to the cluster using the bootstrap.servers setting to discover the cluster membership, returned as "metadata":
- topics
- partitions
- brokers handling them
- it keeps this information in an internal Metadata object, which it refreshes in the background
- it passes the message through the key and value serializers
- it passes it to the partitioner, which chooses a partition based on the message and the cluster information in Metadata. Strategies:
- direct: if the record contains a valid partition ID (one that exists in the cluster metadata)
- round-robin (in the DefaultPartitioner class): if the record has no key
- key hash: if the record has a key but no custom partitioner is configured, DefaultPartitioner takes a murmur2 hash of the serialized key modulo the number of partitions. See DefaultPartitioner.partition.
- custom: a class set with the partitioner.class property (ProducerConfig.PARTITIONER_CLASS_CONFIG)
- it pushes the message to an in-memory queue, the RecordAccumulator
- micro-batching: at scale, efficiency is everything.
- Used on the producer, broker, and consumer.
- Also used by the OS (page cache, Linux sendfile() syscall).
- Amortizes the fixed cost of each send
- The accumulator holds a collection of RecordBatch objects, one per partition.
- Choosing a batch size is complex and influenced by advanced configuration settings.
- The size of the RecordBatch instances used for message buffering is set by batch.size, in bytes, not in number of messages
- How many batches can exist is limited by how many fit into buffer.memory
- When more memory is needed, the max.block.ms setting determines how long the producer's send() method will block, applying back-pressure to the producer
- The goal is to leave time for pending messages to be transmitted, freeing up buffer memory
- When a RecordBatch gets full, it is sent immediately
- When a batch is not full, linger.ms defines how long it can wait before being sent
- When a batch is sent, the producer gets back a RecordMetadata (topic, partition, offset) on success, or an exception on failure
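The partition choice described in the list above can be modelled in a few lines. This is a simplified sketch, not Kafka's DefaultPartitioner: the class and method names are hypothetical, the murmur2 function is a port of the hash Kafka uses for keyed records (masked to a positive value, then taken modulo the partition count), and keyless records use a plain round-robin counter as the older DefaultPartitioner did. Clients from Kafka 2.4 onward use a "sticky" strategy for keyless records instead, and a custom partitioner.class would replace this logic entirely.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the producer's partition choice.
final class PartitionerSketch {
    // Shared counter for round-robin over keyless records.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Port of the murmur2 hash Kafka applies to serialized keys.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (fall-through is intentional).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // explicitPartition: partition set on the record, or null; key: record key, or null.
    int partition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            // Direct: trust the record's partition if it exists in the cluster.
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition " + explicitPartition);
            }
            return explicitPartition;
        }
        if (key == null) {
            // Round-robin for keyless records (older DefaultPartitioner behaviour).
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Key hash: StringSerializer produces UTF-8 bytes, which is what gets hashed.
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because the hash depends only on the key bytes and the partition count, records with the same key always land on the same partition, which is what gives per-key ordering. Adding partitions to a topic changes that mapping.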
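The batching rules in the RecordAccumulator step can be made concrete with a toy model. It keeps one open batch per partition and sends a batch as soon as its payload reaches a byte threshold playing the role of batch.size. Otherwise the batch becomes sendable once it has waited a time playing the role of linger.ms. The class is hypothetical and deliberately ignores record overhead, compression, and buffer.memory. Unlike this sketch, the real producer starts a new batch rather than letting a record overfill the current one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of batch.size / linger.ms behaviour, one open batch per partition.
final class AccumulatorSketch {
    static final class Batch {
        final int partition;
        final long createdMs;
        final List<byte[]> records = new ArrayList<>();
        int sizeBytes = 0;

        Batch(int partition, long createdMs) {
            this.partition = partition;
            this.createdMs = createdMs;
        }
    }

    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms
    private final Map<Integer, Batch> open = new HashMap<>();

    AccumulatorSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Adds a serialized record; returns the batch if it just became full (send now), else null. */
    Batch append(int partition, byte[] serialized, long nowMs) {
        Batch batch = open.computeIfAbsent(partition, p -> new Batch(p, nowMs));
        batch.records.add(serialized);
        batch.sizeBytes += serialized.length;
        if (batch.sizeBytes >= batchSizeBytes) {
            open.remove(partition);
            return batch;
        }
        return null;
    }

    /** Removes and returns the not-full batches that have waited at least lingerMs. */
    List<Batch> drainLingered(long nowMs) {
        List<Batch> ready = new ArrayList<>();
        Iterator<Batch> it = open.values().iterator();
        while (it.hasNext()) {
            Batch batch = it.next();
            if (nowMs - batch.createdMs >= lingerMs) {
                ready.add(batch);
                it.remove();
            }
        }
        return ready;
    }
}
```

With batchSizeBytes = 10 and lingerMs = 5, appending 4 and then 6 bytes to partition 0 completes a batch immediately. A lone 3-byte record on partition 1 only becomes sendable once 5 ms have passed since its batch was opened.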
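The interplay between buffer.memory and max.block.ms behaves like a bounded pool of bytes with a timed wait. The hypothetical class below models it with java.util.concurrent.Semaphore, one permit per byte. allocate waits up to maxBlockMs for other batches to be released and returns false on timeout. In the same situation, the real send() throws a timeout exception instead.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of buffer.memory (pool size) and max.block.ms (how long send() may wait).
final class BufferPoolSketch {
    private final Semaphore freeBytes; // one permit per free byte of buffer memory
    private final long maxBlockMs;

    BufferPoolSketch(int bufferMemoryBytes, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemoryBytes);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves bytes for a batch, blocking at most maxBlockMs; returns false if it timed out. */
    boolean allocate(int bytes) {
        try {
            return freeBytes.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Returns a sent batch's bytes to the pool, unblocking waiting callers. */
    void release(int bytes) {
        freeBytes.release(bytes);
    }

    int available() {
        return freeBytes.availablePermits();
    }
}
```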
Delivery guarantees
- The producer can define what level of acknowledgment it expects from the brokers, with the acks setting
- 0: fire and forget. By far the fastest but riskiest, as the producer gets no acknowledgment and never learns about failures
- 1: leader acknowledged. Middle ground: the leader has written the record, but followers may not have replicated it yet.
- all (or -1): all in-sync replicas acknowledged. Slowest, safest. Latency can be less predictable when the set of in-sync replicas changes after broker failures
- Retries on broker errors can be configured too:
- retries defines how many times the producer will retry sending a failed message
- retry.backoff.ms is a fixed delay between retries
- Ordering guarantees:
- Only preserved within a given partition
- No global order across partitions of a topic
- Can get complicated with errors, especially with retries: a failed batch that is retried can land after a later batch that succeeded
- This can be avoided entirely by setting max.in.flight.requests.per.connection to 1, but at a high throughput cost.
- Delivery semantics result from these settings:
- At-least-once
- At-most-once
- Exactly-once
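Retries can reorder records, as the toy simulation below shows (hypothetical names, not the producer's actual retry code). It sends batches for a single partition, lets some fail once, and puts failed batches back at the head of the queue. With two requests in flight, a failed first batch lands after the second one. With one in flight, order is preserved. Recent clients with enable.idempotence=true keep per-partition order with up to 5 in-flight requests, which this model does not cover.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the order in which one partition's batches reach the broker log.
final class RetryOrderingSketch {
    static List<String> logOrder(List<String> batches, Set<String> failOnce, int maxInFlight) {
        Deque<String> pending = new ArrayDeque<>(batches);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches without waiting for their responses.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            List<String> toRetry = new ArrayList<>();
            for (String batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    toRetry.add(batch); // first attempt fails (e.g. a transient broker error)
                } else {
                    log.add(batch); // broker appends the batch to the partition log
                }
            }
            // Retried batches go back to the head of the queue, keeping their relative order.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                pending.addFirst(toRetry.get(i));
            }
        }
        return log;
    }
}
```

logOrder(List.of("A", "B"), Set.of("A"), 2) returns [B, A], while the same call with maxInFlight = 1 returns [A, B].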
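As a rough guide to how these settings map to delivery semantics, the sketch below builds producer Properties for each case. The property names are standard producer settings. The values are illustrative assumptions rather than recommendations, and the class and method names are hypothetical. "Exactly once" here only covers the producer side (idempotent writes to a partition). End-to-end exactly-once also requires transactions (transactional.id) and consumers reading with isolation.level=read_committed.

```java
import java.util.Properties;

// Hypothetical helper: illustrative producer settings for each delivery semantic.
final class DeliverySemanticsSketch {
    static Properties atMostOnce() {
        Properties p = new Properties();
        p.setProperty("acks", "0");    // do not wait for any acknowledgment
        p.setProperty("retries", "0"); // never resend: no duplicates, but records can be lost
        return p;
    }

    static Properties atLeastOnce() {
        Properties p = new Properties();
        p.setProperty("acks", "all");                 // wait for all in-sync replicas
        p.setProperty("retries", "2147483647");       // keep retrying transient errors
        p.setProperty("retry.backoff.ms", "100");     // assumed value, for illustration
        p.setProperty("enable.idempotence", "false"); // so retries may write duplicates
        return p;
    }

    static Properties exactlyOncePerPartition() {
        Properties p = new Properties();
        p.setProperty("enable.idempotence", "true"); // broker discards duplicate retries
        p.setProperty("acks", "all");                // required by idempotence
        p.setProperty("max.in.flight.requests.per.connection", "5"); // idempotence allows at most 5
        return p;
    }
}
```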
Advanced topics
- Custom serializers
- Custom partitioners
- Asynchronous send
- Compression
- Advanced settings