JMS plugin
Overview
The JMS plugin connects QALIPSIS to any messaging platform that implements the JMS standard, such as ActiveMQ, IBM MQ, or RabbitMQ.
- Technology addressed: JMS (https://www.oracle.com/java/technologies/java-message-service.html)
- Dependency: io.qalipsis.plugin:qalipsis-plugin-jms
- Namespace in scenario: jms()
- Client library: JMS API; refer to the Java Interface Summary.
Supported steps
Consume step
The consume step within the JMS plugin consumes records from ActiveMQ.
- Ancestor: Scenario
- Functionality: The consume step polls messages from the configured ActiveMQ connection, then uses the specified deserializer to convert each message into the target data type.
- Example:

jms().consume { (1)
    queues("battery_state") (2)
    queueConnection { (3)
        ActiveMQConnectionFactory("tcp://localhost:61616").createQueueConnection()
    }
}.deserialize(JmsJsonDeserializer(BatteryState::class)) (4)
    .map { result -> (5)
        result.record.value (6)
    }

| 1 | Enter the JMS namespace and start the Consume step. |
| 2 | Specify the name of the queue to consume messages from. Calling queues() clears any previously set topics. |
| 3 | Provide a QueueConnection instance. The lambda is called once per step execution to establish the connection. |
| 4 | Deserialize each consumed javax.jms.Message into a BatteryState instance using JmsJsonDeserializer. |
| 5 | Map the JmsConsumerResult to extract only the deserialized value for downstream steps. |
| 6 | result.record.value holds the deserialized BatteryState object produced by the deserializer. |
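The poll, deserialize, map sequence of the consume step can be sketched in plain Kotlin without a broker. This is only an illustration: the `Record` and `ConsumerResult` classes, the fields of `BatteryState`, and the `decodeBatteryState` function are hypothetical stand-ins rather than the plugin's types, and the payload uses a simple `deviceId,level` text format instead of JSON to keep the sketch free of dependencies.

```kotlin
// Hypothetical stand-ins for the plugin's result types, reduced to what the flow needs.
data class BatteryState(val deviceId: String, val level: Int)
data class Record<T>(val value: T)
data class ConsumerResult<T>(val record: Record<T>)

// Stand-in deserializer (assumption): parses "deviceId,level" rather than JSON.
fun decodeBatteryState(payload: String): BatteryState {
    val (deviceId, level) = payload.split(",", limit = 2)
    return BatteryState(deviceId.trim(), level.trim().toInt())
}

fun main() {
    // Raw payloads, as if they had just been polled from the queue.
    val polled = listOf("sensor-1,87", "sensor-2,42")

    // Deserialize: wrap each decoded value in a result, one per message.
    val results = polled.map { ConsumerResult(Record(decodeBatteryState(it))) }

    // Map: keep only the deserialized value, like `.map { result -> result.record.value }`.
    val values = results.map { result -> result.record.value }
    println(values)
}
```

Downstream steps then receive plain `BatteryState` values and do not need to know about the message wrapper.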
Produce step
- Ancestor: Step
- Functionality: The produce step produces a batch of records into one or more JMS destinations. The record values may be provided by previous steps.
- Example:

.jms()
.produce { (1)
    connect { (2)
        ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
    }
    records { _, input -> (3)
        listOf(
            JmsProducerRecord( (4)
                destination = ActiveMQQueue("battery_state"), (5)
                messageType = JmsMessageType.BYTES, (6)
                value = objectMapper.writeValueAsBytes(input) (7)
            )
        )
    }
}

| 1 | Enter the JMS namespace and start the Produce step. |
| 2 | Provide a javax.jms.Connection instance. The lambda is called once to establish the connection used for all produced records. |
| 3 | Define the list of JmsProducerRecord objects to produce. The lambda receives the step context (_, unused here) and the input value from the previous step. |
| 4 | Create a JmsProducerRecord, the QALIPSIS representation of a JMS message to be sent. |
| 5 | Set the destination as an ActiveMQQueue. Any concrete javax.jms.Destination implementation is valid here. |
| 6 | Set the message type to BYTES. Use JmsMessageType.AUTO if you want the type inferred from the value's runtime type. |
| 7 | Serialize input to a byte array using Jackson. This is passed as the message payload. |
- Tip: If you are not sure about the JmsMessageType, use AUTO to let the system infer the JMS message type from the value's runtime type (String → TextMessage, ByteArray → BytesMessage, other Serializable → ObjectMessage).
- Reference documentation: Refer to the JMS documentation for further parameter and configuration information.
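The inference rule described in the tip can be expressed as a small Kotlin function. This is a minimal sketch of the mapping only, not the plugin's implementation: the `InferredType` enum and the `inferType` function are hypothetical names, and throwing on a non-serializable value is an assumption, since the behavior for that case is not documented here.

```kotlin
import java.io.Serializable

// Hypothetical mirror of the JMS message kinds named in the tip; not the plugin's enum.
enum class InferredType { TEXT, BYTES, OBJECT }

// String -> TextMessage, ByteArray -> BytesMessage, any other Serializable -> ObjectMessage.
// String and ByteArray are themselves Serializable, so they must be tested first.
fun inferType(value: Any): InferredType = when (value) {
    is String -> InferredType.TEXT
    is ByteArray -> InferredType.BYTES
    is Serializable -> InferredType.OBJECT
    // Assumption: values that fit none of the three cases are rejected.
    else -> throw IllegalArgumentException("Unsupported payload type: ${value.javaClass.name}")
}

fun main() {
    println(inferType("level=87"))            // a String payload
    println(inferType(byteArrayOf(1, 2, 3)))  // a ByteArray payload
    println(inferType(42))                    // Int is Serializable
}
```

The order of the branches matters: checking `Serializable` first would classify every `String` and `ByteArray` as an object message.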
Configuration
DSL parameters
Available parameters are described in the table below.
| Parameter | Description |
|---|---|
| | Defines the connection factory used by the |
| | Configures the ActiveMQ connection for the |
| | Specifies the name(s) of the queues to consume from. |
| | Provides a |
| | Specifies the name(s) of the topics to consume from. |
| | Provides a |
| | Generates the list of |
| | Defines the destination for each |
| | Controls how the value of a |
| | The payload of the JMS message. For Produce, this is the value passed into |
| | Specifies the deserializer used to convert a consumed |
Shared defaults for JMS steps
You can define defaults once, inside the scenario block or immediately after it, so that all subsequent JMS steps inherit them.
scenario {
    jms().defaults { (1)
        connection {
            ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
        }
        queueConnection {
            ActiveMQConnectionFactory("tcp://localhost:61616").createQueueConnection()
        }
        topicConnection {
            ActiveMQConnectionFactory("tcp://localhost:61616").createTopicConnection()
        }
        monitoring {
            events = true
            meters = true
        }
    }
}
| 1 | Defaults are applied to subsequent JMS steps in the same scenario. Individual steps can still override values. |
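The inherit-then-override behavior can be pictured as a field-by-field merge. The sketch below is an assumption about the semantics, written in plain Kotlin: `JmsSettings`, its fields, and `withDefaults` are hypothetical names used for illustration and are not part of the plugin.

```kotlin
// Hypothetical settings holder; null means "not set on this step".
data class JmsSettings(
    val brokerUrl: String? = null,
    val events: Boolean? = null,
    val meters: Boolean? = null
)

// A value set on the step wins; otherwise the shared default applies.
fun JmsSettings.withDefaults(defaults: JmsSettings): JmsSettings = JmsSettings(
    brokerUrl = brokerUrl ?: defaults.brokerUrl,
    events = events ?: defaults.events,
    meters = meters ?: defaults.meters
)

fun main() {
    val defaults = JmsSettings(brokerUrl = "tcp://localhost:61616", events = true, meters = true)

    // This step only disables events; everything else comes from the defaults.
    val step = JmsSettings(events = false).withDefaults(defaults)
    println(step)
}
```

A step that sets nothing resolves to the defaults unchanged, which matches the idea that defaults are declared once and reused.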