Apache Kafka plugin

Overview

The Apache Kafka plugin connects QALIPSIS to an Apache Kafka cluster.

Technology addressed

Kafka: https://kafka.apache.org/

Dependency

io.qalipsis.plugin:qalipsis-plugin-kafka

Namespace in scenario

kafka()

Client library

Kafka: refer to Apache Kafka

Supported steps
  • consume: consumes data from topics of an Apache Kafka cluster.

  • produce: pushes data to topics of the Apache Kafka cluster.

An example scenario that uses a combination of the kafka consume and produce steps is provided on GitHub.

Consume step

The consume step within the Apache Kafka plugin polls data from topics of an Apache Kafka cluster and forwards the data to the next step, either in batches or individually.

The consume step:

  • is generally used in conjunction with a left join to assert data or inject data into a workflow.

  • supports multithreading (concurrent consumers) using the concurrency property.

Ancestor

Scenario

Functionality

The consume step starts a polling loop for the designated client, subscribing with the provided topics (or topicsPattern) and groupId. When new records appear, the step polls them and sends them to the next step(s).
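
The polling loop described above can be pictured with a small in-memory model. This is only a sketch in plain Kotlin: InMemoryTopic and consumeInBatches are hypothetical names introduced for illustration, not part of the plugin or of the Kafka client, and the loop stops when the topic is empty whereas a real consumer would keep polling.

```kotlin
/** Hypothetical stand-in for a topic: a queue of pending records. */
class InMemoryTopic(records: List<String>) {
    private val queue = ArrayDeque(records)

    /** Returns at most maxRecords records, or an empty list when none are pending. */
    fun poll(maxRecords: Int): List<String> {
        val batch = mutableListOf<String>()
        while (batch.size < maxRecords && queue.isNotEmpty()) batch += queue.removeFirst()
        return batch
    }
}

/** Polls until the topic is drained, forwarding each poll result as one batch. */
fun consumeInBatches(topic: InMemoryTopic, maxPolledRecords: Int, next: (List<String>) -> Unit) {
    while (true) {
        val batch = topic.poll(maxPolledRecords)
        if (batch.isEmpty()) break   // a real loop would wait for the poll timeout and retry
        next(batch)
    }
}

fun main() {
    val batches = mutableListOf<List<String>>()
    val topic = InMemoryTopic(listOf("r1", "r2", "r3", "r4", "r5"))
    consumeInBatches(topic, maxPolledRecords = 2) { batches.add(it) }
    println(batches)            // [[r1, r2], [r3, r4], [r5]]
    println(batches.flatten())  // [r1, r2, r3, r4, r5]
}
```

Forwarding each poll result as a list corresponds to batch delivery; flattening the batches corresponds to forwarding records individually.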

Example
kafka().consume {
  bootstrap("localhost:9092")
  topics("battery_state")
  groupId("kafka-example")
  offsetReset(OffsetResetStrategy.EARLIEST)
  pollTimeout(Duration.ofSeconds(1))
}
Notable parameters
  • bootstrap: configures the bootstrap hosts of the Kafka cluster; defaults to localhost:9092.

  • groupId: defines the consumer group.

  • pollTimeout: defines the consumer poll timeout to wait for records; defaults to 1 second.

  • offsetReset (required): defines the strategy to apply when the consumer starts consuming messages from the topic.

  • pollDelay (required): specifies the delay between two successive polls.

  • maxPolledRecords (optional): defines the maximum number of records returned in a single poll batch; defaults to 500.

  • broadcast (optional): specifies the broadcast parameters for the step.

Tips
  • Use the consume step with a left join to assert data or inject data into a workflow.

  • Valid offsetReset strategies are:

    • OffsetResetStrategy.EARLIEST: starts consuming messages from the beginning of the topic.

    • OffsetResetStrategy.LATEST: starts consuming messages that arrive after the consumer starts, ignoring all previous messages in the topic.

      OffsetResetStrategy.LATEST is the default.
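
The difference between the two strategies can be sketched with an in-memory list standing in for a topic partition. The ResetStrategy enum and both functions below are hypothetical names for illustration, not the Kafka client API, and the sketch assumes the consumer group has no committed offset yet, which is the situation in which Kafka applies the reset strategy.

```kotlin
// Simplified in-memory model; not the Kafka client API.
enum class ResetStrategy { EARLIEST, LATEST }

/**
 * Index of the first record read by a consumer group without a committed
 * offset, given the number of records already present when it starts.
 */
fun startOffset(strategy: ResetStrategy, existingRecords: Int): Int = when (strategy) {
    ResetStrategy.EARLIEST -> 0               // replay everything from the beginning
    ResetStrategy.LATEST -> existingRecords   // skip what is already there
}

/** Records visible to the consumer: those at or after its start offset. */
fun visibleRecords(log: List<String>, strategy: ResetStrategy, sizeAtStart: Int): List<String> =
    log.drop(startOffset(strategy, sizeAtStart))

fun main() {
    val before = listOf("a", "b", "c")     // written before the consumer starts
    val log = before + listOf("d", "e")    // "d" and "e" arrive afterwards
    println(visibleRecords(log, ResetStrategy.EARLIEST, before.size)) // [a, b, c, d, e]
    println(visibleRecords(log, ResetStrategy.LATEST, before.size))   // [d, e]
}
```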

Reference documentation

Refer to the Apache Kafka documentation for further parameter and configuration information.

Produce step

The produce step within the Apache Kafka plugin pushes data onto topics of the Kafka broker and forwards the data as input to the next step.

Ancestor

Scenario

Functionality

The produce step’s input and step context are used to generate values to complete the step. The batch of results, or a single result, is then forwarded to the next step(s).

Example
.kafka()
  .produce(
    keySerializer = Serdes.String().serializer(),
    valueSerializer = Serdes.String().serializer()
  ) {
    bootstrap("localhost:9092")
    clientName("producer")
    records { _, input ->
      listOf(
        KafkaProducerRecord(
          topic = "battery_state",
          value = objectMapper.writeValueAsString(input),
          key = input.deviceId
        )
      )
    }
  }
Notable parameters
  • keySerializer: describes how to serialize the record keys to send to Kafka.

  • valueSerializer: describes how to serialize the record values to send to Kafka.

  • bootstrap: configures the bootstrap hosts of the Kafka cluster; defaults to localhost:9092.

  • clientName: defines the name of the Kafka client.

  • records (required): generates and configures a list of produced records.
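
To make the roles of the key, the value and the serializers more concrete, here is a dependency-free sketch of turning one input into the bytes of one record. BatteryState, SerializedRecord and toRecord are hypothetical names used only for illustration, the JSON is written by hand without escaping, and UTF-8 is assumed because it is the default encoding of Kafka's string serializer.

```kotlin
// Hypothetical input type; the real scenario input may differ.
data class BatteryState(val deviceId: String, val level: Int)

// Hypothetical holder for what is sent after serialization.
class SerializedRecord(val topic: String, val key: ByteArray, val value: ByteArray)

/** Builds one record per input: the device ID is the key, a JSON document is the value. */
fun toRecord(input: BatteryState): SerializedRecord {
    // Hand-written JSON for brevity; it does not escape special characters.
    val json = """{"deviceId":"${input.deviceId}","level":${input.level}}"""
    return SerializedRecord(
        topic = "battery_state",
        key = input.deviceId.toByteArray(Charsets.UTF_8),   // what a string serializer produces
        value = json.toByteArray(Charsets.UTF_8)
    )
}

fun main() {
    val record = toRecord(BatteryState(deviceId = "device-1", level = 87))
    println(record.key.decodeToString())    // device-1
    println(record.value.decodeToString())  // {"deviceId":"device-1","level":87}
}
```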

Reference documentation

Refer to the Apache Kafka documentation for further parameter and configuration information.

Configuration of publication of events and meters to Apache Kafka

Configuration namespace

Events

events.export.kafka

Meters

meters.export.kafka

Example configuration file

events:
  export:
    kafka:
      enabled: true
      bootstrap: localhost:9092
      topic: qalipsis-events
      duration-as-nano: false
      linger-period: 10s
      batch-size: 2
      configuration:
        delivery.timeout.ms: 10000

meters:
  export:
    kafka:
      enabled: true
      step: PT10S
      bootstrap.servers: localhost:9092
      topic: qalipsis-meters
      linger.ms: 5000
      batch-size: 800000 # In bytes.
      configuration:
        delivery.timeout.ms: 10000
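
The namespaces events.export.kafka and meters.export.kafka are the dot-separated form of the nesting used in the file. The sketch below flattens a nested structure into such keys; it assumes the common convention that nested YAML keys map to dotted property names, and flatten is a hypothetical helper, not something provided by QALIPSIS.

```kotlin
/** Flattens nested maps into dot-separated keys, e.g. events -> export -> kafka -> topic. */
fun flatten(node: Map<String, Any?>, prefix: String = ""): Map<String, Any?> {
    val result = linkedMapOf<String, Any?>()
    for ((key, value) in node) {
        val path = if (prefix.isEmpty()) key else "$prefix.$key"
        if (value is Map<*, *>) {
            // Nested section: recurse with the extended prefix.
            @Suppress("UNCHECKED_CAST")
            result.putAll(flatten(value as Map<String, Any?>, path))
        } else {
            result[path] = value
        }
    }
    return result
}

fun main() {
    // Same shape as part of the events block of the example file, written as Kotlin maps.
    val config = mapOf(
        "events" to mapOf(
            "export" to mapOf(
                "kafka" to mapOf(
                    "enabled" to true,
                    "topic" to "qalipsis-events",
                    "batch-size" to 2
                )
            )
        )
    )
    flatten(config).forEach { (key, value) -> println("$key=$value") }
    // events.export.kafka.enabled=true
    // events.export.kafka.topic=qalipsis-events
    // events.export.kafka.batch-size=2
}
```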
Event parameters
  • enabled (required): boolean flag that activates/deactivates event publishing to Kafka; defaults to false; must be set to true.

  • min-level (required): minimal accepted level of events; defaults to INFO; allowable values are the EventLevel values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.

  • bootstrap (required): bootstraps (string of host and port pair) of the Kafka broker; defaults to localhost:9092.

  • topic (required): string name of the topic to write the events to; defaults to qalipsis-events.

  • duration-as-nano (required): boolean flag that activates/deactivates converting the duration to nanoseconds; defaults to false (converts to milliseconds).

  • linger-period (required): maximum period between two publications of events to Kafka; positive duration; defaults to 10s.

  • batch-size (required): maximum number of events buffered between two publications of events to Kafka; positive integer; defaults to 2.

  • format (optional): format of the exported data; defaults to JSON.

  • configuration (optional): key, value configuration parameters of type Properties, for the Kafka producer.

    • delivery.timeout.ms (optional): maximum integer value in milliseconds before time out; defaults to 10000.

Refer to the Kafka documentation for other event monitoring parameters.
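
The interaction between batch-size and linger-period can be sketched as a buffer that publishes when either limit is reached. This is an illustrative model under that assumption, not the plugin's implementation; EventBuffer and its methods are hypothetical names, and time is passed in explicitly so the behavior is deterministic.

```kotlin
import java.time.Duration
import java.time.Instant

/**
 * Hypothetical buffer: publishes when batchSize events are waiting, or when
 * lingerPeriod has elapsed since the last publication, whichever comes first.
 */
class EventBuffer(
    private val batchSize: Int,
    private val lingerPeriod: Duration,
    start: Instant,
    private val publish: (List<String>) -> Unit
) {
    private val pending = mutableListOf<String>()
    private var lastPublication = start

    /** Buffers an event and publishes immediately if the batch is full. */
    fun add(event: String, now: Instant) {
        pending += event
        if (pending.size >= batchSize) flush(now)
    }

    /** Called periodically; publishes pending events once the linger period has elapsed. */
    fun tick(now: Instant) {
        if (pending.isNotEmpty() && Duration.between(lastPublication, now) >= lingerPeriod) flush(now)
    }

    private fun flush(now: Instant) {
        publish(pending.toList())   // hand over a copy, then reset the buffer
        pending.clear()
        lastPublication = now
    }
}

fun main() {
    val published = mutableListOf<List<String>>()
    val t0 = Instant.parse("2024-01-01T00:00:00Z")
    val buffer = EventBuffer(batchSize = 2, lingerPeriod = Duration.ofSeconds(10), start = t0) { published.add(it) }

    buffer.add("e1", t0)
    buffer.add("e2", t0.plusSeconds(1))   // batch of 2 reached: published immediately
    buffer.add("e3", t0.plusSeconds(2))
    buffer.tick(t0.plusSeconds(5))        // only 4 s since the last publication: nothing sent
    buffer.tick(t0.plusSeconds(11))       // 10 s elapsed: "e3" is published alone
    println(published)                    // [[e1, e2], [e3]]
}
```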

Meter parameters
  • enabled (required): boolean flag that activates/deactivates meter publishing to Kafka; defaults to false; must be set to true.

  • step (required): step size (reporting frequency); defaults to PT10S.

  • bootstrap.servers (required): bootstraps (string of host and port pair) of the Kafka broker; defaults to localhost:9092.

  • topic (required): string name of the topic to write the meters to; defaults to qalipsis-meters.

  • linger.ms (required): maximum period between two batches of events; positive duration; defaults to 5000.

  • batch-size (required): maximum number of events buffered between two publications of events to Kafka; positive integer; defaults to 2.

  • format (required): format of the exported data; defaults to JSON.

  • configuration (optional): key, value configuration parameters of type Properties, for the Kafka producer.

    • delivery.timeout.ms (optional): maximum integer value in milliseconds before time out; defaults to 10000.

Refer to the Kafka documentation for other meter monitoring parameters.