Apache Kafka plugin
Overview
The Apache Kafka plugin connects QALIPSIS to an Apache Kafka cluster.
- Technology addressed
  Kafka: https://kafka.apache.org/
- Dependency
  io.qalipsis.plugin:qalipsis-plugin-kafka
- Namespace in scenario
  kafka()
- Client library
  Kafka: refer to Apache Kafka
- Supported steps
  consume and produce
An example scenario that uses a combination of the kafka consume and produce steps is provided on GitHub.
Consume step
The consume step within the Apache Kafka plugin polls data from topics of an Apache Kafka cluster and forwards the data to the next step, either in batches or individually.
The consume step:
- is generally used in conjunction with a left join to assert data or inject data into a workflow.
- supports multithreading (concurrent consumers) using the concurrency property.

- Ancestor
  Scenario
- Functionality
  The consume step starts a polling loop for the designated client, using the provided subscription topics name or topicsPattern, and groupId. When new records appear, consume polls them and sends them to the next step(s).
- Example
kafka().consume {
    bootstrap("localhost:9092")
    topics("battery_state")
    groupId("kafka-example")
    offsetReset(OffsetResetStrategy.EARLIEST)
    pollTimeout(Duration.ofSeconds(1))
}
- Notable parameters
  - bootstrap: configures the bootstrap hosts of the Kafka cluster; defaults to localhost:9092.
  - groupId: defines the consumer group.
  - pollTimeout: defines the consumer poll timeout to wait for records; defaults to 1 second.
  - offsetReset (required): defines the strategy to apply when the consumer starts consuming messages from the topic.
  - pollDelay (required): specifies the delay between query executions.
  - maxPolledRecords (optional): defines the maximum number of records returned in a single poll batch; defaults to 500.
  - broadcast (optional): specifies the broadcast parameters for the step.
- Tips
  - Use the consume step with a left join to assert data or inject data into a workflow.
  - Valid offsetReset strategies are:
    - OffsetResetStrategy.EARLIEST: starts consuming messages from the beginning of the topic.
    - OffsetResetStrategy.LATEST: consumes only the messages published after the consumer starts, ignoring all previous messages in the topic. OffsetResetStrategy.LATEST is the default.
- Reference documentation
  Refer to the Apache Kafka documentation for further parameter and configuration information.
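The offsetReset strategies described for the consume step only matter when the consumer group has no committed offset for a partition; once an offset has been committed, consumption resumes from it. The sketch below models that rule in plain Kotlin. It assumes that offsetReset maps onto Kafka's auto.offset.reset consumer setting; ResetStrategy and startingOffset are hypothetical stand-ins written for this illustration, not part of the plugin or of the Kafka client.

```kotlin
// Hypothetical stand-in for OffsetResetStrategy, so the sketch runs without dependencies.
enum class ResetStrategy { EARLIEST, LATEST }

/**
 * Returns the offset of the first record a consumer reads from a partition.
 * committedOffset is null when the consumer group has never committed an offset.
 */
fun startingOffset(
    logStartOffset: Long,
    logEndOffset: Long,
    committedOffset: Long?,
    strategy: ResetStrategy
): Long = committedOffset ?: when (strategy) {
    ResetStrategy.EARLIEST -> logStartOffset // replay everything still retained
    ResetStrategy.LATEST -> logEndOffset     // only records published from now on
}

fun main() {
    // Five records (offsets 0..4) are already in the partition.
    println(startingOffset(0L, 5L, null, ResetStrategy.EARLIEST)) // 0
    println(startingOffset(0L, 5L, null, ResetStrategy.LATEST))   // 5
    // A committed offset takes precedence over the strategy.
    println(startingOffset(0L, 5L, 3L, ResetStrategy.LATEST))     // 3
}
```

In a load test this means that reusing a groupId from a previous campaign may skip or replay records regardless of the configured strategy, so a fresh groupId per campaign is often the simpler choice.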
Produce step
The produce step within the Apache Kafka plugin pushes data onto topics of the Kafka broker and forwards the data as input to the next step.
- Ancestor
  Scenario
- Functionality
  The produce step’s input and step context are used to generate values to complete the step. The batch of results, or a single result, is then forwarded to the next step(s).
- Example
.kafka()
.produce(
    keySerializer = Serdes.String().serializer(),
    valueSerializer = Serdes.String().serializer()
) {
    bootstrap("localhost:9092")
    clientName("producer")
    records { _, input ->
        listOf(
            KafkaProducerRecord(
                topic = "battery_state",
                value = objectMapper.writeValueAsString(input),
                key = input.deviceId
            )
        )
    }
}
- Notable parameters
  - keySerializer: describes how to serialize the record keys to send to Kafka.
  - valueSerializer: describes how to serialize the record values to send to Kafka.
  - bootstrap: configures the bootstrap hosts of the Kafka cluster; defaults to localhost:9092.
  - clientName: defines the name of the Kafka client.
  - records (required): generates and configures a list of produced records.
- Reference documentation
  Refer to the Apache Kafka documentation for further parameter and configuration information.
Configuration of publication of events and meters to Apache Kafka
Example configuration file
events:
  export:
    kafka:
      enabled: true
      bootstrap: localhost:9092
      topic: qalipsis-events
      duration-as-nano: false
      linger-period: 10s
      batch-size: 2
      configuration:
        delivery.timeout.ms: 10000
meters:
  export:
    kafka:
      enabled: true
      step: PT10S
      bootstrap.servers: localhost:9092
      topic: qalipsis-meters
      linger.ms: 5000
      batch-size: 800000 # In bytes.
      configuration:
        delivery.timeout.ms: 10000
- Event parameters
  - enabled (required): boolean flag that activates/deactivates event publishing to Kafka; defaults to false; must be set to true to publish events.
  - min-level (required): minimal accepted level of events; defaults to INFO; allowable values are `EventLevel` values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.
  - bootstrap (required): bootstrap servers (string of host and port pairs) of the Kafka broker; defaults to localhost:9092.
  - topic (required): string name of the topic to write the events to; defaults to qalipsis-events.
  - duration-as-nano (required): boolean flag that activates/deactivates converting the duration to nanoseconds; defaults to false (converts to milliseconds).
  - linger-period (required): maximum period between two publications of events to Kafka; positive duration; defaults to 10s.
  - batch-size (required): maximum number of events buffered between two publications of events to Kafka; positive integer; defaults to 2.
  - format (optional): format of the exported data; defaults to JSON.
  - configuration (optional): key, value configuration parameters of type Properties, for the Kafka producer.
    - delivery.timeout.ms (optional): maximum time in milliseconds (integer) before a delivery times out; defaults to 10000.
  Refer to the Kafka documentation for other event monitoring parameters.
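The interplay between linger-period and batch-size for events can be sketched as a buffer that publishes as soon as either bound is reached. This is a minimal illustration of how the two parameter descriptions read together, not the plugin's implementation: EventBuffer and its publish callback are hypothetical names, and the elapsed time is only checked when an event is added, whereas a real publisher would more likely rely on a timer.

```kotlin
import java.time.Duration
import java.time.Instant

/** Hypothetical buffer: publishes when batchSize events are pending or lingerPeriod has elapsed. */
class EventBuffer(
    private val batchSize: Int,
    private val lingerPeriod: Duration,
    start: Instant,
    private val publish: (List<String>) -> Unit
) {
    private val pending = mutableListOf<String>()
    private var lastPublication: Instant = start

    fun add(event: String, now: Instant) {
        pending += event
        val lingerElapsed = Duration.between(lastPublication, now) >= lingerPeriod
        if (pending.size >= batchSize || lingerElapsed) flush(now)
    }

    private fun flush(now: Instant) {
        publish(pending.toList()) // hand over a copy, then reset the buffer
        pending.clear()
        lastPublication = now
    }
}

fun main() {
    val published = mutableListOf<List<String>>()
    val start = Instant.parse("2024-01-01T00:00:00Z")
    // Same values as the example configuration: batch-size 2, linger-period 10s.
    val buffer = EventBuffer(2, Duration.ofSeconds(10), start) { published += it }
    buffer.add("e1", start.plusSeconds(1))  // buffered only
    buffer.add("e2", start.plusSeconds(2))  // batch size reached: published
    buffer.add("e3", start.plusSeconds(13)) // 11s since last publication: published
    println(published) // [[e1, e2], [e3]]
}
```

Under this reading, a small batch-size favours low latency at the cost of more requests to Kafka, while linger-period bounds how long an event can wait when traffic is low.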
- Meter parameters
  - enabled (required): boolean flag that activates/deactivates meter publishing to Kafka; defaults to false; must be set to true to publish meters.
  - step (required): the step size (reporting frequency); defaults to PT10S.
  - bootstrap.servers (required): bootstrap servers (string of host and port pairs) of the Kafka broker; defaults to localhost:9092.
  - topic (required): string name of the topic to write the meters to; defaults to qalipsis-meters.
  - linger.ms (required): maximum period between two batches of events; positive duration; defaults to 5000.
  - batch-size (required): maximum number of events buffered between two publications of events to Kafka; positive integer; defaults to 2.
  - format (required): format of the exported data; defaults to JSON.
  - configuration (optional): key, value configuration parameters of type Properties, for the Kafka producer.
    - delivery.timeout.ms (optional): maximum time in milliseconds (integer) before a delivery times out; defaults to 10000.
  Refer to the Kafka documentation for other meter monitoring parameters.
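The meter settings mix two duration notations: step uses an ISO-8601 duration (PT10S), while linger.ms is a plain number of milliseconds. The sketch below shows how the two compare, assuming the step value is parsed as a java.time.Duration; stepSeconds is a hypothetical helper written for this illustration.

```kotlin
import java.time.Duration

/** Parses an ISO-8601 duration such as "PT10S" and returns its length in seconds. */
fun stepSeconds(iso: String): Long = Duration.parse(iso).seconds

fun main() {
    val step = Duration.parse("PT10S")    // reporting frequency from the example
    val linger = Duration.ofMillis(5000)  // linger.ms from the example
    println(stepSeconds("PT10S"))         // 10
    println(linger < step)                // true
}
```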