SQL plugin

Overview

The SQL plugin connects QALIPSIS to relational databases through R2DBC. It is designed to insert test data, execute parameterized read queries, and continuously poll tables or views for new rows during a scenario.

Technology addressed

SQL databases via R2DBC: PostgreSQL, MariaDB, MySQL, Microsoft SQL Server, Oracle Database.

Dependency

io.qalipsis.plugin:qalipsis-plugin-sql

Namespace in scenario

sql()

Client library

R2DBC SPI with r2dbc-pool and vendor drivers for PostgreSQL, MariaDB, MySQL, Microsoft SQL Server, and Oracle Database.

Supported steps

Poll step

The poll step periodically executes an ordered query and forwards the rows that match it.

Ancestor

Scenario

Functionality

The poll step is created from the scenario namespace. On each execution, it runs the configured SELECT, emits the returned rows, remembers the value of the first ORDER BY column from the last row of the batch, and reuses that value on the next poll to continue from the latest seen record.

Example
sql().poll {
    name = "poll-new-events"
    protocol(Protocol.POSTGRESQL) (1)
    connection { (2)
        port = 5432
        database = "qalipsis"
        username = "qalipsis"
        password = "qalipsis"
    }
    query( (3)
        """
        select id, device, eventname
        from events
        where device <> ?
        order by id
        """.trimIndent()
    )
    parameters("Truck #1") (4)
    pollDelay(Duration.ofSeconds(1)) (5)
}.flatten() (6)
1 Select the SQL dialect used to create the connection pool and rewrite placeholders if needed.
2 Configure the connection details for the target database.
3 Declare the ordered SELECT statement. The first ORDER BY column (id) is used automatically as the poll tie-breaker.
4 Bind the query parameter in the same order as the ? placeholder.
5 Wait one second between two polling executions.
6 Emit each row individually as DatasourceRecord<Map<String, Any?>> instead of forwarding a full SqlPollResults batch.

The poll step derives its tie-breaker automatically from the first column in the ORDER BY clause of query.

The plugin parses the SQL statement using Apache Calcite. After the first successful poll, it appends a condition on the tie-breaker column and adds the latest tie-breaker value as an extra prepared-statement parameter.

Use placeholders with ? for the parameters you configure explicitly through parameters(…​). The generated tie-breaker parameter is appended by the plugin and must not be added manually.
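The tie-breaker mechanism described above can be illustrated with a minimal, self-contained sketch. It does not use the plugin or a database: Event, TieBreakerPoller, and the in-memory list are hypothetical stand-ins, and the strict "greater than" comparison is an assumption, since the exact condition appended by the plugin is not specified here.

```kotlin
// Illustrative stand-in for a table; the real plugin queries a database through R2DBC.
data class Event(val id: Long, val device: String, val eventname: String)

// Hypothetical class, not part of the plugin API.
class TieBreakerPoller(private val table: List<Event>, private val excludedDevice: String) {

    // Value of the first ORDER BY column (id) taken from the last row of the previous batch.
    private var lastSeenId: Long? = null

    fun poll(): List<Event> {
        val batch = table
            .filter { it.device != excludedDevice }                    // where device <> ?
            .filter { row -> lastSeenId?.let { row.id > it } ?: true }  // assumed appended condition: id > ?
            .sortedBy { it.id }                                         // order by id
        // Remember the tie-breaker of the last row when the batch is not empty.
        batch.lastOrNull()?.let { lastSeenId = it.id }
        return batch
    }
}

fun main() {
    val table = mutableListOf(
        Event(1, "Truck #1", "start"),
        Event(2, "Car #2", "start"),
        Event(3, "Car #3", "stop")
    )
    val poller = TieBreakerPoller(table, "Truck #1")
    println(poller.poll().map { it.id }) // [2, 3]
    table += Event(4, "Car #2", "stop")
    println(poller.poll().map { it.id }) // [4]
}
```

The explicit parameter ("Truck #1") stays first and the remembered value comes last, which mirrors why the generated tie-breaker parameter must not be added manually.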

When the query uses reserved identifiers, escape them using the quoting style expected by the target database. Refer to your database vendor documentation for details.

Reference Documentation

Refer to your database vendor documentation for the exact SQL dialect and data types.

Save step

The save step inserts one or more rows into a SQL table.

Ancestor

Step

Functionality

The save step builds an INSERT INTO …​ VALUES …​ statement from the table name and column list, then executes it for each SqlSaveRecord produced from the current input and step context.

Example
.sql() (1)
.save {
    name = "save-entry"
    protocol(Protocol.POSTGRESQL) (2)
    connection { (3)
        port = 5432
        database = "qalipsis"
        username = "qalipsis"
        password = "qalipsis"
    }
    tableName { stepContext, input -> "buildingentries" } (4)
    columns { stepContext, input -> listOf("timestamp", "action", "username", "enabled") } (5)
    values { stepContext, input -> (6)
        listOf(
            SqlSaveRecord(
                input.timestamp,
                input.action,
                input.username,
                input.enabled
            )
        )
    }
}
1 Enter the SQL plugin namespace from the previous step.
2 Select the target SQL dialect.
3 Configure the connection pool for the save step.
4 Resolve the table to insert into.
5 Declare the ordered list of destination columns.
6 Build the rows to insert. The order of values in SqlSaveRecord must match the order of columns exactly.

The save step builds an INSERT INTO …​ VALUES …​ prepared statement from tableName, columns, and values.

Each SqlSaveRecord represents one row, and the order of its values must exactly match the order of columns. Rows are not merged into a single multi-row statement; the plugin executes the prepared statement once per SqlSaveRecord.
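As a rough illustration of how a table name and a column list translate into a prepared statement, the sketch below renders the statement text and checks that a row has one value per column. Both functions are hypothetical helpers written for this example; the exact SQL text generated by the plugin may differ.

```kotlin
// Hypothetical helper, not part of the plugin API: renders the statement text for a column list.
fun buildInsertStatement(tableName: String, columns: List<String>): String {
    require(tableName.isNotBlank()) { "tableName must not be blank" }
    require(columns.isNotEmpty()) { "columns must not be empty" }
    // One ? placeholder per destination column.
    val placeholders = columns.joinToString(", ") { "?" }
    return "INSERT INTO $tableName (${columns.joinToString(", ")}) VALUES ($placeholders)"
}

// Hypothetical check: a row must carry exactly one value per column, in the same order.
fun checkRow(columns: List<String>, values: List<Any?>) {
    require(values.size == columns.size) {
        "Expected ${columns.size} values but got ${values.size}"
    }
}

fun main() {
    val columns = listOf("timestamp", "action", "username", "enabled")
    println(buildInsertStatement("buildingentries", columns))
    // INSERT INTO buildingentries (timestamp, action, username, enabled) VALUES (?, ?, ?, ?)
    checkRow(columns, listOf("2024-01-01T08:00:00Z", "IN", "alice", true))
}
```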

The save output is SqlSaveResult<I>, which exposes the original input via result.input, the generated identifiers supported by the driver via result.resultIds, and execution metrics via result.sqlSaveStepMeters.

Reference Documentation

Refer to the documentation of your target SQL database for table definitions, constraints, and generated key behavior.

Search step

The search step executes a parameterized query and forwards the matching rows together with the original input.

Ancestor

Step

Functionality

The search step computes the query and parameters from the current input and step context, executes the statement, and returns the rows as SqlSearchBatchResults<I, Map<String, Any?>>.

Example
.sql() (1)
.search {
    name = "search-entries"
    protocol(Protocol.POSTGRESQL) (2)
    connection { (3)
        port = 5432
        database = "qalipsis"
        username = "qalipsis"
        password = "qalipsis"
    }
    query { stepContext, input -> (4)
        """
        select id, username, enabled
        from buildingentries
        where action = ? and enabled = ?
        order by id
        """.trimIndent()
    }
    parameters { stepContext, input -> (5)
        listOf(input.action, input.enabled)
    }
}
1 Enter the SQL plugin namespace from the previous step.
2 Select the target SQL dialect.
3 Configure the connection pool for the search step.
4 Build the SQL query dynamically from the current input and step context.
5 Provide the parameter values in the same order as the ? placeholders in the query.

Tip

The returned rows are available as result.records. Each SqlSearchRecord.value is a Map<String, Any?> keyed by lower-cased column names, for example record.value["username"].
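The effect of lower-cased keys can be shown with a small stand-alone sketch. toRecordValue is a hypothetical helper that only mimics the documented key convention; it is not how the plugin builds its records.

```kotlin
// Hypothetical helper: pairs column names with values and lower-cases the names.
fun toRecordValue(columnNames: List<String>, values: List<Any?>): Map<String, Any?> {
    require(columnNames.size == values.size) { "One value per column is expected" }
    return columnNames.map { it.lowercase() }.zip(values).toMap()
}

fun main() {
    // Some databases report column names in upper case; the keys are normalized here.
    val value = toRecordValue(listOf("ID", "USERNAME", "Enabled"), listOf(42, "alice", true))
    println(value["username"]) // alice
    println(value["USERNAME"]) // null, because keys are lower-cased
}
```

In a scenario, this means lookups such as record.value["username"] should always use the lower-case column name, whatever casing the query or the database uses.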

Reference Documentation

Refer to your database documentation for the supported SQL syntax, indexes, and query tuning recommendations.

Transaction support

The current SQL plugin DSL does not expose explicit transaction settings, isolation levels, or commit / rollback controls. Each step acquires a connection from the pool, executes its operation, and closes the connection, which releases it back to the pool.

Configuration

DSL parameters

Available properties are described in the table below.

Property Description

connection

Configures the database connection pool for the step. The same block is available inside defaults {}.

Applicable Steps: Poll, Search, Save
Optional/Required: Required unless inherited from defaults
Data Type: Lambda with receiver SqlConnection
Default Value: SqlConnection(host = "localhost", port = -1, database = null, username = null, password = null, maxSize = 1, maxIdleTime = Duration.ofMinutes(1), maxCreateConnectionTime = Duration.ofMillis(5000), applicationName = "sql", options = emptyMap())

Example

connection {
    host = "localhost"
    port = 5432
    database = "qalipsis"
    username = "qalipsis"
    password = "qalipsis"
}

connection.host

Host name of the database server.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: String
Default Value: "localhost"

Example

connection {
    host = "db.internal"
}

connection.port

Port of the database server.

Applicable Steps: Poll, Search, Save
Optional/Required: Required unless inherited from defaults
Data Type: Int
Default Value: -1

Example

connection {
    port = 5432
}

connection.database

Name of the target database or schema catalog passed to the R2DBC connection factory.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: String
Default Value: null

Example

connection {
    database = "qalipsis"
}

connection.username

Username used to authenticate to the database.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: String
Default Value: null

Example

connection {
    username = "qalipsis"
}

connection.password

Password used to authenticate to the database.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: String
Default Value: null

Example

connection {
    password = "secret"
}

connection.maxSize

Maximum number of connections in the R2DBC pool.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: Int
Default Value: 1

Example

connection {
    maxSize = 10
}

connection.maxIdleTime

Maximum duration a pooled connection can stay idle before being recycled.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: Duration
Default Value: Duration.ofMinutes(1)

Example

connection {
    maxIdleTime = Duration.ofMinutes(5)
}

connection.maxCreateConnectionTime

Timeout used while establishing a new connection to the database.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: Duration
Default Value: Duration.ofMillis(5000)

Example

connection {
    maxCreateConnectionTime = Duration.ofSeconds(10)
}

connection.applicationName

Optional application name stored on the SqlConnection configuration object.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: String
Default Value: "sql"

Example

connection {
    applicationName = "my-load-campaign"
}

connection.options

Optional extra options stored on the SqlConnection configuration object.

Applicable Steps: Poll, Search, Save
Optional/Required: Optional
Data Type: Map<String, String>
Default Value: emptyMap()

Example

connection {
    options = mapOf("key1" to "value1")
}

protocol

Selects the SQL dialect. The dialect drives the connection factory and rewrites the DSL placeholders to the vendor-specific syntax when necessary.
Applicable Steps: Poll, Search, Save
Optional/Required: Required unless inherited from defaults
Data Type: Function with parameter Protocol (enum): POSTGRESQL, MARIADB, MYSQL, MSSQL, ORACLE
Default Value: N/A

Example

protocol(Protocol.POSTGRESQL)

query

Defines the SQL statement to execute.

Applicable Steps: Poll, Search
Optional/Required: Required
Data Type:
Poll: String
Search: Lambda which receives the current StepContext and input, and returns a String
Default Value: N/A

For Poll, the statement must be an ordered SELECT; the first ORDER BY column is used automatically as the polling tie-breaker. There is no separate tieBreaker property in the SQL DSL.

All placeholders must be written as ? in the DSL. The plugin converts them automatically for PostgreSQL ($1, $2, …), Microsoft SQL Server (@P1, @P2, …), and Oracle (:1, :2, …).

Example (Poll)

query(
    """
    select id, device, eventname
    from events
    where device <> ?
    order by id
    """.trimIndent()
)

Example (Search)

query { stepContext, input ->
    """
    select id, username, enabled
    from buildingentries
    where action = ? and enabled = ?
    order by id
    """.trimIndent()
}
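
The placeholder conversion described for query can be approximated by the following sketch. It is not the plugin's implementation: Dialect and rewritePlaceholders are hypothetical names, the scanner only skips single-quoted string literals, and it assumes that MariaDB and MySQL keep ? unchanged, which the list above implies but does not state.

```kotlin
// Hypothetical enum for this example; the plugin itself uses Protocol.
enum class Dialect(val render: (Int) -> String) {
    POSTGRESQL({ "\$$it" }), // $1, $2, ...
    MSSQL({ "@P$it" }),      // @P1, @P2, ...
    ORACLE({ ":$it" }),      // :1, :2, ...
    MARIADB({ "?" }),        // assumed to keep positional ?
    MYSQL({ "?" })           // assumed to keep positional ?
}

// Replaces each ? outside single-quoted literals with the dialect-specific marker.
fun rewritePlaceholders(sql: String, dialect: Dialect): String {
    val out = StringBuilder()
    var index = 0
    var inLiteral = false
    for (ch in sql) {
        when {
            ch == '\'' -> {
                // Toggle on every quote; a doubled quote ('') toggles twice and stays in the literal.
                inLiteral = !inLiteral
                out.append(ch)
            }
            ch == '?' && !inLiteral -> {
                index++
                out.append(dialect.render(index))
            }
            else -> out.append(ch)
        }
    }
    return out.toString()
}

fun main() {
    val sql = "select id from buildingentries where action = ? and enabled = ?"
    println(rewritePlaceholders(sql, Dialect.POSTGRESQL))
    // select id from buildingentries where action = $1 and enabled = $2
    println(rewritePlaceholders(sql, Dialect.MSSQL))
    // select id from buildingentries where action = @P1 and enabled = @P2
}
```

Because the markers are numbered in order of appearance, the values passed through parameters must follow the same order as the ? placeholders.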

parameters

Ordered values bound to the placeholders in query.

Applicable Steps: Poll, Search
Optional/Required: Optional for queries without placeholders; otherwise required
Data Type:
Poll: vararg Any?
Search: Lambda which receives the current StepContext and input, and returns a List<Any?>
Default Value:
Poll: []
Search: return emptyList() when no parameters are required

The order of the values must match the order of the ? placeholders in the query.

Example (Poll)

parameters("Truck #1")

Example (Search)

parameters { stepContext, input ->
    listOf(input.action, input.enabled)
}

tableName

Resolves the destination table for the save step.

Applicable Steps: Save
Optional/Required: Required
Data Type: Lambda which receives the current StepContext and input, and returns a String
Default Value: { _, _ -> "" }

Example

tableName { stepContext, input ->
    "buildingentries"
}

columns

Resolves the ordered list of columns to populate in the generated INSERT INTO …​ VALUES …​ statement.

Applicable Steps: Save
Optional/Required: Required
Data Type: Lambda which receives the current StepContext and input, and returns a List<String>
Default Value: { _, _ -> emptyList() }

Example

columns { stepContext, input ->
    listOf("timestamp", "action", "username", "enabled")
}

values

Produces the rows to insert. Each SqlSaveRecord is matched positionally to columns.

Applicable Steps: Save
Optional/Required: Required
Data Type: Lambda which receives the current StepContext and input, and returns a List<SqlSaveRecord>
Default Value: { _, _ -> emptyList() }

The save step does not batch records into one multi-row SQL statement; it inserts each SqlSaveRecord one by one.

Example

values { stepContext, input ->
    listOf(
        SqlSaveRecord(
            input.timestamp,
            input.action,
            input.username,
            input.enabled
        )
    )
}

pollDelay

Delay between the end of one poll execution and the start of the next one.

Applicable Steps: Poll
Optional/Required: Required
Data Type: Duration
Default Value: N/A

Example

pollDelay(Duration.ofSeconds(1))

flatten

Changes the output of poll from a batch SqlPollResults<Map<String, Any?>> to individual DatasourceRecord<Map<String, Any?>> items.

Applicable Steps: Poll
Optional/Required: Optional
Data Type: Function call flatten()
Default Value: Not enabled; poll emits batch results

Example

scenario.sql().poll {
    // configuration
}.flatten()

singletonConfiguration

Configures how the results of poll are delivered to scenario minions. The default delivery mode is unicast; it can be switched to broadcast.

Applicable Steps: Poll
Optional/Required: Optional
Data Type: unicast() or broadcast(bufferSize, idleTimeout)
Default Value: unicast() (SingletonType.UNICAST)

Example

broadcast(123, Duration.ofSeconds(20))

Refer to SingletonConfiguration parameters for details on the available strategies.

Shared defaults for SQL steps

You can define defaults once in the scenario section or just after, and let all following SQL steps inherit them.

scenario {
  sql().defaults { (1)
    connection {
      host = "localhost"
      port = 5432
      database = "qalipsis"
      username = "qalipsis"
      password = "qalipsis"
    }
    protocol(Protocol.POSTGRESQL)
    monitoring {
      events = true
      meters = true
    }
  }
}
1 Defaults are applied to subsequent SQL steps in the same scenario. Individual steps can still override values.