Running an MQTT Kafka broker
Running an MQTT Kafka broker
In this guide, you create Kafka topics and use Zilla to mediate MQTT broker messages onto those topics.
Specifically, you will:
Verify prerequisites to run this guide.
Install and run Zilla with Kafka or use your own.
Create topics for the MQTT broker messages.
Watch Kafka for new messages on the topics.
Pub & Sub with an MQTT client.
Route messages to different Kafka topics.
Tl;Dr
Download and run the Zilla zilla-examples/mqtt.kafka.broker example using this install script. It will start Zilla and everything you need for this guide.
wget -qO- https://raw.githubusercontent.com/aklivity/zilla-examples/main/startup.sh | sh -s -- mqtt.kafka.broker
Note
Alternatively, download mqtt.kafka.broker or the startup.sh script yourself.
Prerequisites
Before proceeding, you should have Compose or optionally Helm and Kubernetes installed.
Detailed prerequisites
- A connection to the internet
- Docker version 1.13.0+ or later is installed and running
- Docker Desktop or Docker Desktop for Windows on WSL 2
- Container host resources: 1 CPU, 1GB memory
Optional:
- Kafka 3.0+ hosted with the Docker network allowed to communicate
- Helm 3.0+
- Kubernetes 1.13.0+
Check the Kafka topics
Run the docker command under the Verify the Kafka topics created
section of the script output. Verify these topics are listed. Read more on the data in these topics in the overview.
mqtt-messages
mqtt-retained
mqtt-sessions
mqtt-devices
Listen for messages
Run the docker command under the Start a topic consumer to listen for messages
section of the script output. If you didn't use your own Kafka, you can also see all the topics in the Kafka UI.
Send a greeting
Using eclipse-mosquitto subscribe to the zilla
topic.
docker run -it --rm eclipse-mosquitto \
mosquitto_sub -V 'mqttv5' --topic 'zilla' \
--host 'host.docker.internal' --port 7183 --debug
In a separate session, publish a message on the zilla
topic.
docker run -it --rm eclipse-mosquitto \
mosquitto_pub -V 'mqttv5' --topic 'zilla' --message 'Hello, world' \
--host 'host.docker.internal' --port 7183 --debug --insecure
Send messages with the retained flag.
docker run -it --rm eclipse-mosquitto \
mosquitto_pub -V 'mqttv5' --topic 'zilla' --message 'Hello, retained' --retain \
--host 'host.docker.internal' --port 7183 --debug --insecure
Then restart the mosquitto_sub
above. The latest retained message is delivered, and the other messages are not.
Message routing
Send a message from a device and a sensor.
docker run -it --rm eclipse-mosquitto \
mosquitto_pub -V 'mqttv5' --topic 'place/01/device/01' --message 'I am device01' \
--host 'host.docker.internal' --port 7183 --debug --insecure
docker run -it --rm eclipse-mosquitto \
mosquitto_pub -V 'mqttv5' --topic 'place/01/sensor/01' --message 'I am sensor01' \
--host 'host.docker.internal' --port 7183 --debug --insecure
You can check the Kafka UI and see that device01's message was delivered to the mqtt-devices
topic while sensor01's message is on the mqtt-messages
topic.
Creating this example yourself
Start a Kafka instance
You can use your own Kafka or set up a local Kafka with kafka.broker and follow the setup instructions in the README.md
.
Export these environment variables or overwrite them with your remote Kafka if you skipped the local setup.
export KAFKA_HOST=host.docker.internal
export KAFKA_PORT=9092
Bootstrap Kafka
Create these topics in the Kafka environment.
\
/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-sessions
/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-messages --config cleanup.policy=compact
/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-retained --config cleanup.policy=compact
Create your config
Create a new file called zilla.yaml
and append the below yaml to it.
Entrypoint
This will configure Zilla for accepting all of the mqtt
traffic. The tcp binding defines the ports Zilla will accept traffic for both MQTT and WebSocket connections.
name: zilla-mqtt-kafka-broker
bindings:
# Proxy service entrypoint
north_tcp_server:
type: tcp
kind: server
options:
host: 0.0.0.0
port:
- 7114
- 7183
routes:
- when:
- port: 7114
exit: north_http_server
- when:
- port: 7183
exit: north_mqtt_server
A ws binding is added to handle any MQTT over WebSocket using the mqtt
protocol. The mqtt binding then handles all of the MQTT message traffic that needs to go to Kafka.
# WebSocket server
north_http_server:
type: http
kind: server
routes:
- when:
- headers:
:scheme: http
:authority: localhost:7114
upgrade: websocket
exit: north_ws_server
north_ws_server:
type: ws
kind: server
routes:
- when:
- protocol: mqtt
exit: north_mqtt_server
# Shared MQTT server
north_mqtt_server:
type: mqtt
kind: server
exit: north_mqtt_kafka_mapping
Service definition
The service definition defines how the clients using this service will interact with Kafka through Zilla. The required set of Kafka topics are defined in the options.topics where Zilla manages any MQTT required features. A client identity can be determined by pulling the identifier out of the topic using the options.clients property.
# MQTT messages to Kafka topics
north_mqtt_kafka_mapping:
type: mqtt-kafka
kind: proxy
options:
topics:
sessions: mqtt-sessions
messages: mqtt-messages
retained: mqtt-retained
clients:
- place/{identity}/#
routes:
- when:
- publish:
- topic: place/+/device/#
- topic: device/#
- subscribe:
- topic: place/+/device/#
- topic: device/#
with:
messages: mqtt-devices
exit: north_kafka_cache_client
exit: north_kafka_cache_client
Additionally, a route is defined to capture any "device" messages and route them to a specific topic called mqtt-devices
. Here Zilla enables routing different topic patterns into one Kafka topic using MQTT supported wildcards. All other messages will use the default exit
and end up in the mqtt-messages
topic.
routes:
- when:
- publish:
- topic: place/+/device/#
- topic: device/#
- subscribe:
- topic: place/+/device/#
- topic: device/#
with:
messages: mqtt-devices
exit: north_kafka_cache_client
Add a Kafka sync layer
The Zilla cache_client and cache_server helps manage the smooth data transfer between the service definition and Kafka. It is important to bootstrap the topics that will be brokering MQTT messages.
# Kafka sync layer
north_kafka_cache_client:
type: kafka
kind: cache_client
exit: south_kafka_cache_server
south_kafka_cache_server:
type: kafka
kind: cache_server
options:
bootstrap:
- mqtt-messages
- mqtt-retained
- mqtt-devices
exit: south_kafka_client
Point to a Running Kafka instance
This will define the location and connection for Zilla to communicate with Kafka.
# Connect to Kafka
south_kafka_client:
type: kafka
kind: client
exit: south_tcp_client
south_tcp_client:
type: tcp
kind: client
options:
host: ${{env.KAFKA_HOST}}
port: ${{env.KAFKA_PORT}}
routes:
- when:
- cidr: 0.0.0.0/0
Full zilla.yaml
name: zilla-mqtt-kafka-broker
bindings:
# Proxy service entrypoint
north_tcp_server:
type: tcp
kind: server
options:
host: 0.0.0.0
port:
- 7114
- 7183
routes:
- when:
- port: 7114
exit: north_http_server
- when:
- port: 7183
exit: north_mqtt_server
# WebSocket server
north_http_server:
type: http
kind: server
routes:
- when:
- headers:
:scheme: http
:authority: localhost:7114
upgrade: websocket
exit: north_ws_server
north_ws_server:
type: ws
kind: server
routes:
- when:
- protocol: mqtt
exit: north_mqtt_server
# Shared MQTT server
north_mqtt_server:
type: mqtt
kind: server
exit: north_mqtt_kafka_mapping
# MQTT messages to Kafka topics
north_mqtt_kafka_mapping:
type: mqtt-kafka
kind: proxy
options:
topics:
sessions: mqtt-sessions
messages: mqtt-messages
retained: mqtt-retained
clients:
- place/{identity}/#
routes:
- when:
- publish:
- topic: place/+/device/#
- topic: device/#
- subscribe:
- topic: place/+/device/#
- topic: device/#
with:
messages: mqtt-devices
exit: north_kafka_cache_client
exit: north_kafka_cache_client
# Kafka sync layer
north_kafka_cache_client:
type: kafka
kind: cache_client
exit: south_kafka_cache_server
south_kafka_cache_server:
type: kafka
kind: cache_server
options:
bootstrap:
- mqtt-messages
- mqtt-retained
- mqtt-devices
exit: south_kafka_client
# Connect to Kafka
south_kafka_client:
type: kafka
kind: client
exit: south_tcp_client
south_tcp_client:
type: tcp
kind: client
options:
host: ${{env.KAFKA_HOST}}
port: ${{env.KAFKA_PORT}}
routes:
- when:
- cidr: 0.0.0.0/0
Start Zilla
With your zilla.yaml
config, follow the Zilla install instructions using your method of choice. Set the necessary Kafka environment variables.
--env KAFKA_HOST="host.docker.internal" --env KAFKA_PORT="9092"
extraEnv:
- name: KAFKA_HOST
value: "kafka.zilla-kafka-broker.svc.cluster.local"
- name: KAFKA_PORT
value: "9092"
Adding TLS
You can add TLS to this broker by adding a vault and tls binding as described in the Server Encryption section.
Remove the running containers
Find the path to the teardown.sh
script(s) in the use the teardown script(s) to clean up
section of the example output and run it. If you didn't provide an external Kafka endpoint, there will be scripts for both Zilla and the local Kafka installs.