Apache Kafka plugin
Overview
The Apache Kafka plugin connects QALIPSIS to an Apache Kafka cluster.
- Technology addressed
-
Apache Kafka: https://kafka.apache.org/
- Dependency
-
io.qalipsis.plugin:qalipsis-plugin-kafka - Namespace in scenario
-
kafka() - Client library
-
Apache Kafka Client: refer to Kafka API
Supported steps
Consume step
The consume step polls Kafka records and forwards them to the next step as batches by default.
Use .flatten(…) when you want one consumed result per downstream event.
.kafka()
.consume { (1)
name = "orders-consumer"
bootstrap("localhost:9092")
topics("orders")
groupId("orders-group")
pollTimeout(100)
offsetReset(OffsetResetStrategy.EARLIEST)
concurrency(2)
}
.flatten(Serdes.Integer().deserializer(), Serdes.String().deserializer()) (2)
.map { it.record.value } (3)
.filterNotNull()
| 1 | Configures the consume step and the subscription settings. |
| 2 | Switches output from batch mode to one consumed result at a time, deserializing keys as Integer and values as String inline. |
| 3 | Reads the deserialized value from KafkaConsumerResult.record.value. |
Produce step
The produce step publishes records generated from step context and input.
.kafka()
.produce( (1)
Serdes.Integer().serializer(),
Serdes.String().serializer()
) {
name = "orders-producer"
bootstrap("localhost:9092")
clientName("producer-orders") (2)
records { stepContext, input -> (3)
listOf(
KafkaProducerRecord(
topic = "processed-orders",
key = input.record.key,
value = "processed-${input.record.value}"
)
)
}
}
| 1 | Declares key and value serializers required by produce(…). |
| 2 | Sets the producer client name used by the step. |
| 3 | Builds one or more KafkaProducerRecord per consumed input. |
Configuration
DSL parameters
Available parameters are described in the table below.
| Property | Description |
|---|---|
|
Configures the bootstrap servers used by the Kafka consumer. Example
|
|
Sets the explicit list of topics to subscribe to. Mutually exclusive with Example
|
|
Subscribes to topics matching a regex pattern. Mutually exclusive with Example
|
|
Sets the Kafka consumer group id. Example
|
|
Sets the timeout used for each Kafka poll. Example
|
|
Strategy used when the configured group has no committed offset. Example
|
|
Maximum number of records returned per poll call. Must be strictly positive. Example
|
|
Number of concurrent consumer threads for the same step and group id. Must be strictly positive. Example
|
|
Additional Kafka consumer properties forwarded as-is. Example
|
|
Sets key and value deserializers and keeps batch output ( Example
|
|
Serializer used for the produced record key. Declared as a required argument of Example
|
|
Serializer used for the produced record value. Declared as a required argument of Example
|
|
Defines the logical name used by the producer step. Example
|
|
Builds the records to publish for each input received by the step. Example
|
Shared defaults for Kafka steps
You can define defaults once in the scenario section or just after, and let all following Kafka steps inherit them.
scenario {
kafka().defaults { (1)
bootstrap("localhost:9092")
properties("security.protocol" to "PLAINTEXT")
}
}
| 1 | Defaults are applied to subsequent Kafka consume and produce steps in the same scenario. Individual steps can still override values. |
Analytics
QALIPSIS can publish the meters and events collected during a campaign to Apache Kafka through dedicated publishers.
Configure the publishers in the factory configuration, separate from the scenario DSL. Refer to Provide the configuration to QALIPSIS for details about supplying configuration files to QALIPSIS.
Meters
The meters publisher serializes the meter snapshots collected during a campaign and produces them to a Kafka topic.
meters:
export:
kafka:
enabled: true
bootstrap: localhost:9092
topic: qalipsis-meters
prefix: qalipsis
timestamp-field-name: timestamp
serializer: json
linger-ms: 1000
batch-size: 1
configuration:
delivery.timeout.ms: 10000
The parameters used to configure the publication of meters to Apache Kafka are described in the table below.
| Parameter | Description |
|---|---|
|
Activates the publication of meters to Apache Kafka. Must be set to |
|
Comma‑separated list of bootstrap servers used by the Kafka producer ( |
|
Name of the Kafka topic where the meter records are produced. |
|
Prefix prepended to every metric name when publishing to Kafka. |
|
Name of the field used to carry the timestamp in the serialized records. |
|
Identifier of the serializer used to encode the meters. |
|
Maximum time, in milliseconds, the producer waits before sending a batch even if it is not full. |
|
Maximum size in bytes of a producer batch ( |
|
Additional Kafka producer properties forwarded as-is to the underlying client (e.g. |
Events
The events publisher buffers the events emitted during a campaign, serializes them and produces them to a Kafka topic.
events:
export:
kafka:
enabled: true
min-level: INFO
bootstrap: localhost:9092
topic: qalipsis-events
duration-as-nano: false
linger-period: 1s
batch-size: 2000
format: JSON
configuration:
delivery.timeout.ms: 10000
The parameters used to configure the publication of events to Apache Kafka are described in the table below.
| Parameter | Description |
|---|---|
|
Activates the publication of events to Apache Kafka. Must be set to |
|
Minimum event level published. Acceptable values are |
|
Comma‑separated list of bootstrap servers used by the Kafka producer ( |
|
Name of the Kafka topic where the event records are produced. |
|
Exports event durations as nanoseconds when set to |
|
Maximum time the events are buffered before sending a batch to the brokers, even when the batch size is not reached. |
|
Maximum number of events buffered between two publications to Kafka. |
|
Serialization format used for the exported events. |
|
Additional Kafka producer properties forwarded as-is to the underlying client (e.g. |
- Reference Documentation
-
Refer to Monitoring test campaigns for additional background on meter and event publication.