In summary: Kafka is pub/sub messaging rethought as a distributed commit log
Each message carries:
- a timestamp, set upon reception by the broker
- a unique id
- binary data content

The timestamp + id form its placement in the sequence of messages received within a topic; the id is referenceable by consumers.
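A quick way to see those fields once the topic below exists: the console consumer can print the timestamp and key next to each value (a sketch; assumes the formatter `print.*` properties of recent Kafka versions):
kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic \
  --from-beginning \
  --property print.timestamp=true \
  --property print.key=true
# prints e.g.: CreateTime:1617283182097   null    MyMessage 1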
Creating a topic:
kafka-topics --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
in Kafka logs (`/usr/local/var/log/kafka`):
[2021-04-01 15:34:42,097] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(my_topic-0) (kafka.server.ReplicaFetcherManager)
[2021-04-01 15:34:42,216] INFO [Log partition=my_topic-0, dir=/usr/local/var/lib/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-01 15:34:42,226] INFO Created log for partition my_topic-0 in /usr/local/var/lib/kafka-logs/my_topic-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-01 15:34:42,227] INFO [Partition my_topic-0 broker=0] No checkpointed highwatermark is found for partition my_topic-0 (kafka.cluster.Partition)
[2021-04-01 15:34:42,228] INFO [Partition my_topic-0 broker=0] Log loaded for partition my_topic-0 with initial high watermark 0 (kafka.cluster.Partition)
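Note: on Kafka 2.2+, kafka-topics can also talk to the broker directly instead of ZooKeeper, which should be equivalent here:
kafka-topics --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1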
- the actual physical logs are in `/usr/local/var/lib/kafka-logs`:
$ ls -l /usr/local/var/lib/kafka-logs
-rw-r--r-- 1 fgm admin 0 1 avr 15:26 cleaner-offset-checkpoint
-rw-r--r-- 1 fgm admin 4 1 avr 15:37 log-start-offset-checkpoint
-rw-r--r-- 1 fgm admin 89 1 avr 15:26 meta.properties
drwxr-xr-x 6 fgm admin 192 1 avr 15:34 my_topic-0
-rw-r--r-- 1 fgm admin 17 1 avr 15:37 recovery-point-offset-checkpoint
-rw-r--r-- 1 fgm admin 17 1 avr 15:37 replication-offset-checkpoint
$ ls -l /usr/local/var/lib/kafka-logs/my_topic-0
-rw-r--r-- 1 fgm admin 10485760 1 avr 15:34 00000000000000000000.index
-rw-r--r-- 1 fgm admin 0 1 avr 15:34 00000000000000000000.log
-rw-r--r-- 1 fgm admin 10485756 1 avr 15:34 00000000000000000000.timeindex
-rw-r--r-- 1 fgm admin 8 1 avr 15:34 leader-epoch-checkpoint
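The segment files themselves can be inspected with the kafka-dump-log tool bundled with Kafka (a sketch; path taken from the listing above):
$ kafka-dump-log --files /usr/local/var/lib/kafka-logs/my_topic-0/00000000000000000000.log --print-data-log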
listing topics:
$ kafka-topics -zookeeper localhost:2181 --list
my_topic
start a producer on `my_topic`:
kafka-console-producer --broker-list localhost:9092 --topic my_topic
>MyMessage 1
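Keyed messages can also be produced from the console via the parse.key/key.separator properties (a sketch; the ':' separator is an arbitrary choice):
kafka-console-producer --broker-list localhost:9092 --topic my_topic \
  --property parse.key=true \
  --property key.separator=:
>key1:MyMessage 2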
consume from `my_topic`:
kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --from-beginning
# Tutorial says:
kafka-console-consumer --zookeeper localhost:2181 --topic my_topic --from-beginning
# but that option is not recognized in Homebrew Kafka
# Using an explicit offset, needs a partition id:
kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --partition 0 --offset 2
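# A sketch: --max-messages bounds how many records are read from that offset
kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic --partition 0 --offset 2 --max-messages 1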
examine topic:
$ kafka-topics --zookeeper localhost:2181 --describe --topic my_topic
Topic: my_topic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
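The empty `Configs:` column fills in once a per-topic override is set; a sketch with kafka-configs (retention value arbitrary, --zookeeper form matching the commands above):
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name my_topic \
  --alter --add-config retention.ms=86400000
# after which --describe should show Configs: retention.ms=86400000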
kafka-topics --create \
--topic my_topic \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1
group.id