# Understanding Topics, Partitions, and Brokers

## Concepts

In summary: Kafka is pub/sub messaging rethought as a **distributed commit log**

- Topics = named feed/category of messages (analogous to a queue or mailbox)
  - physically represented as a log (one or more files)
  - topics can scale across all or part of a cluster, for fault tolerance and scalability
  - events in a topic form a time-ordered sequence of immutable values
  - style == Event Sourcing
- Message:
  - `{timestamp}` set upon reception by the broker
  - `{id}` unique
  - `[data content]` binary
  - `{timestamp}` + `{id}` determine its placement in the sequence of messages received within a topic
  - `{id}` is referenceable by consumers
- Offset: a placeholder
  - maintains the position of the last read message in a topic
  - entirely established and maintained by the consumer, not the broker
  - is actually an `{id}`
  - open question: how do consumers retain their offset across relaunches?
- Message retention policy: how long Kafka keeps messages
  - default 168 hours = 7 days
  - configurable per topic
  - may be constrained by the physical resources available

## Commands

- Creating a topic:
  - command: `kafka-topics --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1`
  - in the Kafka logs (`/usr/local/var/log/kafka`):

  ```text
  [2021-04-01 15:34:42,097] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(my_topic-0) (kafka.server.ReplicaFetcherManager)
  [2021-04-01 15:34:42,216] INFO [Log partition=my_topic-0, dir=/usr/local/var/lib/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
  [2021-04-01 15:34:42,226] INFO Created log for partition my_topic-0 in /usr/local/var/lib/kafka-logs/my_topic-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
  [2021-04-01 15:34:42,227] INFO [Partition my_topic-0 broker=0] No checkpointed highwatermark is found for partition my_topic-0 (kafka.cluster.Partition)
  [2021-04-01 15:34:42,228] INFO [Partition my_topic-0 broker=0] Log loaded for partition my_topic-0 with initial high watermark 0 (kafka.cluster.Partition)
  ```

- the actual physical logs are in `/usr/local/var/lib/kafka-logs`:

  ```text
  $ ls -l /usr/local/var/lib/kafka-logs
  -rw-r--r-- 1 fgm admin        0  1 avr 15:26 cleaner-offset-checkpoint
  -rw-r--r-- 1 fgm admin        4  1 avr 15:37 log-start-offset-checkpoint
  -rw-r--r-- 1 fgm admin       89  1 avr 15:26 meta.properties
  drwxr-xr-x 6 fgm admin      192  1 avr 15:34 my_topic-0
  -rw-r--r-- 1 fgm admin       17  1 avr 15:37 recovery-point-offset-checkpoint
  -rw-r--r-- 1 fgm admin       17  1 avr 15:37 replication-offset-checkpoint

  $ ls -l /usr/local/var/lib/kafka-logs/my_topic-0
  -rw-r--r-- 1 fgm admin 10485760  1 avr 15:34 00000000000000000000.index
  -rw-r--r-- 1 fgm admin        0  1 avr 15:34 00000000000000000000.log
  -rw-r--r-- 1 fgm admin 10485756  1 avr 15:34 00000000000000000000.timeindex
  -rw-r--r-- 1 fgm admin        8  1 avr 15:34 leader-epoch-checkpoint
  ```

- listing topics:

  ```text
  $ kafka-topics --zookeeper localhost:2181 --list
  my_topic
  ```

- start a producer on `my_topic`:

  ```text
  kafka-console-producer --broker-list localhost:9092 --topic my_topic
  >MyMessage 1
  ```

- consume from `my_topic`:
  ```bash
  kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --from-beginning

  # The tutorial says:
  #   kafka-console-consumer --zookeeper localhost:2181 --topic my_topic --from-beginning
  # but that option is not recognized in the Homebrew Kafka.

  # Using an explicit offset requires a partition id:
  kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --partition 0 --offset 2
  ```

- examine a topic:

  ```text
  $ kafka-topics --zookeeper localhost:2181 --describe --topic my_topic
  Topic: my_topic	PartitionCount: 1	ReplicationFactor: 1	Configs:
  	Topic: my_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
  ```

## Partitions

- Partition:
  - a physical log file within a topic
  - configurable per topic
  - the basis for scalability, fault tolerance, and throughput
  - maintained on one or more brokers
  - must fit on a single machine: a partition cannot be split
  - 1 leader per partition

  ```bash
  kafka-topics --create \
    --topic my_topic \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1
  ```

- Why use a single-partition topic? Doable, but:
  - limits scalability and throughput
  - Kafka's scalability is determined by the number of partitions managed across multiple broker nodes
  - consumers receiving from multiple partitions may receive messages out of order
- Partitioning trade-offs:
  - ZooKeeper load increases with the number of partitions
  - message ordering can become complex: there is no global order across partitions. To get one:
    - use a single partition, or
    - handle ordering on the consumer / consumer-group side
      - consumer groups are consumers sharing the same offsets (if N(consumers) > N(partitions), the extra consumers sit unused)
      - identified by `group.id`
  - leader fail-over time increases with the number of partitions (milliseconds per partition on a leader)
    - the solution in very large setups is multiple clusters
- Fault tolerance, guarding against:
  - broker failure
  - network issues
  - disk failure
  - => replication factor
    - N > 1 guarantees against N-1 broker failures
    - set per topic
    - ISR = In-Sync Replica
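The "offset is maintained by the consumer, not the broker" idea above can be sketched in a few lines. This is an illustrative Python model, not the real Kafka protocol or client API: the `Log`, `Consumer`, and `committed_offsets` names are invented for the example. In real Kafka, committed offsets are stored durably (in the `__consumer_offsets` topic), keyed by `group.id`, which is what lets a relaunched consumer resume where it left off.

```python
# Illustrative sketch (NOT the real Kafka client): the broker side is just an
# append-only log; the consumer tracks and commits its own read position.

class Log:
    """Models one topic partition: an append-only sequence of immutable messages."""
    def __init__(self):
        self._messages = []

    def append(self, value):
        self._messages.append(value)
        return len(self._messages) - 1  # offset of the new message

    def read_from(self, offset):
        return list(enumerate(self._messages))[offset:]

class Consumer:
    def __init__(self, log, committed_offsets, group_id):
        self.log = log
        self.store = committed_offsets  # durable store, survives relaunches
        self.group_id = group_id

    def poll(self):
        start = self.store.get(self.group_id, 0)
        batch = self.log.read_from(start)
        if batch:
            last_offset, _ = batch[-1]
            self.store[self.group_id] = last_offset + 1  # commit new position
        return [value for _, value in batch]

log = Log()
for i in range(5):
    log.append(f"msg-{i}")

offsets = {}  # stands in for durable offset storage
c1 = Consumer(log, offsets, "group-A")
assert c1.poll() == ["msg-0", "msg-1", "msg-2", "msg-3", "msg-4"]

# "Relaunch": a fresh consumer instance with the same group.id resumes
# after the committed offset instead of re-reading from the beginning.
log.append("msg-5")
c2 = Consumer(log, offsets, "group-A")
assert c2.poll() == ["msg-5"]
```

This also shows why the broker stays cheap: it never tracks per-consumer state, it only serves reads from a requested offset.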
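The ordering trade-off above can also be made concrete: Kafka only guarantees order *within* a partition, and the default producer partitioner hashes the message key so that all messages with the same key land in the same partition. The sketch below is a toy model (a byte-sum hash stands in for the real murmur2 hash; `partition_for` and the event names are invented for the example), but it shows per-key order surviving while global order does not.

```python
# Toy model of key-based partitioning (the real Kafka client uses murmur2):
# same key -> same partition -> per-key ordering is preserved.

NUM_PARTITIONS = 3

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Deterministic stand-in for the murmur2-based default partitioner.
    return sum(key.encode()) % num_partitions

partitions = [[] for _ in range(NUM_PARTITIONS)]

events = [("user-1", "login"), ("user-2", "login"),
          ("user-1", "click"), ("user-2", "logout"),
          ("user-1", "logout")]

for key, value in events:
    partitions[partition_for(key)].append((key, value))

# All of user-1's events sit in a single partition, in send order,
# even though there is no defined order across partitions:
p = partition_for("user-1")
assert [v for k, v in partitions[p] if k == "user-1"] == ["login", "click", "logout"]
```

This is why keying messages by entity (user id, order id, ...) is the usual answer when per-entity ordering is needed but a single partition would cap throughput.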