Author : MD TAREQ HASSAN | Updated : 2021/04/24
Prerequisites
In case you want to delete an already running Kafka cluster and start fresh, see the sketch below.
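A minimal sketch, assuming the cluster was deployed as `my-cluster` in namespace `kafka` (as in the examples below):

```sh
# delete the Kafka custom resource; the Cluster Operator removes the broker/ZooKeeper pods
kubectl delete kafka my-cluster -n kafka

# optionally delete users and topics created earlier
kubectl delete kafkauser --all -n kafka
kubectl delete kafkatopic --all -n kafka

# PersistentVolumeClaims survive unless deleteClaim: true was set in the storage config
kubectl get pvc -n kafka
```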
Securing Kafka
- Authentication and authorization are used to secure access to Kafka brokers
- To establish secure access to Kafka brokers, you configure and apply:
  - A `Kafka` resource to:
    - Create listeners with a specified authentication type
    - Configure authorization for the whole Kafka cluster
  - A `KafkaUser` resource to access the Kafka brokers securely through the listeners
- You can configure authorization for Kafka brokers using the `authorization` property in the `Kafka.spec.kafka` resource. The authorization method is defined in the `type` field
- If the `authorization` property is missing, no authorization is enabled and clients have no restrictions
- Authentication and Authorization:
  - Authorization is always configured for the whole Kafka cluster
  - Authentication is configured independently for each listener
  - Authorization is applied to all enabled listeners
Configure the Kafka resource to set up the following (a minimal sketch follows the list):
- Listener authentication
- Network policies that restrict access to Kafka listeners
- Kafka authorization
- Super users for unconstrained access to brokers
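A minimal sketch (placeholder values) showing where each of these lives in the `Kafka` resource; `networkPolicyPeers` is shown for completeness but is not used elsewhere on this page:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls                # listener authentication
        networkPolicyPeers:        # network policy restricting access to this listener
          - podSelector:
              matchLabels:
                app: kafka-client  # placeholder label
    authorization:                 # authorization for the whole cluster
      type: simple
      superUsers:                  # unconstrained access to brokers
        - CN=strimzi-kafka-admin
```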
Creating Kafka User
- A client accesses Kafka as a user
- When a `KafkaUser` is created, the User Operator creates a new Secret with the same name as the `KafkaUser` resource
- The Secret contains a private and public key for TLS client authentication. The public key is contained in a user certificate, which is signed by the client Certificate Authority (CA)
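Once a `KafkaUser` is created (e.g. `strimzi-kafka-admin` below), you can inspect the generated Secret; a sketch, assuming namespace `kafka`:

```sh
# lists the keys stored in the user Secret (ca.crt, user.crt, user.key, user.p12, user.password)
kubectl describe secret strimzi-kafka-admin -n kafka
```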
Super User
- Super users can access all resources in your Kafka cluster regardless of any access restrictions, and are supported by all authorization mechanisms
- To designate super users for a Kafka cluster, add a list of user principals to the `superUsers` property. If a user uses TLS client authentication, their username is the common name from their certificate subject prefixed with `CN=`
strimzi-kafka-admin.yaml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: strimzi-kafka-admin
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  # authorization:
  #   type: simple
  # ...
```
Command: `kubectl apply -f strimzi-kafka-admin.yaml` (don't forget to use the correct namespace)
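To verify (a sketch): the user should be listed and its Secret should exist:

```sh
kubectl get kafkauser strimzi-kafka-admin -n kafka
kubectl get secret strimzi-kafka-admin -n kafka   # Secret created by the User Operator
```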
To make the above `strimzi-kafka-admin` a super user, add it under `superUsers` in the Kafka resource:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    authorization:
      type: simple
      superUsers:
        - CN=strimzi-kafka-admin
    # ...
```
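Re-apply the Kafka resource so the Cluster Operator rolls the change out (this `authorization` block is part of the full spec `kafka-persistent-with-external-access-v1.yaml` below):

```sh
kubectl apply -f kafka-persistent-with-external-access-v1.yaml
```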
User for external client: producer-consumer-client-user.yaml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: producer-consumer-client-user
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: All
      # ...
```
Command: `kubectl apply -f producer-consumer-client-user.yaml` (don't forget to use the correct namespace)
External Listeners
- Use KafkaUser to enable the authentication and authorization mechanisms that a specific client uses to access Kafka
- The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster
kafka-persistent-with-external-access-v1.yaml

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 2.7.0
    replicas: 3
    authorization:
      type: simple
      superUsers:
        - CN=strimzi-kafka-admin
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: tls
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.7"
      inter.broker.protocol.version: "2.7"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
```
Command: `kubectl apply -f kafka-persistent-with-external-access-v1.yaml`
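Provisioning the cluster and the load balancer can take a few minutes; one way to wait (as in the Strimzi quickstart):

```sh
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
```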
Checking Service
```sh
kubectl get svc
# should show -> my-cluster-kafka-external-bootstrap   LoadBalancer   10.0.72.188   20.x.y.z   9094:30745/TCP   42m
```
Checking Load Balancer
```sh
# az network lb list
az network lb list -g mc_${AKS_RESOURCE_GROUP}_${AKS_CLUSTER_NAME}_${AKS_LOCATION}
```
Checking Super User
- Azure Portal > AKS
- Kubernetes resources: Configuration > Config Maps > my-cluster-kafka-config
- server.config > Authorization > check super user (alternatively, search for the super user name, i.e. “strimzi-kafka-admin”)
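Alternatively with kubectl (a sketch; the rendered broker configuration lives in the `my-cluster-kafka-config` ConfigMap):

```sh
kubectl get configmap my-cluster-kafka-config -n kafka -o yaml | grep -i -A 2 super
```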
More
Extracting Certificates
- Communication to the Kafka cluster has to be encrypted (non-TLS client connections will be rejected)
- TLS/SSL implies one-way authentication, where the client validates the Kafka broker identity
- In order to do this, client applications need to trust the cluster CA certificate (the cluster CA certificate is stored in a Kubernetes Secret)
- By default, these are auto-generated by Strimzi, but you can provide your own certificates as well (See: https://strimzi.io/docs/operators/master/using.html#kafka-listener-certificates-str)
In Azure portal
- AKS > Kubernetes resources: Configuration > Secrets
- my-cluster-cluster-ca-cert > Click eye symbol: `ca.crt`
- strimzi-kafka-admin > Click eye symbol: `user.crt`, `user.key`
Extract using `kubectl` command
- Create a folder `C:\kafkacerts`
- Open Git bash as Admin > cd `C:\kafkacerts`
- Make sure Git bash has access to the AKS cluster
  - Set default browser (the browser you use to log in to Azure portal)
  - Browser > log in to Azure portal
  - Git bash > `az login` > the default browser will open, already logged into Azure portal
  - If credentials are not saved in the kubeconfig file (you did not connect to AKS before; skip if you previously connected, in that case `az login` alone is enough): `az aks get-credentials --resource-group xyz-rg --name xyz-aks-cluster`
- Check current namespace: `kubectl config view --minify | grep namespace` (empty means default namespace); set it if needed: `kubectl config set-context --current --namespace xyz`
- Execute the following commands and check `C:\kafkacerts` to see that the certs are saved accordingly
```sh
#
# Cluster CA certificate and password
#
# kubectl get secret <CLUSTER_NAME>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
# kubectl get secret <CLUSTER_NAME>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password
# Cloudshell > Click '{}' > ca.crt > you can see the decoded certificate
# Cloudshell > Click '{}' > ca.password > you can see the decoded password
#
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > cluster-ca.crt
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > cluster-ca.password

#
# Kafka Admin User
#
# kubectl get secret <KAFKA_USER_NAME> -o jsonpath='{.data.user\.crt}' | base64 --decode > user.crt
# kubectl get secret <KAFKA_USER_NAME> -o jsonpath='{.data.user\.key}' | base64 --decode > user.key
# Cloudshell > Click '{}' > user.crt > you can see the decoded certificate
# Cloudshell > Click '{}' > user.key > you can see the decoded key
#
kubectl get secret strimzi-kafka-admin -o jsonpath='{.data.user\.crt}' | base64 --decode > producer-consumer-client-user.crt
kubectl get secret strimzi-kafka-admin -o jsonpath='{.data.user\.key}' | base64 --decode > producer-consumer-client-user.key

# extract others i.e. 'xxx.password', 'xxx.p12' in the same way
```
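Concretely, for the files the keytool steps below expect (`user.password`, `user.p12`), following the same pattern:

```sh
kubectl get secret strimzi-kafka-admin -o jsonpath='{.data.user\.password}' | base64 --decode > user.password
kubectl get secret strimzi-kafka-admin -o jsonpath='{.data.user\.p12}' | base64 --decode > user.p12
```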
Verify
- Azure portal > AKS > Configuration > Secrets > strimzi-kafka-admin > see the password (click eye icon)
- Open `C:\kafkacerts\user.password` > check the password
- Verify that both passwords are the same > indicates that the above commands executed successfully
Testing With Kafka Console Clients
Kafka comes with 2 console clients (kafka-console-producer and kafka-console-consumer). For Windows:
- Location: `<kafka-folder>\bin\windows` (i.e. `C:\kafka\bin\windows`)
  - kafka-console-producer.bat
  - kafka-console-consumer.bat
Public access (via internet) to kafka cluster
- my-cluster-kafka-external-bootstrap (LoadBalancer service)
  - Public IP
  - Port: `9094`
- Just because Kafka is accessible via the internet does not mean you can get into the cluster and perform operations
  - SSL/TLS authentication and authorization are enabled
  - Certificates need to be imported for the console clients to work (certificates were extracted above and are imported in the sections below)
- After importing certificates, the console clients (`kafka-console-producer.bat` and `kafka-console-consumer.bat`) should be able to access the Kafka cluster
- Console clients will use `client-ssl.properties` as the config file (contains necessary server info, SSL info, keystore info, etc.)
Get LoadBalancer DNS name (prefer the DNS name over the IP because the IP might change)

```sh
# if DNS name is not set -> AKS > external bootstrap > assign DNS name (might be optional and therefore not set by default)
kubectl get service/my-cluster-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].hostname}
```
Get LoadBalancer Public IP address

```sh
# kubectl get service/<kafka-cluster-name>-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}
kubectl get service/my-cluster-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}
```
Importing Certificates for Console Clients
- TLS is enabled, so external clients must use certificates
- Some Kafka clients (e.g. Confluent Go client) use the CA certificate directly; others (e.g. Java client, Kafka CLI, etc.) require access to the CA certificate via a truststore
- You can use the built-in truststore that comes with a JDK (Java) installation (or you can create your own truststore)
Activate Java Keytool
- Environment variable > Add new user variable > "JAVA_HOME": `C:\Program Files\Java\jdk1.8.0_291` (location might be different depending on your JDK installation)
- Edit environment variable > edit Path > add `%JAVA_HOME%\bin`
- Check keytool: Git Bash or CMD or Cmder > type `keytool` and press Enter
Importing `strimzi-kafka-admin` user certs to `kafka-auth-keystore.jks`
```sh
#
# Set variables
#
export KAFKA_USER_NAME=strimzi-kafka-admin
export USER_P12_FILE_PATH=user.p12
export USER_KEY_PASSWORD_FILE_PATH=user.password
export KEYSTORE_NAME=kafka-auth-keystore.jks
export KEYSTORE_PASSWORD=changeit
export PASSWORD=`cat $USER_KEY_PASSWORD_FILE_PATH`

#
# Command
#
# Linux: sudo keytool -importkeystore -deststorepass $KEYSTORE_PASSWORD -destkeystore $KEYSTORE_NAME -srckeystore $USER_P12_FILE_PATH -srcstorepass $PASSWORD -srcstoretype PKCS12
# Expected output: Import command completed: 1 entries successfully imported, 0 entries failed or cancelled
#
keytool -importkeystore -deststorepass $KEYSTORE_PASSWORD -destkeystore $KEYSTORE_NAME -srckeystore $USER_P12_FILE_PATH -srcstorepass $PASSWORD -srcstoretype PKCS12

#
# Check
#
keytool -list -alias $KAFKA_USER_NAME -keystore kafka-auth-keystore.jks
```
Importing cluster CA cert to Java (default) trustStore
- Java (default) trustStore
  - Location: `C:\Program Files\Java\jdk1.8.0_291\jre\lib\security\cacerts` (location might be different depending on your JDK installation)
  - The default password for the JDK truststore is "changeit" (you will be prompted for the truststore password)
  - Type yes in response to 'Trust this certificate?'
- Certificates are in folder `C:\kafkacerts` (certs were extracted in previous steps)
- Open Git bash as Admin > cd `"C:\Program Files\Java\jdk1.8.0_291\jre\lib\security\"` (double quotes are needed)
- Execute the commands below
```sh
#
# Set variables (file names match the extraction commands above)
#
export CLUSTER_CERT_FILE_PATH=C:\\kafkacerts\\cluster-ca.crt
export CLUSTER_CERT_PASSWORD_FILE_PATH=C:\\kafkacerts\\cluster-ca.password
export CLUSTER_CERT_PASSWORD=`cat $CLUSTER_CERT_PASSWORD_FILE_PATH`
export CLUSTER_CERT_ALIAS=my-cluster-cluster-ca-cert
export JAVA_DEFAULT_TRUSTSTORE_NAME=cacerts

#
# keytool -import -file (a cert file) -alias (a name) -keypass PASSWORD -keystore (trustStore file name)
#
# Open cmd or Git bash as admin
# cd "C:\Program Files\Java\jdk1.8.0_291\jre\lib\security" (double quotes are needed)
#
# Don't forget to set variables (i.e. in case you re-started Git bash)
#
keytool -importcert -alias $CLUSTER_CERT_ALIAS -file $CLUSTER_CERT_FILE_PATH -keypass $CLUSTER_CERT_PASSWORD -keystore $JAVA_DEFAULT_TRUSTSTORE_NAME
# Trust this certificate? [no]: yes
# Certificate was added to keystore

#
# Check the cert was added successfully
#
keytool -exportcert -keystore $JAVA_DEFAULT_TRUSTSTORE_NAME -alias $CLUSTER_CERT_ALIAS -list -v # you will be prompted for the password, use "changeit" (default)
```
Using Console Clients
Create `client-ssl.properties` file (`C:\kafkacerts\client-ssl.properties`)

```properties
bootstrap.servers=<LOADBALANCER_PUBLIC_IP>:9094
security.protocol=SSL
ssl.truststore.location=C:/Program Files/Java/jdk1.8.0_291/jre/lib/security/cacerts
ssl.truststore.password=changeit
ssl.keystore.location=C:/kafkacerts/kafka-auth-keystore.jks
ssl.keystore.password=changeit
ssl.key.password=<contents of user.password file>
```
Execute the following commands in CMD/Cmder (opened as Admin); Git Bash might not work

```sh
#
# Make sure you have 'C:\kafkacerts\client-ssl.properties' with the required configs
#
cd "C:\kafkacerts"

#
# Producer
#
kafka-console-producer.bat --bootstrap-server <LOADBALANCER_PUBLIC_IP>:9094 --topic my-topic --producer.config client-ssl.properties

#
# Consumer
#
kafka-console-consumer.bat --bootstrap-server <LOADBALANCER_PUBLIC_IP>:9094 --topic my-topic --consumer.config client-ssl.properties --from-beginning
```
Using SSL Certificates For Dotnet Client
- Install the Confluent NuGet package for Kafka
  - PM: `Install-Package Confluent.Kafka`
  - https://www.nuget.org/packages/Confluent.Kafka/
Prepare SSL certificates
- Create a folder named "kafkacerts" (`C:\kafkacerts`)
- Put the decoded certificates in that folder
- Ways to decode certificates
  - In case of AKS (Azure portal): go to Secrets > click eye icons
  - Use cmd/Cmder (after `az login`):

```sh
kubectl get secret <strimzi-kafka-user> -o jsonpath='{.data.user\.crt}' | base64 --decode > producer-consumer-client-user.crt
kubectl get secret <strimzi-kafka-user> -o jsonpath='{.data.user\.key}' | base64 --decode > producer-consumer-client-user.key
```

Contents of `C:\kafkacerts`:

```
cluster-ca.crt
producer-consumer-client-user.crt
producer-consumer-client-user.key
```
Install NuGet package for JSON config: `Install-Package Microsoft.Extensions.Configuration.Json`
appsettings.json
- Create `appsettings.json` file in the root folder (no need for AspNetCore app)
- Right click on `appsettings.json` > Properties > Copy to Output Directory: Copy if newer
appsettings.json

```json
{
  "ApplicationName": "DotNetConsoleClientForStrimziKafka",
  "KafkaClientConfig": {
    "bootstrap.servers": "127.0.0.1:9094",
    "security.protocol": "SSL",
    "ssl.ca.location": "C:\\kafkacerts\\cluster-ca.crt",
    "ssl.certificate.location": "C:\\kafkacerts\\producer-consumer-client-user.crt",
    "ssl.key.location": "C:\\kafkacerts\\producer-consumer-client-user.key"
  }
}
```
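Note: `127.0.0.1:9094` is a placeholder; for the AKS setup above, point `bootstrap.servers` at the LoadBalancer public IP or DNS name obtained earlier (e.g. `<LOADBALANCER_PUBLIC_IP>:9094`).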
Implementation of C# console app
Utils

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Configuration;

class KafkaClientConfigKeys
{
    //
    // Configuration details: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    //
    public const string BootstrapServers = "bootstrap.servers";
    public const string SecurityProtocol = "security.protocol";
    public const string SslCaLocation = "ssl.ca.location";               // server verification and encrypted communication
    public const string SslUserCrtLocation = "ssl.certificate.location"; // (client) user certificate (client AuthN & AuthZ)
    public const string SslUserKeyLocation = "ssl.key.location";         // (client) user private key (client AuthN & AuthZ)
}

class KafkaSSLUtil
{
    public const string DefaultSettingsJsonFile = "appsettings.json";
    public const string DefaultConfigurationSectionName = "KafkaClientConfig";

    private KafkaSSLUtil()
    {
    }

    public static IDictionary<string, string> GetClientConfigDict(IConfiguration appsettingsConfig = default, string configSectionName = DefaultConfigurationSectionName, Dictionary<string, string> configDict = default)
    {
        try
        {
            //
            // Configuration from appsettings.json
            //
            if (appsettingsConfig == null)
            {
                //
                // Alternative base paths:
                // System.Reflection.Assembly.GetExecutingAssembly().CodeBase
                // System.AppContext.BaseDirectory
                // Path.GetDirectoryName(Application.ExecutablePath)
                //
                appsettingsConfig = new ConfigurationBuilder()
                    .SetBasePath(System.AppContext.BaseDirectory)
                    .AddJsonFile(DefaultSettingsJsonFile, optional: false, reloadOnChange: true)
                    .Build();
            }

            //
            // Get target section (SSL config)
            //
            var configSectionAsDict = appsettingsConfig.GetSection(configSectionName).GetChildren().ToDictionary(x => x.Key, x => x.Value);

            //
            // Merge section values into the caller-supplied dictionary
            // (indexer instead of Add, so existing keys are overwritten rather than throwing)
            //
            if (configDict != null)
            {
                foreach (var pair in configSectionAsDict)
                {
                    configDict[pair.Key] = pair.Value;
                }
            }
            else
            {
                configDict = configSectionAsDict;
            }
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine($"Exception occurred while processing section in appsettings.json. Error -> {ex.Message}");
        }

        return configDict;
    }
}
```
Program.cs

```csharp
using Confluent.Kafka;
using System;
using System.IO;
using System.Threading.Tasks;

namespace DotNetConsoleClientForStrimziKafka
{
    class Program
    {
        public static async Task Main(string[] args)
        {
            var topicName = "my-topic";

            var configProperties = KafkaSSLUtil.GetClientConfigDict();
            var producerConfig = new ProducerConfig(configProperties);

            using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

            Console.WriteLine("-----------------------------------------------------------------------");
            Console.WriteLine($"Producer '{producer.Name}' producing on topic '{topicName}'.\n");
            Console.WriteLine("Press 'Ctrl-C' to quit.\n");
            Console.WriteLine("To create a kafka message with (UTF-8 encoded) key and value -> key <space> value");
            Console.WriteLine("-----------------------------------------------------------------------");

            var cancelled = false;
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cancelled = true;
            };

            while (!cancelled)
            {
                Console.Write("> ");

                string text;
                try
                {
                    text = Console.ReadLine();
                }
                catch (IOException)
                {
                    // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
                    break;
                }

                if (text == null)
                {
                    // Console returned null before the CancelKeyPress was treated
                    break;
                }

                string key = null;
                string val = text;

                // split line if both key and value specified.
                int index = text.IndexOf(" ");
                if (index != -1)
                {
                    key = text.Substring(0, index);
                    val = text.Substring(index + 1);
                }

                try
                {
                    var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = val });
                    Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> e)
                {
                    Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                }
            }
        }
    }
}
```
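For completeness, a minimal consumer sketch (not part of the original app) that reuses the same SSL config dictionary. The `GroupId` value is an assumption (librdkafka requires `group.id` for consumers), and the class is a standalone helper for illustration; call `ConsumerProgram.Run()` from `Main` to try it:

```csharp
using Confluent.Kafka;
using System;
using System.Threading;

namespace DotNetConsoleClientForStrimziKafka
{
    class ConsumerProgram
    {
        public static void Run()
        {
            var topicName = "my-topic";

            // Reuse the SSL settings from appsettings.json
            var configProperties = KafkaSSLUtil.GetClientConfigDict();

            var consumerConfig = new ConsumerConfig(configProperties)
            {
                GroupId = "dotnet-console-client-group", // assumed group id; required for consumers
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
            consumer.Subscribe(topicName);

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            try
            {
                while (true)
                {
                    var result = consumer.Consume(cts.Token);
                    Console.WriteLine($"{result.Message.Key} : {result.Message.Value} (at {result.TopicPartitionOffset})");
                }
            }
            catch (OperationCanceledException)
            {
                // Ctrl-C pressed
            }
            finally
            {
                consumer.Close(); // commit offsets and leave the group cleanly
            }
        }
    }
}
```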