Kafka: From Zero to Topic with Producer and Consumer.

Programmer
Jun 1, 2021

A quick yet gentle introduction to getting Kafka running

In this article I am using Kafka 2.8.0 for both client and server, so one may notice some discrepancy around the use of ZooKeeper. This is due to KIP-500, which is replacing ZooKeeper with a self-managed quorum.

Introduction

The best way to introduce Kafka is by installing it. Currently I have a way to install a 3-node Kafka cluster using Helm on Kubernetes. In the future, I shall attempt to add steps for bare metal and/or for other cloud vendors.

I have the following in config.yaml

replicaCount: 3
nodeSelector:
  node-type: 2-cores

With the above config one can install a 3-node cluster using the following

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install -f ./config.yaml kafka bitnami/kafka -n ns1

Once successfully installed, the chart responds with instructions like the following.

To create a pod that you can use as a Kafka client run the following commands:

kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.0-debian-10-r27 --namespace ns1 --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace ns1 -- bash

PRODUCER:
kafka-console-producer.sh \
  --broker-list kafka-0.kafka-headless.ns1.svc.cluster.local:9092,kafka-1.kafka-headless.ns1.svc.cluster.local:9092,kafka-2.kafka-headless.ns1.svc.cluster.local:9092 \
  --topic test

CONSUMER:
kafka-console-consumer.sh \
  --bootstrap-server kafka.ns1.svc.cluster.local:9092 \
  --topic test \
  --from-beginning
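
Before trying the producer and consumer, it is worth confirming that the three broker pods (kafka-0, kafka-1, kafka-2) are actually running; a quick check in the same ns1 namespace:

kubectl get pods -n ns1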

In my case, I use a local copy of Kafka to run the producer / consumer and to tweak the Kafka topics.

On a side note, an instance of ZooKeeper is also available for use with older clients, exposed on the following service ports.

kafka-zookeeper.ns1:2181
kafka-zookeeper.ns1:2888
kafka-zookeeper.ns1:3888
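
For example, in Kafka 2.8.0 the topic script still accepts the (deprecated) --zookeeper flag, so an older-style invocation against this service might look like the following; the listing is equivalent to the --bootstrap-server form shown later.

./kafka-topics.sh --zookeeper kafka-zookeeper.ns1:2181 --list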

One can download Kafka 2.8.0 from https://kafka.apache.org/quickstart and untar the downloaded archive.

Note, the scripts require a Java runtime (JRE) to be available on the PATH.
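
For reference, the download and extraction might look like the following (this assumes the Scala 2.13 build of 2.8.0; the exact archive name on the download page may differ):

wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0/bin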

Commands

Once installed, here are some useful commands to navigate around Kafka topics.

View topics

./kafka-topics.sh  --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --list

Produce messages

./kafka-console-producer.sh --bootstrap-server kafka-0.kafka-headless.ns1.svc.cluster.local:9092,kafka-1.kafka-headless.ns1.svc.cluster.local:9092,kafka-2.kafka-headless.ns1.svc.cluster.local:9092 --topic test

By default (auto.create.topics.enable=true on the broker), if a message is produced to a non-existent topic, the topic is created automatically with the broker's default number of partitions (num.partitions, which is 1 unless overridden).
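
The console producer can also send keyed messages, which controls partition placement; a small sketch using its standard parse.key and key.separator properties:

./kafka-console-producer.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --topic test --property parse.key=true --property key.separator=:

Each input line is then typed as key:value (for example user1:hello); with the default partitioner, messages sharing a key always land on the same partition.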

Consume messages

./kafka-console-consumer.sh  --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --topic test --group gp1

Note: --group is optional. If more than one consumer is started on the same topic with the same group, the topic's partitions (and therefore the messages) are distributed among the consumers; without --group, each console consumer gets its own group and receives all messages.
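
To see how the partitions of test are assigned within gp1 and how far each member has read, the kafka-consumer-groups.sh script shipped in the same distribution can be used; a minimal check:

./kafka-consumer-groups.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --describe --group gp1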

Describe topic

./kafka-topics.sh --describe --topic test --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092

would respond

Topic: test     TopicId: PgZj2P-zSj2wPDLS_g6wyQ PartitionCount: 5       ReplicationFactor: 1    Configs: flush.ms=1000,segment.bytes=1073741824,retention.ms=86400000,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: test     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test     Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: test     Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: test     Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: test     Partition: 4    Leader: 2       Replicas: 2     Isr: 2

Create topic

One can create / alter a topic explicitly using the following

./kafka-topics.sh --create --topic test --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092
./kafka-topics.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --alter --topic test --partitions 5
./kafka-configs.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --entity-type topics --entity-name test --alter --add-config retention.ms=86400000

In the above example, I am creating a topic called test.

And then I am modifying the topic to scale to 5 partitions. This way the messages can be spread across multiple nodes from the producer's standpoint.

I also change the message retention to 1 day (the default is 7 days). For more configuration options, please refer to https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy
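
To confirm the retention change has been applied, the topic's configuration can be read back with kafka-configs.sh; a minimal check:

./kafka-configs.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --entity-type topics --entity-name test --describe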

Message Counts

In order to get message counts across partitions for a particular topic, one can use the GetOffsetShell tool.

./kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --broker-list kafka-headless.ns1.svc.cluster.local:9092

with a sample response like below

test:0:15
test:1:5
test:2:4
test:3:1
test:4:2
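
Each line reads topic:partition:latest offset, so the per-partition numbers can be summed for a rough total; note this only equals the true message count if no messages have yet been removed by retention. A small sketch piping the output through awk:

./kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --broker-list kafka-headless.ns1.svc.cluster.local:9092 | awk -F: '{sum += $3} END {print sum}'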

⛑ Suggestions / Feedback ! 😃
