Netty plugin

Overview

The Netty plugin enables load testing and performance testing for network protocols including TCP, UDP, and MQTT.

Technologies addressed
Dependency

io.qalipsis.plugin:qalipsis-plugin-netty

Namespace in scenario

netty()

Client library

Netty: refer to the Netty project.

Supported steps

TCP steps

The TCP step model allows establishing connections to TCP servers and exchanging byte-level data. Two main patterns are supported: simple connections with built-in lifecycle management, and connection reuse across multiple steps.

TCP step (simple connection)

The tcp step creates a TCP connection, sends a request, and either maintains or closes the connection based on configuration.

Ancestor

Scenario or Step

Functionality
  • Establishes a TCP connection to the specified remote address.

  • Sends the payload generated by the request lambda.

  • Receives a response from the server.

  • If keepConnectionAlive is false, closes the connection after receiving the response.

  • If keepConnectionAlive is true, the connection persists for reuse via tcpWith steps.

When connections are kept open without a pool, use a closeTcp step at the end of the workflow to properly close the connection and avoid system resource saturation.

Example
.start()
.netty()
.tcp { (1)
    name = "my-tcp" (2)
    connect {
        address("remote-server.acme.com", 9000) (3)
        noDelay = true (4)
        keepConnectionAlive = true (5)
    }
    request { stepContext, input -> (6)
        "My TCP request".toByteArray(StandardCharsets.UTF_8)
    }
.verify { (7)
    assertThat(it.meters.timeToSuccessfulConnect)
        .isNotNull()
        .isLessThan(Duration.ofSeconds(1))
}
1 Enter the Netty plugin namespace and start a TCP step.
2 Name the step for later reference by tcpWith or closeTcp.
3 Set the remote server address and port.
4 Disable Nagle Algorithm for reduced latency.
5 Keep the connection open across multiple steps for reuse.
6 Define the payload to send; returns a ByteArray from the input or context.
7 Verify that the connection was established within 1 second.
Reference Documentation

Refer to the Netty documentation for advanced channel configuration and tuning options.

TCP WITH step (connection reuse)

The tcpWith step reuses an existing TCP connection created by a preceding tcp step, avoiding the overhead of repeated connection establishment.

Ancestor

Step

Functionality
  • Requires a preceding tcp step with the name specified in tcpWith.

  • Reuses the exact connection configuration (address, TLS, proxy settings) from the tcp step.

  • Sends a new request payload on the existing connection.

  • Does not establish a new connection or renegotiate TLS.

Example
.netty().tcp { (1)
    name = "my-tcp"
    connect {
        address("remote-server.acme.com", 9000)
        noDelay = true
        keepConnectionAlive = true
    }
    request { stepContext, input -> "First request".toByteArray() }
}
.netty().tcpWith("my-tcp") { (2)
    name = "reuse-tcp"
    request { stepContext, input -> "Second request".toByteArray() } (3)
    iterate(100) (4)
}
1 Establish a TCP connection named "my-tcp" with keepConnectionAlive = true.
2 Reuse the connection from "my-tcp" for a second request.
3 Send a different payload on the same connection.
4 Repeat this request 100 times on the same connection.
Tips
  • tcpWith connects to the same remote address as its companion tcp step.

  • All connection settings (TLS, proxy, timeouts) are inherited from the tcp step.

  • Use iterate() to send multiple requests on the same connection.

  • Chain multiple tcpWith steps to build workflows across one persistent connection.

Reference Documentation

Refer to the Netty documentation for advanced connection patterns.

CLOSE TCP step

The closeTcp step closes a TCP connection that was established with keepConnectionAlive = true, allowing graceful shutdown and resource cleanup.

Ancestor

Step

Functionality
  • Closes the connection associated with the named tcp step.

  • Respects the shutdownTimeout setting for graceful closure.

  • Simply forwards the input to the next step(s) without modification.

It is best practice to use closeTcp when reusing a TCP connection across multiple steps without a connection pool.

Example
.netty().tcp {
    name = "my-tcp"
    connect {
        address("remote-server.acme.com", 9000)
        keepConnectionAlive = true
    }
    request { stepContext, input -> "Hello".toByteArray() }
}
.netty().tcpWith("my-tcp") {
    request { stepContext, input -> "Goodbye".toByteArray() }
}
.closeTcp("my-tcp") (1)
1 Closes the TCP connection associated with "my-tcp".
Reference Documentation

Refer to the Netty documentation for connection lifecycle management.

TCP with connection pooling

TCP connection pooling maintains a pool of reusable connections, automatically balancing load across minions.

Pattern
.start()
.netty()
.tcp { (1)
    name = "pooled-tcp"
    connect {
        address("remote-server.acme.com", 9000)
        noDelay = true
        pool { size = 50 } (2)
    }
    request {stepContext, input -> "Request via pool".toByteArray() }
}
.netty().tcpWith("pooled-tcp") { (3)
    request { stepContext, input -> "Another request via pool".toByteArray() }
    iterate(50) (4)
}
1 Create a TCP step with connection pooling enabled.
2 Set the pool to maintain 50 concurrent connections, distributed among minions.
3 Reuse a pooled connection for the next request.
4 Send 50 additional requests, each pulling from the pool.
Tips
  • Pooling is ideal for high-concurrency scenarios with many minions.

  • No closeTcp is needed when using pooling; connections are managed automatically.

  • Tune pool.size based on server capacity and target concurrency.

  • Enable pool.checkHealthBeforeUse to detect and refresh stale connections.

UDP step

The UDP step sends datagrams to a UDP endpoint and receives responses.

Ancestor

Scenario or Step

Functionality
  • Creates UDP datagrams and sends them to the specified endpoint.

  • Receives responses back from the endpoint (if available).

Example
.start()
.netty()
.udp { (1)
    iterations = 5 (2)
    connect {
        address("remote-server.acme.com", 5000) (3)
        connectTimeout = Duration.ofSeconds(2) (4)
    }
    request { context, input -> (5)
        "My UDP datagram".toByteArray(StandardCharsets.UTF_8)
    }
}
1 Enter the Netty plugin namespace and start a UDP step.
2 Each minion sends 5 datagrams to the endpoint.
3 Specify the remote endpoint address and port.
4 Set the maximum time allowed to establish the UDP channel.
5 Generates the datagram payload from the input or context.
Tips
  • Increase iterations to send multiple datagrams per minion.

  • UDP is ideal for stateless, request-response scenarios (DNS, metrics, etc.).

  • Response timeouts can be controlled via readTimeout.

Reference Documentation

Refer to the Netty documentation for UDP protocol details.

MQTT steps

MQTT provides publish-subscribe messaging through a broker. Two step models support independent publish and subscribe operations.

MQTT Publish step

The mqttPublish step sends records (messages) to MQTT topics on a broker.

Ancestor

Step

Functionality
  • Connects to an MQTT broker as a publisher client.

  • Sends one or more messages per execution via the records lambda.

  • Supports configurable Quality of Service (QoS) and message retention.

  • Each MqttPublishRecord can specify topic, payload, QoS, and retention independently.

Example
.start()
.netty()
.mqttPublish { (1)
    connect {
        host = "mqtt-broker.acme.com" (2)
        port = 1883 (3)
    }
    protocol(MqttVersion.MQTT_3_1_1) (4)
    clientName("publisher-client") (5)
    records { context, input -> (6)
        listOf(
            MqttPublishRecord(
                value = "Sensor reading: 23.5°C", (7)
                topicName = "devices/sensors/temperature", (8)
                qoS = MqttQoS.EXACTLY_ONCE, (9)
                retainedMessage = true (10)
            )
        )
    }
}
1 Enter the Netty plugin namespace and start an MQTT publish step.
2 Specify the MQTT broker hostname.
3 Set the MQTT broker port (standard is 1883, 8883 for TLS).
4 Select the MQTT protocol version for compatibility with the broker.
5 Set the client identifier for the publisher on the broker.
6 Factory lambda to generate a list of records to publish from the input/context.
7 Message payload (typically JSON or text).
8 MQTT topic to publish to.
9 QoS level: AT_MOST_ONCE (0), AT_LEAST_ONCE (1), or EXACTLY_ONCE (2).
10 If true, broker retains the last message on this topic for new subscribers.
Tips
  • Generate multiple MqttPublishRecord objects in the records lambda to publish several topics in one execution.

  • Use EXACTLY_ONCE QoS for critical messages; AT_LEAST_ONCE for typical scenarios; AT_MOST_ONCE for best-effort.

  • Enable retainedMessage to preserve state across subscribers (e.g., for device status).

  • Reuse minion input/context values in topic names or payloads for dynamic scenarios.

Reference Documentation

Refer to the MQTT specification and your broker’s documentation for topic design and QoS implications.

MQTT Subscribe step

The mqttSubscribe step consumes messages from MQTT topics on a broker.

Ancestor

Scenario or Step

Functionality
  • Connects to an MQTT broker as a subscriber client.

  • Subscribes to one or more topics using filters (supporting + and # wildcards).

  • Receives messages from the broker in real-time.

  • Output can be deserialized to typed objects or strings via .deserialize().

  • Concurrency controls how many subscriber tasks process messages in parallel.

Example
.start()
.netty()
.mqttSubscribe { (1)
    connect {
        host = "mqtt-broker.acme.com" (2)
        port = 1883 (3)
    }
    protocol(MqttVersion.MQTT_3_1_1) (4)
    clientName("subscriber-client") (5)
    concurrency(2) (6)
    topicFilter("devices/sensors/+/temperature") (7)
    qoS(MqttQoS.AT_LEAST_ONCE) (8)
}
.deserialize(MessageStringDeserializer::class) (9)
.onEach { receivedMessage -> (10)
    println("Received: ${receivedMessage.value}")
}
1 Enter the Netty plugin namespace and start an MQTT subscribe step.
2 Specify the MQTT broker hostname.
3 Set the MQTT broker port.
4 Select the MQTT protocol version.
5 Unique client identifier for the subscriber on the broker.
6 Number of concurrent subscriber tasks (threads) processing messages.
7 Topic filter with wildcard: + matches a single level, # matches multiple levels. Here matches devices/sensors/*/temperature.
8 QoS for subscriptions: guarantees matching the server’s capability.
9 Deserialize raw MQTT messages to strings (or JSON, byte arrays, etc.).
10 Processes each deserialized message in the workflow.
Tips
  • Combine multiple mqttSubscribe steps with different topic filters in a single scenario for multi-topic monitoring.

  • Use topic wildcards (+, #) to subscribe to many topics with a single filter.

  • Increase concurrency for high-throughput scenarios, but monitor latency impact.

  • Match QoS between publisher and subscriber for optimal performance.

Reference Documentation

Refer to the MQTT specification for topic filter syntax and QoS semantics.

Configuration

DSL parameters

Available parameters are described in the table below, organized by protocol.

Common parameters

The following parameters are applicable to all protocol types.

Parameter Description

connect

Configures the connection to the remote endpoint.
Applicable Steps: TCP, UDP, MQTT Publish, MQTT Subscribe
Optional/Required: Required
Data Type: Lambda/Configuration block
Default Value: N/A

Example (TCP)
connect {
    address("remote-server.acme.com", 9000)
    connectTimeout = Duration.ofSeconds(5)
    noDelay = true
}
Example (UDP)
connect {
    address("remote-server.acme.com", 9000)
    connectTimeout = Duration.ofSeconds(5)
}
Example (MQTT)
connect {
    host = "localhost"
    port = 1883
}

connect.connectTimeout

Maximum duration to establish a connection before timing out.
Applicable Steps: TCP, UDP
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: 10 seconds

Example
connect {
    connectTimeout = Duration.ofSeconds(5)
}

connect.readTimeout

Maximum duration to wait for a response after sending a request before timing out.
Applicable Steps: TCP, UDP
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: 10 seconds

Example
connect {
    readTimeout = Duration.ofSeconds(5)
}

connect.shutdownTimeout

Maximum duration to gracefully close a connection.
Applicable Steps: TCP, UDP
Optional/Required: Optional
Data Type: java.time.Duration
Default Value: 10 seconds

Example
connect {
    shutdownTimeout = Duration.ofSeconds(5)
}

connect.sendBufferSize

Size of the buffer used to prepare data for transmission. When omitted, the operating system default is used.
Applicable Steps: TCP, UDP
Optional/Required: Optional
Data Type: Int (bytes)
Default Value: null (uses the operating system default)

Example
connect {
    sendBufferSize = 2048
}

connect.receiveBufferSize

Size of the buffer used to receive incoming data. When omitted, the operating system default is used.
Applicable Steps: TCP, UDP
Optional/Required: Optional
Data Type: Int (bytes)
Default Value: null (uses the operating system default)

Example
connect {
    receiveBufferSize = 2048
}

connect.nettyChannelOption

Adds a single Netty channel option for advanced tuning.
Applicable Steps: TCP, UDP
Optional/Required: Optional
Data Type: ChannelOption<T> and T
Default Value: N/A

Example
connect {
    nettyChannelOption(ChannelOption.SO_REUSEADDR, true)
}

connect.nettyChannelOptions

Adds multiple Netty channel options for advanced tuning.
Applicable Steps: TCP, UDP
Optional/Required: Optional
Data Type: Vararg Pair<ChannelOption<*>, Any>
Default Value: emptyMap() (uses Netty/OS defaults)

Example
connect {
    nettyChannelOptions(
        ChannelOption.SO_KEEPALIVE to true,
        ChannelOption.TCP_NODELAY to true,
        ChannelOption.CONNECT_TIMEOUT_MILLIS to 5000,
        ChannelOption.SO_REUSEADDR to true
    )
}

TCP-specific parameters

The following parameters apply only to TCP connections.

Parameter Description

connect.address

The remote host and port to connect to.
Applicable Step: TCP
Optional/Required: Required
Data Type: String (hostname/IP) and Int (port)
Default Value: N/A

Example
connect {
    address("remote-server.acme.com", 9000)
}

connect.noDelay

Disables the Nagle Algorithm for reduced latency at the cost of potential throughput reduction. When true, data is sent immediately.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Boolean
Default Value: true

Example
connect {
    noDelay = true
}

connect.keepConnectionAlive

Keeps the connection open across multiple steps instead of closing after each request. When true, the connection persists; when false, connections are closed after use.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Boolean
Default Value: true

Example
connect {
    keepConnectionAlive = true
}

connect.tls

Enables SSL/TLS encryption for the TCP connection.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Lambda that receives a TLS configuration block
Default Value: N/A (no encryption)

Example
connect {
    tls {
        disableCertificateVerification = true
        protocols(SslProtocol.TLS_1_2, SslProtocol.TLS_1_3)
        ciphers = listOf(
            "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
            "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
        )
    }
}

connect.tls.disableCertificateVerification

Disables SSL/TLS certificate verification.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Boolean
Default Value: false

Example
connect {
    tls {
        disableCertificateVerification = true
    }
}

connect.tls.protocols

List of allowed SSL/TLS protocol versions.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Vararg String
Default Value: emptyArray<String>() (uses Netty defaults)

Example
connect {
    tls {
        protocols("TLSv1.2", "TLSv1.3")
    }
}

connect.tls.ciphers

List of supported cipher suites for SSL/TLS negotiation. Defaults to Netty/JDK defaults when not specified.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Vararg String
Default Value: emptyArray<String>() (uses Netty/JDK defaults)

Example
connect {
    tls {
        ciphers(
            "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
            "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
            "TLS_AES_128_GCM_SHA256"
        )
    }
}

connect.proxy

Configuration for routing connections through a proxy server (SOCKS4, SOCKS5).
Applicable Step: TCP
Optional/Required: Optional
Data Type: Configuration block
Default Value: N/A (direct connection)

Example
connect {
    proxy {
        type = TcpProxyType.SOCKS5
        address("my-proxy.acme.com", 9876)
        authenticate("tunnel_user", "secure_pass")
    }
}

connect.proxy.type

Type of proxy protocol to use.
Applicable Step: TCP
Optional/Required: Optional
Data Type: ENUM (TcpProxyType: SOCKS4, SOCKS5)
Default Value: SOCKS4

Example
proxy {
    type = TcpProxyType.SOCKS5
}

connect.proxy.address

The proxy server host and port.
Applicable Step: TCP
Optional/Required: Required (if proxy block present)
Data Type: String (hostname/IP) and Int (port)
Default Value: N/A

Example
proxy {
    address("my-proxy.acme.com", 9876)
}

connect.proxy.authenticate

Optional credentials for proxy authentication.
Applicable Step: TCP
Optional/Required: Optional
Data Type: String? (username) and String? (password)
Default Value: null, null

Example
proxy {
    authenticate("tunnel_user", "secure_pass")
}

connect.pool

Configures connection pooling to reuse existing connections across multiple requests. Reduces overhead of repeated TCP handshakes.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Configuration block
Default Value: N/A (individual connections per minion)

Example
connect {
    pool {
        size = 50
        checkHealthBeforeUse = true
    }
}

connect.pool.size

Maximum number of concurrent connections maintained in the pool.
Applicable Step: TCP
Optional/Required: Required (if pool block present)
Data Type: Int
Default Value: 1

Example
connect {
    pool {
        size = 50
    }
}

connect.pool.checkHealthBeforeUse

Attempts a health check on a pooled connection before reusing it. Helps detect stale connections.
Applicable Step: TCP
Optional/Required: Optional
Data Type: Boolean
Default Value: false

Example
connect {
    pool {
        checkHealthBeforeUse = true
    }
}

request

Factory lambda to create the TCP payload from the step context and input. Returns a ByteArray to transmit.
Applicable Steps: TCP (via tcp and tcpWith)
Optional/Required: Optional
Data Type: Lambda that receives the stepContext, and input and returns ByteArray
Default Value: ByteArray(0)

Example
request { context, input ->
    "Hello, server!".toByteArray(StandardCharsets.UTF_8)
}

UDP-specific parameters

The following parameters apply only to UDP connections.

Parameter Description

connect.address

The remote host and port to send datagrams to.
Applicable Step: UDP
Optional/Required: Required
Data Type: String (hostname/IP) and Int (port)
Default Value: N/A

Example
connect {
    address("remote-server.acme.com", 5000)
}

request

Factory lambda to create the UDP datagram payload from the step context and input.
Applicable Step: UDP
Optional/Required: Optional
Data Type: Lambda that receives the stepContext and input and returns ByteArray
Default Value: ByteArray(0)

Example
request { context, input ->
    "UDP datagram content".toByteArray(StandardCharsets.UTF_8)
}

iterations

Number of UDP datagrams to send from each minion.
Applicable Step: UDP
Optional/Required: Optional
Data Type: Long
Default Value: 1

Example
iterations = 10

MQTT-specific parameters

The following parameters apply only to MQTT connections.

Parameter Description

connect.host

The hostname or IP address of the MQTT broker.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Optional
Data Type: String
Default Value: "localhost"

Example
connect {
    host = "mqtt-broker.acme.com"
}

connect.port

The port number of the MQTT broker. Standard MQTT port is 1883; MQTT over TLS uses 8883.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Optional
Data Type: Int
Default Value: 1883

Example
connect {
    port = 1883
}

connect.reconnect

Controls whether the MQTT client should try to reconnect when the connection is closed.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Optional
Data Type: Boolean
Default Value: true

Example
connect {
    reconnect = false
}

protocol

MQTT protocol version to use. Specifies compatibility level with the broker.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Optional
Data Type: MqttVersion enum: MqttVersion: MQTT_3_1, MQTT_3_1_1, MQTT_5
Default Value: MqttVersion.MQTT_3_1_1

Example
protocol(MqttVersion.MQTT_3_1_1)

clientName

Unique identifier for the MQTT client on the broker. Used for connection authentication and session tracking.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Required
Data Type: String
Default Value: N/A

Example
clientName("test-mqtt-client-1")

auth

Configures username and password authentication for the MQTT client.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Optional
Data Type: Configuration block
Default Value: username = "", password = ""

Example
auth {
    username = "mqtt-user"
    password = "mqtt-password"
}

auth.username

Username used to authenticate against the MQTT broker.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Optional
Data Type: String
Default Value: ""

Example
auth {
    username = "mqtt-user"
}

auth.password

Password used to authenticate against the MQTT broker.
Applicable Steps: MQTT Publish, MQTT Subscribe
Optional/Required: Optional
Data Type: String
Default Value: ""

Example
auth {
    password = "mqtt-password"
}

records

Factory lambda to create a list of MqttPublishRecord objects from the step context and input.
Applicable Step: MQTT Publish
Optional/Required: Optional
Data Type: Lambda that receives the stepContext and input and returns List<MqttPublishRecord>
Default Value: emptyList()

Example
records { stepContext, input ->
    listOf(
        MqttPublishRecord(
            value = "sensor reading: 23.5C",
            topicName = "devices/sensors/temperature",
            qoS = MqttQoS.EXACTLY_ONCE,
            retainedMessage = true
        )
    )
}

records.value

Payload value of the MqttPublishRecord to send.
Applicable Step: MQTT Publish
Optional/Required: Required
Data Type: String
Default Value: N/A

Example
records { stepContext, input ->
    listOf(MqttPublishRecord(value = "sensor reading: 23.5C", topicName = "devices/sensors/temperature"))
}

records.topicName

The MQTT topic to publish messages to. Can contain MQTT topic wildcards at the application level.
Applicable Step: MQTT Publish
Optional/Required: Required
Data Type: String
Default Value: N/A

Example
records { stepContext, input ->
    listOf(MqttPublishRecord(value = "payload", topicName = "devices/sensors/temperature"))
}

records.qoS

Quality of Service level to apply to each published MQTT record:
- AT_MOST_ONCE (0): Best effort, message may not arrive.
- AT_LEAST_ONCE (1): Guaranteed delivery, may receive duplicates.
- EXACTLY_ONCE (2): Guaranteed exact-once delivery.
Applicable Step: MQTT Publish
Optional/Required: Optional
Data Type: MqttQoS (enum) one of MqttQoS: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
Default Value: AT_LEAST_ONCE

Example
records { stepContext, input ->
    listOf(MqttPublishRecord(value = "payload", topicName = "devices/sensors/temperature", qoS = MqttQoS.EXACTLY_ONCE))
}

records.properties

MQTT properties to attach to the published record.
Applicable Step: MQTT Publish
Optional/Required: Optional
Data Type: io.netty.handler.codec.mqtt.MqttProperties?
Default Value: null

Example
records { stepContext, input ->
    listOf(
        MqttPublishRecord(
            value = "payload",
            topicName = "devices/sensors/temperature",
            properties = MqttProperties()
        )
    )
}

records.retainedMessage

If true, the MQTT broker retains the last message on the topic for future subscribers.
Applicable Step: MQTT Publish
Optional/Required: Optional
Data Type: Boolean
Default Value: true

Example
records { stepContext, input ->
    listOf(MqttPublishRecord(value = "payload", topicName = "devices/sensors/temperature", retainedMessage = true))
}

topicFilter

MQTT topic filter pattern for subscription. Supports wildcards: + (single level) and # (multi level).
Applicable Step: MQTT Subscribe
Optional/Required: Required
Data Type: String
Default Value: N/A

Example
topicFilter("devices/sensors/+/temperature")

qoS

Quality of Service level for the MQTT subscription:
- AT_MOST_ONCE (0): Best effort, message may not arrive.
- AT_LEAST_ONCE (1): Guaranteed delivery, may receive duplicates.
- EXACTLY_ONCE (2): Guaranteed exact-once delivery.
Applicable Steps: MQTT Subscribe
Optional/Required: Optional
Data Type: MqttQoS ENUM (MqttQoS: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE)
Default Value: AT_LEAST_ONCE

Example
qoS(MqttQoS.EXACTLY_ONCE)

concurrency

Number of concurrent subscriber threads/tasks for this step.
Applicable Step: MQTT Subscribe
Optional/Required: Optional
Data Type: Int
Default Value: 2

Example
concurrency(4)

Shared defaults for Netty steps

You can define defaults once in the scenario section or just after, and let all following Netty steps inherit them.

scenario {
  netty().defaults { (1)
    tcpConnection {
      address("remote-server.acme.com", 9000)
    }
    httpConnection {
      url("https://api.example.com")
    }
    udpConnection {
      address("remote-server.acme.com", 5000)
    }
    mqttConnection {
      host("mqtt.acme.com")
      port(1883)
    }
    monitoring {
      events = true
      meters = true
    }
  }
}
1 Defaults are applied to subsequent Netty steps in the same scenario. Individual steps can still override values.