RabbitMQ plugin

Overview

The RabbitMQ plugin connects QALIPSIS to the RabbitMQ broker.

Technology addressed

RabbitMQ: https://www.rabbitmq.com/

Dependency

io.qalipsis.plugin:qalipsis-plugin-rabbitmq

Namespace in scenario

rabbitmq()

Client library

RabbitMQ: refer to RabbitMQ.

Supported steps

Consume step

The consume step within the RabbitMQ plugin consumes data from queues of the RabbitMQ broker and forwards it to the next step.

This step is commonly used with join to assert data or inject data into a workflow.

This step supports concurrent consumers through the concurrency() property. When concurrency is greater than 1, record order is no longer guaranteed.
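When downstream assertions depend on record order and concurrency is greater than 1, one possible workaround is to carry a producer-assigned sequence number in each payload and sort on it after consumption. The sketch below is plain Kotlin, not QALIPSIS or RabbitMQ API; `SequencedRecord`, `restoreOrder`, and `isOrdered` are hypothetical names introduced for illustration only.

```kotlin
// Hypothetical payload wrapper: `sequence` is assigned by the producer,
// not by RabbitMQ or QALIPSIS.
data class SequencedRecord(val sequence: Long, val body: String)

// Returns the records sorted by their producer-assigned sequence number.
fun restoreOrder(records: List<SequencedRecord>): List<SequencedRecord> =
    records.sortedBy { it.sequence }

// Returns true when the records are already in ascending sequence order.
fun isOrdered(records: List<SequencedRecord>): Boolean =
    records.zipWithNext().all { (a, b) -> a.sequence <= b.sequence }
```

Calling `isOrdered` on a consumed batch shows whether concurrent channels interleaved the records; `restoreOrder` rebuilds the producer's order before asserting.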

To learn more, visit the RabbitMQ website.

Ancestor

Scenario

Functionality

The consume step starts a polling loop on the configured queue. As new records arrive, the step reads them and forwards them to the next step.

Example
// Preceded by scenario configuration and a workflow entry point such as `start()`.
  .rabbitmq() (1)
  .consume {
    name = "consume-battery-state" (2)
    connection {
      host = "localhost"
      port = 5672
      username = "Celia45"
      password = "Columbus1963"
    }
    queue("battery_state") (3)
    prefetchCount(10)
    concurrency(1)
  }
  .deserialize(io.qalipsis.api.messaging.deserializer.MessageStringDeserializer::class) (4)
1 Enter the RabbitMQ namespace.
2 Set a unique step name for reporting and troubleshooting.
3 Select the RabbitMQ queue to consume.
4 Deserialize message bodies as strings using a built-in deserializer.
Tip

Use the consume step with join to assert data or inject data into a workflow.
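Conceptually, joining consumed records with another branch of the workflow means pairing records that share a key and then asserting on each pair. The sketch below models that idea in plain Kotlin. It does not use the QALIPSIS join step, whose exact signature is not shown in this section; `BatteryState` and `correlate` are hypothetical names, and the key (`deviceId`) is assumed to be unique among received records.

```kotlin
// Hypothetical record type used only for this illustration.
data class BatteryState(val deviceId: String, val level: Int)

// Pairs each sent record with the received record sharing the same deviceId.
// Sent records without a match are dropped, as in an inner join.
fun correlate(
    sent: List<BatteryState>,
    received: List<BatteryState>
): List<Pair<BatteryState, BatteryState>> {
    // Index received records by key; assumes deviceId is unique among them.
    val receivedById = received.associateBy { it.deviceId }
    return sent.mapNotNull { s -> receivedById[s.deviceId]?.let { r -> s to r } }
}
```

Each resulting pair can then be checked, for example by asserting that the sent and received values are equal.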

Produce step

The produce step in the RabbitMQ plugin sends messages to the RabbitMQ broker; the results are then forwarded to the next step.

Ancestor

Scenario

Functionality

The produce step’s input and step context are used to generate records that are sent to the broker; results are then forwarded to the next step(s).

Example
// Preceded by scenario configuration and a workflow entry point such as `start()`.
  .rabbitmq() (1)
  .produce { (2)
    connection { (3)
      host = "localhost"
      port = 5672
      username = "qalipsis"
      password = "qalipsis"
    }
    concurrency(1)
    records { stepContext, input -> (4)
      listOf(
        RabbitMqProducerRecord(
          exchange = "battery_state",
          routingKey = "battery_state",
          props = null,
          value = input.toString().toByteArray() (5)
        )
      )
    }
  }
1 Enter the RabbitMQ namespace to access the produce step.
2 Configure the produce step.
3 Configure the connection to the RabbitMQ broker.
4 Build the list of records to publish.
5 Convert the string payload to a ByteArray, which is the required type for RabbitMqProducerRecord.value.
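The string-to-bytes conversion in callout 5 can be made explicit about its character set. In Kotlin, `toByteArray()` already defaults to UTF-8, so naming the charset changes nothing at runtime; it only documents the encoding the consumer is expected to use when decoding. The helper names `encodePayload` and `decodePayload` below are hypothetical and not part of the plugin.

```kotlin
// Encodes a payload as UTF-8 bytes (the same charset toByteArray() uses by default).
fun encodePayload(payload: String): ByteArray = payload.toByteArray(Charsets.UTF_8)

// Decodes a message body back into a string using the same charset.
fun decodePayload(body: ByteArray): String = body.toString(Charsets.UTF_8)
```

Non-ASCII characters take more than one byte in UTF-8, so the length of the byte array can exceed the length of the string.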
Reference documentation
  • For more information about RabbitMQ connection configuration, see Connecting to RabbitMQ.

  • For more information on configuring the produce step, refer to the RabbitMQ documentation.

Configuration

DSL parameters

Available parameters are described in the table below.

Parameter Description

connection

Configures the connection to the RabbitMQ broker.
Applicable Steps: Consume, Produce
Optional/Required: Required
Data Type: Lambda within which configuration parameters are defined.
Default Value: N/A

Example
connection {
  host = "localhost"
  port = 5672
  username = "Celia45"
  password = "Columbus1963"
  virtualHost = "/"
  clientProperties = emptyMap()
}

connection.host

Hostname or IP address of the RabbitMQ broker.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: String
Default Value: "localhost"

Example
connection {
  host = "localhost"
}

connection.port

TCP port the RabbitMQ broker listens on. Use 5671 for TLS connections.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: Int
Default Value: 5672

Example
connection {
  port = 5672
}

connection.username

Username used to authenticate with the broker.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: String
Default Value: "guest"

Example
connection {
  username = "qalipsis"
}

connection.password

Password used to authenticate with the broker.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: String
Default Value: "guest"

Example
connection {
  password = "qalipsis"
}

connection.virtualHost

RabbitMQ virtual host that scopes exchanges, queues, and bindings. Allows multiple isolated environments on a single broker instance.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: String
Default Value: "/"

Example
connection {
  virtualHost = "/"
}

connection.clientProperties

Additional client properties sent to the broker during the AMQP handshake, for example a human-readable connection name visible in the RabbitMQ management UI.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: Map<String, Any>
Default Value: emptyMap()

Example
connection {
  clientProperties = mapOf("connection_name" to "my-consumer")
}

concurrency

Defines the number of concurrent channels for consuming or producing messages.
Applicable Steps: Consume, Produce
Optional/Required: Optional
Data Type: Int
Default Value: 1

Example
concurrency(2)

records

Configures the factory that generates the list of records to produce for each minion execution.
Applicable Step: Produce
Optional/Required: Required
Data Type: Lambda that takes the step context and input and returns List<RabbitMqProducerRecord>
Default Value: N/A

Example
records { stepContext, input ->
  listOf(
    RabbitMqProducerRecord(
      exchange = "battery_state",
      routingKey = "battery_state",
      props = null,
      // objectMapper is assumed to be a Jackson ObjectMapper declared elsewhere in the scenario.
      value = objectMapper.writeValueAsBytes(input)
    )
  )
}

queue

Defines the name of the queue to consume.
Applicable Step: Consume
Optional/Required: Required
Data Type: String
Default Value: N/A

Example
queue("battery_state")

prefetchCount

Sets the prefetch count for the consumer, which limits the number of unacknowledged messages that can be delivered to the consumer at a time.
Applicable Step: Consume
Optional/Required: Optional
Data Type: Int
Default Value: 10

Example
prefetchCount(10)
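The effect of the prefetch count can be pictured as a window of in-flight messages: the broker stops delivering once the number of unacknowledged messages reaches the limit, and resumes as acknowledgements arrive. The following is a toy model in plain Kotlin, written under that assumption; `PrefetchWindow` is a hypothetical class and is not RabbitMQ client or QALIPSIS code.

```kotlin
// Toy model of a prefetch window; it is not RabbitMQ client code.
class PrefetchWindow(private val prefetchCount: Int) {
    // Number of messages delivered but not yet acknowledged.
    private var unacknowledged = 0

    // Returns how many of the `pending` messages may be delivered now.
    fun deliver(pending: Int): Int {
        val deliverable = minOf(pending, prefetchCount - unacknowledged)
        unacknowledged += deliverable
        return deliverable
    }

    // Acknowledges up to `count` in-flight messages, freeing room in the window.
    fun acknowledge(count: Int) {
        unacknowledged -= minOf(count, unacknowledged)
    }
}
```

With a window of 10 and 25 pending messages, the first delivery hands over 10 messages and nothing more is delivered until some of them are acknowledged.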

singletonConfiguration

Configures how polled results are delivered to scenario minions.
Applicable Step: Consume
Optional/Required: Optional
Data Type: SingletonConfiguration, configured via unicast(bufferSize: Int, idleTimeout: Duration)
Default Value: SingletonType.UNICAST

Example
unicast(bufferSize = 100, idleTimeout = Duration.ofSeconds(10))

Refer to SingletonConfiguration parameters for details, strategies, and usage examples.

Shared defaults for RabbitMQ steps

You can define defaults once, in the scenario block or immediately after it, so that all subsequent RabbitMQ steps inherit them.

scenario {
  rabbitmq().defaults { (1)
    connection {
      host = "localhost"
      port = 5672
      username = "qalipsis"
      password = "qalipsis"
    }
    monitoring {
      events = true
      meters = true
    }
  }
}
1 Defaults are applied to subsequent RabbitMQ steps in the same scenario. Individual steps can still override values.
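The inherit-then-override behavior described in callout 1 can be modelled in plain Kotlin with a data class and `copy`. This is only an illustration of the idea, not how the plugin is implemented; `ConnectionSettings`, `sharedDefaults`, and `tlsStepSettings` are hypothetical names, and the constructor defaults mirror the default values listed for the connection parameters above.

```kotlin
// Hypothetical model; constructor defaults mirror the documented connection defaults.
data class ConnectionSettings(
    val host: String = "localhost",
    val port: Int = 5672,
    val username: String = "guest",
    val password: String = "guest",
    val virtualHost: String = "/"
)

// Shared defaults, as they would be declared once for the scenario.
val sharedDefaults = ConnectionSettings(username = "qalipsis", password = "qalipsis")

// A single step overrides only the port; every other value is inherited.
val tlsStepSettings = sharedDefaults.copy(port = 5671)
```

Only the overridden field differs from the shared defaults, which is the behavior a step gets when it redefines a single connection value.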