Author : MD TAREQ HASSAN | Updated : 2021/04/24

Prerequisites

What will happen

Creating Kafka Cluster

Installing a Kafka cluster on a Kubernetes cluster (e.g. an AKS cluster) using Strimzi

az login # needed for local shell; not needed in Azure Cloud Shell

az aks get-credentials --resource-group xyz-rg --name xyz-aks-cluster

# make sure you set the correct namespace
kubectl config view --minify | grep namespace  # check namespace
kubectl config set-context --current --namespace xyz # set namespace


# Apply the `Kafka` Cluster CR file
kubectl apply -f <file-location>
kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/main/examples/kafka/kafka-persistent.yaml

Example: kubectl apply -f kafka-persistent.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.7"
      inter.broker.protocol.version: "2.7"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Checking Kafka Cluster

# make sure you set the correct namespace
kubectl config view --minify | grep namespace  # check namespace
kubectl config set-context --current --namespace xyz # set namespace


# Kafka cluster
kubectl get kafka


# Nodes
kubectl get nodes


# Pods
kubectl get pods


# ConfigMap
kubectl get configmap


# Services
kubectl get svc


# Storage Class
kubectl get sc


# Persistent Volume Claim
kubectl get pvc


# Persistent Volume
kubectl get pv

Connecting to a Running Pod

kubectl exec -it <pod-name> -- <path/target-script.sh> <params>

# example: connect to the 'kafka-producer' pod to produce messages to a topic
# kubectl exec -it kafka-producer -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

Creating Topic For Testing Purpose

See
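A topic can also be created declaratively, since Strimzi's Topic Operator watches `KafkaTopic` custom resources. A minimal sketch (topic name `my-topic` and cluster name `my-cluster` are assumptions matching the examples in this article; save as `kafka-topic.yaml` and apply with `kubectl apply -f kafka-topic.yaml`):

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    # must match the name of the Kafka CR so the Topic Operator picks it up
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
```

After applying, `kubectl get kafkatopics` should list `my-topic`.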

Creating and Testing Producer and Consumer

# check kafka cluster
kubectl get kafka

# check topic
kubectl get kafkatopics

# set cluster name if needed
export KAFKA_CLUSTER_NAME=my-cluster

Cloudshell 1 (producer command)

kubectl -n kafka run kafka-producer -it --image=quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

# Press 'Ctrl + C' to exit

Cloudshell 2 (consumer command)

kubectl -n kafka run kafka-consumer -it --image=quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

# Press 'Ctrl + C' to exit

Now type some messages into the console producer; they will appear in the console consumer.

Testing Already Running Producer and Consumer

Check console producer and consumer pods are running

kubectl get pod kafka-producer

kubectl get pod kafka-consumer

Producer (in cloudshell 1)

# -i : --stdin
# -t : --tty


# topic already created
kubectl exec -it kafka-producer -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

# topic does not exist: the broker auto-creates it with default settings
# (requires auto.create.topics.enable=true); note that kafka-console-producer.sh
# does not accept --partitions or --replication-factor. To control those,
# create the topic explicitly first:
kubectl exec -it kafka-producer -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --create --topic my-topic --partitions 3 --replication-factor 3

Consumer (in cloudshell 2)

# -i : --stdin
# -t : --tty

kubectl exec -it kafka-consumer -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

Press ‘Ctrl + C’ to exit

Creating User

Creating a Kafka user using a kubectl command
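Since the Kafka CR above enables the User Operator (`userOperator: {}` under `entityOperator`), a user can be created by applying a `KafkaUser` custom resource. A minimal sketch with TLS client authentication (the user name `my-user` is an assumption; the cluster label must match the Kafka CR name):

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    # must match the name of the Kafka CR so the User Operator picks it up
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
```

After `kubectl apply -f kafka-user.yaml`, the User Operator creates a Kubernetes Secret (named after the user) holding the client credentials; check with `kubectl get kafkausers` and `kubectl get secret my-user`.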