Apache Kafka plugin

Overview

The Apache Kafka plugin connects QALIPSIS to an Apache Kafka cluster.

Technology addressed

Apache Kafka: https://kafka.apache.org/

Dependency

io.qalipsis.plugin:qalipsis-plugin-kafka

Namespace in scenario

kafka()

Client library

Apache Kafka Client: refer to Kafka API

Supported steps

Consume step

The consume step polls Kafka records and forwards them to the next step as batches by default.

Use .flatten(…​) when you want one consumed result per downstream event.

  .kafka()
  .consume { (1)
    name = "orders-consumer"
    bootstrap("localhost:9092")
    topics("orders")
    groupId("orders-group")
    pollTimeout(100)
    offsetReset(OffsetResetStrategy.EARLIEST)
    concurrency(2)
  }
  .flatten(Serdes.Integer().deserializer(), Serdes.String().deserializer()) (2)
  .map { it.record.value } (3)
  .filterNotNull()
(1) Configures the consume step and the subscription settings.
(2) Switches output from batch mode to one consumed result at a time, deserializing keys as Integer and values as String inline.
(3) Reads the deserialized value from KafkaConsumerResult.record.value.

Produce step

The produce step publishes records built from the step context and the input it receives.

  .kafka()
  .produce( (1)
    Serdes.Integer().serializer(),
    Serdes.String().serializer()
  ) {
    name = "orders-producer"
    bootstrap("localhost:9092")
    clientName("producer-orders") (2)
    records { stepContext, input -> (3)
      listOf(
        KafkaProducerRecord(
          topic = "processed-orders",
          key = input.record.key,
          value = "processed-${input.record.value}"
        )
      )
    }
  }
(1) Declares the key and value serializers required by produce(…).
(2) Sets the producer client name used by the step.
(3) Builds one or more KafkaProducerRecord per consumed input.

Configuration

DSL parameters

Available parameters are described in the table below.

Property Description

bootstrap

Configures the bootstrap servers used by the Kafka client (consumer or producer).
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: String (vararg)
Default Value: localhost:9092

Example
bootstrap("localhost:9092")

topics

Sets the explicit list of topics to subscribe to. Mutually exclusive with topicsPattern(…​).
Applicable Steps: Consume
Optional/Required: Required when topicsPattern(…​) is not set
Data Type: String (vararg)
Default Value: empty list

Example
topics("orders", "orders-retry")

topicsPattern

Subscribes to topics matching a regex pattern. Mutually exclusive with topics(…​).
Applicable Steps: Consume
Optional/Required: Required when topics(…​) is not set
Data Type: java.util.regex.Pattern
Default Value: null

Example
topicsPattern(Pattern.compile("orders-.*"))
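
To preview which topics a pattern such as the one above would select, the regex can be checked against candidate topic names with plain JDK classes. This is a minimal sketch that assumes the pattern is applied to the full topic name; the matchingTopics helper and the topic names are hypothetical and are not part of the plugin.

```kotlin
import java.util.regex.Pattern

// Hypothetical helper: returns the candidate topic names that the pattern fully matches.
// Assumption: the subscription uses a full match on the topic name, not a substring search.
fun matchingTopics(pattern: Pattern, candidates: List<String>): List<String> =
    candidates.filter { pattern.matcher(it).matches() }

val ordersPattern: Pattern = Pattern.compile("orders-.*")

// Illustrative topic names only.
val selectedTopics = matchingTopics(
    ordersPattern,
    listOf("orders-eu", "orders-retry", "orders", "processed-orders-eu")
)
```

Under that assumption, selectedTopics contains orders-eu and orders-retry only: orders lacks the hyphen required by the pattern, and processed-orders-eu does not match from its first character.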

groupId

Sets the Kafka consumer group id.
Applicable Steps: Consume
Optional/Required: Required
Data Type: String
Default Value: empty string

Example
groupId("orders-consumer")

pollTimeout

Sets the timeout used for each Kafka poll.
Applicable Steps: Consume
Optional/Required: Optional
Data Type: Duration, or Long (milliseconds)
Default Value: Duration.ofSeconds(1) (1000 ms)

Example
pollTimeout(Duration.ofMillis(250)) or pollTimeout(250)

offsetReset

Strategy used when the configured group has no committed offset.
Applicable Steps: Consume
Optional/Required: Optional
Data Type: OffsetResetStrategy (enum): EARLIEST or LATEST
Default Value: OffsetResetStrategy.LATEST

Example
offsetReset(OffsetResetStrategy.EARLIEST)

maxPolledRecords

Maximum number of records returned per poll call. Must be strictly positive.
Applicable Steps: Consume
Optional/Required: Optional
Data Type: Int
Default Value: 500

Example
maxPolledRecords(1000)

concurrency

Number of concurrent consumer threads for the same step and group id. Must be strictly positive.
Applicable Steps: Consume
Optional/Required: Optional
Data Type: Int
Default Value: 1

Example
concurrency(4)

properties

Additional Kafka client properties (consumer or producer) forwarded as-is.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: vararg Pair<String, Any> or Map<String, Any>
Default Value: empty map

Example
properties(
  "client.id" to "orders-consumer-1",
  "fetch.min.bytes" to 1024
)

deserialize

Sets key and value deserializers and keeps batch output (List<KafkaConsumerResult<K?, V?>>).
Applicable Steps: Consume
Optional/Required: Optional
Data Type: Overloads with class names, KClass, or Deserializer instances
Default Value: Byte array deserializers for key and value

Example
.deserialize(Serdes.Integer().deserializer(), Serdes.String().deserializer())

keySerializer

Serializer used for the produced record key. Declared as a required argument of produce(…​).
Applicable Steps: Produce
Optional/Required: Required
Data Type: Serializer<K>, where K is the key type
Default Value: N/A

Example
Serdes.String().serializer()

valueSerializer

Serializer used for the produced record value. Declared as a required argument of produce(…​).
Applicable Steps: Produce
Optional/Required: Required
Data Type: Serializer<V>, where V is the value type
Default Value: N/A

Example
Serdes.String().serializer()

clientName

Defines the logical name used by the producer step.
Applicable Steps: Produce
Optional/Required: Required
Data Type: String
Default Value: empty string

Example
clientName("producer-orders")

records

Builds the records to publish for each input received by the step.
Applicable Steps: Produce
Optional/Required: Required in practice (the default produces no records)
Data Type: Lambda that takes the step context and input and returns a list of KafkaProducerRecord<K, V>
Default Value: empty list

Example
records { stepContext, input ->
  listOf(
    KafkaProducerRecord(
      topic = "orders",
      key = input.record.key,
      value = "processed-${input.record.value}"
    )
  )
}

Shared defaults for Kafka steps

You can define defaults once, in the scenario block, and let all subsequent Kafka steps inherit them.

scenario {
  kafka().defaults { (1)
    bootstrap("localhost:9092")
    properties("security.protocol" to "PLAINTEXT")
  }
}
(1) Defaults are applied to subsequent Kafka consume and produce steps in the same scenario. Individual steps can still override values.
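
The override rule can be pictured as a map merge in which step-level values win over scenario-level defaults. This is only a conceptual model in plain Kotlin, not the plugin's implementation; effectiveSettings, the map keys, and the kafka-1:9092 address are hypothetical.

```kotlin
// Conceptual model only: scenario-level defaults merged with step-level settings.
// On a key collision, the step-level value replaces the default.
fun effectiveSettings(
    scenarioDefaults: Map<String, Any>,
    stepSettings: Map<String, Any>
): Map<String, Any> = scenarioDefaults + stepSettings

val scenarioDefaults = mapOf<String, Any>(
    "bootstrap" to "localhost:9092",
    "security.protocol" to "PLAINTEXT"
)

// Hypothetical step that overrides only the bootstrap servers.
val stepSettings = mapOf<String, Any>("bootstrap" to "kafka-1:9092")

val merged = effectiveSettings(scenarioDefaults, stepSettings)
```

In this model the step keeps the inherited security.protocol value and uses its own bootstrap value.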

Analytics

QALIPSIS can publish the meters and events collected during a campaign to Apache Kafka through dedicated publishers.

Configure the publishers in the factory configuration, separate from the scenario DSL. Refer to "Provide the configuration to QALIPSIS" for details about supplying configuration files to QALIPSIS.

Meters

The meters publisher serializes the meter snapshots collected during a campaign and produces them to a Kafka topic.

Example configuration file (YAML)
meters:
  export:
    kafka:
      enabled: true
      bootstrap: localhost:9092
      topic: qalipsis-meters
      prefix: qalipsis
      timestamp-field-name: timestamp
      serializer: json
      linger-ms: 1000
      batch-size: 1
      configuration:
        delivery.timeout.ms: 10000

The parameters used to configure the publication of meters to Apache Kafka are described in the table below.

Parameter Description

meters.export.kafka.enabled

Activates the publication of meters to Apache Kafka. Must be set to true to enable publishing.
Data Type: Boolean
Default Value: false

meters.export.kafka.bootstrap

Comma‑separated list of bootstrap servers used by the Kafka producer (bootstrap.servers in Kafka terminology).
Data Type: String
Default Value: localhost:9092

meters.export.kafka.topic

Name of the Kafka topic where the meter records are produced.
Data Type: String
Default Value: qalipsis-meters

meters.export.kafka.prefix

Prefix prepended to every metric name when publishing to Kafka.
Data Type: String
Default Value: qalipsis

meters.export.kafka.timestamp-field-name

Name of the field used to carry the timestamp in the serialized records.
Data Type: String
Default Value: timestamp

meters.export.kafka.serializer

Identifier of the serializer used to encode the meters.
Data Type: String
Default Value: json

meters.export.kafka.linger-ms

Maximum time, in milliseconds, the producer waits before sending a batch even if it is not full.
Data Type: Positive long
Default Value: 1000

meters.export.kafka.batch-size

Maximum size in bytes of a producer batch (batch.size in Kafka terminology).
Data Type: Positive integer
Default Value: 1

meters.export.kafka.configuration.<kafka-property>

Additional Kafka producer properties forwarded as-is to the underlying client (e.g. delivery.timeout.ms, acks, compression.type). Each entry uses the standard Kafka producer property name.
Data Type: Map of String to String
Default Value: None

Events

The events publisher buffers the events emitted during a campaign, serializes them and produces them to a Kafka topic.

Example configuration file (YAML)
events:
  export:
    kafka:
      enabled: true
      min-level: INFO
      bootstrap: localhost:9092
      topic: qalipsis-events
      duration-as-nano: false
      linger-period: 1s
      batch-size: 2000
      format: JSON
      configuration:
        delivery.timeout.ms: 10000

The parameters used to configure the publication of events to Apache Kafka are described in the table below.

Parameter Description

events.export.kafka.enabled

Activates the publication of events to Apache Kafka. Must be set to true to enable publishing.
Data Type: Boolean
Default Value: false

events.export.kafka.min-level

Minimum event level published. Acceptable values are TRACE, DEBUG, INFO, WARN, ERROR, OFF.
Data Type: Enum
Default Value: INFO
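
The effect of min-level can be sketched as a threshold on the ordered list of levels. The code below is an illustration under assumed semantics (an event is published when its level is at least min-level, and OFF disables publication); EventLevel and isPublished are hypothetical names, not the plugin's types.

```kotlin
// Levels in increasing severity, in the order listed for min-level.
enum class EventLevel { TRACE, DEBUG, INFO, WARN, ERROR, OFF }

// Assumed semantics: an event is published when its level is at least min-level.
// OFF is treated as "publish nothing" and is never a valid level for an event itself.
fun isPublished(eventLevel: EventLevel, minLevel: EventLevel): Boolean =
    minLevel != EventLevel.OFF &&
        eventLevel != EventLevel.OFF &&
        eventLevel >= minLevel
```

With min-level set to INFO, this model keeps INFO, WARN and ERROR events and drops TRACE and DEBUG events.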

events.export.kafka.bootstrap

Comma‑separated list of bootstrap servers used by the Kafka producer (bootstrap.servers in Kafka terminology).
Data Type: String
Default Value: localhost:9092

events.export.kafka.topic

Name of the Kafka topic where the event records are produced.
Data Type: String
Default Value: qalipsis-events

events.export.kafka.duration-as-nano

Exports event durations as nanoseconds when set to true, otherwise converts them to milliseconds.
Data Type: Boolean
Default Value: false
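
The two representations differ only in the unit of the exported number. A minimal sketch of that conversion with java.time.Duration, where exportedDuration is a hypothetical helper that mirrors the description above rather than the plugin's code:

```kotlin
import java.time.Duration

// Assumed behavior, following the parameter description:
// nanoseconds when the flag is true, milliseconds otherwise.
fun exportedDuration(duration: Duration, durationAsNano: Boolean): Long =
    if (durationAsNano) duration.toNanos() else duration.toMillis()
```

For a duration of 1.5 seconds, this yields 1500 with the flag set to false and 1500000000 with the flag set to true, so consumers of the topic need to know which unit was configured.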

events.export.kafka.linger-period

Maximum time the events are buffered before sending a batch to the brokers, even when the batch size is not reached.
Data Type: Duration
Default Value: 1s

events.export.kafka.batch-size

Maximum number of events buffered between two publications to Kafka.
Data Type: Positive integer
Default Value: 2000
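
linger-period and batch-size work together: a batch is sent when either limit is reached first. The sketch below models that rule in plain Kotlin. It is not the publisher's implementation: EventBuffer is a hypothetical class, events are plain strings, and time is passed in explicitly instead of being driven by a timer so that the behavior stays deterministic.

```kotlin
import java.time.Duration

// Hypothetical buffer illustrating "flush when full or when the linger period has elapsed".
class EventBuffer(
    private val batchSize: Int,
    private val lingerPeriod: Duration
) {
    private val pending = mutableListOf<String>()
    private var lastFlushMillis = 0L

    // Adds an event and returns the batch to publish,
    // or null when the event is only buffered.
    fun offer(event: String, nowMillis: Long): List<String>? {
        pending += event
        val full = pending.size >= batchSize
        val lingerElapsed = nowMillis - lastFlushMillis >= lingerPeriod.toMillis()
        if (!full && !lingerElapsed) return null
        val batch = pending.toList()
        pending.clear()
        lastFlushMillis = nowMillis
        return batch
    }
}
```

A smaller batch-size or a shorter linger-period lowers the delay before events reach Kafka, at the cost of more frequent and smaller publications.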

events.export.kafka.format

Serialization format used for the exported events.
Data Type: Enum
Default Value: JSON

events.export.kafka.configuration.<kafka-property>

Additional Kafka producer properties forwarded as-is to the underlying client (e.g. delivery.timeout.ms, acks, compression.type). Each entry uses the standard Kafka producer property name.
Data Type: Map of String to String
Default Value: None

Reference Documentation

Refer to "Monitoring test campaigns" for additional background on meter and event publication.