
Securing a Kafka Cluster in Kubernetes Using Strimzi

Dec 30, 2022 26 min read

Key Takeaways

  • Strimzi simplifies the deployment of a Kafka cluster to a Kubernetes cluster.
  • Strimzi configuration lets you secure Kafka communications and provide user/topic RBAC management in a declarative way.
  • Debezium Server provides attributes to connect to a secured Kafka cluster.
  • Debezium Embedded can also be used, since Strimzi creates Kubernetes Secrets with the required credentials that any Kubernetes deployment can read.
  • By default, Kubernetes doesn’t encrypt Secrets at rest; you need to configure encryption to protect them against attacks.

In part 3 of this series, we learned about the dual-write problem and how to solve it using Change Data Capture patterns, specifically using Debezium to read changes made in the database (through the transaction log) and propagate them to a Kafka topic.

In part 4 of this series, we moved the example one step forward, from running the application on the local development machine to running it in Kubernetes (a production environment). We relied on Strimzi to deploy and configure Kafka and Debezium in the Kafka cluster.


But we skipped one important topic at that time to keep things simpler: security. Specifically:

  • How to secure the MySQL instance without hardcoding the username/password directly in the deployment file.
  • How to add authentication to the Kafka cluster using Strimzi.
  • How to configure Debezium to authenticate securely against the Kafka and MySQL instances.

In this article, we’ll answer all these questions by taking the application developed in the previous article (using the Debezium Server approach) and securing it.

Kubernetes

We need a Kubernetes cluster with Strimzi installed. We explained how to set this up in part 4 of this series; if you are reusing that cluster, you first need to delete the application, the MySQL database, the Kafka cluster, and the Debezium instance.

IMPORTANT: You only need to run the following steps IF you still have the cluster from part 4. If you already deleted it, continue reading after the section that describes how to delete the cluster.

Run the following commands in a terminal window to delete them:

kubectl delete deployment movie-plays-producer-debezium-server -n kafka
kubectl delete service movie-plays-producer-debezium-server -n kafka
kubectl delete -f mysql-deployment.yaml -n kafka
kubectl delete -f debezium-kafka-connector.yaml -n kafka
kubectl delete -f debezium-kafka-connect.yaml -n kafka
kubectl delete -f kafka.yaml -n kafka

IMPORTANT: You only need to run the following step if you don’t have a Kubernetes cluster.

If you have already destroyed the cluster, follow the quick instructions to create a new one. In a terminal window, run these commands:

minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=12096 --cpus=3
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

Validate the operator installation by running the following command:

kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-597d67c7d6-ms987 1/1 Running 0 4m27s

Wait until the operator is running and ready.
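Alternatively, kubectl can block until the operator deployment reports itself available (the deployment name matches the Pod prefix shown above):

kubectl wait deployment/strimzi-cluster-operator --for=condition=Available --timeout=300s -n kafka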

At this point, we can start installing all the components with authentication and authorization instead of anonymous access.

MySQL

In the previous article, we deployed the MySQL instance, hardcoding the username/password in the deployment file as an environment variable:

env:
- name: MYSQL_ROOT_PASSWORD
  value: alex
- name: MYSQL_DATABASE
  value: moviesdb
- name: MYSQL_USER
  value: alex
- name: MYSQL_PASSWORD
  value: alex

Let’s create a Kubernetes Secret to store this sensitive data. Data in a Kubernetes Secret manifest must be encoded in base64 format. The string alex encoded in base64 is YWxleA==.

To generate this value, run the following command:

echo -n 'alex' | base64
YWxleA==

Create the mysql-secret.yaml file with the secrets set:

apiVersion: v1
kind: Secret
metadata:
  name: mysqlsecret
type: Opaque
data:
  mysqlrootpassword: YWxleA==
  mysqluser: YWxleA==
  mysqlpassword: YWxleA==

And apply it to the cluster:

kubectl apply -f mysql-secret.yaml -n kafka
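Alternatively, kubectl can create the same Secret and take care of the base64 encoding for you; the following one-liner is equivalent to applying mysql-secret.yaml:

kubectl create secret generic mysqlsecret -n kafka \
 --from-literal=mysqlrootpassword=alex \
 --from-literal=mysqluser=alex \
 --from-literal=mysqlpassword=alex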

Then update the MySQL deployment file to read the values from the Secret created in the previous step, using a secretKeyRef field inside each valueFrom section:

apiVersion: v1
kind: Service
metadata:
  name: mysql
  labels:
    app: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
  labels:
    app: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: mysql:8.0.30
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              key: mysqlrootpassword
              name: mysqlsecret
        - name: MYSQL_DATABASE
          value: moviesdb
        - name: MYSQL_USER
          valueFrom:
            secretKeyRef:
              key: mysqluser
              name: mysqlsecret
        - name: MYSQL_PASSWORD
          valueFrom:
            secretKeyRef:
              key: mysqlpassword
              name: mysqlsecret
        ports:
        - containerPort: 3306
          name: mysql

In each secretKeyRef section, we specify the name of the Secret where the values are stored; in this case, mysqlsecret, as defined in the mysql-secret.yaml file.

Deploy the MySQL instance into the Kubernetes cluster:

kubectl apply -f mysql-deployment.yaml -n kafka

We can validate that the secrets are injected correctly by listing the environment variables inside the container. First, let’s get the Pod name:

kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
mysql-7888f99967-4cj47 1/1 Running 0 90s

Then run the export command inside the container by running the following commands in a terminal window:

kubectl exec -n kafka -ti mysql-7888f99967-4cj47 -- /bin/bash
bash-4.4# export
declare -x GOSU_VERSION="1.14"
declare -x HOME="/root"
declare -x HOSTNAME="mysql-7888f99967-4cj47"
declare -x KUBERNETES_PORT="tcp://10.96.0.1:443"
declare -x KUBERNETES_PORT_443_TCP="tcp://10.96.0.1:443"
declare -x KUBERNETES_PORT_443_TCP_ADDR="10.96.0.1"
declare -x KUBERNETES_PORT_443_TCP_PORT="443"
declare -x KUBERNETES_PORT_443_TCP_PROTO="tcp"
declare -x KUBERNETES_SERVICE_HOST="10.96.0.1"
declare -x KUBERNETES_SERVICE_PORT="443"
declare -x KUBERNETES_SERVICE_PORT_HTTPS="443"
declare -x MYSQL_DATABASE="moviesdb"
declare -x MYSQL_MAJOR="8.0"
declare -x MYSQL_PASSWORD="alex"
declare -x MYSQL_ROOT_PASSWORD="alex"
declare -x MYSQL_SHELL_VERSION="8.0.30-1.el8"
declare -x MYSQL_USER="alex"
declare -x MYSQL_VERSION="8.0.30-1.el8"
declare -x OLDPWD
declare -x PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
declare -x PWD="/"
declare -x SHLVL="1"
declare -x TERM="xterm"

Now you can exit the container:

exit

The MySQL database credentials are now configured using a Kubernetes Secret, which is much better than setting them in the deployment file. The other part to update is the application, as it now needs to read the credentials from the Secret instead of having them statically set in the configuration file.

Movie Plays Producer Debezium

The database username and password are hardcoded in the application.properties file. It would be better if the application were configured automatically, when deployed to Kubernetes, with the username and password stored in the Kubernetes Secret.

One way to do this would be to inject the secrets into the application Pod as environment variables, as we did in the MySQL deployment. For example, in the case of the password, the env part of the deployment file would be:

- name: MYSQL_PASSWORD
  valueFrom:
    secretKeyRef:
      key: mysqlpassword
      name: mysqlsecret

Now update the application.properties file to set the password value from the environment variable:

%prod.quarkus.datasource.password=${mysql-password}

This works, but storing secrets as environment variables isn’t the most secure option, as anyone able to list the environment variables of the process can read them.

Quarkus includes the kubernetes-config extension that allows the application to read Kubernetes ConfigMaps and Secrets directly from the Kubernetes API server. This way, secrets are securely transmitted from the Kubernetes cluster to the application memory without any middle step like materializing them as environment variables or mounting them as volumes.

Kubernetes Config Extension

The first thing to do is register the kubernetes-config extension. Open the pom.xml file and add the following dependency:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes-config</artifactId>
</dependency>

Then, enable the application to read Kubernetes Secrets directly from the Kubernetes API, and set the name of the Secret to read (in our case, mysqlsecret).

Open the src/main/resources/application.properties file and append the following lines:

%prod.quarkus.kubernetes-config.secrets.enabled=true 
quarkus.kubernetes-config.secrets=mysqlsecret

Then update the quarkus.datasource.username and quarkus.datasource.password properties to read their values from the mysqluser and mysqlpassword keys of the mysqlsecret Secret.

In the application.properties file, update these properties accordingly:

%prod.quarkus.datasource.username=${mysqluser}
%prod.quarkus.datasource.password=${mysqlpassword}

Both values are resolved from the corresponding keys of the mysqlsecret Secret.

Since reading Kubernetes Secrets involves interacting with the Kubernetes API server, when RBAC (role-based access control) is enabled on the cluster, the ServiceAccount used to run the application must have the proper permissions for such access.

Because we registered the Kubernetes extension in the previous article, all the necessary Kubernetes resources to make that happen are automatically generated, so we don’t need to do anything.
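For reference, if you were not using the Kubernetes extension, you would have to grant that access yourself. A minimal sketch of the manual equivalent follows; the resource names are illustrative, not necessarily what Quarkus generates:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: read-mysqlsecret   # illustrative name
  namespace: kafka
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["mysqlsecret"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: read-mysqlsecret-binding   # illustrative name
  namespace: kafka
roleRef:
  kind: Role
  name: read-mysqlsecret
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: movie-plays-producer-debezium-server   # the application's ServiceAccount
  namespace: kafka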

Let’s deploy the application running the following command in a terminal window:

./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
...
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Deploying to kubernetes server: https://192.168.59.104:8443/ in namespace: kafka.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Service movie-plays-producer-debezium-server.
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Deployment movie-plays-producer-debezium-server.
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 9537ms

To validate the correctness of the deployment, inspect the Pod’s log and verify that no errors are shown and that the SQL statements are executed correctly:

kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
movie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx 1/1 Running 0 44s
kubectl logs movie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx -n kafka
__ ____ __ _____ ___ __ ____ ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-08-21 21:00:41,277 INFO [io.deb.out.qua.int.AdditionalJaxbMappingProducerImpl] (main) Contributed XML mapping for entity: io.debezium.outbox.quarkus.internal.OutboxEvent
...
Hibernate:
 create table Movie (
 id bigint not null,
 director varchar(255),
 genre varchar(255),
 name varchar(255),
 primary key (id)
 ) engine=InnoDB
Hibernate:
 create table OutboxEvent (
 id binary(255) not null,
 aggregatetype varchar(255) not null,
 aggregateid varchar(255) not null,
 type varchar(255) not null,
 timestamp datetime(6) not null,
 payload varchar(8000),
 tracingspancontext varchar(256),
 primary key (id)
 ) engine=InnoDB

In the following illustration, you can see the part we correctly secured.



Now that the application is running, with the MySQL credentials correctly managed, let’s move on to secure Kafka and Debezium parts.

Kafka

So far, we’ve deployed an open Kafka cluster; no authentication or authorization logic was enabled.

Strimzi lets you deploy a Kafka cluster with any of the following authentication mechanisms:

  • SASL SCRAM-SHA-512
  • TLS client authentication
  • OAuth 2.0 token-based authentication

Since the Strimzi operator is already installed on the Kubernetes cluster, we can use the Kafka custom resource. The Kafka resource configures a cluster deployment; in this case, with TLS client authentication enabled.

In the listeners block, Strimzi provides options to set a listener to use mTLS as the communication protocol (tls: true) and to choose the authentication method (the authentication field).

Create a new file named kafka.yaml with the following content to configure a secured Kafka:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.2.0
    replicas: 1
    listeners:
    - name: demo
      port: 9092
      type: internal
      tls: false
    - name: secure
      port: 9093
      type: internal
      tls: true
      authentication:
        type: tls
    authorization:
      type: simple
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.2"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

And apply it to the Kubernetes cluster:

kubectl apply -f kafka.yaml -n kafka
kafka.kafka.strimzi.io/my-cluster created

Let’s validate the Kafka cluster is up and running:

kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-d4db5ff58-rt96n 3/3 Running 0 2m26s
my-cluster-kafka-0 1/1 Running 0 2m58s
my-cluster-zookeeper-0 1/1 Running 0 3m31s

Since we set the listener to use TLS, Strimzi has automatically created a Kubernetes Secret containing the cluster certificate, a PKCS12 truststore, and the associated password as data.

kubectl get secrets -n kafka
my-cluster-clients-ca Opaque 1 9m14s
my-cluster-clients-ca-cert Opaque 3 9m14s
my-cluster-cluster-ca Opaque 1 9m14s
my-cluster-cluster-ca-cert Opaque 3 9m14s
my-cluster-cluster-operator-certs Opaque 4 9m14s
my-cluster-entity-operator-dockercfg-5wwb5 kubernetes.io/dockercfg 1 8m9s
my-cluster-entity-operator-token-h9xkq kubernetes.io/service-account-token 4 8m9s
my-cluster-entity-operator-token-npvfc kubernetes.io/service-account-token 4 8m9s
my-cluster-entity-topic-operator-certs Opaque 4 8m9s
my-cluster-entity-user-operator-certs Opaque 4 8m8s
my-cluster-kafka-brokers Opaque 4 8m41s
my-cluster-kafka-dockercfg-fgpx2 kubernetes.io/dockercfg 1 8m41s
my-cluster-kafka-token-2x7s8 kubernetes.io/service-account-token 4 8m41s
my-cluster-kafka-token-6qdgk kubernetes.io/service-account-token 4 8m41s
my-cluster-zookeeper-dockercfg-p296g kubernetes.io/dockercfg 1 9m13s
my-cluster-zookeeper-nodes Opaque 4 9m13s
my-cluster-zookeeper-token-dp9sc kubernetes.io/service-account-token 4 9m13s
my-cluster-zookeeper-token-gbrxg kubernetes.io/service-account-token 4 9m13s

The important secret here is the one named <clustername>-cluster-ca-cert (in this case my-cluster-cluster-ca-cert).

List the content of the secret by running the following command in a terminal window:

kubectl get secret my-cluster-cluster-ca-cert -o yaml -n kafka
apiVersion: v1
data:
  ca.crt: LS0tLS1CRUdJTiBDRVJU
  ca.p12: MIIGkwIBAzCCBk==
  ca.password: azJjY2tIMEs1c091
kind: Secret
metadata:
  annotations:
    strimzi.io/ca-cert-generation: "0"
  creationTimestamp: "2022-08-21T19:32:55Z"
  labels:
    app.kubernetes.io/instance: my-cluster
    app.kubernetes.io/managed-by: strimzi-cluster-operator
    app.kubernetes.io/name: strimzi
    app.kubernetes.io/part-of: strimzi-my-cluster
    strimzi.io/cluster: my-cluster
    strimzi.io/kind: Kafka
    strimzi.io/name: strimzi
  name: my-cluster-cluster-ca-cert
  namespace: kafka
  ownerReferences:
  - apiVersion: kafka.strimzi.io/v1beta2
    blockOwnerDeletion: false
    controller: false
    kind: Kafka
    name: my-cluster
    uid: 23c84dfb-bb33-47ed-bd41-b4e87e0a4c3a
  resourceVersion: "49424"
  uid: 6c2679a8-216f-421b-880a-de0e6a0879fa
type: Opaque

Next, let’s create a user for mTLS authentication.

Security and Debezium

With Kafka secured, let’s create a KafkaUser resource that authenticates via mTLS and sets the authorization rules for the consumer groups and topics the user may access.

Create a new file named kafka-user-connect-all-topics.yaml with the following content:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-connect
  namespace: kafka
  labels:
    # Cluster name set previously
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
    # Kafka Connect internal topics used to store configuration, offsets, or status
    - resource:
        type: group
        name: outbox-viewer
      operation: Read
    - resource:
        type: group
        name: outbox-viewer
      operation: Describe
    - resource:
        type: group
        name: mysql-dbhistory
      operation: Read
    - resource:
        type: group
        name: mysql-dbhistory
      operation: Describe
    - resource:
        type: group
        name: connect-cluster
      operation: Read
    - resource:
        type: group
        name: connect-cluster
      operation: Describe
    - resource:
        type: topic
        name: connect-cluster-configs
      operation: Read
    - resource:
        type: topic
        name: connect-cluster-configs
      operation: Describe
    - resource:
        type: topic
        name: connect-cluster-configs
      operation: Write
    - resource:
        type: topic
        name: connect-cluster-configs
      operation: Create
    - resource:
        type: topic
        name: connect-cluster-status
      operation: Read
    - resource:
        type: topic
        name: connect-cluster-status
      operation: Describe
    - resource:
        type: topic
        name: connect-cluster-status
      operation: Write
    - resource:
        type: topic
        name: connect-cluster-status
      operation: Create
    - resource:
        type: topic
        name: connect-cluster-offsets
      operation: Read
    - resource:
        type: topic
        name: connect-cluster-offsets
      operation: Write
    - resource:
        type: topic
        name: connect-cluster-offsets
      operation: Describe
    - resource:
        type: topic
        name: connect-cluster-offsets
      operation: Create
    - resource:
        type: group
        name: connect-cluster
      operation: Read
    # Debezium topics
    - resource:
        type: topic
        name: "*"
      operation: Read
    - resource:
        type: topic
        name: "*"
      operation: Describe
    - resource:
        type: topic
        name: "*"
      operation: Write
    - resource:
        type: topic
        name: "*"
      operation: Create

Apply the resource in a terminal window:

kubectl apply -f kafka-user-connect-all-topics.yaml -n kafka
kafkauser.kafka.strimzi.io/my-connect created

After registering this Kafka user, Strimzi creates a new Secret with the same name as the KafkaUser resource (my-connect), containing a PKCS12 keystore that holds the client’s private key, plus the password to access it.

kubectl get secret my-connect -n kafka -o yaml
apiVersion: v1
data:
  ca.crt: LS0tLS1CK
  user.crt: LS0tLS1CRUdJTiB==
  user.key: LS0tLS1CRUdJTiBQUklWQVRK
  user.p12: MIILNAIBAzCAA==
  user.password: UUR4Nk5NemsxUVFF
kind: Secret
metadata:
  creationTimestamp: "2022-08-21T20:12:44Z"
  labels:
    app.kubernetes.io/instance: my-connect
    app.kubernetes.io/managed-by: strimzi-user-operator
    app.kubernetes.io/name: strimzi-user-operator
    app.kubernetes.io/part-of: strimzi-my-connect
    strimzi.io/cluster: my-cluster
    strimzi.io/kind: KafkaUser
  name: my-connect
  namespace: kafka
  ownerReferences:
  - apiVersion: kafka.strimzi.io/v1beta2
    blockOwnerDeletion: false
    controller: false
    kind: KafkaUser
    name: my-connect
    uid: 882447cc-7759-4884-9d2f-f57f8be92711
  resourceVersion: "60439"
  uid: 9313676f-3417-42d8-b3fb-a1b1fe1b3a39
type: Opaque

So now, we’ve got a new Kafka user with the permissions required to use the relevant Kafka topics.
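You can confirm that the User Operator has reconciled the user (the exact output columns depend on your Strimzi version):

kubectl get kafkauser my-connect -n kafka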

Before deploying the Debezium Kafka Connector, let’s permit the Kafka Connect cluster to read the MySQL credentials directly from the mysqlsecret Secret object using the Kubernetes API (as we did in the application) so the connector can authenticate against the database to read the transaction log.

Create the kafka-role-binding.yaml file with the following content:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: kafka
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["mysqlsecret", "my-connect", "my-cluster-cluster-ca-cert"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: kafka
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: kafka
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io

Notice that the name under the subjects block is the service account that runs the Debezium Kafka Connect Pod. We’ve not deployed the Pod yet, but when a Kafka Connect component is deployed, the service account created follows the format $KafkaConnectName-connect. Since the Debezium Kafka Connect will be named debezium-connect-cluster, the service account created will be debezium-connect-cluster-connect, and we give this account permission to read the Kubernetes Secrets directly.

Apply the kafka-role-binding.yaml before deploying the Debezium Kafka Connect:

kubectl apply -f kafka-role-binding.yaml -n kafka
role.rbac.authorization.k8s.io/connector-configuration-role created
rolebinding.rbac.authorization.k8s.io/connector-configuration-role-binding created

The following illustration summarizes the current secured communications:

To deploy the Debezium Kafka Connect cluster, we’ll again use the KafkaConnect object provided by Strimzi, but with some changes to authenticate against the Kafka cluster and to enable reading configuration parameters from Kubernetes Secrets (the main purpose being to read the MySQL credentials Debezium uses to authenticate).

The following fields are configured:

  • The port is now 9093.
  • mTLS certificate is set to communicate with the cluster (tls field).
  • The certificate and key user are set to authenticate against the cluster (authentication field).
  • config.providers is set to read configuration from Kubernetes Secrets in the MySQL Connector.
  • The externalConfiguration section materializes the truststores and keystores from Secrets into files under the /opt/kafka/external-configuration/<secretName> directory, where the MySQL connector can access them.

Create the kafka-connect.yaml file as shown in the following listing:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.2.0
  image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  tls:
    trustedCertificates:
    - secretName: my-cluster-cluster-ca-cert
      certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: my-connect
      certificate: user.crt
      key: user.key
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-cluster-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-cluster-status
    status.storage.replication.factor: 1
  externalConfiguration:
    volumes:
    - name: cluster-ca
      secret:
        secretName: my-cluster-cluster-ca-cert
    - name: my-user
      secret:
        secretName: my-connect

The trustedCertificates are set from the secret created when the Kafka cluster was deployed using the Kafka object.

The certificateAndKey, under the authentication block, is set from the secret created when KafkaUser was registered.

Deploy the resource and validate it’s correctly deployed and configured:

kubectl apply -f kafka-connect.yaml -n kafka
kafkaconnect.kafka.strimzi.io/debezium-connect-cluster created
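Before moving on, you can wait for the Connect cluster to become ready and confirm that the ServiceAccount referenced in the RoleBinding now exists:

kubectl wait kafkaconnect/debezium-connect-cluster --for=condition=Ready --timeout=300s -n kafka
kubectl get serviceaccount debezium-connect-cluster-connect -n kafka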

Create a new file named debezium-kafka-connector.yaml configuring Debezium to register the MySQL connector that accesses the transaction log of the MySQL instance. In this case, we are not putting a plain-text username and password in the connector configuration, but referring to the Secret object we previously created with the MySQL credentials. The format to access the Secret is secrets:<namespace>/<secretname>:<key>. Moreover, the connector reads the truststores and keystores materialized when you applied the KafkaConnect definition.


apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  namespace: kafka
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    group.id: connect-cluster
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: ${secrets:kafka/mysqlsecret:mysqlpassword}
    database.server.id: 184054
    database.server.name: mysql
    database.include.list: moviesdb
    database.allowPublicKeyRetrieval: true
    table.include.list: moviesdb.OutboxEvent
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9093
    database.history.kafka.topic: schema-changes.movies
    database.history.producer.security.protocol: SSL
    database.history.producer.ssl.keystore.type: PKCS12
    database.history.producer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12
    database.history.producer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password}
    database.history.producer.ssl.truststore.type: PKCS12
    database.history.producer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12
    database.history.producer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}
    database.history.consumer.security.protocol: SSL
    database.history.consumer.ssl.keystore.type: PKCS12
    database.history.consumer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12
    database.history.consumer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password}
    database.history.consumer.ssl.truststore.type: PKCS12
    database.history.consumer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12
    database.history.consumer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}

Apply the file to register the MySQL connector by running the following command in a terminal window:

kubectl apply -f debezium-kafka-connector.yaml -n kafka
kafkaconnector.kafka.strimzi.io/debezium-connector-mysql created
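You can check the connector’s reconciliation status through its conditions; it should report Ready once Debezium has connected to both MySQL and Kafka:

kubectl get kafkaconnector debezium-connector-mysql -n kafka -o jsonpath='{.status.conditions}'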

Finally, all the communications are secured.

Demo

And that’s all: we now have the same example shown in the previous article, but secured.

To test it, there is a Quarkus application named outbox-viewer that prints all the content of the OutboxEvent topic to the console. Apply the following YAML file to deploy it:

---
apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
    app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
  labels:
    app.kubernetes.io/name: outbox-viewer
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  name: outbox-viewer
  namespace: kafka
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
    app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
  labels:
    app.kubernetes.io/name: outbox-viewer
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  name: outbox-viewer
  namespace: kafka
spec:
  ports:
  - name: http
    port: 80
    targetPort: 8080
  selector:
    app.kubernetes.io/name: outbox-viewer
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  type: ClusterIP
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: view-secrets
  namespace: kafka
rules:
  - apiGroups:
      - ""
    resources:
      - secrets
    verbs:
      - get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: outbox-viewer-view
  namespace: kafka
roleRef:
  kind: ClusterRole
  apiGroup: rbac.authorization.k8s.io
  name: view
subjects:
  - kind: ServiceAccount
    name: outbox-viewer
    namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: outbox-viewer-view-secrets
  namespace: kafka
roleRef:
  kind: Role
  apiGroup: rbac.authorization.k8s.io
  name: view-secrets
subjects:
  - kind: ServiceAccount
    name: outbox-viewer
    namespace: kafka
---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
    app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
  labels:
    app.kubernetes.io/name: outbox-viewer
    app.kubernetes.io/version: 1.0.0-SNAPSHOT
  name: outbox-viewer
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: outbox-viewer
      app.kubernetes.io/version: 1.0.0-SNAPSHOT
  template:
    metadata:
      annotations:
        app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca
        app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000
      labels:
        app.kubernetes.io/name: outbox-viewer
        app.kubernetes.io/version: 1.0.0-SNAPSHOT
      namespace: kafka
    spec:
      containers:
        - env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          image: quay.io/lordofthejars/outbox-viewer:1.0.0-SNAPSHOT
          imagePullPolicy: Always
          name: outbox-viewer
          ports:
            - containerPort: 8080
              name: http
              protocol: TCP
          volumeMounts:
            - mountPath: /home/jboss/cluster
              name: cluster-volume
              readOnly: false
            - mountPath: /home/jboss/user
              name: user-volume
              readOnly: false
      serviceAccountName: outbox-viewer
      volumes:
        - name: cluster-volume
          secret:
            optional: false
            secretName: my-cluster-cluster-ca-cert
        - name: user-volume
          secret:
            optional: false
            secretName: my-connect

Then, in one terminal window, follow the logs of the application’s Pod:

kubectl logs outbox-viewer-684969f9f6-7snng -f

Substitute your own Pod name.

Find the IP and port of the Movie Plays Producer application by running the following commands in a terminal:

minikube ip -p strimzi
192.168.59.106

Get the exposed port of the movie-plays-producer-debezium service, which is the second port in the following snippet (32460):

kubectl get services -n kafka
movie-plays-producer-debezium LoadBalancer 10.100.117.203 <pending> 80:32460/TCP 67m

Then, send a curl request to the Movie Plays Producer application:

curl -X 'POST' \
 'http://192.168.59.106:32460/movie' \
 -H 'accept: application/json' \
 -H 'Content-Type: application/json' \
 -d '{
 "name": "Minions: The Rise of Gru",
 "director": "Kyle Balda",
 "genre": "Animation"
}'

Adapt the IP and port to your case.

Finally, inspect the output of the outbox-viewer Pod to see the transmission of the data from the database to Kafka using the Debezium Server approach.

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"bytes","optional":false,"field"
...
,"aggregatetype":"Movie","aggregateid":"1","type":"MovieCreated","timestamp":1661339188708005,"payload":"{\"id\":1,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1661339188000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":2967,"row":0,"thread":15,"query":null},"op":"c","ts_ms":1661339188768,"transaction":null}}

Debezium Embedded

So far, we’ve secured the interactions between the application and the MySQL database, between Debezium Server and MySQL, and between Debezium Server and Kafka.

But you might wonder, what happens if instead of using Debezium Server, I am using Debezium Embedded deployed within the Quarkus application? How can I configure the Kafka connection to be secured using the mTLS method?

Quarkus offers two ways to connect to Kafka: the Kafka client and the Reactive Messaging client. Let’s see the properties required in both cases to authenticate to a Kafka cluster using the mTLS authentication method.
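For context, a minimal Reactive Messaging consumer bound to a channel named movies might look like the following sketch (the channel name and payload type are assumptions for illustration; the mTLS settings shown below apply to this channel through the mp.messaging.incoming.movies.* properties):

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class MovieEventsConsumer {

    // Receives records from the Kafka topic bound to the "movies" channel.
    // The SSL/mTLS settings come from configuration, not from code.
    @Incoming("movies")
    public void consume(String event) {
        System.out.println("Received outbox event: " + event);
    }
}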

KeyStore and TrustStore

To configure mTLS on the client side, four elements are required:

  • Cluster TrustStore to make the mTLS connection
  • TrustStore password
  • Kafka User KeyStore to authenticate
  • KeyStore password

The first two elements are stored in the my-cluster-cluster-ca-cert Kubernetes Secret created earlier when we applied the Strimzi resources. To get them, run the following commands in a terminal window:

kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.p12}' | base64 -d > mtls-cluster-ca.p12

And the password:

kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.password}' | base64 -d
k2cckH0K5sOu

The latter two elements are stored in the my-connect Kubernetes Secret. To get them, run the following commands in a terminal window:

kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.p12}' | base64 -d > mtls-user.p12

And the password:

kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.password}' | base64 -d
QDx6NMzk1QQE

Now, set the Quarkus Kafka configuration properties to authenticate to the Kafka cluster using the previous credentials:

%prod.kafka.ssl.truststore.location=mtls-cluster-ca.p12
%prod.kafka.ssl.truststore.password=k2cckH0K5sOu
%prod.kafka.ssl.truststore.type=PKCS12
%prod.kafka.ssl.keystore.location=mtls-user.p12
%prod.kafka.ssl.keystore.password=QDx6NMzk1QQE
%prod.kafka.ssl.keystore.type=PKCS12
%prod.kafka.security.protocol=SSL
%prod.mp.messaging.incoming.movies.ssl.truststore.location=mtls-cluster-ca.p12
%prod.mp.messaging.incoming.movies.ssl.truststore.password=k2cckH0K5sOu
%prod.mp.messaging.incoming.movies.ssl.truststore.type=PKCS12
%prod.mp.messaging.incoming.movies.ssl.keystore.location=mtls-user.p12
%prod.mp.messaging.incoming.movies.ssl.keystore.password=QDx6NMzk1QQE
%prod.mp.messaging.incoming.movies.ssl.keystore.type=PKCS12
%prod.mp.messaging.incoming.movies.security.protocol=SSL

We could use the Quarkus Kubernetes Config extension, as we did with the MySQL credentials, to inject these values directly, but for the sake of simplicity, we set them in the configuration file here.
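To sanity-check the extracted stores before shipping them with the application, you can list their contents with keytool (part of the JDK), entering the corresponding password when prompted:

keytool -list -storetype PKCS12 -keystore mtls-cluster-ca.p12
keytool -list -storetype PKCS12 -keystore mtls-user.p12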

But in terms of security, one important point is still missing: how do we safely store secrets inside a YAML file, and how do we keep secrets at rest securely inside a Kubernetes cluster?

Encryption of Secrets

At the beginning of this article, we created a Kubernetes Secret object with the MySQL credentials, but it was a YAML file with sensitive information merely encoded in Base64 format, which is not secure at all. This YAML file will probably end up in a Git repository, making the secrets available to anyone with access to the repo. In the following section, we’ll fix this.

Sealed Secrets

Sealed Secrets is a Kubernetes controller that lets you encrypt Kubernetes Secret resources on the client side (the local machine) and decrypts them inside the Kubernetes cluster when they are applied.

There are two components needed to start using the Sealed Secrets project. The first one is the kubeseal CLI tool, used to encrypt secrets.

To install kubeseal, download the package for your operating system from the Sealed Secrets GitHub releases page.

The second one is the kubeseal Kubernetes controller. To install it, run the following command on the command line:

kubectl apply -f https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.18.1/controller.yaml -n kube-system
role.rbac.authorization.k8s.io/sealed-secrets-service-proxier created
clusterrole.rbac.authorization.k8s.io/secrets-unsealer created
deployment.apps/sealed-secrets-controller created
customresourcedefinition.apiextensions.k8s.io/sealedsecrets.bitnami.com created
service/sealed-secrets-controller created
role.rbac.authorization.k8s.io/sealed-secrets-key-admin created
clusterrolebinding.rbac.authorization.k8s.io/sealed-secrets-controller created
serviceaccount/sealed-secrets-controller created
rolebinding.rbac.authorization.k8s.io/sealed-secrets-service-proxier created
rolebinding.rbac.authorization.k8s.io/sealed-secrets-controller created

Verify that the controller is correctly deployed and running:

kubectl get pods -n kube-system
sealed-secrets-controller-554d94cb68-xr6mw 1/1 Running 0 8m46s

After that, we can take the mysql-secret.yaml file and use the kubeseal tool to create a new Kubernetes resource of kind SealedSecret, with the data field encrypted.

kubeseal -n kube -o yaml <mysql-secret.yaml > mysql-secret-encrypted.yaml

The new file, named mysql-secret-encrypted.yaml, is of kind SealedSecret, with the value of each key encrypted:

apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
  creationTimestamp: null
  name: mysqlsecret
  namespace: kube
spec:
  encryptedData:
    mysqlpassword: AgBl721mnowwPlC35FfO26zP0
    mysqlrootpassword: AgAKl1tWV8hahn00yGS4ucs
    mysqluser: AgCWrWFl1/LcS
  template:
    data: null
    metadata:
      creationTimestamp: null
      name: mysqlsecret
      namespace: kafka
    type: Opaque

At this point, you can safely delete the mysql-secret.yaml file, as it’s no longer required.

Apply the encrypted resource like any other Kubernetes resource file, and the Sealed Secrets controller will decrypt it and store it inside Kubernetes as a normal Secret.
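For example (the file name matches the one generated above):

kubectl apply -f mysql-secret-encrypted.yaml
sealedsecret.bitnami.com/mysqlsecret created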

You can validate the Secret by running the following command:

kubectl get secret mysqlsecret -n kafka -o yaml
apiVersion: v1
data:
  mysqlpassword: YWxleA==
  mysqlrootpassword: YWxleA==
  mysqluser: YWxleA==
kind: Secret
metadata:
  creationTimestamp: "2022-08-21T19:05:21Z"
  name: mysqlsecret
  namespace: kafka
  ownerReferences:
  - apiVersion: bitnami.com/v1alpha1
    controller: true
    kind: SealedSecret
    name: mysqlsecret
    uid: 2a5ee74b-c2b2-49b3-9a9f-877e7a77b163
  resourceVersion: "41514"
  uid: 494cbe8b-7480-4ebd-9cc5-6fe396795eaa
type: Opaque

It’s important to note that the result is a decrypted Kubernetes Secret carrying an owner reference to the SealedSecret responsible for its creation. In this way, the lifecycle of the Secret is tied to that of the SealedSecret.

We’ve fixed the problem of storing the YAML file without revealing sensitive data, but once the Secret is applied to the Kubernetes cluster, it’s still stored merely Base64-encoded, so it’s not really secret at rest.

Secrets at Rest

By default, Kubernetes doesn’t store secrets encrypted at rest in the etcd database. Encrypting secret data at rest is a huge topic that deserves its own post (in fact, there is a book, Kubernetes Secrets Management, dedicated to it). Every Kubernetes distribution may offer a different way to enable encryption of secrets at rest, although in the end, it comes down to a configuration file (EncryptionConfiguration) copied onto every kube-apiserver node.

The file has the following form:

apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
  - resources:
      - secrets
    providers:
      - identity: {}
      - aesgcm:
          keys:
            - name: key1
              secret: c2VjcmV0IGlzIHNlY3VyZQ==
            - name: key2
              secret: dGhpcyBpcyBwYXNzd29yZA==
      - aescbc:
          keys:
            - name: key1
              secret: c2VjcmV0IGlzIHNlY3VyZQ==
            - name: key2
              secret: dGhpcyBpcyBwYXNzd29yZA==
      - secretbox:
          keys:
            - name: key1
              secret: YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=

In the following image, we can see the flow of a secret when an EncryptionConfiguration file is registered in the kube-apiserver.

Now, secrets are encrypted in the YAML file thanks to the SealedSecret object, and also protected at rest using the EncryptionConfiguration file.
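If you have access to an etcd node, you can verify that newly written Secrets are no longer stored in plain text. A rough sketch follows; the etcd endpoint and the client certificate flags (omitted here) vary per cluster:

ETCDCTL_API=3 etcdctl get /registry/secrets/kafka/mysqlsecret | hexdump -C

With encryption enabled, the stored value starts with a k8s:enc:... provider prefix instead of readable JSON.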

Conclusions

Securing the whole infrastructure is important, and in this article we’ve learned how to secure access to the database and to Kafka using Kubernetes Secrets.

With Strimzi, we are able to define not only the authentication part but also the authorization part, providing rules about who can do what on Kafka topics.

Accessing these secrets is also an important part, and Quarkus and Debezium let you access them in an efficient yet secure way, without persisting the secrets on the filesystem (or as environment variables), instead injecting them directly into memory.

Security is an important topic, and Strimzi is the perfect match when it’s time to manage it in a Kafka cluster.

The source code is available on GitHub.

About the Author

Alex Soto
