R2DBC-jasync plugin

Overview

The R2DBC-jasync plugin connects QALIPSIS to MariaDB, MySQL, and PostgreSQL.

Technologies addressed
Dependency

io.qalipsis.plugin:qalipsis-plugin-r2dbc-jasync

Namespace in scenario

r2dbcJasync()

Client library

Jasync-sql: refer to jasync-sql.

Jasync-sql is an asynchronous database driver for PostgreSQL and MySQL written in Kotlin.

Supported steps

Poll step

The poll step within the R2DBC-jasync plugin polls the newest records from the database, respecting a delay between each execution of a select query.

Ancestor

Scenario

Functionality

The poll step is created only once in a scenario; it is then used throughout the scenario as called. The poll step within the R2DBC-jasync plugin uses a strategy of "delivered at least once".

Example
r2dbcJasync().poll {
    name = "poll-step"
    protocol(Protocol.POSTGRESQL) (1)
    connection { (2)
        database = "iot"
        port = 5432
        username = "DavidW"
        password = "Awesome_1978"
    }
    query("SELECT * FROM battery_state ORDER BY \"timestamp\" ASC") (3)
    parameters() (4)
    pollDelay(Duration.ofSeconds(1)) (5)
}
1 Specify the protocol of the target database.
2 Configure the database connection.
3 Define the SQL query to execute. The first sorting column in ORDER BY becomes the poll tie-breaker.
4 Provide the query parameters. Here the statement does not require any explicit placeholder, so the parameter list is empty.
5 Wait one second between two poll executions.

The poll step derives its tie-breaker automatically from the first column in the ORDER BY clause of query.

The plugin parses the SQL statement using Apache Calcite. After the first successful poll, it appends a condition on the tie-breaker column and adds the latest tie-breaker value as an extra prepared-statement parameter.

Use placeholders with ? for the parameters you configure explicitly through parameters(…​). The generated tie-breaker parameter is appended by the plugin and must not be added manually.

When the query uses reserved identifiers, escape them using the quoting style expected by the target database: generally double quotes for PostgreSQL and backticks for MySQL or MariaDB.

Tips
  • Use the poll step with a join operator when you need to verify the values saved by your tested system in the database.

  • query requires at least one sorting field to filter the newest results and not fetch the same records repeatedly.

Reference documentation

Refer to Configuring and Managing Connections for further parameter and configuration information.

Save step

The save step within the R2DBC-jasync plugin persists a batch of records into an SQL table using a prepared statement.

Ancestor

Step

Functionality

The save step’s input and step context are used to generate the table name, the ordered list of columns, and the rows to insert. The output is a JasyncSaveResult<I> that contains the original input, the generated identifiers returned by the driver, and execution metrics.

Example
.r2dbcJasync() (1)
.save {
    name = "save"
    protocol(Protocol.POSTGRESQL) (2)
    connection { (3)
        database = "iot"
        port = 5432
        username = "DavidW"
        password = "Awesome_1978"
    }
    tableName { stepContext, input -> (4)
        "battery_state"
    }
    columns { stepContext, input -> (5)
        listOf("device_id", "timestamp", "battery_level")
    }
    values { stepContext, input -> (6)
        listOf(
            JasyncSaveRecord(
                input.deviceId,
                input.timestamp.epochSecond,
                input.batteryLevel
            )
        )
    }
    monitoring { (7)
        events = false
        meters = true
    }
}
1 Wrap the preceding step into the R2DBC-jasync plugin namespace. Required before any R2DBC-jasync step.
2 Specify the protocol of the target database.
3 Configure the database connection.
4 Returns the name of the table to insert into.
5 Returns the ordered list of columns used to build the INSERT statement.
6 Build the list of rows to insert. Each JasyncSaveRecord represents one row, and its values must match the order of columns.
7 Enable meter publication for the step while keeping event emission disabled.

The save step builds an INSERT INTO …​ VALUES …​ prepared statement from tableName, columns, and values.

Each JasyncSaveRecord represents one row. The plugin sends rows one by one, and the order of values in JasyncSaveRecord must exactly match the order of columns.

The save output is JasyncSaveResult<I>, which exposes the original input via result.input, the generated identifiers supported by the driver via result.resultIds, and execution metrics via result.jasyncSaveStepMeters.

Tips
  • Use the save step after a collect() step in order to reduce the number of queries executed onto the database and to reduce the load your scenario generates on the test system.

  • The save step is extremely flexible; table name, columns, and values may depend on the context and input value received from the previous step.

Reference documentation

Refer to Configuring and Managing Connections for further parameter and configuration information.

Search step

The search step within the R2DBC-jasync plugin searches records in the database using a prepared SQL query.

Ancestor

Step

Functionality

The search step’s input and step context are used to generate a prepared statement and the values bound to its placeholders. The matching results are wrapped in a JasyncSearchBatchResults<I, Map<String, Any?>> and forwarded to the next step(s). Access the rows via result.records: List<JasyncSearchRecord<Map<String, Any?>>>.

Example
.r2dbcJasync() (1)
.search {
    name = "search"
    protocol(Protocol.POSTGRESQL) (2)
    connection { (3)
        database = "iot"
        port = 3306
        username = "DavidW"
        password = "Awesome_1978"
    }
    query { stepContext, input -> (4)
        "SELECT DISTINCT * FROM battery_state WHERE \"timestamp\" = ? AND device_id = ?"
    }
    parameters { stepContext, input -> (5)
        listOf(input.timestamp.epochSecond, input.deviceId)
    }
}
1 Enter the R2DBC-jasync plugin namespace.
2 Specify the protocol of the target database.
3 Configure the database connection.
4 Build the prepared SQL query to execute.
5 Returns the values bound to the placeholders in the same order as they appear in the query.
Tip

Use the search step after a collect() step in order to reduce the number of queries executed onto the database and to reduce the load the scenario generates on the test system.

Reference documentation

Refer to Configuring and Managing Connections for further parameter and configuration information.

Configuration

DSL parameters

Available parameters are described in the table below.

Parameter Description

connection

Configures the connection to the target SQL database. The same block is used by Poll, Save, and Search. The supported sub-properties are described as connection.<name> rows below.
Applicable Steps: Poll, Save, Search
Optional/Required: Required
Data Type: Lambda with receiver JasyncConnection
Default Value: N/A

Example
connection {
    host = "localhost"
    port = 5432
    database = "iot"
    username = "postgres"
    password = "root"
}

connection.host

Database host name.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: String
Default Value: "localhost"

Example
connection {
    host = "db.internal"
}

connection.port

Database port.
Applicable Steps: Poll, Save, Search
Optional/Required: Required
Data Type: Int
Default Value: N/A

Example
connection {
    port = 5432
}

connection.database

Database name to connect to.
Applicable Steps: Poll, Save, Search
Optional/Required: Required
Data Type: String
Default Value: N/A

Example
connection {
    database = "iot"
}

connection.username

User name for the database connection.
Applicable Steps: Poll, Save, Search
Optional/Required: Required
Data Type: String
Default Value: N/A

Example
connection {
    username = "postgres"
}

connection.password

Password for the database connection.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: String?
Default Value: null

Example
connection {
    password = "root"
}

connection.maxActiveConnections

Maximum number of active connections in the Jasync pool.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: Int
Default Value: 1

Example
connection {
    maxActiveConnections = 10
}

connection.maxIdleTime

Maximum idle time for pooled connections.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: Duration.ofMinutes(1)

Example
connection {
    maxIdleTime = Duration.ofMinutes(5)
}

connection.maxPendingQueries

Maximum number of queued requests waiting for a pooled connection.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: Int
Default Value: Int.MAX_VALUE

Example
connection {
    maxPendingQueries = 500
}

connection.connectionValidationInterval

Period used by the pool to validate idle connections.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: Duration.ofMillis(5000)

Example
connection {
    connectionValidationInterval = Duration.ofSeconds(10)
}

connection.connectionCreateTimeout

Timeout for establishing a new database connection.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: Duration.ofMillis(5000)

Example
connection {
    connectionCreateTimeout = Duration.ofSeconds(15)
}

connection.connectionTestTimeout

Timeout for connection health checks performed by the pool.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: Duration.ofMillis(5000)

Example
connection {
    connectionTestTimeout = Duration.ofSeconds(15)
}

connection.queryTimeout

Optional timeout applied to SQL queries.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.time.Duration?
Default Value: null

Example
connection {
    queryTimeout = Duration.ofSeconds(30)
}

connection.eventLoopGroup

Netty event loop group used by the Jasync client. Override it only when transport selection or advanced tuning is required.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: io.netty.channel.EventLoopGroup
Default Value: NettyUtils.DefaultEventLoopGroup

Example
connection {
    eventLoopGroup = NettyUtils.DefaultEventLoopGroup
}

connection.executionContext

Executor used by Jasync to run callbacks.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.util.concurrent.Executor
Default Value: ExecutorServiceUtils.CommonPool

Example
connection {
    executionContext = ExecutorServiceUtils.CommonPool
}

connection.ssl

SSL configuration passed to the Jasync client.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: SSLConfiguration
Default Value: SSLConfiguration()

Example
connection {
    ssl = SSLConfiguration()
}

connection.charset

Character set used for the connection.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.nio.charset.Charset
Default Value: StandardCharsets.UTF_8

Example
connection {
    charset = StandardCharsets.UTF_8
}

connection.maximumMessageSize

Maximum size, in bytes, of a server message accepted by the client.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: Int
Default Value: 16777216

Example
connection {
    maximumMessageSize = 16777216
}

connection.allocator

Netty buffer allocator used by the client.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: io.netty.buffer.ByteBufAllocator
Default Value: PooledByteBufAllocator.DEFAULT

Example
connection {
    allocator = PooledByteBufAllocator.DEFAULT
}

connection.applicationName

Optional application name reported to the database server.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: String?
Default Value: "r2dbc-jasync"

Example
connection {
    applicationName = "inventory-checks"
}

connection.interceptors

Query interceptors invoked by the Jasync client.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: List<Supplier<QueryInterceptor>>
Default Value: emptyList()

Example
connection {
    interceptors = emptyList()
}

connection.maxConnectionTtl

Maximum lifetime for a pooled connection before it is recycled.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: java.time.Duration?
Default Value: null

Example
connection {
    maxConnectionTtl = Duration.ofMinutes(15)
}

protocol

Specifies the protocol of the target database.
Applicable Steps: Poll, Save, Search
Optional/Required: Required
Data Type: Protocol (enum) one of Protocol.POSTGRESQL, Protocol.MYSQL, or Protocol.MARIADB
Default Value: N/A

Example
protocol(Protocol.POSTGRESQL)

query

Configures the SQL statement to execute. The shape of the function differs between Poll and Search. In Poll, the statement must contain an ORDER BY clause and its first sorting column becomes the tie-breaker tracked by the plugin between poll executions. In Search, the statement is generated from the step context and the current input.
Applicable Steps: Poll, Search
Optional/Required: Required
Data Type:
Poll: String
Search: Lambda that takes step context and input and returns a String
Default Value: N/A

Example (Poll)
query("SELECT * FROM battery_state ORDER BY \"timestamp\" ASC")
Example (Search)
query { stepContext, input ->
    "SELECT DISTINCT * FROM battery_state WHERE \"timestamp\" = ? AND device_id = ?"
}

parameters

Supplies the values bound to the SQL placeholders. In Poll, parameters are provided directly and can be omitted when the statement has no placeholder. In Search, a parameter factory is expected even when the query does not require arguments, in which case it should return emptyList(). Java time values are converted to Jasync-compatible types automatically.
Applicable Steps: Poll, Search
Optional/Required: Optional (Poll); Required (Search)
Data Type:
Poll: vararg Any?
Search: Lambda that takes step context and input and returns List<Any?>
Default Value:
Poll: empty parameter list
Search: N/A

Example (Poll)
parameters()
Example (Search)
parameters { stepContext, input ->
    listOf(input.timestamp.epochSecond, input.deviceId)
}

tableName

Defines the target table for the save operation. The table name is computed from the step context and current input.
Applicable Step: Save
Optional/Required: Required
Data Type: Lambda that takes step context and input and returns a String
Default Value: { stepContext, input → "" }

Example
tableName { stepContext, input ->
    "battery_state"
}

columns

Defines the ordered list of columns to insert into. The order of the column names must match the order of values in each JasyncSaveRecord.
Applicable Step: Save
Optional/Required: Required
Data Type: Lambda that takes step context and input and returns List<String>
Default Value: { stepContext, input → emptyList() }

Example
columns { stepContext, input ->
    listOf("device_id", "timestamp", "battery_level")
}

values

Builds the records to insert. Each JasyncSaveRecord represents one row, and its values are bound in the same order as columns. The plugin sends rows one by one using prepared statements.
Applicable Step: Save
Optional/Required: Required
Data Type: Lambda that takes step context and input and returns List<JasyncSaveRecord>
Default Value: { stepContext, input → emptyList() }

Example
values { stepContext, input ->
    listOf(
        JasyncSaveRecord(
            input.deviceId,
            input.timestamp.epochSecond,
            input.batteryLevel
        )
    )
}

pollDelay

Delay between two poll executions. The delay is applied between the end of one poll and the start of the next one.
Applicable Step: Poll
Optional/Required: Required
Data Type: java.time.Duration
Default Value: N/A

Example
pollDelay(Duration.ofSeconds(1))

monitoring

Configures step-level monitoring. The block allows enabling event emission and meter publication for Poll, Save, and Search.
Applicable Steps: Poll, Save, Search
Optional/Required: Optional
Data Type: Lambda with receiver StepMonitoringConfiguration
Default Value: events = false, meters = false

Example
monitoring {
    events = false
    meters = true
}

singletonConfiguration

Configures how polled results are delivered to scenario minions. The poll step defaults to unicast delivery and can be switched to broadcast using the inherited singleton functions.
Applicable Step: Poll
Optional/Required: Optional
Data Type: SingletonConfiguration configured via unicast() or broadcast(bufferSize, idleTimeout)
Default Value: SingletonConfiguration(SingletonType.UNICAST)

Example
r2dbcJasync().poll {
    broadcast(123, Duration.ofSeconds(20))
}

Shared defaults for R2DBC-jasync steps

You can define defaults once in the scenario section or just after, and let all following R2DBC-jasync steps inherit them.

scenario {
  r2dbcJasync().defaults { (1)
    connection {
      host = "localhost"
      port = 5432
      database = "iot"
      username = "postgres"
      password = "root"
    }
    protocol(Protocol.POSTGRESQL)
    monitoring {
      events = true
      meters = true
    }
  }
}
1 Defaults are applied to subsequent R2DBC-jasync steps in the same scenario. Individual steps can still override values.