sse-kafka Binding
Zilla runtime sse-kafka binding.
sse_kafka_proxy:
  type: sse-kafka
  kind: proxy
  routes:
    - when:
        - path: /items
      exit: kafka_cache_client
      with:
        topic: items-snapshots
        event:
          id: '["${base64(key)}","${etag}"]'
Summary
Defines a binding with sse-kafka support, with proxy behavior.
The proxy kind sse-kafka binding adapts sse data streams into kafka data streams, so that kafka messages can be delivered to sse clients.
Filtering can be performed by kafka message key, message headers, or a combination of both message key and headers, extracting the parameter values from the inbound sse path.
Progress across kafka topic partitions is conveyed to the sse client via event id. When the stream is implicitly paused during sse client reconnect, the last-event-id header in the sse reconnect request contains the last received event id value, allowing the sse stream to resume reliable message delivery automatically.
The event id can be configured to include the message key and etag of each message. This avoids the need to duplicate the key in the message body and makes the event id suitable for integration with the http-kafka binding's use of etag for conditional if-match operations.
When a kafka tombstone (null value) message is received by the sse-kafka binding, it delivers a delete event to the sse client. The client can identify which specific message has been deleted from the message key in the sse delete event id.
Configuration
kind*
enum[ "proxy" ]
Behave as an sse-kafka proxy.
exit
string
Default exit binding when no conditional routes are viable.
exit: kafka_cache_client
routes
array of object
Conditional sse-kafka-specific routes for adapting sse data streams to kafka data streams.
routes:
  - when:
      - path: /items
    exit: kafka_cache_client
    with:
      topic: items-snapshots
      event:
        id: '["${base64(key)}","${etag}"]'
routes[].guarded
object as named map of string:string array
List of roles required by each named guard to authorize this route.
routes:
  - guarded:
      test:
        - read:items
routes[].when
array of object
List of conditions (any match) to match this route.
Read more: When a route matches
routes:
  - when:
      - path: /items
when[].path*
string
Path with optional embedded parameter names, such as /{topic}.
routes[].exit*
string
Next binding when following this route.
routes:
  - when:
    ...
    exit: kafka_cache_client
routes[].with
object
Kafka parameters used when adapting sse data streams to kafka data streams.
with.topic*
string
Topic name, optionally referencing a path parameter such as ${params.topic}.
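As an illustration of how a path parameter can select the topic, the following minimal sketch combines the /{topic} path form with ${params.topic}. It assumes the same kafka_cache_client exit binding used in the other examples; the route itself is illustrative rather than a recommended configuration.

```yaml
routes:
  - when:
      # {topic} captures the path segment as a named parameter
      - path: /{topic}
    exit: kafka_cache_client
    with:
      # resolved per request from the captured path parameter
      topic: ${params.topic}
```

With a route like this, a request for /items would be expected to read from a topic named items.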
with.filters
array of object
Kafka filters for the matched route when adapting sse data streams to kafka data streams, expressed as a list of criteria (any match).
All specified headers and key must match for the combined criteria to match.
filters[].key
string
Message key, optionally referencing a path parameter such as ${params.key}.
filters[].headers
object
Message headers, with each value optionally referencing a path parameter such as ${params.headerX}.
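The matching rules for filters can be sketched as follows. The /items/{id} path, the category and priority header names, and their values are hypothetical and chosen only for illustration. Within one list entry, the key and all headers must match; across entries, any one matching entry is enough.

```yaml
routes:
  - when:
      - path: /items/{id}
    exit: kafka_cache_client
    with:
      topic: items-snapshots
      filters:
        # criterion 1: key AND header must both match
        - key: ${params.id}
          headers:
            category: books   # hypothetical header name and value
        # criterion 2: header alone; a message matching either criterion is delivered
        - headers:
            priority: high    # hypothetical header name and value
```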
filters[].event
object
with.event
object
Defines the SSE event syntax used when delivering Kafka messages to SSE clients.
event.id*
enum[ "${etag}", "["${base64(key)}","${etag}"]" ] | Default: "${etag}"
Format of the id field in the sse event.
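For comparison with the key-and-etag form shown earlier, this is a minimal sketch of a route that states the default event id explicitly. It assumes the same items-snapshots topic and kafka_cache_client exit as the other examples. With this form the event id carries only the etag, so the message key is not available to the client from the event id.

```yaml
routes:
  - when:
      - path: /items
    exit: kafka_cache_client
    with:
      topic: items-snapshots
      event:
        # default format: etag only, no message key
        id: '${etag}'
```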
* required

