Kafka: From Zero to Topic with Producer and Consumer
A quick yet gentle introduction to getting Kafka running
In this article I use Kafka 2.8.0 for both the client and the server. Hence one may notice some inconsistency in how Zookeeper is used. This is due to KIP-500, which replaces Zookeeper with a self-managed metadata quorum.
Introduction
The best way to introduce Kafka is to install it. Currently I have steps to install a 3-node Kafka cluster on Kubernetes using Helm. In the future, I shall attempt to add steps for bare metal and/or other cloud vendors.
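I have the following in config.yaml:
replicaCount: 3
nodeSelector: |
  node-type: 2-cores
The nodeSelector schedules the brokers only onto nodes labelled node-type=2-cores; adjust or remove it to match the node labels in your own cluster.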
With the above config, one can install a 3-node cluster using the following:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install -f ./config.yaml kafka bitnami/kafka -n ns1
Once the installation succeeds, Helm responds with notes like the following:
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.0-debian-10-r27 --namespace ns1 --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace ns1 -- bash

PRODUCER:
kafka-console-producer.sh \
--broker-list kafka-0.kafka-headless.ns1.svc.cluster.local:9092,kafka-1.kafka-headless.ns1.svc.cluster.local:9092,kafka-2.kafka-headless.ns1.svc.cluster.local:9092 \
--topic test

CONSUMER:
kafka-console-consumer.sh \
--bootstrap-server kafka.ns1.svc.cluster.local:9092 \
--topic test \
--from-beginning
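The PRODUCER command above spells out every broker. Strictly, one reachable address is enough, because the client discovers the remaining brokers from it (which is why the CONSUMER command works with the single kafka.ns1.svc.cluster.local:9092 service). If you do want the full list, a small bash helper can build it. This is a sketch only: the function name broker_list is hypothetical, and it assumes the bitnami naming pattern seen above (<release>-<ordinal>.<release>-headless.<namespace>.svc.cluster.local) and port 9092.

```shell
#!/usr/bin/env bash
# Hypothetical helper: builds the comma-separated broker list for a Helm
# release, assuming the bitnami pod naming pattern
#   <release>-<ordinal>.<release>-headless.<namespace>.svc.cluster.local:<port>
broker_list() {
  local release="$1" namespace="$2" replicas="$3" port="${4:-9092}"
  local list="" i
  for ((i = 0; i < replicas; i++)); do
    list+="${release}-${i}.${release}-headless.${namespace}.svc.cluster.local:${port},"
  done
  # Strip the trailing comma before printing.
  printf '%s\n' "${list%,}"
}

# Prints the same list as the PRODUCER command above.
broker_list kafka ns1 3
```

It can then be substituted into a command, e.g. kafka-console-producer.sh --broker-list "$(broker_list kafka ns1 3)" --topic test.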
In my case, I use a local copy of Kafka to run the producer and consumer and to manage the Kafka topics.
On a side note, a Zookeeper instance is also available for use with older clients, on the following service ports:
kafka-zookeeper.ns1:2181 (client connections)
kafka-zookeeper.ns1:2888 (follower-to-leader communication)
kafka-zookeeper.ns1:3888 (leader election)
One can download Kafka 2.8.0 via https://kafka.apache.org/quickstart and untar the downloaded archive; the scripts used below live in its bin/ directory.
Note that the scripts require a Java runtime (JRE), found through JAVA_HOME or on the PATH.
Commands
Once the local copy is in place, here are some useful commands for working with Kafka topics (run from the bin/ directory).
View topics
./kafka-topics.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --list
Produce messages
./kafka-console-producer.sh --bootstrap-server kafka-0.kafka-headless.ns1.svc.cluster.local:9092,kafka-1.kafka-headless.ns1.svc.cluster.local:9092,kafka-2.kafka-headless.ns1.svc.cluster.local:9092 --topic test
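By default, producing a message to a topic that does not exist creates that topic automatically (the broker setting auto.create.topics.enable defaults to true), with a single partition (num.partitions defaults to 1).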
Consume messages
./kafka-console-consumer.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --topic test --group gp1
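Note: --group is optional. Without it, the console consumer joins a randomly named group of its own, so every consumer started that way receives all messages. When more than one consumer is started on the same topic with the same --group, the topic's partitions are divided among them, so each message is delivered to only one consumer in the group.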
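To make that partition split concrete, here is a bash sketch of the range strategy, which is the default partition.assignment.strategy (RangeAssignor) for Java consumers in Kafka 2.8, including the console consumer. It only illustrates the arithmetic for a single topic: the function name range_assign is hypothetical, and consumer-0, consumer-1 and so on simply stand for the group members in the order Kafka sorts them.

```shell
#!/usr/bin/env bash
# Illustration of the range strategy for ONE topic: each consumer gets
# partitions / consumers partitions, and the first (partitions % consumers)
# consumers each take one extra. Partitions are handed out in contiguous runs.
range_assign() {
  local partitions="$1" consumers="$2"
  local per=$((partitions / consumers)) extra=$((partitions % consumers))
  local c start=0 count p
  for ((c = 0; c < consumers; c++)); do
    count=$per
    if ((c < extra)); then count=$((per + 1)); fi
    printf 'consumer-%d:' "$c"
    for ((p = start; p < start + count; p++)); do printf ' %d' "$p"; done
    printf '\n'
    start=$((start + count))
  done
}

# 5 partitions shared by 2 consumers in the same group.
range_assign 5 2
```

With the 5-partition test topic described below and two consumers in gp1, one consumer would read partitions 0 to 2 and the other partitions 3 and 4. Starting more consumers than partitions leaves the extra consumers idle: range_assign 5 7 prints two consumers with no partitions.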
Describe topic
./kafka-topics.sh --describe --topic test --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092
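responds with output similar to the following (taken after the partition and retention changes described under Create topic):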
Topic: test TopicId: PgZj2P-zSj2wPDLS_g6wyQ PartitionCount: 5 ReplicationFactor: 1 Configs: flush.ms=1000,segment.bytes=1073741824,retention.ms=86400000,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 4 Leader: 2 Replicas: 2 Isr: 2
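The describe output shows why several partitions help: the partition leaders, which serve reads and writes, are spread over all three brokers. Since ReplicationFactor is 1 here, each partition also lives on that single broker only. A small awk filter can tally leaders per broker; it is a sketch that assumes the whitespace-separated "Leader: <id>" fields shown above, and the function name leaders_per_broker is hypothetical.

```shell
#!/usr/bin/env bash
# Counts how many partitions each broker leads, reading
# `kafka-topics.sh --describe` output from stdin. Lines without a
# "Leader:" field (such as the topic summary line) are ignored.
leaders_per_broker() {
  awk '{
    for (i = 1; i < NF; i++) if ($i == "Leader:") count[$(i + 1)]++
  } END {
    for (b in count) printf "broker %s leads %d partition(s)\n", b, count[b]
  }' | sort   # awk iterates arrays in no fixed order, so sort the result
}

leaders_per_broker <<'EOF'
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 4 Leader: 2 Replicas: 2 Isr: 2
EOF
```

In practice one would pipe the real command into it: ./kafka-topics.sh --describe --topic test --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 | leaders_per_broker.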
Create topic
One can create or alter a topic explicitly using the following:
./kafka-topics.sh --create --topic test --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092
./kafka-topics.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --alter --topic test --partitions 5
./kafka-configs.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --entity-type topics --entity-name test --alter --add-config retention.ms=86400000
In the above example, I create a topic called test.
I then alter the topic to have 5 partitions. Because the partitions are spread across the brokers, producers can write to (and consumers read from) several nodes in parallel. Note that a topic's partition count can only be increased, never decreased.
I also change the message retention to 1 day (the default is 7 days). For more configuration options, please refer to https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy
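Since retention.ms is given in milliseconds, it is less error-prone to compute the value than to type it. A trivial bash sketch; days_to_ms is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Converts a number of days into milliseconds for retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 1   # 86400000, the value set above
days_to_ms 7   # 604800000, Kafka's default retention.ms
```

For example: ./kafka-configs.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --entity-type topics --entity-name test --alter --add-config retention.ms="$(days_to_ms 1)".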
Message Counts
To get the message count of each partition of a particular topic, run:
./kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --broker-list kafka-headless.ns1.svc.cluster.local:9092
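The output has one topic:partition:offset line per partition. The offset is the partition's latest offset, so it counts the messages written to that partition; it matches the number of messages currently stored only until retention starts deleting older ones. A sample response looks like the following: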
test:0:15
test:1:5
test:2:4
test:3:1
test:4:2
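The per-partition counts can be summed, and the number of messages still retained can be estimated by subtracting the earliest offsets (GetOffsetShell --time -2) from the latest ones (--time -1, the default). A bash/awk sketch; the function names are hypothetical, and the subtraction is only an approximation for compacted or transactional topics, where offsets do not map one-to-one onto stored messages.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for GetOffsetShell output ("topic:partition:offset" lines).

# Sums the offsets read from stdin.
total_offsets() {
  awk -F: '{ sum += $NF } END { print sum + 0 }'
}

# Estimated retained messages: latest offsets (file $1, from --time -1)
# minus earliest offsets (file $2, from --time -2), summed over partitions.
retained_messages() {
  awk -F: 'NR == FNR { earliest[$1 ":" $2] = $3; next }
           { sum += $3 - earliest[$1 ":" $2] }
           END { print sum + 0 }' "$2" "$1"
}

# Sample response from above: prints 27.
total_offsets <<'EOF'
test:0:15
test:1:5
test:2:4
test:3:1
test:4:2
EOF
```

For example, redirect the two GetOffsetShell runs (one with --time -1, one with --time -2) to latest.txt and earliest.txt, then run retained_messages latest.txt earliest.txt.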
⛑ Suggestions / feedback welcome! 😃