Elasticsearch plugin

Overview

The Elasticsearch plugin connects QALIPSIS to Elasticsearch using the Elasticsearch REST client.

The plugin lets QALIPSIS poll, search, save, and analyze data from the system under test.

This page explains configuration, core concepts, secure connection options, runnable examples and best practices.

Technology addressed

Elasticsearch: https://www.elastic.co/

Dependency

io.qalipsis.plugin:qalipsis-plugin-elasticsearch

Namespace in scenario

elasticsearch()

Client library

Java API: refer to the Elasticsearch Java Client.

Supported steps

Poll step

The poll step within the Elasticsearch plugin periodically reads batches of documents from Elasticsearch. The output is a list of ElasticsearchDocument instances that contain the documents either as JSON data or deserialized into objects.

Ancestor

Scenario

Functionality
  • The poll step is created only once in a scenario; it is then reused wherever the scenario refers to it.

  • The poll step within the Elasticsearch plugin uses a strategy of “delivered exactly once”.

  • The output of the poll step is a list of ElasticsearchDocument instances, each containing a map of field names to values.

Example
elasticsearch().poll {  (1)
    name = "my-poll-step"
    client { RestClient.builder(HttpHost.create("http://localhost:9200")).build() }  (2)
    query {       (3)
        """{
            "query": {
                "match_all": {}
            },
            "sort": ["_score"]
        }""".trimIndent()
    }
    index("battery-state")   (4)
    pollDelay(Duration.ofSeconds(1))  (5)
}
1 Start the poll step.
2 Configure the REST client connection.
3 Configure the query.
4 Set the Elasticsearch index to use for the queries.
5 Specify the delay between query executions.
Tips
  • The poll step is generally used in conjunction with a join to assert data or inject data into a workflow.

  • The query must declare at least one sort field, so that each poll can select only the newest results instead of fetching the same records repeatedly.

  • The output of the poll step is a list of ElasticsearchDocument instances, each containing a map of field names to values. If flatten is called, the records are provided one by one to the next step; otherwise each poll batch remains complete.
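
The sort requirement in the tips above can be illustrated with a small helper that builds a poll query ordered by a date field. This is only a sketch: the helper name sortedPollQuery and the field name timestamp are assumptions made for the example, not part of the plugin, and ascending order is assumed so that the most recent documents come last in each batch.

```kotlin
// Hypothetical helper: builds a match_all query sorted in ascending order on one field,
// so that the most recent documents come last in each batch.
fun sortedPollQuery(sortField: String): String =
    """
    {
        "query": { "match_all": {} },
        "sort": [ { "$sortField": "asc" } ]
    }
    """.trimIndent()

fun main() {
    // "timestamp" is an assumed field name of the polled documents.
    println(sortedPollQuery("timestamp"))
}
```

The returned string could then be used as the result of the query lambda of the poll step, in place of the inline JSON shown in the example.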

Reference Documentation

Refer to the Elasticsearch documentation for further parameter information.

Save step

The save step within the Elasticsearch plugin persists a batch of documents into Elasticsearch.

Ancestor

Step

Functionality

The save step’s input and step context are used to generate a list of documents that are forwarded to Elasticsearch.

Example
previousStep.elasticsearch().save {   (1)
    name = "my-save-step" (2)
    client {    (3)
        RestClient.builder(HttpHost.create("http://localhost:9200"))
            .build()
    }
    documents { stepContext, input ->  (4)
        listOf(
            Document(
                index = "battery-state",
                id = input.deviceId,
                source = objectMapper.writeValueAsString(input)
            )
        )
    }
}
1 Start the save step. Note: previousStep is a placeholder for any preceding step (e.g. collect()).
2 Give a name to the step.
3 Configure the REST client connection.
4 Define how to build the documents to save from the step context and input.
Reference Documentation

Refer to the Elasticsearch documentation for further parameter information.

Search step

The search step within the Elasticsearch plugin searches documents in the Elasticsearch database using a search query.

Ancestor

Step

Functionality

The search step’s input and step context are used to generate a query; the batch of results is then forwarded to the next step(s).

Example
previousStep.elasticsearch().search {   (1)
    name = "my-search-step"
    client {    (2)
        RestClient.builder(HttpHost.create("http://localhost:9200")).build()
    }
    index {  stepContext, input -> listOf("battery-state") }   (3)
    query { stepContext, input ->     (4)
        """{
            "query": {
                "bool": {
                    "must": [
                        { "match": { "deviceId": "${input.deviceId}" }},
                        { "term": { "timestamp": ${input.timestamp.epochSecond} }}
                    ]
                }
            }
        }"""
    }
}
1 Start the search step. Note: previousStep is a placeholder for any preceding step (e.g. collect()).
2 Configure the REST client connection.
3 Set the Elasticsearch indices to use for the queries, resolved from the step context and input.
4 Configure the builder for the JSON query to perform a search, resolved from the step context and input.
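
Because the query is assembled by string interpolation, a value that contains a double quote or a backslash would produce invalid JSON. The sketch below shows one possible way to escape such values before embedding them; the names jsonEscape and deviceQuery are hypothetical, and a JSON library (such as the mapper already used to serialize documents in the save example) would be a more robust choice in practice.

```kotlin
// Hypothetical helper: escapes a value so it can be embedded between double quotes in JSON.
fun jsonEscape(value: String): String = buildString {
    for (c in value) {
        when {
            c == '\\' -> append("\\\\")
            c == '"' -> append("\\\"")
            c == '\n' -> append("\\n")
            c == '\r' -> append("\\r")
            c == '\t' -> append("\\t")
            // Remaining control characters are written as \uXXXX sequences.
            c < ' ' -> append(String.format("\\u%04x", c.code))
            else -> append(c)
        }
    }
}

// Builds a term query on an assumed "deviceId" field.
fun deviceQuery(deviceId: String): String =
    """{ "query": { "term": { "deviceId": "${jsonEscape(deviceId)}" } } }"""

fun main() {
    println(deviceQuery("sensor-\"42\""))
    // { "query": { "term": { "deviceId": "sensor-\"42\"" } } }
}
```
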
Reference Documentation

Refer to the Elasticsearch documentation for further parameter information.

Configuration

DSL parameters

Available parameters are described in the table below.

Parameter Description

client

Configures the REST client to connect to Elasticsearch
Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: Lambda that returns an instance of RestClient
Default Value: http://localhost:9200

Example
client {
    RestClient.builder(HttpHost.create("http://localhost:9200")).build()
}
OR
client {
    RestClient.builder(HttpHost("localhost", 9200, "http")).build()
}

index

Sets the Elasticsearch indices to use for the queries.
Applicable Steps: Poll, Search
Optional/Required: Optional
Data Type:
Poll: vararg String
Search: Lambda that receives the step context and input to return List<String>
Default Value: _all

Example (Poll)
index("battery-state")
Example (Search)
index { stepContext, input -> listOf("${input.batteryState}") }

query

Configures the builder for the JSON query to perform the poll/search.
Applicable Steps: Poll, Search
Optional/Required: Required
Data Type:
Poll: Lambda that returns a String.
Search: Lambda that receives the step context and input to return a String.
Default Value: N/A

Example (Poll)
query {
    """{
        "query": {
            "match_all": {}
        },
        "sort": ["_score"]
    }"""
}
Example (Search)
query { stepContext, input ->
    """{
        "query": {
            "bool": {
                "must": [
                    { "match": { "deviceId": "${input.deviceId}" }},
                    { "term": { "timestamp": ${input.timestamp.epochSecond} }}
                ]
            }
        }
    }"""
}

queryParameters

Adds parameters to the query.
Applicable Steps: Poll, Search
Optional/Required: Optional
Data Type:
Poll: vararg Pair<String, String>
Search: Lambda that receives the step context and input to return Map<String, String>
Default Value: N/A

Example (Poll)
queryParameters("deviceId" to "deviceId1", "param-2" to "val-2")
Example (Search)
queryParameters { stepContext, input -> mapOf("deviceId" to "${input.deviceId}") }

pollDelay

Specifies the delay between two successive polls.
Applicable Step: Poll
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: null

Example
pollDelay(Duration.ofSeconds(1))

singletonConfiguration

Configures how polled results are delivered to scenario minions.
Refer to SingletonConfiguration parameters for details on strategies and examples of usage.

documents

Builds the list of documents to save using the Elasticsearch Bulk API.
Applicable Step: Save
Optional/Required: Required
Data Type: Lambda that receives the step context and input to return List<Document>
Default Value: N/A

Example
documents { stepContext, input ->
    listOf(
        Document(
            index = "battery-state",
            id = input.deviceId,
            source = objectMapper.writeValueAsString(input)
        )
    )
}

monitoring

Configures QALIPSIS monitoring (meters/events) for the step.
Refer to Monitoring parameters for configuration details and examples of usage.

keepResponse

When called, keeps the response from Elasticsearch after saving documents and forwards it to the next step.
Applicable Step: Save
Optional/Required: Optional
Data Type: Function
Default Value: N/A

Example
keepResponse()

fetchAll

Fetches all results from Elasticsearch, even when they exceed the maximum size of the result window.
Applicable Step: Search
Optional/Required: Optional
Data Type: Function
Default Value: N/A

Example
fetchAll()

mapper

Configures the JSON mapper to use for deserializing JSON documents from Elasticsearch into objects.
Applicable Steps: Search, Poll
Optional/Required: Optional
Data Type: Lambda that receives a JsonMapper to use in deserializing JSON records.
Default Value: N/A

Example
mapper { jsonMapper ->
    // Custom configuration of the mapper
}

Shared defaults for Elasticsearch steps

You can define defaults once in the scenario section or just after, and let all following Elasticsearch steps inherit them.

scenario {
  elasticsearch().defaults { (1)
    client {
      RestClient.builder(HttpHost.create("http://localhost:9200")).build()
    }
    monitoring {
      events = true
      meters = true
    }
  }
}
1 Defaults are applied to subsequent Elasticsearch steps in the same scenario. Individual steps can still override values.

Analytics

QALIPSIS can publish the meters and events collected during a campaign to Elasticsearch through dedicated publishers.

Configure the publishers in the factory configuration, separate from the scenario DSL. Refer to Provide the configuration to QALIPSIS for information about specifying the configuration to QALIPSIS.

Meters

The meters publisher writes the meter snapshots collected during a campaign as documents to time‑partitioned Elasticsearch indices.

Example configuration file (YAML)
meters:
  export:
    elasticsearch:
      enabled: true
      urls:
        - http://localhost:9200
      path-prefix: /
      index-prefix: qalipsis-meters
      refresh-interval: 10s
      store-source: true
      index-date-pattern: yyyy-MM-dd
      publishers: 1
      username:
      password:
      shards: 1
      replicas: 0
      proxy:

The parameters used to configure the publication of meters to Elasticsearch are described in the table below.

Parameter Description

meters.export.elasticsearch.enabled

Activates the publication of meters to Elasticsearch. Must be set to true to enable publishing.
Data Type: Boolean
Default Value: false

meters.export.elasticsearch.urls

List of URLs to the Elasticsearch instances.
Data Type: List of String
Default Value: [http://localhost:9200]

meters.export.elasticsearch.path-prefix

Prefix prepended to the URL paths sent to Elasticsearch.
Data Type: String
Default Value: /

meters.export.elasticsearch.index-prefix

Prefix used for the created indices that contain the meters.
Data Type: String
Default Value: qalipsis-meters

meters.export.elasticsearch.refresh-interval

Delay between two refreshes of the meter indices in Elasticsearch.
Data Type: String
Default Value: 10s

meters.export.elasticsearch.store-source

Stores the _source field of the documents alongside the values when set to true.
Data Type: Boolean
Default Value: true

meters.export.elasticsearch.index-date-pattern

Pattern of the date suffix appended to the index name to generate time‑partitioned indices, as supported by java.time.format.DateTimeFormatter.
Data Type: String
Default Value: yyyy-MM-dd
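
As an illustration of how such a pattern translates into an index name suffix, the sketch below formats a date with java.time.format.DateTimeFormatter. The way the prefix and the suffix are joined (a hyphen here) and the helper name indexNameFor are assumptions made for the example, not the documented behavior of the publisher.

```kotlin
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper: joins an index prefix and a formatted date with a hyphen.
// The separator is an assumption made for this example.
fun indexNameFor(prefix: String, pattern: String, date: LocalDate): String =
    prefix + "-" + DateTimeFormatter.ofPattern(pattern).format(date)

fun main() {
    val day = LocalDate.of(2024, 3, 5)
    println(indexNameFor("qalipsis-meters", "yyyy-MM-dd", day)) // qalipsis-meters-2024-03-05
    println(indexNameFor("qalipsis-meters", "yyyy-MM", day))    // qalipsis-meters-2024-03
}
```

Under these assumptions, a coarser pattern such as yyyy-MM would yield one index per month instead of one per day.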

meters.export.elasticsearch.publishers

Number of concurrent publishers that can send meter batches to Elasticsearch.
Data Type: Positive integer
Default Value: 1 (no concurrency)

meters.export.elasticsearch.username

Name of the user for basic authentication when connecting to Elasticsearch.
Data Type: String
Default Value: None

meters.export.elasticsearch.password

Password of the user for basic authentication when connecting to Elasticsearch.
Data Type: String
Default Value: None

meters.export.elasticsearch.shards

Number of shards applied to the created meter indices.
Data Type: Positive integer
Default Value: 1

meters.export.elasticsearch.replicas

Number of replicas applied to the created meter indices.
Data Type: Non-negative integer
Default Value: 0

meters.export.elasticsearch.proxy

URL of the HTTP proxy to use to access Elasticsearch. Useful when an alternative authentication mechanism is required to reach the cluster.
Data Type: String
Default Value: None

Events

The events publisher buffers the events emitted during a campaign and writes them in bulk as documents to time‑partitioned Elasticsearch indices.

Example configuration file (YAML)
events:
  export:
    elasticsearch:
      enabled: true
      min-level: INFO
      urls:
        - http://localhost:9200
      path-prefix: /
      index-prefix: qalipsis-events
      refresh-interval: 10s
      store-source: true
      index-date-pattern: yyyy-MM-dd
      linger-period: 10s
      batch-size: 40000
      publishers: 1
      username:
      password:
      shards: 1
      replicas: 0
      proxy:

The parameters used to configure the publication of events to Elasticsearch are described in the table below.

Parameter Description

events.export.elasticsearch.enabled

Activates the publication of events to Elasticsearch. Must be set to true to enable publishing.
Data Type: Boolean
Default Value: false

events.export.elasticsearch.min-level

Minimum event level published. Acceptable values are TRACE, DEBUG, INFO, WARN, ERROR, OFF.
Data Type: Enum
Default Value: INFO
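
The effect of min-level can be pictured as a threshold on an ordered list of levels. The sketch below assumes that the levels are ordered as listed above, from TRACE (most verbose) to OFF, and that OFF disables publication entirely. The enum and function names are illustrative and are not taken from QALIPSIS.

```kotlin
// Illustrative enum mirroring the documented levels, ordered from the most verbose to OFF.
enum class Level { TRACE, DEBUG, INFO, WARN, ERROR, OFF }

// An event is kept when its level is at least the configured minimum.
// A minimum of OFF rejects every event.
fun isPublished(eventLevel: Level, minLevel: Level): Boolean =
    minLevel != Level.OFF && eventLevel >= minLevel

fun main() {
    println(isPublished(Level.WARN, Level.INFO))  // true
    println(isPublished(Level.DEBUG, Level.INFO)) // false
    println(isPublished(Level.ERROR, Level.OFF))  // false
}
```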

events.export.elasticsearch.urls

List of URLs to the Elasticsearch instances.
Data Type: List of String
Default Value: [http://localhost:9200]

events.export.elasticsearch.path-prefix

Prefix prepended to the URL paths sent to Elasticsearch.
Data Type: String
Default Value: /

events.export.elasticsearch.index-prefix

Prefix used for the created indices that contain the events.
Data Type: String
Default Value: qalipsis-events

events.export.elasticsearch.refresh-interval

Delay between two refreshes of the event indices in Elasticsearch.
Data Type: String
Default Value: 10s

events.export.elasticsearch.store-source

Stores the _source field of the documents alongside the values when set to true.
Data Type: Boolean
Default Value: true

events.export.elasticsearch.index-date-pattern

Pattern of the date suffix appended to the index name to generate time‑partitioned indices, as supported by java.time.format.DateTimeFormatter.
Data Type: String
Default Value: yyyy-MM-dd

events.export.elasticsearch.linger-period

Maximum time the events are buffered before forcing a flush to Elasticsearch, even when the batch size is not reached.
Data Type: Positive duration
Default Value: 10s

events.export.elasticsearch.batch-size

Maximum number of events buffered before a bulk request is flushed to Elasticsearch.
Data Type: Positive integer
Default Value: 40000
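
The interplay between linger-period and batch-size can be sketched as a buffer that flushes as soon as either limit is reached. The class below is an illustration written for this page under that assumption, not the publisher's actual implementation; the name LingeringBuffer and its members are hypothetical, and time is passed explicitly to keep the example deterministic.

```kotlin
import java.time.Duration
import java.time.Instant

// Illustrative buffer: flushes when the batch size is reached
// or when the linger period has elapsed since the last flush.
class LingeringBuffer<T>(
    private val batchSize: Int,
    private val lingerPeriod: Duration,
    private var lastFlush: Instant,
    private val onFlush: (List<T>) -> Unit
) {
    private val pending = mutableListOf<T>()

    // Buffers an item, then flushes if one of the two limits is hit.
    fun add(item: T, now: Instant) {
        pending.add(item)
        flushIfNeeded(now)
    }

    // Can also be called periodically to enforce the linger period without new items.
    fun flushIfNeeded(now: Instant) {
        val sizeReached = pending.size >= batchSize
        val lingerElapsed = Duration.between(lastFlush, now) >= lingerPeriod
        if (pending.isNotEmpty() && (sizeReached || lingerElapsed)) {
            onFlush(pending.toList())
            pending.clear()
            lastFlush = now
        }
    }
}

fun main() {
    val start = Instant.parse("2024-01-01T00:00:00Z")
    val flushed = mutableListOf<List<String>>()
    val buffer = LingeringBuffer<String>(3, Duration.ofSeconds(10), start) { batch -> flushed.add(batch) }

    buffer.add("e1", start.plusSeconds(1))
    buffer.add("e2", start.plusSeconds(2))
    buffer.add("e3", start.plusSeconds(3))      // batch size reached: first flush
    buffer.add("e4", start.plusSeconds(4))
    buffer.flushIfNeeded(start.plusSeconds(13)) // linger period elapsed: second flush
    println(flushed) // [[e1, e2, e3], [e4]]
}
```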

events.export.elasticsearch.publishers

Number of concurrent publishers that can send event batches to Elasticsearch.
Data Type: Positive integer
Default Value: 1 (no concurrency)

events.export.elasticsearch.username

Name of the user for basic authentication when connecting to Elasticsearch.
Data Type: String
Default Value: None

events.export.elasticsearch.password

Password of the user for basic authentication when connecting to Elasticsearch.
Data Type: String
Default Value: None

events.export.elasticsearch.shards

Number of shards applied to the created event indices.
Data Type: Positive integer
Default Value: 1

events.export.elasticsearch.replicas

Number of replicas applied to the created event indices.
Data Type: Non-negative integer
Default Value: 0

events.export.elasticsearch.proxy

URL of the HTTP proxy to use to access Elasticsearch. Useful when an alternative authentication mechanism is required to reach the cluster.
Data Type: String
Default Value: None

Reference Documentation

Refer to Monitoring test campaigns for a further explanation of the meter and event parameter values.