Kafka: From Zero to Topic with Producer and Consumer.

3 min read, Jun 1, 2021


A quick yet gentle introduction to getting Kafka running

In this article I am using Kafka 2.8.0 for both client and server. Hence one may notice some discrepancies in how ZooKeeper is used. This is due to KIP-500, which replaces ZooKeeper with a self-managed quorum.


The best way to introduce Kafka is by installing it. Currently I have a way to install a 3-node Kafka cluster on Kubernetes using Helm. In the future, I shall attempt to add steps for bare-metal and/or other cloud vendors.

I have the following in config.yaml

replicaCount: 3
nodeSelector: |
  node-type: 2-cores

With the above config, one can install a 3-node cluster using the following:

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install -f ./config.yaml kafka bitnami/kafka -n ns1

Once the install succeeds, Helm responds with notes like the following.

To create a pod that you can use as a Kafka client run the following commands:

kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.0-debian-10-r27 --namespace ns1 --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace ns1 -- bash

kafka-console-producer.sh \
    --broker-list kafka-0.kafka-headless.ns1.svc.cluster.local:9092,kafka-1.kafka-headless.ns1.svc.cluster.local:9092,kafka-2.kafka-headless.ns1.svc.cluster.local:9092 \
    --topic test

kafka-console-consumer.sh \
    --bootstrap-server kafka.ns1.svc.cluster.local:9092 \
    --topic test

In my case, I use a local copy of Kafka to run the producer and consumer and to tweak the Kafka topics.

On a side note, an instance of ZooKeeper is also available for use with older clients, exposed through its own service ports.


One can download Kafka 2.8.0 from https://kafka.apache.org/quickstart and untar the downloaded archive.

Note: the Kafka scripts require a Java runtime (JRE), found either on the PATH or through JAVA_HOME.
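As a rough sketch of the download step, the snippet below builds the archive URL and checks for Java. The archive.apache.org URL and the Scala 2.13 build are my assumptions and are not stated above, and fetch_kafka is a hypothetical helper name; it is defined here but not called, so nothing is downloaded until you run it yourself.

```shell
# Assumed values: Kafka 2.8.0 built for Scala 2.13, served from the Apache archive.
KAFKA_VERSION="2.8.0"
SCALA_VERSION="2.13"
KAFKA_DIR="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
KAFKA_URL="https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_DIR}.tgz"

# Hypothetical helper: download the archive and untar it.
fetch_kafka() {
  curl -fLO "$KAFKA_URL" && tar -xzf "${KAFKA_DIR}.tgz"
}

# The scripts need a Java runtime, located via JAVA_HOME or the PATH.
if [ -n "${JAVA_HOME:-}" ] || command -v java >/dev/null 2>&1; then
  echo "Java runtime found"
else
  echo "No Java runtime found; install a JRE before running the Kafka scripts"
fi

echo "$KAFKA_URL"
```

After calling fetch_kafka, the scripts used in the rest of this article sit in the bin directory of the extracted folder.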


Once installed, here are some useful commands to navigate around Kafka topics. They are run from the bin directory of the extracted archive.

View topics

./kafka-topics.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --list

Produce messages

./kafka-console-producer.sh --bootstrap-server kafka-0.kafka-headless.ns1.svc.cluster.local:9092,kafka-1.kafka-headless.ns1.svc.cluster.local:9092,kafka-2.kafka-headless.ns1.svc.cluster.local:9092 --topic test
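Typing the full broker list by hand gets tedious. Here is a minimal sketch that builds it from the release name, namespace and replica count, assuming the pod and headless-service naming seen in the commands above. bootstrap_servers is a hypothetical helper of mine and is not part of Kafka or the chart.

```shell
# Hypothetical helper: print a comma-separated broker list.
# Arguments: release name, namespace, replica count, port (defaults to 9092).
bootstrap_servers() {
  release=$1
  namespace=$2
  replicas=$3
  port=${4:-9092}
  i=0
  list=""
  while [ "$i" -lt "$replicas" ]; do
    host="${release}-${i}.${release}-headless.${namespace}.svc.cluster.local:${port}"
    # Prefix a comma only when the list already has an entry.
    list="${list:+${list},}${host}"
    i=$((i + 1))
  done
  printf '%s\n' "$list"
}

# Prints the same three-broker list used in the producer command.
bootstrap_servers kafka ns1 3
```

The result can then be passed along, for example: ./kafka-console-producer.sh --bootstrap-server "$(bootstrap_servers kafka ns1 3)" --topic test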

By default, if a message is produced to a non-existent topic, a new topic is created with a single partition. This relies on the broker defaults auto.create.topics.enable=true and num.partitions=1.

Consume messages

./kafka-console-consumer.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --topic test --group gp1

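Note: --group is optional. If it is not provided, the console consumer generates its own group id, so every consumer started this way receives all the messages. If more than one consumer is started on the same topic with the same group, the topic's partitions, and hence the messages, are distributed among those consumers.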
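To get a feel for how partitions are shared inside one group, here is a small simulation in plain shell. It assumes a range-style split over a single topic (every consumer gets the whole-number share and the first few get one extra); the assignment strategy actually in effect depends on the consumer configuration, and range_split is a hypothetical name of mine.

```shell
# Hypothetical helper: show how many partitions each consumer of a group owns
# under a range-style split of one topic.
# Arguments: partition count, consumer count.
range_split() {
  partitions=$1
  consumers=$2
  base=$((partitions / consumers))    # share every consumer gets
  extra=$((partitions % consumers))   # leftovers go to the first consumers
  c=0
  while [ "$c" -lt "$consumers" ]; do
    n=$base
    if [ "$c" -lt "$extra" ]; then
      n=$((n + 1))
    fi
    echo "consumer-$c: $n partition(s)"
    c=$((c + 1))
  done
}

# Five partitions shared by two consumers of the same group.
range_split 5 2
```

Running range_split 5 6 shows the other extreme: with more consumers than partitions, the last consumer owns nothing and sits idle.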

Describe topic

./kafka-topics.sh --describe --topic test --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092

which responds with something like the following:

Topic: test     TopicId: PgZj2P-zSj2wPDLS_g6wyQ PartitionCount: 5       ReplicationFactor: 1    Configs: flush.ms=1000,segment.bytes=1073741824,retention.ms=86400000,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 4 Leader: 2 Replicas: 2 Isr: 2
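The per-partition lines show which broker leads each partition. As a sketch, the describe output can be piped through a small awk filter to count leaders per broker. leaders_per_broker is a hypothetical helper name, and the input below is simply the five partition lines from the response above.

```shell
# Hypothetical helper: read kafka-topics.sh --describe output on stdin and
# print "<broker id> <number of partitions it leads>", sorted by broker id.
leaders_per_broker() {
  awk '
    /Partition:/ {
      # Find the "Leader:" label and count the broker id that follows it.
      for (i = 1; i < NF; i++) if ($i == "Leader:") count[$(i + 1)]++
    }
    END { for (b in count) print b, count[b] }
  ' | sort -n
}

leaders_per_broker <<'EOF'
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 4 Leader: 2 Replicas: 2 Isr: 2
EOF
```

For the sample above this reports one partition led by broker 0 and two each by brokers 1 and 2. Against a live cluster, pipe the describe command into it instead of the here-document.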

Create topic

One can create / alter a topic explicitly using the following

./kafka-topics.sh --create --topic test --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092
./kafka-topics.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --alter --topic test --partitions 5
./kafka-configs.sh --bootstrap-server kafka-headless.ns1.svc.cluster.local:9092 --entity-type topics --entity-name test --alter --add-config retention.ms=86400000

In the above example, I am creating a topic called test.

I then scale the topic to 5 partitions. This way, from the producer's standpoint, the messages can be spread across multiple nodes.

I also change the message retention to 1 day (the default is 7 days). For more configuration options, please refer to https://kafka.apache.org/documentation/#topicconfigs_cleanup.policy
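Since retention.ms is expressed in milliseconds, the value is easy to get wrong. A tiny sketch of the arithmetic follows; days_to_ms is a hypothetical helper name of mine.

```shell
# Hypothetical helper: convert a number of days to milliseconds for retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 1   # prints 86400000, the value set above
days_to_ms 7   # prints 604800000, the 7-day default
```

It can be used inline, for example --add-config retention.ms="$(days_to_ms 1)". Running kafka-configs.sh with --describe in place of --alter and --add-config shows the overrides currently set on the topic.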

Message Counts

To get message counts across partitions for a particular topic, run:

./kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --broker-list kafka-headless.ns1.svc.cluster.local:9092

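The response has one line per partition, in the form topic:partition:offset, where the offset is the latest offset of that partition.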
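GetOffsetShell prints one topic:partition:offset line per partition, so a total needs one more step. Here is a minimal sketch that sums the last field. sum_offsets is a hypothetical helper name, and the input lines below are made-up numbers for illustration, not output from my cluster.

```shell
# Hypothetical helper: read topic:partition:offset lines on stdin and print
# the sum of the offsets.
sum_offsets() {
  awk -F: '{ total += $NF } END { print total + 0 }'
}

# Made-up sample input, for illustration only.
printf 'test:0:10\ntest:1:5\ntest:2:0\n' | sum_offsets   # prints 15
```

Against the cluster, pipe the GetOffsetShell command into sum_offsets. Keep in mind that these are latest offsets, so the sum matches the number of retained messages only if retention has not deleted anything yet; running the tool with --time -2 returns the earliest offsets, which can be subtracted to account for that.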


