RabbitMQ plugin
Overview
The RabbitMQ plugin connects QALIPSIS to the RabbitMQ broker.
- Technology addressed: RabbitMQ (https://www.rabbitmq.com/)
- Dependency: io.qalipsis.plugin:qalipsis-plugin-rabbitmq
- Namespace in scenario: rabbitmq()
- Client library: RabbitMQ: refer to RabbitMQ
Supported steps
Consume step
The consume step within the RabbitMQ plugin consumes data from queues of the RabbitMQ broker and forwards it to the next step.
This step is commonly used with join to assert data or inject data into a workflow.
This step supports concurrent consumers through the concurrency() property. When concurrency is greater than 1, record order is no longer guaranteed.
To learn more, visit the RabbitMQ website.
- Ancestor: Scenario
- Functionality: The consume step starts a polling loop for consumption of the configured queue. As new records arrive, the consume step polls them and sends them to the next step.
- Example:
// Preceded by scenario configuration and a workflow entry point such as `start()`.
.rabbitmq() (1)
    .consume {
        name = "consume-battery-state" (2)
        connection {
            host = "localhost"
            port = 5672
            username = "Celia45"
            password = "Columbus1963"
        }
        queue("battery_state") (3)
        prefetchCount(10)
        concurrency(1)
    }
    .deserialize(io.qalipsis.api.messaging.deserializer.MessageStringDeserializer::class) (4)
| 1 | Enter into the RabbitMQ namespace. |
| 2 | Set a unique step name for reporting and troubleshooting. |
| 3 | Select the RabbitMQ queue to consume. |
| 4 | Deserialize message bodies as strings using a built-in deserializer. |
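The ordering caveat for concurrency values greater than 1 can be illustrated with a plain-Kotlin sketch. It is independent of QALIPSIS and of the RabbitMQ client: the `consumeAll` function and its thread-pool worker model are assumptions made for illustration only, not the plugin's implementation.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical helper: drains `messages` with `concurrency` workers and returns
// the messages in the order in which the workers handed them on.
fun consumeAll(messages: List<Int>, concurrency: Int): List<Int> {
    val queue = LinkedBlockingQueue(messages)
    val forwarded = ConcurrentLinkedQueue<Int>()
    val pool = Executors.newFixedThreadPool(concurrency)
    repeat(concurrency) {
        pool.execute {
            while (true) {
                // poll() returns null once the queue is empty, which ends the worker.
                val message = queue.poll() ?: break
                forwarded.add(message)
            }
        }
    }
    pool.shutdown()
    check(pool.awaitTermination(10, TimeUnit.SECONDS)) { "workers did not finish in time" }
    return forwarded.toList()
}

fun main() {
    val messages = (1..1000).toList()
    val single = consumeAll(messages, concurrency = 1)
    val parallel = consumeAll(messages, concurrency = 4)
    println(single == messages)            // a single worker keeps the queue order
    println(parallel.sorted() == messages) // every message is still forwarded exactly once
    println(parallel == messages)          // not guaranteed: workers interleave
}
```

If downstream assertions depend on record order, keeping `concurrency(1)` as in the example above is the conservative choice.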
Produce step
The produce step in the RabbitMQ plugin sends messages to the RabbitMQ broker; the results are then forwarded to the next step.
- Ancestor: Scenario
- Functionality: The produce step's input and step context are used to generate records that are sent to the broker; results are then forwarded to the next step(s).
- Example:
// Preceded by scenario configuration and a workflow entry point such as `start()`.
.rabbitmq() (1)
    .produce { (2)
        connection { (3)
            host = "localhost"
            port = 5672
            username = "qalipsis"
            password = "qalipsis"
        }
        concurrency(1)
        records { stepContext, input -> (4)
            listOf(
                RabbitMqProducerRecord(
                    exchange = "battery_state",
                    routingKey = "battery_state",
                    props = null,
                    value = input.toString().toByteArray() (5)
                )
            )
        }
    }
| 1 | Enter into the RabbitMQ namespace to access the produce step. |
| 2 | Configure the produce step. |
| 3 | Configure the connection to the RabbitMQ broker. |
| 4 | Build the list of records to publish. |
| 5 | Convert the string payload to a ByteArray, which is the required type for RabbitMqProducerRecord.value. |
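The conversion in callout 5 relies on Kotlin's default charset for `toByteArray()`, which is UTF-8. The sketch below makes that explicit and shows the matching decode on the consuming side. The helper names `encodePayload` and `decodePayload` are hypothetical and not part of the plugin.

```kotlin
// Hypothetical helper: encodes any input the same way as the produce example,
// toString() first, then UTF-8 bytes (stated explicitly instead of by default).
fun encodePayload(input: Any?): ByteArray = input.toString().toByteArray(Charsets.UTF_8)

// Hypothetical helper: decodes a message body back into text,
// assuming the producer used UTF-8.
fun decodePayload(body: ByteArray): String = body.toString(Charsets.UTF_8)

fun main() {
    val body = encodePayload("battery=87")
    println(body.size)           // number of bytes, not number of characters
    println(decodePayload(body)) // the original text, restored
}
```

Naming the charset on both sides avoids surprises when payloads contain non-ASCII characters, where one character can occupy several bytes.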
Configuration
DSL parameters
Available parameters are described in the table below.
| Parameter | Description |
|---|---|
| `connection` | Configures the connection to the RabbitMQ broker. |
| `host` | Hostname or IP address of the RabbitMQ broker. |
| `port` | TCP port the RabbitMQ broker listens on. |
| `username` | Username used to authenticate with the broker. |
| `password` | Password used to authenticate with the broker. |
| Virtual host | RabbitMQ virtual host that scopes exchanges, queues, and bindings. Allows multiple isolated environments on a single broker instance. |
| Client properties | Additional client properties sent to the broker during the AMQP handshake, for example a human-readable connection name visible in the RabbitMQ management UI. |
| `concurrency` | Defines the number of concurrent channels for consuming or producing messages. |
| `records` | Configures the factory that generates the list of records to produce for each minion execution. |
| `queue` | Defines the name of the queue to consume. |
| `prefetchCount` | Sets the prefetch count for the consumer, which limits the number of unacknowledged messages that can be delivered to the consumer at a time. |
| Result delivery (consume step) | Configures how polled results are delivered to scenario minions. Refer to SingletonConfiguration parameters for details, strategies, and usage examples. |
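The effect of the prefetch count can be pictured with a small in-memory model. This is a simplified sketch, not the broker's or the plugin's implementation: the `PrefetchWindow` class is hypothetical and assumes unique message identifiers.

```kotlin
// Hypothetical, simplified stand-in for a channel with a prefetch limit.
class PrefetchWindow(private val prefetchCount: Int, messages: List<String>) {
    private val ready = ArrayDeque(messages)      // messages still waiting in the queue
    private val unacked = mutableSetOf<String>()  // delivered but not yet acknowledged

    // Delivers as many messages as the prefetch limit currently allows.
    fun deliver(): List<String> {
        val delivered = mutableListOf<String>()
        while (unacked.size < prefetchCount && ready.isNotEmpty()) {
            val message = ready.removeFirst()
            unacked += message
            delivered += message
        }
        return delivered
    }

    // Acknowledging a message frees one slot in the window.
    fun ack(message: String) {
        unacked.remove(message)
    }
}

fun main() {
    val window = PrefetchWindow(prefetchCount = 2, messages = listOf("m1", "m2", "m3", "m4"))
    println(window.deliver()) // [m1, m2]
    println(window.deliver()) // [] because two messages are still unacknowledged
    window.ack("m1")
    println(window.deliver()) // [m3]
}
```

A larger prefetch count keeps more messages in flight per consumer, at the cost of more unacknowledged messages held on the client side.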
Shared defaults for RabbitMQ steps
You can define defaults once in the scenario section or just after, and let all following RabbitMQ steps inherit them.
scenario {
    rabbitmq().defaults { (1)
        connection {
            host = "localhost"
            port = 5672
            username = "qalipsis"
            password = "qalipsis"
        }
        monitoring {
            events = true
            meters = true
        }
    }
}
| 1 | Defaults are applied to subsequent RabbitMQ steps in the same scenario. Individual steps can still override values. |
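The inherit-then-override behavior described in callout 1 can be sketched in plain Kotlin. The types `ConnectionSettings` and `ConnectionOverrides` and the function `overriddenBy` are hypothetical and only model the idea. How the plugin merges values internally is not documented here.

```kotlin
// Hypothetical: fully resolved connection settings, as defined by shared defaults.
data class ConnectionSettings(
    val host: String,
    val port: Int,
    val username: String,
    val password: String,
)

// Hypothetical: values set on an individual step; null means "not set at this level".
data class ConnectionOverrides(
    val host: String? = null,
    val port: Int? = null,
    val username: String? = null,
    val password: String? = null,
)

// A step-level value wins when present; otherwise the default is inherited.
fun ConnectionSettings.overriddenBy(step: ConnectionOverrides) = ConnectionSettings(
    host = step.host ?: host,
    port = step.port ?: port,
    username = step.username ?: username,
    password = step.password ?: password,
)

fun main() {
    val defaults = ConnectionSettings("localhost", 5672, "qalipsis", "qalipsis")
    // This step only changes the host; everything else comes from the defaults.
    val effective = defaults.overriddenBy(ConnectionOverrides(host = "broker.internal"))
    println(effective)
}
```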