Author : MD TAREQ HASSAN | Updated : 2021/04/24

Prerequisites

In case you want to delete an already running Kafka cluster and start fresh:
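
A minimal sketch, assuming the cluster is named 'my-cluster' in the 'kafka' namespace; note that deleting the PVCs wipes all message data:

# delete the Kafka custom resource (the Cluster Operator removes the pods and services)
kubectl -n kafka delete kafka my-cluster

# optionally delete the persistent volume claims to start completely fresh
# (assumption: Strimzi labels its PVCs with strimzi.io/cluster)
kubectl -n kafka delete pvc -l strimzi.io/cluster=my-cluster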

Securing Kafka

Configure the Kafka resource to set up TLS client authentication on the external listener and simple (ACL-based) authorization with a super user, as shown in the sections below.

Creating Kafka User

Super User

strimzi-kafka-admin.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: strimzi-kafka-admin
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
#  authorization:
#    type: simple
# ...

Command: kubectl apply -f strimzi-kafka-admin.yaml (don’t forget to use the correct namespace)

To make the above ‘strimzi-kafka-admin’ user a super user, add it to superUsers in the Kafka resource:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    authorization:
      type: simple
      superUsers:
        - CN=strimzi-kafka-admin
    # ...

User for external client: producer-consumer-client-user.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: producer-consumer-client-user
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: All
  # ...

Command: kubectl apply -f producer-consumer-client-user.yaml (don’t forget to use the correct namespace)

External Listeners

kafka-persistent-with-external-access-v1.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 2.7.0
    replicas: 3
    authorization:
      type: simple
      superUsers:
        - CN=strimzi-kafka-admin
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: tls
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.7"
      inter.broker.protocol.version: "2.7"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Command: kubectl apply -f kafka-persistent-with-external-access-v1.yaml

Checking Service

kubectl get svc

# should show -> my-cluster-kafka-external-bootstrap   LoadBalancer   10.0.72.188    20.x.y.z   9094:30745/TCP               42m

Checking Load Balancer

# az network lb list
az network lb list -g mc_${AKS_RESOURCE_GROUP}_${AKS_CLUSTER_NAME}_${AKS_LOCATION}

Checking Super User
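
A quick check that the User Operator has processed the super user and that the Kafka resource carries the entry (a sketch using the names created above):

# the KafkaUser should be Ready, with a secret holding its client certificate
kubectl -n kafka get kafkauser strimzi-kafka-admin
kubectl -n kafka get secret strimzi-kafka-admin

# confirm the superUsers entry on the Kafka resource
kubectl -n kafka get kafka my-cluster -o jsonpath='{.spec.kafka.authorization.superUsers}'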


Extracting Certificates

Extract using kubectl (e.g. from Azure Cloud Shell in the Azure portal)

# 
# Cluster CA certificate and password
#
# kubectl get secret <CLUSTER_NAME>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
# kubectl get secret <CLUSTER_NAME>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password
# Cloudshell > Click '{}' > ca.crt > you can see the decoded certificate
# Cloudshell > Click '{}' > ca.password > you can see the decoded password
#
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > cluster-ca.crt
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > cluster-ca.password


# 
# Kafka user certificates (repeat for each KafkaUser)
#
# kubectl get secret <KAFKA_USER_NAME> -o jsonpath='{.data.user\.crt}' | base64 --decode > user.crt
# kubectl get secret <KAFKA_USER_NAME> -o jsonpath='{.data.user\.key}' | base64 --decode > user.key
# Cloudshell > Click '{}' > user.crt > you can see the decoded certificate
# Cloudshell > Click '{}' > user.key > you can see the decoded key
#
kubectl get secret strimzi-kafka-admin -o jsonpath='{.data.user\.crt}' | base64 --decode > strimzi-kafka-admin.crt
kubectl get secret strimzi-kafka-admin -o jsonpath='{.data.user\.key}' | base64 --decode > strimzi-kafka-admin.key

# the external client user's cert and key are needed later for the .NET client
kubectl get secret producer-consumer-client-user -o jsonpath='{.data.user\.crt}' | base64 --decode > producer-consumer-client-user.crt
kubectl get secret producer-consumer-client-user -o jsonpath='{.data.user\.key}' | base64 --decode > producer-consumer-client-user.key

# extract the other entries, i.e. 'user.password' and 'user.p12', in the same way

Verify
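
One way to sanity-check the extracted files (openssl is an assumption here; any X.509 inspector works):

# inspect the cluster CA certificate (subject, issuer, validity period)
openssl x509 -in cluster-ca.crt -noout -subject -issuer -dates

# inspect a user certificate; the subject CN should match the KafkaUser name
openssl x509 -in producer-consumer-client-user.crt -noout -subject -dates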

Testing With Kafka Console Clients

Kafka ships with two console clients (kafka-console-producer and kafka-console-consumer). On Windows they are the .bat scripts under bin\windows of the Kafka distribution.

Public access (via internet) to kafka cluster

Get the LoadBalancer DNS name (prefer the DNS name over the IP address, because the IP might change)

# if DNS name is not set -> AKS > external bootstrap > assign DNS name (might be optional and therefore not set by default)
kubectl get service/my-cluster-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].hostname}
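
If no DNS name is set, one way to assign a DNS label to the load balancer's public IP is via the Azure CLI (a sketch; <PUBLIC_IP_NAME> and <DNS_LABEL> are placeholders):

# find the public IP resource in the AKS node resource group
az network public-ip list -g mc_${AKS_RESOURCE_GROUP}_${AKS_CLUSTER_NAME}_${AKS_LOCATION} -o table

# assign a DNS label (must be unique within the Azure region)
az network public-ip update -g mc_${AKS_RESOURCE_GROUP}_${AKS_CLUSTER_NAME}_${AKS_LOCATION} -n <PUBLIC_IP_NAME> --dns-name <DNS_LABEL>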

Get LoadBalancer Public IP address

# kubectl get service/<kafka-cluster-name>-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}
kubectl get service/my-cluster-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}

Importing Certificates for Console Clients

Make sure Java keytool is available
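
keytool ships with the JDK; a quick check, using the JDK path referenced later in this article (adjust to your installation):

# in Git Bash: add the JDK bin folder to PATH if keytool is not found
export PATH=$PATH:"/c/Program Files/Java/jdk1.8.0_291/bin"

# sanity check
keytool -help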

Importing strimzi-kafka-admin user certs to kafka-auth-keystore.jks

#
# Set variables
#
export KAFKA_USER_NAME=strimzi-kafka-admin
export USER_P12_FILE_PATH=user.p12
export USER_KEY_PASSWORD_FILE_PATH=user.password
export KEYSTORE_NAME=kafka-auth-keystore.jks
export KEYSTORE_PASSWORD=changeit
export PASSWORD=`cat $USER_KEY_PASSWORD_FILE_PATH`


#
# Command
#
# Linux: sudo keytool -importkeystore -deststorepass $KEYSTORE_PASSWORD -destkeystore $KEYSTORE_NAME -srckeystore $USER_P12_FILE_PATH -srcstorepass $PASSWORD -srcstoretype PKCS12
# Import command completed:  1 entries successfully imported, 0 entries failed or cancelled
#
keytool -importkeystore -deststorepass $KEYSTORE_PASSWORD -destkeystore $KEYSTORE_NAME -srckeystore $USER_P12_FILE_PATH -srcstorepass $PASSWORD -srcstoretype PKCS12


#
# Check
#
keytool -list -alias $KAFKA_USER_NAME -keystore kafka-auth-keystore.jks

Importing cluster CA cert to Java (default) trustStore

#
# Set variables
#
export CLUSTER_CERT_FILE_PATH=C:\\kafkacerts\\ca.crt
export CLUSTER_CERT_PASSWORD_FILE_PATH=C:\\kafkacerts\\ca.password
export CLUSTER_CERT_PASSWORD=`cat $CLUSTER_CERT_PASSWORD_FILE_PATH`
export CLUSTER_CERT_ALIAS=my-cluster-cluster-ca-cert
export JAVA_DEFAULT_TRUSTSTORE_NAME=cacerts

#
# 
# keytool -importcert -file <cert file> -alias <alias name> -keypass <password> -keystore <trustStore file>
#
# Open cmd or Git bash as admin
# cd "C:\Program Files\Java\jdk1.8.0_291\jre\lib\security" (double quote is needed)
# 
# Don't forget to set variables (i.e. in case you re-started Git bash)
#
keytool -importcert -alias $CLUSTER_CERT_ALIAS -file $CLUSTER_CERT_FILE_PATH  -keypass $CLUSTER_CERT_PASSWORD -keystore $JAVA_DEFAULT_TRUSTSTORE_NAME
# Trust this certificate? [no]:  yes
# Certificate was added to keystore


#
# Check cert was added successfully
#
keytool -list -v -keystore $JAVA_DEFAULT_TRUSTSTORE_NAME -alias $CLUSTER_CERT_ALIAS # you will be prompted for the password, use "changeit" (default)

Using Console Clients

Create client-ssl.properties file (C:\kafkacerts\client-ssl.properties)

bootstrap.servers=<LOADBALANCER_PUBLIC_IP>:9094
security.protocol=SSL
ssl.truststore.location=C:/Program Files/Java/jdk1.8.0_291/jre/lib/security/cacerts
ssl.truststore.password=changeit
ssl.keystore.location=C:/kafkacerts/kafka-auth-keystore.jks
ssl.keystore.password=changeit
ssl.key.password=<contents of user.password file>

Execute the following commands in CMD or Cmder, opened as Administrator (they might not work in Git Bash)

#
# Make sure you have 'C:\kafkacerts\client-ssl.properties' with the required configs
#
cd "C:\kafkacerts"

#
# Producer
#
kafka-console-producer.bat --bootstrap-server <LOADBALANCER_PUBLIC_IP>:9094 --topic my-topic --producer.config client-ssl.properties

#
# Consumer
#
kafka-console-consumer.bat --bootstrap-server <LOADBALANCER_PUBLIC_IP>:9094 --topic my-topic --consumer.config client-ssl.properties --from-beginning

Using SSL Certificates For Dotnet Client

Prepare SSL certificates

C:\kafkacerts
  cluster-ca.crt
  producer-consumer-client-user.crt
  producer-consumer-client-user.key

Install the NuGet package for JSON configuration

Install-Package Microsoft.Extensions.Configuration.Json
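
The console app below also uses the Confluent.Kafka client, so install that package as well:

Install-Package Confluent.Kafka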

appsettings.json

{
  "ApplicationName": "DotNetConsoleClientForStrimziKafka",
  "KafkaClientConfig": {
    "bootstrap.servers": "127.0.0.1:9094",
    "security.protocol": "SSL",
    "ssl.ca.location": "C:\\kafkacerts\\cluster-ca.crt",
    "ssl.certificate.location": "C:\\kafkacerts\\producer-consumer-client-user.crt",
    "ssl.key.location": "C:\\kafkacerts\\producer-consumer-client-user.key"
  }
}

Implementation of C# console app

Utils

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Configuration;

class KafkaClientConfigKeys
{
	//
	// Configuration details: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
	//
	public const string BootstrapServers = "bootstrap.servers";
	public const string SecurityProtocol = "security.protocol";
	public const string SslCaLocation = "ssl.ca.location"; // server verification and encrypted communication
	public const string SslUserCrtLocation = "ssl.certificate.location"; // (client) user certificate (client AuthN & AuthZ)
	public const string SslUserKeyLocation = "ssl.key.location"; // (client) user private key (client AuthN & AuthZ)
}

class KafkaSSLUtil
{
	public const string DefaultSettingsJsonFile = "appsettings.json";
	public const string DefaultConfigurationSectionName = "KafkaClientConfig";

	private KafkaSSLUtil()
	{

	}

	public static IDictionary<string, string> GetClientConfigDict(IConfiguration appsettingsConfig = default, string configSectionName = DefaultConfigurationSectionName, Dictionary<string, string> configDict = default)
	{

		try
		{
			//
			// Configuration from appsettings.json
			//
			if (appsettingsConfig == null)
			{
				//
				// System.Reflection.Assembly.GetExecutingAssembly().CodeBase
				// System.AppContext.BaseDirectory
				// Path.GetDirectoryName(Application.ExecutablePath)
				//
				appsettingsConfig = new ConfigurationBuilder()
					.SetBasePath(System.AppContext.BaseDirectory)
					.AddJsonFile(DefaultSettingsJsonFile, false, true)
					.Build();
			}

			//
			// Get target section (SSL Config)
			//
			var configSectionAsDict = appsettingsConfig.GetSection(configSectionName).GetChildren().ToDictionary(x => x.Key, x => x.Value);

			//
			// Settings values
			//
			if (configDict != null)
			{
				foreach (var pair in configSectionAsDict)
				{
					configDict.Add(pair.Key, pair.Value);
				}
			}
			else
			{
				configDict = configSectionAsDict;
			}
		}
		catch (Exception ex)
		{

			System.Diagnostics.Debug.WriteLine($"Exception occurred while processing section in appsettings.json. Error -> {ex.Message}");
		}

		return configDict;
	}
}

Program.cs

using Confluent.Kafka;
using System;
using System.IO;
using System.Threading.Tasks;


namespace DotNetConsoleClientForStrimziKafka
{
    class Program
    {

        public static async Task Main(string[] args)
        {
            var topicName = "my-topic";

            var configProperties = KafkaSSLUtil.GetClientConfigDict();

            var producerConfig = new ProducerConfig(configProperties);

            using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

            Console.WriteLine("-----------------------------------------------------------------------");
            Console.WriteLine($"Producer '{producer.Name}' producing on topic '{topicName}'.\n");
            Console.WriteLine("Press 'Ctrl-C' to quit.\n");
            Console.WriteLine("To create a kafka message with (UTF-8 encoded) key and value -> key <space> value");
            Console.WriteLine("-----------------------------------------------------------------------");


            var cancelled = false;
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cancelled = true;
            };

            while (!cancelled)
            {
                Console.Write("> ");

                string text;
                try
                {
                    text = Console.ReadLine();
                }
                catch (IOException)
                {
                    // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
                    break;
                }
                if (text == null)
                {
                    // Console.ReadLine returned null before
                    // the CancelKeyPress event was handled
                    break;
                }

                string key = null;
                string val = text;

                // split line if both key and value specified.
                int index = text.IndexOf(" ");
                if (index != -1)
                {
                    key = text.Substring(0, index);
                    val = text.Substring(index + 1);
                }

                try
                {
                    var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = val });

                    Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> e)
                {
                    Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                }
            }
        }
    }
}
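
Since the client user is named ‘producer-consumer’, a matching consumer is a natural companion. Below is a minimal consumer sketch reusing the same SSL config; the group id 'my-console-group' is an arbitrary assumption, and it should run as its own console project (or replace Main above):

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace DotNetConsoleClientForStrimziKafka
{
    class ConsumerProgram
    {
        public static void Main(string[] args)
        {
            // merge consumer group settings with the SSL settings from appsettings.json
            var configProperties = KafkaSSLUtil.GetClientConfigDict(configDict: new Dictionary<string, string>
            {
                { "group.id", "my-console-group" },  // assumption: any unique group id works
                { "auto.offset.reset", "earliest" }  // read the topic from the beginning
            });

            var consumerConfig = new ConsumerConfig(configProperties);

            using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
            consumer.Subscribe("my-topic");

            Console.WriteLine("Consuming 'my-topic'. Press Ctrl-C to quit.");
            while (true)
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result != null)
                {
                    Console.WriteLine($"{result.Message.Key}: {result.Message.Value} @ {result.TopicPartitionOffset}");
                }
            }
        }
    }
}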