
Understanding Topics, Partitions, and Brokers

Concepts

In summary: Kafka is pub/sub messaging rethought as a distributed commit log

  • Topics = named feed/category of messages (== queue, mailbox)
  • Physically represented as a log (one or more files)
  • Topics can scale across all or part of a cluster for FT/Scalability
  • Events in a topic form a time-ordered sequence of immutable values
  • Style == Event Sourcing
  • Message:
    • {timestamp} set upon reception by the broker
    • {id} unique
    • [data content] binary
    • {timestamp} + {id} form its placement in the sequence of messages received within a topic
    • {id} is referenceable by consumers.
  • Offset: a placeholder
    • maintains the last read message position
      • in a topic == last read message
    • entirely established and maintained by the consumer, not the broker
    • is actually a message {id}
    • how do consumers retain their offset across restarts? (see the consumer sketch after this list)
  • Message retention policy: how long Kafka retains messages
    • default 168 hours = 7 days
    • per-topic
    • may be constrained by physical resources available
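
Regarding the offset-retention question above: in current Kafka versions, consumers usually commit their offsets back to the cluster (in the internal __consumer_offsets topic), keyed by group.id, and resume from the last committed position after a restart. A minimal sketch with the Java client (kafka-clients), assuming the local broker on localhost:9092 and the my_topic topic created below; the group name my_group and class name are arbitrary:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class OffsetDemoConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Committed offsets are stored per group.id, so a restarted consumer
            // with the same group picks up where the previous run left off.
            props.put("group.id", "my_group");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            // Only applies when no committed offset exists yet for this group.
            props.put("auto.offset.reset", "earliest");
            // Commit explicitly to make the mechanism visible.
            props.put("enable.auto.commit", "false");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("my_topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync(); // persist the position for the next run
            }
        }
    }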

Commands

  • Creating a topic:

    • command: kafka-topics --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
    • in Kafka logs (/usr/local/var/log/kafka):

      [2021-04-01 15:34:42,097] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(my_topic-0) (kafka.server.ReplicaFetcherManager)
      [2021-04-01 15:34:42,216] INFO [Log partition=my_topic-0, dir=/usr/local/var/lib/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
      [2021-04-01 15:34:42,226] INFO Created log for partition my_topic-0 in /usr/local/var/lib/kafka-logs/my_topic-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
      [2021-04-01 15:34:42,227] INFO [Partition my_topic-0 broker=0] No checkpointed highwatermark is found for partition my_topic-0 (kafka.cluster.Partition)
      [2021-04-01 15:34:42,228] INFO [Partition my_topic-0 broker=0] Log loaded for partition my_topic-0 with initial high watermark 0 (kafka.cluster.Partition)
      


  • the actual physical logs are in `/usr/local/var/lib/kafka-logs`:

    $ ls -l /usr/local/var/lib/kafka-logs
    -rw-r--r--  1 fgm  admin    0  1 avr 15:26 cleaner-offset-checkpoint
    -rw-r--r--  1 fgm  admin    4  1 avr 15:37 log-start-offset-checkpoint
    -rw-r--r--  1 fgm  admin   89  1 avr 15:26 meta.properties
    drwxr-xr-x  6 fgm  admin  192  1 avr 15:34 my_topic-0
    -rw-r--r--  1 fgm  admin   17  1 avr 15:37 recovery-point-offset-checkpoint
    -rw-r--r--  1 fgm  admin   17  1 avr 15:37 replication-offset-checkpoint
    $ ls -l /usr/local/var/lib/kafka-logs/my_topic-0
    -rw-r--r--  1 fgm  admin  10485760  1 avr 15:34 00000000000000000000.index
    -rw-r--r--  1 fgm  admin         0  1 avr 15:34 00000000000000000000.log
    -rw-r--r--  1 fgm  admin  10485756  1 avr 15:34 00000000000000000000.timeindex
    -rw-r--r--  1 fgm  admin         8  1 avr 15:34 leader-epoch-checkpoint
  • listing topics:

    $ kafka-topics --zookeeper localhost:2181 --list
    my_topic
    
  • start a producer on my_topic (a Java equivalent is sketched after these commands):

    kafka-console-producer --broker-list localhost:9092 --topic my_topic
    >MyMessage 1
    
  • consume from my_topic:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --from-beginning
    # The tutorial says:
    kafka-console-consumer --zookeeper localhost:2181 --topic my_topic --from-beginning
    # but that option is not recognized by the homebrew Kafka version.
    # Using an explicit offset requires a partition id:
    kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --partition 0 --offset 2
    
  • examine topic:

    $ kafka-topics --zookeeper localhost:2181 --describe --topic my_topic
    Topic: my_topic	PartitionCount: 1	ReplicationFactor: 1	Configs:
    	Topic: my_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    
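For comparison with the console tools above, a rough Java equivalent of the kafka-console-producer step, assuming the same local broker on localhost:9092 and the kafka-clients library on the classpath (the class name is illustrative):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class DemoProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Same effect as typing "MyMessage 1" into kafka-console-producer.
                producer.send(new ProducerRecord<>("my_topic", "MyMessage 1"));
                producer.flush(); // make sure the record is sent before closing
            }
        }
    }

The consumer sketch shown after the Concepts list plays the role of kafka-console-consumer for the reading side.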

Partitions

  • Partition:
    • physical log file in a topic
    • configurable per topic
    • basis for scalability, fault-tolerance, throughput
    • maintained on >= 1 broker(s)
    • must fit on 1 machine, unsplittable
    • 1 leader per partition
    • partition count and replication factor are set at topic creation:

      kafka-topics --create \
        --topic my_topic \
        --zookeeper localhost:2181 \
        --replication-factor 1 \
        --partitions 1
  • Why use a single-partition topic? Doable, but:
    • limits scalability and throughput
    • scalability of Kafka is determined by the number of partitions managed by multiple broker nodes
    • consumers receiving from multiple partitions may receive messages out of order
  • Partitioning trade-offs:
    • ZK load increases with the number of partitions
    • message ordering can become complex: no global order across partitions. To get it:
      • use a single partition
      • handle ordering on the consumer / consumer group side
      • consumer groups are consumers sharing the same offsets (N(consumers) > N(partitions) => idle consumers)
      • identified by group.id
    • leader fail-over time increases with the number of partitions (on the order of milliseconds per partition on a leader)
      • solution in very large setups is multiple clusters
  • Fault-tolerance
    • Broker failure
    • Network issue
    • Disk failure
    • => replication factor
      • N > 1 guards against N-1 broker failures (see the AdminClient sketch below)
      • per topic
      • ISR = In Sync Replica
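
To tie partitions, retention, and replication factor together: topics can also be created programmatically. A sketch using the Java AdminClient, assuming a cluster of at least 3 brokers reachable via localhost:9092; the topic name, partition count, and replication factor here are illustrative, not taken from the course:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    public class CreateReplicatedTopic {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // 3 partitions spread load across brokers; replication factor 3
                // tolerates the loss of 2 brokers (N - 1) for this topic.
                NewTopic topic = new NewTopic("my_replicated_topic", 3, (short) 3)
                        // per-topic retention, here the 7-day default seen in the broker log above
                        .configs(Map.of("retention.ms", "604800000"));
                admin.createTopics(List.of(topic)).all().get();
            }
        }
    }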