Jakarta EE plugin
Overview
The Jakarta EE plugin connects QALIPSIS to a messaging platform that supports Jakarta EE Messaging.
- Technology addressed
-
Jakarta EE Messaging and the products compliant with it, including Apache ActiveMQ, IBM MQ, and any other JMS 3.x-compliant broker.
- Dependency
-
io.qalipsis.plugin:qalipsis-plugin-jakarta-ee-messaging - Namespace in scenario
-
.jakarta() - Client library
-
Refer to Jakarta EE. Use the client library provided by your JMS broker (for example,
activemq-clientfor Apache ActiveMQ).
Supported steps
Consume step
The consume step polls messages from a queue or topic on a Jakarta EE-compatible broker. It is created once as a singleton reader and delivers each deserialized message individually to the next step in the scenario.
- Ancestor
-
Scenario
- Functionality
-
The
consumestep creates a singleton JMS listener. On each execution, it reads one message from the configured queue or topic, deserializes the body using the configuredJakartaDeserializer, and forwards aJakartaConsumerResultto the next step. Only queues or topics can be configured at a time — callingqueues()clears any configured topics, and vice versa. - Example
-
.jakarta().consume { (1) queues("battery_state") (2) queueConnection { (3) ActiveMQConnectionFactory( "tcp://localhost:61616", "qalipsis_user", "qalipsis_password" ).createQueueConnection() } session { connection -> (4) connection.createSession(false, Session.AUTO_ACKNOWLEDGE) } } (5) .deserialize(JakartaJsonDeserializer(targetClass = BatteryState::class)) (6) .map { result -> (7) result.record.value }1 Enter the Jakarta EE namespace and open the consumeconfiguration block.2 Declare the queue name to poll messages from. 3 Provide the factory that creates the QueueConnectionto the broker.4 Optionally override the default session; here, non-transacted mode with AUTO_ACKNOWLEDGEis set explicitly.5 Close the consumeconfiguration block..deserialize()must be chained here, outside the block.6 Decode each incoming JMS message body into a BatteryStateinstance using the JSON deserializer.7 Extract the deserialized value from JakartaConsumerResultand forward it to the next step.
- Tips
-
-
Use
queues()+queueConnection {}for point-to-point messaging; usetopics()+topicConnection {}for publish-subscribe. -
Calling
queues()automatically clears any configured topics, and callingtopics()automatically clears any configured queues. -
Pair the
consumestep with.innerJoin()to correlate consumed messages with data produced by a parallel branch. -
If the default
JakartaStringDeserializeris sufficient (plain text or raw UTF-8 bytes),.deserialize()may be omitted.
-
- Reference Documentation
-
Refer to the Jakarta EE documentation for further parameter and configuration information.
Produce step
The produce step sends a list of JakartaProducerRecord instances to queues or topics on a Jakarta EE-compatible broker. After sending, it forwards the original step input downstream wrapped in a JakartaProducerResult.
- Ancestor
-
Step
- Functionality
-
On each execution, the
recordslambda is invoked with the current step context and the upstream input to build the list of records to send. Each record is converted to the appropriate JMSMessagesubtype based on itsmessageTypefield and dispatched to the configured destination. The original input is always preserved inJakartaProducerResult.input. - Example
-
.jakarta().produce { (1) connect { (2) ActiveMQConnectionFactory("tcp://localhost:61616") .createConnection("qalipsis_user", "qalipsis_password") } session { connection -> (3) connection.createSession(false, Session.AUTO_ACKNOWLEDGE) } records { stepContext, input -> (4) listOf( JakartaProducerRecord( destination = Queue(name = "battery_state"), (5) messageType = JakartaMessageType.BYTES, (6) value = ObjectMapper().writeValueAsBytes(input) (7) ) ) } }1 Enter the Jakarta EE namespace and open the produceconfiguration block.2 Provide the factory that creates the Connectionto the broker. Anyjakarta.jms.Connectionis accepted.3 Optionally override the default session; here, non-transacted mode with AUTO_ACKNOWLEDGEis set explicitly.4 Build the list of JakartaProducerRecordinstances to send. The lambda receives the step context and the upstream input.5 Set the destination to the queue named battery_state. UseTopic(name = "…")to publish to a topic instead.6 Set the message type to BYTESso the payload is sent as a JMSBytesMessage.7 Serialize the input object to a byte array and assign it as the message payload.
- Tips
-
-
Use
JakartaMessageType.AUTOif you are unsure of the correct message type. The producer will infer the appropriate JMSMessagesubtype from the runtime type ofvalue. -
Use
producers(n)to run multiple concurrent producer threads and increase throughput. -
The original input value is always available on
JakartaProducerResult.inputafter the produce step, and can be extracted with.map { it.input }.
-
- Reference Documentation
-
Refer to the Jakarta EE documentation for further parameter and configuration information.
Configuration
DSL parameters
Available parameters are described in the table below.
| Parameter | Description |
|---|---|
|
Configures the connection to the Jakarta EE-compatible broker. Example
|
|
Declares the name(s) of the queues to consume messages from. Example
|
|
Provides the factory that creates the Example
|
|
Declares the name(s) of the topics to subscribe to and consume messages from. Example
|
|
Provides the factory that creates the Example
|
|
Builds the list of Example
|
|
Defines the target destination for a Example
|
|
Specifies the JMS message subtype to use when sending the record. Example
|
|
The payload to include in the JMS message. When Example
|
|
Overrides the default JMS session created for the step. The lambda receives the active Example
|
|
Sets the number of concurrent producer threads. Increase this value to scale message throughput. Example
|
|
Specifies the deserializer used to decode the body of each incoming JMS message. This method is chained on the result of Example
|
|
Enables collection of step-level metrics such as consumed or produced byte counts and record counts. Example
|
|
Configures how the singleton consumer delivers messages to scenario minions. |
Result types
Consume step result: JakartaConsumerResult<V>
The consume step wraps each received message in a JakartaConsumerResult<V>. Access its properties as shown below.
| Property | Type | Description |
|---|---|---|
|
|
The deserialized message record. |
|
|
The deserialized message body. |
|
|
The JMS destination from which the message was received. |
|
|
The consumer-relative offset of the message. |
|
|
The JMS message ID assigned by the broker. |
|
|
The JMS correlation ID, if set. |
|
|
The JMS priority (0–9) of the message. |
|
|
The message expiration timestamp in milliseconds since the Unix epoch. |
|
|
The time at which the message was handed to the broker, in milliseconds since the Unix epoch. |
|
|
Execution metrics for the consumer step. |
|
|
Total number of bytes consumed in this execution. |
Produce step result: JakartaProducerResult<I>
The produce step wraps its output in a JakartaProducerResult<I>. Access its properties as shown below.
| Property | Type | Description |
|---|---|---|
|
|
The original input received from the previous step, passed through unchanged. |
|
|
Execution metrics for the producer step. |
|
|
The number of records that were submitted for sending. |
|
|
The number of records successfully sent. |
|
|
Total number of bytes sent. |
Shared defaults for Jakarta EE steps
You can define defaults once in the scenario section or just after, and let all following Jakarta EE steps inherit them.
scenario {
jakarta().defaults { (1)
connection {
ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
}
queueConnection {
ActiveMQConnectionFactory("tcp://localhost:61616").createQueueConnection()
}
topicConnection {
ActiveMQConnectionFactory("tcp://localhost:61616").createTopicConnection()
}
session { connection ->
connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
}
monitoring {
events = true
meters = true
}
}
}
| 1 | Defaults are applied to subsequent Jakarta EE steps in the same scenario. Individual steps can still override values. |