JMS plugin

Overview

The JMS plugin connects QALIPSIS to any messaging platform that implements the JMS standard, such as ActiveMQ, IBM MQ, or RabbitMQ.

Technology addressed

JMS: https://www.oracle.com/java/technologies/java-message-service.html

Dependency

io.qalipsis.plugin:qalipsis-plugin-jms

Namespace in scenario

jms()

Client library

JMS API: refer to Java Interface Summary.

Supported steps

Consume step

The consume step within the JMS plugin consumes records from ActiveMQ.

Ancestor

Scenario

Functionality

The consume step polls messages from the configured ActiveMQ connection, then uses the specified deserializer to convert each message into the target data type.

Example
jms().consume { (1)
    queues("battery_state") (2)
    queueConnection { (3)
        ActiveMQConnectionFactory("tcp://localhost:61616").createQueueConnection()
    }
}.deserialize(JmsJsonDeserializer(BatteryState::class)) (4)
 .map { result -> (5)
    result.record.value (6)
}
1 Enter the JMS namespace and start the Consume step.
2 Specify the name of the queue to consume messages from. Calling queues() clears any previously set topics.
3 Provide a QueueConnection instance. The lambda is called once per step execution to establish the connection.
4 Deserialize each consumed javax.jms.Message into a BatteryState instance using JmsJsonDeserializer.
5 Map the JmsConsumerResult to extract only the deserialized value for downstream steps.
6 result.record.value holds the deserialized BatteryState object produced by the deserializer.
Tip

The ActiveMQ connection can be configured for either queues (QueueConnection) or topics (TopicConnection), but not both.

Reference documentation

Refer to the JMS documentation for further parameter and configuration information.

Produce step

The produce step within the JMS plugin produces records in ActiveMQ using JMS.

Ancestor

Step

Functionality

The produce step produces a batch of records into one or more JMS destinations. The record values may be provided by previous steps.

Example
.jms()
.produce { (1)
    connect { (2)
        ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
    }
    records { _, input -> (3)
        listOf(
            JmsProducerRecord( (4)
                destination = ActiveMQQueue("battery_state"), (5)
                messageType = JmsMessageType.BYTES, (6)
                value = objectMapper.writeValueAsBytes(input) (7)
            )
        )
    }
}
1 Enter the JMS namespace and start the Produce step.
2 Provide a javax.jms.Connection instance. The lambda is called once to establish the connection used for all produced records.
3 Define the list of JmsProducerRecord objects to produce. The lambda receives the step context (_, unused here) and the input value from the previous step.
4 Create a JmsProducerRecord — the QALIPSIS representation of a JMS message to be sent.
5 Set the destination as an ActiveMQQueue. Any concrete javax.jms.Destination implementation is valid here.
6 Set the message type to BYTES. Use JmsMessageType.AUTO if you want the type inferred from the value’s runtime type.
7 Serialize input to a byte array using Jackson. This is passed as the message payload.
Tip

If you are not sure about the JmsMessageType, use AUTO to allow the system to infer the JMS message type from the value’s runtime type (StringTextMessage, ByteArrayBytesMessage, other SerializableObjectMessage).

Reference documentation

Refer to the JMS documentation for further parameter and configuration information.

Configuration

DSL parameters

Available parameters are described in the table below.

Parameter Description

connect

Defines the connection factory used by the produce step to establish a JMS connection.
Applicable Step: Produce
Optional/Required: Required
Data Type: Lambda within which connection parameters are defined.
Default Value: N/A

Example
connect {
    ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
}

consume

Configures the ActiveMQ connection for the consume step.
Applicable Step: Consume
Optional/Required: Required
Data Type: Lambda within which connection parameters are defined.
Default Value: N/A

Example
jms().consume {
    queues("battery_state")
    queueConnection {
        ActiveMQConnectionFactory("tcp://localhost:61616").createQueueConnection()
    }
}

queues

Specifies the name(s) of the queues to consume from.
Applicable Step: Consume
Optional/Required: Required
Data Type: String (vararg)
Default Value: N/A

Example
queues("battery_state")

queueConnection

Provides a javax.jms.QueueConnection instance for consuming from the specified queues. Setting this clears any previously set topicConnection.
Applicable Step: Consume
Optional/Required: Optional
Data Type: Lambda within which connection parameters are defined; returns a javax.jms.QueueConnection.
Default Value: N/A

Example
queueConnection {
    ActiveMQConnectionFactory(
      "tcp://localhost:61616",
      "qalipsis_user",
      "qalipsis_password"
    )
}

topics

Specifies the name(s) of the topics to consume from.
Applicable Step: Consume
Optional/Required: Required
Data Type: String (vararg)
Default Value: N/A

Example
topics("battery_state")

topicConnection

Provides a javax.jms.TopicConnection instance for consuming from the specified topics. Setting this clears any previously set queueConnection.
Applicable Step: Consume
Optional/Required: Required
Data Type: Lambda within which connection parameters are defined; javax.jms.TopicConnection.
Default Value: N/A

Example
topicConnection {
  ActiveMQConnectionFactory(
    "tcp://localhost:61616",
    "qalipsis_user",
    "qalipsis_password"
  )
}

records

Generates the list of JmsProducerRecord instances to send to the messaging platform.
Applicable Step: Produce
Optional/Required: Required
Data Type: Lambda that receives the step context and the input value, and returns a List<JmsProducerRecord>.
Default Value: N/A

Example
records { stepContext, input ->
    listOf(
        JmsProducerRecord(
            destination = ActiveMQQueue("battery_state"),
            messageType = JmsMessageType.BYTES,
            value = ObjectMapper().writeValueAsBytes(input)
        )
    )
}

destination

Defines the destination for each JmsProducerRecord as either a queue or a topic. The value must be a concrete implementation of javax.jms.Destination.
Applicable Step: Produce
Optional/Required: Required
Data Type: javax.jms.Destination
Default Value: N/A

Example
destination = ActiveMQQueue("battery_state")

messageType

Controls how the value of a JmsProducerRecord is converted into a native JMS Message.
Applicable Step: Produce
Optional/Required: Optional
Data Type: JmsMessageType enum — one of TEXT, BYTES, OBJECT, AUTO
Default Value: JmsMessageType.AUTO

Example
messageType = JmsMessageType.BYTES

value

The payload of the JMS message. For Produce, this is the value passed into JmsProducerRecord. For Consume, this is the deserialized message body exposed on JmsConsumerRecord<T>.
Applicable Steps: Consume, Produce
Optional/Required: Required
Data Type:
Consume: the type parameter of JmsConsumerRecord<T>, determined by the configured deserializer
Produce: any value; must be compatible with the chosen JmsMessageType (String for TEXT, ByteArray for BYTES, Serializable for OBJECT)
Default Value: N/A

Example
.map { result ->
    result.record.value  // type is BatteryState, as resolved by JmsJsonDeserializer(BatteryState::class)
}
Example (Produce)
value = ObjectMapper().writeValueAsBytes(input)  //ByteArray, used with JmsMessageType.BYTES.

deserialize

Specifies the deserializer used to convert a consumed javax.jms.Message body into a typed value. Three overloads are available: pass a fully-qualified class name (String), a KClass reference, or a pre-built JmsDeserializer<V> instance.
Applicable Step: Consume
Optional/Required: Required
Data Type: Function overloads accepting either a String (fully-qualified class name), a KClass reference, or a pre-built JmsDeserializer<V> instance. The deserializer must be configured with the target type (e.g., BatteryState) as the type parameter V.
Default Value: JmsStringDeserializer (applied automatically when using jms().consume {})

Example
.deserialize(JmsJsonDeserializer(BatteryState::class))

.deserialize<String>("io.qalipsis.plugins.jms.deserializer.JmsStringDeserializer")

.deserialize(JmsJsonDeserializer(BatteryState::class) {
    disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
})

Shared defaults for JMS steps

You can define defaults once in the scenario section or just after, and let all following JMS steps inherit them.

scenario {
  jms().defaults { (1)
    connection {
      ActiveMQConnectionFactory("tcp://localhost:61616").createConnection()
    }
    queueConnection {
      ActiveMQConnectionFactory("tcp://localhost:61616").createQueueConnection()
    }
    topicConnection {
      ActiveMQConnectionFactory("tcp://localhost:61616").createTopicConnection()
    }
    monitoring {
      events = true
      meters = true
    }
  }
}
1 Defaults are applied to subsequent JMS steps in the same scenario. Individual steps can still override values.