05 Producing messages with Kafka producers

  • 2 central types:
    • KafkaProducer
    • ProducerRecord

Creating a producer

Creating messages

  • Kafka calls the messages a producer sends "ProducerRecord"
    • 2 required properties:
      • Topic
      • Value
    • Additional properties:
      • Partition
      • Timestamp
      • Key
  • A KafkaProducer instance can only send ProducerRecords whose key and value types match the key and value serializers it is configured with.
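
The shape of a record and the serializer constraint above can be sketched with a toy model. This is not the Kafka client API: `Record`, `serialize_str`, and `serialize_record` are hypothetical names, and the string serializer only stands in for whatever serializers a real producer is configured with.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass(frozen=True)
class Record:
    """Toy stand-in for ProducerRecord (hypothetical, not the Kafka class)."""
    topic: str                        # required
    value: Any                        # required
    key: Any = None                   # optional
    partition: Optional[int] = None   # optional
    timestamp: Optional[int] = None   # optional, milliseconds since epoch


def serialize_str(data: Any) -> Optional[bytes]:
    """Toy string serializer: accepts str, or None for an absent key."""
    if data is None:
        return None
    if not isinstance(data, str):
        raise TypeError(f"expected str, got {type(data).__name__}")
    return data.encode("utf-8")


Serializer = Callable[[Any], Optional[bytes]]


def serialize_record(record: Record,
                     key_serializer: Serializer,
                     value_serializer: Serializer) -> Tuple[Optional[bytes], Optional[bytes]]:
    """Run the key and value through the configured serializers."""
    return key_serializer(record.key), value_serializer(record.value)


# Only topic and value are needed to build a record.
minimal = Record(topic="orders", value="created")
keyed = Record(topic="orders", value="created", key="customer-42")
print(serialize_record(keyed, serialize_str, serialize_str))
```

In the Java client the mismatch is caught through generic type parameters; in this sketch an integer value passed to the string serializer simply raises `TypeError`.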

Sending messages

When the producer sends:

  • it connects to the cluster using the bootstrap.servers setting to discover the cluster membership, returned as "metadata":
    • topics
    • partitions
    • brokers handling them
  • it creates a Metadata object within itself, which it will refresh in the background
  • it passes the message through the serializer(s) (key, value)
  • it passes it to the partitioner, which will choose a partition based on the message and the cluster information in Metadata. Strategies:
    • direct (if the record contains a partition ID that is valid for the cluster)
    • round-robin (in DefaultPartitioner class), if it has no key. Kafka 2.4 and later use sticky partitioning for keyless records instead.
    • key mod-hash (if it has a key but no custom partitioner): a Murmur2 hash of the serialized key, modulo the partition count, in DefaultPartitioner. See DefaultPartitioner.partition.
    • custom (class named by the PARTITIONER_CLASS_CONFIG == "partitioner.class" property)
  • it pushes the message to an in-memory queue, the RecordAccumulator
    • micro-batching: at scale, efficiency is everything.
    • Used on the producer, the broker, and the consumer.
    • Also used in the OS (page cache, Linux sendfile() syscall).
    • Amortizes the fixed cost of each send.
    • Holds a collection of RecordBatch objects, grouped per partition.
    • Choice of batch size is complex, influenced by advanced configuration settings.
      • batch.size sets the maximum size of each RecordBatch, in bytes, not in number of messages
      • The number of batches that can be buffered is determined by how many fit into buffer.memory
  • When the buffer is full and more memory is needed, the max.block.ms setting determines how long the producer's send method will block, forcing back-pressure on the producing thread
    • The goal is to leave time for the messages to be transmitted, freeing up buffer memory
  • When a RecordBatch gets full, it is sent immediately
    • When none is full, linger.ms defines how long a non-full batch can wait before being sent
  • When a batch is sent, the broker's response is reported back as RecordMetadata, providing success/failure info
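
The partition choice and the batching rules above can be sketched as a toy model. This is a simplification under stated assumptions, not the Kafka implementation: `choose_partition` and `Accumulator` are hypothetical names, `zlib.crc32` stands in for the Murmur2 hash used by DefaultPartitioner, and a batch here counts as "full" once its payload bytes reach `batch_size`.

```python
import itertools
import zlib
from typing import Dict, List, Optional, Tuple

_round_robin = itertools.count()  # shared counter for keyless records


def choose_partition(key: Optional[bytes],
                     partition: Optional[int],
                     num_partitions: int) -> int:
    """Pick a partition: direct, then round-robin (no key), then key hash."""
    if partition is not None:
        if not 0 <= partition < num_partitions:
            raise ValueError(f"invalid partition {partition}")
        return partition                       # direct
    if key is None:
        return next(_round_robin) % num_partitions   # round-robin
    # Stand-in hash: Kafka uses Murmur2, crc32 is only for illustration.
    return zlib.crc32(key) % num_partitions


class Accumulator:
    """Toy RecordAccumulator: one open batch per partition."""

    def __init__(self, batch_size: int, linger_ms: int) -> None:
        self.batch_size = batch_size   # bytes, like batch.size
        self.linger_ms = linger_ms     # like linger.ms
        # partition -> (creation time in ms, payloads)
        self.batches: Dict[int, Tuple[int, List[bytes]]] = {}

    def append(self, partition: int, payload: bytes, now_ms: int) -> None:
        _, items = self.batches.setdefault(partition, (now_ms, []))
        items.append(payload)

    def drain_ready(self, now_ms: int) -> Dict[int, List[bytes]]:
        """Remove and return batches that are full or have lingered enough."""
        ready = {}
        for part, (created, items) in list(self.batches.items()):
            full = sum(len(i) for i in items) >= self.batch_size
            expired = now_ms - created >= self.linger_ms
            if full or expired:
                ready[part] = items
                del self.batches[part]
        return ready


acc = Accumulator(batch_size=10, linger_ms=5)
p = choose_partition(key=b"customer-42", partition=None, num_partitions=3)
acc.append(p, b"12345", now_ms=0)
print(acc.drain_ready(now_ms=1))   # {} : neither full nor lingered
acc.append(p, b"67890", now_ms=1)
print(acc.drain_ready(now_ms=1))   # the batch reached 10 bytes, so it is drained
```

Time is passed in explicitly (`now_ms`) so the linger rule can be exercised without sleeping. The real accumulator also bounds total memory with buffer.memory, which this sketch leaves out.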

Delivery guarantees

  • The producer can define what level of acknowledgment it expects from the brokers, with the acks setting
    • 0: fire and forget. By far the fastest but riskiest: the producer does not wait for any acknowledgment, so failures can go unnoticed
    • 1: leader acknowledged. Mid-ground.
    • all (or -1): acknowledged by all in-sync replicas. Slowest, safest. May be unpredictable due to topology changes on broker failures
  • Handling of broker errors can be configured too:
    • retries defines how many times the producer will retry sending a failed message
    • retry.backoff.ms is the delay between retries
  • Ordering guarantees:
    • Only preserved within a given partition
    • No global order across partitions of a topic
    • Can get complicated with errors, especially with retries
    • Can be avoided entirely by setting max.in.flight.requests.per.connection to 1, but that has a high throughput cost.
  • Delivery semantics result from these settings:
    • At-least-once
    • At-most-once
    • Exactly-once
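
Why retries complicate ordering, and why a single in-flight request avoids the problem, can be illustrated with a small simulation. This is a sketch under simplifying assumptions rather than a model of the real network client: `deliver` is a hypothetical function, each batch fails at most once, and all batches target the same partition.

```python
from collections import deque
from typing import Iterable, List, Set


def deliver(batches: Iterable[str], fail_once: Set[str], max_in_flight: int) -> List[str]:
    """Return the order in which batches end up in the partition log.

    batches:       batch names in the order the producer sends them
    fail_once:     names whose first attempt fails and must be retried
    max_in_flight: how many unacknowledged batches may be outstanding
    """
    pending = deque(batches)
    failed_already: Set[str] = set()
    log: List[str] = []
    while pending:
        # Send up to max_in_flight batches without waiting for acks.
        window = [pending.popleft()
                  for _ in range(min(max_in_flight, len(pending)))]
        retries = []
        for name in window:
            if name in fail_once and name not in failed_already:
                failed_already.add(name)
                retries.append(name)    # first attempt lost, will be resent
            else:
                log.append(name)        # broker appended this batch
        # Retried batches go back to the front of the send queue.
        pending.extendleft(reversed(retries))
    return log


print(deliver(["A", "B", "C"], {"A"}, max_in_flight=2))  # ['B', 'A', 'C']
print(deliver(["A", "B", "C"], {"A"}, max_in_flight=1))  # ['A', 'B', 'C']
```

With two requests in flight, B is appended while A is being retried, so the log order differs from the send order. With one request in flight, B is not sent until A has succeeded, at the cost of waiting for each acknowledgment in turn.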

Advanced topics

  • Custom serializers
  • Custom partitioners
  • Asynchronous send
  • Compression
  • Advanced settings