Jakarta EE plugin

Overview

The Jakarta EE plugin connects QALIPSIS to a messaging platform that supports Jakarta EE Messaging.

Technology addressed

Jakarta EE Messaging and the products compliant with it, including Apache ActiveMQ, IBM MQ, and any other JMS 3.x-compliant broker.

Dependency

io.qalipsis.plugin:qalipsis-plugin-jakarta-ee-messaging

Namespace in scenario

.jakarta()

Client library

Refer to the Jakarta EE documentation. Use the client library provided by your JMS broker (for example, activemq-client for Apache ActiveMQ).

Supported steps

Consume step

The consume step polls messages from a queue or topic on a Jakarta EE-compatible broker. It is created once as a singleton reader and delivers each deserialized message individually to the next step in the scenario.

Ancestor

Scenario

Functionality

The consume step creates a singleton JMS listener. On each execution, it reads one message from the configured queue or topic, deserializes the body using the configured JakartaDeserializer, and forwards a JakartaConsumerResult to the next step. Queues and topics cannot be configured at the same time: calling queues() clears any configured topics, and calling topics() clears any configured queues.
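The mutual exclusion between queues() and topics() can be pictured with a small standalone model. The class below is a hypothetical stand-in written for illustration only: DestinationSelection is an invented name, and the code is not the plugin's implementation. It only reproduces the documented rule that the most recent call wins.

```kotlin
// Hypothetical model of the documented rule; not part of the QALIPSIS API.
class DestinationSelection {
    var queues: List<String> = emptyList()
        private set
    var topics: List<String> = emptyList()
        private set

    // Selecting queues discards any previously selected topics.
    fun queues(vararg names: String) {
        queues = names.toList()
        topics = emptyList()
    }

    // Selecting topics discards any previously selected queues.
    fun topics(vararg names: String) {
        topics = names.toList()
        queues = emptyList()
    }
}
```

In this model, calling queues("battery_state") and then topics("battery_alerts") leaves the queue list empty and keeps only the topic.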

Example
.jakarta().consume { (1)
    queues("battery_state") (2)
    queueConnection { (3)
        ActiveMQConnectionFactory(
            "tcp://localhost:61616",
            "qalipsis_user",
            "qalipsis_password"
        ).createQueueConnection()
    }
    session { connection -> (4)
        connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    }
} (5)
.deserialize(JakartaJsonDeserializer(targetClass = BatteryState::class)) (6)
.map { result -> (7)
    result.record.value
}
1 Enter the Jakarta EE namespace and open the consume configuration block.
2 Declare the queue name to poll messages from.
3 Provide the factory that creates the QueueConnection to the broker.
4 Optionally override the default session; here, non-transacted mode with AUTO_ACKNOWLEDGE is set explicitly.
5 Close the consume configuration block. .deserialize() must be chained here, outside the block.
6 Decode each incoming JMS message body into a BatteryState instance using the JSON deserializer.
7 Extract the deserialized value from JakartaConsumerResult and forward it to the next step.
Tips
  • Use queues() + queueConnection {} for point-to-point messaging; use topics() + topicConnection {} for publish-subscribe.

  • Calling queues() automatically clears any configured topics, and calling topics() automatically clears any configured queues.

  • Pair the consume step with .innerJoin() to correlate consumed messages with data produced by a parallel branch.

  • If the default JakartaStringDeserializer is sufficient (plain text or raw UTF-8 bytes), .deserialize() may be omitted.
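As a counterpart to the queue example above, the fragment below sketches a publish-subscribe consumer. It only recombines DSL calls named in this section (topics() and topicConnection {}) with the same ActiveMQConnectionFactory arguments as the example. The broker URL and credentials are placeholders, imports are omitted as in the example, and .deserialize() is left out on the assumption that the default JakartaStringDeserializer is sufficient.

```kotlin
// Configuration fragment: it belongs inside a scenario step chain, like the queue example.
.jakarta().consume {
    topics("battery_state") // Subscribe to a topic; this clears any configured queues.
    topicConnection {
        // Placeholder URL and credentials, copied from the queue example.
        ActiveMQConnectionFactory(
            "tcp://localhost:61616",
            "qalipsis_user",
            "qalipsis_password"
        ).createTopicConnection()
    }
}
.map { result ->
    // With the default JakartaStringDeserializer, the value is a String.
    result.record.value
}
```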

Reference Documentation

Refer to the Jakarta EE documentation for further parameter and configuration information.

Produce step

The produce step sends a list of JakartaProducerRecord instances to queues or topics on a Jakarta EE-compatible broker. After sending, it forwards the original step input downstream wrapped in a JakartaProducerResult.

Ancestor

Step

Functionality

On each execution, the records lambda is invoked with the current step context and the upstream input to build the list of records to send. Each record is converted to the appropriate JMS Message subtype based on its messageType field and dispatched to the configured destination. The original input is always preserved in JakartaProducerResult.input.

Example
.jakarta().produce { (1)
    connect { (2)
        ActiveMQConnectionFactory("tcp://localhost:61616")
            .createConnection("qalipsis_user", "qalipsis_password")
    }
    session { connection -> (3)
        connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    }
    records { stepContext, input -> (4)
        listOf(
            JakartaProducerRecord(
                destination = Queue(name = "battery_state"), (5)
                messageType = JakartaMessageType.BYTES, (6)
                value = ObjectMapper().writeValueAsBytes(input) (7)
            )
        )
    }
}
1 Enter the Jakarta EE namespace and open the produce configuration block.
2 Provide the factory that creates the Connection to the broker. Any jakarta.jms.Connection is accepted.
3 Optionally override the default session; here, non-transacted mode with AUTO_ACKNOWLEDGE is set explicitly.
4 Build the list of JakartaProducerRecord instances to send. The lambda receives the step context and the upstream input.
5 Set the destination to the queue named battery_state. Use Topic(name = "…") to publish to a topic instead.
6 Set the message type to BYTES so the payload is sent as a JMS BytesMessage.
7 Serialize the input object to a byte array and assign it as the message payload.
Tips
  • Use JakartaMessageType.AUTO if you are unsure of the correct message type. The producer will infer the appropriate JMS Message subtype from the runtime type of value.

  • Use producers(n) to run multiple concurrent producer threads and increase throughput.

  • The original input value is always available on JakartaProducerResult.input after the produce step, and can be extracted with .map { it.input }.
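The tips above can be combined in one step. The fragment below is a sketch that reuses only the DSL calls shown in this section: it publishes to a topic as a TEXT message, runs three producers, and then recovers the original input. The broker URL, credentials, and the use of input.toString() as the payload are placeholders chosen for illustration.

```kotlin
// Configuration fragment: it belongs inside a scenario step chain, after a step that emits the input.
.jakarta().produce {
    connect {
        // Placeholder URL and credentials, copied from the example above.
        ActiveMQConnectionFactory("tcp://localhost:61616")
            .createConnection("qalipsis_user", "qalipsis_password")
    }
    producers(3) // Three concurrent producers.
    records { _, input ->
        listOf(
            JakartaProducerRecord(
                destination = Topic(name = "battery_state"),
                messageType = JakartaMessageType.TEXT,
                value = input.toString() // TEXT expects a String payload.
            )
        )
    }
}
.map { it.input } // Recover the original input for the next step.
```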

Reference Documentation

Refer to the Jakarta EE documentation for further parameter and configuration information.

Configuration

DSL parameters

Available parameters are described in the table below.

Parameter Description

connect

Configures the connection to the Jakarta EE-compatible broker.
Applicable Step: Produce
Optional/Required: Required
Data Type: Lambda within which the connection parameters are defined
Default Value: N/A

Example
connect {
    ActiveMQConnectionFactory(
        "tcp://localhost:61616",
        "qalipsis_user",
        "qalipsis_password"
    ).createQueueConnection()
}

queues

Declares the name(s) of the queues to consume messages from.
Applicable Step: Consume
Optional/Required: Required (if topics is not configured)
Data Type: vararg String
Default Value: N/A

Example
queues("battery_state")

queueConnection

Provides the factory that creates the QueueConnection used by the consumer.
Applicable Step: Consume
Optional/Required: Required (if queues is configured)
Data Type: Lambda within which connection parameters are defined.
Default Value: N/A

Example
queueConnection {
    ActiveMQConnectionFactory(
        "tcp://localhost:61616",
        "qalipsis_user",
        "qalipsis_password"
    ).createQueueConnection()
}

topics

Declares the name(s) of the topics to subscribe to and consume messages from.
Applicable Step: Consume
Optional/Required: Required (if queues is not configured)
Data Type: vararg String
Default Value: N/A

Example
topics("battery_state")

topicConnection

Provides the factory that creates the TopicConnection used by the consumer.
Applicable Step: Consume
Optional/Required: Required (if topics is configured)
Data Type: Lambda within which connection parameters are defined.
Default Value: N/A

Example
topicConnection {
    ActiveMQConnectionFactory(
        "tcp://localhost:61616",
        "qalipsis_user",
        "qalipsis_password"
    ).createTopicConnection()
}

records

Builds the list of JakartaProducerRecord instances to send to the broker.
Applicable Step: Produce
Optional/Required: Required
Data Type: Lambda that receives the step context and the upstream input, and returns a List<JakartaProducerRecord>
Default Value: N/A

Example
records { stepContext, input ->
    listOf(
        JakartaProducerRecord(
            destination = Queue(name = "battery_state"),
            messageType = JakartaMessageType.BYTES,
            value = objectMapper.writeValueAsBytes(input)
        )
    )
}

destination

Defines the target destination for a JakartaProducerRecord. Use Queue(name = "…") to send to a queue or Topic(name = "…") to publish to a topic.
Applicable Step: Produce
Optional/Required: Required
Data Type: Destination (either Queue or Topic)
Default Value: N/A

Example
destination = Queue(name = "battery_state")
destination = Topic(name = "battery_state")

messageType

Specifies the JMS message subtype to use when sending the record. AUTO instructs the producer to infer the correct subtype from the runtime type of value.
Applicable Step: Produce
Optional/Required: Optional
Data Type: JakartaMessageType (enum), one of AUTO, TEXT, BYTES, or OBJECT
Default Value: JakartaMessageType.AUTO

Example
messageType = JakartaMessageType.BYTES
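The exact inference rules behind AUTO are not spelled out here. One plausible reading, consistent with the payload types listed for value, is sketched below with a locally defined enum. MessageKind and resolveKind are hypothetical names, and the mapping is an assumption rather than the plugin's actual logic.

```kotlin
import java.io.Serializable

// Local stand-in for the documented enum values; not the plugin's JakartaMessageType.
enum class MessageKind { AUTO, TEXT, BYTES, OBJECT }

// Assumed mapping: String -> TEXT, ByteArray -> BYTES, any other Serializable -> OBJECT.
fun resolveKind(declared: MessageKind, value: Any): MessageKind {
    if (declared != MessageKind.AUTO) return declared // An explicit type is kept as-is.
    return when (value) {
        // String and ByteArray are also Serializable, so they are checked first.
        is String -> MessageKind.TEXT
        is ByteArray -> MessageKind.BYTES
        is Serializable -> MessageKind.OBJECT
        else -> throw IllegalArgumentException(
            "Unsupported payload type: ${value.javaClass.name}"
        )
    }
}
```

Under this assumed mapping, an explicit messageType always wins, and AUTO only decides when the type has not been declared.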

value

The payload to include in the JMS message. When messageType is BYTES, pass a ByteArray; when TEXT, pass a String; when OBJECT, pass a Serializable object.
Applicable Step: Produce
Optional/Required: Required
Data Type: ByteArray for BYTES, String for TEXT, or any Serializable object for OBJECT
Default Value: N/A

Example
value = objectMapper.writeValueAsBytes(input)

session

Overrides the default JMS session created for the step. The lambda receives the active Connection and must return a configured Session.
Applicable Steps: Produce, Consume
Optional/Required: Optional
Data Type: Lambda that receives a Connection and returns a Session
Default Value: { connection -> connection.createSession() }

Example
session { connection ->
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
}

producers

Sets the number of concurrent producer threads. Increase this value to scale message throughput.
Applicable Step: Produce
Optional/Required: Optional
Data Type: Int
Default Value: 1

Example
producers(3)

deserialize

Specifies the deserializer used to decode the body of each incoming JMS message. This method is chained on the result of consume {}, outside the configuration block. Three overloads are available: pass an instance, a KClass, or a fully-qualified class name as a String.
Applicable Step: Consume
Optional/Required: Optional
Data Type:
Instance: JakartaDeserializer<V>
Class reference: KClass<out JakartaDeserializer<V>>
Class name: String (fully-qualified)
Default Value: JakartaStringDeserializer (decodes TextMessage and BytesMessage as UTF-8 strings)

Example
.deserialize(JakartaJsonDeserializer(targetClass = BatteryState::class))

.deserialize(JakartaJsonDeserializer::class)

.deserialize("io.qalipsis.plugins.jakarta.deserializer.JakartaStringDeserializer")
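To make the default behavior concrete, the sketch below shows what decoding as UTF-8 strings amounts to for the two body shapes mentioned above. It works on plain Kotlin values rather than on JMS messages, and decodeBodyAsString is a hypothetical helper, not the JakartaStringDeserializer itself.

```kotlin
// Hypothetical helper: text bodies pass through, byte bodies are decoded as UTF-8.
fun decodeBodyAsString(body: Any): String = when (body) {
    is String -> body                       // Body of a text message.
    is ByteArray -> body.decodeToString()   // Body of a bytes message, read as UTF-8.
    else -> throw IllegalArgumentException(
        "Unsupported body type: ${body.javaClass.name}"
    )
}
```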

monitoring

Enables collection of step-level metrics such as consumed or produced byte counts and record counts.
Applicable Steps: Produce, Consume
Optional/Required: Optional
Data Type: Lambda with receiver StepMonitoringConfiguration
Default Value: Monitoring disabled

Example
monitoring {
    events = true
    meters = true
}

singletonConfiguration

Configures how the singleton consumer delivers messages to scenario minions.
Refer to SingletonConfiguration parameters for details on strategies and examples of usage.

Result types

Consume step result: JakartaConsumerResult<V>

The consume step wraps each received message in a JakartaConsumerResult<V>. Access its properties as shown below.

Property              Type                      Description
record                JakartaConsumerRecord<V>  The deserialized message record.
record.value          V                         The deserialized message body.
record.destination    jakarta.jms.Destination?  The JMS destination from which the message was received.
record.offset         Long?                     The consumer-relative offset of the message.
record.messageId      String?                   The JMS message ID assigned by the broker.
record.correlationId  String?                   The JMS correlation ID, if set.
record.priority       Int?                      The JMS priority (0–9) of the message.
record.expiration     Long?                     The message expiration timestamp in milliseconds since the Unix epoch.
record.timestamp      Long?                     The time at which the message was handed to the broker, in milliseconds since the Unix epoch.
meters                JakartaConsumerMeters     Execution metrics for the consumer step.
meters.consumedBytes  Int                       Total number of bytes consumed in this execution.
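Several record properties are nullable, so downstream steps should be prepared for missing metadata. The sketch below uses a local stand-in data class that mirrors a few of the documented properties to show null-safe access. RecordSketch and describe are hypothetical names, not plugin types.

```kotlin
// Local stand-in mirroring some documented properties; not the plugin's JakartaConsumerRecord.
data class RecordSketch<V>(
    val value: V,
    val messageId: String? = null,
    val correlationId: String? = null,
    val priority: Int? = null
)

// Builds a one-line summary, falling back to placeholders for missing metadata.
fun <V> describe(record: RecordSketch<V>): String {
    val id = record.messageId ?: "<no id>"
    val correlation = record.correlationId?.let { " correlation=$it" } ?: ""
    val priority = record.priority ?: 4 // 4 is the JMS default priority.
    return "id=$id$correlation priority=$priority value=${record.value}"
}
```

In a scenario, the same null-safe pattern would apply to result.record inside a .map { } step.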

Produce step result: JakartaProducerResult<I>

The produce step wraps its output in a JakartaProducerResult<I>. Access its properties as shown below.

Property                 Type                   Description
input                    I                      The original input received from the previous step, passed through unchanged.
meters                   JakartaProducerMeters  Execution metrics for the producer step.
meters.recordsToProduce  Int                    The number of records that were submitted for sending.
meters.producedRecords   Int                    The number of records successfully sent.
meters.producedBytes     Int                    Total number of bytes sent.
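A common use of these meters is to check that every submitted record was actually sent. The sketch below expresses that check on a local stand-in data class. ProducerMetersSketch and both helper functions are hypothetical names that only mirror the documented properties.

```kotlin
// Local stand-in mirroring the documented meter properties; not the plugin's JakartaProducerMeters.
data class ProducerMetersSketch(
    val recordsToProduce: Int,
    val producedRecords: Int,
    val producedBytes: Int
)

// True when every submitted record was reported as sent.
fun allRecordsSent(meters: ProducerMetersSketch): Boolean =
    meters.producedRecords == meters.recordsToProduce

// Average payload size per sent record, or 0.0 when nothing was sent.
fun averageBytesPerRecord(meters: ProducerMetersSketch): Double =
    if (meters.producedRecords == 0) 0.0
    else meters.producedBytes.toDouble() / meters.producedRecords
```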

Shared defaults for Jakarta EE steps

Define defaults once, either in the scenario block or immediately after it, and all subsequent Jakarta EE steps in that scenario inherit them.

scenario {
  jakarta().defaults { (1)
    connection {
      ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
    }
    queueConnection {
      ActiveMQConnectionFactory("tcp://localhost:61616").createQueueConnection()
    }
    topicConnection {
      ActiveMQConnectionFactory("tcp://localhost:61616").createTopicConnection()
    }
    session { connection ->
      connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    }
    monitoring {
      events = true
      meters = true
    }
  }
}
1 Defaults are applied to subsequent Jakarta EE steps in the same scenario. Individual steps can still override values.